/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators;

import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.StreamMap;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTaskTestHarness;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class StreamTaskTimerTest
extends TestLogger {
    private StreamTaskTestHarness<?> testHarness;
    private ProcessingTimeService timeService;

    @Before
    public void setup() throws Exception {
        this.testHarness = this.startTestHarness();
        StreamTask<?, ?> task = this.testHarness.getTask();
        this.timeService = task.getProcessingTimeServiceFactory().createProcessingTimeService(task.getMailboxExecutorFactory().createExecutor(this.testHarness.getStreamConfig().getChainIndex()));
    }

    @After
    public void teardown() throws Exception {
        this.stopTestHarness(this.testHarness, 4000L);
    }

    @Test
    public void testOpenCloseAndTimestamps() {
        this.timeService.registerTimer(System.currentTimeMillis(), timestamp -> {});
        Assert.assertEquals((long)1L, (long)StreamTask.TRIGGER_THREAD_GROUP.activeCount());
    }

    @Test
    public void testErrorReporting() throws Exception {
        AtomicReference errorRef = new AtomicReference();
        OneShotLatch latch = new OneShotLatch();
        this.testHarness.getEnvironment().setExternalExceptionHandler(ex -> {
            errorRef.set(ex);
            latch.trigger();
        });
        ProcessingTimeCallback callback = timestamp -> {
            throw new Exception("Exception in Timer");
        };
        this.timeService.registerTimer(System.currentTimeMillis(), callback);
        latch.await();
        Assert.assertThat(errorRef.get(), (Matcher)Matchers.instanceOf(Exception.class));
    }

    @Test
    public void checkScheduledTimestamps() throws Exception {
        AtomicReference errorRef = new AtomicReference();
        long t1 = System.currentTimeMillis();
        long t2 = System.currentTimeMillis() - 200L;
        long t3 = System.currentTimeMillis() + 100L;
        long t4 = System.currentTimeMillis() + 200L;
        this.timeService.registerTimer(t1, (ProcessingTimeCallback)new ValidatingProcessingTimeCallback(errorRef, t1, 0));
        this.timeService.registerTimer(t2, (ProcessingTimeCallback)new ValidatingProcessingTimeCallback(errorRef, t2, 1));
        this.timeService.registerTimer(t3, (ProcessingTimeCallback)new ValidatingProcessingTimeCallback(errorRef, t3, 2));
        this.timeService.registerTimer(t4, (ProcessingTimeCallback)new ValidatingProcessingTimeCallback(errorRef, t4, 3));
        long deadline = System.currentTimeMillis() + 20000L;
        while (errorRef.get() == null && ValidatingProcessingTimeCallback.numInSequence < 4 && System.currentTimeMillis() < deadline) {
            Thread.sleep(100L);
        }
        StreamTaskTimerTest.verifyNoException((Throwable)errorRef.get());
        Assert.assertEquals((long)4L, (long)ValidatingProcessingTimeCallback.numInSequence);
    }

    private static void verifyNoException(@Nullable Throwable exception) {
        if (exception != null) {
            exception.printStackTrace();
            Assert.fail((String)exception.getMessage());
        }
    }

    private StreamTaskTestHarness<?> startTestHarness() throws Exception {
        OneInputStreamTaskTestHarness testHarness = new OneInputStreamTaskTestHarness(OneInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
        testHarness.setupOutputForSingletonOperatorChain();
        StreamConfig streamConfig = testHarness.getStreamConfig();
        streamConfig.setChainIndex(0);
        streamConfig.setStreamOperator((StreamOperator)new StreamMap(new DummyMapFunction()));
        testHarness.invoke();
        testHarness.waitForTaskRunning();
        return testHarness;
    }

    private void stopTestHarness(StreamTaskTestHarness<?> testHarness, long timeout) throws Exception {
        testHarness.endInput();
        testHarness.waitForTaskCompletion();
        long deadline = System.currentTimeMillis() + timeout;
        while (StreamTask.TRIGGER_THREAD_GROUP.activeCount() > 0 && System.currentTimeMillis() < deadline) {
            Thread.sleep(10L);
        }
        Assert.assertEquals((String)"Trigger timer thread did not properly shut down", (long)0L, (long)StreamTask.TRIGGER_THREAD_GROUP.activeCount());
    }

    public static class DummyMapFunction<T>
    implements MapFunction<T, T> {
        public T map(T value) {
            return value;
        }
    }

    private static class ValidatingProcessingTimeCallback
    implements ProcessingTimeCallback {
        static int numInSequence;
        private final AtomicReference<Throwable> errorRef;
        private final long expectedTimestamp;
        private final int expectedInSequence;

        private ValidatingProcessingTimeCallback(AtomicReference<Throwable> errorRef, long expectedTimestamp, int expectedInSequence) {
            this.errorRef = errorRef;
            this.expectedTimestamp = expectedTimestamp;
            this.expectedInSequence = expectedInSequence;
        }

        public void onProcessingTime(long timestamp) {
            try {
                Assert.assertEquals((long)this.expectedTimestamp, (long)timestamp);
                Assert.assertEquals((long)this.expectedInSequence, (long)numInSequence);
                ++numInSequence;
            }
            catch (Throwable t) {
                this.errorRef.compareAndSet(null, t);
            }
        }
    }
}

