/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.io.checkpointing;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.stream.IntStream;
import javax.annotation.Nullable;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.ConnectionManager;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
import org.apache.flink.runtime.io.network.TestingConnectionManager;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelBuilder;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
import org.apache.flink.runtime.io.network.util.TestBufferFactory;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.operators.testutils.DummyCheckpointInvokable;
import org.apache.flink.streaming.api.operators.MailboxExecutor;
import org.apache.flink.streaming.api.operators.SyncMailboxExecutor;
import org.apache.flink.streaming.runtime.io.MockInputGate;
import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler;
import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierTracker;
import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointSequenceValidator;
import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate;
import org.apache.flink.streaming.runtime.io.checkpointing.UnalignedCheckpointsTest;
import org.apache.flink.streaming.runtime.io.checkpointing.ValidatingCheckpointHandler;
import org.apache.flink.util.clock.Clock;
import org.apache.flink.util.clock.ManualClock;
import org.apache.flink.util.clock.SystemClock;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;

public class CheckpointBarrierTrackerTest {
    private static final int PAGE_SIZE = 512;
    private CheckpointedInputGate inputGate;

    /**
     * Runs after every test: verifies that the gate under test has nothing left to poll and that
     * it reports itself as finished.
     *
     * @throws Exception if polling the gate fails
     */
    @After
    public void ensureEmpty() throws Exception {
        boolean hasLeftover = this.inputGate.pollNext().isPresent();
        Assert.assertFalse(hasLeftover);

        boolean finished = this.inputGate.isFinished();
        Assert.assertTrue(finished);
    }

    /**
     * Feeds three plain buffers through a one-channel gate and checks that they are polled back
     * unchanged and in the same order.
     */
    @Test
    public void testSingleChannelNoBarriers() throws Exception {
        BufferOrEvent[] expected = {
            createBuffer(0),
            createBuffer(0),
            createBuffer(0)
        };
        this.inputGate = createCheckpointedInputGate(1, expected);

        for (int i = 0; i < expected.length; i++) {
            Assert.assertEquals(expected[i], this.inputGate.pollNext().get());
        }
    }

    /**
     * Feeds buffers from four channels (no barriers at all) through the gate and checks that they
     * are polled back unchanged and in the same order.
     */
    @Test
    public void testMultiChannelNoBarriers() throws Exception {
        BufferOrEvent[] expected = {
            createBuffer(2),
            createBuffer(2),
            createBuffer(0),
            createBuffer(1),
            createBuffer(0),
            createBuffer(3),
            createBuffer(1),
            createBuffer(1),
            createBuffer(2)
        };
        this.inputGate = createCheckpointedInputGate(4, expected);

        for (int i = 0; i < expected.length; i++) {
            Assert.assertEquals(expected[i], this.inputGate.pollNext().get());
        }
    }

    /**
     * Single channel carrying buffers interleaved with barriers for checkpoints 1 through 6. Every
     * element must be polled back in order; the validator is constructed with the ids 1..6.
     */
    @Test
    public void testSingleChannelWithBarriers() throws Exception {
        BufferOrEvent[] expected = {
            createBuffer(0),
            createBuffer(0),
            createBuffer(0),
            createBarrier(1L, 0),
            createBuffer(0),
            createBuffer(0),
            createBuffer(0),
            createBuffer(0),
            // two barriers back to back
            createBarrier(2L, 0),
            createBarrier(3L, 0),
            createBuffer(0),
            createBuffer(0),
            // three barriers back to back
            createBarrier(4L, 0),
            createBarrier(5L, 0),
            createBarrier(6L, 0),
            createBuffer(0)
        };
        CheckpointSequenceValidator checkpointValidator =
                new CheckpointSequenceValidator(1L, 2L, 3L, 4L, 5L, 6L);
        this.inputGate = createCheckpointedInputGate(1, expected, checkpointValidator);

        for (int i = 0; i < expected.length; i++) {
            Assert.assertEquals(expected[i], this.inputGate.pollNext().get());
        }
    }

