/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.core.io.InputSplitSource;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.TestingDefaultExecutionGraphBuilder;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.scheduler.SchedulerBase;
import org.apache.flink.shaded.guava18.com.google.common.collect.Sets;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

/**
 * Tests for building an {@link ExecutionGraph} by attaching {@link JobVertex} lists via
 * {@link ExecutionGraph#attachJobGraph(List)}.
 *
 * <p>Most tests share one five-vertex topology (see
 * {@link #connectSharedTopology(JobVertex, JobVertex, JobVertex, JobVertex, JobVertex)}) and
 * check the resulting graph with
 * {@link #verifyTestGraph(ExecutionGraph, JobVertex, JobVertex, JobVertex, JobVertex, JobVertex)}.
 */
public class DefaultExecutionGraphConstructionTest {

    /**
     * Builds an execution graph whose vertex parallelism store is computed from the given
     * vertices. The vertices are NOT attached; callers attach them explicitly.
     *
     * @param vertices all vertices that will later be attached to the graph
     * @return a fresh execution graph
     * @throws Exception if the builder fails to create the graph
     */
    private ExecutionGraph createDefaultExecutionGraph(List<JobVertex> vertices) throws Exception {
        return TestingDefaultExecutionGraphBuilder.newBuilder()
                .setVertexParallelismStore(SchedulerBase.computeVertexParallelismStore(vertices))
                .build();
    }

    /**
     * Creates a vertex with the given name and parallelism that uses
     * {@link AbstractInvokable} as its invokable class.
     */
    private static JobVertex createVertex(String name, int parallelism) {
        JobVertex vertex = new JobVertex(name);
        vertex.setParallelism(parallelism);
        vertex.setInvokableClass(AbstractInvokable.class);
        return vertex;
    }

