/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmaster.slotpool;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nullable;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolBuilder;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolUtils;
import org.apache.flink.runtime.jobmaster.slotpool.TestingSlotPoolImpl;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.exceptions.UnfulfillableSlotRequestException;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.clock.Clock;
import org.apache.flink.util.clock.ManualClock;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

/**
 * Tests how a slot pool built via {@link SlotPoolBuilder} treats pending batch slot requests:
 * when they time out, and which allocation or resource manager failures complete them
 * exceptionally.
 */
public class SlotPoolBatchSlotRequestTest extends TestLogger {

    private static final ResourceProfile resourceProfile = ResourceProfile.fromResources(1.0, 1024);
    private static final ResourceProfile smallerResourceProfile = ResourceProfile.fromResources(0.5, 512);

    /** Typed empty array handed to {@code toArray} when converting future lists to arrays. */
    public static final CompletableFuture<?>[] COMPLETABLE_FUTURES_EMPTY_ARRAY = new CompletableFuture<?>[0];

    private static ScheduledExecutorService singleThreadScheduledExecutorService;
    private static ComponentMainThreadExecutor mainThreadExecutor;

    /** Creates the single-threaded scheduled executor and the main thread executor wrapping it. */
    @BeforeClass
    public static void setupClass() {
        singleThreadScheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
        mainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(singleThreadScheduledExecutorService);
    }

    /** Shuts the shared executor down; the null check covers a failed {@link #setupClass()}. */
    @AfterClass
    public static void teardownClass() {
        if (singleThreadScheduledExecutorService != null) {
            singleThreadScheduledExecutorService.shutdownNow();
        }
    }

    /**
     * Requests a batch slot from a pool that has a 2 ms batch slot timeout, no resource manager
     * gateway and no offered slots, and expects the request to fail with a
     * {@link TimeoutException}.
     */
    @Test
    public void testPendingBatchSlotRequestTimeout() throws Exception {
        try (TestingSlotPoolImpl slotPool = createAndSetUpSlotPool(mainThreadExecutor, null, Time.milliseconds(2L))) {
            CompletableFuture<PhysicalSlot> slotFuture =
                    SlotPoolUtils.requestNewAllocatedBatchSlot(slotPool, mainThreadExecutor, ResourceProfile.UNKNOWN);

            assertFailsWithTimeout(slotFuture, "Expected that slot future times out.");
        }
    }

    /**
     * Offers two slots, issues three batch requests (full profile, unknown profile, smaller
     * profile), moves the manual clock past the batch slot timeout and expects that none of the
     * request futures is done.
     */
    @Test
    public void testPendingBatchSlotRequestDoesNotTimeoutIfFulfillingSlotExists() throws Exception {
        Time batchSlotTimeout = Time.milliseconds(2L);
        ComponentMainThreadExecutor directMainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forMainThread();
        ManualClock clock = new ManualClock();

        try (TestingSlotPoolImpl slotPool = createAndSetUpSlotPool(directMainThreadExecutor, null, batchSlotTimeout, clock)) {
            SlotPoolUtils.offerSlots(slotPool, directMainThreadExecutor, Arrays.asList(resourceProfile, smallerResourceProfile));

            CompletableFuture<PhysicalSlot> firstSlotFuture =
                    SlotPoolUtils.requestNewAllocatedBatchSlot(slotPool, directMainThreadExecutor, resourceProfile);
            CompletableFuture<PhysicalSlot> secondSlotFuture =
                    SlotPoolUtils.requestNewAllocatedBatchSlot(slotPool, directMainThreadExecutor, ResourceProfile.UNKNOWN);
            CompletableFuture<PhysicalSlot> thirdSlotFuture =
                    SlotPoolUtils.requestNewAllocatedBatchSlot(slotPool, directMainThreadExecutor, smallerResourceProfile);
            List<CompletableFuture<PhysicalSlot>> slotFutures = Arrays.asList(firstSlotFuture, secondSlotFuture, thirdSlotFuture);

            advanceTimeAndTriggerCheckBatchSlotTimeout(slotPool, clock, batchSlotTimeout);

            for (CompletableFuture<PhysicalSlot> slotFuture : slotFutures) {
                MatcherAssert.assertThat(slotFuture.isDone(), Matchers.is(false));
            }
        }
    }

    /**
     * Fails the allocation belonging to a pending batch request with a plain
     * {@link FlinkException} and expects the request future to stay incomplete.
     */
    @Test
    public void testPendingBatchSlotRequestDoesNotFailIfAllocationFails() throws Exception {
        TestingResourceManagerGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
        // Captures the allocation id of the slot request that reaches the testing gateway.
        CompletableFuture<AllocationID> allocationIdFuture = new CompletableFuture<>();
        testingResourceManagerGateway.setRequestSlotConsumer(slotRequest -> allocationIdFuture.complete(slotRequest.getAllocationId()));

        ComponentMainThreadExecutor directMainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forMainThread();
        Time batchSlotTimeout = Time.milliseconds(1000L);

        try (TestingSlotPoolImpl slotPool = createAndSetUpSlotPool(directMainThreadExecutor, testingResourceManagerGateway, batchSlotTimeout)) {
            CompletableFuture<PhysicalSlot> slotFuture =
                    SlotPoolUtils.requestNewAllocatedBatchSlot(slotPool, directMainThreadExecutor, resourceProfile);

            SlotPoolUtils.failAllocation(slotPool, directMainThreadExecutor, allocationIdFuture.get(), new FlinkException("Failed request"));

            MatcherAssert.assertThat(slotFuture.isDone(), Matchers.is(false));
        }
    }