    /**
     * Single channel whose barrier ids have gaps (1, 3, 4, 6, 7, 10). Every element must be polled
     * back in order; the validator is constructed with exactly those ids.
     */
    @Test
    public void testSingleChannelWithSkippedBarriers() throws Exception {
        BufferOrEvent[] expected = {
            createBuffer(0),
            createBarrier(1L, 0),
            createBuffer(0),
            createBuffer(0),
            createBarrier(3L, 0),
            createBuffer(0),
            createBarrier(4L, 0),
            createBarrier(6L, 0),
            createBuffer(0),
            createBarrier(7L, 0),
            createBuffer(0),
            createBarrier(10L, 0),
            createBuffer(0)
        };
        CheckpointSequenceValidator checkpointValidator =
                new CheckpointSequenceValidator(1L, 3L, 4L, 6L, 7L, 10L);
        this.inputGate = createCheckpointedInputGate(1, expected, checkpointValidator);

        for (int i = 0; i < expected.length; i++) {
            Assert.assertEquals(expected[i], this.inputGate.pollNext().get());
        }
    }

    /**
     * Three channels; barriers for checkpoints 1 through 4 arrive on every channel, interleaved
     * with buffers. Every element must be polled back in order; the validator is constructed with
     * the ids 1..4.
     */
    @Test
    public void testMultiChannelWithBarriers() throws Exception {
        BufferOrEvent[] expected = {
            createBuffer(0),
            createBuffer(2),
            createBuffer(0),
            // checkpoint 1: channels 1, 2, then 0 with buffers in between
            createBarrier(1L, 1),
            createBarrier(1L, 2),
            createBuffer(2),
            createBuffer(1),
            createBarrier(1L, 0),
            createBuffer(0),
            createBuffer(0),
            createBuffer(1),
            createBuffer(1),
            createBuffer(2),
            // checkpoint 2: all three barriers back to back
            createBarrier(2L, 0),
            createBarrier(2L, 1),
            createBarrier(2L, 2),
            createBuffer(2),
            createBuffer(2),
            // checkpoint 3: channel 2 first, the others after two more buffers
            createBarrier(3L, 2),
            createBuffer(2),
            createBuffer(2),
            createBarrier(3L, 0),
            createBarrier(3L, 1),
            // checkpoint 4: all three barriers back to back
            createBarrier(4L, 1),
            createBarrier(4L, 2),
            createBarrier(4L, 0),
            createBuffer(0)
        };
        CheckpointSequenceValidator checkpointValidator =
                new CheckpointSequenceValidator(1L, 2L, 3L, 4L);
        this.inputGate = createCheckpointedInputGate(3, expected, checkpointValidator);

        for (int i = 0; i < expected.length; i++) {
            Assert.assertEquals(expected[i], this.inputGate.pollNext().get());
        }
    }

    /**
     * Three channels; checkpoint 3 receives a barrier on channel 2 only before checkpoint 4
     * barriers start to arrive. Every element must be polled back in order; the validator is
     * constructed with the ids 1, 2 and 4 (id 3 is absent).
     */
    @Test
    public void testMultiChannelSkippingCheckpoints() throws Exception {
        BufferOrEvent[] expected = {
            createBuffer(0),
            createBuffer(2),
            createBuffer(0),
            // checkpoint 1: channels 1, 2, then 0
            createBarrier(1L, 1),
            createBarrier(1L, 2),
            createBuffer(2),
            createBuffer(1),
            createBarrier(1L, 0),
            createBuffer(0),
            createBuffer(0),
            createBuffer(1),
            createBuffer(1),
            createBuffer(2),
            // checkpoint 2: all three barriers back to back
            createBarrier(2L, 0),
            createBarrier(2L, 1),
            createBarrier(2L, 2),
            createBuffer(2),
            createBuffer(2),
            // checkpoint 3: barrier on channel 2 only
            createBarrier(3L, 2),
            createBuffer(2),
            createBuffer(2),
            // checkpoint 4: barriers on all channels, interleaved with buffers
            createBarrier(4L, 0),
            createBuffer(0),
            createBuffer(1),
            createBuffer(2),
            createBarrier(4L, 1),
            createBuffer(1),
            createBarrier(4L, 2),
            createBuffer(0)
        };
        CheckpointSequenceValidator checkpointValidator =
                new CheckpointSequenceValidator(1L, 2L, 4L);
        this.inputGate = createCheckpointedInputGate(3, expected, checkpointValidator);

        for (int i = 0; i < expected.length; i++) {
            Assert.assertEquals(expected[i], this.inputGate.pollNext().get());
        }
    }

