/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmaster;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.testutils.FlinkMatchers;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.queryablestate.KvStateID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobType;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.jobmaster.JobMaster;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.jobmaster.JobMasterTestUtils;
import org.apache.flink.runtime.jobmaster.KvStateRegistryGateway;
import org.apache.flink.runtime.jobmaster.utils.JobMasterBuilder;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.query.KvStateLocation;
import org.apache.flink.runtime.query.UnknownKvStateLocation;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

/**
 * Tests for the queryable-state (KvState) registration, unregistration and location lookup calls
 * made through the {@link JobMasterGateway} of a {@link JobMaster}.
 *
 * <p>All tests share one {@link TestingRpcService}. Each test creates its own {@link JobMaster}
 * for {@link #JOB_GRAPH} and terminates it in a {@code finally} block, so a failing test does not
 * leave a running endpoint behind in the shared service.
 */
public class JobMasterQueryableStateTest extends TestLogger {

    /** Timeout used when terminating endpoints and when requesting the job status. */
    private static final Time TESTING_TIMEOUT = Time.seconds(10L);

    /** Parallelism of both job vertices; also passed as the slot count when offering slots. */
    private static final int PARALLELISM = 4;

    /** Max parallelism of both vertices; {@code testRegisterKvState} compares it to the key group count. */
    private static final int MAX_PARALLELISM = 16;

    private static final JobVertex JOB_VERTEX_1 = createVertex("v1");
    private static final JobVertex JOB_VERTEX_2 = createVertex("v2");
    private static final JobGraph JOB_GRAPH;

    /** RPC service shared by all tests; created in {@link #setupClass()}. */
    private static TestingRpcService rpcService;

    static {
        // Both vertices are placed into the same slot sharing group.
        SlotSharingGroup sharedGroup = new SlotSharingGroup();
        JOB_VERTEX_1.setSlotSharingGroup(sharedGroup);
        JOB_VERTEX_2.setSlotSharingGroup(sharedGroup);

        JOB_GRAPH = JobGraphTestUtils.streamingJobGraph(JOB_VERTEX_1, JOB_VERTEX_2);
        JOB_GRAPH.setJobType(JobType.STREAMING);
    }

    @BeforeClass
    public static void setupClass() {
        rpcService = new TestingRpcService();
    }

    /** Clears the gateways registered at the shared RPC service after every test. */
    @After
    public void teardown() throws Exception {
        rpcService.clearGateways();
    }

    @AfterClass
    public static void teardownClass() {
        if (rpcService != null) {
            rpcService.stopService();
            rpcService = null;
        }
    }

