/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.dispatcher;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.EmbeddedCompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
import org.apache.flink.runtime.checkpoint.PerJobCheckpointRecoveryFactory;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.dispatcher.AbstractDispatcherTest;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.dispatcher.JobManagerRunnerFactory;
import org.apache.flink.runtime.dispatcher.JobMasterServiceLeadershipRunnerFactory;
import org.apache.flink.runtime.dispatcher.JobMasterTester;
import org.apache.flink.runtime.dispatcher.TestingDispatcher;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneRunningJobsRegistry;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.jobmanager.JobGraphWriter;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testutils.TestingJobGraphStore;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.function.ThrowingConsumer;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class DispatcherFailoverITCase
extends AbstractDispatcherTest {
    private final BlockingQueue<RpcEndpoint> toTerminate = new LinkedBlockingQueue<RpcEndpoint>();

    @Override
    @Before
    public void setUp() throws Exception {
        super.setUp();
        this.haServices.setCheckpointRecoveryFactory((CheckpointRecoveryFactory)new PerJobCheckpointRecoveryFactory((maxCheckpoints, previous) -> {
            if (previous != null) {
                Assert.assertFalse((boolean)previous.getShutdownStatus().isPresent());
                Assert.assertFalse((boolean)previous.getAllCheckpoints().isEmpty());
                return new EmbeddedCompletedCheckpointStore(maxCheckpoints.intValue(), (Collection)previous.getAllCheckpoints());
            }
            return new EmbeddedCompletedCheckpointStore(maxCheckpoints.intValue());
        }));
    }

    @Override
    @After
    public void tearDown() {
        while (!this.toTerminate.isEmpty()) {
            RpcEndpoint endpoint = (RpcEndpoint)this.toTerminate.poll();
            try {
                RpcUtils.terminateRpcEndpoint((RpcEndpoint)endpoint, (Time)TIMEOUT);
            }
            catch (Exception exception) {}
        }
    }

    @Test
    public void testRecoverFromCheckpointAfterJobGraphRemovalOfTerminatedJobFailed() throws Exception {
        JobGraph jobGraph = this.createJobGraph();
        JobID jobId = jobGraph.getJobID();
        Error jobGraphRemovalError = new Error("Unable to remove job graph.");
        TestingJobGraphStore jobGraphStore = TestingJobGraphStore.newBuilder().setRemoveJobGraphConsumer((ThrowingConsumer<JobID, ? extends Exception>)((ThrowingConsumer)graph -> {
            throw jobGraphRemovalError;
        })).build();
        jobGraphStore.start(null);
        this.haServices.setJobGraphStore(jobGraphStore);
        TestingLeaderElectionService leaderElectionService = new TestingLeaderElectionService();
        this.haServices.setJobMasterLeaderElectionService(jobId, leaderElectionService);
        CountDownLatch jobGraphRemovalErrorReceived = new CountDownLatch(1);
        TestingDispatcher dispatcher = this.createRecoveredDispatcher(throwable -> {
            Optional maybeError = ExceptionUtils.findThrowable((Throwable)throwable, Error.class);
            if (maybeError.isPresent() && jobGraphRemovalError.equals(maybeError.get())) {
                jobGraphRemovalErrorReceived.countDown();
            } else {
                this.testingFatalErrorHandlerResource.getFatalErrorHandler().onFatalError(throwable);
            }
        });
        this.toTerminate.add((RpcEndpoint)dispatcher);
        leaderElectionService.isLeader(UUID.randomUUID());
        DispatcherGateway dispatcherGateway = (DispatcherGateway)dispatcher.getSelfGateway(DispatcherGateway.class);
        dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
        JobMasterGateway jobMasterGateway = DispatcherFailoverITCase.connectToLeadingJobMaster(leaderElectionService).get();
        try (JobMasterTester tester = new JobMasterTester(rpcService, jobId, jobMasterGateway);){
            CompletableFuture<List<TaskDeploymentDescriptor>> descriptorsFuture = tester.deployVertices(2);
            DispatcherFailoverITCase.awaitStatus(dispatcherGateway, jobId, JobStatus.RUNNING);
            tester.transitionTo(descriptorsFuture.get(), ExecutionState.INITIALIZING).get();
            tester.transitionTo(descriptorsFuture.get(), ExecutionState.RUNNING).get();
            tester.getCheckpointFuture(1L).get();
            tester.transitionTo(descriptorsFuture.get(), ExecutionState.FINISHED).get();
        }
        DispatcherFailoverITCase.awaitStatus(dispatcherGateway, jobId, JobStatus.FINISHED);
        jobGraphRemovalErrorReceived.await();
        leaderElectionService.notLeader();
        leaderElectionService.stop();
        TestingDispatcher secondDispatcher = this.createRecoveredDispatcher(null);
        this.toTerminate.add((RpcEndpoint)secondDispatcher);
        DispatcherGateway secondDispatcherGateway = (DispatcherGateway)secondDispatcher.getSelfGateway(DispatcherGateway.class);
        leaderElectionService.isLeader(UUID.randomUUID());
        JobMasterGateway secondJobMasterGateway = DispatcherFailoverITCase.connectToLeadingJobMaster(leaderElectionService).get();
        try (JobMasterTester tester = new JobMasterTester(rpcService, jobId, secondJobMasterGateway);){
            CompletableFuture<List<TaskDeploymentDescriptor>> descriptorsFuture = tester.deployVertices(2);
            DispatcherFailoverITCase.awaitStatus(secondDispatcherGateway, jobId, JobStatus.RUNNING);
            Optional<JobManagerTaskRestore> maybeRestore = descriptorsFuture.get().stream().map(TaskDeploymentDescriptor::getTaskRestore).filter(Objects::nonNull).findAny();
            Assert.assertTrue((String)"Job has recovered from checkpoint.", (boolean)maybeRestore.isPresent());
        }
    }

    private JobGraph createJobGraph() {
        JobVertex firstVertex = new JobVertex("first");
        firstVertex.setInvokableClass(NoOpInvokable.class);
        firstVertex.setParallelism(1);
        JobVertex secondVertex = new JobVertex("second");
        secondVertex.setInvokableClass(NoOpInvokable.class);
        secondVertex.setParallelism(1);
        CheckpointCoordinatorConfiguration checkpointCoordinatorConfiguration = CheckpointCoordinatorConfiguration.builder().setCheckpointInterval(20L).setMinPauseBetweenCheckpoints(20L).setCheckpointTimeout(10000L).build();
        JobCheckpointingSettings checkpointingSettings = new JobCheckpointingSettings(checkpointCoordinatorConfiguration, null);
        return JobGraphBuilder.newStreamingJobGraphBuilder().addJobVertex(firstVertex).addJobVertex(secondVertex).setJobCheckpointingSettings(checkpointingSettings).build();
    }

    private TestingDispatcher createRecoveredDispatcher(@Nullable FatalErrorHandler fatalErrorHandler) throws Exception {
        ArrayList<JobGraph> jobGraphs = new ArrayList<JobGraph>();
        for (JobID jobId : this.haServices.getJobGraphStore().getJobIds()) {
            jobGraphs.add(this.haServices.getJobGraphStore().recoverJobGraph(jobId));
        }
        this.haServices.setRunningJobsRegistry((RunningJobsRegistry)new StandaloneRunningJobsRegistry());
        TestingDispatcher dispatcher = new AbstractDispatcherTest.TestingDispatcherBuilder().setJobManagerRunnerFactory((JobManagerRunnerFactory)JobMasterServiceLeadershipRunnerFactory.INSTANCE).setJobGraphWriter((JobGraphWriter)this.haServices.getJobGraphStore()).setInitialJobGraphs(jobGraphs).setFatalErrorHandler(fatalErrorHandler == null ? this.testingFatalErrorHandlerResource.getFatalErrorHandler() : fatalErrorHandler).build();
        dispatcher.start();
        return dispatcher;
    }

    private static CompletableFuture<JobMasterGateway> connectToLeadingJobMaster(TestingLeaderElectionService leaderElectionService) {
        return leaderElectionService.getConfirmationFuture().thenCompose(leaderConnectionInfo -> rpcService.connect(leaderConnectionInfo.getAddress(), JobMasterId.fromUuidOrNull((UUID)leaderConnectionInfo.getLeaderSessionId()), JobMasterGateway.class));
    }
}