    /**
     * Three channels where the barriers of several checkpoints overlap: the last barrier of an
     * older checkpoint may arrive after barriers of newer checkpoints have been seen. Checkpoint 6
     * only ever receives barriers on channels 1 and 0. Every element must be polled back in
     * order; the validator is constructed with the ids 2, 3, 4, 5, 7, 8, 9, 10 (id 6 is absent).
     */
    @Test
    public void testCompleteCheckpointsOnLateBarriers() throws Exception {
        BufferOrEvent[] expected = {
            createBuffer(1),
            createBuffer(1),
            createBuffer(0),
            createBuffer(2),
            // checkpoint 2: all three barriers back to back
            createBarrier(2L, 1),
            createBarrier(2L, 0),
            createBarrier(2L, 2),
            createBuffer(1),
            createBuffer(0),
            // checkpoints 3 and 4 start on channels 1 and 2 ...
            createBarrier(3L, 1),
            createBarrier(3L, 2),
            createBuffer(1),
            createBuffer(0),
            createBarrier(4L, 2),
            createBarrier(4L, 1),
            createBuffer(1),
            createBuffer(2),
            // ... and their channel-0 barriers arrive afterwards
            createBarrier(3L, 0),
            createBuffer(0),
            createBarrier(4L, 0),
            createBuffer(1),
            createBuffer(2),
            // checkpoint 5: barriers on all channels, interleaved with buffers
            createBarrier(5L, 1),
            createBuffer(0),
            createBarrier(5L, 0),
            createBuffer(1),
            createBarrier(5L, 2),
            createBuffer(1),
            // checkpoint 6: barriers on channels 1 and 0 only
            createBarrier(6L, 1),
            createBuffer(0),
            createBarrier(6L, 0),
            createBuffer(1),
            // checkpoints 7, 8, 9 start on channels 1 and 2 ...
            createBarrier(7L, 1),
            createBuffer(0),
            createBarrier(7L, 2),
            createBuffer(2),
            createBarrier(8L, 2),
            createBuffer(0),
            createBarrier(8L, 1),
            createBuffer(1),
            createBarrier(9L, 1),
            // ... with the channel-0 barriers trailing behind
            createBarrier(7L, 0),
            createBuffer(0),
            createBarrier(9L, 2),
            createBuffer(2),
            createBarrier(10L, 2),
            createBarrier(8L, 0),
            createBuffer(1),
            createBuffer(2),
            createBarrier(9L, 0),
            createBuffer(1),
            createBuffer(0),
            createBuffer(2),
            createBarrier(10L, 0),
            createBarrier(10L, 1)
        };
        CheckpointSequenceValidator checkpointValidator =
                new CheckpointSequenceValidator(2L, 3L, 4L, 5L, 7L, 8L, 9L, 10L);
        this.inputGate = createCheckpointedInputGate(3, expected, checkpointValidator);

        for (int i = 0; i < expected.length; i++) {
            Assert.assertEquals(expected[i], this.inputGate.pollNext().get());
        }
    }

    /**
     * Two channels: the first barrier of checkpoint 2 is seen before the cancellation marker of
     * checkpoint 1. A manual clock is advanced by one second after every polled element, and the
     * last alignment duration recorded by the handler must equal two seconds.
     */
    @Test
    public void testNextFirstCheckpointBarrierOvertakesCancellationBarrier() throws Exception {
        BufferOrEvent[] expected = {
            createBarrier(1L, 1),
            createBarrier(2L, 1),
            createCancellationBarrier(1L, 0),
            createBarrier(2L, 0)
        };
        ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
        ManualClock clock = new ManualClock();
        this.inputGate = createCheckpointedInputGate(2, expected, handler, clock);

        Duration tick = Duration.ofSeconds(1L);
        for (int i = 0; i < expected.length; i++) {
            Assert.assertEquals(expected[i], this.inputGate.pollNext().get());
            clock.advanceTime(tick);
        }

        long expectedNanos = Duration.ofSeconds(2L).toNanos();
        long recordedNanos = handler.lastAlignmentDurationNanos.get();
        Assert.assertEquals(expectedNanos, recordedNanos);
    }