    /**
     * Fails the allocation belonging to a pending batch request with an
     * {@link UnfulfillableSlotRequestException} and expects the request future to be completed
     * exceptionally.
     */
    @Test
    public void testPendingBatchSlotRequestFailsIfAllocationFailsUnfulfillably() throws Exception {
        TestingResourceManagerGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
        // Captures the allocation id of the slot request that reaches the testing gateway.
        CompletableFuture<AllocationID> allocationIdFuture = new CompletableFuture<>();
        testingResourceManagerGateway.setRequestSlotConsumer(slotRequest -> allocationIdFuture.complete(slotRequest.getAllocationId()));

        ComponentMainThreadExecutor directMainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forMainThread();

        try (TestingSlotPoolImpl slotPool = new SlotPoolBuilder(directMainThreadExecutor)
                .setResourceManagerGateway(testingResourceManagerGateway)
                .build()) {
            CompletableFuture<PhysicalSlot> slotFuture =
                    SlotPoolUtils.requestNewAllocatedBatchSlot(slotPool, directMainThreadExecutor, resourceProfile);

            SlotPoolUtils.failAllocation(
                    slotPool,
                    directMainThreadExecutor,
                    allocationIdFuture.get(),
                    new UnfulfillableSlotRequestException(new AllocationID(), ResourceProfile.UNKNOWN));

            MatcherAssert.assertThat(slotFuture.isCompletedExceptionally(), Matchers.is(true));
        }
    }

    /**
     * Configures the testing gateway so that its slot request future is already failed with a
     * plain {@link FlinkException} and expects the batch request future to stay incomplete.
     */
    @Test
    public void testPendingBatchSlotRequestDoesNotFailIfRMRequestFails() throws Exception {
        TestingResourceManagerGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
        testingResourceManagerGateway.setRequestSlotFuture(FutureUtils.completedExceptionally(new FlinkException("Failed request")));

        ComponentMainThreadExecutor directMainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forMainThread();
        Time batchSlotTimeout = Time.milliseconds(1000L);

        try (TestingSlotPoolImpl slotPool = createAndSetUpSlotPool(directMainThreadExecutor, testingResourceManagerGateway, batchSlotTimeout)) {
            CompletableFuture<PhysicalSlot> slotFuture =
                    SlotPoolUtils.requestNewAllocatedBatchSlot(slotPool, directMainThreadExecutor, resourceProfile);

            MatcherAssert.assertThat(slotFuture.isDone(), Matchers.is(false));
        }
    }

    /**
     * Configures the testing gateway so that its slot request future is already failed with an
     * {@link UnfulfillableSlotRequestException} and expects the batch request future to be
     * completed exceptionally.
     */
    @Test
    public void testPendingBatchSlotRequestFailsIfRMRequestFailsUnfulfillably() throws Exception {
        TestingResourceManagerGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
        testingResourceManagerGateway.setRequestSlotFuture(
                FutureUtils.completedExceptionally(new UnfulfillableSlotRequestException(new AllocationID(), ResourceProfile.UNKNOWN)));

        ComponentMainThreadExecutor directMainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forMainThread();

        try (TestingSlotPoolImpl slotPool = new SlotPoolBuilder(directMainThreadExecutor)
                .setResourceManagerGateway(testingResourceManagerGateway)
                .build()) {
            CompletableFuture<PhysicalSlot> slotFuture =
                    SlotPoolUtils.requestNewAllocatedBatchSlot(slotPool, directMainThreadExecutor, resourceProfile);

            MatcherAssert.assertThat(slotFuture.isCompletedExceptionally(), Matchers.is(true));
        }
    }

