/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.io.Serializable;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.mocks.MockSource;
import org.apache.flink.api.connector.source.mocks.MockSourceReader;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Metric;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.metrics.util.InterceptingOperatorMetricGroup;
import org.apache.flink.runtime.metrics.util.InterceptingTaskMetricGroup;
import org.apache.flink.runtime.source.event.AddSplitEvent;
import org.apache.flink.runtime.source.event.NoMoreSplitsEvent;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractInput;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
import org.apache.flink.streaming.api.operators.BoundedMultiInput;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.Input;
import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.SourceOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.runtime.tasks.MultipleInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTest;
import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarness;
import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarnessBuilder;
import org.apache.flink.streaming.runtime.tasks.StreamTaskTestHarness;
import org.apache.flink.streaming.util.TestBoundedMultipleInputOperator;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.hamcrest.collection.IsMapContaining;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class MultipleInputStreamTaskTest {
    private static final List<String> LIFE_CYCLE_EVENTS = new ArrayList<String>();

    @Before
    public void setUp() {
        LIFE_CYCLE_EVENTS.clear();
    }

    @Test
    public void testBasicProcessing() throws Exception {
        try (StreamTaskMailboxTestHarness<String> testHarness = MultipleInputStreamTaskTest.buildTestHarness();){
            long initialTime = 0L;
            ArrayDeque<StreamRecord> expectedOutput = new ArrayDeque<StreamRecord>();
            MultipleInputStreamTaskTest.addSourceRecords(testHarness, 1, 42, 43);
            expectedOutput.add(new StreamRecord((Object)"42", Long.MIN_VALUE));
            expectedOutput.add(new StreamRecord((Object)"43", Long.MIN_VALUE));
            testHarness.processElement(new StreamRecord((Object)"Hello", initialTime + 1L), 0);
            expectedOutput.add(new StreamRecord((Object)"Hello", initialTime + 1L));
            testHarness.processElement(new StreamRecord((Object)42.44, initialTime + 3L), 1);
            expectedOutput.add(new StreamRecord((Object)"42.44", initialTime + 3L));
            testHarness.endInput();
            testHarness.waitForTaskCompletion();
            MatcherAssert.assertThat(testHarness.getOutput(), (Matcher)Matchers.containsInAnyOrder((Object[])expectedOutput.toArray()));
        }
    }

    @Test
    public void testCheckpointBarriers() throws Exception {
        try (StreamTaskMailboxTestHarness testHarness = new StreamTaskMailboxTestHarnessBuilder(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO).addInput((TypeInformation<?>)BasicTypeInfo.STRING_TYPE_INFO, 2).addInput((TypeInformation<?>)BasicTypeInfo.INT_TYPE_INFO, 2).addInput((TypeInformation<?>)BasicTypeInfo.DOUBLE_TYPE_INFO, 2).setupOutputForSingletonOperatorChain((StreamOperatorFactory<?>)new MapToStringMultipleInputOperatorFactory(3)).build();){
            ArrayDeque<Object> expectedOutput = new ArrayDeque<Object>();
            long initialTime = 0L;
            testHarness.processEvent((AbstractEvent)new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()), 0, 0);
            testHarness.processElement(new StreamRecord((Object)"Ciao-0-0", initialTime), 0, 1);
            expectedOutput.add(new StreamRecord((Object)"Ciao-0-0", initialTime));
            testHarness.processElement(new StreamRecord((Object)11, initialTime), 1, 1);
            testHarness.processElement(new StreamRecord((Object)1.0, initialTime), 2, 0);
            expectedOutput.add(new StreamRecord((Object)"11", initialTime));
            expectedOutput.add(new StreamRecord((Object)"1.0", initialTime));
            MatcherAssert.assertThat(testHarness.getOutput(), (Matcher)Matchers.contains((Object[])expectedOutput.toArray()));
            testHarness.processEvent((AbstractEvent)new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()), 0, 1);
            testHarness.processEvent((AbstractEvent)new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()), 1, 0);
            testHarness.processEvent((AbstractEvent)new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()), 1, 1);
            testHarness.processEvent((AbstractEvent)new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()), 2, 0);
            testHarness.processEvent((AbstractEvent)new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()), 2, 1);
            expectedOutput.add(new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()));
            MatcherAssert.assertThat(testHarness.getOutput(), (Matcher)Matchers.contains((Object[])expectedOutput.toArray()));
        }
    }

    @Test
    public void testOvertakingCheckpointBarriers() throws Exception {
        try (StreamTaskMailboxTestHarness testHarness = new StreamTaskMailboxTestHarnessBuilder(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO).addInput((TypeInformation<?>)BasicTypeInfo.STRING_TYPE_INFO, 2).addInput((TypeInformation<?>)BasicTypeInfo.INT_TYPE_INFO, 2).addInput((TypeInformation<?>)BasicTypeInfo.DOUBLE_TYPE_INFO, 2).setupOutputForSingletonOperatorChain((StreamOperatorFactory<?>)new MapToStringMultipleInputOperatorFactory(3)).build();){
            ArrayDeque<Object> expectedOutput = new ArrayDeque<Object>();
            long initialTime = 0L;
            testHarness.processEvent((AbstractEvent)new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()), 0, 0);
            testHarness.processElement(new StreamRecord((Object)"Witam-0-1", initialTime), 0, 1);
            testHarness.processElement(new StreamRecord((Object)42, initialTime), 1, 1);
            testHarness.processElement(new StreamRecord((Object)1.0, initialTime), 2, 1);
            expectedOutput.add(new StreamRecord((Object)"Witam-0-1", initialTime));
            expectedOutput.add(new StreamRecord((Object)"42", initialTime));
            expectedOutput.add(new StreamRecord((Object)"1.0", initialTime));
            MatcherAssert.assertThat(testHarness.getOutput(), (Matcher)Matchers.contains((Object[])expectedOutput.toArray()));
            testHarness.processEvent((AbstractEvent)new CheckpointBarrier(1L, 1L, CheckpointOptions.forCheckpointWithDefaultLocation()), 0, 1);
            testHarness.processEvent((AbstractEvent)new CheckpointBarrier(1L, 1L, CheckpointOptions.forCheckpointWithDefaultLocation()), 1, 0);
            testHarness.processEvent((AbstractEvent)new CheckpointBarrier(1L, 1L, CheckpointOptions.forCheckpointWithDefaultLocation()), 1, 1);
            testHarness.processEvent((AbstractEvent)new CheckpointBarrier(1L, 1L, CheckpointOptions.forCheckpointWithDefaultLocation()), 2, 0);
            testHarness.processEvent((AbstractEvent)new CheckpointBarrier(1L, 1L, CheckpointOptions.forCheckpointWithDefaultLocation()), 2, 1);
            testHarness.processEvent((AbstractEvent)new CheckpointBarrier(1L, 1L, CheckpointOptions.forCheckpointWithDefaultLocation()), 0, 0);
            expectedOutput.add(new CancelCheckpointMarker(0L));
            expectedOutput.add(new CheckpointBarrier(1L, 1L, CheckpointOptions.forCheckpointWithDefaultLocation()));
            MatcherAssert.assertThat(testHarness.getOutput(), (Matcher)Matchers.contains((Object[])expectedOutput.toArray()));
            testHarness.processEvent((AbstractEvent)new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()), 0, 1);
            testHarness.processEvent((AbstractEvent)new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()), 1, 0);
            testHarness.processEvent((AbstractEvent)new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()), 1, 1);
            testHarness.processEvent((AbstractEvent)new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()), 2, 0);
            testHarness.processEvent((AbstractEvent)new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation()), 2, 1);
            testHarness.waitForTaskCompletion();
            MatcherAssert.assertThat(testHarness.getOutput(), (Matcher)Matchers.contains((Object[])expectedOutput.toArray()));
        }
    }

    @Test
    public void testMetrics() throws Exception {
        final HashMap operatorMetrics = new HashMap();
        UnregisteredMetricGroups.UnregisteredTaskMetricGroup taskMetricGroup = new UnregisteredMetricGroups.UnregisteredTaskMetricGroup(){

            public OperatorMetricGroup getOrAddOperator(OperatorID operatorID, String name) {
                OperatorMetricGroup operatorMetricGroup = new OperatorMetricGroup(NoOpMetricRegistry.INSTANCE, (TaskMetricGroup)this, operatorID, name);
                operatorMetrics.put(name, operatorMetricGroup);
                return operatorMetricGroup;
            }
        };
        String mainOperatorName = "MainOperator";
        try (StreamTaskMailboxTestHarness<String> testHarness = new StreamTaskMailboxTestHarnessBuilder(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO).modifyExecutionConfig(config -> config.enableObjectReuse()).addInput((TypeInformation<?>)BasicTypeInfo.STRING_TYPE_INFO).addSourceInput(new SourceOperatorFactory((Source)new LifeCycleTrackingMockSource(Boundedness.BOUNDED, 1), WatermarkStrategy.noWatermarks())).addInput((TypeInformation<?>)BasicTypeInfo.STRING_TYPE_INFO).setupOperatorChain((StreamOperatorFactory<?>)new MapToStringMultipleInputOperatorFactory(3)).name(mainOperatorName).chain(new OneInputStreamTaskTest.DuplicatingOperator(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig())).chain(new OneInputStreamTaskTest.DuplicatingOperator(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig())).chain(new OneInputStreamTaskTest.DuplicatingOperator(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig())).finish().setTaskMetricGroup((TaskMetricGroup)taskMetricGroup).build();){
            int x;
            Assert.assertTrue((boolean)operatorMetrics.containsKey(mainOperatorName));
            OperatorMetricGroup mainOperatorMetrics = (OperatorMetricGroup)operatorMetrics.get(mainOperatorName);
            Counter numRecordsInCounter = taskMetricGroup.getIOMetricGroup().getNumRecordsInCounter();
            Counter numRecordsOutCounter = taskMetricGroup.getIOMetricGroup().getNumRecordsOutCounter();
            int numRecords1 = 5;
            int numRecords2 = 3;
            int numRecords3 = 2;
            for (x = 0; x < numRecords2; ++x) {
                MultipleInputStreamTaskTest.addSourceRecords(testHarness, 1, 42);
            }
            for (x = 0; x < numRecords1; ++x) {
                testHarness.processElement(new StreamRecord((Object)"hello"), 0, 0);
            }
            for (x = 0; x < numRecords3; ++x) {
                testHarness.processElement(new StreamRecord((Object)"hello"), 1, 0);
            }
            int networkRecordsIn = numRecords1 + numRecords3;
            int mainOperatorRecordsIn = networkRecordsIn + numRecords2;
            int totalRecordsOut = mainOperatorRecordsIn * 2 * 2 * 2;
            Assert.assertEquals((long)mainOperatorRecordsIn, (long)mainOperatorMetrics.getIOMetricGroup().getNumRecordsInCounter().getCount());
            Assert.assertEquals((long)networkRecordsIn, (long)numRecordsInCounter.getCount());
            Assert.assertEquals((long)totalRecordsOut, (long)numRecordsOutCounter.getCount());
            testHarness.waitForTaskCompletion();
        }
    }

    @Test
    public void testLifeCycleOrder() throws Exception {
        try (StreamTaskMailboxTestHarness testHarness = new StreamTaskMailboxTestHarnessBuilder(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO).modifyExecutionConfig(config -> config.enableObjectReuse()).addInput((TypeInformation<?>)BasicTypeInfo.STRING_TYPE_INFO).addSourceInput(new SourceOperatorFactory((Source)new LifeCycleTrackingMockSource(Boundedness.BOUNDED, 1), WatermarkStrategy.noWatermarks())).addInput((TypeInformation<?>)BasicTypeInfo.DOUBLE_TYPE_INFO).setupOperatorChain((StreamOperatorFactory<?>)new LifeCycleTrackingMapToStringMultipleInputOperatorFactory()).chain(new LifeCycleTrackingMap(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig())).finish().build();){
            testHarness.waitForTaskCompletion();
        }
        MatcherAssert.assertThat(LIFE_CYCLE_EVENTS, (Matcher)Matchers.contains((Object[])new String[]{"LifeCycleTrackingMap#open", "MultipleInputOperator#open", "SourceReader#start", "MultipleInputOperator#endInput", "MultipleInputOperator#endInput", "MultipleInputOperator#endInput", "SourceReader#close", "MultipleInputOperator#close", "LifeCycleTrackingMap#endInput", "LifeCycleTrackingMap#close"}));
    }

    @Test
    public void testInputFairness() throws Exception {
        try (StreamTaskMailboxTestHarness testHarness = new StreamTaskMailboxTestHarnessBuilder(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO).addInput((TypeInformation<?>)BasicTypeInfo.STRING_TYPE_INFO).addInput((TypeInformation<?>)BasicTypeInfo.STRING_TYPE_INFO).addInput((TypeInformation<?>)BasicTypeInfo.STRING_TYPE_INFO).setupOutputForSingletonOperatorChain((StreamOperatorFactory<?>)new MapToStringMultipleInputOperatorFactory(3)).build();){
            ArrayDeque<StreamRecord> expectedOutput = new ArrayDeque<StreamRecord>();
            testHarness.setAutoProcess(false);
            testHarness.processElement(new StreamRecord((Object)"0"), 0);
            testHarness.processElement(new StreamRecord((Object)"1"), 0);
            testHarness.processElement(new StreamRecord((Object)"2"), 0);
            testHarness.processElement(new StreamRecord((Object)"3"), 0);
            testHarness.processElement(new StreamRecord((Object)"0"), 2);
            testHarness.processElement(new StreamRecord((Object)"1"), 2);
            testHarness.processAll();
            expectedOutput.add(new StreamRecord((Object)"0"));
            expectedOutput.add(new StreamRecord((Object)"0"));
            expectedOutput.add(new StreamRecord((Object)"1"));
            expectedOutput.add(new StreamRecord((Object)"1"));
            expectedOutput.add(new StreamRecord((Object)"2"));
            expectedOutput.add(new StreamRecord((Object)"3"));
            MatcherAssert.assertThat(testHarness.getOutput(), (Matcher)Matchers.contains((Object[])expectedOutput.toArray()));
        }
    }

    @Test
    public void testWatermark() throws Exception {
        try (StreamTaskMailboxTestHarness<String> testHarness = MultipleInputStreamTaskTest.buildWatermarkTestHarness(2, false);){
            ArrayDeque<Object> expectedOutput = new ArrayDeque<Object>();
            int initialTime = 0;
            testHarness.processElement(new Watermark((long)initialTime), 0, 0);
            testHarness.processElement(new Watermark((long)initialTime), 0, 1);
            MultipleInputStreamTaskTest.addSourceRecords(testHarness, 1, initialTime);
            expectedOutput.add(new StreamRecord((Object)("" + initialTime), Long.MIN_VALUE));
            testHarness.processElement(new Watermark((long)initialTime), 1, 0);
            MatcherAssert.assertThat(testHarness.getOutput(), (Matcher)Matchers.contains((Object[])expectedOutput.toArray()));
            testHarness.processElement(new Watermark((long)initialTime), 1, 1);
            expectedOutput.add(new Watermark((long)initialTime));
            MatcherAssert.assertThat(testHarness.getOutput(), (Matcher)Matchers.contains((Object[])expectedOutput.toArray()));
            testHarness.processElement(new StreamRecord((Object)"Hello", (long)initialTime), 0, 0);
            testHarness.processElement(new StreamRecord((Object)42.0, (long)initialTime), 1, 1);
            expectedOutput.add(new StreamRecord((Object)"Hello", (long)initialTime));
            expectedOutput.add(new StreamRecord((Object)"42.0", (long)initialTime));
            MatcherAssert.assertThat(testHarness.getOutput(), (Matcher)Matchers.contains((Object[])expectedOutput.toArray()));
            testHarness.processElement(new Watermark((long)(initialTime + 4)), 0, 0);
            testHarness.processElement(new Watermark((long)(initialTime + 3)), 0, 1);
            MultipleInputStreamTaskTest.addSourceRecords(testHarness, 1, initialTime + 3);
            expectedOutput.add(new StreamRecord((Object)("" + (initialTime + 3)), Long.MIN_VALUE));
            testHarness.processElement(new Watermark((long)(initialTime + 3)), 1, 0);
            testHarness.processElement(new Watermark((long)(initialTime + 2)), 1, 1);
            expectedOutput.add(new Watermark((long)(initialTime + 2)));
            MatcherAssert.assertThat(testHarness.getOutput(), (Matcher)Matchers.contains((Object[])expectedOutput.toArray()));
            testHarness.processElement(new Watermark((long)(initialTime + 4)), 1, 1);
            expectedOutput.add(new Watermark((long)(initialTime + 3)));
            MatcherAssert.assertThat(testHarness.getOutput(), (Matcher)Matchers.contains((Object[])expectedOutput.toArray()));
            testHarness.processElement(new Watermark((long)(initialTime + 4)), 0, 1);
            MultipleInputStreamTaskTest.addSourceRecords(testHarness, 1, initialTime + 4);
            expectedOutput.add(new StreamRecord((Object)("" + (initialTime + 4)), Long.MIN_VALUE));
            testHarness.processElement(new Watermark((long)(initialTime + 4)), 1, 0);
            expectedOutput.add(new Watermark((long)(initialTime + 4)));
            MatcherAssert.assertThat(testHarness.getOutput(), (Matcher)Matchers.contains((Object[])expectedOutput.toArray()));
            List resultElements = TestHarnessUtil.getRawElementsFromOutput(testHarness.getOutput());
            Assert.assertEquals((long)5L, (long)resultElements.size());
        }
    }

    @Test
    public void testWatermarkAndStreamStatusForwarding() throws Exception {
        try (StreamTaskMailboxTestHarness<String> testHarness = MultipleInputStreamTaskTest.buildWatermarkTestHarness(2, true);){
            ArrayDeque<Object> expectedOutput = new ArrayDeque<Object>();
            int initialTime = 0;
            testHarness.processElement(StreamStatus.IDLE, 0, 1);
            testHarness.processElement(new Watermark((long)(initialTime + 6)), 0, 0);
            testHarness.processElement(new Watermark((long)(initialTime + 5)), 1, 1);
            testHarness.processElement(StreamStatus.IDLE, 1, 0);
            MultipleInputStreamTaskTest.addSourceRecords(testHarness, 1, initialTime + 5);
            testHarness.processAll();
            expectedOutput.add(new StreamRecord((Object)("" + (initialTime + 5)), Long.MIN_VALUE));
            expectedOutput.add(new Watermark((long)(initialTime + 5)));
            MatcherAssert.assertThat(testHarness.getOutput(), (Matcher)Matchers.contains((Object[])expectedOutput.toArray()));
            testHarness.processElement(StreamStatus.IDLE, 0, 0);
            testHarness.processElement(StreamStatus.IDLE, 1, 1);
            expectedOutput.add(StreamStatus.IDLE);
            MatcherAssert.assertThat(testHarness.getOutput(), (Matcher)Matchers.contains((Object[])expectedOutput.toArray()));
            MultipleInputStreamTaskTest.addSourceRecords(testHarness, 1, initialTime + 10);
            expectedOutput.add(new StreamRecord((Object)("" + (initialTime + 10)), Long.MIN_VALUE));
            expectedOutput.add(StreamStatus.ACTIVE);
            expectedOutput.add(StreamStatus.IDLE);
            testHarness.processAll();
            MatcherAssert.assertThat(testHarness.getOutput(), (Matcher)Matchers.contains((Object[])expectedOutput.toArray()));
            testHarness.processElement(StreamStatus.ACTIVE, 0, 1);
            expectedOutput.add(StreamStatus.ACTIVE);
            MatcherAssert.assertThat(testHarness.getOutput(), (Matcher)Matchers.contains((Object[])expectedOutput.toArray()));
        }
    }

    @Test
    public void testAdvanceToEndOfEventTime() throws Exception {
        try (StreamTaskMailboxTestHarness<String> testHarness = MultipleInputStreamTaskTest.buildWatermarkTestHarness(2, false);){
            testHarness.processElement(Watermark.MAX_WATERMARK, 0, 0);
            testHarness.processElement(Watermark.MAX_WATERMARK, 0, 1);
            testHarness.getStreamTask().advanceToEndOfEventTime();
            testHarness.processElement(Watermark.MAX_WATERMARK, 1, 0);
            MatcherAssert.assertThat(testHarness.getOutput(), (Matcher)Matchers.not((Matcher)Matchers.contains((Object[])new Object[]{Watermark.MAX_WATERMARK})));
            testHarness.processElement(Watermark.MAX_WATERMARK, 1, 1);
            MatcherAssert.assertThat(testHarness.getOutput(), (Matcher)Matchers.contains((Object[])new Object[]{Watermark.MAX_WATERMARK}));
        }
    }

    @Test
    public void testWatermarkMetrics() throws Exception {
        final OperatorID mainOperatorId = new OperatorID();
        final OperatorID chainedOperatorId = new OperatorID();
        final InterceptingOperatorMetricGroup mainOperatorMetricGroup = new InterceptingOperatorMetricGroup();
        final InterceptingOperatorMetricGroup chainedOperatorMetricGroup = new InterceptingOperatorMetricGroup();
        InterceptingTaskMetricGroup taskMetricGroup = new InterceptingTaskMetricGroup(){

            public OperatorMetricGroup getOrAddOperator(OperatorID id, String name) {
                if (id.equals((Object)mainOperatorId)) {
                    return mainOperatorMetricGroup;
                }
                if (id.equals((Object)chainedOperatorId)) {
                    return chainedOperatorMetricGroup;
                }
                return super.getOrAddOperator(id, name);
            }
        };
        try (StreamTaskMailboxTestHarness<String> testHarness = new StreamTaskMailboxTestHarnessBuilder(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO).modifyExecutionConfig(config -> config.enableObjectReuse()).addInput((TypeInformation<?>)BasicTypeInfo.STRING_TYPE_INFO).addSourceInput(new SourceOperatorFactory((Source)new MockSource(Boundedness.CONTINUOUS_UNBOUNDED, 2, true, false), WatermarkStrategy.forGenerator((WatermarkGeneratorSupplier & Serializable)ctx -> new RecordToWatermarkGenerator()))).addInput((TypeInformation<?>)BasicTypeInfo.DOUBLE_TYPE_INFO).setupOperatorChain(mainOperatorId, (StreamOperatorFactory<?>)new MapToStringMultipleInputOperatorFactory(3)).chain(chainedOperatorId, new OneInputStreamTaskTest.WatermarkMetricOperator(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig())).finish().setTaskMetricGroup((TaskMetricGroup)taskMetricGroup).build();){
            Gauge taskInputWatermarkGauge = (Gauge)taskMetricGroup.get("currentInputWatermark");
            Gauge mainInput1WatermarkGauge = (Gauge)mainOperatorMetricGroup.get(MetricNames.currentInputWatermarkName((int)1));
            Gauge mainInput2WatermarkGauge = (Gauge)mainOperatorMetricGroup.get(MetricNames.currentInputWatermarkName((int)2));
            Gauge mainInput3WatermarkGauge = (Gauge)mainOperatorMetricGroup.get(MetricNames.currentInputWatermarkName((int)3));
            Gauge mainInputWatermarkGauge = (Gauge)mainOperatorMetricGroup.get("currentInputWatermark");
            Gauge mainOutputWatermarkGauge = (Gauge)mainOperatorMetricGroup.get("currentOutputWatermark");
            Gauge chainedInputWatermarkGauge = (Gauge)chainedOperatorMetricGroup.get("currentInputWatermark");
            Gauge chainedOutputWatermarkGauge = (Gauge)chainedOperatorMetricGroup.get("currentOutputWatermark");
            Assert.assertEquals((long)Long.MIN_VALUE, (long)((Long)taskInputWatermarkGauge.getValue()));
            Assert.assertEquals((long)Long.MIN_VALUE, (long)((Long)mainInputWatermarkGauge.getValue()));
            Assert.assertEquals((long)Long.MIN_VALUE, (long)((Long)mainInput1WatermarkGauge.getValue()));
            Assert.assertEquals((long)Long.MIN_VALUE, (long)((Long)mainInput2WatermarkGauge.getValue()));
            Assert.assertEquals((long)Long.MIN_VALUE, (long)((Long)mainInput3WatermarkGauge.getValue()));
            Assert.assertEquals((long)Long.MIN_VALUE, (long)((Long)mainOutputWatermarkGauge.getValue()));
            Assert.assertEquals((long)Long.MIN_VALUE, (long)((Long)chainedInputWatermarkGauge.getValue()));
            Assert.assertEquals((long)Long.MIN_VALUE, (long)((Long)chainedOutputWatermarkGauge.getValue()));
            testHarness.processElement(new Watermark(1L), 0);
            Assert.assertEquals((long)Long.MIN_VALUE, (long)((Long)taskInputWatermarkGauge.getValue()));
            Assert.assertEquals((long)Long.MIN_VALUE, (long)((Long)mainInputWatermarkGauge.getValue()));
            Assert.assertEquals((long)1L, (long)((Long)mainInput1WatermarkGauge.getValue()));
            Assert.assertEquals((long)Long.MIN_VALUE, (long)((Long)mainInput2WatermarkGauge.getValue()));
            Assert.assertEquals((long)Long.MIN_VALUE, (long)((Long)mainInput3WatermarkGauge.getValue()));
            Assert.assertEquals((long)Long.MIN_VALUE, (long)((Long)mainOutputWatermarkGauge.getValue()));
            Assert.assertEquals((long)Long.MIN_VALUE, (long)((Long)chainedInputWatermarkGauge.getValue()));
            Assert.assertEquals((long)Long.MIN_VALUE, (long)((Long)chainedOutputWatermarkGauge.getValue()));
            MultipleInputStreamTaskTest.addSourceRecords(testHarness, 1, 2);
            testHarness.processAll();
            Assert.assertEquals((long)Long.MIN_VALUE, (long)((Long)taskInputWatermarkGauge.getValue()));
            Assert.assertEquals((long)Long.MIN_VALUE, (long)((Long)mainInputWatermarkGauge.getValue()));
            Assert.assertEquals((long)1L, (long)((Long)mainInput1WatermarkGauge.getValue()));
            Assert.assertEquals((long)2L, (long)((Long)mainInput2WatermarkGauge.getValue()));
            Assert.assertEquals((long)Long.MIN_VALUE, (long)((Long)mainInput3WatermarkGauge.getValue()));
            Assert.assertEquals((long)Long.MIN_VALUE, (long)((Long)mainOutputWatermarkGauge.getValue()));
            Assert.assertEquals((long)Long.MIN_VALUE, (long)((Long)chainedInputWatermarkGauge.getValue()));
            Assert.assertEquals((long)Long.MIN_VALUE, (long)((Long)chainedOutputWatermarkGauge.getValue()));
            testHarness.processElement(new Watermark(2L), 1);
            Assert.assertEquals((long)1L, (long)((Long)taskInputWatermarkGauge.getValue()));
            Assert.assertEquals((long)1L, (long)((Long)mainInputWatermarkGauge.getValue()));
            Assert.assertEquals((long)1L, (long)((Long)mainInput1WatermarkGauge.getValue()));
            Assert.assertEquals((long)2L, (long)((Long)mainInput2WatermarkGauge.getValue()));
            Assert.assertEquals((long)2L, (long)((Long)mainInput3WatermarkGauge.getValue()));
            Assert.assertEquals((long)1L, (long)((Long)mainOutputWatermarkGauge.getValue()));
            Assert.assertEquals((long)1L, (long)((Long)chainedInputWatermarkGauge.getValue()));
            Assert.assertEquals((long)2L, (long)((Long)chainedOutputWatermarkGauge.getValue()));
            testHarness.processElement(new Watermark(4L), 0);
            MultipleInputStreamTaskTest.addSourceRecords(testHarness, 1, 3);
            testHarness.processAll();
            Assert.assertEquals((long)2L, (long)((Long)taskInputWatermarkGauge.getValue()));
            Assert.assertEquals((long)2L, (long)((Long)mainInputWatermarkGauge.getValue()));
            Assert.assertEquals((long)4L, (long)((Long)mainInput1WatermarkGauge.getValue()));
            Assert.assertEquals((long)3L, (long)((Long)mainInput2WatermarkGauge.getValue()));
            Assert.assertEquals((long)2L, (long)((Long)mainInput3WatermarkGauge.getValue()));
            Assert.assertEquals((long)2L, (long)((Long)mainOutputWatermarkGauge.getValue()));
            Assert.assertEquals((long)2L, (long)((Long)chainedInputWatermarkGauge.getValue()));
            Assert.assertEquals((long)4L, (long)((Long)chainedOutputWatermarkGauge.getValue()));
            this.finishAddingRecords(testHarness, 1);
            testHarness.endInput();
            testHarness.waitForTaskCompletion();
        }
    }

    @Test
    public void testCheckpointBarrierMetrics() throws Exception {
        ConcurrentHashMap<String, Metric> metrics = new ConcurrentHashMap<String, Metric>();
        StreamTaskTestHarness.TestTaskMetricGroup taskMetricGroup = new StreamTaskTestHarness.TestTaskMetricGroup(metrics);
        try (StreamTaskMailboxTestHarness testHarness = new StreamTaskMailboxTestHarnessBuilder(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO).addInput((TypeInformation<?>)BasicTypeInfo.STRING_TYPE_INFO, 2).addInput((TypeInformation<?>)BasicTypeInfo.INT_TYPE_INFO, 2).addInput((TypeInformation<?>)BasicTypeInfo.DOUBLE_TYPE_INFO, 2).setupOutputForSingletonOperatorChain((StreamOperatorFactory<?>)new MapToStringMultipleInputOperatorFactory(3)).setTaskMetricGroup(taskMetricGroup).build();){
            MatcherAssert.assertThat(metrics, (Matcher)IsMapContaining.hasKey((Object)"checkpointAlignmentTime"));
            MatcherAssert.assertThat(metrics, (Matcher)IsMapContaining.hasKey((Object)"checkpointStartDelayNanos"));
            testHarness.endInput();
            testHarness.waitForTaskCompletion();
        }
    }

    @Test
    public void testLatencyMarker() throws Exception {
        ConcurrentHashMap<String, Metric> metrics = new ConcurrentHashMap<String, Metric>();
        StreamTaskTestHarness.TestTaskMetricGroup taskMetricGroup = new StreamTaskTestHarness.TestTaskMetricGroup(metrics);
        try (StreamTaskMailboxTestHarness testHarness = new StreamTaskMailboxTestHarnessBuilder(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO).addInput((TypeInformation<?>)BasicTypeInfo.STRING_TYPE_INFO).addInput((TypeInformation<?>)BasicTypeInfo.INT_TYPE_INFO).addInput((TypeInformation<?>)BasicTypeInfo.DOUBLE_TYPE_INFO).setupOutputForSingletonOperatorChain((StreamOperatorFactory<?>)new MapToStringMultipleInputOperatorFactory(3)).setTaskMetricGroup(taskMetricGroup).build();){
            ArrayDeque<LatencyMarker> expectedOutput = new ArrayDeque<LatencyMarker>();
            OperatorID sourceId = new OperatorID();
            LatencyMarker latencyMarker = new LatencyMarker(42L, sourceId, 0);
            testHarness.processElement(latencyMarker);
            expectedOutput.add(latencyMarker);
            MatcherAssert.assertThat(testHarness.getOutput(), (Matcher)Matchers.contains((Object[])expectedOutput.toArray()));
            testHarness.endInput();
            testHarness.waitForTaskCompletion();
        }
    }

    static StreamTaskMailboxTestHarness<String> buildTestHarness() throws Exception {
        return MultipleInputStreamTaskTest.buildTestHarness(false);
    }

    static StreamTaskMailboxTestHarness<String> buildTestHarness(boolean unaligned) throws Exception {
        return new StreamTaskMailboxTestHarnessBuilder(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO).modifyExecutionConfig(config -> config.enableObjectReuse()).modifyStreamConfig(config -> config.setUnalignedCheckpointsEnabled(unaligned)).modifyStreamConfig(config -> config.setAlignmentTimeout(Duration.ZERO)).addInput((TypeInformation<?>)BasicTypeInfo.STRING_TYPE_INFO).addSourceInput(new SourceOperatorFactory((Source)new MockSource(Boundedness.BOUNDED, 1), WatermarkStrategy.noWatermarks())).addInput((TypeInformation<?>)BasicTypeInfo.DOUBLE_TYPE_INFO).setupOutputForSingletonOperatorChain((StreamOperatorFactory<?>)new MapToStringMultipleInputOperatorFactory(3)).build();
    }

    static void addSourceRecords(StreamTaskMailboxTestHarness<String> testHarness, int sourceId, int ... records) throws Exception {
        OperatorID sourceOperatorID = MultipleInputStreamTaskTest.getSourceOperatorID(testHarness, sourceId);
        MockSourceSplit split = new MockSourceSplit(0, 0, records.length);
        for (int record : records) {
            split.addRecord(record);
        }
        AddSplitEvent addSplitEvent = new AddSplitEvent(Collections.singletonList(split), (SimpleVersionedSerializer)new MockSourceSplitSerializer());
        testHarness.getStreamTask().dispatchOperatorEvent(sourceOperatorID, new SerializedValue((Object)addSplitEvent));
    }

    private static StreamTaskMailboxTestHarness<String> buildWatermarkTestHarness(int inputChannels, boolean readerMarkIdleOnNoSplits) throws Exception {
        return new StreamTaskMailboxTestHarnessBuilder(MultipleInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO).modifyExecutionConfig(config -> config.enableObjectReuse()).addInput((TypeInformation<?>)BasicTypeInfo.STRING_TYPE_INFO, inputChannels).addSourceInput(new SourceOperatorFactory((Source)new MockSource(Boundedness.CONTINUOUS_UNBOUNDED, 2, true, readerMarkIdleOnNoSplits), WatermarkStrategy.forGenerator((WatermarkGeneratorSupplier & Serializable)ctx -> new RecordToWatermarkGenerator()))).addInput((TypeInformation<?>)BasicTypeInfo.DOUBLE_TYPE_INFO, inputChannels).setupOutputForSingletonOperatorChain((StreamOperatorFactory<?>)new MapToStringMultipleInputOperatorFactory(3)).build();
    }

    private static OperatorID getSourceOperatorID(StreamTaskMailboxTestHarness<String> testHarness, int sourceId) {
        StreamConfig.InputConfig[] inputs = testHarness.getStreamTask().getConfiguration().getInputs(testHarness.getClass().getClassLoader());
        StreamConfig.SourceInputConfig input = (StreamConfig.SourceInputConfig)inputs[sourceId];
        return testHarness.getStreamTask().operatorChain.getSourceTaskInput(input).getOperatorID();
    }

    private void finishAddingRecords(StreamTaskMailboxTestHarness<String> testHarness, int sourceId) throws Exception {
        testHarness.getStreamTask().dispatchOperatorEvent(MultipleInputStreamTaskTest.getSourceOperatorID(testHarness, sourceId), new SerializedValue((Object)new NoMoreSplitsEvent()));
    }

    private static class RecordToWatermarkGenerator
    implements WatermarkGenerator<Integer>,
    Serializable {
        private RecordToWatermarkGenerator() {
        }

        public void onEvent(Integer event, long eventTimestamp, WatermarkOutput output) {
            output.emitWatermark(new org.apache.flink.api.common.eventtime.Watermark((long)event.intValue()));
        }

        public void onPeriodicEmit(WatermarkOutput output) {
        }
    }

    static class LifeCycleTrackingMap<T>
    extends AbstractStreamOperator<T>
    implements OneInputStreamOperator<T, T>,
    BoundedOneInput {
        public static final String OPEN = "LifeCycleTrackingMap#open";
        public static final String CLOSE = "LifeCycleTrackingMap#close";
        public static final String END_INPUT = "LifeCycleTrackingMap#endInput";

        LifeCycleTrackingMap() {
        }

        public void processElement(StreamRecord<T> element) throws Exception {
            this.output.collect(element);
        }

        public void open() throws Exception {
            LIFE_CYCLE_EVENTS.add(OPEN);
            super.open();
        }

        public void close() throws Exception {
            LIFE_CYCLE_EVENTS.add(CLOSE);
            super.close();
        }

        public void endInput() throws Exception {
            LIFE_CYCLE_EVENTS.add(END_INPUT);
        }
    }

    static class LifeCycleTrackingMockSourceReader
    extends MockSourceReader {
        public static final String START = "SourceReader#start";
        public static final String CLOSE = "SourceReader#close";

        LifeCycleTrackingMockSourceReader() {
        }

        public void start() {
            LIFE_CYCLE_EVENTS.add(START);
            super.start();
        }

        public void close() throws Exception {
            LIFE_CYCLE_EVENTS.add(CLOSE);
            super.close();
        }
    }

    static class LifeCycleTrackingMockSource
    extends MockSource {
        public LifeCycleTrackingMockSource(Boundedness boundedness, int numSplits) {
            super(boundedness, numSplits);
        }

        public SourceReader<Integer, MockSourceSplit> createReader(SourceReaderContext readerContext) {
            LifeCycleTrackingMockSourceReader sourceReader = new LifeCycleTrackingMockSourceReader();
            this.createdReaders.add(sourceReader);
            return sourceReader;
        }
    }

    static class LifeCycleTrackingMapToStringMultipleInputOperatorFactory
    extends AbstractStreamOperatorFactory<String> {
        LifeCycleTrackingMapToStringMultipleInputOperatorFactory() {
        }

        public <T extends StreamOperator<String>> T createStreamOperator(StreamOperatorParameters<String> parameters) {
            return (T)((Object)new LifeCycleTrackingMapToStringMultipleInputOperator(parameters));
        }

        public Class<? extends StreamOperator<String>> getStreamOperatorClass(ClassLoader classLoader) {
            return LifeCycleTrackingMapToStringMultipleInputOperator.class;
        }
    }

    static class LifeCycleTrackingMapToStringMultipleInputOperator
    extends MapToStringMultipleInputOperator
    implements BoundedMultiInput {
        public static final String OPEN = "MultipleInputOperator#open";
        public static final String CLOSE = "MultipleInputOperator#close";
        public static final String END_INPUT = "MultipleInputOperator#endInput";
        private static final long serialVersionUID = 1L;

        public LifeCycleTrackingMapToStringMultipleInputOperator(StreamOperatorParameters<String> parameters) {
            super(parameters, 3);
        }

        @Override
        public void open() throws Exception {
            LIFE_CYCLE_EVENTS.add(OPEN);
            super.open();
        }

        @Override
        public void close() throws Exception {
            LIFE_CYCLE_EVENTS.add(CLOSE);
            super.close();
        }

        public void endInput(int inputId) {
            LIFE_CYCLE_EVENTS.add(END_INPUT);
        }
    }

    protected static class MapToStringMultipleInputOperatorFactory
    extends AbstractStreamOperatorFactory<String> {
        private final int numberOfInputs;

        public MapToStringMultipleInputOperatorFactory(int numberOfInputs) {
            this.numberOfInputs = numberOfInputs;
        }

        public <T extends StreamOperator<String>> T createStreamOperator(StreamOperatorParameters<String> parameters) {
            return (T)((Object)new MapToStringMultipleInputOperator(parameters, this.numberOfInputs));
        }

        public Class<? extends StreamOperator<String>> getStreamOperatorClass(ClassLoader classLoader) {
            return MapToStringMultipleInputOperator.class;
        }
    }

    private static class DuplicatingOperatorFactory
    extends AbstractStreamOperatorFactory<String> {
        private DuplicatingOperatorFactory() {
        }

        public <T extends StreamOperator<String>> T createStreamOperator(StreamOperatorParameters<String> parameters) {
            return (T)((Object)new DuplicatingOperator(parameters));
        }

        public Class<? extends StreamOperator<String>> getStreamOperatorClass(ClassLoader classLoader) {
            return DuplicatingOperator.class;
        }
    }

    private static class TestBoundedMultipleInputOperatorFactory
    extends AbstractStreamOperatorFactory<String> {
        private TestBoundedMultipleInputOperatorFactory() {
        }

        public <T extends StreamOperator<String>> T createStreamOperator(StreamOperatorParameters<String> parameters) {
            return (T)((Object)new TestBoundedMultipleInputOperator("Operator0", parameters));
        }

        public Class<? extends StreamOperator<String>> getStreamOperatorClass(ClassLoader classLoader) {
            return TestBoundedMultipleInputOperator.class;
        }
    }

    protected static class MapToStringMultipleInputOperator
    extends AbstractStreamOperatorV2<String>
    implements MultipleInputStreamOperator<String> {
        private static final long serialVersionUID = 1L;
        private final int numberOfInputs;
        private boolean openCalled;
        private boolean closeCalled;

        public MapToStringMultipleInputOperator(StreamOperatorParameters<String> parameters, int numberOfInputs) {
            super(parameters, numberOfInputs);
            this.numberOfInputs = numberOfInputs;
        }

        public void open() throws Exception {
            super.open();
            if (this.closeCalled) {
                Assert.fail((String)"Close called before open.");
            }
            this.openCalled = true;
        }

        public void close() throws Exception {
            super.close();
            if (!this.openCalled) {
                Assert.fail((String)"Open was not called before close.");
            }
            this.closeCalled = true;
        }

        public List<Input> getInputs() {
            Preconditions.checkArgument((this.numberOfInputs <= 3 ? 1 : 0) != 0);
            return Arrays.asList(new Input[]{new MapToStringInput(this, 1), new MapToStringInput(this, 2), new MapToStringInput(this, 3)}).subList(0, this.numberOfInputs);
        }

        public boolean wasCloseCalled() {
            return this.closeCalled;
        }

        public class MapToStringInput<T>
        extends AbstractInput<T, String> {
            public MapToStringInput(AbstractStreamOperatorV2<String> owner, int inputId) {
                super(owner, inputId);
            }

            public void processElement(StreamRecord<T> element) throws Exception {
                if (!MapToStringMultipleInputOperator.this.openCalled) {
                    Assert.fail((String)"Open was not called before run.");
                }
                if (element.hasTimestamp()) {
                    this.output.collect((Object)new StreamRecord((Object)element.getValue().toString(), element.getTimestamp()));
                } else {
                    this.output.collect((Object)new StreamRecord((Object)element.getValue().toString()));
                }
            }
        }
    }

    static class DuplicatingOperator
    extends AbstractStreamOperatorV2<String>
    implements MultipleInputStreamOperator<String> {
        public DuplicatingOperator(StreamOperatorParameters<String> parameters) {
            super(parameters, 3);
        }

        public List<Input> getInputs() {
            return Arrays.asList(new Input[]{new DuplicatingInput(this, 1), new DuplicatingInput(this, 2), new DuplicatingInput(this, 3)});
        }

        class DuplicatingInput
        extends AbstractInput<String, String> {
            public DuplicatingInput(AbstractStreamOperatorV2<String> owner, int inputId) {
                super(owner, inputId);
            }

            public void processElement(StreamRecord<String> element) throws Exception {
                this.output.collect(element);
                this.output.collect(element);
            }
        }
    }
}

