/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.ConnectException;
import java.net.SocketTimeoutException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
import org.apache.hadoop.hbase.regionserver.LastSequenceId;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.ExceptionUtil;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALSplitter;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
public class SplitLogWorker
implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(SplitLogWorker.class);
    Thread worker;
    private SplitLogWorkerCoordination coordination;
    private Configuration conf;
    private RegionServerServices server;

    public SplitLogWorker(Server hserver, Configuration conf, RegionServerServices server, TaskExecutor splitTaskExecutor) {
        this.server = server;
        this.conf = conf;
        this.coordination = hserver.getCoordinatedStateManager().getSplitLogWorkerCoordination();
        this.coordination.init(server, conf, splitTaskExecutor, this);
    }

    public SplitLogWorker(Server hserver, final Configuration conf, final RegionServerServices server, final LastSequenceId sequenceIdChecker, final WALFactory factory) {
        this(hserver, conf, server, new TaskExecutor(){

            @Override
            public TaskExecutor.Status exec(String filename, CancelableProgressable p) {
                FileSystem fs;
                Path walDir;
                try {
                    walDir = FSUtils.getWALRootDir((Configuration)conf);
                    fs = walDir.getFileSystem(conf);
                }
                catch (IOException e) {
                    LOG.warn("could not find root dir or fs", (Throwable)e);
                    return TaskExecutor.Status.RESIGNED;
                }
                try {
                    if (!WALSplitter.splitLogFile(walDir, fs.getFileStatus(new Path(walDir, filename)), fs, conf, p, sequenceIdChecker, server.getCoordinatedStateManager().getSplitLogWorkerCoordination(), factory)) {
                        return TaskExecutor.Status.PREEMPTED;
                    }
                }
                catch (InterruptedIOException iioe) {
                    LOG.warn("log splitting of " + filename + " interrupted, resigning", (Throwable)iioe);
                    return TaskExecutor.Status.RESIGNED;
                }
                catch (IOException e) {
                    Throwable cause = e.getCause();
                    if (e instanceof RetriesExhaustedException && (cause instanceof NotServingRegionException || cause instanceof ConnectException || cause instanceof SocketTimeoutException)) {
                        LOG.warn("log replaying of " + filename + " can't connect to the target regionserver, resigning", (Throwable)e);
                        return TaskExecutor.Status.RESIGNED;
                    }
                    if (cause instanceof InterruptedException) {
                        LOG.warn("log splitting of " + filename + " interrupted, resigning", (Throwable)e);
                        return TaskExecutor.Status.RESIGNED;
                    }
                    LOG.warn("log splitting of " + filename + " failed, returning error", (Throwable)e);
                    return TaskExecutor.Status.ERR;
                }
                return TaskExecutor.Status.DONE;
            }
        });
    }

    @Override
    public void run() {
        try {
            LOG.info("SplitLogWorker " + this.server.getServerName() + " starting");
            this.coordination.registerListener();
            boolean res = false;
            while (!res && !this.coordination.isStop()) {
                res = this.coordination.isReady();
            }
            if (!this.coordination.isStop()) {
                this.coordination.taskLoop();
            }
        }
        catch (Throwable t) {
            if (ExceptionUtil.isInterrupt((Throwable)t)) {
                LOG.info("SplitLogWorker interrupted. Exiting. " + (this.coordination.isStop() ? "" : " (ERROR: exitWorker is not set, exiting anyway)"));
            } else {
                LOG.error("unexpected error ", t);
            }
        }
        finally {
            this.coordination.removeListener();
            LOG.info("SplitLogWorker " + this.server.getServerName() + " exiting");
        }
    }

    public void stopTask() {
        LOG.info("Sending interrupt to stop the worker thread");
        this.worker.interrupt();
    }

    public void start() {
        this.worker = new Thread(null, this, "SplitLogWorker-" + this.server.getServerName().toShortString());
        this.worker.start();
    }

    public void stop() {
        this.coordination.stopProcessingTasks();
        this.stopTask();
    }

    @VisibleForTesting
    public int getTaskReadySeq() {
        return this.coordination.getTaskReadySeq();
    }

    public static interface TaskExecutor {
        public Status exec(String var1, CancelableProgressable var2);

        public static enum Status {
            DONE,
            ERR,
            RESIGNED,
            PREEMPTED;

        }
    }
}

