/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.resourcemanager.slotmanager;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
import org.apache.flink.runtime.resourcemanager.slotmanager.TaskExecutorManager;
import org.apache.flink.runtime.resourcemanager.slotmanager.TaskExecutorManagerBuilder;
import org.apache.flink.runtime.resourcemanager.slotmanager.TestingResourceActions;
import org.apache.flink.runtime.resourcemanager.slotmanager.TestingResourceActionsBuilder;
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.SlotStatus;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

public class TaskExecutorManagerTest
extends TestLogger {
    @Test
    public void testPendingSlotNotFulfilledIfProfilesAreNotExactMatch() {
        int numWorkerCpuCores = 3;
        WorkerResourceSpec workerResourceSpec = new WorkerResourceSpec.Builder().setCpuCores(3.0).build();
        ResourceProfile requestedSlotProfile = ResourceProfile.newBuilder().setCpuCores(3.0).build();
        ResourceProfile offeredSlotProfile = ResourceProfile.newBuilder().setCpuCores(2.0).build();
        try (TaskExecutorManager taskExecutorManager = TaskExecutorManagerTest.createTaskExecutorManagerBuilder().setDefaultWorkerResourceSpec(workerResourceSpec).setNumSlotsPerWorker(1).setMaxNumSlots(2).createTaskExecutorManager();){
            taskExecutorManager.allocateWorker(requestedSlotProfile);
            Assert.assertThat((Object)taskExecutorManager.getNumberPendingTaskManagerSlots(), (Matcher)Matchers.is((Object)1));
            TaskExecutorManagerTest.createAndRegisterTaskExecutor(taskExecutorManager, 1, offeredSlotProfile);
            Assert.assertThat((Object)taskExecutorManager.getNumberRegisteredSlots(), (Matcher)Matchers.is((Object)1));
            Assert.assertThat((Object)taskExecutorManager.getNumberPendingTaskManagerSlots(), (Matcher)Matchers.is((Object)1));
        }
    }

    @Test
    public void testPendingSlotNotFulfilledByAllocatedSlot() {
        int numWorkerCpuCores = 3;
        WorkerResourceSpec workerResourceSpec = new WorkerResourceSpec.Builder().setCpuCores(3.0).build();
        ResourceProfile requestedSlotProfile = ResourceProfile.newBuilder().setCpuCores(3.0).build();
        try (TaskExecutorManager taskExecutorManager = TaskExecutorManagerTest.createTaskExecutorManagerBuilder().setDefaultWorkerResourceSpec(workerResourceSpec).setNumSlotsPerWorker(1).setMaxNumSlots(2).createTaskExecutorManager();){
            taskExecutorManager.allocateWorker(requestedSlotProfile);
            Assert.assertThat((Object)taskExecutorManager.getNumberPendingTaskManagerSlots(), (Matcher)Matchers.is((Object)1));
            TaskExecutorConnection taskExecutorConnection = TaskExecutorManagerTest.createTaskExecutorConnection();
            SlotReport slotReport = new SlotReport(new SlotStatus(new SlotID(taskExecutorConnection.getResourceID(), 0), requestedSlotProfile, JobID.generate(), new AllocationID()));
            taskExecutorManager.registerTaskManager(taskExecutorConnection, slotReport, ResourceProfile.ANY, ResourceProfile.ANY);
            Assert.assertThat((Object)taskExecutorManager.getNumberRegisteredSlots(), (Matcher)Matchers.is((Object)1));
            Assert.assertThat((Object)taskExecutorManager.getNumberPendingTaskManagerSlots(), (Matcher)Matchers.is((Object)1));
        }
    }

    @Test
    public void testTaskManagerTimeoutDoesNotRemoveSlots() throws Exception {
        Time taskManagerTimeout = Time.milliseconds((long)10L);
        CompletableFuture releaseResourceFuture = new CompletableFuture();
        TestingResourceActions resourceActions = TaskExecutorManagerTest.createResourceActionsBuilder().setReleaseResourceConsumer((instanceId, ignored) -> releaseResourceFuture.complete(instanceId)).build();
        ScheduledExecutorService mainThreadExecutor = TestingUtils.defaultExecutor();
        try (TaskExecutorManager taskExecutorManager = TaskExecutorManagerTest.createTaskExecutorManagerBuilder().setTaskManagerTimeout(taskManagerTimeout).setResourceActions(resourceActions).setMainThreadExecutor(mainThreadExecutor).createTaskExecutorManager();){
            ((CompletableFuture)((CompletableFuture)CompletableFuture.supplyAsync(() -> {
                InstanceID newTaskExecutorId = TaskExecutorManagerTest.createAndRegisterTaskExecutor(taskExecutorManager, 1, ResourceProfile.ANY);
                Assert.assertEquals((long)1L, (long)taskExecutorManager.getNumberRegisteredSlots());
                return newTaskExecutorId;
            }, mainThreadExecutor).thenCombine((CompletionStage)releaseResourceFuture, (registeredInstance, releasedInstance) -> {
                Assert.assertThat((Object)registeredInstance, (Matcher)Matchers.is((Object)releasedInstance));
                Assert.assertEquals((long)1L, (long)taskExecutorManager.getNumberRegisteredSlots());
                return registeredInstance;
            })).thenAccept(taskExecutorId -> {
                taskExecutorManager.unregisterTaskExecutor(taskExecutorId);
                Assert.assertEquals((long)0L, (long)taskExecutorManager.getNumberRegisteredSlots());
            })).get();
        }
    }

    @Test
    public void testTimeoutForUnusedTaskManager() throws Exception {
        WorkerResourceSpec workerResourceSpec = new WorkerResourceSpec.Builder().setCpuCores(1.0).build();
        ResourceProfile resourceProfile = ResourceProfile.newBuilder().setCpuCores(1.0).build();
        Time taskManagerTimeout = Time.milliseconds((long)50L);
        CompletableFuture releaseResourceFuture = new CompletableFuture();
        TestingResourceActions resourceManagerActions = new TestingResourceActionsBuilder().setReleaseResourceConsumer((instanceID, e) -> releaseResourceFuture.complete(instanceID)).build();
        ScheduledExecutorService mainThreadExecutor = TestingUtils.defaultExecutor();
        try (TaskExecutorManager taskExecutorManager = TaskExecutorManagerTest.createTaskExecutorManagerBuilder().setTaskManagerTimeout(taskManagerTimeout).setDefaultWorkerResourceSpec(workerResourceSpec).setResourceActions(resourceManagerActions).setMainThreadExecutor(mainThreadExecutor).createTaskExecutorManager();){
            ((CompletableFuture)CompletableFuture.supplyAsync(() -> {
                taskExecutorManager.allocateWorker(resourceProfile);
                InstanceID taskExecutorId = TaskExecutorManagerTest.createAndRegisterTaskExecutor(taskExecutorManager, 1, resourceProfile);
                taskExecutorManager.occupySlot(taskExecutorId);
                taskExecutorManager.freeSlot(taskExecutorId);
                return taskExecutorId;
            }, mainThreadExecutor).thenAcceptBoth((CompletionStage)releaseResourceFuture, (registeredInstance, releasedInstance) -> Assert.assertThat((Object)registeredInstance, (Matcher)Matchers.is((Object)releasedInstance)))).get();
        }
    }

    @Test
    public void testWorkerOnlyAllocatedIfRequestedSlotCouldBeFulfilled() {
        boolean numCoresPerWorker = true;
        WorkerResourceSpec workerResourceSpec = new WorkerResourceSpec.Builder().setCpuCores(1.0).build();
        ResourceProfile requestedProfile = ResourceProfile.newBuilder().setCpuCores(2.0).build();
        AtomicInteger resourceRequests = new AtomicInteger(0);
        TestingResourceActions resourceActions = TaskExecutorManagerTest.createResourceActionsBuilder().setAllocateResourceFunction(ignored -> {
            resourceRequests.incrementAndGet();
            return true;
        }).build();
        try (TaskExecutorManager taskExecutorManager = TaskExecutorManagerTest.createTaskExecutorManagerBuilder().setDefaultWorkerResourceSpec(workerResourceSpec).setNumSlotsPerWorker(1).setMaxNumSlots(1).setResourceActions(resourceActions).createTaskExecutorManager();){
            Assert.assertThat(taskExecutorManager.allocateWorker(requestedProfile).orElse(null), (Matcher)Matchers.nullValue());
            Assert.assertThat((Object)resourceRequests.get(), (Matcher)Matchers.is((Object)0));
        }
    }

    @Test
    public void testMaxSlotLimitAllocateWorker() {
        boolean numberSlots = true;
        boolean maxSlotNum = true;
        AtomicInteger resourceRequests = new AtomicInteger(0);
        TestingResourceActions resourceActions = TaskExecutorManagerTest.createResourceActionsBuilder().setAllocateResourceFunction(ignored -> {
            resourceRequests.incrementAndGet();
            return true;
        }).build();
        try (TaskExecutorManager taskExecutorManager = TaskExecutorManagerTest.createTaskExecutorManagerBuilder().setNumSlotsPerWorker(1).setMaxNumSlots(1).setResourceActions(resourceActions).createTaskExecutorManager();){
            Assert.assertThat((Object)resourceRequests.get(), (Matcher)Matchers.is((Object)0));
            taskExecutorManager.allocateWorker(ResourceProfile.UNKNOWN);
            Assert.assertThat((Object)resourceRequests.get(), (Matcher)Matchers.is((Object)1));
            taskExecutorManager.allocateWorker(ResourceProfile.UNKNOWN);
            Assert.assertThat((Object)resourceRequests.get(), (Matcher)Matchers.is((Object)1));
        }
    }

    @Test
    public void testMaxSlotLimitRegisterWorker() throws Exception {
        boolean numberSlots = true;
        boolean maxSlotNum = true;
        CompletableFuture releasedResourceFuture = new CompletableFuture();
        TestingResourceActions resourceActions = TaskExecutorManagerTest.createResourceActionsBuilder().setReleaseResourceConsumer((instanceID, e) -> releasedResourceFuture.complete(instanceID)).build();
        try (TaskExecutorManager taskExecutorManager = TaskExecutorManagerTest.createTaskExecutorManagerBuilder().setNumSlotsPerWorker(1).setMaxNumSlots(1).setResourceActions(resourceActions).createTaskExecutorManager();){
            TaskExecutorManagerTest.createAndRegisterTaskExecutor(taskExecutorManager, 1, ResourceProfile.ANY);
            InstanceID rejectedTaskExecutorId = TaskExecutorManagerTest.createAndRegisterTaskExecutor(taskExecutorManager, 1, ResourceProfile.ANY);
            Assert.assertThat(releasedResourceFuture.get(), (Matcher)Matchers.is((Object)rejectedTaskExecutorId));
        }
    }

    @Test
    public void testGetResourceOverview() {
        ResourceProfile resourceProfile1 = ResourceProfile.fromResources((double)1.0, (int)10);
        ResourceProfile resourceProfile2 = ResourceProfile.fromResources((double)2.0, (int)20);
        try (TaskExecutorManager taskExecutorManager = TaskExecutorManagerTest.createTaskExecutorManagerBuilder().setMaxNumSlots(4).createTaskExecutorManager();){
            InstanceID instanceId1 = TaskExecutorManagerTest.createAndRegisterTaskExecutor(taskExecutorManager, 2, resourceProfile1);
            InstanceID instanceId2 = TaskExecutorManagerTest.createAndRegisterTaskExecutor(taskExecutorManager, 2, resourceProfile2);
            taskExecutorManager.occupySlot(instanceId1);
            taskExecutorManager.occupySlot(instanceId2);
            Assert.assertThat((Object)taskExecutorManager.getTotalFreeResources(), (Matcher)Matchers.equalTo((Object)resourceProfile1.merge(resourceProfile2)));
            Assert.assertThat((Object)taskExecutorManager.getTotalFreeResourcesOf(instanceId1), (Matcher)Matchers.equalTo((Object)resourceProfile1));
            Assert.assertThat((Object)taskExecutorManager.getTotalFreeResourcesOf(instanceId2), (Matcher)Matchers.equalTo((Object)resourceProfile2));
            Assert.assertThat((Object)taskExecutorManager.getTotalRegisteredResources(), (Matcher)Matchers.equalTo((Object)resourceProfile1.merge(resourceProfile2).multiply(2)));
            Assert.assertThat((Object)taskExecutorManager.getTotalRegisteredResourcesOf(instanceId1), (Matcher)Matchers.equalTo((Object)resourceProfile1.multiply(2)));
            Assert.assertThat((Object)taskExecutorManager.getTotalRegisteredResourcesOf(instanceId2), (Matcher)Matchers.equalTo((Object)resourceProfile2.multiply(2)));
        }
    }

    private static TaskExecutorManagerBuilder createTaskExecutorManagerBuilder() {
        return new TaskExecutorManagerBuilder().setResourceActions(TaskExecutorManagerTest.createResourceActionsBuilder().build());
    }

    private static TestingResourceActionsBuilder createResourceActionsBuilder() {
        return new TestingResourceActionsBuilder().setAllocateResourceFunction(ignored -> true);
    }

    private static InstanceID createAndRegisterTaskExecutor(TaskExecutorManager taskExecutorManager, int numSlots, ResourceProfile resourceProfile) {
        TaskExecutorConnection taskExecutorConnection = TaskExecutorManagerTest.createTaskExecutorConnection();
        List slotStatuses = IntStream.range(0, numSlots).mapToObj(slotNumber -> new SlotStatus(new SlotID(taskExecutorConnection.getResourceID(), slotNumber), resourceProfile)).collect(Collectors.toList());
        SlotReport slotReport = new SlotReport(slotStatuses);
        taskExecutorManager.registerTaskManager(taskExecutorConnection, slotReport, resourceProfile.multiply(numSlots), resourceProfile);
        return taskExecutorConnection.getInstanceID();
    }

    private static TaskExecutorConnection createTaskExecutorConnection() {
        return new TaskExecutorConnection(ResourceID.generate(), (TaskExecutorGateway)new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway());
    }
}

