/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.executiongraph.DefaultExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.TestingDefaultExecutionGraphBuilder;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroupImpl;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.scheduler.LocalInputPreferredSlotSharingStrategy;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingExecutionVertex;
import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingTopology;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Tests for {@link LocalInputPreferredSlotSharingStrategy}.
 *
 * <p>Most tests run against a small fixture built in {@link #setUp()}: two job vertices with
 * two subtasks each ({@code ev11}, {@code ev12} and {@code ev21}, {@code ev22}) that belong to a
 * single {@link SlotSharingGroup}. Tests that need a different shape build their own topology in
 * local variables and never modify the shared fixture.
 */
public class LocalInputPreferredSlotSharingStrategyTest extends TestLogger {

    private static final JobVertexID JOB_VERTEX_ID_1 = new JobVertexID();
    private static final JobVertexID JOB_VERTEX_ID_2 = new JobVertexID();

    /** Fixture topology holding {@link #ev11}, {@link #ev12}, {@link #ev21} and {@link #ev22}. */
    private TestingSchedulingTopology topology;

    /** Subtasks 0 and 1 of {@link #JOB_VERTEX_ID_1}. */
    private TestingSchedulingExecutionVertex ev11;
    private TestingSchedulingExecutionVertex ev12;

    /** Subtasks 0 and 1 of {@link #JOB_VERTEX_ID_2}. */
    private TestingSchedulingExecutionVertex ev21;
    private TestingSchedulingExecutionVertex ev22;

    /** A single slot sharing group that contains both job vertices. */
    private Set<SlotSharingGroup> slotSharingGroups;

    /**
     * Builds the shared fixture: four unconnected execution vertices and one slot sharing group
     * covering both job vertices.
     */
    @Before
    public void setUp() throws Exception {
        topology = new TestingSchedulingTopology();

        ev11 = topology.newExecutionVertex(JOB_VERTEX_ID_1, 0);
        ev12 = topology.newExecutionVertex(JOB_VERTEX_ID_1, 1);
        ev21 = topology.newExecutionVertex(JOB_VERTEX_ID_2, 0);
        ev22 = topology.newExecutionVertex(JOB_VERTEX_ID_2, 1);

        SlotSharingGroup sharedGroup = new SlotSharingGroup();
        sharedGroup.addVertexToGroup(JOB_VERTEX_ID_1);
        sharedGroup.addVertexToGroup(JOB_VERTEX_ID_2);
        slotSharingGroups = Collections.singleton(sharedGroup);
    }

    /**
     * With the vertices connected crosswise (ev11-ev22, ev12-ev21) and both job vertices in one
     * co-location group, subtasks with the same index are still expected to share a group.
     */
    @Test
    public void testCoLocationConstraintIsRespected() {
        topology.connect(ev11, ev22);
        topology.connect(ev12, ev21);

        TestingCoLocationGroup coLocationGroup =
                new TestingCoLocationGroup(JOB_VERTEX_ID_1, JOB_VERTEX_ID_2);

        // The singleton set is created inline so that its element type is inferred from the
        // constructor parameter instead of being pinned to the test-only subclass.
        LocalInputPreferredSlotSharingStrategy strategy =
                new LocalInputPreferredSlotSharingStrategy(
                        topology, slotSharingGroups, Collections.singleton(coLocationGroup));

        MatcherAssert.assertThat(strategy.getExecutionSlotSharingGroups(), Matchers.hasSize(2));
        assertGroupMembers(strategy, ev11.getId(), ev11.getId(), ev21.getId());
        assertGroupMembers(strategy, ev12.getId(), ev12.getId(), ev22.getId());
    }

    /**
     * Two upstream and three downstream subtasks, connected ev11-ev21, ev11-ev22 and ev12-ev23.
     * Expects three groups: {ev11, ev21}, {ev22} and {ev12, ev23}.
     */
    @Test
    public void testInputLocalityIsRespectedWithRescaleEdge() {
        TestingSchedulingTopology rescaleTopology = new TestingSchedulingTopology();

        TestingSchedulingExecutionVertex up0 =
                rescaleTopology.newExecutionVertex(JOB_VERTEX_ID_1, 0);
        TestingSchedulingExecutionVertex up1 =
                rescaleTopology.newExecutionVertex(JOB_VERTEX_ID_1, 1);
        TestingSchedulingExecutionVertex down0 =
                rescaleTopology.newExecutionVertex(JOB_VERTEX_ID_2, 0);
        TestingSchedulingExecutionVertex down1 =
                rescaleTopology.newExecutionVertex(JOB_VERTEX_ID_2, 1);
        TestingSchedulingExecutionVertex down2 =
                rescaleTopology.newExecutionVertex(JOB_VERTEX_ID_2, 2);

        rescaleTopology.connect(up0, down0);
        rescaleTopology.connect(up0, down1);
        rescaleTopology.connect(up1, down2);

        LocalInputPreferredSlotSharingStrategy strategy =
                new LocalInputPreferredSlotSharingStrategy(
                        rescaleTopology, slotSharingGroups, Collections.emptySet());

        MatcherAssert.assertThat(strategy.getExecutionSlotSharingGroups(), Matchers.hasSize(3));
        assertGroupMembers(strategy, down0.getId(), up0.getId(), down0.getId());
        assertGroupMembers(strategy, down1.getId(), down1.getId());
        assertGroupMembers(strategy, down2.getId(), up1.getId(), down2.getId());
    }

    /**
     * Two producers and two consumers connected all-to-all through a blocking result. Expects
     * two groups that pair producer and consumer subtasks by index.
     */
    @Test
    public void testInputLocalityIsRespectedWithAllToAllEdge() {
        TestingSchedulingTopology allToAllTopology = new TestingSchedulingTopology();

        List<TestingSchedulingExecutionVertex> producers =
                allToAllTopology
                        .addExecutionVertices()
                        .withParallelism(2)
                        .withJobVertexID(JOB_VERTEX_ID_1)
                        .finish();
        List<TestingSchedulingExecutionVertex> consumers =
                allToAllTopology
                        .addExecutionVertices()
                        .withParallelism(2)
                        .withJobVertexID(JOB_VERTEX_ID_2)
                        .finish();

        allToAllTopology
                .connectAllToAll(producers, consumers)
                .withResultPartitionType(ResultPartitionType.BLOCKING)
                .finish();

        // Kept in locals: these vertices belong to allToAllTopology, not to the shared fixture.
        ExecutionVertexID producer0 = producers.get(0).getId();
        ExecutionVertexID producer1 = producers.get(1).getId();
        ExecutionVertexID consumer0 = consumers.get(0).getId();
        ExecutionVertexID consumer1 = consumers.get(1).getId();

        LocalInputPreferredSlotSharingStrategy strategy =
                new LocalInputPreferredSlotSharingStrategy(
                        allToAllTopology, slotSharingGroups, Collections.emptySet());

        MatcherAssert.assertThat(strategy.getExecutionSlotSharingGroups(), Matchers.hasSize(2));
        assertGroupMembers(strategy, consumer0, producer0, consumer0);
        assertGroupMembers(strategy, consumer1, producer1, consumer1);
    }

    /**
     * Without any edges, the fixture vertices are expected to be paired by subtask index inside
     * the single slot sharing group.
     */
    @Test
    public void testDisjointVerticesInOneGroup() {
        LocalInputPreferredSlotSharingStrategy strategy =
                new LocalInputPreferredSlotSharingStrategy(
                        topology, slotSharingGroups, Collections.emptySet());

        MatcherAssert.assertThat(strategy.getExecutionSlotSharingGroups(), Matchers.hasSize(2));
        assertGroupMembers(strategy, ev11.getId(), ev11.getId(), ev21.getId());
        assertGroupMembers(strategy, ev12.getId(), ev12.getId(), ev22.getId());
    }

    /**
     * When each job vertex has its own slot sharing group, every execution vertex is expected to
     * end up alone in its own execution slot sharing group.
     */
    @Test
    public void testVerticesInDifferentSlotSharingGroups() {
        SlotSharingGroup groupOfVertex1 = new SlotSharingGroup();
        groupOfVertex1.addVertexToGroup(JOB_VERTEX_ID_1);
        SlotSharingGroup groupOfVertex2 = new SlotSharingGroup();
        groupOfVertex2.addVertexToGroup(JOB_VERTEX_ID_2);

        Set<SlotSharingGroup> separateGroups = new HashSet<>();
        separateGroups.add(groupOfVertex1);
        separateGroups.add(groupOfVertex2);

        LocalInputPreferredSlotSharingStrategy strategy =
                new LocalInputPreferredSlotSharingStrategy(
                        topology, separateGroups, Collections.emptySet());

        MatcherAssert.assertThat(strategy.getExecutionSlotSharingGroups(), Matchers.hasSize(4));
        assertGroupMembers(strategy, ev11.getId(), ev11.getId());
        assertGroupMembers(strategy, ev12.getId(), ev12.getId());
        assertGroupMembers(strategy, ev21.getId(), ev21.getId());
        assertGroupMembers(strategy, ev22.getId(), ev22.getId());
    }

    /**
     * The resource profile set on a {@link SlotSharingGroup} is expected to be reported by the
     * execution slot sharing groups of that group's vertices.
     */
    @Test
    public void testSetSlotSharingGroupResource() {
        ResourceProfile profileOfVertex1 = ResourceProfile.fromResources(1.0, 10);
        SlotSharingGroup groupOfVertex1 = new SlotSharingGroup();
        groupOfVertex1.addVertexToGroup(JOB_VERTEX_ID_1);
        groupOfVertex1.setResourceProfile(profileOfVertex1);

        ResourceProfile profileOfVertex2 = ResourceProfile.fromResources(2.0, 20);
        SlotSharingGroup groupOfVertex2 = new SlotSharingGroup();
        groupOfVertex2.addVertexToGroup(JOB_VERTEX_ID_2);
        groupOfVertex2.setResourceProfile(profileOfVertex2);

        Set<SlotSharingGroup> separateGroups = new HashSet<>();
        separateGroups.add(groupOfVertex1);
        separateGroups.add(groupOfVertex2);

        LocalInputPreferredSlotSharingStrategy strategy =
                new LocalInputPreferredSlotSharingStrategy(
                        topology, separateGroups, Collections.emptySet());

        MatcherAssert.assertThat(strategy.getExecutionSlotSharingGroups(), Matchers.hasSize(4));
        MatcherAssert.assertThat(
                strategy.getExecutionSlotSharingGroup(ev11.getId()).getResourceProfile(),
                Matchers.equalTo(profileOfVertex1));
        MatcherAssert.assertThat(
                strategy.getExecutionSlotSharingGroup(ev12.getId()).getResourceProfile(),
                Matchers.equalTo(profileOfVertex1));
        MatcherAssert.assertThat(
                strategy.getExecutionSlotSharingGroup(ev21.getId()).getResourceProfile(),
                Matchers.equalTo(profileOfVertex2));
        MatcherAssert.assertThat(
                strategy.getExecutionSlotSharingGroup(ev22.getId()).getResourceProfile(),
                Matchers.equalTo(profileOfVertex2));
    }

    /**
     * Builds a real execution graph in which {@code v2} consumes {@code v1} through two separate
     * all-to-all blocking data sets, and expects the subtasks to be paired by index.
     */
    @Test
    public void testInputLocalityIsRespectedWithTwoEdgesBetweenTwoVertices() throws Exception {
        int parallelism = 4;

        JobVertex v1 = createJobVertex("v1", JOB_VERTEX_ID_1, parallelism);
        JobVertex v2 = createJobVertex("v2", JOB_VERTEX_ID_2, parallelism);

        // Connecting twice is intentional: it creates two edges between the same vertices.
        v2.connectNewDataSetAsInput(
                v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
        v2.connectNewDataSetAsInput(
                v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);

        Assert.assertEquals(2, v1.getProducedDataSets().size());
        Assert.assertEquals(2, v2.getInputs().size());

        JobGraph jobGraph = JobGraphTestUtils.batchJobGraph(v1, v2);
        DefaultExecutionGraph executionGraph =
                TestingDefaultExecutionGraphBuilder.newBuilder().setJobGraph(jobGraph).build();
        SchedulingTopology schedulingTopology = executionGraph.getSchedulingTopology();

        LocalInputPreferredSlotSharingStrategy strategy =
                new LocalInputPreferredSlotSharingStrategy(
                        schedulingTopology, slotSharingGroups, Collections.emptySet());

        MatcherAssert.assertThat(strategy.getExecutionSlotSharingGroups(), Matchers.hasSize(4));

        ExecutionVertex[] upstream =
                Objects.requireNonNull(executionGraph.getJobVertex(JOB_VERTEX_ID_1))
                        .getTaskVertices();
        ExecutionVertex[] downstream =
                Objects.requireNonNull(executionGraph.getJobVertex(JOB_VERTEX_ID_2))
                        .getTaskVertices();

        for (int subtask = 0; subtask < parallelism; subtask++) {
            assertGroupMembers(
                    strategy,
                    upstream[subtask].getID(),
                    upstream[subtask].getID(),
                    downstream[subtask].getID());
        }
    }

    /**
     * Asserts that the execution slot sharing group of {@code member} contains exactly the given
     * vertices, in any order.
     *
     * @param strategy the strategy under test
     * @param member the vertex whose group is looked up
     * @param expectedMembers all vertices expected in that group, including {@code member}
     */
    private static void assertGroupMembers(
            LocalInputPreferredSlotSharingStrategy strategy,
            ExecutionVertexID member,
            ExecutionVertexID... expectedMembers) {
        MatcherAssert.assertThat(
                strategy.getExecutionSlotSharingGroup(member).getExecutionVertexIds(),
                Matchers.containsInAnyOrder(expectedMembers));
    }

    /**
     * Creates a job vertex with the given name, id and parallelism that runs
     * {@link NoOpInvokable}.
     */
    private static JobVertex createJobVertex(
            String vertexName, JobVertexID vertexId, int parallelism) {
        JobVertex jobVertex = new JobVertex(vertexName, vertexId);
        jobVertex.setParallelism(parallelism);
        jobVertex.setInvokableClass(NoOpInvokable.class);
        return jobVertex;
    }

    /**
     * Co-location group for tests that is constructed with no job vertices and reports a fixed
     * list of vertex ids instead.
     */
    private static class TestingCoLocationGroup extends CoLocationGroupImpl {

        private final List<JobVertexID> vertexIDs;

        private TestingCoLocationGroup(JobVertexID... vertexIDs) {
            super(new JobVertex[0]);
            this.vertexIDs = Arrays.asList(vertexIDs);
        }

        // NOTE(review): presumably overrides CoLocationGroupImpl#getVertexIds; the superclass is
        // not visible here, so confirm and then annotate with @Override.
        public List<JobVertexID> getVertexIds() {
            return vertexIDs;
        }
    }
}

