/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.MockStreamStatusMaintainer;
import org.apache.flink.streaming.api.operators.StreamSourceContexts;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.streaming.util.CollectorOutput;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
 * Parameterized tests for the idleness detection of the source contexts obtained from
 * {@link StreamSourceContexts#getSourceContext}.
 *
 * <p>Each test runs once per {@link TestMethod}, i.e. once for every source-context method
 * that is used to produce output after the stream status has become idle. Time is driven
 * manually through a {@link TestProcessingTimeService}, and the stream status is observed
 * through a {@link MockStreamStatusMaintainer}.
 */
@RunWith(Parameterized.class)
public class StreamSourceContextIdleDetectionTests {

    /** Payload of every record emitted by the tests. */
    private static final String MESSAGE = "msg";

    /** The source-context method exercised by the current parameterized run. */
    private final TestMethod testMethod;

    /**
     * Creates the test instance for one parameterized run.
     *
     * @param testMethod the source-context method this run uses to emit output
     */
    public StreamSourceContextIdleDetectionTests(TestMethod testMethod) {
        this.testMethod = testMethod;
    }

    /**
     * Verifies idleness detection for a context created with
     * {@link TimeCharacteristic#EventTime} and {@code idleTimeout = 100}.
     *
     * <p>Timeline driven by the test:
     * <ol>
     *   <li>t = 0: the context is created.</li>
     *   <li>t = 100: the status is expected to be IDLE.</li>
     *   <li>t = 200, 300: nothing is emitted; the status is expected to stay IDLE.</li>
     *   <li>t = 310: output is produced via the parameterized method; status expected ACTIVE.</li>
     *   <li>t = 320: output is produced again; status expected ACTIVE.</li>
     *   <li>t = 410: nothing is emitted; status expected to still be ACTIVE.</li>
     *   <li>t = 510: nothing was emitted since t = 320; status expected IDLE again.</li>
     * </ol>
     */
    @Test
    public void testManualWatermarkContext() throws Exception {
        final long idleTimeout = 100L;
        final long initialTime = 0L;

        final TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
        processingTimeService.setCurrentTime(initialTime);

        final List<StreamElement> output = new ArrayList<>();
        final MockStreamStatusMaintainer statusMaintainer = new MockStreamStatusMaintainer();

        final SourceFunction.SourceContext<String> context = StreamSourceContexts.getSourceContext(
                TimeCharacteristic.EventTime,
                processingTimeService,
                new Object(),
                statusMaintainer,
                new CollectorOutput<String>(output),
                0L,
                idleTimeout);

        // (2) t = 100: one full idle timeout has elapsed without any output.
        processingTimeService.setCurrentTime(initialTime + idleTimeout);
        Assert.assertTrue(statusMaintainer.getStreamStatus().isIdle());

        // (3) t = 200, 300: still no output, so the status must not change.
        processingTimeService.setCurrentTime(initialTime + 2L * idleTimeout);
        processingTimeService.setCurrentTime(initialTime + 3L * idleTimeout);
        Assert.assertTrue(statusMaintainer.getStreamStatus().isIdle());

        // (4) t = 310: the first output after being idle switches the status to ACTIVE.
        processingTimeService.setCurrentTime(initialTime + 3L * idleTimeout + idleTimeout / 10L);
        emitWithTestMethod(context, processingTimeService);
        Assert.assertTrue(statusMaintainer.getStreamStatus().isActive());

        // (5) t = 320: a second output while already ACTIVE.
        processingTimeService.setCurrentTime(initialTime + 3L * idleTimeout + 2L * idleTimeout / 10L);
        emitWithTestMethod(context, processingTimeService);
        Assert.assertTrue(statusMaintainer.getStreamStatus().isActive());

        // (6) t = 410: status is expected to remain ACTIVE at this point.
        processingTimeService.setCurrentTime(initialTime + 4L * idleTimeout + idleTimeout / 10L);
        Assert.assertTrue(statusMaintainer.getStreamStatus().isActive());

        // (7) t = 510: no output since t = 320, status is expected to be IDLE again.
        processingTimeService.setCurrentTime(initialTime + 5L * idleTimeout + idleTimeout / 10L);
        Assert.assertTrue(statusMaintainer.getStreamStatus().isIdle());
    }

    /**
     * Verifies idleness detection for a context created with
     * {@link TimeCharacteristic#IngestionTime}, {@code watermarkInterval = 40} and
     * {@code idleTimeout = 100}.
     *
     * <p>Timeline driven by the test (expected watermarks are the current time rounded down to
     * a multiple of the watermark interval):
     * <ol>
     *   <li>t = 20: the context is created.</li>
     *   <li>t = 60, 100: watermarks 40 and 80 are expected; at t = 120 the status is expected
     *       to be IDLE.</li>
     *   <li>t = 140 .. 320: nothing is emitted; status expected IDLE and no further output.</li>
     *   <li>t = 330: output is produced via the parameterized method. For the record-emitting
     *       methods the record plus watermark 320 are expected and the status becomes ACTIVE;
     *       for {@link TestMethod#EMIT_WATERMARK} the status is expected to stay IDLE and the
     *       output to stay unchanged (the same holds for all following steps).</li>
     *   <li>t = 350: output is produced again; only the record is expected to be added.</li>
     *   <li>t = 380, 420: watermarks 360 and 400 are expected; at t = 430 the status is still
     *       expected to be ACTIVE with no additional output.</li>
     *   <li>t = 460: the status is expected to be IDLE for every parameterization, and no
     *       additional output is expected.</li>
     * </ol>
     */
    @Test
    public void testAutomaticWatermarkContext() throws Exception {
        final long watermarkInterval = 40L;
        final long idleTimeout = 100L;
        final long initialTime = 20L;

        final TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
        processingTimeService.setCurrentTime(initialTime);

        final MockStreamStatusMaintainer statusMaintainer = new MockStreamStatusMaintainer();
        final List<StreamElement> output = new ArrayList<>();
        final List<StreamElement> expectedOutput = new ArrayList<>();

        final SourceFunction.SourceContext<String> context = StreamSourceContexts.getSourceContext(
                TimeCharacteristic.IngestionTime,
                processingTimeService,
                new Object(),
                statusMaintainer,
                new CollectorOutput<String>(output),
                watermarkInterval,
                idleTimeout);

        // Only COLLECT and COLLECT_WITH_TIMESTAMP are expected to produce output and to
        // re-activate the stream in this context.
        final boolean emitsRecords = testMethod != TestMethod.EMIT_WATERMARK;

        // (2) t = 60 and t = 100: a watermark is expected at each of these steps.
        processingTimeService.setCurrentTime(initialTime + watermarkInterval);
        expectedOutput.add(
                alignedWatermark(processingTimeService.getCurrentProcessingTime(), watermarkInterval));
        processingTimeService.setCurrentTime(initialTime + 2L * watermarkInterval);
        expectedOutput.add(
                alignedWatermark(processingTimeService.getCurrentProcessingTime(), watermarkInterval));

        // t = 120: one idle timeout has elapsed without records.
        processingTimeService.setCurrentTime(initialTime + idleTimeout);
        Assert.assertTrue(statusMaintainer.getStreamStatus().isIdle());
        Assert.assertEquals(expectedOutput, output);

        // (3) t = 140, 180, 220, 260, 300, 320: while IDLE, no watermarks are expected.
        processingTimeService.setCurrentTime(initialTime + 3L * watermarkInterval);
        processingTimeService.setCurrentTime(initialTime + 4L * watermarkInterval);
        processingTimeService.setCurrentTime(initialTime + 2L * idleTimeout);
        processingTimeService.setCurrentTime(initialTime + 6L * watermarkInterval);
        processingTimeService.setCurrentTime(initialTime + 7L * watermarkInterval);
        processingTimeService.setCurrentTime(initialTime + 3L * idleTimeout);
        Assert.assertTrue(statusMaintainer.getStreamStatus().isIdle());
        Assert.assertEquals(expectedOutput, output);

        // (4) t = 330: first emission after being idle.
        processingTimeService.setCurrentTime(initialTime + 3L * idleTimeout + idleTimeout / 10L);
        emitWithTestMethod(context, processingTimeService);
        if (emitsRecords) {
            final long now = processingTimeService.getCurrentProcessingTime();
            // The record is expected to be followed by a watermark aligned to the interval.
            expectedOutput.add(new StreamRecord<>(MESSAGE, now));
            expectedOutput.add(alignedWatermark(now, watermarkInterval));
            Assert.assertTrue(statusMaintainer.getStreamStatus().isActive());
        } else {
            Assert.assertTrue(statusMaintainer.getStreamStatus().isIdle());
        }
        Assert.assertEquals(expectedOutput, output);

        // (5) t = 340, then t = 350: second emission; only the record itself is expected.
        processingTimeService.setCurrentTime(initialTime + 8L * watermarkInterval);
        processingTimeService.setCurrentTime(initialTime + 3L * idleTimeout + 3L * idleTimeout / 10L);
        emitWithTestMethod(context, processingTimeService);
        if (emitsRecords) {
            expectedOutput.add(
                    new StreamRecord<>(MESSAGE, processingTimeService.getCurrentProcessingTime()));
            Assert.assertTrue(statusMaintainer.getStreamStatus().isActive());
        } else {
            Assert.assertTrue(statusMaintainer.getStreamStatus().isIdle());
        }
        Assert.assertEquals(expectedOutput, output);

        // (6) t = 380 and t = 420: while ACTIVE, a watermark is expected at each step.
        for (long multiple = 9L; multiple <= 10L; multiple++) {
            processingTimeService.setCurrentTime(initialTime + multiple * watermarkInterval);
            if (emitsRecords) {
                expectedOutput.add(alignedWatermark(
                        processingTimeService.getCurrentProcessingTime(), watermarkInterval));
                Assert.assertTrue(statusMaintainer.getStreamStatus().isActive());
            } else {
                Assert.assertTrue(statusMaintainer.getStreamStatus().isIdle());
            }
            Assert.assertEquals(expectedOutput, output);
        }

        // t = 430: status is expected to be unchanged and no new output is expected.
        processingTimeService.setCurrentTime(initialTime + 4L * idleTimeout + idleTimeout / 10L);
        if (emitsRecords) {
            Assert.assertTrue(statusMaintainer.getStreamStatus().isActive());
        } else {
            Assert.assertTrue(statusMaintainer.getStreamStatus().isIdle());
        }
        Assert.assertEquals(expectedOutput, output);

        // (7) t = 460: every parameterization is expected to be IDLE, without a new watermark.
        processingTimeService.setCurrentTime(initialTime + 11L * watermarkInterval);
        Assert.assertTrue(statusMaintainer.getStreamStatus().isIdle());
        Assert.assertEquals(expectedOutput, output);
    }

    /**
     * Supplies the parameters of the parameterized runs: one run per {@link TestMethod}.
     *
     * <p>Note: despite its name, this method provides {@link TestMethod} values, not time
     * characteristics. The name is kept unchanged because the method is public.
     *
     * @return one single-element argument array per {@link TestMethod} constant
     */
    @Parameterized.Parameters(name="TestMethod = {0}")
    public static Collection<TestMethod[]> timeCharacteristic() {
        return Arrays.asList(new TestMethod[][]{
                {TestMethod.COLLECT},
                {TestMethod.COLLECT_WITH_TIMESTAMP},
                {TestMethod.EMIT_WATERMARK}});
    }

    /**
     * Produces output through the source-context method selected by {@link #testMethod}.
     * Timestamps and watermarks carry the current time of the given processing time service.
     *
     * @param context the source context under test
     * @param processingTimeService the manually driven clock of the test
     * @throws IllegalStateException if {@link #testMethod} is a constant this helper does not know
     */
    private void emitWithTestMethod(
            SourceFunction.SourceContext<String> context,
            TestProcessingTimeService processingTimeService) {
        switch (testMethod) {
            case COLLECT:
                context.collect(MESSAGE);
                break;
            case COLLECT_WITH_TIMESTAMP:
                context.collectWithTimestamp(
                        MESSAGE, processingTimeService.getCurrentProcessingTime());
                break;
            case EMIT_WATERMARK:
                context.emitWatermark(
                        new Watermark(processingTimeService.getCurrentProcessingTime()));
                break;
            default:
                // Fail loudly instead of silently emitting nothing for a new constant.
                throw new IllegalStateException("Unknown test method: " + testMethod);
        }
    }

    /**
     * Builds the watermark expected for the given time: the time rounded down to a multiple
     * of the watermark interval.
     *
     * @param time the current processing time
     * @param watermarkInterval the watermark interval the context was created with
     * @return a watermark with timestamp {@code time - (time % watermarkInterval)}
     */
    private static Watermark alignedWatermark(long time, long watermarkInterval) {
        return new Watermark(time - time % watermarkInterval);
    }

    /** The source-context methods the tests in this class are parameterized with. */
    private enum TestMethod {
        /** Emit via {@code SourceContext#collect}. */
        COLLECT,
        /** Emit via {@code SourceContext#collectWithTimestamp}. */
        COLLECT_WITH_TIMESTAMP,
        /** Emit via {@code SourceContext#emitWatermark}. */
        EMIT_WATERMARK
    }
}

