/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler.adaptive;

import java.io.IOException;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.MiniClusterResource;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.SerializedThrowable;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.SupplierWithException;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;

/**
 * Integration tests that run small jobs on a shared {@link MiniCluster} which is configured to use
 * the adaptive scheduler.
 *
 * <p>The cluster offers {@code NUMBER_TASK_MANAGERS * NUMBER_SLOTS_PER_TASK_MANAGER} slots, which
 * is fewer than the {@code PARALLELISM} requested by the job built in {@code createJobGraph()}.
 */
public class AdaptiveSchedulerSimpleITCase extends TestLogger {

    private static final int NUMBER_TASK_MANAGERS = 2;
    private static final int NUMBER_SLOTS_PER_TASK_MANAGER = 2;

    /** Parallelism requested for each vertex of the simple job; larger than the slot count. */
    private static final int PARALLELISM = 10;

    private static final Configuration CONFIGURATION = getConfiguration();

    /** Cluster shared by all tests of this class. */
    @ClassRule
    public static final MiniClusterResource MINI_CLUSTER_RESOURCE =
            new MiniClusterResource(
                    new MiniClusterResourceConfiguration.Builder()
                            .setConfiguration(CONFIGURATION)
                            .setNumberTaskManagers(NUMBER_TASK_MANAGERS)
                            .setNumberSlotsPerTaskManager(NUMBER_SLOTS_PER_TASK_MANAGER)
                            .build());

    /**
     * Builds the cluster configuration: selects the adaptive scheduler and sets the resource
     * stabilization timeout to 100 ms.
     *
     * @return a fresh configuration for the mini cluster
     */
    private static Configuration getConfiguration() {
        final Configuration conf = new Configuration();
        conf.set(JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.Adaptive);
        conf.set(JobManagerOptions.RESOURCE_STABILIZATION_TIMEOUT, Duration.ofMillis(100L));
        return conf;
    }

    /** Submits the two-vertex no-op job and expects it to finish successfully. */
    @Test
    public void testSchedulingOfSimpleJob() throws Exception {
        final MiniCluster miniCluster = MINI_CLUSTER_RESOURCE.getMiniCluster();
        final JobGraph jobGraph = createJobGraph();

        miniCluster.submitJob(jobGraph).join();
        final JobResult jobResult = miniCluster.requestJobResult(jobGraph.getJobID()).join();

        // The converted result itself is not inspected; the call is kept from the original test.
        // NOTE(review): the conversion presumably throws when the job failed, which would report
        // the failure cause rather than a bare assertion error - confirm against JobResult.
        jobResult.toJobExecutionResult(getClass().getClassLoader());
        Assert.assertTrue(jobResult.isSuccess());
    }

    /**
     * Creates a streaming job graph with a source and a sink, both running {@link NoOpInvokable}
     * with {@code PARALLELISM}, connected pointwise through a pipelined result partition.
     *
     * @return the job graph to submit
     */
    private JobGraph createJobGraph() {
        final JobVertex source = new JobVertex("Source");
        source.setInvokableClass(NoOpInvokable.class);
        source.setParallelism(PARALLELISM);

        final JobVertex sink = new JobVertex("sink");
        sink.setInvokableClass(NoOpInvokable.class);
        sink.setParallelism(PARALLELISM);

        sink.connectNewDataSetAsInput(
                source, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);

        return JobGraphTestUtils.streamingJobGraph(source, sink);
    }

    /**
     * Runs a job whose only task always fails, waits until the job reports {@link
     * JobStatus#RESTARTING} and then cancels it. The test passes if the cancellation future
     * completes without an exception.
     */
    @Test
    public void testJobCancellationWhileRestartingSucceeds() throws Exception {
        // Delay between restart attempts, also used as the upper bound for the wait below.
        final long timeInRestartingState = 10000L;

        final MiniCluster miniCluster = MINI_CLUSTER_RESOURCE.getMiniCluster();
        final JobVertex alwaysFailingOperator = new JobVertex("Always failing operator");
        alwaysFailingOperator.setInvokableClass(AlwaysFailingInvokable.class);
        alwaysFailingOperator.setParallelism(1);

        final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(alwaysFailingOperator);
        final ExecutionConfig executionConfig = new ExecutionConfig();
        // A long delay between attempts keeps the job in RESTARTING long enough to cancel it.
        executionConfig.setRestartStrategy(
                RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, timeInRestartingState));
        jobGraph.setExecutionConfig(executionConfig);

