/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.resourcemanager;

import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.io.network.partition.NoOpResourceManagerPartitionTracker;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.resourcemanager.DefaultJobLeaderIdService;
import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import org.apache.flink.runtime.resourcemanager.ResourceOverview;
import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager;
import org.apache.flink.runtime.resourcemanager.TaskExecutorRegistration;
import org.apache.flink.runtime.resourcemanager.TaskManagerInfoWithSlots;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerBuilder;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.rpc.exceptions.FencingTokenException;
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.SlotStatus;
import org.apache.flink.runtime.taskexecutor.TaskExecutorMemoryConfiguration;
import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

public class ResourceManagerTaskExecutorTest
extends TestLogger {
    private static final Time TIMEOUT = Time.seconds((long)10L);
    private static final long HEARTBEAT_TIMEOUT = 5000L;
    private static TestingRpcService rpcService;
    private TestingTaskExecutorGateway taskExecutorGateway;
    private int dataPort = 1234;
    private int jmxPort = 23456;
    private HardwareDescription hardwareDescription = new HardwareDescription(1, 2L, 3L, 4L);
    private ResourceID taskExecutorResourceID;
    private ResourceID resourceManagerResourceID;
    private StandaloneResourceManager resourceManager;
    private ResourceManagerGateway rmGateway;
    private ResourceManagerGateway wronglyFencedGateway;
    private TestingFatalErrorHandler testingFatalErrorHandler;

    @BeforeClass
    public static void setupClass() {
        rpcService = new TestingRpcService();
    }

    @Before
    public void setup() throws Exception {
        rpcService = new TestingRpcService();
        this.createAndRegisterTaskExecutorGateway();
        this.taskExecutorResourceID = ResourceID.generate();
        this.resourceManagerResourceID = ResourceID.generate();
        this.testingFatalErrorHandler = new TestingFatalErrorHandler();
        TestingLeaderElectionService rmLeaderElectionService = new TestingLeaderElectionService();
        this.resourceManager = this.createAndStartResourceManager(rmLeaderElectionService, this.testingFatalErrorHandler);
        this.rmGateway = (ResourceManagerGateway)this.resourceManager.getSelfGateway(ResourceManagerGateway.class);
        this.wronglyFencedGateway = rpcService.connect(this.resourceManager.getAddress(), ResourceManagerId.generate(), ResourceManagerGateway.class).get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
        this.grantLeadership(rmLeaderElectionService).get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
    }

    private void createAndRegisterTaskExecutorGateway() {
        this.taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway();
        rpcService.registerGateway(this.taskExecutorGateway.getAddress(), (RpcGateway)this.taskExecutorGateway);
    }

    private CompletableFuture<UUID> grantLeadership(TestingLeaderElectionService leaderElectionService) {
        UUID leaderSessionId = UUID.randomUUID();
        return leaderElectionService.isLeader(leaderSessionId);
    }

    private StandaloneResourceManager createAndStartResourceManager(LeaderElectionService rmLeaderElectionService, FatalErrorHandler fatalErrorHandler) throws Exception {
        TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
        HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, 5000L);
        highAvailabilityServices.setResourceManagerLeaderElectionService(rmLeaderElectionService);
        SlotManagerImpl slotManager = SlotManagerBuilder.newBuilder().setScheduledExecutor(rpcService.getScheduledExecutor()).build();
        DefaultJobLeaderIdService jobLeaderIdService = new DefaultJobLeaderIdService((HighAvailabilityServices)highAvailabilityServices, rpcService.getScheduledExecutor(), Time.minutes((long)5L));
        StandaloneResourceManager resourceManager = new StandaloneResourceManager((RpcService)rpcService, this.resourceManagerResourceID, (HighAvailabilityServices)highAvailabilityServices, heartbeatServices, (SlotManager)slotManager, NoOpResourceManagerPartitionTracker::get, (JobLeaderIdService)jobLeaderIdService, new ClusterInformation("localhost", 1234), fatalErrorHandler, UnregisteredMetricGroups.createUnregisteredResourceManagerMetricGroup(), Time.minutes((long)5L), RpcUtils.INF_TIMEOUT, (Executor)ForkJoinPool.commonPool());
        resourceManager.start();
        return resourceManager;
    }

    @After
    public void teardown() throws Exception {
        if (this.resourceManager != null) {
            RpcUtils.terminateRpcEndpoint((RpcEndpoint)this.resourceManager, (Time)TIMEOUT);
        }
        if (this.testingFatalErrorHandler != null && this.testingFatalErrorHandler.hasExceptionOccurred()) {
            this.testingFatalErrorHandler.rethrowError();
        }
    }

    @AfterClass
    public static void teardownClass() throws Exception {
        if (rpcService != null) {
            RpcUtils.terminateRpcService((RpcService)rpcService, (Time)TIMEOUT);
        }
    }

    @Test
    public void testRegisterTaskExecutor() throws Exception {
        CompletableFuture<RegistrationResponse> successfulFuture = this.registerTaskExecutor(this.rmGateway, this.taskExecutorGateway.getAddress());
        RegistrationResponse response = successfulFuture.get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
        Assert.assertTrue((boolean)(response instanceof TaskExecutorRegistrationSuccess));
        TaskManagerInfoWithSlots taskManagerInfoWithSlots = (TaskManagerInfoWithSlots)this.rmGateway.requestTaskManagerDetailsInfo(this.taskExecutorResourceID, TIMEOUT).get();
        Assert.assertThat((Object)taskManagerInfoWithSlots.getTaskManagerInfo().getResourceId(), (Matcher)Matchers.equalTo((Object)this.taskExecutorResourceID));
        CompletableFuture<RegistrationResponse> duplicateFuture = this.registerTaskExecutor(this.rmGateway, this.taskExecutorGateway.getAddress());
        RegistrationResponse duplicateResponse = duplicateFuture.get();
        Assert.assertTrue((boolean)(duplicateResponse instanceof TaskExecutorRegistrationSuccess));
        Assert.assertNotEquals((Object)((TaskExecutorRegistrationSuccess)response).getRegistrationId(), (Object)((TaskExecutorRegistrationSuccess)duplicateResponse).getRegistrationId());
        Assert.assertThat((Object)((ResourceOverview)this.rmGateway.requestResourceOverview(TIMEOUT).get()).getNumberTaskManagers(), (Matcher)Matchers.is((Object)1));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testDelayedRegisterTaskExecutor() throws Exception {
        Time fastTimeout = Time.milliseconds((long)1L);
        try {
            OneShotLatch startConnection = new OneShotLatch();
            OneShotLatch finishConnection = new OneShotLatch();
            rpcService.setRpcGatewayFutureFunction(rpcGateway -> CompletableFuture.supplyAsync(() -> {
                startConnection.trigger();
                try {
                    finishConnection.await();
                }
                catch (InterruptedException interruptedException) {
                    // empty catch block
                }
                return rpcGateway;
            }, TestingUtils.defaultExecutor()));
            TaskExecutorRegistration taskExecutorRegistration = new TaskExecutorRegistration(this.taskExecutorGateway.getAddress(), this.taskExecutorResourceID, this.dataPort, this.jmxPort, this.hardwareDescription, new TaskExecutorMemoryConfiguration(Long.valueOf(1L), Long.valueOf(2L), Long.valueOf(3L), Long.valueOf(4L), Long.valueOf(5L), Long.valueOf(6L), Long.valueOf(7L), Long.valueOf(8L), Long.valueOf(9L), Long.valueOf(10L)), ResourceProfile.ZERO, ResourceProfile.ZERO);
            CompletableFuture firstFuture = this.rmGateway.registerTaskExecutor(taskExecutorRegistration, fastTimeout);
            try {
                firstFuture.get();
                Assert.fail((String)"Should have failed because connection to taskmanager is delayed beyond timeout");
            }
            catch (Exception e) {
                Throwable cause = ExceptionUtils.stripExecutionException((Throwable)e);
                Assert.assertThat((Object)cause, (Matcher)Matchers.instanceOf(TimeoutException.class));
                Assert.assertThat((Object)cause.getMessage(), (Matcher)Matchers.containsString((String)"ResourceManagerGateway.registerTaskExecutor"));
            }
            startConnection.await();
            rpcService.resetRpcGatewayFutureFunction();
            CompletableFuture secondFuture = this.rmGateway.registerTaskExecutor(taskExecutorRegistration, TIMEOUT);
            RegistrationResponse response = (RegistrationResponse)secondFuture.get();
            Assert.assertTrue((boolean)(response instanceof TaskExecutorRegistrationSuccess));
            SlotReport slotReport = new SlotReport(new SlotStatus(new SlotID(this.taskExecutorResourceID, 0), ResourceProfile.ANY));
            this.rmGateway.sendSlotReport(this.taskExecutorResourceID, ((TaskExecutorRegistrationSuccess)response).getRegistrationId(), slotReport, TIMEOUT).get();
            finishConnection.trigger();
            Thread.sleep(1L);
            TaskManagerInfoWithSlots taskManagerInfoWithSlots = (TaskManagerInfoWithSlots)this.rmGateway.requestTaskManagerDetailsInfo(this.taskExecutorResourceID, TIMEOUT).get();
            Assert.assertThat((Object)taskManagerInfoWithSlots.getTaskManagerInfo().getResourceId(), (Matcher)Matchers.equalTo((Object)this.taskExecutorResourceID));
            Assert.assertThat((Object)taskManagerInfoWithSlots.getTaskManagerInfo().getNumberSlots(), (Matcher)Matchers.equalTo((Object)1));
        }
        finally {
            rpcService.resetRpcGatewayFutureFunction();
        }
    }

    @Test
    public void testDisconnectTaskExecutor() throws Exception {
        RegistrationResponse registrationResponse = this.registerTaskExecutor(this.rmGateway, this.taskExecutorGateway.getAddress()).get();
        Assert.assertThat((Object)registrationResponse, (Matcher)Matchers.instanceOf(TaskExecutorRegistrationSuccess.class));
        InstanceID registrationId = ((TaskExecutorRegistrationSuccess)registrationResponse).getRegistrationId();
        int numberSlots = 10;
        Collection<SlotStatus> slots = this.createSlots(10);
        SlotReport slotReport = new SlotReport(slots);
        this.rmGateway.sendSlotReport(this.taskExecutorResourceID, registrationId, slotReport, TIMEOUT).get();
        ResourceOverview resourceOverview = (ResourceOverview)this.rmGateway.requestResourceOverview(TIMEOUT).get();
        Assert.assertThat((Object)resourceOverview.getNumberTaskManagers(), (Matcher)Matchers.is((Object)1));
        Assert.assertThat((Object)resourceOverview.getNumberRegisteredSlots(), (Matcher)Matchers.is((Object)10));
        this.rmGateway.disconnectTaskManager(this.taskExecutorResourceID, (Exception)((Object)new FlinkException("testDisconnectTaskExecutor")));
        ResourceOverview afterDisconnectResourceOverview = (ResourceOverview)this.rmGateway.requestResourceOverview(TIMEOUT).get();
        Assert.assertThat((Object)afterDisconnectResourceOverview.getNumberTaskManagers(), (Matcher)Matchers.is((Object)0));
        Assert.assertThat((Object)afterDisconnectResourceOverview.getNumberRegisteredSlots(), (Matcher)Matchers.is((Object)0));
    }

    private Collection<SlotStatus> createSlots(int numberSlots) {
        return IntStream.range(0, numberSlots).mapToObj(index -> new SlotStatus(new SlotID(this.taskExecutorResourceID, index), ResourceProfile.ANY)).collect(Collectors.toList());
    }

    @Test
    public void testRegisterTaskExecutorWithUnmatchedLeaderSessionId() throws Exception {
        CompletableFuture<RegistrationResponse> unMatchedLeaderFuture = this.registerTaskExecutor(this.wronglyFencedGateway, this.taskExecutorGateway.getAddress());
        try {
            unMatchedLeaderFuture.get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
            Assert.fail((String)"Should have failed because we are using a wrongly fenced ResourceManagerGateway.");
        }
        catch (ExecutionException e) {
            Assert.assertTrue((boolean)(ExceptionUtils.stripExecutionException((Throwable)e) instanceof FencingTokenException));
        }
    }

    @Test
    public void testRegisterTaskExecutorFromInvalidAddress() throws Exception {
        String invalidAddress = "/taskExecutor2";
        CompletableFuture<RegistrationResponse> invalidAddressFuture = this.registerTaskExecutor(this.rmGateway, invalidAddress);
        Assert.assertTrue((boolean)(invalidAddressFuture.get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS) instanceof RegistrationResponse.Failure));
    }

    private CompletableFuture<RegistrationResponse> registerTaskExecutor(ResourceManagerGateway resourceManagerGateway, String taskExecutorAddress) {
        return resourceManagerGateway.registerTaskExecutor(new TaskExecutorRegistration(taskExecutorAddress, this.taskExecutorResourceID, this.dataPort, this.jmxPort, this.hardwareDescription, new TaskExecutorMemoryConfiguration(Long.valueOf(1L), Long.valueOf(2L), Long.valueOf(3L), Long.valueOf(4L), Long.valueOf(5L), Long.valueOf(6L), Long.valueOf(7L), Long.valueOf(8L), Long.valueOf(9L), Long.valueOf(10L)), ResourceProfile.ZERO, ResourceProfile.ZERO), TIMEOUT);
    }
}

