/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.dispatcher.runner;

import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.BlobStore;
import org.apache.flink.runtime.blob.BlobUtils;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.dispatcher.DispatcherFactory;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.dispatcher.DispatcherId;
import org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStore;
import org.apache.flink.runtime.dispatcher.HistoryServerArchivist;
import org.apache.flink.runtime.dispatcher.MemoryExecutionGraphInfoStore;
import org.apache.flink.runtime.dispatcher.PartialDispatcherServices;
import org.apache.flink.runtime.dispatcher.SessionDispatcherFactory;
import org.apache.flink.runtime.dispatcher.VoidHistoryServerArchivist;
import org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunnerFactory;
import org.apache.flink.runtime.dispatcher.runner.DispatcherRunner;
import org.apache.flink.runtime.dispatcher.runner.DispatcherRunnerFactory;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServicesBuilder;
import org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperRunningJobsRegistry;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.JobGraphStore;
import org.apache.flink.runtime.jobmanager.JobGraphStoreFactory;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.rpc.TestingRpcServiceResource;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.TestingUtils;
import org.apache.flink.runtime.util.LeaderConnectionInfo;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.runtime.zookeeper.ZooKeeperResource;
import org.apache.flink.shaded.curator4.org.apache.curator.framework.CuratorFramework;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.SupplierWithException;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Integration test that drives a session {@link DispatcherRunner} (built through
 * {@link DefaultDispatcherRunnerFactory}) against a real ZooKeeper test server, using a
 * ZooKeeper-backed job graph store and running-jobs registry.
 */
