/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph;

import java.util.concurrent.ScheduledExecutorService;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.IntermediateResult;
import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;

public class IntermediateResultPartitionTest
extends TestLogger {
    @Test
    public void testPipelinedPartitionConsumable() throws Exception {
        IntermediateResult result = IntermediateResultPartitionTest.createResult(ResultPartitionType.PIPELINED, 2);
        IntermediateResultPartition partition1 = result.getPartitions()[0];
        IntermediateResultPartition partition2 = result.getPartitions()[1];
        Assert.assertFalse((boolean)partition1.isConsumable());
        Assert.assertFalse((boolean)partition2.isConsumable());
        partition1.markDataProduced();
        Assert.assertTrue((boolean)partition1.isConsumable());
        Assert.assertFalse((boolean)partition2.isConsumable());
        result.resetForNewExecution();
        Assert.assertFalse((boolean)partition1.isConsumable());
        Assert.assertFalse((boolean)partition2.isConsumable());
    }

    @Test
    public void testBlockingPartitionConsumable() throws Exception {
        IntermediateResult result = IntermediateResultPartitionTest.createResult(ResultPartitionType.BLOCKING, 2);
        IntermediateResultPartition partition1 = result.getPartitions()[0];
        IntermediateResultPartition partition2 = result.getPartitions()[1];
        Assert.assertFalse((boolean)partition1.isConsumable());
        Assert.assertFalse((boolean)partition2.isConsumable());
        partition1.markFinished();
        Assert.assertFalse((boolean)partition1.isConsumable());
        Assert.assertFalse((boolean)partition2.isConsumable());
        Assert.assertFalse((boolean)result.areAllPartitionsFinished());
        partition2.markFinished();
        Assert.assertTrue((boolean)partition1.isConsumable());
        Assert.assertTrue((boolean)partition2.isConsumable());
        Assert.assertTrue((boolean)result.areAllPartitionsFinished());
        result.resetForNewExecution();
        Assert.assertFalse((boolean)partition1.isConsumable());
        Assert.assertFalse((boolean)partition2.isConsumable());
        Assert.assertFalse((boolean)result.areAllPartitionsFinished());
    }

    @Test
    public void testBlockingPartitionResetting() throws Exception {
        IntermediateResult result = IntermediateResultPartitionTest.createResult(ResultPartitionType.BLOCKING, 2);
        IntermediateResultPartition partition1 = result.getPartitions()[0];
        IntermediateResultPartition partition2 = result.getPartitions()[1];
        Assert.assertFalse((boolean)partition1.isConsumable());
        Assert.assertFalse((boolean)partition2.isConsumable());
        partition1.markFinished();
        Assert.assertEquals((long)1L, (long)result.getNumberOfRunningProducers());
        Assert.assertFalse((boolean)partition1.isConsumable());
        Assert.assertFalse((boolean)partition2.isConsumable());
        Assert.assertFalse((boolean)result.areAllPartitionsFinished());
        result.resetForNewExecution();
        Assert.assertEquals((long)2L, (long)result.getNumberOfRunningProducers());
        partition2.markFinished();
        Assert.assertEquals((long)1L, (long)result.getNumberOfRunningProducers());
        Assert.assertFalse((boolean)partition1.isConsumable());
        Assert.assertFalse((boolean)partition2.isConsumable());
        Assert.assertFalse((boolean)result.areAllPartitionsFinished());
        partition1.markFinished();
        Assert.assertEquals((long)0L, (long)result.getNumberOfRunningProducers());
        Assert.assertTrue((boolean)partition1.isConsumable());
        Assert.assertTrue((boolean)partition2.isConsumable());
        Assert.assertTrue((boolean)result.areAllPartitionsFinished());
        result.resetForNewExecution();
        Assert.assertEquals((long)2L, (long)result.getNumberOfRunningProducers());
        Assert.assertFalse((boolean)partition1.isConsumable());
        Assert.assertFalse((boolean)partition2.isConsumable());
        Assert.assertFalse((boolean)result.areAllPartitionsFinished());
    }

    private static IntermediateResult createResult(ResultPartitionType resultPartitionType, int producerCount) throws Exception {
        JobVertex jobVertex = new JobVertex("v1");
        jobVertex.setInvokableClass(NoOpInvokable.class);
        jobVertex.setParallelism(producerCount);
        jobVertex.createAndAddResultDataSet(resultPartitionType);
        ExecutionJobVertex ejv = ExecutionGraphTestUtils.getExecutionJobVertex(jobVertex, (ScheduledExecutorService)new DirectScheduledExecutorService());
        IntermediateResult result = ejv.getProducedDataSets()[0];
        return result;
    }
}

