/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.TestingDefaultExecutionGraphBuilder;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.scheduler.SsgNetworkMemoryCalculationUtils;
import org.apache.flink.runtime.shuffle.PartitionDescriptor;
import org.apache.flink.runtime.shuffle.ProducerDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.runtime.shuffle.TaskInputsOutputsDescriptor;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.junit.Assert;
import org.junit.Test;

public class SsgNetworkMemoryCalculationUtilsTest {
    private static final TestShuffleMaster SHUFFLE_MASTER = new TestShuffleMaster();
    private static final ResourceProfile DEFAULT_RESOURCE = ResourceProfile.fromResources((double)1.0, (int)100);
    private JobGraph jobGraph;
    private ExecutionGraph executionGraph;
    private List<SlotSharingGroup> slotSharingGroups;

    @Test
    public void testGenerateEnrichedResourceProfile() throws Exception {
        this.setup(DEFAULT_RESOURCE);
        this.slotSharingGroups.forEach(ssg -> SsgNetworkMemoryCalculationUtils.enrichNetworkMemory((SlotSharingGroup)ssg, this.executionGraph.getAllVertices()::get, (ShuffleMaster)SHUFFLE_MASTER));
        Assert.assertEquals((Object)new MemorySize((long)(TestShuffleMaster.computeRequiredShuffleMemoryBytes(0, 2) + TestShuffleMaster.computeRequiredShuffleMemoryBytes(1, 6))), (Object)this.slotSharingGroups.get(0).getResourceProfile().getNetworkMemory());
        Assert.assertEquals((Object)new MemorySize((long)TestShuffleMaster.computeRequiredShuffleMemoryBytes(5, 0)), (Object)this.slotSharingGroups.get(1).getResourceProfile().getNetworkMemory());
    }

    @Test
    public void testGenerateUnknownResourceProfile() throws Exception {
        this.setup(ResourceProfile.UNKNOWN);
        this.slotSharingGroups.forEach(ssg -> SsgNetworkMemoryCalculationUtils.enrichNetworkMemory((SlotSharingGroup)ssg, this.executionGraph.getAllVertices()::get, (ShuffleMaster)SHUFFLE_MASTER));
        for (SlotSharingGroup slotSharingGroup : this.slotSharingGroups) {
            Assert.assertEquals((Object)ResourceProfile.UNKNOWN, (Object)slotSharingGroup.getResourceProfile());
        }
    }

    private void setup(ResourceProfile resourceProfile) throws Exception {
        this.slotSharingGroups = Arrays.asList(new SlotSharingGroup(), new SlotSharingGroup());
        for (SlotSharingGroup slotSharingGroup : this.slotSharingGroups) {
            slotSharingGroup.setResourceProfile(resourceProfile);
        }
        this.jobGraph = SsgNetworkMemoryCalculationUtilsTest.createJobGraph(this.slotSharingGroups);
        this.executionGraph = TestingDefaultExecutionGraphBuilder.newBuilder().setJobGraph(this.jobGraph).build();
    }

    private static JobGraph createJobGraph(List<SlotSharingGroup> slotSharingGroups) {
        JobVertex source = new JobVertex("source");
        source.setInvokableClass(NoOpInvokable.class);
        source.setParallelism(4);
        source.setSlotSharingGroup(slotSharingGroups.get(0));
        JobVertex map = new JobVertex("map");
        map.setInvokableClass(NoOpInvokable.class);
        map.setParallelism(5);
        map.setSlotSharingGroup(slotSharingGroups.get(0));
        JobVertex sink = new JobVertex("sink");
        sink.setInvokableClass(NoOpInvokable.class);
        sink.setParallelism(6);
        sink.setSlotSharingGroup(slotSharingGroups.get(1));
        map.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
        sink.connectNewDataSetAsInput(map, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        return JobGraphTestUtils.streamingJobGraph(source, map, sink);
    }

    private static class TestShuffleMaster
    implements ShuffleMaster<ShuffleDescriptor> {
        private TestShuffleMaster() {
        }

        public CompletableFuture<ShuffleDescriptor> registerPartitionWithProducer(JobID jobID, PartitionDescriptor partitionDescriptor, ProducerDescriptor producerDescriptor) {
            return null;
        }

        public void releasePartitionExternally(ShuffleDescriptor shuffleDescriptor) {
        }

        public MemorySize computeShuffleMemorySizeForTask(TaskInputsOutputsDescriptor desc) {
            int numTotalChannels = desc.getInputChannelNums().values().stream().mapToInt(Integer::intValue).sum();
            int numTotalSubpartitions = desc.getSubpartitionNums().values().stream().mapToInt(Integer::intValue).sum();
            return new MemorySize((long)TestShuffleMaster.computeRequiredShuffleMemoryBytes(numTotalChannels, numTotalSubpartitions));
        }

        static int computeRequiredShuffleMemoryBytes(int numTotalChannels, int numTotalSubpartitions) {
            return numTotalChannels * 10000 + numTotalSubpartitions;
        }
    }
}

