/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmanager;

import java.util.Collection;
import java.util.HashMap;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.dispatcher.NoOpJobGraphListener;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.DefaultJobGraphStore;
import org.apache.flink.runtime.jobmanager.JobGraphStore;
import org.apache.flink.runtime.jobmanager.JobGraphStoreUtil;
import org.apache.flink.runtime.jobmanager.JobGraphStoreWatcher;
import org.apache.flink.runtime.jobmanager.TestingJobGraphListener;
import org.apache.flink.runtime.jobmanager.ZooKeeperJobGraphStoreUtil;
import org.apache.flink.runtime.jobmanager.ZooKeeperJobGraphStoreWatcher;
import org.apache.flink.runtime.persistence.RetrievableStateStorageHelper;
import org.apache.flink.runtime.persistence.StateHandleStore;
import org.apache.flink.runtime.state.RetrievableStreamStateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
import org.apache.flink.shaded.curator4.org.apache.curator.framework.CuratorFramework;
import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;

public class ZooKeeperJobGraphsStoreITCase
extends TestLogger {
    private static final ZooKeeperTestEnvironment ZooKeeper = new ZooKeeperTestEnvironment(1);
    private static final RetrievableStateStorageHelper<JobGraph> localStateStorage = jobGraph -> {
        ByteStreamStateHandle byteStreamStateHandle = new ByteStreamStateHandle(String.valueOf(UUID.randomUUID()), InstantiationUtil.serializeObject((Object)jobGraph));
        return new RetrievableStreamStateHandle((StreamStateHandle)byteStreamStateHandle);
    };

    @AfterClass
    public static void tearDown() throws Exception {
        ZooKeeper.shutdown();
    }

    @Before
    public void cleanUp() throws Exception {
        ZooKeeper.deleteAll();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testPutAndRemoveJobGraph() throws Exception {
        JobGraphStore jobGraphs = this.createZooKeeperJobGraphStore("/testPutAndRemoveJobGraph");
        try {
            JobGraphStore.JobGraphListener listener = (JobGraphStore.JobGraphListener)Mockito.mock(JobGraphStore.JobGraphListener.class);
            jobGraphs.start(listener);
            JobGraph jobGraph = this.createJobGraph(new JobID(), "JobName");
            Assert.assertEquals((long)0L, (long)jobGraphs.getJobIds().size());
            jobGraphs.putJobGraph(jobGraph);
            Collection jobIds = jobGraphs.getJobIds();
            Assert.assertEquals((long)1L, (long)jobIds.size());
            JobID jobId = (JobID)jobIds.iterator().next();
            this.verifyJobGraphs(jobGraph, jobGraphs.recoverJobGraph(jobId));
            jobGraph = this.createJobGraph(jobGraph.getJobID(), "Updated JobName");
            jobGraphs.putJobGraph(jobGraph);
            jobIds = jobGraphs.getJobIds();
            Assert.assertEquals((long)1L, (long)jobIds.size());
            jobId = (JobID)jobIds.iterator().next();
            this.verifyJobGraphs(jobGraph, jobGraphs.recoverJobGraph(jobId));
            jobGraphs.removeJobGraph(jobGraph.getJobID());
            Assert.assertEquals((long)0L, (long)jobGraphs.getJobIds().size());
            ((JobGraphStore.JobGraphListener)Mockito.verify((Object)listener, (VerificationMode)Mockito.atMost((int)1))).onAddedJobGraph((JobID)org.mockito.Matchers.any(JobID.class));
            ((JobGraphStore.JobGraphListener)Mockito.verify((Object)listener, (VerificationMode)Mockito.never())).onRemovedJobGraph((JobID)org.mockito.Matchers.any(JobID.class));
            jobGraphs.removeJobGraph(jobGraph.getJobID());
        }
        finally {
            jobGraphs.stop();
        }
    }

    @Nonnull
    private JobGraphStore createZooKeeperJobGraphStore(String fullPath) throws Exception {
        CuratorFramework client = ZooKeeper.getClient();
        client.newNamespaceAwareEnsurePath(fullPath).ensure(client.getZookeeperClient());
        CuratorFramework facade = client.usingNamespace(client.getNamespace() + fullPath);
        ZooKeeperStateHandleStore zooKeeperStateHandleStore = new ZooKeeperStateHandleStore(facade, localStateStorage);
        return new DefaultJobGraphStore((StateHandleStore)zooKeeperStateHandleStore, (JobGraphStoreWatcher)new ZooKeeperJobGraphStoreWatcher(new PathChildrenCache(facade, "/", false)), (JobGraphStoreUtil)ZooKeeperJobGraphStoreUtil.INSTANCE);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testRecoverJobGraphs() throws Exception {
        JobGraphStore jobGraphs = this.createZooKeeperJobGraphStore("/testRecoverJobGraphs");
        try {
            JobGraphStore.JobGraphListener listener = (JobGraphStore.JobGraphListener)Mockito.mock(JobGraphStore.JobGraphListener.class);
            jobGraphs.start(listener);
            HashMap<JobID, JobGraph> expected = new HashMap<JobID, JobGraph>();
            JobID[] jobIds = new JobID[]{new JobID(), new JobID(), new JobID()};
            expected.put(jobIds[0], this.createJobGraph(jobIds[0]));
            expected.put(jobIds[1], this.createJobGraph(jobIds[1]));
            expected.put(jobIds[2], this.createJobGraph(jobIds[2]));
            for (JobGraph jobGraph : expected.values()) {
                jobGraphs.putJobGraph(jobGraph);
            }
            Collection actual = jobGraphs.getJobIds();
            Assert.assertEquals((long)expected.size(), (long)actual.size());
            for (JobID jobId : actual) {
                JobGraph jobGraph = jobGraphs.recoverJobGraph(jobId);
                Assert.assertTrue((boolean)expected.containsKey(jobGraph.getJobID()));
                this.verifyJobGraphs((JobGraph)expected.get(jobGraph.getJobID()), jobGraph);
                jobGraphs.removeJobGraph(jobGraph.getJobID());
            }
            Assert.assertEquals((long)0L, (long)jobGraphs.getJobIds().size());
            ((JobGraphStore.JobGraphListener)Mockito.verify((Object)listener, (VerificationMode)Mockito.atMost((int)expected.size()))).onAddedJobGraph((JobID)org.mockito.Matchers.any(JobID.class));
            ((JobGraphStore.JobGraphListener)Mockito.verify((Object)listener, (VerificationMode)Mockito.never())).onRemovedJobGraph((JobID)org.mockito.Matchers.any(JobID.class));
        }
        finally {
            jobGraphs.stop();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testConcurrentAddJobGraph() throws Exception {
        JobGraphStore jobGraphs = null;
        JobGraphStore otherJobGraphs = null;
        try {
            jobGraphs = this.createZooKeeperJobGraphStore("/testConcurrentAddJobGraph");
            otherJobGraphs = this.createZooKeeperJobGraphStore("/testConcurrentAddJobGraph");
            JobGraph jobGraph = this.createJobGraph(new JobID());
            JobGraph otherJobGraph = this.createJobGraph(new JobID());
            JobGraphStore.JobGraphListener listener = (JobGraphStore.JobGraphListener)Mockito.mock(JobGraphStore.JobGraphListener.class);
            final JobID[] actualOtherJobId = new JobID[1];
            final CountDownLatch sync = new CountDownLatch(1);
            ((JobGraphStore.JobGraphListener)Mockito.doAnswer((Answer)new Answer<Void>(){

                public Void answer(InvocationOnMock invocation) throws Throwable {
                    actualOtherJobId[0] = (JobID)invocation.getArguments()[0];
                    sync.countDown();
                    return null;
                }
            }).when((Object)listener)).onAddedJobGraph((JobID)org.mockito.Matchers.any(JobID.class));
            jobGraphs.start(listener);
            otherJobGraphs.start((JobGraphStore.JobGraphListener)NoOpJobGraphListener.INSTANCE);
            jobGraphs.putJobGraph(jobGraph);
            ((JobGraphStore.JobGraphListener)Mockito.verify((Object)listener, (VerificationMode)Mockito.never())).onAddedJobGraph((JobID)org.mockito.Matchers.any(JobID.class));
            ((JobGraphStore.JobGraphListener)Mockito.verify((Object)listener, (VerificationMode)Mockito.never())).onRemovedJobGraph((JobID)org.mockito.Matchers.any(JobID.class));
            otherJobGraphs.putJobGraph(otherJobGraph);
            sync.await();
            ((JobGraphStore.JobGraphListener)Mockito.verify((Object)listener, (VerificationMode)Mockito.times((int)1))).onAddedJobGraph((JobID)org.mockito.Matchers.any(JobID.class));
            ((JobGraphStore.JobGraphListener)Mockito.verify((Object)listener, (VerificationMode)Mockito.never())).onRemovedJobGraph((JobID)org.mockito.Matchers.any(JobID.class));
            Assert.assertEquals((Object)otherJobGraph.getJobID(), (Object)actualOtherJobId[0]);
        }
        finally {
            if (jobGraphs != null) {
                jobGraphs.stop();
            }
            if (otherJobGraphs != null) {
                otherJobGraphs.stop();
            }
        }
    }

    @Test(expected=IllegalStateException.class)
    public void testUpdateJobGraphYouDidNotGetOrAdd() throws Exception {
        JobGraphStore jobGraphs = this.createZooKeeperJobGraphStore("/testUpdateJobGraphYouDidNotGetOrAdd");
        JobGraphStore otherJobGraphs = this.createZooKeeperJobGraphStore("/testUpdateJobGraphYouDidNotGetOrAdd");
        jobGraphs.start((JobGraphStore.JobGraphListener)NoOpJobGraphListener.INSTANCE);
        otherJobGraphs.start((JobGraphStore.JobGraphListener)NoOpJobGraphListener.INSTANCE);
        JobGraph jobGraph = this.createJobGraph(new JobID());
        jobGraphs.putJobGraph(jobGraph);
        otherJobGraphs.putJobGraph(jobGraph);
    }

    @Test
    public void testJobGraphRemovalFailureAndLockRelease() throws Exception {
        JobGraphStore submittedJobGraphStore = this.createZooKeeperJobGraphStore("/testConcurrentAddJobGraph");
        JobGraphStore otherSubmittedJobGraphStore = this.createZooKeeperJobGraphStore("/testConcurrentAddJobGraph");
        TestingJobGraphListener listener = new TestingJobGraphListener();
        submittedJobGraphStore.start((JobGraphStore.JobGraphListener)listener);
        otherSubmittedJobGraphStore.start((JobGraphStore.JobGraphListener)listener);
        JobGraph jobGraph = JobGraphTestUtils.emptyJobGraph();
        submittedJobGraphStore.putJobGraph(jobGraph);
        JobGraph recoveredJobGraph = otherSubmittedJobGraphStore.recoverJobGraph(jobGraph.getJobID());
        Assert.assertThat((Object)recoveredJobGraph, (Matcher)Matchers.is((Matcher)Matchers.notNullValue()));
        try {
            otherSubmittedJobGraphStore.removeJobGraph(recoveredJobGraph.getJobID());
            Assert.fail((String)"It should not be possible to remove the JobGraph since the first store still has a lock on it.");
        }
        catch (Exception exception) {
            // empty catch block
        }
        submittedJobGraphStore.stop();
        otherSubmittedJobGraphStore.removeJobGraph(recoveredJobGraph.getJobID());
        Assert.assertThat((Object)otherSubmittedJobGraphStore.recoverJobGraph(recoveredJobGraph.getJobID()), (Matcher)Matchers.is((Matcher)Matchers.nullValue()));
        otherSubmittedJobGraphStore.stop();
    }

    private JobGraph createJobGraph(JobID jobId) {
        return this.createJobGraph(jobId, "Test JobGraph");
    }

    private JobGraph createJobGraph(JobID jobId, String jobName) {
        JobVertex jobVertex = new JobVertex("Test JobVertex");
        jobVertex.setParallelism(1);
        return JobGraphBuilder.newStreamingJobGraphBuilder().setJobName(jobName).setJobId(jobId).addJobVertex(jobVertex).build();
    }

    private void verifyJobGraphs(JobGraph expected, JobGraph actual) {
        Assert.assertEquals((Object)expected.getName(), (Object)actual.getName());
        Assert.assertEquals((Object)expected.getJobID(), (Object)actual.getJobID());
    }
}

