/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmaster.slotpool;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.DefaultAllocatedSlotPoolTest;
import org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPoolBuilder;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
import org.apache.flink.runtime.jobmaster.slotpool.SlotInfoWithUtilization;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolTestUtils;
import org.apache.flink.runtime.jobmaster.slotpool.TestingPhysicalSlotPayload;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.slots.ResourceRequirement;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.util.ResourceCounter;
import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.QuadConsumer;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.hamcrest.TypeSafeMatcher;
import org.junit.Assert;
import org.junit.Test;

public class DefaultDeclarativeSlotPoolTest
extends TestLogger {
    private static final ResourceProfile RESOURCE_PROFILE_1 = ResourceProfile.newBuilder().setCpuCores(1.7).build();
    private static final ResourceProfile RESOURCE_PROFILE_2 = ResourceProfile.newBuilder().setManagedMemoryMB(100).build();

    @Test
    public void testIncreasingResourceRequirementsWillSendResourceRequirementNotification() throws InterruptedException {
        NewResourceRequirementsService requirementsListener = new NewResourceRequirementsService();
        DefaultDeclarativeSlotPool slotPool = DefaultDeclarativeSlotPoolTest.createDefaultDeclarativeSlotPool(requirementsListener);
        ResourceCounter increment1 = ResourceCounter.withResource((ResourceProfile)RESOURCE_PROFILE_1, (int)1);
        ResourceCounter increment2 = DefaultDeclarativeSlotPoolTest.createResourceRequirements();
        slotPool.increaseResourceRequirementsBy(increment1);
        slotPool.increaseResourceRequirementsBy(increment2);
        Assert.assertThat((Object)requirementsListener.takeResourceRequirements(), (Matcher)CoreMatchers.is(DefaultDeclarativeSlotPoolTest.toResourceRequirements(increment1)));
        ResourceCounter totalResources = increment1.add(increment2);
        Assert.assertThat((Object)requirementsListener.takeResourceRequirements(), (Matcher)CoreMatchers.is(DefaultDeclarativeSlotPoolTest.toResourceRequirements(totalResources)));
        Assert.assertThat((Object)requirementsListener.hasNextResourceRequirements(), (Matcher)CoreMatchers.is((Object)false));
    }

    @Test
    public void testDecreasingResourceRequirementsWillSendResourceRequirementNotification() throws InterruptedException {
        NewResourceRequirementsService requirementsListener = new NewResourceRequirementsService();
        DefaultDeclarativeSlotPool slotPool = DefaultDeclarativeSlotPoolTest.createDefaultDeclarativeSlotPool(requirementsListener);
        ResourceCounter increment = ResourceCounter.withResource((ResourceProfile)RESOURCE_PROFILE_1, (int)3);
        slotPool.increaseResourceRequirementsBy(increment);
        requirementsListener.takeResourceRequirements();
        ResourceCounter decrement = ResourceCounter.withResource((ResourceProfile)RESOURCE_PROFILE_1, (int)2);
        slotPool.decreaseResourceRequirementsBy(decrement);
        ResourceCounter totalResources = increment.subtract(decrement);
        Assert.assertThat((Object)requirementsListener.takeResourceRequirements(), (Matcher)CoreMatchers.is(DefaultDeclarativeSlotPoolTest.toResourceRequirements(totalResources)));
        Assert.assertThat((Object)requirementsListener.hasNextResourceRequirements(), (Matcher)CoreMatchers.is((Object)false));
    }

    @Test
    public void testGetResourceRequirements() {
        DefaultDeclarativeSlotPool slotPool = DefaultDeclarativeSlotPoolBuilder.builder().build();
        Assert.assertThat((Object)slotPool.getResourceRequirements(), (Matcher)CoreMatchers.is(DefaultDeclarativeSlotPoolTest.toResourceRequirements(ResourceCounter.empty())));
        ResourceCounter resourceRequirements = DefaultDeclarativeSlotPoolTest.createResourceRequirements();
        slotPool.increaseResourceRequirementsBy(resourceRequirements);
        Assert.assertThat((Object)slotPool.getResourceRequirements(), (Matcher)CoreMatchers.is(DefaultDeclarativeSlotPoolTest.toResourceRequirements(resourceRequirements)));
    }

    @Test
    public void testOfferSlots() throws InterruptedException {
        NewSlotsService notifyNewSlots = new NewSlotsService();
        DefaultDeclarativeSlotPool slotPool = DefaultDeclarativeSlotPoolTest.createDefaultDeclarativeSlotPoolWithNewSlotsListener(notifyNewSlots);
        ResourceCounter resourceRequirements = DefaultDeclarativeSlotPoolTest.createResourceRequirements();
        slotPool.increaseResourceRequirementsBy(resourceRequirements);
        Collection<SlotOffer> slotOffers = DefaultDeclarativeSlotPoolTest.createSlotOffersForResourceRequirements(resourceRequirements);
        Collection<SlotOffer> acceptedSlots = SlotPoolTestUtils.offerSlots((DeclarativeSlotPool)slotPool, slotOffers);
        Assert.assertThat(acceptedSlots, (Matcher)Matchers.containsInAnyOrder((Object[])slotOffers.toArray()));
        Collection<PhysicalSlot> newSlots = DefaultDeclarativeSlotPoolTest.drainNewSlotService(notifyNewSlots);
        Assert.assertThat(newSlots, (Matcher)Matchers.containsInAnyOrder((Collection)slotOffers.stream().map(DefaultDeclarativeSlotPoolTest::matchesSlotOffer).collect(Collectors.toList())));
        Assert.assertThat((Object)slotPool.getAllSlotsInformation(), (Matcher)Matchers.containsInAnyOrder((Collection)newSlots.stream().map(DefaultAllocatedSlotPoolTest::matchesPhysicalSlot).collect(Collectors.toList())));
    }

    @Test
    public void testDuplicateSlotOfferings() throws InterruptedException {
        NewSlotsService notifyNewSlots = new NewSlotsService();
        DefaultDeclarativeSlotPool slotPool = DefaultDeclarativeSlotPoolTest.createDefaultDeclarativeSlotPoolWithNewSlotsListener(notifyNewSlots);
        ResourceCounter resourceRequirements = DefaultDeclarativeSlotPoolTest.createResourceRequirements();
        slotPool.increaseResourceRequirementsBy(resourceRequirements);
        Collection<SlotOffer> slotOffers = DefaultDeclarativeSlotPoolTest.createSlotOffersForResourceRequirements(resourceRequirements);
        SlotPoolTestUtils.offerSlots((DeclarativeSlotPool)slotPool, slotOffers);
        DefaultDeclarativeSlotPoolTest.drainNewSlotService(notifyNewSlots);
        Collection<SlotOffer> acceptedSlots = SlotPoolTestUtils.offerSlots((DeclarativeSlotPool)slotPool, slotOffers);
        Assert.assertThat(acceptedSlots, (Matcher)Matchers.containsInAnyOrder((Object[])slotOffers.toArray()));
        Assert.assertFalse((boolean)notifyNewSlots.hasNextNewSlots());
    }

    @Test
    public void testOfferingTooManySlots() {
        NewSlotsService notifyNewSlots = new NewSlotsService();
        DefaultDeclarativeSlotPool slotPool = DefaultDeclarativeSlotPoolTest.createDefaultDeclarativeSlotPoolWithNewSlotsListener(notifyNewSlots);
        ResourceCounter resourceRequirements = DefaultDeclarativeSlotPoolTest.createResourceRequirements();
        slotPool.increaseResourceRequirementsBy(resourceRequirements);
        ResourceCounter increasedRequirements = resourceRequirements.add(RESOURCE_PROFILE_1, 2);
        Collection<SlotOffer> slotOffers = DefaultDeclarativeSlotPoolTest.createSlotOffersForResourceRequirements(increasedRequirements);
        Collection<SlotOffer> acceptedSlots = SlotPoolTestUtils.offerSlots((DeclarativeSlotPool)slotPool, slotOffers);
        Map resourceProfileCount = acceptedSlots.stream().map(SlotOffer::getResourceProfile).collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
        for (Map.Entry resourceCount : resourceRequirements.getResourcesWithCount()) {
            Assert.assertThat((Object)resourceProfileCount.getOrDefault(resourceCount.getKey(), 0L), (Matcher)CoreMatchers.is((Object)((Integer)resourceCount.getValue())));
        }
    }

    @Test
    public void testReleaseSlotsRemovesSlots() throws InterruptedException {
        NewResourceRequirementsService notifyNewResourceRequirements = new NewResourceRequirementsService();
        DefaultDeclarativeSlotPool slotPool = DefaultDeclarativeSlotPoolTest.createDefaultDeclarativeSlotPool(notifyNewResourceRequirements);
        LocalTaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
        DefaultDeclarativeSlotPoolTest.increaseRequirementsAndOfferSlotsToSlotPool(slotPool, DefaultDeclarativeSlotPoolTest.createResourceRequirements(), taskManagerLocation);
        notifyNewResourceRequirements.takeResourceRequirements();
        slotPool.releaseSlots(taskManagerLocation.getResourceID(), (Exception)new FlinkException("Test failure"));
        Assert.assertThat((Object)slotPool.getAllSlotsInformation(), (Matcher)CoreMatchers.is((Matcher)Matchers.empty()));
    }

    @Test
    public void testReleaseSlotsReturnsSlot() {
        DefaultDeclarativeSlotPool slotPool = DefaultDeclarativeSlotPoolBuilder.builder().build();
        ResourceCounter resourceRequirements = DefaultDeclarativeSlotPoolTest.createResourceRequirements();
        LocalTaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
        FreeSlotConsumer freeSlotConsumer = new FreeSlotConsumer();
        TestingTaskExecutorGateway testingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder().setFreeSlotFunction(freeSlotConsumer).createTestingTaskExecutorGateway();
        Collection<SlotOffer> slotOffers = DefaultDeclarativeSlotPoolTest.increaseRequirementsAndOfferSlotsToSlotPool(slotPool, resourceRequirements, taskManagerLocation, testingTaskExecutorGateway);
        slotPool.releaseSlots(taskManagerLocation.getResourceID(), (Exception)new FlinkException("Test failure"));
        Collection freedSlots = freeSlotConsumer.drainFreedSlots();
        Assert.assertThat((Object)freedSlots, (Matcher)Matchers.containsInAnyOrder((Object[])slotOffers.stream().map(SlotOffer::getAllocationId).toArray()));
    }

    @Test
    public void testReleaseSlotsOnlyReturnsFulfilledRequirementsOfReservedSlots() {
        DefaultDeclarativeSlotPoolTest.withSlotPoolContainingOneTaskManagerWithTwoSlotsWithUniqueResourceProfiles((QuadConsumer<DefaultDeclarativeSlotPool, SlotOffer, SlotOffer, TaskManagerLocation>)((QuadConsumer)(slotPool, freeSlot, slotToReserve, taskManagerLocation) -> {
            slotPool.reserveFreeSlot(slotToReserve.getAllocationId(), slotToReserve.getResourceProfile()).tryAssignPayload((PhysicalSlot.Payload)new TestingPhysicalSlotPayload());
            ResourceCounter fulfilledRequirements = slotPool.releaseSlots(taskManagerLocation.getResourceID(), (Exception)new FlinkException("Test failure"));
            Assert.assertThat((Object)fulfilledRequirements.getResourceCount(freeSlot.getResourceProfile()), (Matcher)CoreMatchers.is((Object)0));
            Assert.assertThat((Object)fulfilledRequirements.getResourceCount(slotToReserve.getResourceProfile()), (Matcher)CoreMatchers.is((Object)1));
        }));
    }

    @Test
    public void testReleaseSlotOnlyReturnsFulfilledRequirementsOfReservedSlots() {
        DefaultDeclarativeSlotPoolTest.withSlotPoolContainingOneTaskManagerWithTwoSlotsWithUniqueResourceProfiles((QuadConsumer<DefaultDeclarativeSlotPool, SlotOffer, SlotOffer, TaskManagerLocation>)((QuadConsumer)(slotPool, freeSlot, slotToReserve, ignored) -> {
            slotPool.reserveFreeSlot(slotToReserve.getAllocationId(), slotToReserve.getResourceProfile()).tryAssignPayload((PhysicalSlot.Payload)new TestingPhysicalSlotPayload());
            ResourceCounter fulfilledRequirementsOfFreeSlot = slotPool.releaseSlot(freeSlot.getAllocationId(), (Exception)new FlinkException("Test failure"));
            ResourceCounter fulfilledRequirementsOfReservedSlot = slotPool.releaseSlot(slotToReserve.getAllocationId(), (Exception)new FlinkException("Test failure"));
            Assert.assertThat((Object)fulfilledRequirementsOfFreeSlot.getResources(), (Matcher)CoreMatchers.is((Matcher)Matchers.empty()));
            Assert.assertThat((Object)fulfilledRequirementsOfReservedSlot.getResourceCount(slotToReserve.getResourceProfile()), (Matcher)CoreMatchers.is((Object)1));
        }));
    }

    private static void withSlotPoolContainingOneTaskManagerWithTwoSlotsWithUniqueResourceProfiles(QuadConsumer<DefaultDeclarativeSlotPool, SlotOffer, SlotOffer, TaskManagerLocation> test) {
        DefaultDeclarativeSlotPool slotPool = DefaultDeclarativeSlotPoolBuilder.builder().build();
        ResourceCounter resourceRequirements = ResourceCounter.withResource((ResourceProfile)RESOURCE_PROFILE_1, (int)1).add(RESOURCE_PROFILE_2, 1);
        LocalTaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
        FreeSlotConsumer freeSlotConsumer = new FreeSlotConsumer();
        TestingTaskExecutorGateway testingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder().setFreeSlotFunction(freeSlotConsumer).createTestingTaskExecutorGateway();
        Iterator<SlotOffer> slotOffers = DefaultDeclarativeSlotPoolTest.increaseRequirementsAndOfferSlotsToSlotPool(slotPool, resourceRequirements, taskManagerLocation, testingTaskExecutorGateway).iterator();
        SlotOffer slot1 = slotOffers.next();
        SlotOffer slot2 = slotOffers.next();
        test.accept((Object)slotPool, (Object)slot1, (Object)slot2, (Object)taskManagerLocation);
    }

    @Test
    public void testReleaseSlotDecreasesFulfilledResourceRequirements() throws InterruptedException {
        NewSlotsService notifyNewSlots = new NewSlotsService();
        DefaultDeclarativeSlotPool slotPool = DefaultDeclarativeSlotPoolTest.createDefaultDeclarativeSlotPoolWithNewSlotsListener(notifyNewSlots);
        ResourceCounter resourceRequirements = DefaultDeclarativeSlotPoolTest.createResourceRequirements();
        DefaultDeclarativeSlotPoolTest.increaseRequirementsAndOfferSlotsToSlotPool(slotPool, resourceRequirements, null);
        Collection physicalSlots = notifyNewSlots.takeNewSlots();
        PhysicalSlot physicalSlot = (PhysicalSlot)physicalSlots.iterator().next();
        slotPool.releaseSlot(physicalSlot.getAllocationId(), (Exception)new FlinkException("Test failure"));
        ResourceCounter finalResourceRequirements = resourceRequirements.subtract(physicalSlot.getResourceProfile(), 1);
        Assert.assertThat((Object)slotPool.getFulfilledResourceRequirements(), (Matcher)CoreMatchers.is((Object)finalResourceRequirements));
    }

    @Test
    public void testReleaseSlotReturnsSlot() throws InterruptedException {
        NewSlotsService notifyNewSlots = new NewSlotsService();
        DefaultDeclarativeSlotPool slotPool = DefaultDeclarativeSlotPoolTest.createDefaultDeclarativeSlotPoolWithNewSlotsListener(notifyNewSlots);
        ResourceCounter resourceRequirements = DefaultDeclarativeSlotPoolTest.createResourceRequirements();
        FreeSlotConsumer freeSlotConsumer = new FreeSlotConsumer();
        TestingTaskExecutorGateway testingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder().setFreeSlotFunction(freeSlotConsumer).createTestingTaskExecutorGateway();
        DefaultDeclarativeSlotPoolTest.increaseRequirementsAndOfferSlotsToSlotPool(slotPool, resourceRequirements, new LocalTaskManagerLocation(), testingTaskExecutorGateway);
        Collection physicalSlots = notifyNewSlots.takeNewSlots();
        PhysicalSlot physicalSlot = (PhysicalSlot)physicalSlots.iterator().next();
        slotPool.releaseSlot(physicalSlot.getAllocationId(), (Exception)new FlinkException("Test failure"));
        AllocationID freedSlot = (AllocationID)Iterables.getOnlyElement((Iterable)freeSlotConsumer.drainFreedSlots());
        Assert.assertThat((Object)freedSlot, (Matcher)CoreMatchers.is((Object)physicalSlot.getAllocationId()));
    }

    @Test
    public void testReturnIdleSlotsAfterTimeout() {
        Time idleSlotTimeout = Time.seconds((long)10L);
        long offerTime = 0L;
        DefaultDeclarativeSlotPool slotPool = DefaultDeclarativeSlotPoolBuilder.builder().setIdleSlotTimeout(idleSlotTimeout).build();
        ResourceCounter resourceRequirements = DefaultDeclarativeSlotPoolTest.createResourceRequirements();
        FreeSlotConsumer freeSlotConsumer = new FreeSlotConsumer();
        TestingTaskExecutorGateway testingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder().setFreeSlotFunction(freeSlotConsumer).createTestingTaskExecutorGateway();
        Collection<SlotOffer> acceptedSlots = DefaultDeclarativeSlotPoolTest.increaseRequirementsAndOfferSlotsToSlotPool(slotPool, resourceRequirements, new LocalTaskManagerLocation(), testingTaskExecutorGateway);
        slotPool.decreaseResourceRequirementsBy(resourceRequirements);
        slotPool.releaseIdleSlots(0L + idleSlotTimeout.toMilliseconds());
        Collection freedSlots = freeSlotConsumer.drainFreedSlots();
        Assert.assertThat(acceptedSlots, (Matcher)CoreMatchers.is((Matcher)CoreMatchers.not((Matcher)Matchers.empty())));
        Assert.assertThat((Object)freedSlots, (Matcher)Matchers.containsInAnyOrder((Object[])acceptedSlots.stream().map(SlotOffer::getAllocationId).toArray()));
        this.assertNoAvailableAndRequiredResources(slotPool);
    }

    private void assertNoAvailableAndRequiredResources(DefaultDeclarativeSlotPool slotPool) {
        Assert.assertTrue((boolean)slotPool.getFulfilledResourceRequirements().isEmpty());
        Assert.assertTrue((boolean)slotPool.getResourceRequirements().isEmpty());
        Assert.assertThat((Object)slotPool.getAllSlotsInformation(), (Matcher)CoreMatchers.is((Matcher)Matchers.empty()));
    }

    @Test
    public void testOnlyReturnExcessIdleSlots() {
        Time idleSlotTimeout = Time.seconds((long)10L);
        long offerTime = 0L;
        DefaultDeclarativeSlotPool slotPool = DefaultDeclarativeSlotPoolBuilder.builder().setIdleSlotTimeout(idleSlotTimeout).build();
        ResourceCounter resourceRequirements = DefaultDeclarativeSlotPoolTest.createResourceRequirements();
        Collection<SlotOffer> slotOffers = DefaultDeclarativeSlotPoolTest.createSlotOffersForResourceRequirements(resourceRequirements);
        slotPool.increaseResourceRequirementsBy(resourceRequirements);
        Collection<SlotOffer> acceptedSlots = SlotPoolTestUtils.offerSlots((DeclarativeSlotPool)slotPool, slotOffers);
        ResourceCounter requiredResources = ResourceCounter.withResource((ResourceProfile)RESOURCE_PROFILE_1, (int)1);
        ResourceCounter excessRequirements = resourceRequirements.subtract(requiredResources);
        slotPool.decreaseResourceRequirementsBy(excessRequirements);
        slotPool.releaseIdleSlots(0L + idleSlotTimeout.toMilliseconds());
        Assert.assertThat(acceptedSlots, (Matcher)CoreMatchers.is((Matcher)CoreMatchers.not((Matcher)Matchers.empty())));
        Assert.assertThat((Object)slotPool.getFulfilledResourceRequirements(), (Matcher)CoreMatchers.is((Object)requiredResources));
    }

    @Test
    public void testFreedSlotWillBeUsedToFulfillOutstandingResourceRequirementsOfSameProfile() throws InterruptedException {
        NewSlotsService notifyNewSlots = new NewSlotsService();
        DefaultDeclarativeSlotPool slotPool = DefaultDeclarativeSlotPoolTest.createDefaultDeclarativeSlotPoolWithNewSlotsListener(notifyNewSlots);
        ResourceCounter initialRequirements = ResourceCounter.withResource((ResourceProfile)RESOURCE_PROFILE_1, (int)1);
        DefaultDeclarativeSlotPoolTest.increaseRequirementsAndOfferSlotsToSlotPool(slotPool, initialRequirements, null);
        Collection<PhysicalSlot> newSlots = DefaultDeclarativeSlotPoolTest.drainNewSlotService(notifyNewSlots);
        PhysicalSlot newSlot = (PhysicalSlot)Iterables.getOnlyElement(newSlots);
        slotPool.reserveFreeSlot(newSlot.getAllocationId(), RESOURCE_PROFILE_1);
        slotPool.freeReservedSlot(newSlot.getAllocationId(), null, 0L);
        Collection<PhysicalSlot> recycledSlots = DefaultDeclarativeSlotPoolTest.drainNewSlotService(notifyNewSlots);
        Assert.assertThat((Object)Iterables.getOnlyElement(recycledSlots), (Matcher)CoreMatchers.sameInstance((Object)newSlot));
        Collection<SlotOffer> newSlotOffers = DefaultDeclarativeSlotPoolTest.createSlotOffersForResourceRequirements(initialRequirements);
        Collection acceptedSlots = slotPool.offerSlots(newSlotOffers, (TaskManagerLocation)new LocalTaskManagerLocation(), SlotPoolTestUtils.createTaskManagerGateway(null), 0L);
        Assert.assertThat((Object)acceptedSlots, (Matcher)CoreMatchers.is((Matcher)Matchers.empty()));
        Assert.assertTrue((boolean)slotPool.calculateUnfulfilledResources().isEmpty());
    }

    @Test
    public void testFreedSlotWillRemainAssignedToMatchedResourceProfile() {
        DefaultDeclarativeSlotPool slotPool = new DefaultDeclarativeSlotPoolBuilder().build();
        ResourceProfile largeResourceProfile = ResourceProfile.newBuilder().setManagedMemoryMB(1024).build();
        ResourceProfile smallResourceProfile = ResourceProfile.newBuilder().setManagedMemoryMB(512).build();
        slotPool.increaseResourceRequirementsBy(ResourceCounter.withResource((ResourceProfile)largeResourceProfile, (int)1));
        SlotPoolTestUtils.offerSlots((DeclarativeSlotPool)slotPool, DefaultDeclarativeSlotPoolTest.createSlotOffersForResourceRequirements(ResourceCounter.withResource((ResourceProfile)ResourceProfile.ANY, (int)1)));
        SlotInfoWithUtilization slot = (SlotInfoWithUtilization)slotPool.getFreeSlotsInformation().iterator().next();
        slotPool.reserveFreeSlot(slot.getAllocationId(), largeResourceProfile);
        Assert.assertThat((Object)slotPool.getFulfilledResourceRequirements().getResourceCount(largeResourceProfile), (Matcher)CoreMatchers.is((Object)1));
        slotPool.increaseResourceRequirementsBy(ResourceCounter.withResource((ResourceProfile)smallResourceProfile, (int)1));
        slotPool.decreaseResourceRequirementsBy(ResourceCounter.withResource((ResourceProfile)largeResourceProfile, (int)1));
        slotPool.freeReservedSlot(slot.getAllocationId(), null, 1L);
        Assert.assertThat((Object)slotPool.getFulfilledResourceRequirements().getResourceCount(largeResourceProfile), (Matcher)CoreMatchers.is((Object)1));
        Assert.assertThat((Object)slotPool.getFulfilledResourceRequirements().getResourceCount(smallResourceProfile), (Matcher)CoreMatchers.is((Object)0));
    }

    @Test
    public void testReserveFreeSlotForResourceUpdatesAvailableResourcesAndRequirements() {
        DefaultDeclarativeSlotPool slotPool = new DefaultDeclarativeSlotPoolBuilder().build();
        ResourceProfile largeResourceProfile = ResourceProfile.newBuilder().setManagedMemoryMB(1024).build();
        ResourceProfile smallResourceProfile = ResourceProfile.UNKNOWN;
        slotPool.increaseResourceRequirementsBy(ResourceCounter.withResource((ResourceProfile)largeResourceProfile, (int)1));
        SlotPoolTestUtils.offerSlots((DeclarativeSlotPool)slotPool, DefaultDeclarativeSlotPoolTest.createSlotOffersForResourceRequirements(ResourceCounter.withResource((ResourceProfile)largeResourceProfile, (int)1)));
        slotPool.increaseResourceRequirementsBy(ResourceCounter.withResource((ResourceProfile)smallResourceProfile, (int)1));
        SlotInfoWithUtilization largeSlot = slotPool.getFreeSlotsInformation().stream().filter(slot -> slot.getResourceProfile().equals((Object)largeResourceProfile)).findFirst().get();
        slotPool.reserveFreeSlot(largeSlot.getAllocationId(), smallResourceProfile);
        ResourceCounter availableResources = slotPool.getFulfilledResourceRequirements();
        Assert.assertThat((Object)availableResources.getResourceCount(smallResourceProfile), (Matcher)CoreMatchers.is((Object)1));
        Assert.assertThat((Object)availableResources.getResourceCount(largeResourceProfile), (Matcher)CoreMatchers.is((Object)0));
        Collection currentResourceRequirements = slotPool.getResourceRequirements();
        Assert.assertThat((Object)currentResourceRequirements, (Matcher)CoreMatchers.hasItems((Object[])new ResourceRequirement[]{ResourceRequirement.create((ResourceProfile)largeResourceProfile, (int)2)}));
    }

    @Test
    public void testSetResourceRequirementsForInitialResourceRequirements() {
        DefaultDeclarativeSlotPool slotPool = new DefaultDeclarativeSlotPoolBuilder().build();
        ResourceCounter resourceRequirements = ResourceCounter.withResource((ResourceProfile)RESOURCE_PROFILE_1, (int)2);
        slotPool.setResourceRequirements(resourceRequirements);
        Assert.assertThat((Object)slotPool.getResourceRequirements(), (Matcher)CoreMatchers.is(DefaultDeclarativeSlotPoolTest.toResourceRequirements(resourceRequirements)));
    }

    @Test
    public void testSetResourceRequirementsOverwritesPreviousValue() {
        DefaultDeclarativeSlotPool slotPool = new DefaultDeclarativeSlotPoolBuilder().build();
        slotPool.setResourceRequirements(ResourceCounter.withResource((ResourceProfile)RESOURCE_PROFILE_1, (int)1));
        ResourceCounter resourceRequirements = ResourceCounter.withResource((ResourceProfile)RESOURCE_PROFILE_2, (int)1);
        slotPool.setResourceRequirements(resourceRequirements);
        Assert.assertThat((Object)slotPool.getResourceRequirements(), (Matcher)CoreMatchers.is(DefaultDeclarativeSlotPoolTest.toResourceRequirements(resourceRequirements)));
    }

    @Nonnull
    private static ResourceCounter createResourceRequirements() {
        HashMap<ResourceProfile, Integer> requirements = new HashMap<ResourceProfile, Integer>();
        requirements.put(RESOURCE_PROFILE_1, 2);
        requirements.put(RESOURCE_PROFILE_2, 1);
        return ResourceCounter.withResources(requirements);
    }

    @Nonnull
    public static Collection<SlotOffer> createSlotOffersForResourceRequirements(ResourceCounter resourceRequirements) {
        ArrayList<SlotOffer> slotOffers = new ArrayList<SlotOffer>();
        int slotIndex = 0;
        for (Map.Entry resourceWithCount : resourceRequirements.getResourcesWithCount()) {
            for (int i = 0; i < (Integer)resourceWithCount.getValue(); ++i) {
                ResourceProfile slotProfile = (ResourceProfile)resourceWithCount.getKey();
                slotOffers.add(new SlotOffer(new AllocationID(), slotIndex++, slotProfile == ResourceProfile.UNKNOWN ? ResourceProfile.ANY : slotProfile));
            }
        }
        return slotOffers;
    }

    @Nonnull
    private static Collection<ResourceRequirement> toResourceRequirements(ResourceCounter resourceCounter) {
        return resourceCounter.getResourcesWithCount().stream().map(resourceCount -> ResourceRequirement.create((ResourceProfile)((ResourceProfile)resourceCount.getKey()), (int)((Integer)resourceCount.getValue()))).collect(Collectors.toList());
    }

    @Nonnull
    private static DefaultDeclarativeSlotPool createDefaultDeclarativeSlotPool(NewResourceRequirementsService requirementsListener) {
        return DefaultDeclarativeSlotPoolBuilder.builder().setNotifyNewResourceRequirements(requirementsListener).build();
    }

    @Nonnull
    private static DefaultDeclarativeSlotPool createDefaultDeclarativeSlotPoolWithNewSlotsListener(DeclarativeSlotPool.NewSlotsListener newSlotsListener) {
        DefaultDeclarativeSlotPool declarativeSlotPool = DefaultDeclarativeSlotPoolTest.createDefaultDeclarativeSlotPool();
        declarativeSlotPool.registerNewSlotsListener(newSlotsListener);
        return declarativeSlotPool;
    }

    @Nonnull
    private static DefaultDeclarativeSlotPool createDefaultDeclarativeSlotPool() {
        return DefaultDeclarativeSlotPoolBuilder.builder().build();
    }

    @Nonnull
    private static Collection<SlotOffer> increaseRequirementsAndOfferSlotsToSlotPool(DefaultDeclarativeSlotPool slotPool, ResourceCounter resourceRequirements, @Nullable LocalTaskManagerLocation taskManagerLocation) {
        return DefaultDeclarativeSlotPoolTest.increaseRequirementsAndOfferSlotsToSlotPool(slotPool, resourceRequirements, taskManagerLocation, null);
    }

    @Nonnull
    private static Collection<SlotOffer> increaseRequirementsAndOfferSlotsToSlotPool(DefaultDeclarativeSlotPool slotPool, ResourceCounter resourceRequirements, @Nullable LocalTaskManagerLocation taskManagerLocation, @Nullable TaskExecutorGateway taskExecutorGateway) {
        Collection<SlotOffer> slotOffers = DefaultDeclarativeSlotPoolTest.createSlotOffersForResourceRequirements(resourceRequirements);
        slotPool.increaseResourceRequirementsBy(resourceRequirements);
        return slotPool.offerSlots(slotOffers, (TaskManagerLocation)(taskManagerLocation == null ? new LocalTaskManagerLocation() : taskManagerLocation), SlotPoolTestUtils.createTaskManagerGateway(taskExecutorGateway), 0L);
    }

    @Nonnull
    private static Collection<PhysicalSlot> drainNewSlotService(NewSlotsService notifyNewSlots) throws InterruptedException {
        ArrayList<PhysicalSlot> newSlots = new ArrayList<PhysicalSlot>();
        while (notifyNewSlots.hasNextNewSlots()) {
            newSlots.addAll(notifyNewSlots.takeNewSlots());
        }
        return newSlots;
    }

    private static TypeSafeMatcher<PhysicalSlot> matchesSlotOffer(SlotOffer slotOffer) {
        return new PhysicalSlotSlotOfferMatcher(slotOffer);
    }

    private static class FreeSlotConsumer
    implements BiFunction<AllocationID, Throwable, CompletableFuture<Acknowledge>> {
        final BlockingQueue<AllocationID> freedSlots = new ArrayBlockingQueue<AllocationID>(10);

        private FreeSlotConsumer() {
        }

        @Override
        public CompletableFuture<Acknowledge> apply(AllocationID allocationID, Throwable throwable) {
            this.freedSlots.offer(allocationID);
            return CompletableFuture.completedFuture(Acknowledge.get());
        }

        private Collection<AllocationID> drainFreedSlots() {
            ArrayList<AllocationID> result = new ArrayList<AllocationID>();
            this.freedSlots.drainTo(result);
            return result;
        }
    }

    private static class PhysicalSlotSlotOfferMatcher
    extends TypeSafeMatcher<PhysicalSlot> {
        private final SlotOffer slotOffer;

        public PhysicalSlotSlotOfferMatcher(SlotOffer slotOffer) {
            this.slotOffer = slotOffer;
        }

        protected boolean matchesSafely(PhysicalSlot item) {
            return item.getAllocationId().equals((Object)this.slotOffer.getAllocationId()) && item.getResourceProfile().equals((Object)this.slotOffer.getResourceProfile()) && item.getPhysicalSlotNumber() == this.slotOffer.getSlotIndex();
        }

        public void describeTo(Description description) {
            description.appendText("SlotOffer: ");
            description.appendValueList("{", ",", "}", (Object[])new Serializable[]{this.slotOffer.getAllocationId(), this.slotOffer.getResourceProfile(), Integer.valueOf(this.slotOffer.getSlotIndex())});
        }
    }

    private static final class NewSlotsService
    implements DeclarativeSlotPool.NewSlotsListener {
        private final BlockingQueue<Collection<? extends PhysicalSlot>> physicalSlotsQueue = new ArrayBlockingQueue<Collection<? extends PhysicalSlot>>(2);

        private NewSlotsService() {
        }

        private Collection<? extends PhysicalSlot> takeNewSlots() throws InterruptedException {
            return this.physicalSlotsQueue.take();
        }

        private boolean hasNextNewSlots() {
            return !this.physicalSlotsQueue.isEmpty();
        }

        public void notifyNewSlotsAreAvailable(Collection<? extends PhysicalSlot> newlyAvailableSlots) {
            this.physicalSlotsQueue.offer(newlyAvailableSlots);
        }
    }

    private static final class NewResourceRequirementsService
    implements Consumer<Collection<ResourceRequirement>> {
        private final BlockingQueue<Collection<ResourceRequirement>> resourceRequirementsQueue = new ArrayBlockingQueue<Collection<ResourceRequirement>>(2);

        private NewResourceRequirementsService() {
        }

        @Override
        public void accept(Collection<ResourceRequirement> resourceRequirements) {
            this.resourceRequirementsQueue.offer(resourceRequirements);
        }

        private Collection<ResourceRequirement> takeResourceRequirements() throws InterruptedException {
            return this.resourceRequirementsQueue.take();
        }

        public boolean hasNextResourceRequirements() {
            return !this.resourceRequirementsQueue.isEmpty();
        }
    }
}