        miniCluster.submitJob(jobGraph).join();

        // Wait until the job is in RESTARTING state.
        CommonTestUtils.waitUntilCondition(
                () -> miniCluster.getJobStatus(jobGraph.getJobID()).get() == JobStatus.RESTARTING,
                Deadline.fromNow(Duration.of(timeInRestartingState, ChronoUnit.MILLIS)),
                5L);

        // Cancel while the job is restarting.
        miniCluster.cancelJob(jobGraph.getJobID()).get();
    }

    /**
     * Runs a job whose task fails exactly once and expects the job to succeed after the restart.
     * If the job does not succeed, the reported failure cause is rethrown.
     */
    @Test
    public void testGlobalFailoverIfTaskFails() throws Throwable {
        final MiniCluster miniCluster = MINI_CLUSTER_RESOURCE.getMiniCluster();
        final JobGraph jobGraph = createOnceFailingJobGraph();

        miniCluster.submitJob(jobGraph).join();
        final JobResult jobResult = miniCluster.requestJobResult(jobGraph.getJobID()).join();

        if (!jobResult.isSuccess()) {
            // Fail with a descriptive error instead of NoSuchElementException when the
            // unsuccessful job carries no failure cause.
            final SerializedThrowable failureCause =
                    jobResult
                            .getSerializedThrowable()
                            .orElseThrow(
                                    () ->
                                            new AssertionError(
                                                    "Job did not succeed but reported no failure cause."));
            throw failureCause.deserializeError(ClassLoader.getSystemClassLoader());
        }
    }

    /**
     * Creates a single-vertex streaming job graph running {@link OnceFailingInvokable} with
     * parallelism 1 and a fixed-delay restart strategy allowing one restart without delay. The
     * invokable's failure flag is reset so that the next invocation fails again.
     *
     * @return the job graph to submit
     * @throws IOException declared because setting the execution config on the job graph may
     *     throw it
     */
    private JobGraph createOnceFailingJobGraph() throws IOException {
        final JobVertex onceFailingOperator = new JobVertex("Once failing operator");

        OnceFailingInvokable.reset();
        onceFailingOperator.setInvokableClass(OnceFailingInvokable.class);
        onceFailingOperator.setParallelism(1);

        final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(onceFailingOperator);
        final ExecutionConfig executionConfig = new ExecutionConfig();
        executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L));
        jobGraph.setExecutionConfig(executionConfig);
        return jobGraph;
    }

    /** Invokable that throws a {@link FlinkRuntimeException} on every invocation. */
    public static final class AlwaysFailingInvokable extends AbstractInvokable {

        public AlwaysFailingInvokable(Environment environment) {
            super(environment);
        }

        public void invoke() throws Exception {
            throw new FlinkRuntimeException("Test failure.");
        }
    }

    /**
     * Invokable whose subtask with index 0 throws a {@link FlinkRuntimeException} on the first
     * invocation after {@code reset()} and returns normally on later invocations.
     */
    public static final class OnceFailingInvokable extends AbstractInvokable {

        // Static so the flag is shared by every instance of this invokable class.
        private static volatile boolean hasFailed = false;

        public OnceFailingInvokable(Environment environment) {
            super(environment);
        }

        public void invoke() throws Exception {
            if (!hasFailed && getIndexInSubtaskGroup() == 0) {
                hasFailed = true;
                throw new FlinkRuntimeException("Test failure.");
            }
        }

        /** Re-arms the failure so that the next invocation of subtask 0 fails again. */
        private static void reset() {
            hasFailed = false;
        }
    }
}

