/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators;

import java.util.ArrayList;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.operators.StreamSourceContexts;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.SourceStreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTaskTestHarness;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.TimerService;
import org.apache.flink.streaming.util.CollectorOutput;
import org.apache.flink.streaming.util.MockStreamTask;
import org.apache.flink.streaming.util.MockStreamTaskBuilder;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.util.ExceptionUtils;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

public class StreamSourceOperatorWatermarksTest {
    /**
     * Runs a {@link FiniteSource} that emits no records and checks that the task output
     * holds exactly one element, {@link Watermark#MAX_WATERMARK}.
     */
    @Test
    public void testEmitMaxWatermarkForFiniteSource() throws Exception {
        StreamSource<String, ?> finiteSourceOperator = new StreamSource<>(new FiniteSource());
        StreamTaskTestHarness<String> harness =
                setupSourceStreamTask(finiteSourceOperator, BasicTypeInfo.STRING_TYPE_INFO);

        harness.invoke();
        harness.waitForTaskCompletion();

        // Nothing but the final watermark may have been emitted.
        Assert.assertEquals(1L, harness.getOutput().size());
        Assert.assertEquals(Watermark.MAX_WATERMARK, harness.getOutput().peek());
    }

    /**
     * Runs a {@link FiniteSource} configured to emit the record {@code "Hello"} from
     * {@code close()} and checks that the task output is that record followed by
     * {@link Watermark#MAX_WATERMARK}.
     */
    @Test
    public void testMaxWatermarkIsForwardedLastForFiniteSource() throws Exception {
        StreamSource<String, ?> finiteSourceOperator = new StreamSource<>(new FiniteSource(true));
        StreamTaskTestHarness<String> harness =
                setupSourceStreamTask(finiteSourceOperator, BasicTypeInfo.STRING_TYPE_INFO);

        harness.invoke();
        harness.waitForTaskCompletion();

        // The record written while closing must precede the max watermark.
        ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
        expected.add(new StreamRecord<>("Hello"));
        expected.add(Watermark.MAX_WATERMARK);

        TestHarnessUtil.assertOutputEquals("Output was not correct.", expected, harness.getOutput());
    }

    /**
     * Cancels the source task right after it is created and checks that waiting for
     * completion fails with a {@link CancelTaskException} somewhere in the cause chain,
     * and that no element (in particular no max watermark) was emitted.
     */
    @Test
    public void testNoMaxWatermarkOnImmediateCancel() throws Exception {
        StreamSource<String, ?> infiniteSourceOperator = new StreamSource<>(new InfiniteSource<>());
        StreamTaskTestHarness<String> harness =
                setupSourceStreamTask(infiniteSourceOperator, BasicTypeInfo.STRING_TYPE_INFO, true);

        harness.invoke();
        try {
            harness.waitForTaskCompletion();
            Assert.fail("should throw an exception");
        } catch (Throwable t) {
            // Only a cancellation is tolerated; anything else (including the
            // AssertionError raised by fail() above) is propagated unchanged.
            boolean causedByCancel =
                    ExceptionUtils.findThrowable(t, CancelTaskException.class).isPresent();
            if (!causedByCancel) {
                throw t;
            }
        }

        Assert.assertTrue(harness.getOutput().isEmpty());
    }

    /**
     * Starts an {@link InfiniteSource}, cancels the task while it is running, and checks
     * that no element (in particular no max watermark) was emitted.
     *
     * <p>Unlike the immediate-cancel test, a clean completion is accepted here; a failure
     * is tolerated only if a {@link CancelTaskException} is found in its cause chain.
     */
    @Test
    public void testNoMaxWatermarkOnAsyncCancel() throws Exception {
        StreamSource<String, ?> infiniteSourceOperator = new StreamSource<>(new InfiniteSource<>());
        StreamTaskTestHarness<String> harness =
                setupSourceStreamTask(infiniteSourceOperator, BasicTypeInfo.STRING_TYPE_INFO);

        harness.invoke();
        harness.waitForTaskRunning();
        // Let the task run for a short while before cancelling it.
        Thread.sleep(200L);
        harness.getTask().cancel();

        try {
            harness.waitForTaskCompletion();
        } catch (Throwable t) {
            boolean causedByCancel =
                    ExceptionUtils.findThrowable(t, CancelTaskException.class).isPresent();
            if (!causedByCancel) {
                throw t;
            }
        }

        Assert.assertTrue(harness.getOutput().isEmpty());
    }

