/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph;

import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.IntermediateResult;
import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.scheduler.DefaultScheduler;
import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;

/**
 * Tests for {@link IntermediateResultPartition#isConsumable()} and for the finished-partition
 * bookkeeping exposed by {@link ConsumedPartitionGroup}, for both pipelined and blocking results.
 */
public class IntermediateResultPartitionTest extends TestLogger {

    /**
     * A pipelined partition is reported consumable once data has been produced for it, and
     * resetting the result makes it non-consumable again.
     */
    @Test
    public void testPipelinedPartitionConsumable() throws Exception {
        IntermediateResult pipelinedResult = createResult(ResultPartitionType.PIPELINED, 2);
        IntermediateResultPartition first = pipelinedResult.getPartitions()[0];
        IntermediateResultPartition second = pipelinedResult.getPartitions()[1];

        // Freshly created: neither partition is consumable.
        Assert.assertFalse(first.isConsumable());
        Assert.assertFalse(second.isConsumable());

        // Producing data on one partition affects only that partition.
        first.markDataProduced();
        Assert.assertTrue(first.isConsumable());
        Assert.assertFalse(second.isConsumable());

        // A reset of the result clears the consumable state.
        pipelinedResult.resetForNewExecution();
        Assert.assertFalse(first.isConsumable());
        Assert.assertFalse(second.isConsumable());
    }

    /**
     * A blocking partition is reported consumable once it is marked finished; the consumed
     * partition group reports all partitions finished only after both have been marked.
     */
    @Test
    public void testBlockingPartitionConsumable() throws Exception {
        IntermediateResult blockingResult = createResult(ResultPartitionType.BLOCKING, 2);
        IntermediateResultPartition first = blockingResult.getPartitions()[0];
        IntermediateResultPartition second = blockingResult.getPartitions()[1];
        ConsumedPartitionGroup group =
                (ConsumedPartitionGroup) first.getConsumedPartitionGroups().get(0);

        // Freshly created: nothing consumable, nothing finished.
        Assert.assertFalse(first.isConsumable());
        Assert.assertFalse(second.isConsumable());
        Assert.assertFalse(group.areAllPartitionsFinished());

        // Only the first partition finished.
        first.markFinished();
        Assert.assertTrue(first.isConsumable());
        Assert.assertFalse(second.isConsumable());
        Assert.assertFalse(group.areAllPartitionsFinished());

        // Both partitions finished.
        second.markFinished();
        Assert.assertTrue(first.isConsumable());
        Assert.assertTrue(second.isConsumable());
        Assert.assertTrue(group.areAllPartitionsFinished());

        // A reset of the result clears everything again.
        blockingResult.resetForNewExecution();
        Assert.assertFalse(first.isConsumable());
        Assert.assertFalse(second.isConsumable());
        Assert.assertFalse(group.areAllPartitionsFinished());
    }

    /**
     * Resetting a blocking result restores the unfinished-partition count of the consumed
     * partition group, both after a partial and after a complete round of finishing.
     */
    @Test
    public void testBlockingPartitionResetting() throws Exception {
        IntermediateResult blockingResult = createResult(ResultPartitionType.BLOCKING, 2);
        IntermediateResultPartition first = blockingResult.getPartitions()[0];
        IntermediateResultPartition second = blockingResult.getPartitions()[1];
        ConsumedPartitionGroup group =
                (ConsumedPartitionGroup) first.getConsumedPartitionGroups().get(0);

        // Freshly created: nothing consumable.
        Assert.assertFalse(first.isConsumable());
        Assert.assertFalse(second.isConsumable());

        // Finish the first partition only.
        first.markFinished();
        Assert.assertEquals(1L, (long) group.getNumberOfUnfinishedPartitions());
        Assert.assertTrue(first.isConsumable());
        Assert.assertFalse(second.isConsumable());
        Assert.assertFalse(group.areAllPartitionsFinished());

        // Reset while only partially finished: the count goes back to two.
        blockingResult.resetForNewExecution();
        Assert.assertEquals(2L, (long) group.getNumberOfUnfinishedPartitions());

        // After the reset, finish the partitions in the opposite order.
        second.markFinished();
        Assert.assertEquals(1L, (long) group.getNumberOfUnfinishedPartitions());
        Assert.assertFalse(first.isConsumable());
        Assert.assertTrue(second.isConsumable());
        Assert.assertFalse(group.areAllPartitionsFinished());

        first.markFinished();
        Assert.assertEquals(0L, (long) group.getNumberOfUnfinishedPartitions());
        Assert.assertTrue(first.isConsumable());
        Assert.assertTrue(second.isConsumable());
        Assert.assertTrue(group.areAllPartitionsFinished());

        // Reset after everything finished: the count goes back to two again.
        blockingResult.resetForNewExecution();
        Assert.assertEquals(2L, (long) group.getNumberOfUnfinishedPartitions());
        Assert.assertFalse(first.isConsumable());
        Assert.assertFalse(second.isConsumable());
        Assert.assertFalse(group.areAllPartitionsFinished());
    }

    /**
     * Builds a two-vertex batch job graph ("v1" feeding "v2" with an all-to-all connection),
     * creates a scheduler for it and returns the first data set produced by "v1".
     *
     * @param resultPartitionType partition type used for the connection between the two vertices
     * @param parallelism parallelism applied to both vertices
     * @return the first produced data set of the execution job vertex for "v1"
     * @throws Exception if building the scheduler fails
     */
    private static IntermediateResult createResult(ResultPartitionType resultPartitionType, int parallelism) throws Exception {
        JobVertex producer = new JobVertex("v1");
        producer.setInvokableClass(NoOpInvokable.class);
        producer.setParallelism(parallelism);

        JobVertex consumer = new JobVertex("v2");
        consumer.setInvokableClass(NoOpInvokable.class);
        consumer.setParallelism(parallelism);

        consumer.connectNewDataSetAsInput(producer, DistributionPattern.ALL_TO_ALL, resultPartitionType);

        // The same executor instance serves as both the IO executor and the future executor.
        DirectScheduledExecutorService directExecutor = new DirectScheduledExecutorService();
        JobGraph batchGraph = JobGraphTestUtils.batchJobGraph(producer, consumer);
        DefaultScheduler defaultScheduler =
                SchedulerTestingUtils.newSchedulerBuilder(
                                batchGraph, ComponentMainThreadExecutorServiceAdapter.forMainThread())
                        .setIoExecutor(directExecutor)
                        .setFutureExecutor(directExecutor)
                        .build();

        ExecutionJobVertex producerVertex = defaultScheduler.getExecutionJobVertex(producer.getID());
        return producerVertex.getProducedDataSets()[0];
    }
}

