/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph.failover.flip1;

import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategy;
import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverTopology;
import org.apache.flink.runtime.executiongraph.failover.flip1.FailureHandlingResult;
import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.throwable.ThrowableClassifier;
import org.apache.flink.runtime.throwable.ThrowableType;
import org.apache.flink.runtime.topology.Vertex;
import org.apache.flink.util.IterableUtils;
import org.apache.flink.util.Preconditions;

/**
 * Turns a task-level or global failure into a {@link FailureHandlingResult}.
 *
 * <p>The result is either "restartable" (carrying the vertices to restart and the
 * backoff time reported by the {@link RestartBackoffTimeStrategy}) or "unrecoverable"
 * (carrying a {@link JobException} that wraps the original cause). The handler also
 * counts how many restartable results it has produced.
 */
public class ExecutionFailureHandler {
    /** Topology whose vertices are all restarted on a global failure. */
    private final FailoverTopology<?, ?> failoverTopology;

    /** Strategy that selects the tasks to restart when a single task fails. */
    private final FailoverStrategy failoverStrategy;

    /** Strategy that is notified of each failure and decides whether/when a restart may happen. */
    private final RestartBackoffTimeStrategy restartBackoffTimeStrategy;

    /** Number of restartable results handed out so far. */
    private long numberOfRestarts;

    /**
     * Creates the handler.
     *
     * @param failoverTopology topology used to enumerate all vertices for global failures; must not be null
     * @param failoverStrategy strategy used to pick the tasks to restart for task failures; must not be null
     * @param restartBackoffTimeStrategy strategy consulted for restart permission and backoff time; must not be null
     */
    public ExecutionFailureHandler(FailoverTopology<?, ?> failoverTopology, FailoverStrategy failoverStrategy, RestartBackoffTimeStrategy restartBackoffTimeStrategy) {
        this.failoverTopology = Preconditions.checkNotNull(failoverTopology);
        this.failoverStrategy = Preconditions.checkNotNull(failoverStrategy);
        this.restartBackoffTimeStrategy = Preconditions.checkNotNull(restartBackoffTimeStrategy);
    }

    /**
     * Computes the handling result for the failure of a single task.
     *
     * @param failedTask id of the task that failed
     * @param cause the failure cause
     * @return the result describing whether and what to restart
     */
    public FailureHandlingResult getFailureHandlingResult(ExecutionVertexID failedTask, Throwable cause) {
        // The failover strategy is queried first, before the cause is classified.
        Set<ExecutionVertexID> tasksToRestart = failoverStrategy.getTasksNeedingRestart(failedTask, cause);
        return handleFailure(cause, tasksToRestart);
    }

    /**
     * Computes the handling result for a global failure; every vertex of the
     * failover topology is a restart candidate.
     *
     * @param cause the failure cause
     * @return the result describing whether and what to restart
     */
    public FailureHandlingResult getGlobalFailureHandlingResult(Throwable cause) {
        Set<ExecutionVertexID> allVertexIds = IterableUtils.toStream(failoverTopology.getVertices())
                .map(Vertex::getId)
                .collect(Collectors.toSet());
        return handleFailure(cause, allVertexIds);
    }

    /**
     * Shared decision logic for task and global failures.
     *
     * @param cause the failure cause
     * @param verticesToRestart vertices to restart if a restart is permitted
     * @return a restartable result, or an unrecoverable one wrapping {@code cause}
     */
    private FailureHandlingResult handleFailure(Throwable cause, Set<ExecutionVertexID> verticesToRestart) {
        // Unrecoverable causes short-circuit: the backoff strategy is not notified for them.
        if (isUnrecoverableError(cause)) {
            return FailureHandlingResult.unrecoverable(
                    new JobException("The failure is not recoverable", cause));
        }

        // The strategy must see the failure before it is asked whether a restart is allowed.
        restartBackoffTimeStrategy.notifyFailure(cause);
        if (!restartBackoffTimeStrategy.canRestart()) {
            return FailureHandlingResult.unrecoverable(
                    new JobException("Recovery is suppressed by " + restartBackoffTimeStrategy, cause));
        }

        // Count the restart before asking for the backoff time, as the original code does.
        numberOfRestarts++;
        return FailureHandlingResult.restartable(verticesToRestart, restartBackoffTimeStrategy.getBackoffTime());
    }

    /**
     * Tells whether {@link ThrowableClassifier} reports a throwable of type
     * {@link ThrowableType#NonRecoverableError} for the given cause.
     *
     * @param cause the failure cause to classify
     * @return {@code true} if such a throwable is found, {@code false} otherwise
     */
    @VisibleForTesting
    static boolean isUnrecoverableError(Throwable cause) {
        return ThrowableClassifier
                .findThrowableOfThrowableType(cause, ThrowableType.NonRecoverableError)
                .isPresent();
    }

    /**
     * Returns how many restartable results this handler has produced.
     *
     * @return the restart count
     */
    public long getNumberOfRestarts() {
        return numberOfRestarts;
    }
}