    /**
     * Checks the watermarks written by the {@link TimeCharacteristic#IngestionTime} source
     * context while processing time advances.
     *
     * <p>The context is created with an auto-watermark interval of 10 on top of a
     * {@link TestProcessingTimeService}; processing time is then stepped through
     * 1, 11, ..., 91. The test expects nine output elements, all of them watermarks, with
     * timestamps 10, 20, ..., 90.
     *
     * @throws Exception if setting up the operator or advancing the processing time fails
     */
    @Test
    public void testAutomaticWatermarkContext() throws Exception {
        final long watermarkInterval = 10L;
        StreamSource<String, ?> operator = new StreamSource<>(new InfiniteSource<>());

        TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
        processingTimeService.setCurrentTime(0L);

        MockStreamTask task = setupSourceOperator(
                operator, TimeCharacteristic.IngestionTime, watermarkInterval, processingTimeService);
        ArrayList<StreamElement> output = new ArrayList<>();

        // The returned context is intentionally discarded.
        // NOTE(review): presumably creating it is what schedules the periodic watermark
        // emission on the processing time service - confirm in StreamSourceContexts.
        StreamSourceContexts.getSourceContext(
                TimeCharacteristic.IngestionTime,
                processingTimeService,
                task.getCheckpointLock(),
                operator.getContainingTask().getStreamStatusMaintainer(),
                new CollectorOutput<String>(output),
                operator.getExecutionConfig().getAutoWatermarkInterval(),
                -1L);

        // Time starts at 1 rather than 0; the assertions below still expect timestamps
        // that are exact multiples of the watermark interval.
        for (long time = 1L; time < 100L; time += watermarkInterval) {
            processingTimeService.setCurrentTime(time);
        }

        Assert.assertEquals(9L, output.size());

        long expectedTimestamp = 0L;
        for (StreamElement element : output) {
            // Fail with a readable message instead of a bare ClassCastException.
            Assert.assertTrue(
                    "Expected only watermarks in the output but found: " + element,
                    element instanceof Watermark);
            expectedTimestamp += watermarkInterval;
            // JUnit's contract is assertEquals(expected, actual).
            Assert.assertEquals(expectedTimestamp, ((Watermark) element).getTimestamp());
        }
    }

    /**
     * Builds a {@link MockStreamTask} and calls {@code setup} on the given source operator
     * with it.
     *
     * @param operator the source operator to set up
     * @param timeChar the time characteristic written into the stream config
     * @param watermarkInterval the auto-watermark interval written into the execution config
     * @param timeProvider the timer service handed to the mock task
     * @return the mock task the operator was set up with
     * @throws Exception if building the mock task fails
     */
    private static <T> MockStreamTask setupSourceOperator(StreamSource<T, ?> operator, TimeCharacteristic timeChar, long watermarkInterval, TimerService timeProvider) throws Exception {
        ExecutionConfig executionConfig = new ExecutionConfig();
        executionConfig.setAutoWatermarkInterval(watermarkInterval);

        StreamConfig streamConfig = new StreamConfig(new Configuration());
        streamConfig.setStateBackend(new MemoryStateBackend());
        streamConfig.setTimeCharacteristic(timeChar);
        streamConfig.setOperatorID(new OperatorID());

        Environment environment = new DummyEnvironment("MockTwoInputTask", 1, 0);

        // The stream status is stubbed to always report ACTIVE.
        StreamStatusMaintainer statusMaintainer = Mockito.mock(StreamStatusMaintainer.class);
        Mockito.when(statusMaintainer.getStreamStatus()).thenReturn(StreamStatus.ACTIVE);

        MockStreamTask mockTask = new MockStreamTaskBuilder(environment)
                .setConfig(streamConfig)
                .setExecutionConfig(executionConfig)
                .setStreamStatusMaintainer(statusMaintainer)
                .setTimerService(timeProvider)
                .build();

        // The operator's own output is a plain mock; callers collect elements elsewhere.
        @SuppressWarnings("unchecked")
        Output<StreamRecord<T>> mockedOutput = (Output<StreamRecord<T>>) Mockito.mock(Output.class);
        operator.setup(mockTask, streamConfig, mockedOutput);
        return mockTask;
    }

