/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.fs.s3a.commit.magic;

import java.io.IOException;
import java.util.List;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.Invoker;
import org.apache.hadoop.fs.s3a.S3AUtils;
import org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitter;
import org.apache.hadoop.fs.s3a.commit.CommitOperations;
import org.apache.hadoop.fs.s3a.commit.CommitUtils;
import org.apache.hadoop.fs.s3a.commit.CommitUtilsWithMR;
import org.apache.hadoop.fs.s3a.commit.DurationInfo;
import org.apache.hadoop.fs.s3a.commit.MagicCommitPaths;
import org.apache.hadoop.fs.s3a.commit.files.PendingSet;
import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Public
@InterfaceStability.Unstable
public class MagicS3GuardCommitter
extends AbstractS3ACommitter {
    private static final Logger LOG = LoggerFactory.getLogger(MagicS3GuardCommitter.class);
    public static final String NAME = "magic";

    public MagicS3GuardCommitter(Path outputPath, TaskAttemptContext context) throws IOException {
        super(outputPath, context);
        this.setWorkPath(this.getTaskAttemptPath(context));
        CommitUtils.verifyIsMagicCommitPath(this.getDestS3AFS(), this.getWorkPath());
        LOG.debug("Task attempt {} has work path {}", (Object)context.getTaskAttemptID(), (Object)this.getWorkPath());
    }

    @Override
    public String getName() {
        return NAME;
    }

    @Override
    protected boolean requiresDelayedCommitOutputInFileSystem() {
        return true;
    }

    public void setupJob(JobContext context) throws IOException {
        try (DurationInfo d = new DurationInfo(LOG, "Setup Job %s", CommitUtilsWithMR.jobIdString(context));){
            Path jobAttemptPath = this.getJobAttemptPath(context);
            this.getDestinationFS(jobAttemptPath, context.getConfiguration()).mkdirs(jobAttemptPath);
        }
    }

    @Override
    protected List<SinglePendingCommit> listPendingUploadsToCommit(JobContext context) throws IOException {
        FileSystem fs = this.getDestFS();
        return this.loadPendingsetFiles(context, false, fs, S3AUtils.listAndFilter(fs, this.getJobAttemptPath(context), false, CommitOperations.PENDINGSET_FILTER));
    }

    @Override
    public void cleanupStagingDirs() {
        Path path = MagicCommitPaths.magicSubdir(this.getOutputPath());
        Invoker.ignoreIOExceptions(LOG, "cleanup magic directory", path.toString(), () -> S3AUtils.deleteWithWarning(this.getDestFS(), path, true));
    }

    @Override
    public void setupTask(TaskAttemptContext context) throws IOException {
        try (DurationInfo d = new DurationInfo(LOG, "Setup Task %s", context.getTaskAttemptID());){
            Path taskAttemptPath = this.getTaskAttemptPath(context);
            FileSystem fs = taskAttemptPath.getFileSystem(this.getConf());
            fs.mkdirs(taskAttemptPath);
        }
    }

    public boolean needsTaskCommit(TaskAttemptContext context) throws IOException {
        Path taskAttemptPath = this.getTaskAttemptPath(context);
        try (DurationInfo d = new DurationInfo(LOG, "needsTaskCommit task %s", context.getTaskAttemptID());){
            boolean bl = taskAttemptPath.getFileSystem(context.getConfiguration()).exists(taskAttemptPath);
            return bl;
        }
    }

    public void commitTask(TaskAttemptContext context) throws IOException {
        try (DurationInfo d = new DurationInfo(LOG, "Commit task %s", context.getTaskAttemptID());){
            PendingSet commits = this.innerCommitTask(context);
            LOG.info("Task {} committed {} files", (Object)context.getTaskAttemptID(), (Object)commits.size());
        }
        catch (IOException e) {
            this.getCommitOperations().taskCompleted(false);
            throw e;
        }
        finally {
            this.deleteTaskAttemptPathQuietly(context);
        }
        this.getCommitOperations().taskCompleted(true);
    }

    private PendingSet innerCommitTask(TaskAttemptContext context) throws IOException {
        Path taskAttemptPath = this.getTaskAttemptPath(context);
        CommitOperations actions = this.getCommitOperations();
        Pair<PendingSet, List<Pair<LocatedFileStatus, IOException>>> loaded = actions.loadSinglePendingCommits(taskAttemptPath, true);
        PendingSet pendingSet = (PendingSet)loaded.getKey();
        List failures = (List)loaded.getValue();
        if (!failures.isEmpty()) {
            LOG.error("At least one commit file could not be read: failing");
            this.abortPendingUploads((JobContext)context, pendingSet.getCommits(), true);
            throw (IOException)((Pair)failures.get(0)).getValue();
        }
        String jobId = String.valueOf(context.getJobID());
        String taskId = String.valueOf(context.getTaskAttemptID());
        for (SinglePendingCommit commit : pendingSet.getCommits()) {
            commit.setJobId(jobId);
            commit.setTaskId(taskId);
        }
        Path jobAttemptPath = this.getJobAttemptPath((JobContext)context);
        TaskAttemptID taskAttemptID = context.getTaskAttemptID();
        Path taskOutcomePath = new Path(jobAttemptPath, taskAttemptID.getTaskID().toString() + ".pendingset");
        LOG.info("Saving work of {} to {}", (Object)taskAttemptID, (Object)taskOutcomePath);
        try {
            pendingSet.save(this.getDestFS(), taskOutcomePath, false);
        }
        catch (IOException e) {
            LOG.warn("Failed to save task commit data to {} ", (Object)taskOutcomePath, (Object)e);
            this.abortPendingUploads((JobContext)context, pendingSet.getCommits(), true);
            throw e;
        }
        return pendingSet;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void abortTask(TaskAttemptContext context) throws IOException {
        Path attemptPath = this.getTaskAttemptPath(context);
        try (DurationInfo d = new DurationInfo(LOG, "Abort task %s", context.getTaskAttemptID());){
            this.getCommitOperations().abortAllSinglePendingCommits(attemptPath, true);
        }
        finally {
            S3AUtils.deleteQuietly(attemptPath.getFileSystem(context.getConfiguration()), attemptPath, true);
        }
    }

    @Override
    protected Path getJobAttemptPath(int appAttemptId) {
        return CommitUtilsWithMR.getMagicJobAttemptPath(appAttemptId, this.getOutputPath());
    }

    @Override
    public Path getTaskAttemptPath(TaskAttemptContext context) {
        return CommitUtilsWithMR.getMagicTaskAttemptPath(context, this.getOutputPath());
    }

    @Override
    protected Path getBaseTaskAttemptPath(TaskAttemptContext context) {
        return CommitUtilsWithMR.getBaseMagicTaskAttemptPath(context, this.getOutputPath());
    }

    @Override
    public Path getTempTaskAttemptPath(TaskAttemptContext context) {
        return CommitUtilsWithMR.getTempTaskAttemptPath(context, this.getOutputPath());
    }
}

