/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.replication;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.TestReplicationBase;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationGlobalSourceSource;
import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceImpl;
import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSource;
import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSourceImpl;
import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category(value={ReplicationTests.class, MediumTests.class})
public class TestReplicationEndpoint
extends TestReplicationBase {
    @ClassRule
    public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestReplicationEndpoint.class);
    private static final Logger LOG = LoggerFactory.getLogger(TestReplicationEndpoint.class);
    static int numRegionServers;

    @BeforeClass
    public static void setUpBeforeClass() throws Exception {
        TestReplicationBase.setUpBeforeClass();
        numRegionServers = UTIL1.getHBaseCluster().getRegionServerThreads().size();
    }

    @AfterClass
    public static void tearDownAfterClass() throws Exception {
        TestReplicationBase.tearDownAfterClass();
        Assert.assertTrue((ReplicationEndpointForTest.stoppedCount.get() > 0 ? 1 : 0) != 0);
    }

    @Before
    public void setup() throws Exception {
        ReplicationEndpointForTest.contructedCount.set(0);
        ReplicationEndpointForTest.startedCount.set(0);
        ReplicationEndpointForTest.replicateCount.set(0);
        ReplicationEndpointReturningFalse.replicated.set(false);
        ReplicationEndpointForTest.lastEntries = null;
        final List<JVMClusterUtil.RegionServerThread> rsThreads = UTIL1.getMiniHBaseCluster().getRegionServerThreads();
        for (JVMClusterUtil.RegionServerThread rs : rsThreads) {
            UTIL1.getAdmin().rollWALWriter(rs.getRegionServer().getServerName());
        }
        UTIL1.waitFor(3000L, (Waiter.Predicate)new Waiter.ExplainingPredicate<Exception>(){

            public boolean evaluate() throws Exception {
                for (JVMClusterUtil.RegionServerThread rs : rsThreads) {
                    if (rs.getRegionServer().walRollRequestFinished()) continue;
                    return false;
                }
                return true;
            }

            public String explainFailure() throws Exception {
                ArrayList<String> logRollInProgressRsList = new ArrayList<String>();
                for (JVMClusterUtil.RegionServerThread rs : rsThreads) {
                    if (rs.getRegionServer().walRollRequestFinished()) continue;
                    logRollInProgressRsList.add(rs.getRegionServer().toString());
                }
                return "Still waiting for log roll on regionservers: " + logRollInProgressRsList;
            }
        });
    }

    @Test
    public void testCustomReplicationEndpoint() throws Exception {
        admin.addPeer("testCustomReplicationEndpoint", new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey((Configuration)CONF1)).setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()), null);
        Waiter.waitFor((Configuration)CONF1, (long)60000L, (Waiter.Predicate)new Waiter.Predicate<Exception>(){

            public boolean evaluate() throws Exception {
                return ReplicationEndpointForTest.contructedCount.get() >= numRegionServers;
            }
        });
        Waiter.waitFor((Configuration)CONF1, (long)60000L, (Waiter.Predicate)new Waiter.Predicate<Exception>(){

            public boolean evaluate() throws Exception {
                return ReplicationEndpointForTest.startedCount.get() >= numRegionServers;
            }
        });
        Assert.assertEquals((long)0L, (long)ReplicationEndpointForTest.replicateCount.get());
        this.doPut(Bytes.toBytes((String)"row42"));
        Waiter.waitFor((Configuration)CONF1, (long)60000L, (Waiter.Predicate)new Waiter.Predicate<Exception>(){

            public boolean evaluate() throws Exception {
                return ReplicationEndpointForTest.replicateCount.get() >= 1;
            }
        });
        TestReplicationEndpoint.doAssert(Bytes.toBytes((String)"row42"));
        admin.removePeer("testCustomReplicationEndpoint");
    }

    @Test
    public void testReplicationEndpointReturnsFalseOnReplicate() throws Exception {
        Assert.assertEquals((long)0L, (long)ReplicationEndpointForTest.replicateCount.get());
        Assert.assertTrue((!ReplicationEndpointReturningFalse.replicated.get() ? 1 : 0) != 0);
        int peerCount = admin.getPeersCount();
        String id = "testReplicationEndpointReturnsFalseOnReplicate";
        admin.addPeer("testReplicationEndpointReturnsFalseOnReplicate", new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey((Configuration)CONF1)).setReplicationEndpointImpl(ReplicationEndpointReturningFalse.class.getName()), null);
        if (admin.getPeersCount() <= peerCount) {
            LOG.info("Waiting on peercount to go up from " + peerCount);
            Threads.sleep((long)100L);
        }
        this.doPut(row);
        Waiter.waitFor((Configuration)CONF1, (long)60000L, (Waiter.Predicate)new Waiter.Predicate<Exception>(){

            public boolean evaluate() throws Exception {
                int count = ReplicationEndpointForTest.replicateCount.get();
                LOG.info("count=" + count);
                return ReplicationEndpointReturningFalse.replicated.get();
            }
        });
        if (ReplicationEndpointReturningFalse.ex.get() != null) {
            throw ReplicationEndpointReturningFalse.ex.get();
        }
        admin.removePeer("testReplicationEndpointReturnsFalseOnReplicate");
    }

    @Test
    public void testInterClusterReplication() throws Exception {
        String id = "testInterClusterReplication";
        List<HRegion> regions = UTIL1.getHBaseCluster().getRegions(tableName);
        int totEdits = 0;
        for (HRegion region : regions) {
            RegionInfo hri = region.getRegionInfo();
            byte[] row = hri.getStartKey();
            for (int i = 0; i < 100; ++i) {
                if (row.length <= 0) continue;
                Put put = new Put(row);
                put.addColumn(famName, row, row);
                region.put(put);
                ++totEdits;
            }
        }
        admin.addPeer("testInterClusterReplication", new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey((Configuration)CONF2)).setReplicationEndpointImpl(InterClusterReplicationEndpointForTest.class.getName()), null);
        final int numEdits = totEdits;
        Waiter.waitFor((Configuration)CONF1, (long)60000L, (Waiter.Predicate)new Waiter.ExplainingPredicate<Exception>(){

            public boolean evaluate() throws Exception {
                return InterClusterReplicationEndpointForTest.replicateCount.get() == numEdits;
            }

            public String explainFailure() throws Exception {
                String failure = "Failed to replicate all edits, expected = " + numEdits + " replicated = " + InterClusterReplicationEndpointForTest.replicateCount.get();
                return failure;
            }
        });
        admin.removePeer("testInterClusterReplication");
        UTIL1.deleteTableData(tableName);
    }

    @Test
    public void testWALEntryFilterFromReplicationEndpoint() throws Exception {
        ReplicationPeerConfig rpc = new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey((Configuration)CONF1)).setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName());
        rpc.getConfiguration().put("hbase.replication.source.custom.walentryfilters", EverythingPassesWALEntryFilter.class.getName() + "," + EverythingPassesWALEntryFilterSubclass.class.getName());
        admin.addPeer("testWALEntryFilterFromReplicationEndpoint", rpc);
        try (Connection connection = ConnectionFactory.createConnection((Configuration)CONF1);){
            this.doPut(connection, Bytes.toBytes((String)"row1"));
            this.doPut(connection, row);
            this.doPut(connection, Bytes.toBytes((String)"row2"));
        }
        Waiter.waitFor((Configuration)CONF1, (long)60000L, (Waiter.Predicate)new Waiter.Predicate<Exception>(){

            public boolean evaluate() throws Exception {
                return ReplicationEndpointForTest.replicateCount.get() >= 1;
            }
        });
        Assert.assertNull((Object)ReplicationEndpointWithWALEntryFilter.ex.get());
        Assert.assertTrue((boolean)EverythingPassesWALEntryFilter.hasPassedAnEntry());
        admin.removePeer("testWALEntryFilterFromReplicationEndpoint");
    }

    @Test(expected=IOException.class)
    public void testWALEntryFilterAddValidation() throws Exception {
        ReplicationPeerConfig rpc = new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey((Configuration)CONF1)).setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName());
        rpc.getConfiguration().put("hbase.replication.source.custom.walentryfilters", "IAmNotARealWalEntryFilter");
        admin.addPeer("testWALEntryFilterAddValidation", rpc);
    }

    @Test(expected=IOException.class)
    public void testWALEntryFilterUpdateValidation() throws Exception {
        ReplicationPeerConfig rpc = new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey((Configuration)CONF1)).setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName());
        rpc.getConfiguration().put("hbase.replication.source.custom.walentryfilters", "IAmNotARealWalEntryFilter");
        admin.updatePeerConfig("testWALEntryFilterUpdateValidation", rpc);
    }

    @Test
    public void testMetricsSourceBaseSourcePassthrough() {
        String id = "id";
        DynamicMetricsRegistry mockRegistry = (DynamicMetricsRegistry)Mockito.mock(DynamicMetricsRegistry.class);
        MetricsReplicationSourceImpl singleRms = (MetricsReplicationSourceImpl)Mockito.mock(MetricsReplicationSourceImpl.class);
        Mockito.when((Object)singleRms.getMetricsRegistry()).thenReturn((Object)mockRegistry);
        MetricsReplicationSourceImpl globalRms = (MetricsReplicationSourceImpl)Mockito.mock(MetricsReplicationSourceImpl.class);
        Mockito.when((Object)globalRms.getMetricsRegistry()).thenReturn((Object)mockRegistry);
        MetricsReplicationSourceSourceImpl singleSourceSource = new MetricsReplicationSourceSourceImpl(singleRms, id);
        MetricsReplicationGlobalSourceSource globalSourceSource = new MetricsReplicationGlobalSourceSource(globalRms);
        MetricsReplicationSourceSource spyglobalSourceSource = (MetricsReplicationSourceSource)Mockito.spy((Object)globalSourceSource);
        ((MetricsReplicationSourceSource)Mockito.doNothing().when((Object)spyglobalSourceSource)).incrFailedRecoveryQueue();
        HashMap singleSourceSourceByTable = new HashMap();
        MetricsSource source = new MetricsSource(id, (MetricsReplicationSourceSource)singleSourceSource, spyglobalSourceSource, singleSourceSourceByTable);
        String gaugeName = "gauge";
        String singleGaugeName = "source.id." + gaugeName;
        String globalGaugeName = "source." + gaugeName;
        long delta = 1L;
        String counterName = "counter";
        String singleCounterName = "source.id." + counterName;
        String globalCounterName = "source." + counterName;
        long count = 2L;
        source.decGauge(gaugeName, delta);
        source.getMetricsContext();
        source.getMetricsDescription();
        source.getMetricsJmxContext();
        source.getMetricsName();
        source.incCounters(counterName, count);
        source.incGauge(gaugeName, delta);
        source.init();
        source.removeMetric(gaugeName);
        source.setGauge(gaugeName, delta);
        source.updateHistogram(counterName, count);
        source.incrFailedRecoveryQueue();
        ((MetricsReplicationSourceImpl)Mockito.verify((Object)singleRms)).decGauge(singleGaugeName, delta);
        ((MetricsReplicationSourceImpl)Mockito.verify((Object)globalRms)).decGauge(globalGaugeName, delta);
        ((MetricsReplicationSourceImpl)Mockito.verify((Object)globalRms)).getMetricsContext();
        ((MetricsReplicationSourceImpl)Mockito.verify((Object)globalRms)).getMetricsJmxContext();
        ((MetricsReplicationSourceImpl)Mockito.verify((Object)globalRms)).getMetricsName();
        ((MetricsReplicationSourceImpl)Mockito.verify((Object)singleRms)).incCounters(singleCounterName, count);
        ((MetricsReplicationSourceImpl)Mockito.verify((Object)globalRms)).incCounters(globalCounterName, count);
        ((MetricsReplicationSourceImpl)Mockito.verify((Object)singleRms)).incGauge(singleGaugeName, delta);
        ((MetricsReplicationSourceImpl)Mockito.verify((Object)globalRms)).incGauge(globalGaugeName, delta);
        ((MetricsReplicationSourceImpl)Mockito.verify((Object)globalRms)).init();
        ((MetricsReplicationSourceImpl)Mockito.verify((Object)singleRms)).removeMetric(singleGaugeName);
        ((MetricsReplicationSourceImpl)Mockito.verify((Object)globalRms)).removeMetric(globalGaugeName);
        ((MetricsReplicationSourceImpl)Mockito.verify((Object)singleRms)).setGauge(singleGaugeName, delta);
        ((MetricsReplicationSourceImpl)Mockito.verify((Object)globalRms)).setGauge(globalGaugeName, delta);
        ((MetricsReplicationSourceImpl)Mockito.verify((Object)singleRms)).updateHistogram(singleCounterName, count);
        ((MetricsReplicationSourceImpl)Mockito.verify((Object)globalRms)).updateHistogram(globalCounterName, count);
        ((MetricsReplicationSourceSource)Mockito.verify((Object)spyglobalSourceSource)).incrFailedRecoveryQueue();
        boolean containsRandomNewTable = source.getSingleSourceSourceByTable().containsKey("RandomNewTable");
        Assert.assertEquals((Object)false, (Object)containsRandomNewTable);
        source.setAgeOfLastShippedOpByTable(123L, "RandomNewTable");
        containsRandomNewTable = source.getSingleSourceSourceByTable().containsKey("RandomNewTable");
        Assert.assertEquals((Object)true, (Object)containsRandomNewTable);
        MetricsReplicationSourceSource msr = (MetricsReplicationSourceSource)source.getSingleSourceSourceByTable().get("RandomNewTable");
        Assert.assertTrue((msr.getLastShippedAge() > 0L ? 1 : 0) != 0);
    }

    private void doPut(byte[] row) throws IOException {
        try (Connection connection = ConnectionFactory.createConnection((Configuration)CONF1);){
            this.doPut(connection, row);
        }
    }

    private void doPut(Connection connection, byte[] row) throws IOException {
        try (Table t = connection.getTable(tableName);){
            Put put = new Put(row);
            put.addColumn(famName, row, row);
            t.put(put);
        }
    }

    private static void doAssert(byte[] row) throws Exception {
        if (ReplicationEndpointForTest.lastEntries == null) {
            return;
        }
        Assert.assertEquals((long)1L, (long)ReplicationEndpointForTest.lastEntries.size());
        ArrayList cells = ReplicationEndpointForTest.lastEntries.get(0).getEdit().getCells();
        Assert.assertEquals((long)1L, (long)cells.size());
        Assert.assertTrue((boolean)Bytes.equals((byte[])((Cell)cells.get(0)).getRowArray(), (int)((Cell)cells.get(0)).getRowOffset(), (int)((Cell)cells.get(0)).getRowLength(), (byte[])row, (int)0, (int)row.length));
    }

    public static class EverythingPassesWALEntryFilterSubclass
    extends EverythingPassesWALEntryFilter {
    }

    public static class EverythingPassesWALEntryFilter
    implements WALEntryFilter {
        private static boolean passedEntry = false;

        public WAL.Entry filter(WAL.Entry entry) {
            passedEntry = true;
            return entry;
        }

        public static boolean hasPassedAnEntry() {
            return passedEntry;
        }
    }

    public static class ReplicationEndpointWithWALEntryFilter
    extends ReplicationEndpointForTest {
        static AtomicReference<Exception> ex = new AtomicReference<Object>(null);

        @Override
        public boolean replicate(ReplicationEndpoint.ReplicateContext replicateContext) {
            try {
                super.replicate(replicateContext);
                TestReplicationEndpoint.doAssert(TestReplicationBase.row);
            }
            catch (Exception e) {
                ex.set(e);
            }
            return true;
        }

        public WALEntryFilter getWALEntryfilter() {
            return new ChainWALEntryFilter(new WALEntryFilter[]{super.getWALEntryfilter(), new WALEntryFilter(){

                public WAL.Entry filter(WAL.Entry entry) {
                    ArrayList cells = entry.getEdit().getCells();
                    int size = cells.size();
                    for (int i = size - 1; i >= 0; --i) {
                        Cell cell = (Cell)cells.get(i);
                        if (Bytes.equals((byte[])cell.getRowArray(), (int)cell.getRowOffset(), (int)cell.getRowLength(), (byte[])TestReplicationBase.row, (int)0, (int)TestReplicationBase.row.length)) continue;
                        cells.remove(i);
                    }
                    return entry;
                }
            }});
        }
    }

    public static class ReplicationEndpointReturningFalse
    extends ReplicationEndpointForTest {
        static int COUNT = 10;
        static AtomicReference<Exception> ex = new AtomicReference<Object>(null);
        static AtomicBoolean replicated = new AtomicBoolean(false);

        @Override
        public boolean replicate(ReplicationEndpoint.ReplicateContext replicateContext) {
            try {
                TestReplicationEndpoint.doAssert(TestReplicationBase.row);
            }
            catch (Exception e) {
                ex.set(e);
            }
            super.replicate(replicateContext);
            LOG.info("Replicated " + Bytes.toString((byte[])TestReplicationBase.row) + ", count=" + replicateCount.get());
            replicated.set(replicateCount.get() > COUNT);
            return replicated.get();
        }
    }

    public static class InterClusterReplicationEndpointForTest
    extends HBaseInterClusterReplicationEndpoint {
        static AtomicInteger replicateCount = new AtomicInteger();
        static boolean failedOnce;

        public InterClusterReplicationEndpointForTest() {
            replicateCount.set(0);
        }

        public boolean replicate(ReplicationEndpoint.ReplicateContext replicateContext) {
            boolean success = super.replicate(replicateContext);
            if (success) {
                replicateCount.addAndGet(replicateContext.entries.size());
            }
            return success;
        }

        protected Callable<Integer> createReplicator(List<WAL.Entry> entries, int ordinal, int timeout) {
            if (failedOnce) {
                return () -> ordinal;
            }
            failedOnce = true;
            return () -> {
                throw new IOException("Sample Exception: Failed to replicate.");
            };
        }
    }

    public static class ReplicationEndpointForTest
    extends BaseReplicationEndpoint {
        static UUID uuid = TestReplicationBase.UTIL1.getRandomUUID();
        static AtomicInteger contructedCount = new AtomicInteger();
        static AtomicInteger startedCount = new AtomicInteger();
        static AtomicInteger stoppedCount = new AtomicInteger();
        static AtomicInteger replicateCount = new AtomicInteger();
        static volatile List<WAL.Entry> lastEntries = null;

        public ReplicationEndpointForTest() {
            replicateCount.set(0);
            contructedCount.incrementAndGet();
        }

        public UUID getPeerUUID() {
            return uuid;
        }

        public boolean replicate(ReplicationEndpoint.ReplicateContext replicateContext) {
            replicateCount.incrementAndGet();
            lastEntries = new ArrayList<WAL.Entry>(replicateContext.entries);
            return true;
        }

        public void start() {
            this.startAsync();
        }

        public void stop() {
            this.stopAsync();
        }

        protected void doStart() {
            startedCount.incrementAndGet();
            this.notifyStarted();
        }

        protected void doStop() {
            stoppedCount.incrementAndGet();
            this.notifyStopped();
        }
    }
}