    /**
     * Wires the topology shared by several tests, using new pipelined all-to-all data sets:
     * {@code v1 -> v2}, {@code v2 -> v4}, {@code v3 -> v4}, {@code v4 -> v5}, {@code v3 -> v5}.
     *
     * <p>The call order is kept stable because it determines the order of each consumer's
     * inputs, which {@code verifyTestGraph} relies on.
     */
    private static void connectSharedTopology(
            JobVertex v1, JobVertex v2, JobVertex v3, JobVertex v4, JobVertex v5) {
        v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        v4.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        v5.connectNewDataSetAsInput(v4, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        v5.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
    }

    /**
     * Attaches the vertices and fails the test if a {@link JobException} is thrown.
     *
     * <p>The exception is attached as the cause of the {@link AssertionError} so the full stack
     * trace shows up in the test report instead of only on stderr.
     */
    private static void attachOrFail(ExecutionGraph eg, List<JobVertex> vertices) {
        try {
            eg.attachJobGraph(vertices);
        } catch (JobException e) {
            throw new AssertionError("Job failed with exception: " + e.getMessage(), e);
        }
    }

    /**
     * Attaches the vertices and fails the test unless a {@link JobException} is thrown.
     */
    private static void assertAttachFailsWithJobException(
            ExecutionGraph eg, List<JobVertex> vertices) {
        try {
            eg.attachJobGraph(vertices);
        } catch (JobException expected) {
            // This is the outcome under test: the graph must reject these vertices.
            return;
        }
        Assert.fail("Attached wrong jobgraph");
    }

    /**
     * Attaching the same vertices to two separate execution graphs must not yield any shared
     * key among the registered executions of the two graphs.
     */
    @Test
    public void testExecutionAttemptIdInTwoIdenticalJobsIsNotSame() throws Exception {
        JobVertex v1 = createVertex("vertex1", 5);
        JobVertex v2 = createVertex("vertex2", 7);
        JobVertex v3 = createVertex("vertex3", 2);

        List<JobVertex> ordered = new ArrayList<>(Arrays.asList(v1, v2, v3));
        ExecutionGraph eg1 = createDefaultExecutionGraph(ordered);
        ExecutionGraph eg2 = createDefaultExecutionGraph(ordered);
        eg1.attachJobGraph(ordered);
        eg2.attachJobGraph(ordered);

        Assert.assertThat(
                Sets.intersection(
                        eg1.getRegisteredExecutions().keySet(),
                        eg2.getRegisteredExecutions().keySet()),
                Matchers.is(Matchers.empty()));
    }

    /**
     * Attaches the shared five-vertex topology in a single call and verifies the result.
     */
    @Test
    public void testCreateSimpleGraphBipartite() throws Exception {
        JobVertex v1 = createVertex("vertex1", 5);
        JobVertex v2 = createVertex("vertex2", 7);
        JobVertex v3 = createVertex("vertex3", 2);
        JobVertex v4 = createVertex("vertex4", 11);
        JobVertex v5 = createVertex("vertex5", 4);
        connectSharedTopology(v1, v2, v3, v4, v5);

        List<JobVertex> ordered = new ArrayList<>(Arrays.asList(v1, v2, v3, v4, v5));
        ExecutionGraph eg = createDefaultExecutionGraph(ordered);
        attachOrFail(eg, ordered);

        verifyTestGraph(eg, v1, v2, v3, v4, v5);
    }

    /**
     * Attaches the topology in two steps, where the second batch of vertices connects to the
     * first batch through previously created {@link IntermediateDataSet} objects.
     */
    @Test
    public void testAttachViaDataSets() throws Exception {
        // First batch: v1 -> v2, plus explicitly created result data sets on v2 and v3.
        JobVertex v1 = createVertex("vertex1", 5);
        JobVertex v2 = createVertex("vertex2", 7);
        JobVertex v3 = createVertex("vertex3", 2);
        v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        IntermediateDataSet v2result = v2.createAndAddResultDataSet(ResultPartitionType.PIPELINED);
        IntermediateDataSet v3result1 = v3.createAndAddResultDataSet(ResultPartitionType.PIPELINED);
        IntermediateDataSet v3result2 = v3.createAndAddResultDataSet(ResultPartitionType.PIPELINED);

        // Second batch: consumes the data sets created above.
        JobVertex v4 = createVertex("vertex4", 11);
        JobVertex v5 = createVertex("vertex5", 4);
        v4.connectDataSetAsInput(v2result, DistributionPattern.ALL_TO_ALL);
        v4.connectDataSetAsInput(v3result1, DistributionPattern.ALL_TO_ALL);
        v5.connectNewDataSetAsInput(v4, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        v5.connectDataSetAsInput(v3result2, DistributionPattern.ALL_TO_ALL);

        List<JobVertex> ordered = Arrays.asList(v1, v2, v3);
        List<JobVertex> ordered2 = Arrays.asList(v4, v5);
        ExecutionGraph eg =
                createDefaultExecutionGraph(
                        Stream.concat(ordered.stream(), ordered2.stream())
                                .collect(Collectors.toList()));
        attachOrFail(eg, ordered);
        attachOrFail(eg, ordered2);

        verifyTestGraph(eg, v1, v2, v3, v4, v5);
    }

    /**
     * Same as {@link #testAttachViaDataSets()}, but the second batch connects to the first batch
     * by data set ID only instead of by data set object.
     */
    @Test
    public void testAttachViaIds() throws Exception {
        // First batch: v1 -> v2, plus explicitly created result data sets on v2 and v3.
        JobVertex v1 = createVertex("vertex1", 5);
        JobVertex v2 = createVertex("vertex2", 7);
        JobVertex v3 = createVertex("vertex3", 2);
        v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        IntermediateDataSet v2result = v2.createAndAddResultDataSet(ResultPartitionType.PIPELINED);
        IntermediateDataSet v3result1 = v3.createAndAddResultDataSet(ResultPartitionType.PIPELINED);
        IntermediateDataSet v3result2 = v3.createAndAddResultDataSet(ResultPartitionType.PIPELINED);

        // Second batch: refers to the data sets above by their IDs.
        JobVertex v4 = createVertex("vertex4", 11);
        JobVertex v5 = createVertex("vertex5", 4);
        v4.connectIdInput(v2result.getId(), DistributionPattern.ALL_TO_ALL);
        v4.connectIdInput(v3result1.getId(), DistributionPattern.ALL_TO_ALL);
        v5.connectNewDataSetAsInput(v4, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        v5.connectIdInput(v3result2.getId(), DistributionPattern.ALL_TO_ALL);

        List<JobVertex> ordered = Arrays.asList(v1, v2, v3);
        List<JobVertex> ordered2 = Arrays.asList(v4, v5);
        ExecutionGraph eg =
                createDefaultExecutionGraph(
                        Stream.concat(ordered.stream(), ordered2.stream())
                                .collect(Collectors.toList()));
        attachOrFail(eg, ordered);
        attachOrFail(eg, ordered2);

        verifyTestGraph(eg, v1, v2, v3, v4, v5);
    }

    /**
     * Verifies every vertex of the shared topology against its expected input and output
     * vertices; {@code null} is passed where a vertex has no inputs or no outputs.
     */
    private void verifyTestGraph(
            ExecutionGraph eg,
            JobVertex v1,
            JobVertex v2,
            JobVertex v3,
            JobVertex v4,
            JobVertex v5) {
        ExecutionGraphTestUtils.verifyGeneratedExecutionJobVertex(
                eg, v1, null, Collections.singletonList(v2));
        ExecutionGraphTestUtils.verifyGeneratedExecutionJobVertex(
                eg, v2, Collections.singletonList(v1), Collections.singletonList(v4));
        ExecutionGraphTestUtils.verifyGeneratedExecutionJobVertex(
                eg, v3, null, Arrays.asList(v4, v5));
        ExecutionGraphTestUtils.verifyGeneratedExecutionJobVertex(
                eg, v4, Arrays.asList(v2, v3), Collections.singletonList(v5));
        ExecutionGraphTestUtils.verifyGeneratedExecutionJobVertex(
                eg, v5, Arrays.asList(v4, v3), null);
    }

    /**
     * A vertex whose input refers to a freshly generated (and therefore unknown) data set ID
     * must be rejected with a {@link JobException} when attached.
     */
    @Test
    public void testCannotConnectMissingId() throws Exception {
        JobVertex v1 = createVertex("vertex1", 7);

        // v2 deliberately gets no explicit parallelism, exactly as in the original setup.
        JobVertex v2 = new JobVertex("vertex2");
        v2.setInvokableClass(AbstractInvokable.class);
        v2.connectIdInput(new IntermediateDataSetID(), DistributionPattern.ALL_TO_ALL);

        List<JobVertex> ordered = Arrays.asList(v1);
        List<JobVertex> ordered2 = Arrays.asList(v2);
        ExecutionGraph eg =
                createDefaultExecutionGraph(
                        Stream.concat(ordered.stream(), ordered2.stream())
                                .collect(Collectors.toList()));
        attachOrFail(eg, ordered);
        assertAttachFailsWithJobException(eg, ordered2);
    }

    /**
     * Attaching the shared topology with {@code v5} listed before its producer {@code v4} must
     * be rejected with a {@link JobException}.
     */
    @Test
    public void testCannotConnectWrongOrder() throws Exception {
        JobVertex v1 = createVertex("vertex1", 5);
        JobVertex v2 = createVertex("vertex2", 7);
        JobVertex v3 = createVertex("vertex3", 2);
        JobVertex v4 = createVertex("vertex4", 11);
        JobVertex v5 = createVertex("vertex5", 4);
        connectSharedTopology(v1, v2, v3, v4, v5);

        // v5 consumes v4's output but is placed ahead of it in the list.
        List<JobVertex> ordered = new ArrayList<>(Arrays.asList(v1, v2, v3, v5, v4));
        ExecutionGraph eg = createDefaultExecutionGraph(ordered);
        assertAttachFailsWithJobException(eg, ordered);
    }

    /**
     * Vertices configured with an {@link InputSplitSource} must expose the
     * {@link InputSplitAssigner} returned by that source once attached to the graph.
     */
    @Test
    public void testSetupInputSplits() {
        try {
            InputSplit[] emptySplits = new InputSplit[0];
            InputSplitAssigner assigner1 = Mockito.mock(InputSplitAssigner.class);
            InputSplitAssigner assigner2 = Mockito.mock(InputSplitAssigner.class);

            // Mockito can only hand back a raw mock for a generic interface.
            @SuppressWarnings("unchecked")
            InputSplitSource<InputSplit> source1 = Mockito.mock(InputSplitSource.class);
            @SuppressWarnings("unchecked")
            InputSplitSource<InputSplit> source2 = Mockito.mock(InputSplitSource.class);

            Mockito.when(source1.createInputSplits(org.mockito.Matchers.anyInt()))
                    .thenReturn(emptySplits);
            Mockito.when(source2.createInputSplits(org.mockito.Matchers.anyInt()))
                    .thenReturn(emptySplits);
            Mockito.when(source1.getInputSplitAssigner(emptySplits)).thenReturn(assigner1);
            Mockito.when(source2.getInputSplitAssigner(emptySplits)).thenReturn(assigner2);

            JobVertex v1 = createVertex("vertex1", 5);
            JobVertex v2 = createVertex("vertex2", 7);
            JobVertex v3 = createVertex("vertex3", 2);
            JobVertex v4 = createVertex("vertex4", 11);
            JobVertex v5 = createVertex("vertex5", 4);
            connectSharedTopology(v1, v2, v3, v4, v5);
            v3.setInputSplitSource(source1);
            v5.setInputSplitSource(source2);

            List<JobVertex> ordered = new ArrayList<>(Arrays.asList(v1, v2, v3, v4, v5));
            ExecutionGraph eg = createDefaultExecutionGraph(ordered);
            attachOrFail(eg, ordered);

            Assert.assertEquals(
                    assigner1, eg.getAllVertices().get(v3.getID()).getSplitAssigner());
            Assert.assertEquals(
                    assigner2, eg.getAllVertices().get(v5.getID()).getSplitAssigner());
        } catch (Exception e) {
            // Keep the original exception as the cause so the report shows where it came from.
            throw new AssertionError(e.getMessage(), e);
        }
    }

    /**
     * Attaching a job in which two vertices consume the same intermediate data set is expected
     * to fail with a {@link RuntimeException}.
     */
    @Test
    public void testMoreThanOneConsumerForIntermediateResult() {
        try {
            // No invokable class is set on these vertices, matching the original setup.
            JobVertex v1 = new JobVertex("vertex1");
            JobVertex v2 = new JobVertex("vertex2");
            JobVertex v3 = new JobVertex("vertex3");
            v1.setParallelism(5);
            v2.setParallelism(7);
            v3.setParallelism(2);

            IntermediateDataSet result = v1.createAndAddResultDataSet(ResultPartitionType.PIPELINED);
            v2.connectDataSetAsInput(result, DistributionPattern.ALL_TO_ALL);
            v3.connectDataSetAsInput(result, DistributionPattern.ALL_TO_ALL);

            List<JobVertex> ordered = new ArrayList<>(Arrays.asList(v1, v2, v3));
            ExecutionGraph eg = createDefaultExecutionGraph(ordered);
            try {
                eg.attachJobGraph(ordered);
                Assert.fail("Should not be possible");
            } catch (RuntimeException expected) {
                // This is the outcome under test: the second consumer must be rejected.
            }
        } catch (Exception e) {
            // Keep the original exception as the cause so the report shows where it came from.
            throw new AssertionError(e.getMessage(), e);
        }
    }
}