    /**
     * Offers two slots and issues three batch requests. After the first clock advance past the
     * timeout no request may be done; after the offering task manager is released and the clock
     * is advanced again, every request must have failed with a {@link TimeoutException}.
     */
    @Test
    public void testPendingBatchSlotRequestTimeoutAfterSlotRelease() throws Exception {
        ComponentMainThreadExecutor directMainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forMainThread();
        ManualClock clock = new ManualClock();
        Time batchSlotTimeout = Time.milliseconds(1000L);

        try (TestingSlotPoolImpl slotPool = createAndSetUpSlotPool(directMainThreadExecutor, null, batchSlotTimeout, clock)) {
            ResourceID taskManagerResourceId =
                    SlotPoolUtils.offerSlots(slotPool, directMainThreadExecutor, Arrays.asList(resourceProfile, smallerResourceProfile));

            CompletableFuture<PhysicalSlot> firstSlotFuture =
                    SlotPoolUtils.requestNewAllocatedBatchSlot(slotPool, directMainThreadExecutor, resourceProfile);
            CompletableFuture<PhysicalSlot> secondSlotFuture =
                    SlotPoolUtils.requestNewAllocatedBatchSlot(slotPool, directMainThreadExecutor, ResourceProfile.UNKNOWN);
            CompletableFuture<PhysicalSlot> thirdSlotFuture =
                    SlotPoolUtils.requestNewAllocatedBatchSlot(slotPool, directMainThreadExecutor, smallerResourceProfile);
            List<CompletableFuture<PhysicalSlot>> slotFutures = Arrays.asList(firstSlotFuture, secondSlotFuture, thirdSlotFuture);

            advanceTimeAndTriggerCheckBatchSlotTimeout(slotPool, clock, batchSlotTimeout);

            // anyOf(...) is done as soon as a single request future is done.
            MatcherAssert.assertThat(
                    CompletableFuture.anyOf(slotFutures.toArray(COMPLETABLE_FUTURES_EMPTY_ARRAY)).isDone(),
                    Matchers.is(false));

            SlotPoolUtils.releaseTaskManager(slotPool, directMainThreadExecutor, taskManagerResourceId);

            advanceTimeAndTriggerCheckBatchSlotTimeout(slotPool, clock, batchSlotTimeout);

            for (CompletableFuture<PhysicalSlot> slotFuture : slotFutures) {
                MatcherAssert.assertThat(slotFuture.isCompletedExceptionally(), Matchers.is(true));
                assertFailsWithTimeout(slotFuture, "Expected that the slot future times out.");
            }
        }
    }

    /**
     * Waits for the given future and asserts that it failed with a {@link TimeoutException}
     * (after stripping the wrapping {@link ExecutionException}).
     *
     * @param slotFuture future that is expected to complete exceptionally
     * @param failureMessage message of the assertion failure raised if the future completes normally
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    private static void assertFailsWithTimeout(CompletableFuture<?> slotFuture, String failureMessage) throws InterruptedException {
        try {
            slotFuture.get();
            Assert.fail(failureMessage);
        } catch (ExecutionException ee) {
            MatcherAssert.assertThat(ExceptionUtils.stripExecutionException(ee), Matchers.instanceOf(TimeoutException.class));
        }
    }

    /**
     * Triggers the pool's batch slot timeout check, advances the manual clock by the batch slot
     * timeout plus one millisecond, and triggers the check again.
     *
     * <p>NOTE(review): presumably the first check lets the pool record which requests are
     * currently unfulfillable so that the second check can time them out; confirm against the
     * slot pool implementation.
     *
     * @param slotPool pool whose timeout check is triggered
     * @param clock manual clock to advance
     * @param batchSlotTimeout timeout the clock is advanced past
     */
    private void advanceTimeAndTriggerCheckBatchSlotTimeout(TestingSlotPoolImpl slotPool, ManualClock clock, Time batchSlotTimeout) {
        slotPool.triggerCheckBatchSlotTimeout();
        clock.advanceTime(batchSlotTimeout.toMilliseconds() + 1L, TimeUnit.MILLISECONDS);
        slotPool.triggerCheckBatchSlotTimeout();
    }

    /**
     * Builds a slot pool with the given executor, resource manager gateway and batch slot timeout.
     *
     * @param componentMainThreadExecutor main thread executor passed to the builder
     * @param resourceManagerGateway gateway passed to the builder; may be {@code null}
     * @param batchSlotTimeout batch slot timeout passed to the builder
     * @return the built slot pool
     */
    private TestingSlotPoolImpl createAndSetUpSlotPool(ComponentMainThreadExecutor componentMainThreadExecutor, @Nullable ResourceManagerGateway resourceManagerGateway, Time batchSlotTimeout) throws Exception {
        return new SlotPoolBuilder(componentMainThreadExecutor)
                .setResourceManagerGateway(resourceManagerGateway)
                .setBatchSlotTimeout(batchSlotTimeout)
                .build();
    }

    /**
     * Builds a slot pool like the three-argument overload, additionally setting the clock.
     *
     * @param componentMainThreadExecutor main thread executor passed to the builder
     * @param resourceManagerGateway gateway passed to the builder; may be {@code null}
     * @param batchSlotTimeout batch slot timeout passed to the builder
     * @param clock clock passed to the builder
     * @return the built slot pool
     */
    private TestingSlotPoolImpl createAndSetUpSlotPool(ComponentMainThreadExecutor componentMainThreadExecutor, @Nullable ResourceManagerGateway resourceManagerGateway, Time batchSlotTimeout, Clock clock) throws Exception {
        return new SlotPoolBuilder(componentMainThreadExecutor)
                .setResourceManagerGateway(resourceManagerGateway)
                .setBatchSlotTimeout(batchSlotTimeout)
                .setClock(clock)
                .build();
    }
}