    /**
     * Single channel mixing checkpoint barriers (1, 2, 5) with cancellation markers (4, 6). Every
     * element must be polled back in order.
     *
     * <p>NOTE(review): the negative ids passed to the validator presumably stand for expected
     * abort notifications; confirm against {@code CheckpointSequenceValidator}.
     */
    @Test
    public void testSingleChannelAbortCheckpoint() throws Exception {
        BufferOrEvent[] expected = {
            createBuffer(0),
            createBarrier(1L, 0),
            createBuffer(0),
            createBarrier(2L, 0),
            createCancellationBarrier(4L, 0),
            createBarrier(5L, 0),
            createBuffer(0),
            createCancellationBarrier(6L, 0),
            createBuffer(0)
        };
        CheckpointSequenceValidator checkpointValidator =
                new CheckpointSequenceValidator(1L, 2L, -4L, 5L, -6L);
        this.inputGate = createCheckpointedInputGate(1, expected, checkpointValidator);

        for (int i = 0; i < expected.length; i++) {
            Assert.assertEquals(expected[i], this.inputGate.pollNext().get());
        }
    }

    /**
     * Three channels mixing checkpoint barriers with cancellation markers arriving at different
     * positions relative to the barriers of the same checkpoint. Every element must be polled back
     * in order.
     *
     * <p>NOTE(review): the negative ids passed to the validator presumably stand for expected
     * abort notifications; confirm against {@code CheckpointSequenceValidator}.
     */
    @Test
    public void testMultiChannelAbortCheckpoint() throws Exception {
        BufferOrEvent[] expected = {
            // checkpoint 1: barriers on all three channels
            createBuffer(0),
            createBuffer(2),
            createBuffer(0),
            createBarrier(1L, 1),
            createBarrier(1L, 2),
            createBuffer(2),
            createBuffer(1),
            createBarrier(1L, 0),
            // checkpoint 2: two barriers, then a cancellation marker on the last channel
            createBuffer(0),
            createBuffer(2),
            createBarrier(2L, 0),
            createBarrier(2L, 2),
            createBuffer(0),
            createBuffer(2),
            createCancellationBarrier(2L, 1),
            // checkpoint 3: barriers on all three channels
            createBuffer(2),
            createBuffer(1),
            createBarrier(3L, 1),
            createBarrier(3L, 2),
            createBarrier(3L, 0),
            // checkpoint 4: cancellation marker first, barriers afterwards
            createBuffer(0),
            createBuffer(1),
            createCancellationBarrier(4L, 1),
            createBarrier(4L, 2),
            createBuffer(0),
            createBarrier(4L, 0),
            // checkpoint 5: barriers on all three channels
            createBuffer(0),
            createBuffer(1),
            createBuffer(2),
            createBarrier(5L, 2),
            createBarrier(5L, 1),
            createBarrier(5L, 0),
            // checkpoint 6: two cancellation markers followed by one barrier
            createBuffer(0),
            createBuffer(1),
            createCancellationBarrier(6L, 1),
            createCancellationBarrier(6L, 2),
            createBarrier(6L, 0),
            createBuffer(0)
        };
        CheckpointSequenceValidator checkpointValidator =
                new CheckpointSequenceValidator(1L, -2L, 3L, -4L, 5L, -6L);
        this.inputGate = createCheckpointedInputGate(3, expected, checkpointValidator);

        for (int i = 0; i < expected.length; i++) {
            Assert.assertEquals(expected[i], this.inputGate.pollNext().get());
        }
    }

