/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph;

import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import org.apache.flink.api.common.ArchivedExecutionConfig;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStatsSummarySnapshot;
import org.apache.flink.runtime.checkpoint.StatsSummarySnapshot;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecution;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.executiongraph.ErrorInfo;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.scheduler.DefaultScheduler;
import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.util.OptionalFailure;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

public class ArchivedExecutionGraphTest
extends TestLogger {
    private static ExecutionGraph runtimeGraph;

    @BeforeClass
    public static void setupExecutionGraph() throws Exception {
        JobVertexID v1ID = new JobVertexID();
        JobVertexID v2ID = new JobVertexID();
        JobVertex v1 = new JobVertex("v1", v1ID);
        JobVertex v2 = new JobVertex("v2", v2ID);
        v1.setParallelism(1);
        v2.setParallelism(2);
        v1.setInvokableClass(AbstractInvokable.class);
        v2.setInvokableClass(AbstractInvokable.class);
        ExecutionConfig config = new ExecutionConfig();
        config.setRestartStrategy((RestartStrategies.RestartStrategyConfiguration)new RestartStrategies.NoRestartStrategyConfiguration());
        config.setParallelism(4);
        config.enableObjectReuse();
        config.setGlobalJobParameters((ExecutionConfig.GlobalJobParameters)new TestJobParameters());
        CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration(100L, 100L, 100L, 1, CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, true, false, 0, 0L);
        JobCheckpointingSettings checkpointingSettings = new JobCheckpointingSettings(chkConfig, null);
        JobGraph jobGraph = JobGraphBuilder.newStreamingJobGraphBuilder().addJobVertices(Arrays.asList(v1, v2)).setJobCheckpointingSettings(checkpointingSettings).setExecutionConfig(config).build();
        DefaultScheduler scheduler = SchedulerTestingUtils.createScheduler(jobGraph, ComponentMainThreadExecutorServiceAdapter.forMainThread());
        runtimeGraph = scheduler.getExecutionGraph();
        scheduler.startScheduling();
        scheduler.updateTaskExecutionState(new TaskExecutionState(((ExecutionVertex)runtimeGraph.getAllExecutionVertices().iterator().next()).getCurrentExecutionAttempt().getAttemptId(), ExecutionState.FAILED, (Throwable)new RuntimeException("Local failure")));
    }

    @Test
    public void testArchive() throws IOException, ClassNotFoundException {
        ArchivedExecutionGraph archivedGraph = ArchivedExecutionGraph.createFrom((ExecutionGraph)runtimeGraph);
        ArchivedExecutionGraphTest.compareExecutionGraph((AccessExecutionGraph)runtimeGraph, (AccessExecutionGraph)archivedGraph);
    }

    @Test
    public void testSerialization() throws IOException, ClassNotFoundException {
        ArchivedExecutionGraph archivedGraph = ArchivedExecutionGraph.createFrom((ExecutionGraph)runtimeGraph);
        ArchivedExecutionGraphTest.verifySerializability(archivedGraph);
    }

    @Test
    public void testCreateFromInitializingJobForSuspendedJob() {
        ArchivedExecutionGraph suspendedExecutionGraph = ArchivedExecutionGraph.createFromInitializingJob((JobID)new JobID(), (String)"TestJob", (JobStatus)JobStatus.SUSPENDED, (Throwable)new Exception("Test suspension exception"), null, (long)System.currentTimeMillis());
        Assert.assertThat((Object)suspendedExecutionGraph.getState(), (Matcher)CoreMatchers.is((Object)JobStatus.SUSPENDED));
        Assert.assertThat((Object)suspendedExecutionGraph.getFailureInfo(), (Matcher)CoreMatchers.notNullValue());
    }

    @Test
    public void testCheckpointSettingsArchiving() {
        CheckpointCoordinatorConfiguration checkpointCoordinatorConfiguration = CheckpointCoordinatorConfiguration.builder().build();
        ArchivedExecutionGraph archivedGraph = ArchivedExecutionGraph.createFromInitializingJob((JobID)new JobID(), (String)"TestJob", (JobStatus)JobStatus.INITIALIZING, null, (JobCheckpointingSettings)new JobCheckpointingSettings(checkpointCoordinatorConfiguration, null), (long)System.currentTimeMillis());
        ArchivedExecutionGraphTest.assertContainsCheckpointSettings(archivedGraph);
    }

    public static void assertContainsCheckpointSettings(ArchivedExecutionGraph archivedGraph) {
        Assert.assertThat((Object)archivedGraph.getCheckpointCoordinatorConfiguration(), (Matcher)CoreMatchers.notNullValue());
        Assert.assertThat((Object)archivedGraph.getCheckpointStatsSnapshot(), (Matcher)CoreMatchers.notNullValue());
        Assert.assertThat(archivedGraph.getCheckpointStorageName().get(), (Matcher)CoreMatchers.is((Object)"Unknown"));
        Assert.assertThat(archivedGraph.getStateBackendName().get(), (Matcher)CoreMatchers.is((Object)"Unknown"));
    }

    @Test
    public void testArchiveWithStatusOverride() throws IOException, ClassNotFoundException {
        ArchivedExecutionGraph archivedGraph = ArchivedExecutionGraph.createFrom((ExecutionGraph)runtimeGraph, (JobStatus)JobStatus.RESTARTING);
        Assert.assertThat((Object)archivedGraph.getState(), (Matcher)CoreMatchers.is((Object)JobStatus.RESTARTING));
        Assert.assertThat((Object)archivedGraph.getStatusTimestamp(JobStatus.FAILED), (Matcher)CoreMatchers.is((Object)0L));
    }

    private static void compareExecutionGraph(AccessExecutionGraph runtimeGraph, AccessExecutionGraph archivedGraph) throws IOException, ClassNotFoundException {
        Assert.assertEquals((Object)runtimeGraph.getJsonPlan(), (Object)archivedGraph.getJsonPlan());
        Assert.assertEquals((Object)runtimeGraph.getJobID(), (Object)archivedGraph.getJobID());
        Assert.assertEquals((Object)runtimeGraph.getJobName(), (Object)archivedGraph.getJobName());
        Assert.assertEquals((Object)runtimeGraph.getState(), (Object)archivedGraph.getState());
        Assert.assertEquals((Object)runtimeGraph.getFailureInfo().getExceptionAsString(), (Object)archivedGraph.getFailureInfo().getExceptionAsString());
        Assert.assertEquals((long)runtimeGraph.getStatusTimestamp(JobStatus.CREATED), (long)archivedGraph.getStatusTimestamp(JobStatus.CREATED));
        Assert.assertEquals((long)runtimeGraph.getStatusTimestamp(JobStatus.RUNNING), (long)archivedGraph.getStatusTimestamp(JobStatus.RUNNING));
        Assert.assertEquals((long)runtimeGraph.getStatusTimestamp(JobStatus.FAILING), (long)archivedGraph.getStatusTimestamp(JobStatus.FAILING));
        Assert.assertEquals((long)runtimeGraph.getStatusTimestamp(JobStatus.FAILED), (long)archivedGraph.getStatusTimestamp(JobStatus.FAILED));
        Assert.assertEquals((long)runtimeGraph.getStatusTimestamp(JobStatus.CANCELLING), (long)archivedGraph.getStatusTimestamp(JobStatus.CANCELLING));
        Assert.assertEquals((long)runtimeGraph.getStatusTimestamp(JobStatus.CANCELED), (long)archivedGraph.getStatusTimestamp(JobStatus.CANCELED));
        Assert.assertEquals((long)runtimeGraph.getStatusTimestamp(JobStatus.FINISHED), (long)archivedGraph.getStatusTimestamp(JobStatus.FINISHED));
        Assert.assertEquals((long)runtimeGraph.getStatusTimestamp(JobStatus.RESTARTING), (long)archivedGraph.getStatusTimestamp(JobStatus.RESTARTING));
        Assert.assertEquals((long)runtimeGraph.getStatusTimestamp(JobStatus.SUSPENDED), (long)archivedGraph.getStatusTimestamp(JobStatus.SUSPENDED));
        Assert.assertEquals((Object)runtimeGraph.isStoppable(), (Object)archivedGraph.isStoppable());
        CheckpointStatsSnapshot runtimeSnapshot = runtimeGraph.getCheckpointStatsSnapshot();
        CheckpointStatsSnapshot archivedSnapshot = archivedGraph.getCheckpointStatsSnapshot();
        List<Function> meters = Arrays.asList(CompletedCheckpointStatsSummarySnapshot::getEndToEndDurationStats, CompletedCheckpointStatsSummarySnapshot::getPersistedDataStats, CompletedCheckpointStatsSummarySnapshot::getProcessedDataStats, CompletedCheckpointStatsSummarySnapshot::getStateSizeStats);
        List<Function> aggs = Arrays.asList(StatsSummarySnapshot::getAverage, StatsSummarySnapshot::getMinimum, StatsSummarySnapshot::getMaximum, StatsSummarySnapshot::getSum, StatsSummarySnapshot::getCount, s -> s.getQuantile(0.5), s -> s.getQuantile(0.9), s -> s.getQuantile(0.95), s -> s.getQuantile(0.99), s -> s.getQuantile(0.999));
        for (Function meter : meters) {
            StatsSummarySnapshot runtime = (StatsSummarySnapshot)meter.apply(runtimeSnapshot.getSummaryStats());
            StatsSummarySnapshot archived = (StatsSummarySnapshot)meter.apply(runtimeSnapshot.getSummaryStats());
            for (Function function : aggs) {
                Assert.assertEquals(function.apply(runtime), function.apply(archived));
            }
        }
        Assert.assertEquals((long)runtimeSnapshot.getCounts().getTotalNumberOfCheckpoints(), (long)archivedSnapshot.getCounts().getTotalNumberOfCheckpoints());
        Assert.assertEquals((long)runtimeSnapshot.getCounts().getNumberOfCompletedCheckpoints(), (long)archivedSnapshot.getCounts().getNumberOfCompletedCheckpoints());
        Assert.assertEquals((long)runtimeSnapshot.getCounts().getNumberOfInProgressCheckpoints(), (long)archivedSnapshot.getCounts().getNumberOfInProgressCheckpoints());
        ArchivedExecutionConfig runtimeConfig = runtimeGraph.getArchivedExecutionConfig();
        ArchivedExecutionConfig archivedConfig = archivedGraph.getArchivedExecutionConfig();
        Assert.assertEquals((Object)runtimeConfig.getExecutionMode(), (Object)archivedConfig.getExecutionMode());
        Assert.assertEquals((long)runtimeConfig.getParallelism(), (long)archivedConfig.getParallelism());
        Assert.assertEquals((Object)runtimeConfig.getObjectReuseEnabled(), (Object)archivedConfig.getObjectReuseEnabled());
        Assert.assertEquals((Object)runtimeConfig.getRestartStrategyDescription(), (Object)archivedConfig.getRestartStrategyDescription());
        Assert.assertNotNull(archivedConfig.getGlobalJobParameters().get("hello"));
        Assert.assertEquals(runtimeConfig.getGlobalJobParameters().get("hello"), archivedConfig.getGlobalJobParameters().get("hello"));
        ArchivedExecutionGraphTest.compareStringifiedAccumulators(runtimeGraph.getAccumulatorResultsStringified(), archivedGraph.getAccumulatorResultsStringified());
        ArchivedExecutionGraphTest.compareSerializedAccumulators(runtimeGraph.getAccumulatorsSerialized(), archivedGraph.getAccumulatorsSerialized());
        Map runtimeVertices = runtimeGraph.getAllVertices();
        Map archivedVertices = archivedGraph.getAllVertices();
        for (Map.Entry entry : runtimeVertices.entrySet()) {
            ArchivedExecutionGraphTest.compareExecutionJobVertex((AccessExecutionJobVertex)entry.getValue(), (AccessExecutionJobVertex)archivedVertices.get(entry.getKey()));
        }
        Iterator runtimeTopologicalVertices = runtimeGraph.getVerticesTopologically().iterator();
        Iterator iterator = archivedGraph.getVerticesTopologically().iterator();
        while (runtimeTopologicalVertices.hasNext()) {
            Assert.assertTrue((boolean)iterator.hasNext());
            ArchivedExecutionGraphTest.compareExecutionJobVertex((AccessExecutionJobVertex)runtimeTopologicalVertices.next(), (AccessExecutionJobVertex)iterator.next());
        }
        Iterator runtimeExecutionVertices = runtimeGraph.getAllExecutionVertices().iterator();
        Iterator archivedExecutionVertices = archivedGraph.getAllExecutionVertices().iterator();
        while (runtimeExecutionVertices.hasNext()) {
            Assert.assertTrue((boolean)archivedExecutionVertices.hasNext());
            ArchivedExecutionGraphTest.compareExecutionVertex((AccessExecutionVertex)runtimeExecutionVertices.next(), (AccessExecutionVertex)archivedExecutionVertices.next());
        }
    }

    private static void compareExecutionJobVertex(AccessExecutionJobVertex runtimeJobVertex, AccessExecutionJobVertex archivedJobVertex) {
        Assert.assertEquals((Object)runtimeJobVertex.getName(), (Object)archivedJobVertex.getName());
        Assert.assertEquals((long)runtimeJobVertex.getParallelism(), (long)archivedJobVertex.getParallelism());
        Assert.assertEquals((long)runtimeJobVertex.getMaxParallelism(), (long)archivedJobVertex.getMaxParallelism());
        Assert.assertEquals((Object)runtimeJobVertex.getJobVertexId(), (Object)archivedJobVertex.getJobVertexId());
        Assert.assertEquals((Object)runtimeJobVertex.getAggregateState(), (Object)archivedJobVertex.getAggregateState());
        ArchivedExecutionGraphTest.compareStringifiedAccumulators(runtimeJobVertex.getAggregatedUserAccumulatorsStringified(), archivedJobVertex.getAggregatedUserAccumulatorsStringified());
        AccessExecutionVertex[] runtimeExecutionVertices = runtimeJobVertex.getTaskVertices();
        AccessExecutionVertex[] archivedExecutionVertices = archivedJobVertex.getTaskVertices();
        Assert.assertEquals((long)runtimeExecutionVertices.length, (long)archivedExecutionVertices.length);
        for (int x = 0; x < runtimeExecutionVertices.length; ++x) {
            ArchivedExecutionGraphTest.compareExecutionVertex(runtimeExecutionVertices[x], archivedExecutionVertices[x]);
        }
    }

    private static void compareExecutionVertex(AccessExecutionVertex runtimeVertex, AccessExecutionVertex archivedVertex) {
        Assert.assertEquals((Object)runtimeVertex.getTaskNameWithSubtaskIndex(), (Object)archivedVertex.getTaskNameWithSubtaskIndex());
        Assert.assertEquals((long)runtimeVertex.getParallelSubtaskIndex(), (long)archivedVertex.getParallelSubtaskIndex());
        Assert.assertEquals((Object)runtimeVertex.getExecutionState(), (Object)archivedVertex.getExecutionState());
        Assert.assertEquals((long)runtimeVertex.getStateTimestamp(ExecutionState.CREATED), (long)archivedVertex.getStateTimestamp(ExecutionState.CREATED));
        Assert.assertEquals((long)runtimeVertex.getStateTimestamp(ExecutionState.SCHEDULED), (long)archivedVertex.getStateTimestamp(ExecutionState.SCHEDULED));
        Assert.assertEquals((long)runtimeVertex.getStateTimestamp(ExecutionState.DEPLOYING), (long)archivedVertex.getStateTimestamp(ExecutionState.DEPLOYING));
        Assert.assertEquals((long)runtimeVertex.getStateTimestamp(ExecutionState.INITIALIZING), (long)archivedVertex.getStateTimestamp(ExecutionState.INITIALIZING));
        Assert.assertEquals((long)runtimeVertex.getStateTimestamp(ExecutionState.RUNNING), (long)archivedVertex.getStateTimestamp(ExecutionState.RUNNING));
        Assert.assertEquals((long)runtimeVertex.getStateTimestamp(ExecutionState.FINISHED), (long)archivedVertex.getStateTimestamp(ExecutionState.FINISHED));
        Assert.assertEquals((long)runtimeVertex.getStateTimestamp(ExecutionState.CANCELING), (long)archivedVertex.getStateTimestamp(ExecutionState.CANCELING));
        Assert.assertEquals((long)runtimeVertex.getStateTimestamp(ExecutionState.CANCELED), (long)archivedVertex.getStateTimestamp(ExecutionState.CANCELED));
        Assert.assertEquals((long)runtimeVertex.getStateTimestamp(ExecutionState.FAILED), (long)archivedVertex.getStateTimestamp(ExecutionState.FAILED));
        Assert.assertThat(runtimeVertex.getFailureInfo().map(ErrorInfo::getExceptionAsString), (Matcher)CoreMatchers.is(archivedVertex.getFailureInfo().map(ErrorInfo::getExceptionAsString)));
        Assert.assertThat(runtimeVertex.getFailureInfo().map(ErrorInfo::getTimestamp), (Matcher)CoreMatchers.is(archivedVertex.getFailureInfo().map(ErrorInfo::getTimestamp)));
        Assert.assertEquals((Object)runtimeVertex.getCurrentAssignedResourceLocation(), (Object)archivedVertex.getCurrentAssignedResourceLocation());
        ArchivedExecutionGraphTest.compareExecution(runtimeVertex.getCurrentExecutionAttempt(), archivedVertex.getCurrentExecutionAttempt());
    }

    private static void compareExecution(AccessExecution runtimeExecution, AccessExecution archivedExecution) {
        Assert.assertEquals((Object)runtimeExecution.getAttemptId(), (Object)archivedExecution.getAttemptId());
        Assert.assertEquals((long)runtimeExecution.getAttemptNumber(), (long)archivedExecution.getAttemptNumber());
        Assert.assertArrayEquals((long[])runtimeExecution.getStateTimestamps(), (long[])archivedExecution.getStateTimestamps());
        Assert.assertEquals((Object)runtimeExecution.getState(), (Object)archivedExecution.getState());
        Assert.assertEquals((Object)runtimeExecution.getAssignedResourceLocation(), (Object)archivedExecution.getAssignedResourceLocation());
        Assert.assertThat(runtimeExecution.getFailureInfo().map(ErrorInfo::getExceptionAsString), (Matcher)CoreMatchers.is(archivedExecution.getFailureInfo().map(ErrorInfo::getExceptionAsString)));
        Assert.assertThat(runtimeExecution.getFailureInfo().map(ErrorInfo::getTimestamp), (Matcher)CoreMatchers.is(archivedExecution.getFailureInfo().map(ErrorInfo::getTimestamp)));
        Assert.assertEquals((long)runtimeExecution.getStateTimestamp(ExecutionState.CREATED), (long)archivedExecution.getStateTimestamp(ExecutionState.CREATED));
        Assert.assertEquals((long)runtimeExecution.getStateTimestamp(ExecutionState.SCHEDULED), (long)archivedExecution.getStateTimestamp(ExecutionState.SCHEDULED));
        Assert.assertEquals((long)runtimeExecution.getStateTimestamp(ExecutionState.DEPLOYING), (long)archivedExecution.getStateTimestamp(ExecutionState.DEPLOYING));
        Assert.assertEquals((long)runtimeExecution.getStateTimestamp(ExecutionState.INITIALIZING), (long)archivedExecution.getStateTimestamp(ExecutionState.INITIALIZING));
        Assert.assertEquals((long)runtimeExecution.getStateTimestamp(ExecutionState.RUNNING), (long)archivedExecution.getStateTimestamp(ExecutionState.RUNNING));
        Assert.assertEquals((long)runtimeExecution.getStateTimestamp(ExecutionState.FINISHED), (long)archivedExecution.getStateTimestamp(ExecutionState.FINISHED));
        Assert.assertEquals((long)runtimeExecution.getStateTimestamp(ExecutionState.CANCELING), (long)archivedExecution.getStateTimestamp(ExecutionState.CANCELING));
        Assert.assertEquals((long)runtimeExecution.getStateTimestamp(ExecutionState.CANCELED), (long)archivedExecution.getStateTimestamp(ExecutionState.CANCELED));
        Assert.assertEquals((long)runtimeExecution.getStateTimestamp(ExecutionState.FAILED), (long)archivedExecution.getStateTimestamp(ExecutionState.FAILED));
        ArchivedExecutionGraphTest.compareStringifiedAccumulators(runtimeExecution.getUserAccumulatorsStringified(), archivedExecution.getUserAccumulatorsStringified());
        Assert.assertEquals((long)runtimeExecution.getParallelSubtaskIndex(), (long)archivedExecution.getParallelSubtaskIndex());
    }

    private static void compareStringifiedAccumulators(StringifiedAccumulatorResult[] runtimeAccs, StringifiedAccumulatorResult[] archivedAccs) {
        Assert.assertEquals((long)runtimeAccs.length, (long)archivedAccs.length);
        for (int x = 0; x < runtimeAccs.length; ++x) {
            StringifiedAccumulatorResult runtimeResult = runtimeAccs[x];
            StringifiedAccumulatorResult archivedResult = archivedAccs[x];
            Assert.assertEquals((Object)runtimeResult.getName(), (Object)archivedResult.getName());
            Assert.assertEquals((Object)runtimeResult.getType(), (Object)archivedResult.getType());
            Assert.assertEquals((Object)runtimeResult.getValue(), (Object)archivedResult.getValue());
        }
    }

    private static void compareSerializedAccumulators(Map<String, SerializedValue<OptionalFailure<Object>>> runtimeAccs, Map<String, SerializedValue<OptionalFailure<Object>>> archivedAccs) throws IOException, ClassNotFoundException {
        Assert.assertEquals((long)runtimeAccs.size(), (long)archivedAccs.size());
        for (Map.Entry<String, SerializedValue<OptionalFailure<Object>>> runtimeAcc : runtimeAccs.entrySet()) {
            long runtimeUserAcc = (Long)((OptionalFailure)runtimeAcc.getValue().deserializeValue(ClassLoader.getSystemClassLoader())).getUnchecked();
            long archivedUserAcc = (Long)((OptionalFailure)archivedAccs.get(runtimeAcc.getKey()).deserializeValue(ClassLoader.getSystemClassLoader())).getUnchecked();
            Assert.assertEquals((long)runtimeUserAcc, (long)archivedUserAcc);
        }
    }

    private static void verifySerializability(ArchivedExecutionGraph graph) throws IOException, ClassNotFoundException {
        ArchivedExecutionGraph copy = (ArchivedExecutionGraph)CommonTestUtils.createCopySerializable((Serializable)graph);
        ArchivedExecutionGraphTest.compareExecutionGraph((AccessExecutionGraph)graph, (AccessExecutionGraph)copy);
    }

    private static class TestJobParameters
    extends ExecutionConfig.GlobalJobParameters {
        private static final long serialVersionUID = -8118611781035212808L;
        private Map<String, String> parameters = new HashMap<String, String>();

        private TestJobParameters() {
            this.parameters.put("hello", "world");
        }

        public Map<String, String> toMap() {
            return this.parameters;
        }
    }
}

