/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.wal;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.wal.EntryBuffers;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALSplitter;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
public abstract class OutputSink {
    private static final Logger LOG = LoggerFactory.getLogger(OutputSink.class);
    protected WALSplitter.PipelineController controller;
    protected EntryBuffers entryBuffers;
    protected ConcurrentHashMap<String, WALSplitter.SinkWriter> writers = new ConcurrentHashMap();
    protected final ConcurrentHashMap<String, Long> regionMaximumEditLogSeqNum = new ConcurrentHashMap();
    protected final List<WriterThread> writerThreads = Lists.newArrayList();
    protected final Set<byte[]> blacklistedRegions = Collections.synchronizedSet(new TreeSet(Bytes.BYTES_COMPARATOR));
    protected boolean closeAndCleanCompleted = false;
    protected boolean writersClosed = false;
    protected final int numThreads;
    protected CancelableProgressable reporter = null;
    protected AtomicLong skippedEdits = new AtomicLong();
    protected List<Path> splits = null;
    protected MonitoredTask status = null;

    public OutputSink(WALSplitter.PipelineController controller, EntryBuffers entryBuffers, int numWriters) {
        this.numThreads = numWriters;
        this.controller = controller;
        this.entryBuffers = entryBuffers;
    }

    void setReporter(CancelableProgressable reporter) {
        this.reporter = reporter;
    }

    void setStatus(MonitoredTask status) {
        this.status = status;
    }

    public synchronized void startWriterThreads() {
        for (int i = 0; i < this.numThreads; ++i) {
            WriterThread t = new WriterThread(this.controller, this.entryBuffers, this, i);
            t.start();
            this.writerThreads.add(t);
        }
    }

    public synchronized void restartWriterThreadsIfNeeded() {
        for (int i = 0; i < this.writerThreads.size(); ++i) {
            WriterThread t = this.writerThreads.get(i);
            if (t.isAlive()) continue;
            String threadName = t.getName();
            LOG.debug("Replacing dead thread: " + threadName);
            WriterThread newThread = new WriterThread(this.controller, this.entryBuffers, this, threadName);
            newThread.start();
            this.writerThreads.set(i, newThread);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void updateRegionMaximumEditLogSeqNum(WAL.Entry entry) {
        ConcurrentHashMap<String, Long> concurrentHashMap = this.regionMaximumEditLogSeqNum;
        synchronized (concurrentHashMap) {
            String regionName = Bytes.toString((byte[])entry.getKey().getEncodedRegionName());
            Long currentMaxSeqNum = this.regionMaximumEditLogSeqNum.get(regionName);
            if (currentMaxSeqNum == null || entry.getKey().getSequenceId() > currentMaxSeqNum) {
                this.regionMaximumEditLogSeqNum.put(regionName, entry.getKey().getSequenceId());
            }
        }
    }

    int getNumOpenWriters() {
        return this.writers.size();
    }

    long getSkippedEdits() {
        return this.skippedEdits.get();
    }

    protected boolean finishWriting(boolean interrupt) throws IOException {
        LOG.debug("Waiting for split writer threads to finish");
        boolean progress_failed = false;
        for (WriterThread t : this.writerThreads) {
            t.finish();
        }
        if (interrupt) {
            for (WriterThread t : this.writerThreads) {
                t.interrupt();
            }
        }
        for (WriterThread t : this.writerThreads) {
            if (!progress_failed && this.reporter != null && !this.reporter.progress()) {
                progress_failed = true;
            }
            try {
                t.join();
            }
            catch (InterruptedException ie) {
                InterruptedIOException iie = new InterruptedIOException();
                iie.initCause(ie);
                throw iie;
            }
        }
        this.controller.checkForErrors();
        String msg = this.writerThreads.size() + " split writer threads finished";
        LOG.info(msg);
        this.updateStatusWithMsg(msg);
        return !progress_failed;
    }

    public abstract List<Path> finishWritingAndClose() throws IOException;

    public abstract Map<byte[], Long> getOutputCounts();

    public abstract int getNumberOfRecoveredRegions();

    public abstract void append(WALSplitter.RegionEntryBuffer var1) throws IOException;

    public boolean flush() throws IOException {
        return false;
    }

    public abstract boolean keepRegionEvent(WAL.Entry var1);

    protected final void updateStatusWithMsg(String msg) {
        if (this.status != null) {
            this.status.setStatus(msg);
        }
    }

    public static class WriterThread
    extends Thread {
        private volatile boolean shouldStop = false;
        private WALSplitter.PipelineController controller;
        private EntryBuffers entryBuffers;
        private OutputSink outputSink = null;

        WriterThread(WALSplitter.PipelineController controller, EntryBuffers entryBuffers, OutputSink sink, int i) {
            this(controller, entryBuffers, sink, Thread.currentThread().getName() + "-Writer-" + i);
        }

        WriterThread(WALSplitter.PipelineController controller, EntryBuffers entryBuffers, OutputSink sink, String threadName) {
            super(threadName);
            this.controller = controller;
            this.entryBuffers = entryBuffers;
            this.outputSink = sink;
        }

        @Override
        public void run() {
            try {
                this.doRun();
            }
            catch (Throwable t) {
                LOG.error("Exiting thread", t);
                this.controller.writerThreadError(t);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void doRun() throws IOException {
            LOG.trace("Writer thread starting");
            while (true) {
                WALSplitter.RegionEntryBuffer buffer;
                if ((buffer = this.entryBuffers.getChunkToWrite()) == null) {
                    Object object = this.controller.dataAvailable;
                    synchronized (object) {
                        block12: {
                            if (this.shouldStop && !this.outputSink.flush()) {
                                return;
                            }
                            try {
                                this.controller.dataAvailable.wait(500L);
                            }
                            catch (InterruptedException ie) {
                                if (this.shouldStop) break block12;
                                throw new RuntimeException(ie);
                            }
                        }
                    }
                }
                assert (buffer != null);
                try {
                    this.writeBuffer(buffer);
                    continue;
                }
                finally {
                    this.entryBuffers.doneWriting(buffer);
                    continue;
                }
                break;
            }
        }

        private void writeBuffer(WALSplitter.RegionEntryBuffer buffer) throws IOException {
            this.outputSink.append(buffer);
        }

        void setShouldStop(boolean shouldStop) {
            this.shouldStop = shouldStop;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        void finish() {
            Object object = this.controller.dataAvailable;
            synchronized (object) {
                this.shouldStop = true;
                this.controller.dataAvailable.notifyAll();
            }
        }
    }
}