    /**
     * Three channels: one barrier for checkpoint 1 on channel 0, followed by cancellation markers
     * for checkpoints 1 and 2 interleaved across the channels. Every element must be polled back
     * in order.
     *
     * <p>NOTE(review): the negative ids passed to the validator presumably stand for expected
     * abort notifications; confirm against {@code CheckpointSequenceValidator}.
     */
    @Test
    public void testInterleavedCancellationBarriers() throws Exception {
        BufferOrEvent[] expected = {
            createBarrier(1L, 0),
            createCancellationBarrier(2L, 0),
            createCancellationBarrier(1L, 1),
            createCancellationBarrier(2L, 1),
            createCancellationBarrier(1L, 2),
            createCancellationBarrier(2L, 2),
            createBuffer(0)
        };
        CheckpointSequenceValidator checkpointValidator =
                new CheckpointSequenceValidator(-1L, -2L);
        this.inputGate = createCheckpointedInputGate(3, expected, checkpointValidator);

        for (int i = 0; i < expected.length; i++) {
            Assert.assertEquals(expected[i], this.inputGate.pollNext().get());
        }
    }

    /**
     * Checks the checkpoint metrics reported for a three-channel gate built on real remote input
     * channels.
     *
     * <p>The test sleeps {@code sleepTime} ms before the first barrier is fed and again before the
     * last barrier is fed, then asserts:
     *
     * <ul>
     *   <li>the checkpoint start delay lies between {@code sleepTime} and the wall-clock time
     *       elapsed since the barrier's creation timestamp (both in milliseconds);
     *   <li>the alignment duration is at least {@code sleepTime} ms and at most the
     *       {@link System#nanoTime()} span measured around the whole sequence. The upper bound
     *       is compared in nanoseconds on both sides, so it is not weakened by a unit mismatch;
     *   <li>the bytes processed during alignment equal the three buffers fed between the first
     *       barrier (channel 1) and the last barrier (channel 2).
     * </ul>
     *
     * @throws Exception if feeding the gate or reading the handler's futures fails
     */
    @Test
    public void testMetrics() throws Exception {
        ArrayList<BufferOrEvent> output = new ArrayList<>();
        ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
        int numberOfChannels = 3;
        this.inputGate = this.createCheckpointedInputGate(numberOfChannels, handler);
        int[] sequenceNumbers = new int[numberOfChannels];

        int bufferSize = 100;
        long checkpointId = 1L;
        long sleepTime = 10L;

        long checkpointBarrierCreation = System.currentTimeMillis();
        long alignmentStartNanos = System.nanoTime();

        // Guarantees a start delay of at least sleepTime before the first barrier is seen.
        Thread.sleep(sleepTime);

        UnalignedCheckpointsTest.addSequence(
                this.inputGate,
                output,
                sequenceNumbers,
                createBuffer(0, bufferSize),
                createBuffer(1, bufferSize),
                createBuffer(2, bufferSize),
                createBarrier(checkpointId, 1, checkpointBarrierCreation),
                createBuffer(0, bufferSize),
                createBuffer(2, bufferSize),
                createBarrier(checkpointId, 0),
                createBuffer(2, bufferSize));

        // Guarantees an alignment duration of at least sleepTime before the last barrier is seen.
        Thread.sleep(sleepTime);

        UnalignedCheckpointsTest.addSequence(
                this.inputGate,
                output,
                sequenceNumbers,
                createBarrier(checkpointId, 2),
                createBuffer(0, bufferSize),
                createBuffer(1, bufferSize),
                createBuffer(2, bufferSize),
                createEndOfPartition(0),
                createEndOfPartition(1),
                createEndOfPartition(2));

        long startDelay = System.currentTimeMillis() - checkpointBarrierCreation;
        long alignmentDuration = System.nanoTime() - alignmentStartNanos;

        // Start delay: milliseconds on both sides.
        long startDelayMillis = this.inputGate.getCheckpointStartDelayNanos() / 1_000_000L;
        Assert.assertThat(startDelayMillis, Matchers.greaterThanOrEqualTo(sleepTime));
        Assert.assertThat(startDelayMillis, Matchers.lessThanOrEqualTo(startDelay));

        // Alignment duration: lower bound in milliseconds, upper bound in nanoseconds because
        // alignmentDuration is a raw System.nanoTime() difference.
        Assert.assertTrue(handler.getLastAlignmentDurationNanos().isDone());
        long alignmentNanos = handler.getLastAlignmentDurationNanos().get();
        Assert.assertThat(alignmentNanos / 1_000_000L, Matchers.greaterThanOrEqualTo(sleepTime));
        Assert.assertThat(alignmentNanos, Matchers.lessThanOrEqualTo(alignmentDuration));

        Assert.assertTrue(handler.getLastBytesProcessedDuringAlignment().isDone());
        Assert.assertThat(
                handler.getLastBytesProcessedDuringAlignment().get(),
                Matchers.equalTo(3L * (long) bufferSize));
    }

