/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationListener;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationTracker;
import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceFactory;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider;
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.yetus.audience.InterfaceAudience;

@InterfaceAudience.Private
public class ReplicationSourceManager
implements ReplicationListener {
    private static final Log LOG = LogFactory.getLog(ReplicationSourceManager.class);
    private final List<ReplicationSourceInterface> sources;
    private final List<ReplicationSourceInterface> oldsources;
    private final ReplicationQueues replicationQueues;
    private final ReplicationTracker replicationTracker;
    private final ReplicationPeers replicationPeers;
    private final UUID clusterId;
    private final Server server;
    private final Map<String, Map<String, SortedSet<String>>> walsById;
    private final Map<String, Map<String, SortedSet<String>>> walsByIdRecoveredQueues;
    private final Configuration conf;
    private final FileSystem fs;
    private Set<Path> latestPaths;
    private final Path logDir;
    private final Path oldLogDir;
    private final WALFileLengthProvider walFileLengthProvider;
    private final long sleepBeforeFailover;
    private final ThreadPoolExecutor executor;
    private final boolean replicationForBulkLoadDataEnabled;
    private Connection connection;
    private long replicationWaitTime;
    private AtomicLong totalBufferUsed = new AtomicLong();

    public ReplicationSourceManager(ReplicationQueues replicationQueues, ReplicationPeers replicationPeers, ReplicationTracker replicationTracker, Configuration conf, Server server, FileSystem fs, Path logDir, Path oldLogDir, UUID clusterId, WALFileLengthProvider walFileLengthProvider) throws IOException {
        this.sources = new CopyOnWriteArrayList<ReplicationSourceInterface>();
        this.replicationQueues = replicationQueues;
        this.replicationPeers = replicationPeers;
        this.replicationTracker = replicationTracker;
        this.server = server;
        this.walsById = new HashMap<String, Map<String, SortedSet<String>>>();
        this.walsByIdRecoveredQueues = new ConcurrentHashMap<String, Map<String, SortedSet<String>>>();
        this.oldsources = new CopyOnWriteArrayList<ReplicationSourceInterface>();
        this.conf = conf;
        this.fs = fs;
        this.logDir = logDir;
        this.oldLogDir = oldLogDir;
        this.sleepBeforeFailover = conf.getLong("replication.sleep.before.failover", 30000L);
        this.clusterId = clusterId;
        this.walFileLengthProvider = walFileLengthProvider;
        this.replicationTracker.registerListener((ReplicationListener)this);
        this.replicationPeers.getAllPeerIds();
        int nbWorkers = conf.getInt("replication.executor.workers", 1);
        this.executor = new ThreadPoolExecutor(nbWorkers, nbWorkers, 100L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
        tfb.setNameFormat("ReplicationExecutor-%d");
        tfb.setDaemon(true);
        this.executor.setThreadFactory(tfb.build());
        this.latestPaths = new HashSet<Path>();
        this.replicationForBulkLoadDataEnabled = conf.getBoolean("hbase.replication.bulkload.enabled", false);
        this.replicationWaitTime = conf.getLong("hbase.serial.replication.waitingMs", 10000L);
        this.connection = ConnectionFactory.createConnection((Configuration)conf);
    }

    public void logPositionAndCleanOldLogs(Path log, String id, long position, boolean queueRecovered, boolean holdLogInZK) {
        String fileName = log.getName();
        this.replicationQueues.setLogPosition(id, fileName, position);
        if (holdLogInZK) {
            return;
        }
        this.cleanOldLogs(fileName, id, queueRecovered);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void cleanOldLogs(String key, String id, boolean queueRecovered) {
        String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(key);
        if (queueRecovered) {
            SortedSet<String> wals = this.walsByIdRecoveredQueues.get(id).get(logPrefix);
            if (wals != null && !wals.first().equals(key)) {
                this.cleanOldLogs(wals, key, id);
            }
        } else {
            Map<String, Map<String, SortedSet<String>>> map = this.walsById;
            synchronized (map) {
                SortedSet<String> wals = this.walsById.get(id).get(logPrefix);
                if (wals != null && !wals.first().equals(key)) {
                    this.cleanOldLogs(wals, key, id);
                }
            }
        }
    }

    private void cleanOldLogs(SortedSet<String> wals, String key, String id) {
        SortedSet<String> walSet = wals.headSet(key);
        LOG.debug((Object)("Removing " + walSet.size() + " logs in the list: " + walSet));
        for (String wal : walSet) {
            this.replicationQueues.removeLog(id, wal);
        }
        walSet.clear();
    }

    void init() throws IOException, ReplicationException {
        for (String id : this.replicationPeers.getConnectedPeerIds()) {
            this.addSource(id);
            if (!this.replicationForBulkLoadDataEnabled) continue;
            this.replicationQueues.addPeerToHFileRefs(id);
        }
        AdoptAbandonedQueuesWorker adoptionWorker = new AdoptAbandonedQueuesWorker();
        try {
            this.executor.execute(adoptionWorker);
        }
        catch (RejectedExecutionException ex) {
            LOG.info((Object)("Cancelling the adoption of abandoned queues because of " + ex.getMessage()));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @VisibleForTesting
    ReplicationSourceInterface addSource(String id) throws IOException, ReplicationException {
        ReplicationPeerConfig peerConfig = this.replicationPeers.getReplicationPeerConfig(id);
        ReplicationPeer peer = this.replicationPeers.getConnectedPeer(id);
        ReplicationSourceInterface src = this.getReplicationSource(this.conf, this.fs, this, this.replicationQueues, this.replicationPeers, this.server, id, this.clusterId, peerConfig, peer, this.walFileLengthProvider);
        Map<String, Map<String, SortedSet<String>>> map = this.walsById;
        synchronized (map) {
            this.sources.add(src);
            HashMap walsByGroup = new HashMap();
            this.walsById.put(id, walsByGroup);
            Set<Path> set = this.latestPaths;
            synchronized (set) {
                if (this.latestPaths.size() > 0) {
                    for (Path logPath : this.latestPaths) {
                        String name = logPath.getName();
                        String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(name);
                        TreeSet<String> logs = new TreeSet<String>();
                        logs.add(name);
                        walsByGroup.put(walPrefix, logs);
                        try {
                            this.replicationQueues.addLog(id, name);
                        }
                        catch (ReplicationException e) {
                            String message = "Cannot add log to queue when creating a new source, queueId=" + id + ", filename=" + name;
                            this.server.stop(message);
                            throw e;
                        }
                        src.enqueueLog(logPath);
                    }
                }
            }
        }
        src.startup();
        return src;
    }

    public void deleteSource(String peerId, boolean closeConnection) {
        this.replicationQueues.removeQueue(peerId);
        if (closeConnection) {
            this.replicationPeers.peerDisconnected(peerId);
        }
    }

    public void join() {
        this.executor.shutdown();
        for (ReplicationSourceInterface source : this.sources) {
            source.terminate("Region server is closing");
        }
    }

    @VisibleForTesting
    Map<String, Map<String, SortedSet<String>>> getWALs() {
        return Collections.unmodifiableMap(this.walsById);
    }

    @VisibleForTesting
    Map<String, Map<String, SortedSet<String>>> getWalsByIdRecoveredQueues() {
        return Collections.unmodifiableMap(this.walsByIdRecoveredQueues);
    }

    public List<ReplicationSourceInterface> getSources() {
        return this.sources;
    }

    public List<ReplicationSourceInterface> getOldSources() {
        return this.oldsources;
    }

    public ReplicationSourceInterface getSource(String peerId) {
        return this.getSources().stream().filter(s -> s.getPeerId().equals(peerId)).findFirst().orElse(null);
    }

    @VisibleForTesting
    List<String> getAllQueues() {
        return this.replicationQueues.getAllQueues();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void preLogRoll(Path newLog) throws IOException {
        this.recordLog(newLog);
        String logName = newLog.getName();
        String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(logName);
        Set<Path> set = this.latestPaths;
        synchronized (set) {
            Iterator<Path> iterator = this.latestPaths.iterator();
            while (iterator.hasNext()) {
                Path path = iterator.next();
                if (!path.getName().contains(logPrefix)) continue;
                iterator.remove();
                break;
            }
            this.latestPaths.add(newLog);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void recordLog(Path logPath) throws IOException {
        String logName = logPath.getName();
        String logPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(logName);
        Object object = this.replicationPeers;
        synchronized (object) {
            for (String string : this.replicationPeers.getConnectedPeerIds()) {
                try {
                    this.replicationQueues.addLog(string, logName);
                }
                catch (ReplicationException e) {
                    throw new IOException("Cannot add log to replication queue when creating a new source, queueId=" + string + ", filename=" + logName, e);
                }
            }
        }
        object = this.walsById;
        synchronized (object) {
            for (Map.Entry entry : this.walsById.entrySet()) {
                String peerId = (String)entry.getKey();
                Map walsByPrefix = (Map)entry.getValue();
                boolean existingPrefix = false;
                for (Map.Entry walsEntry : walsByPrefix.entrySet()) {
                    SortedSet wals = (SortedSet)walsEntry.getValue();
                    if (this.sources.isEmpty()) {
                        wals.clear();
                    }
                    if (!logPrefix.equals(walsEntry.getKey())) continue;
                    wals.add(logName);
                    existingPrefix = true;
                }
                if (existingPrefix) continue;
                LOG.debug((Object)("Start tracking logs for wal group " + logPrefix + " for peer " + peerId));
                TreeSet<String> wals = new TreeSet<String>();
                wals.add(logName);
                walsByPrefix.put(logPrefix, wals);
            }
        }
    }

    void postLogRoll(Path newLog) throws IOException {
        for (ReplicationSourceInterface source : this.sources) {
            source.enqueueLog(newLog);
        }
    }

    @VisibleForTesting
    public AtomicLong getTotalBufferUsed() {
        return this.totalBufferUsed;
    }

    private ReplicationSourceInterface getReplicationSource(Configuration conf, FileSystem fs, ReplicationSourceManager manager, ReplicationQueues replicationQueues, ReplicationPeers replicationPeers, Server server, String peerId, UUID clusterId, ReplicationPeerConfig peerConfig, ReplicationPeer replicationPeer, WALFileLengthProvider walFileLengthProvider) throws IOException {
        RegionServerCoprocessorHost rsServerHost = null;
        TableDescriptors tableDescriptors = null;
        if (server instanceof HRegionServer) {
            rsServerHost = ((HRegionServer)server).getRegionServerCoprocessorHost();
            tableDescriptors = ((HRegionServer)server).getTableDescriptors();
        }
        ReplicationSourceInterface src = ReplicationSourceFactory.create(conf, peerId);
        ReplicationEndpoint replicationEndpoint = null;
        try {
            ReplicationEndpoint newReplicationEndPoint;
            String replicationEndpointImpl = peerConfig.getReplicationEndpointImpl();
            if (replicationEndpointImpl == null) {
                replicationEndpointImpl = HBaseInterClusterReplicationEndpoint.class.getName();
            }
            Class<?> c = Class.forName(replicationEndpointImpl);
            replicationEndpoint = (ReplicationEndpoint)c.newInstance();
            if (rsServerHost != null && (newReplicationEndPoint = rsServerHost.postCreateReplicationEndPoint(replicationEndpoint)) != null) {
                replicationEndpoint = newReplicationEndPoint;
            }
        }
        catch (Exception e) {
            LOG.warn((Object)("Passed replication endpoint implementation throws errors while initializing ReplicationSource for peer: " + peerId), (Throwable)e);
            throw new IOException(e);
        }
        MetricsSource metrics = new MetricsSource(peerId);
        src.init(conf, fs, manager, replicationQueues, replicationPeers, server, peerId, clusterId, replicationEndpoint, walFileLengthProvider, metrics);
        replicationEndpoint.init(new ReplicationEndpoint.Context(replicationPeer.getConfiguration(), fs, peerId, clusterId, replicationPeer, metrics, tableDescriptors, server));
        return src;
    }

    private void transferQueues(String rsZnode) {
        NodeFailoverWorker transfer = new NodeFailoverWorker(rsZnode, this.replicationQueues, this.replicationPeers, this.clusterId);
        try {
            this.executor.execute(transfer);
        }
        catch (RejectedExecutionException ex) {
            LOG.info((Object)("Cancelling the transfer of " + rsZnode + " because of " + ex.getMessage()));
        }
    }

    public void closeRecoveredQueue(ReplicationSourceInterface src) {
        LOG.info((Object)("Done with the recovered queue " + src.getPeerClusterZnode()));
        if (src instanceof ReplicationSource) {
            ((ReplicationSource)src).getSourceMetrics().clear();
        }
        this.oldsources.remove(src);
        this.deleteSource(src.getPeerClusterZnode(), false);
        this.walsByIdRecoveredQueues.remove(src.getPeerClusterZnode());
    }

    public void closeQueue(ReplicationSourceInterface src) {
        LOG.info((Object)("Done with the queue " + src.getPeerClusterZnode()));
        src.getSourceMetrics().clear();
        this.sources.remove(src);
        this.deleteSource(src.getPeerClusterZnode(), true);
        this.walsById.remove(src.getPeerClusterZnode());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void removePeer(String id) {
        LOG.info((Object)("Closing the following queue " + id + ", currently have " + this.sources.size() + " and another " + this.oldsources.size() + " that were recovered"));
        String terminateMessage = "Replication stream was removed by a user";
        ArrayList<ReplicationSourceInterface> oldSourcesToDelete = new ArrayList<ReplicationSourceInterface>();
        List<ReplicationSourceInterface> list = this.oldsources;
        synchronized (list) {
            for (ReplicationSourceInterface src : this.oldsources) {
                if (!id.equals(src.getPeerId())) continue;
                oldSourcesToDelete.add(src);
            }
            for (ReplicationSourceInterface src : oldSourcesToDelete) {
                src.terminate(terminateMessage);
                this.closeRecoveredQueue(src);
            }
        }
        LOG.info((Object)("Number of deleted recovered sources for " + id + ": " + oldSourcesToDelete.size()));
        ArrayList<ReplicationSourceInterface> srcToRemove = new ArrayList<ReplicationSourceInterface>();
        Iterator<ReplicationSourceInterface> iterator = this.replicationPeers;
        synchronized (iterator) {
            for (ReplicationSourceInterface src : this.sources) {
                if (!id.equals(src.getPeerId())) continue;
                srcToRemove.add(src);
            }
            if (srcToRemove.isEmpty()) {
                LOG.error((Object)("The peer we wanted to remove is missing a ReplicationSourceInterface. This could mean that ReplicationSourceInterface initialization failed for this peer and that replication on this peer may not be caught up. peerId=" + id));
            }
            for (ReplicationSourceInterface toRemove : srcToRemove) {
                toRemove.terminate(terminateMessage);
                this.closeQueue(toRemove);
            }
            this.deleteSource(id, true);
        }
    }

    public void regionServerRemoved(String regionserver) {
        this.transferQueues(regionserver);
    }

    public void peerRemoved(String peerId) {
        this.removePeer(peerId);
        this.replicationQueues.removePeerFromHFileRefs(peerId);
    }

    public void peerListChanged(List<String> peerIds) {
        for (String id : peerIds) {
            try {
                boolean added = this.replicationPeers.peerConnected(id);
                if (!added) continue;
                this.addSource(id);
                if (!this.replicationForBulkLoadDataEnabled) continue;
                this.replicationQueues.addPeerToHFileRefs(id);
            }
            catch (Exception e) {
                LOG.error((Object)"Error while adding a new peer", (Throwable)e);
            }
        }
    }

    public Path getOldLogDir() {
        return this.oldLogDir;
    }

    public Path getLogDir() {
        return this.logDir;
    }

    public FileSystem getFs() {
        return this.fs;
    }

    public Connection getConnection() {
        return this.connection;
    }

    public ReplicationPeers getReplicationPeers() {
        return this.replicationPeers;
    }

    public String getStats() {
        StringBuffer stats = new StringBuffer();
        for (ReplicationSourceInterface source : this.sources) {
            stats.append("Normal source for cluster " + source.getPeerId() + ": ");
            stats.append(source.getStats() + "\n");
        }
        for (ReplicationSourceInterface oldSource : this.oldsources) {
            stats.append("Recovered source for cluster/machine(s) " + oldSource.getPeerId() + ": ");
            stats.append(oldSource.getStats() + "\n");
        }
        return stats.toString();
    }

    public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs) throws ReplicationException {
        for (ReplicationSourceInterface source : this.sources) {
            source.addHFileRefs(tableName, family, pairs);
        }
    }

    public void cleanUpHFileRefs(String peerId, List<String> files) {
        this.replicationQueues.removeHFileRefs(peerId, files);
    }

    void waitUntilCanBePushed(byte[] encodedName, long seq, String peerId) throws IOException, InterruptedException {
        List barriers = MetaTableAccessor.getReplicationBarriers((Connection)this.connection, (byte[])encodedName);
        if (barriers.isEmpty() || seq <= (Long)barriers.get(0)) {
            return;
        }
        int interval = Collections.binarySearch(barriers, seq);
        if (interval < 0) {
            interval = -interval - 1;
        }
        if (interval == 1) {
            String parentValue = MetaTableAccessor.getSerialReplicationParentRegion((Connection)this.connection, (byte[])encodedName);
            if (parentValue == null) {
                return;
            }
            while (true) {
                String[] parentRegions;
                boolean allParentDone = true;
                for (String parent : parentRegions = parentValue.split(",")) {
                    byte[] region = Bytes.toBytes((String)parent);
                    long pos = MetaTableAccessor.getReplicationPositionForOnePeer((Connection)this.connection, (byte[])region, (String)peerId);
                    List parentBarriers = MetaTableAccessor.getReplicationBarriers((Connection)this.connection, (byte[])region);
                    if (parentBarriers.size() <= 0 || (Long)parentBarriers.get(parentBarriers.size() - 1) - 1L <= pos) continue;
                    allParentDone = false;
                    LOG.info((Object)(Bytes.toString((byte[])encodedName) + " can not start pushing because parent region's log has not been fully pushed: parent=" + Bytes.toString((byte[])region) + " pos=" + pos + " barriers=" + Arrays.toString(barriers.toArray())));
                    break;
                }
                if (allParentDone) {
                    return;
                }
                Thread.sleep(this.replicationWaitTime);
            }
        }
        while (true) {
            long pos;
            if (seq <= (pos = MetaTableAccessor.getReplicationPositionForOnePeer((Connection)this.connection, (byte[])encodedName, (String)peerId))) {
                // empty if block
            }
            if (pos >= 0L) {
                int posInterval = Collections.binarySearch(barriers, pos);
                if (posInterval < 0) {
                    posInterval = -posInterval - 1;
                }
                if (posInterval == interval || pos == (Long)barriers.get(interval - 1) - 1L) {
                    return;
                }
            }
            LOG.info((Object)(Bytes.toString((byte[])encodedName) + " can not start pushing to peer " + peerId + " because previous log has not been pushed: sequence=" + seq + " pos=" + pos + " barriers=" + Arrays.toString(barriers.toArray())));
            Thread.sleep(this.replicationWaitTime);
        }
    }

    class AdoptAbandonedQueuesWorker
    extends Thread {
        @Override
        public void run() {
            List currentReplicators = ReplicationSourceManager.this.replicationQueues.getListOfReplicators();
            if (currentReplicators == null || currentReplicators.isEmpty()) {
                return;
            }
            List otherRegionServers = ReplicationSourceManager.this.replicationTracker.getListOfRegionServers();
            LOG.info((Object)("Current list of replicators: " + currentReplicators + " other RSs: " + otherRegionServers));
            for (String rs : currentReplicators) {
                if (otherRegionServers.contains(rs)) continue;
                ReplicationSourceManager.this.transferQueues(rs);
            }
        }
    }

    class NodeFailoverWorker
    extends Thread {
        private String rsZnode;
        private final ReplicationQueues rq;
        private final ReplicationPeers rp;
        private final UUID clusterId;

        public NodeFailoverWorker(String rsZnode) {
            this(rsZnode, this$0.replicationQueues, this$0.replicationPeers, this$0.clusterId);
        }

        public NodeFailoverWorker(String rsZnode, ReplicationQueues replicationQueues, ReplicationPeers replicationPeers, UUID clusterId) {
            super("Failover-for-" + rsZnode);
            this.rsZnode = rsZnode;
            this.rq = replicationQueues;
            this.rp = replicationPeers;
            this.clusterId = clusterId;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            if (this.rq.isThisOurRegionServer(this.rsZnode)) {
                return;
            }
            try {
                Thread.sleep(ReplicationSourceManager.this.sleepBeforeFailover + (long)(ThreadLocalRandom.current().nextFloat() * (float)ReplicationSourceManager.this.sleepBeforeFailover));
            }
            catch (InterruptedException e) {
                LOG.warn((Object)"Interrupted while waiting before transferring a queue.");
                Thread.currentThread().interrupt();
            }
            if (ReplicationSourceManager.this.server.isStopped()) {
                LOG.info((Object)"Not transferring queue since we are shutting down");
                return;
            }
            HashMap<Object, Object> newQueues = new HashMap<Object, Object>();
            List peers = this.rq.getUnClaimedQueueIds(this.rsZnode);
            while (peers != null && !peers.isEmpty()) {
                Pair peer = this.rq.claimQueue(this.rsZnode, (String)peers.get(ThreadLocalRandom.current().nextInt(peers.size())));
                long sleep = ReplicationSourceManager.this.sleepBeforeFailover / 2L;
                if (peer != null) {
                    newQueues.put(peer.getFirst(), peer.getSecond());
                    sleep = ReplicationSourceManager.this.sleepBeforeFailover;
                }
                try {
                    Thread.sleep(sleep);
                }
                catch (InterruptedException e) {
                    LOG.warn((Object)"Interrupted while waiting before transferring a queue.");
                    Thread.currentThread().interrupt();
                }
                peers = this.rq.getUnClaimedQueueIds(this.rsZnode);
            }
            if (peers != null) {
                this.rq.removeReplicatorIfQueueIsEmpty(this.rsZnode);
            }
            if (newQueues.isEmpty()) {
                return;
            }
            for (Map.Entry entry : newQueues.entrySet()) {
                String peerId = (String)entry.getKey();
                Set walsSet = (Set)entry.getValue();
                try {
                    ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
                    String actualPeerId = replicationQueueInfo.getPeerId();
                    ReplicationPeer peer = ReplicationSourceManager.this.replicationPeers.getConnectedPeer(actualPeerId);
                    ReplicationPeerConfig peerConfig = null;
                    try {
                        peerConfig = ReplicationSourceManager.this.replicationPeers.getReplicationPeerConfig(actualPeerId);
                    }
                    catch (ReplicationException ex) {
                        LOG.warn((Object)("Received exception while getting replication peer config, skipping replay" + (Object)((Object)ex)));
                    }
                    if (peer == null || peerConfig == null) {
                        LOG.warn((Object)("Skipping failover for peer:" + actualPeerId + " of node" + this.rsZnode));
                        ReplicationSourceManager.this.replicationQueues.removeQueue(peerId);
                        continue;
                    }
                    HashMap<String, TreeSet<String>> walsByGroup = new HashMap<String, TreeSet<String>>();
                    ReplicationSourceManager.this.walsByIdRecoveredQueues.put(peerId, walsByGroup);
                    for (String wal : walsSet) {
                        String walPrefix = AbstractFSWALProvider.getWALPrefixFromWALName(wal);
                        TreeSet<String> wals = (TreeSet<String>)walsByGroup.get(walPrefix);
                        if (wals == null) {
                            wals = new TreeSet<String>();
                            walsByGroup.put(walPrefix, wals);
                        }
                        wals.add(wal);
                    }
                    ReplicationSourceInterface src = ReplicationSourceManager.this.getReplicationSource(ReplicationSourceManager.this.conf, ReplicationSourceManager.this.fs, ReplicationSourceManager.this, this.rq, this.rp, ReplicationSourceManager.this.server, peerId, this.clusterId, peerConfig, peer, ReplicationSourceManager.this.walFileLengthProvider);
                    List list = ReplicationSourceManager.this.oldsources;
                    synchronized (list) {
                        if (!this.rp.getConnectedPeerIds().contains(src.getPeerId())) {
                            src.terminate("Recovered queue doesn't belong to any current peer");
                            ReplicationSourceManager.this.closeRecoveredQueue(src);
                            continue;
                        }
                        ReplicationSourceManager.this.oldsources.add(src);
                        for (String wal : walsSet) {
                            src.enqueueLog(new Path(ReplicationSourceManager.this.oldLogDir, wal));
                        }
                        src.startup();
                    }
                }
                catch (IOException e) {
                    LOG.error((Object)"Failed creating a source", (Throwable)e);
                }
            }
        }
    }
}