    /** Looking up a name that was never registered must fail with {@link UnknownKvStateLocation}. */
    @Test
    public void testRequestKvStateWithoutRegistration() throws Exception {
        JobMaster jobMaster = new JobMasterBuilder(JOB_GRAPH, rpcService).createJobMaster();
        jobMaster.start();

        try {
            // Setup happens inside the try so the endpoint is terminated even if setup fails.
            JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class);
            registerSlotsRequiredForJobExecution(jobMasterGateway, JOB_GRAPH.getJobID());

            try {
                jobMasterGateway.requestKvStateLocation(JOB_GRAPH.getJobID(), "unknown").get();
                Assert.fail("Expected to fail with UnknownKvStateLocation");
            } catch (Exception e) {
                Assert.assertTrue(
                        ExceptionUtils.findThrowable(e, UnknownKvStateLocation.class).isPresent());
            }
        } finally {
            RpcUtils.terminateRpcEndpoint(jobMaster, TESTING_TIMEOUT);
        }
    }

    /** A lookup using a different job id must fail with {@link FlinkJobNotFoundException}. */
    @Test
    public void testRequestKvStateOfWrongJob() throws Exception {
        JobMaster jobMaster = new JobMasterBuilder(JOB_GRAPH, rpcService).createJobMaster();
        jobMaster.start();

        try {
            // Setup happens inside the try so the endpoint is terminated even if setup fails.
            JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class);
            registerSlotsRequiredForJobExecution(jobMasterGateway, JOB_GRAPH.getJobID());

            try {
                jobMasterGateway.requestKvStateLocation(new JobID(), "unknown").get();
                Assert.fail("Expected to fail with FlinkJobNotFoundException");
            } catch (Exception e) {
                Assert.assertThat(e, FlinkMatchers.containsCause(FlinkJobNotFoundException.class));
            }
        } finally {
            RpcUtils.terminateRpcEndpoint(jobMaster, TESTING_TIMEOUT);
        }
    }

    /** A registration using a different job id must fail with {@link FlinkJobNotFoundException}. */
    @Test
    public void testRequestKvStateWithIrrelevantRegistration() throws Exception {
        JobMaster jobMaster = new JobMasterBuilder(JOB_GRAPH, rpcService).createJobMaster();
        jobMaster.start();

        try {
            // Setup happens inside the try so the endpoint is terminated even if setup fails.
            JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class);
            registerSlotsRequiredForJobExecution(jobMasterGateway, JOB_GRAPH.getJobID());

            try {
                registerKvState(jobMasterGateway, new JobID(), new JobVertexID(), "any-name");
                Assert.fail("Expected to fail with FlinkJobNotFoundException.");
            } catch (Exception e) {
                Assert.assertThat(e, FlinkMatchers.containsCause(FlinkJobNotFoundException.class));
            }
        } finally {
            RpcUtils.terminateRpcEndpoint(jobMaster, TESTING_TIMEOUT);
        }
    }

    /** A registered KvState must be returned by a subsequent location lookup with matching data. */
    @Test
    public void testRegisterKvState() throws Exception {
        JobMaster jobMaster = new JobMasterBuilder(JOB_GRAPH, rpcService).createJobMaster();
        jobMaster.start();

        try {
            // Setup happens inside the try so the endpoint is terminated even if setup fails.
            JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class);
            registerSlotsRequiredForJobExecution(jobMasterGateway, JOB_GRAPH.getJobID());

            String registrationName = "register-me";
            KvStateID kvStateID = new KvStateID();
            KeyGroupRange keyGroupRange = new KeyGroupRange(0, 0);
            InetSocketAddress address = new InetSocketAddress(InetAddress.getLocalHost(), 1029);

            jobMasterGateway
                    .notifyKvStateRegistered(
                            JOB_GRAPH.getJobID(),
                            JOB_VERTEX_1.getID(),
                            keyGroupRange,
                            registrationName,
                            kvStateID,
                            address)
                    .get();

            KvStateLocation location =
                    jobMasterGateway
                            .requestKvStateLocation(JOB_GRAPH.getJobID(), registrationName)
                            .get();

            Assert.assertEquals(JOB_GRAPH.getJobID(), location.getJobId());
            Assert.assertEquals(JOB_VERTEX_1.getID(), location.getJobVertexId());
            Assert.assertEquals(JOB_VERTEX_1.getMaxParallelism(), location.getNumKeyGroups());
            Assert.assertEquals(1, location.getNumRegisteredKeyGroups());
            Assert.assertEquals(1, keyGroupRange.getNumberOfKeyGroups());
            Assert.assertEquals(
                    kvStateID, location.getKvStateID(keyGroupRange.getStartKeyGroup()));
            Assert.assertEquals(
                    address, location.getKvStateServerAddress(keyGroupRange.getStartKeyGroup()));
        } finally {
            RpcUtils.terminateRpcEndpoint(jobMaster, TESTING_TIMEOUT);
        }
    }

    /** After unregistering, a lookup of the same name must fail with {@link UnknownKvStateLocation}. */
    @Test
    public void testUnregisterKvState() throws Exception {
        JobMaster jobMaster = new JobMasterBuilder(JOB_GRAPH, rpcService).createJobMaster();
        jobMaster.start();

        try {
            // Setup happens inside the try so the endpoint is terminated even if setup fails.
            JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class);
            registerSlotsRequiredForJobExecution(jobMasterGateway, JOB_GRAPH.getJobID());

            String registrationName = "register-me";
            KvStateID kvStateID = new KvStateID();
            KeyGroupRange keyGroupRange = new KeyGroupRange(0, 0);
            InetSocketAddress address = new InetSocketAddress(InetAddress.getLocalHost(), 1029);

            jobMasterGateway
                    .notifyKvStateRegistered(
                            JOB_GRAPH.getJobID(),
                            JOB_VERTEX_1.getID(),
                            keyGroupRange,
                            registrationName,
                            kvStateID,
                            address)
                    .get();

            jobMasterGateway
                    .notifyKvStateUnregistered(
                            JOB_GRAPH.getJobID(),
                            JOB_VERTEX_1.getID(),
                            keyGroupRange,
                            registrationName)
                    .get();

            try {
                jobMasterGateway
                        .requestKvStateLocation(JOB_GRAPH.getJobID(), registrationName)
                        .get();
                Assert.fail("Expected to fail with an UnknownKvStateLocation.");
            } catch (Exception e) {
                Assert.assertThat(e, FlinkMatchers.containsCause(UnknownKvStateLocation.class));
            }
        } finally {
            RpcUtils.terminateRpcEndpoint(jobMaster, TESTING_TIMEOUT);
        }
    }

    /**
     * Registering the same name for two different vertices must fail with a "Registration name
     * clash" error, after which the job status must be FAILED or FAILING.
     */
    @Test
    public void testDuplicatedKvStateRegistrationsFailTask() throws Exception {
        JobMaster jobMaster = new JobMasterBuilder(JOB_GRAPH, rpcService).createJobMaster();
        jobMaster.start();

        try {
            // Setup happens inside the try so the endpoint is terminated even if setup fails.
            JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class);
            registerSlotsRequiredForJobExecution(jobMasterGateway, JOB_GRAPH.getJobID());

            String registrationName = "duplicate-me";
            registerKvState(
                    jobMasterGateway, JOB_GRAPH.getJobID(), JOB_VERTEX_1.getID(), registrationName);

            try {
                registerKvState(
                        jobMasterGateway,
                        JOB_GRAPH.getJobID(),
                        JOB_VERTEX_2.getID(),
                        registrationName);
                Assert.fail("Expected to fail because of clashing registration message.");
            } catch (Exception e) {
                Assert.assertTrue(
                        ExceptionUtils.findThrowableWithMessage(e, "Registration name clash")
                                .isPresent());
                Assert.assertThat(
                        jobMasterGateway.requestJobStatus(TESTING_TIMEOUT).get(),
                        CoreMatchers.either(CoreMatchers.is(JobStatus.FAILED))
                                .or(CoreMatchers.is(JobStatus.FAILING)));
            }
        } finally {
            RpcUtils.terminateRpcEndpoint(jobMaster, TESTING_TIMEOUT);
        }
    }

    /**
     * Registers a testing task executor with the job master, offers slots for the given job and
     * blocks until the task executor has received its first task submission.
     *
     * @param jobMasterGateway gateway of the job master under test
     * @param jobId id of the job the slots are offered for
     * @throws ExecutionException propagated from the registration/offer helper
     * @throws InterruptedException if interrupted while registering or waiting for a submission
     */
    private static void registerSlotsRequiredForJobExecution(
            JobMasterGateway jobMasterGateway, JobID jobId)
            throws ExecutionException, InterruptedException {
        OneShotLatch oneTaskSubmittedLatch = new OneShotLatch();

        TestingTaskExecutorGateway taskExecutorGateway =
                new TestingTaskExecutorGatewayBuilder()
                        .setSubmitTaskConsumer(
                                (taskDeploymentDescriptor, jobMasterId) -> {
                                    // Any submission signals that the job has started deploying.
                                    oneTaskSubmittedLatch.trigger();
                                    return CompletableFuture.completedFuture(Acknowledge.get());
                                })
                        .createTestingTaskExecutorGateway();

        JobMasterTestUtils.registerTaskExecutorAndOfferSlots(
                rpcService,
                jobMasterGateway,
                jobId,
                PARALLELISM,
                taskExecutorGateway,
                TESTING_TIMEOUT);

        // Intentionally untimed, as in the original: waits for the first task submission.
        oneTaskSubmittedLatch.await();
    }

    /**
     * Registers a new KvState under {@code registrationName} for key group range [0, 0] at the
     * local host on port 1233 and waits for the registration call to complete.
     *
     * @param stateRegistryGateway gateway that receives the registration
     * @param jobId id of the job the state is registered for
     * @param jobVertexId id of the vertex the state is registered for
     * @param registrationName name under which the state is registered
     * @throws UnknownHostException if the local host address cannot be resolved
     * @throws ExecutionException if the registration call completes exceptionally
     * @throws InterruptedException if interrupted while waiting for the registration
     */
    private static void registerKvState(
            KvStateRegistryGateway stateRegistryGateway,
            JobID jobId,
            JobVertexID jobVertexId,
            String registrationName)
            throws UnknownHostException, ExecutionException, InterruptedException {
        InetSocketAddress serverAddress = new InetSocketAddress(InetAddress.getLocalHost(), 1233);
        stateRegistryGateway
                .notifyKvStateRegistered(
                        jobId,
                        jobVertexId,
                        new KeyGroupRange(0, 0),
                        registrationName,
                        new KvStateID(),
                        serverAddress)
                .get();
    }

    /** Creates a job vertex with the parallelism, invokable class and max parallelism used here. */
    private static JobVertex createVertex(String name) {
        JobVertex vertex = new JobVertex(name);
        vertex.setParallelism(PARALLELISM);
        vertex.setInvokableClass(AbstractInvokable.class);
        vertex.setMaxParallelism(MAX_PARALLELISM);
        return vertex;
    }
}