    /**
     * Checks the checkpoint metrics reported for a one-channel gate: the start delay must lie
     * between {@code sleepTime} and the wall-clock time elapsed since the barrier's creation
     * timestamp, while both the alignment duration and the bytes processed during alignment must
     * be zero.
     *
     * @throws Exception if feeding the gate or reading the handler's futures fails
     */
    @Test
    public void testSingleChannelMetrics() throws Exception {
        int numberOfChannels = 1;
        int bufferSize = 100;
        long checkpointId = 1L;
        long sleepTime = 10L;

        ArrayList<BufferOrEvent> output = new ArrayList<>();
        ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
        this.inputGate = this.createCheckpointedInputGate(numberOfChannels, handler);
        int[] sequenceNumbers = new int[numberOfChannels];

        long checkpointBarrierCreation = System.currentTimeMillis();
        // Guarantees a start delay of at least sleepTime before the barrier is seen.
        Thread.sleep(sleepTime);

        UnalignedCheckpointsTest.addSequence(
                this.inputGate,
                output,
                sequenceNumbers,
                createBuffer(0, bufferSize),
                createBarrier(checkpointId, 0, checkpointBarrierCreation),
                createBuffer(0, bufferSize),
                createEndOfPartition(0));

        long startDelay = System.currentTimeMillis() - checkpointBarrierCreation;

        Assert.assertThat(
                this.inputGate.getCheckpointStartDelayNanos() / 1_000_000L,
                Matchers.greaterThanOrEqualTo(sleepTime));
        Assert.assertThat(
                this.inputGate.getCheckpointStartDelayNanos() / 1_000_000L,
                Matchers.lessThanOrEqualTo(startDelay));

        Assert.assertTrue(handler.getLastAlignmentDurationNanos().isDone());
        Assert.assertThat(handler.getLastAlignmentDurationNanos().get(), Matchers.equalTo(0L));

        Assert.assertTrue(handler.getLastBytesProcessedDuringAlignment().isDone());
        Assert.assertThat(
                handler.getLastBytesProcessedDuringAlignment().get(), Matchers.equalTo(0L));
    }

    /**
     * Two channels: barriers for checkpoints 1 and 2 arrive on channel 1 first, then on channel 0.
     * A manual clock is advanced by one second after every polled element, and the last alignment
     * duration recorded by the handler must equal two seconds.
     */
    @Test
    public void testTwoLastBarriersOneByOne() throws Exception {
        BufferOrEvent[] expected = {
            createBarrier(1L, 1),
            createBarrier(2L, 1),
            createBarrier(1L, 0),
            createBarrier(2L, 0)
        };
        ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
        ManualClock clock = new ManualClock();
        this.inputGate = createCheckpointedInputGate(2, expected, handler, clock);

        Duration tick = Duration.ofSeconds(1L);
        for (int i = 0; i < expected.length; i++) {
            Assert.assertEquals(expected[i], this.inputGate.pollNext().get());
            clock.advanceTime(tick);
        }

        long expectedNanos = Duration.ofSeconds(2L).toNanos();
        long recordedNanos = handler.lastAlignmentDurationNanos.get();
        Assert.assertEquals(expectedNanos, recordedNanos);
    }

