/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.regionserver;

import com.google.common.base.Preconditions;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DroppedSnapshotException;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil;
import org.apache.hadoop.hbase.regionserver.FlushRequestListener;
import org.apache.hadoop.hbase.regionserver.FlushRequester;
import org.apache.hadoop.hbase.regionserver.FlushType;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Counter;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.HasThread;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.util.StringUtils;
import org.apache.htrace.Trace;
import org.apache.htrace.TraceScope;

@InterfaceAudience.Private
class MemStoreFlusher
implements FlushRequester {
    /** Logger shared by the flusher and its nested classes. */
    static final Log LOG = LogFactory.getLog(MemStoreFlusher.class);
    /** Delay-ordered queue of pending flush entries and wakeup tokens; polled by the flush handlers. */
    private final BlockingQueue<FlushQueueEntry> flushQueue = new DelayQueue<FlushQueueEntry>();
    /**
     * Regions that were given a {@link FlushRegionEntry} by a flush request. This map is also
     * the monitor used when the map and {@link #flushQueue} are updated together.
     */
    private final Map<HRegion, FlushRegionEntry> regionsInQueue = new HashMap<HRegion, FlushRegionEntry>();
    /**
     * Flipped to {@code true} by {@code wakeupFlushThread()} when it enqueues a wakeup token and
     * reset to {@code false} by each handler right before it polls the queue.
     */
    private AtomicBoolean wakeupPending = new AtomicBoolean();
    /** Poll timeout, in milliseconds, used by the handlers when waiting on {@link #flushQueue}. */
    private final long threadWakeFrequency;
    /** Owning region server; consulted for stop state, accounting, metrics and compaction requests. */
    private final HRegionServer server;
    /**
     * Flushes run while holding the read lock; {@code interruptIfNecessary()} takes the write
     * lock before interrupting the handlers.
     */
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    /** Monitor on which {@code reclaimMemStoreMemory()} waits and {@code wakeUpIfBlocking()} notifies. */
    private final Object blockSignal = new Object();
    /** Global memstore size at or above which {@code reclaimMemStoreMemory()} blocks its caller. */
    protected long globalMemStoreLimit;
    /** Fraction of {@link #globalMemStoreLimit} used to derive {@link #globalMemStoreLimitLowMark}. */
    protected float globalMemStoreLimitLowMarkPercent;
    /** Global memstore size at or above which the handlers are asked to flush for memory pressure. */
    protected long globalMemStoreLimitLowMark;
    /** Upper bound, in milliseconds, on how long a flush is delayed for a region with too many store files. */
    private long blockingWaitTime;
    /** Accumulated milliseconds that callers spent blocked in {@code reclaimMemStoreMemory()}. */
    private final Counter updatesBlockedMsHighWater = new Counter();
    /** Handler slots; sized in the constructor and populated by {@code start(...)}. */
    private final FlushHandler[] flushHandlers;
    /** Listeners notified right before a region is flushed. */
    private List<FlushRequestListener> flushRequestListeners = new ArrayList<FlushRequestListener>(1);

    /**
     * Creates a flusher bound to the given region server and derives its memory limits
     * from the maximum heap size and the supplied configuration.
     *
     * @param conf configuration read for the wake frequency, memstore percentages,
     *     blocking wait time and handler count
     * @param server region server this flusher works for
     */
    public MemStoreFlusher(Configuration conf, HRegionServer server) {
        this.server = server;
        this.threadWakeFrequency = conf.getLong("hbase.server.thread.wakefrequency", 10000L);

        // Limits are expressed relative to the JVM's maximum heap.
        long maxHeap = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();
        float upperPercent = HeapMemorySizeUtil.getGlobalMemStorePercent(conf, true);
        this.globalMemStoreLimit = (long)((float)maxHeap * upperPercent);
        this.globalMemStoreLimitLowMarkPercent = HeapMemorySizeUtil.getGlobalMemStoreLowerMark(conf, upperPercent);
        this.globalMemStoreLimitLowMark = (long)((float)this.globalMemStoreLimit * this.globalMemStoreLimitLowMarkPercent);

        this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime", 90000);
        // Handler slots are only allocated here; start(...) fills them.
        this.flushHandlers = new FlushHandler[conf.getInt("hbase.hstore.flusher.count", 2)];

        String summary = "globalMemStoreLimit=" + StringUtils.humanReadableInt(this.globalMemStoreLimit)
            + ", globalMemStoreLimitLowMark=" + StringUtils.humanReadableInt(this.globalMemStoreLimitLowMark)
            + ", maxHeap=" + StringUtils.humanReadableInt(maxHeap);
        LOG.info(summary);
    }

    /**
     * @return the counter that accumulates the milliseconds callers spent blocked
     *     in {@link #reclaimMemStoreMemory()}
     */
    public Counter getUpdatesBlockedMsHighWater() {
        return updatesBlockedMsHighWater;
    }

    /**
     * Picks one online region and flushes it to relieve global memstore pressure.
     * Regions whose flush attempt fails are excluded and another candidate is tried.
     *
     * @return {@code true} once a region was flushed; {@code false} when no candidate
     *     region is left
     */
    private boolean flushOneForGlobalPressure() {
        SortedMap<Long, HRegion> candidates = this.server.getCopyOfOnlineRegionsSortedBySize();
        Set<HRegion> skipped = new HashSet<HRegion>();
        for (;;) {
            // Best candidate that is not over its store-file limit, and best candidate overall.
            HRegion flushable = this.getBiggestMemstoreRegion(candidates, skipped, true);
            HRegion any = this.getBiggestMemstoreRegion(candidates, skipped, false);
            if (any == null) {
                LOG.error("Above memory mark but there are no flushable regions!");
                return false;
            }

            HRegion chosen;
            if (flushable == null) {
                chosen = any;
            } else if (any.memstoreSize.get() > 2L * flushable.memstoreSize.get()) {
                // The unrestricted candidate holds more than twice as much; take it anyway.
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Under global heap pressure: Region " + any.getRegionNameAsString() + " has too many " + "store files, but is " + StringUtils.humanReadableInt(any.memstoreSize.get()) + " vs best flushable region's " + StringUtils.humanReadableInt(flushable.memstoreSize.get()) + ". Choosing the bigger.");
                }
                chosen = any;
            } else {
                chosen = flushable;
            }

            Preconditions.checkState(chosen.memstoreSize.get() > 0L);
            LOG.info("Flush of region " + chosen + " due to global heap pressure");
            if (this.flushRegion(chosen, true)) {
                return true;
            }
            LOG.info("Excluding unflushable region " + chosen + " - trying to find a different region to flush.");
            skipped.add(chosen);
        }
    }

    /**
     * Enqueues a wakeup token for the flush handlers unless the pending flag is already set.
     */
    private void wakeupFlushThread() {
        boolean claimed = this.wakeupPending.compareAndSet(false, true);
        if (!claimed) {
            return;
        }
        this.flushQueue.add(new WakeupFlushThread());
    }

    /**
     * Returns the first region, in the iteration order of {@code regionsBySize.values()},
     * that passes all filters. The scan runs while holding the {@code regionsInQueue} monitor.
     *
     * @param regionsBySize candidate regions; presumably ordered largest first by the caller
     * @param excludedRegions regions that must be skipped
     * @param checkStoreFileCount when {@code true}, regions with too many store files are skipped
     * @return the first acceptable region, or {@code null} if none qualifies
     */
    private HRegion getBiggestMemstoreRegion(SortedMap<Long, HRegion> regionsBySize, Set<HRegion> excludedRegions, boolean checkStoreFileCount) {
        synchronized (this.regionsInQueue) {
            for (HRegion candidate : regionsBySize.values()) {
                if (excludedRegions.contains(candidate)) {
                    continue;
                }
                // Skip regions that are already flushing or not accepting writes.
                if (candidate.writestate.flushing || !candidate.writestate.writesEnabled) {
                    continue;
                }
                if (checkStoreFileCount && this.isTooManyStoreFiles(candidate)) {
                    continue;
                }
                return candidate;
            }
        }
        return null;
    }

    /**
     * @return {@code true} when the server's global memstore size is at or above
     *     {@code globalMemStoreLimit}
     */
    private boolean isAboveHighWaterMark() {
        long currentSize = this.server.getRegionServerAccounting().getGlobalMemstoreSize();
        return currentSize >= this.globalMemStoreLimit;
    }

    /**
     * @return {@code true} when the server's global memstore size is at or above
     *     {@code globalMemStoreLimitLowMark}
     */
    private boolean isAboveLowWaterMark() {
        long currentSize = this.server.getRegionServerAccounting().getGlobalMemstoreSize();
        return currentSize >= this.globalMemStoreLimitLowMark;
    }

    /**
     * Queues a flush for the given region unless the region already has a queued entry.
     *
     * @param r region to flush
     */
    @Override
    public void requestFlush(HRegion r) {
        synchronized (this.regionsInQueue) {
            if (this.regionsInQueue.containsKey(r)) {
                // Already queued; nothing to do.
                return;
            }
            FlushRegionEntry entry = new FlushRegionEntry(r);
            this.regionsInQueue.put(r, entry);
            this.flushQueue.add(entry);
        }
    }

    /**
     * Queues a flush for the given region that becomes due after {@code delay},
     * unless the region already has a queued entry.
     *
     * @param r region to flush
     * @param delay delay added to the current time before the entry expires
     */
    @Override
    public void requestDelayedFlush(HRegion r, long delay) {
        synchronized (this.regionsInQueue) {
            if (this.regionsInQueue.containsKey(r)) {
                // Already queued; keep the existing entry and its schedule.
                return;
            }
            FlushRegionEntry entry = new FlushRegionEntry(r);
            // Push the expiry into the future before the entry becomes visible to the handlers.
            entry.requeue(delay);
            this.regionsInQueue.put(r, entry);
            this.flushQueue.add(entry);
        }
    }

    /**
     * @return the current number of entries in the flush queue
     */
    public int getFlushQueueSize() {
        final int pending = this.flushQueue.size();
        return pending;
    }

    /**
     * Interrupts every started flush handler while holding the write lock.
     * Flushes run under the read lock, so the write lock is only obtained
     * when no flush currently holds it.
     */
    void interruptIfNecessary() {
        final ReentrantReadWriteLock.WriteLock exclusive = this.lock.writeLock();
        exclusive.lock();
        try {
            for (int i = 0; i < this.flushHandlers.length; i++) {
                FlushHandler handler = this.flushHandlers[i];
                if (handler != null) {
                    handler.interrupt();
                }
            }
        } finally {
            exclusive.unlock();
        }
    }

    /**
     * Creates and starts one flush handler per configured slot.
     *
     * @param eh handler passed to the daemon thread factory
     */
    synchronized void start(Thread.UncaughtExceptionHandler eh) {
        String prefix = this.server.getServerName().toShortString() + "-MemStoreFlusher";
        ThreadFactory factory = Threads.newDaemonThreadFactory(prefix, eh);
        int slot = 0;
        while (slot < this.flushHandlers.length) {
            FlushHandler handler = new FlushHandler("MemStoreFlusher." + slot);
            this.flushHandlers[slot] = handler;
            // NOTE(review): the thread returned by the factory is discarded and the handler
            // is started through its own start(); confirm the factory's settings (including
            // eh) are really meant to be unused here.
            factory.newThread((Runnable)((Object)handler));
            handler.start();
            ++slot;
        }
    }

    /**
     * @return {@code true} if at least one started flush handler reports itself alive
     */
    boolean isAlive() {
        for (int i = 0; i < this.flushHandlers.length; i++) {
            FlushHandler handler = this.flushHandlers[i];
            if (handler != null && handler.isAlive()) {
                return true;
            }
        }
        return false;
    }

    /**
     * Hands the thread of every started flush handler to {@code Threads.shutdown}.
     */
    void join() {
        for (int i = 0; i < this.flushHandlers.length; i++) {
            FlushHandler handler = this.flushHandlers[i];
            if (handler != null) {
                Threads.shutdown(handler.getThread());
            }
        }
    }

    /**
     * Handles a queued flush entry. A non-meta region with too many store files is
     * re-queued (after asking for a split or compaction on the first deferral) until
     * {@code blockingWaitTime} has elapsed since the entry was created; otherwise the
     * region is flushed.
     *
     * @param fqe queued entry to process
     * @return {@code true} when the entry was re-queued, otherwise the result of the flush
     */
    private boolean flushRegion(FlushRegionEntry fqe) {
        HRegion target = fqe.region;
        boolean heldBack = !target.getRegionInfo().isMetaRegion() && this.isTooManyStoreFiles(target);

        if (heldBack && !fqe.isMaximumWait(this.blockingWaitTime)) {
            if (fqe.getRequeueCount() <= 0) {
                // First deferral: warn once and try to get the store files reduced.
                LOG.warn("Region " + target.getRegionNameAsString() + " has too many " + "store files; delaying flush up to " + this.blockingWaitTime + "ms");
                boolean splitRequested = this.server.compactSplitThread.requestSplit(target);
                if (!splitRequested) {
                    try {
                        this.server.compactSplitThread.requestSystemCompaction(target, Thread.currentThread().getName());
                    } catch (IOException e) {
                        LOG.error("Cache flush failed for region " + Bytes.toStringBinary(target.getRegionName()), RemoteExceptionHandler.checkIOException(e));
                    }
                }
            }
            // Look at this region again after a fraction of the maximum wait.
            this.flushQueue.add(fqe.requeue(this.blockingWaitTime / 100L));
            return true;
        }

        if (heldBack) {
            LOG.info("Waited " + (EnvironmentEdgeManager.currentTime() - fqe.createTime) + "ms on a compaction to clean up 'too many store files'; waited " + "long enough... proceeding with flush of " + target.getRegionNameAsString());
        }
        return this.flushRegion(target, false);
    }

    /**
     * Flushes the given region, then requests a split or a system compaction if the
     * flush result calls for one. The region's queue bookkeeping is removed first, and
     * waiters on {@code blockSignal} are notified when the attempt ends.
     *
     * @param region region to flush
     * @param emergencyFlush {@code true} when the flush was triggered by global memory
     *     pressure; the region's queued entry is then also removed from the flush queue
     * @return {@code false} if the server was aborted because of a dropped snapshot or the
     *     file system check failed after an I/O error; {@code true} otherwise
     */
    private boolean flushRegion(HRegion region, boolean emergencyFlush) {
        long requestedAt = 0L;
        synchronized (this.regionsInQueue) {
            FlushRegionEntry queued = this.regionsInQueue.remove(region);
            if (queued != null) {
                requestedAt = queued.createTime;
                if (emergencyFlush) {
                    // The entry would otherwise still be delivered to a handler later.
                    this.flushQueue.remove(queued);
                }
            }
        }
        if (requestedAt == 0L) {
            requestedAt = EnvironmentEdgeManager.currentTime();
        }

        this.lock.readLock().lock();
        try {
            this.notifyFlushRequest(region, emergencyFlush);
            HRegion.FlushResult result = region.flushcache();
            boolean compactionNeeded = result.isCompactionNeeded();
            boolean splitNeeded = region.checkSplit() != null;
            if (splitNeeded) {
                this.server.compactSplitThread.requestSplit(region);
            } else if (compactionNeeded) {
                this.server.compactSplitThread.requestSystemCompaction(region, Thread.currentThread().getName());
            }
            if (result.isFlushSucceeded()) {
                long finishedAt = EnvironmentEdgeManager.currentTime();
                this.server.metricsRegionServer.updateFlushTime(finishedAt - requestedAt);
            }
        } catch (DroppedSnapshotException ex) {
            this.server.abort("Replay of WAL required. Forcing server shutdown", ex);
            return false;
        } catch (IOException ex) {
            LOG.error("Cache flush failed" + (region != null ? " for region " + Bytes.toStringBinary(region.getRegionName()) : ""), RemoteExceptionHandler.checkIOException(ex));
            if (!this.server.checkFileSystem()) {
                return false;
            }
        } finally {
            this.lock.readLock().unlock();
            this.wakeUpIfBlocking();
        }
        return true;
    }

    /**
     * Tells every registered listener that a flush of {@code region} is about to run.
     *
     * @param region region being flushed
     * @param emergencyFlush when {@code true} the reported type reflects which water mark
     *     is exceeded; otherwise the type is {@code NORMAL}
     */
    private void notifyFlushRequest(HRegion region, boolean emergencyFlush) {
        FlushType type;
        if (!emergencyFlush) {
            type = FlushType.NORMAL;
        } else if (this.isAboveHighWaterMark()) {
            type = FlushType.ABOVE_HIGHER_MARK;
        } else {
            type = FlushType.ABOVE_LOWER_MARK;
        }
        for (FlushRequestListener each : this.flushRequestListeners) {
            each.flushRequested(type, region);
        }
    }

    /**
     * Notifies all threads waiting on {@code blockSignal}.
     */
    private void wakeUpIfBlocking() {
        synchronized (this.blockSignal) {
            this.blockSignal.notifyAll();
        }
    }

    /**
     * @param region region whose stores are inspected
     * @return {@code true} if any store of the region reports too many store files
     */
    private boolean isTooManyStoreFiles(HRegion region) {
        boolean tooMany = false;
        for (Store candidate : region.stores.values()) {
            if (candidate.hasTooManyStoreFiles()) {
                tooMany = true;
                break;
            }
        }
        return tooMany;
    }

    /**
     * Blocks the caller while the global memstore size is at or above the high water
     * mark (and the server is not stopped), prodding the flush handlers on every pass.
     * Afterwards, wakes the handlers once more if the size is still at or above the low
     * water mark.
     *
     * <p>The trace scope is closed in a {@code finally} block so it is released even if
     * an unchecked exception escapes the body.
     */
    public void reclaimMemStoreMemory() {
        TraceScope scope = Trace.startSpan("MemStoreFluser.reclaimMemStoreMemory");
        try {
            if (this.isAboveHighWaterMark()) {
                if (Trace.isTracing()) {
                    scope.getSpan().addTimelineAnnotation("Force Flush. We're above high water mark.");
                }
                long start = EnvironmentEdgeManager.currentTime();
                synchronized (this.blockSignal) {
                    boolean blocked = false;
                    long startTime = 0L;
                    boolean interrupted = false;
                    try {
                        while (this.isAboveHighWaterMark() && !this.server.isStopped()) {
                            if (!blocked) {
                                // Log and timestamp only the first pass of a blocking episode.
                                startTime = EnvironmentEdgeManager.currentTime();
                                LOG.info("Blocking updates on " + this.server.toString() + ": the global memstore size " + StringUtils.humanReadableInt(this.server.getRegionServerAccounting().getGlobalMemstoreSize()) + " is >= than blocking " + StringUtils.humanReadableInt(this.globalMemStoreLimit) + " size");
                            }
                            blocked = true;
                            this.wakeupFlushThread();
                            try {
                                // Bounded wait: re-check the condition even without a notification.
                                this.blockSignal.wait(5000L);
                            } catch (InterruptedException ie) {
                                LOG.warn("Interrupted while waiting");
                                // Keep waiting; the interrupt status is restored after the loop.
                                interrupted = true;
                            }
                            long took = EnvironmentEdgeManager.currentTime() - start;
                            LOG.warn("Memstore is above high water mark and block " + took + "ms");
                        }
                    } finally {
                        if (interrupted) {
                            Thread.currentThread().interrupt();
                        }
                    }
                    if (blocked) {
                        long totalTime = EnvironmentEdgeManager.currentTime() - startTime;
                        if (totalTime > 0L) {
                            this.updatesBlockedMsHighWater.add(totalTime);
                        }
                        LOG.info("Unblocking updates for server " + this.server.toString());
                    }
                }
            }
            if (this.isAboveLowWaterMark()) {
                this.wakeupFlushThread();
            }
        } finally {
            // Previously skipped whenever the body threw, leaking the scope.
            scope.close();
        }
    }

    /**
     * @return a short description containing the current flush queue size
     */
    @Override
    public String toString() {
        final int queued = this.flushQueue.size();
        return "flush_queue=" + queued;
    }

    /**
     * Renders the flush queue as text, one entry per line under a fixed header.
     *
     * @return the textual dump of the queue in its iterator order
     */
    public String dumpQueue() {
        StringBuilder dump = new StringBuilder();
        dump.append("Flush Queue Queue dump:\n");
        dump.append("  Flush Queue:\n");
        for (FlushQueueEntry entry : this.flushQueue) {
            dump.append("    ").append(entry.toString()).append("\n");
        }
        return dump.toString();
    }

    /**
     * Adds a listener to be notified before regions are flushed.
     *
     * @param listener listener to add
     */
    @Override
    public void registerFlushRequestListener(FlushRequestListener listener) {
        flushRequestListeners.add(listener);
    }

    /**
     * Removes a previously registered listener.
     *
     * @param listener listener to remove
     * @return {@code true} if the listener was present and has been removed
     */
    @Override
    public boolean unregisterFlushRequestListener(FlushRequestListener listener) {
        final boolean removed = this.flushRequestListeners.remove(listener);
        return removed;
    }

    /**
     * Replaces the high water mark, recomputes the low water mark from the stored
     * percentage, and then runs {@link #reclaimMemStoreMemory()} against the new limits.
     *
     * @param globalMemStoreSize new high water mark
     */
    @Override
    public void setGlobalMemstoreLimit(long globalMemStoreSize) {
        this.globalMemStoreLimit = globalMemStoreSize;
        final float lowMarkFraction = this.globalMemStoreLimitLowMarkPercent;
        this.globalMemStoreLimitLowMark = (long)(lowMarkFraction * (float)globalMemStoreSize);
        reclaimMemStoreMemory();
    }

    /**
     * @return the current high water mark for the global memstore size
     */
    public long getMemoryLimit() {
        return globalMemStoreLimit;
    }

    /**
     * Queue entry asking for one region to be flushed. The entry becomes due at
     * {@code whenToExpire}, which starts at creation time and moves on every requeue.
     *
     * <p>Ordering compares remaining delay first and the entries' hash codes second.
     * Both comparisons are written without subtraction so that large delay differences
     * or hash codes of opposite sign cannot overflow and flip the result.
     */
    static class FlushRegionEntry
    implements FlushQueueEntry {
        /** Region to flush. */
        private final HRegion region;
        /** Time the entry was created, as reported by {@code EnvironmentEdgeManager}. */
        private final long createTime;
        /** Time at which the entry becomes due. */
        private long whenToExpire;
        /** Number of times {@link #requeue(long)} has been called. */
        private int requeueCount = 0;

        /**
         * @param r region to flush; the entry is due immediately
         */
        FlushRegionEntry(HRegion r) {
            this.region = r;
            this.whenToExpire = this.createTime = EnvironmentEdgeManager.currentTime();
        }

        /**
         * @param maximumWait threshold in the same unit as the environment clock
         * @return {@code true} if more than {@code maximumWait} has elapsed since creation
         */
        public boolean isMaximumWait(long maximumWait) {
            return EnvironmentEdgeManager.currentTime() - this.createTime > maximumWait;
        }

        /**
         * @return how many times this entry has been requeued
         */
        public int getRequeueCount() {
            return this.requeueCount;
        }

        /**
         * Reschedules the entry relative to the current time and counts the requeue.
         *
         * @param when offset from now at which the entry becomes due
         * @return this entry
         */
        public FlushRegionEntry requeue(long when) {
            this.whenToExpire = EnvironmentEdgeManager.currentTime() + when;
            ++this.requeueCount;
            return this;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(this.whenToExpire - EnvironmentEdgeManager.currentTime(), TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            // Compare the delays directly: narrowing their difference to int could
            // wrap around and report the wrong sign (or a false tie).
            long thisDelay = this.getDelay(TimeUnit.MILLISECONDS);
            long otherDelay = other.getDelay(TimeUnit.MILLISECONDS);
            if (thisDelay != otherDelay) {
                return thisDelay < otherDelay ? -1 : 1;
            }
            // Tie-break on hash code, again without subtraction to avoid int overflow.
            FlushQueueEntry otherEntry = (FlushQueueEntry)other;
            int thisHash = this.hashCode();
            int otherHash = otherEntry.hashCode();
            if (thisHash == otherHash) {
                return 0;
            }
            return thisHash < otherHash ? -1 : 1;
        }

        @Override
        public String toString() {
            return "[flush region " + Bytes.toStringBinary(this.region.getRegionName()) + "]";
        }

        // NOTE(review): the hash depends on the remaining delay, so it changes over time;
        // kept as-is because compareTo/equals are defined in terms of it.
        @Override
        public int hashCode() {
            int hash = (int)this.getDelay(TimeUnit.MILLISECONDS);
            return hash ^ this.region.hashCode();
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || this.getClass() != obj.getClass()) {
                return false;
            }
            Delayed other = (Delayed)obj;
            return this.compareTo(other) == 0;
        }
    }

    /**
     * Token placed on the flush queue to wake a handler. It is always due
     * (zero delay) and compares as smaller than any other entry.
     */
    static class WakeupFlushThread
    implements FlushQueueEntry {
        WakeupFlushThread() {
            // No state to initialise.
        }

        /** Always reports "smaller than" so the token sorts ahead of other entries. */
        @Override
        public int compareTo(Delayed o) {
            return -1;
        }

        /** The token is due immediately, whatever the requested unit. */
        @Override
        public long getDelay(TimeUnit unit) {
            return 0L;
        }

        /** Identity equality: a token equals only itself. */
        public boolean equals(Object obj) {
            return obj == this;
        }
    }

    /**
     * Common type of everything placed on the flush queue. It adds no members of its
     * own beyond {@link Delayed}.
     */
    static interface FlushQueueEntry
    extends Delayed {
    }

    private class FlushHandler
    extends HasThread {
        private FlushHandler(String name) {
            super(name);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void run() {
            while (!MemStoreFlusher.this.server.isStopped()) {
                FlushQueueEntry fqe = null;
                try {
                    MemStoreFlusher.this.wakeupPending.set(false);
                    fqe = (FlushQueueEntry)MemStoreFlusher.this.flushQueue.poll(MemStoreFlusher.this.threadWakeFrequency, TimeUnit.MILLISECONDS);
                    if (fqe == null || fqe instanceof WakeupFlushThread) {
                        if (!MemStoreFlusher.this.isAboveLowWaterMark()) continue;
                        LOG.debug((Object)("Flush thread woke up because memory above low water=" + StringUtils.humanReadableInt((long)MemStoreFlusher.this.globalMemStoreLimitLowMark)));
                        if (!MemStoreFlusher.this.flushOneForGlobalPressure()) {
                            Thread.sleep(1000L);
                            MemStoreFlusher.this.wakeUpIfBlocking();
                        }
                        MemStoreFlusher.this.wakeupFlushThread();
                        continue;
                    }
                    FlushRegionEntry fre = (FlushRegionEntry)fqe;
                    if (MemStoreFlusher.this.flushRegion(fre)) continue;
                    break;
                }
                catch (InterruptedException ex) {
                }
                catch (ConcurrentModificationException ex) {
                }
                catch (Exception ex) {
                    LOG.error((Object)("Cache flusher failed for entry " + fqe), (Throwable)ex);
                    if (MemStoreFlusher.this.server.checkFileSystem()) continue;
                    break;
                }
            }
            Map map = MemStoreFlusher.this.regionsInQueue;
            synchronized (map) {
                MemStoreFlusher.this.regionsInQueue.clear();
                MemStoreFlusher.this.flushQueue.clear();
            }
            MemStoreFlusher.this.wakeUpIfBlocking();
            LOG.info((Object)(this.getName() + " exiting"));
        }
    }
}