public class ZooKeeperDefaultDispatcherRunnerTest
extends TestLogger {
    private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperDefaultDispatcherRunnerTest.class);

    /** Timeout passed to the dispatcher gateway RPC calls. */
    private static final Time TESTING_TIMEOUT = Time.seconds(10L);

    /** Upper bound for polling until the job graph store reports no more jobs. */
    private static final Duration VERIFICATION_TIMEOUT = Duration.ofSeconds(10L);

    @ClassRule
    public static ZooKeeperResource zooKeeperResource = new ZooKeeperResource();
    @ClassRule
    public static TemporaryFolder temporaryFolder = new TemporaryFolder();
    @ClassRule
    public static TestingRpcServiceResource testingRpcServiceResource = new TestingRpcServiceResource();

    private BlobServer blobServer;
    private TestingFatalErrorHandler fatalErrorHandler;
    private File clusterHaStorageDir;
    private Configuration configuration;

    /**
     * Builds a ZooKeeper HA configuration that points at the test ZooKeeper server and a fresh
     * HA storage folder, and creates the {@link BlobServer} from that configuration.
     *
     * @throws IOException if the temporary folder or the blob server cannot be created
     */
    @Before
    public void setup() throws IOException {
        this.fatalErrorHandler = new TestingFatalErrorHandler();
        this.configuration = new Configuration();
        this.configuration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
        this.configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zooKeeperResource.getConnectString());
        this.configuration.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.newFolder().getAbsolutePath());
        this.clusterHaStorageDir = new File(HighAvailabilityServicesUtils.getClusterHighAvailableStoragePath(this.configuration).toString());
        this.blobServer = new BlobServer(this.configuration, BlobUtils.createBlobStoreFromConfig(this.configuration));
    }

    /**
     * Closes the blob server and rethrows any error that was reported to the fatal error handler
     * during the test.
     *
     * @throws Exception if closing fails or a fatal error was recorded
     */
    @After
    public void teardown() throws Exception {
        if (this.blobServer != null) {
            this.blobServer.close();
        }
        if (this.fatalErrorHandler != null) {
            this.fatalErrorHandler.rethrowError();
        }
    }

    /**
     * Submits a job, revokes and re-grants dispatcher leadership, cancels the job through the new
     * leader and revokes leadership again. Afterwards the ZooKeeper job graph store must report no
     * job ids and the cluster HA storage directory must contain no files.
     */
    @Test
    public void testResourceCleanupUnderLeadershipChange() throws Exception {
        TestingRpcService rpcService = testingRpcServiceResource.getTestingRpcService();
        TestingLeaderElectionService dispatcherLeaderElectionService = new TestingLeaderElectionService();
        // The Curator client is declared first so that it is closed last, i.e. only after the
        // dispatcher runner and the HA services that use it have been shut down. Previously the
        // client was never closed and leaked past the end of the test.
        try (CuratorFramework client = ZooKeeperUtils.startCuratorFramework(this.configuration, this.fatalErrorHandler).asCuratorFramework();
                TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServicesBuilder()
                        .setRunningJobsRegistry(new ZooKeeperRunningJobsRegistry(client, this.configuration))
                        .setDispatcherLeaderElectionService(dispatcherLeaderElectionService)
                        .setJobMasterLeaderRetrieverFunction(jobId -> ZooKeeperUtils.createLeaderRetrievalService(client))
                        .build()) {
            PartialDispatcherServices partialDispatcherServices = new PartialDispatcherServices(
                    this.configuration,
                    highAvailabilityServices,
                    CompletableFuture::new,
                    this.blobServer,
                    new TestingHeartbeatServices(),
                    UnregisteredMetricGroups::createUnregisteredJobManagerMetricGroup,
                    new MemoryExecutionGraphInfoStore(),
                    this.fatalErrorHandler,
                    VoidHistoryServerArchivist.INSTANCE,
                    null,
                    ForkJoinPool.commonPool());
            JobGraph jobGraph = this.createJobGraphWithBlobs();
            DefaultDispatcherRunnerFactory defaultDispatcherRunnerFactory = DefaultDispatcherRunnerFactory.createSessionRunner(SessionDispatcherFactory.INSTANCE);
            try (DispatcherRunner dispatcherRunner = this.createDispatcherRunner(rpcService, dispatcherLeaderElectionService, () -> this.createZooKeeperJobGraphStore(client), partialDispatcherServices, defaultDispatcherRunnerFactory)) {
                DispatcherGateway dispatcherGateway = this.grantLeadership(dispatcherLeaderElectionService);

                LOG.info("Initial job submission {}.", jobGraph.getJobID());
                dispatcherGateway.submitJob(jobGraph, TESTING_TIMEOUT).get();

                dispatcherLeaderElectionService.notLeader();

                LOG.info("Re-grant leadership first time.");
                dispatcherGateway = this.grantLeadership(dispatcherLeaderElectionService);

                LOG.info("Cancel recovered job {}.", jobGraph.getJobID());
                // Request the result before cancelling so the future is registered up front.
                CompletableFuture<JobResult> jobResultFuture = dispatcherGateway.requestJobResult(jobGraph.getJobID(), TESTING_TIMEOUT);
                dispatcherGateway.cancelJob(jobGraph.getJobID(), TESTING_TIMEOUT).get();
                JobResult jobResult = jobResultFuture.get();
                Assert.assertThat(jobResult.getApplicationStatus(), Matchers.is(ApplicationStatus.CANCELED));

                dispatcherLeaderElectionService.notLeader();

                // Inspect ZooKeeper through a separate store instance and poll until it is empty.
                JobGraphStore submittedJobGraphStore = this.createZooKeeperJobGraphStore(client);
                CommonTestUtils.waitUntilCondition(() -> submittedJobGraphStore.getJobIds().isEmpty(), Deadline.fromNow(VERIFICATION_TIMEOUT), 20L);
            }
        }
        Assert.assertThat(this.clusterHaStorageDir.listFiles(), Matchers.is(Matchers.emptyArray()));
    }

    /**
     * Creates a dispatcher runner from the given factory, wired to this test's fatal error
     * handler and the default testing executor.
     */
    private DispatcherRunner createDispatcherRunner(TestingRpcService rpcService, TestingLeaderElectionService dispatcherLeaderElectionService, JobGraphStoreFactory jobGraphStoreFactory, PartialDispatcherServices partialDispatcherServices, DispatcherRunnerFactory dispatcherRunnerFactory) throws Exception {
        return dispatcherRunnerFactory.createDispatcherRunner(dispatcherLeaderElectionService, this.fatalErrorHandler, jobGraphStoreFactory, TestingUtils.defaultExecutor(), rpcService, partialDispatcherServices);
    }

    /**
     * Creates a ZooKeeper-backed job graph store on top of the given client. Any exception from
     * the creation is passed to {@link ExceptionUtils#rethrow(Throwable)} so that this method can
     * be used where checked exceptions are not allowed.
     */
    private JobGraphStore createZooKeeperJobGraphStore(CuratorFramework client) {
        try {
            return ZooKeeperUtils.createJobGraphs(client, this.configuration);
        }
        catch (Exception e) {
            ExceptionUtils.rethrow(e);
            // Present only because the compiler cannot see that rethrow(...) is expected to throw.
            return null;
        }
    }

    /**
     * Grants leadership under a fresh random session id, waits for the leadership confirmation
     * and connects to the confirmed address using that session id as the fencing token.
     *
     * @return gateway to the newly confirmed dispatcher leader
     */
    private DispatcherGateway grantLeadership(TestingLeaderElectionService dispatcherLeaderElectionService) throws InterruptedException, ExecutionException {
        UUID leaderSessionId = UUID.randomUUID();
        dispatcherLeaderElectionService.isLeader(leaderSessionId);
        LeaderConnectionInfo leaderConnectionInfo = dispatcherLeaderElectionService.getConfirmationFuture().get();
        return testingRpcServiceResource.getTestingRpcService().connect(leaderConnectionInfo.getAddress(), DispatcherId.fromUuid(leaderSessionId), DispatcherGateway.class).get();
    }

    /**
     * Creates a streaming job graph with a single no-op vertex of parallelism 1 and registers a
     * 256-byte permanent blob for it in the blob server as a user jar key.
     *
     * @throws IOException if the blob cannot be stored
     */
    private JobGraph createJobGraphWithBlobs() throws IOException {
        JobVertex vertex = new JobVertex("test vertex");
        vertex.setInvokableClass(NoOpInvokable.class);
        vertex.setParallelism(1);
        JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(vertex);
        PermanentBlobKey permanentBlobKey = this.blobServer.putPermanent(jobGraph.getJobID(), new byte[256]);
        jobGraph.addUserJarBlobKey(permanentBlobKey);
        return jobGraph;
    }
}