    /**
     * Builds a {@link CheckpointedInputGate} on top of a {@link SingleInputGate} that owns
     * {@code numberOfChannels} remote input channels. The gate is set up and its partitions are
     * requested before it is wrapped.
     *
     * @param numberOfChannels number of remote input channels to create
     * @param toNotify invokable handed to the barrier tracker
     * @return the checkpointed gate wrapping the freshly built single input gate
     * @throws IOException declared because gate setup / partition requests may fail
     */
    private CheckpointedInputGate createCheckpointedInputGate(int numberOfChannels, AbstractInvokable toNotify) throws IOException {
        NettyShuffleEnvironment environment = new NettyShuffleEnvironmentBuilder().build();
        SingleInputGate gate =
                new SingleInputGateBuilder()
                        .setNumberOfChannels(numberOfChannels)
                        .setupBufferPoolFactory(environment)
                        .build();

        // One remote channel per index, each wired to the same environment and gate.
        InputChannel[] channels =
                IntStream.range(0, numberOfChannels)
                        .mapToObj(
                                channelIndex ->
                                        InputChannelBuilder.newBuilder()
                                                .setChannelIndex(channelIndex)
                                                .setupFromNettyShuffleEnvironment(environment)
                                                .setConnectionManager(new TestingConnectionManager())
                                                .buildRemoteChannel(gate))
                        .toArray(RemoteInputChannel[]::new);
        gate.setInputChannels(channels);

        gate.setup();
        gate.requestPartitions();

        IndexedInputGate indexedGate = gate;
        return createCheckpointedInputGate(indexedGate, toNotify);
    }

    /**
     * Builds a checkpointed gate that replays {@code sequence}, notifying a fresh
     * {@link DummyCheckpointInvokable}.
     *
     * @param numberOfChannels number of channels the mock gate reports
     * @param sequence elements the mock gate hands out
     * @return the checkpointed gate
     */
    private static CheckpointedInputGate createCheckpointedInputGate(int numberOfChannels, BufferOrEvent[] sequence) {
        AbstractInvokable dummyInvokable = new DummyCheckpointInvokable();
        return createCheckpointedInputGate(numberOfChannels, sequence, dummyInvokable);
    }

    /**
     * Builds a checkpointed gate backed by a {@link MockInputGate} that replays {@code sequence}.
     *
     * @param numberOfChannels number of channels passed to the mock gate
     * @param sequence elements the mock gate hands out
     * @param toNotifyOnCheckpoint invokable handed to the barrier tracker; may be {@code null}
     * @return the checkpointed gate
     */
    private static CheckpointedInputGate createCheckpointedInputGate(int numberOfChannels, BufferOrEvent[] sequence, @Nullable AbstractInvokable toNotifyOnCheckpoint) {
        MockInputGate mockGate = new MockInputGate(numberOfChannels, Arrays.asList(sequence));
        return createCheckpointedInputGate(mockGate, toNotifyOnCheckpoint);
    }

    /**
     * Builds a checkpointed gate backed by a {@link MockInputGate} that replays {@code sequence},
     * using the supplied clock for the barrier tracker.
     *
     * @param numberOfChannels number of channels passed to the mock gate
     * @param sequence elements the mock gate hands out
     * @param toNotifyOnCheckpoint invokable handed to the barrier tracker; may be {@code null}
     * @param clock clock handed to the barrier tracker
     * @return the checkpointed gate
     */
    private static CheckpointedInputGate createCheckpointedInputGate(int numberOfChannels, BufferOrEvent[] sequence, @Nullable AbstractInvokable toNotifyOnCheckpoint, Clock clock) {
        MockInputGate mockGate = new MockInputGate(numberOfChannels, Arrays.asList(sequence));
        return createCheckpointedInputGate(mockGate, toNotifyOnCheckpoint, clock);
    }

    /**
     * Wraps {@code inputGate} in a checkpointed gate whose barrier tracker uses the
     * {@link SystemClock} singleton.
     *
     * @param inputGate gate to wrap
     * @param toNotifyOnCheckpoint invokable handed to the barrier tracker; may be {@code null}
     * @return the checkpointed gate
     */
    private static CheckpointedInputGate createCheckpointedInputGate(IndexedInputGate inputGate, @Nullable AbstractInvokable toNotifyOnCheckpoint) {
        Clock systemClock = SystemClock.getInstance();
        return createCheckpointedInputGate(inputGate, toNotifyOnCheckpoint, systemClock);
    }