    /**
     * Creates a test harness for the given source operator whose task is not cancelled
     * after creation.
     *
     * @param sourceOperator the source operator to run in the harness
     * @param outputType type information of the operator's output
     * @return the configured, not yet invoked, test harness
     */
    private static <T> StreamTaskTestHarness<T> setupSourceStreamTask(StreamSource<T, ?> sourceOperator, TypeInformation<T> outputType) {
        final boolean cancelImmediatelyAfterCreation = false;
        return setupSourceStreamTask(sourceOperator, outputType, cancelImmediatelyAfterCreation);
    }

    /**
     * Creates a test harness whose task factory builds a {@link SourceStreamTask} and,
     * if requested, cancels it right after construction.
     *
     * <p>The harness is set up for a singleton operator chain, and its stream config
     * receives the given operator, a fresh {@link OperatorID} and
     * {@link TimeCharacteristic#EventTime}.
     *
     * @param sourceOperator the source operator to run in the harness
     * @param outputType type information of the operator's output
     * @param cancelImmediatelyAfterCreation whether the task factory cancels the task
     *     before returning it
     * @return the configured, not yet invoked, test harness
     */
    private static <T> StreamTaskTestHarness<T> setupSourceStreamTask(StreamSource<T, ?> sourceOperator, TypeInformation<T> outputType, boolean cancelImmediatelyAfterCreation) {
        StreamTaskTestHarness<T> harness = new StreamTaskTestHarness<T>(
                environment -> {
                    SourceStreamTask<T, ?, ?> task = new SourceStreamTask<>(environment);
                    if (!cancelImmediatelyAfterCreation) {
                        return task;
                    }
                    try {
                        task.cancel();
                    } catch (Exception e) {
                        // The factory cannot throw checked exceptions; keep the cause.
                        throw new RuntimeException(e);
                    }
                    return task;
                },
                outputType);
        harness.setupOutputForSingletonOperatorChain();

        StreamConfig config = harness.getStreamConfig();
        config.setStreamOperator(sourceOperator);
        config.setOperatorID(new OperatorID());
        config.setTimeCharacteristic(TimeCharacteristic.EventTime);

        return harness;
    }

    private static final class InfiniteSource<T>
    implements SourceFunction<T> {
        private volatile boolean running = true;

        private InfiniteSource() {
        }

        public void run(SourceFunction.SourceContext<T> ctx) throws Exception {
            while (this.running) {
                Thread.sleep(20L);
            }
        }

        public void cancel() {
            this.running = false;
        }
    }

    private static final class FiniteSource
    extends RichSourceFunction<String> {
        private volatile transient boolean canceled = false;
        private transient SourceFunction.SourceContext<String> context;
        private final boolean outputingARecordWhenClosing;

        public FiniteSource() {
            this(false);
        }

        public FiniteSource(boolean outputingARecordWhenClosing) {
            this.outputingARecordWhenClosing = outputingARecordWhenClosing;
        }

        public void run(SourceFunction.SourceContext<String> ctx) {
            this.context = ctx;
        }

        public void close() {
            if (!this.canceled && this.outputingARecordWhenClosing) {
                this.context.collect((Object)"Hello");
            }
        }

        public void cancel() {
            this.canceled = true;
        }
    }
}