    /**
     * Wraps {@code inputGate} in a {@link CheckpointedInputGate} driven by a
     * {@link CheckpointBarrierTracker} and a {@link SyncMailboxExecutor}.
     *
     * @param inputGate gate to wrap; its channel count is passed to the tracker
     * @param toNotifyOnCheckpoint invokable handed to the barrier tracker; may be {@code null}
     * @param clock clock handed to the barrier tracker
     * @return the checkpointed gate
     */
    private static CheckpointedInputGate createCheckpointedInputGate(IndexedInputGate inputGate, @Nullable AbstractInvokable toNotifyOnCheckpoint, Clock clock) {
        CheckpointBarrierHandler barrierHandler =
                new CheckpointBarrierTracker(
                        inputGate.getNumberOfInputChannels(), toNotifyOnCheckpoint, clock);
        MailboxExecutor mailboxExecutor = new SyncMailboxExecutor();
        return new CheckpointedInputGate((InputGate) inputGate, barrierHandler, mailboxExecutor);
    }

    /**
     * Creates a checkpoint barrier event for {@code channel}, stamped with the current wall-clock
     * time in milliseconds.
     *
     * @param checkpointId id of the checkpoint the barrier belongs to
     * @param channel index of the input channel the barrier is attributed to
     * @return the barrier wrapped as a {@link BufferOrEvent}
     */
    private static BufferOrEvent createBarrier(long checkpointId, int channel) {
        long nowMillis = System.currentTimeMillis();
        return createBarrier(checkpointId, channel, nowMillis);
    }

    /**
     * Creates a checkpoint barrier event for {@code channel} of gate 0 with an explicit creation
     * timestamp and the default-location checkpoint options.
     *
     * @param checkpointId id of the checkpoint the barrier belongs to
     * @param channel index of the input channel the barrier is attributed to
     * @param creationTimestamp timestamp stored in the barrier
     * @return the barrier wrapped as a {@link BufferOrEvent}
     */
    private static BufferOrEvent createBarrier(long checkpointId, int channel, long creationTimestamp) {
        AbstractEvent barrier =
                new CheckpointBarrier(
                        checkpointId,
                        creationTimestamp,
                        CheckpointOptions.forCheckpointWithDefaultLocation());
        InputChannelInfo origin = new InputChannelInfo(0, channel);
        return new BufferOrEvent(barrier, origin);
    }

    /**
     * Creates a {@link CancelCheckpointMarker} event for {@code channel} of gate 0.
     *
     * @param id id of the checkpoint being cancelled
     * @param channel index of the input channel the marker is attributed to
     * @return the marker wrapped as a {@link BufferOrEvent}
     */
    private static BufferOrEvent createCancellationBarrier(long id, int channel) {
        AbstractEvent marker = new CancelCheckpointMarker(id);
        InputChannelInfo origin = new InputChannelInfo(0, channel);
        return new BufferOrEvent(marker, origin);
    }

    /**
     * Creates a small data buffer (wrapping the two bytes {@code {1, 2}}) attributed to
     * {@code channel} of gate 0.
     *
     * @param channel index of the input channel the buffer is attributed to
     * @return the buffer wrapped as a {@link BufferOrEvent}
     */
    private static BufferOrEvent createBuffer(int channel) {
        byte[] payload = {1, 2};
        Buffer buffer =
                new NetworkBuffer(MemorySegmentFactory.wrap(payload), FreeingBufferRecycler.INSTANCE);
        InputChannelInfo origin = new InputChannelInfo(0, channel);
        return new BufferOrEvent(buffer, origin);
    }

    /**
     * Creates a data buffer via {@link TestBufferFactory#createBuffer(int)} attributed to
     * {@code channel} of gate 0.
     *
     * @param channel index of the input channel the buffer is attributed to
     * @param size size argument forwarded to the test buffer factory
     * @return the buffer wrapped as a {@link BufferOrEvent}
     */
    private static BufferOrEvent createBuffer(int channel, int size) {
        InputChannelInfo origin = new InputChannelInfo(0, channel);
        return new BufferOrEvent(TestBufferFactory.createBuffer(size), origin);
    }

    /**
     * Creates an end-of-partition event attributed to {@code channel} of gate 0.
     *
     * @param channel index of the input channel the event is attributed to
     * @return the shared {@link EndOfPartitionEvent} instance wrapped as a {@link BufferOrEvent}
     */
    private static BufferOrEvent createEndOfPartition(int channel) {
        AbstractEvent endOfPartition = EndOfPartitionEvent.INSTANCE;
        InputChannelInfo origin = new InputChannelInfo(0, channel);
        return new BufferOrEvent(endOfPartition, origin);
    }
}

