/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.datanode;

import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.ReconfigurationException;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.BlockMissingException;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.datanode.BlockPoolSliceStorage;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
import org.apache.hadoop.hdfs.server.datanode.InternalDataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.PlatformAssumptions;
import org.apache.hadoop.util.Time;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.core.Is;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestDataNodeHotSwapVolumes {
    // Logger shared by all tests and helper threads in this class.
    private static final Logger LOG = LoggerFactory.getLogger(TestDataNodeHotSwapVolumes.class);
    // Same value (512) that startDFSCluster() writes to "dfs.blocksize" and that the
    // file helpers multiply block counts by.
    private static final int BLOCK_SIZE = 512;
    // Same value (2) that the two-argument startDFSCluster() overload passes as the
    // number of storages per DataNode.
    private static final int DEFAULT_STORAGES_PER_DATANODE = 2;
    // Mini cluster under test; assigned by startDFSCluster() and reset to null by shutdown().
    private MiniDFSCluster cluster;
    // Configuration created by startDFSCluster() and used to build the current cluster.
    private Configuration conf;

    /**
     * Runs after every test and stops the mini cluster if one is still running.
     */
    @After
    public void tearDown() {
        shutdown();
    }

    /**
     * Starts a mini cluster with {@link #DEFAULT_STORAGES_PER_DATANODE} storages
     * on every DataNode.
     *
     * @param numNameNodes number of NameNodes in the federated topology
     * @param numDataNodes number of DataNodes to start
     * @throws IOException if the cluster cannot be started
     */
    private void startDFSCluster(int numNameNodes, int numDataNodes) throws IOException {
        startDFSCluster(numNameNodes, numDataNodes, DEFAULT_STORAGES_PER_DATANODE);
    }

    /**
     * Shuts down any running cluster, then builds and starts a fresh mini cluster
     * from a new {@link Configuration} and waits until it is active.
     *
     * @param numNameNodes       number of NameNodes in the simple federated topology
     * @param numDataNodes       number of DataNodes to start
     * @param storagePerDataNode number of storages configured on each DataNode
     * @throws IOException if the cluster cannot be built or does not become active
     */
    private void startDFSCluster(int numNameNodes, int numDataNodes, int storagePerDataNode) throws IOException {
        shutdown();

        conf = new Configuration();
        // Small blocks so that a handful of bytes already spans several blocks.
        conf.setLong("dfs.blocksize", BLOCK_SIZE);
        conf.setInt("dfs.heartbeat.interval", 1);
        conf.setInt("dfs.df.interval", 1000);
        conf.setInt("dfs.namenode.heartbeat.recheck-interval", 1000);
        conf.setInt("dfs.datanode.failed.volumes.tolerated", 1);
        conf.setTimeDuration("dfs.datanode.disk.check.min.gap", 0L, TimeUnit.MILLISECONDS);

        MiniDFSNNTopology nnTopology = MiniDFSNNTopology.simpleFederatedTopology(numNameNodes);
        cluster = new MiniDFSCluster.Builder(conf)
                .nnTopology(nnTopology)
                .numDataNodes(numDataNodes)
                .storagesPerDatanode(storagePerDataNode)
                .build();
        cluster.waitActive();
    }

    /**
     * Stops the mini cluster and forgets the reference; a no-op when no cluster is running.
     */
    private void shutdown() {
        if (cluster == null) {
            return;
        }
        cluster.shutdown();
        cluster = null;
    }

    /**
     * Creates a file with replication factor 1 on the first file system.
     *
     * @param path      file to create
     * @param numBlocks number of blocks the file should span
     */
    private void createFile(Path path, int numBlocks) throws IOException, InterruptedException, TimeoutException {
        final short replicateFactor = 1;
        createFile(path, numBlocks, replicateFactor);
    }

    /**
     * Creates a file on the file system with index 0.
     *
     * @param path            file to create
     * @param numBlocks       number of blocks the file should span
     * @param replicateFactor replication factor of the new file
     */
    private void createFile(Path path, int numBlocks, short replicateFactor) throws IOException, InterruptedException, TimeoutException {
        final int firstFileSystem = 0;
        createFile(firstFileSystem, path, numBlocks, replicateFactor);
    }

    /**
     * Creates a file with replication factor 1 on the file system with the given index.
     *
     * @param fsIdx     index of the file system (NameNode) to write to
     * @param path      file to create
     * @param numBlocks number of blocks the file should span
     */
    private void createFile(int fsIdx, Path path, int numBlocks) throws IOException, InterruptedException, TimeoutException {
        final short replicateFactor = 1;
        createFile(fsIdx, path, numBlocks, replicateFactor);
    }

    /**
     * Creates a file of {@code numBlocks * BLOCK_SIZE} bytes and waits until it
     * reaches the requested replication.
     *
     * @param fsIdx           index of the file system (NameNode) to write to
     * @param path            file to create
     * @param numBlocks       number of blocks the file should span
     * @param replicateFactor replication factor of the new file
     */
    private void createFile(int fsIdx, Path path, int numBlocks, short replicateFactor) throws IOException, TimeoutException, InterruptedException {
        // Seed value handed to DFSTestUtil.createFile.
        final long seed = 0L;
        DistributedFileSystem fs = cluster.getFileSystem(fsIdx);
        DFSTestUtil.createFile(fs, path, BLOCK_SIZE * numBlocks, replicateFactor, seed);
        DFSTestUtil.waitReplication(fs, path, replicateFactor);
    }

    /**
     * Asserts that the file is exactly {@code numBlocks} blocks long.
     *
     * @param fs        file system holding the file
     * @param path      file to check
     * @param numBlocks expected length in units of {@link #BLOCK_SIZE}
     * @throws IOException if the file status cannot be fetched
     */
    private static void verifyFileLength(FileSystem fs, Path path, int numBlocks) throws IOException {
        // Widen before multiplying so the expected length cannot overflow an int.
        long expectedLength = (long) numBlocks * BLOCK_SIZE;
        Assert.assertEquals(expectedLength, fs.getFileStatus(path).getLen());
    }

    /**
     * Returns how many locations are reported for one block of a file.
     *
     * @param fs       file system holding the file
     * @param file     file to inspect
     * @param blockIdx zero-based index into the file's block locations
     * @return number of location names of that block, or 0 when the file has
     *         fewer than {@code blockIdx + 1} block locations
     */
    private static int getNumReplicas(FileSystem fs, Path file, int blockIdx) throws IOException {
        BlockLocation[] locations = fs.getFileBlockLocations(file, 0L, Long.MAX_VALUE);
        if (blockIdx + 1 > locations.length) {
            return 0;
        }
        return locations[blockIdx].getNames().length;
    }

    /**
     * Polls until the given block of a file has exactly {@code numReplicas} replicas.
     * Makes up to 50 checks, sleeping 100 ms after each unsuccessful one.
     *
     * @param fs          file system holding the file
     * @param file        file to inspect
     * @param blockIdx    zero-based index of the block to watch
     * @param numReplicas replica count to wait for
     * @throws TimeoutException if the count is not reached within the attempts
     */
    private static void waitReplication(FileSystem fs, Path file, int blockIdx, int numReplicas) throws IOException, TimeoutException, InterruptedException {
        int remainingAttempts = 50;
        while (remainingAttempts > 0) {
            int currentReplicas = getNumReplicas(fs, file, blockIdx);
            if (currentReplicas == numReplicas) {
                return;
            }
            System.out.printf("Block %d of file %s has %d replicas (desired %d).\n", blockIdx, file.toString(), currentReplicas, numReplicas);
            Thread.sleep(100L);
            remainingAttempts--;
        }
        String message = "Timed out waiting the " + blockIdx + "-th block of " + file + " to have " + numReplicas + " replicas.";
        throw new TimeoutException(message);
    }

    /**
     * Returns a mutable copy of the data directories configured on a DataNode.
     *
     * @param datanode DataNode whose "dfs.datanode.data.dir" setting is read
     * @return new list holding the trimmed directory strings
     */
    private static List<String> getDataDirs(DataNode datanode) {
        Collection<String> configuredDirs = datanode.getConf().getTrimmedStringCollection("dfs.datanode.data.dir");
        return new ArrayList<>(configuredDirs);
    }

    /**
     * Schedules a block report for all blocks on the DataNode (with a delay
     * argument of 0) and then triggers a deletion report through
     * {@link DataNodeTestUtils}.
     *
     * @param datanode DataNode to report from
     * @throws IOException if triggering the deletion report fails
     */
    private static void triggerDeleteReport(DataNode datanode) throws IOException {
        datanode.scheduleAllBlockReport(0L);
        DataNodeTestUtils.triggerDeletionReport(datanode);
    }

    /**
     * Verifies that {@code DataNode.parseChangedVolumes} splits a new data-dir
     * string into added, removed and unchanged locations.
     *
     * The new value keeps the first existing directory, drops the second one and
     * adds {@code /foo/path1} and {@code /foo/path2}.
     */
    @Test
    public void testParseChangedVolumes() throws IOException {
        startDFSCluster(1, 1);
        DataNode dn = cluster.getDataNodes().get(0);

        String oldPaths = dn.getConf().get("dfs.datanode.data.dir");
        List<StorageLocation> oldLocations = new ArrayList<>();
        for (String path : oldPaths.split(",")) {
            oldLocations.add(StorageLocation.parse(path));
        }
        Assert.assertFalse(oldLocations.isEmpty());

        String keptDir = new File(oldLocations.get(0).getUri()).getAbsolutePath();
        String newPaths = keptDir + ",/foo/path1,/foo/path2";
        DataNode.ChangedVolumes changedVolumes = dn.parseChangedVolumes(newPaths);

        // The two /foo paths are new, in the order they were listed.
        List<StorageLocation> newVolumes = changedVolumes.newLocations;
        Assert.assertEquals(2, newVolumes.size());
        Assert.assertEquals(new File("/foo/path1").getAbsolutePath(), new File(newVolumes.get(0).getUri()).getAbsolutePath());
        Assert.assertEquals(new File("/foo/path2").getAbsolutePath(), new File(newVolumes.get(1).getUri()).getAbsolutePath());

        // The second original directory was left out, so it must be deactivated.
        List<StorageLocation> removedVolumes = changedVolumes.deactivateLocations;
        Assert.assertEquals(1, removedVolumes.size());
        Assert.assertEquals(oldLocations.get(1).getNormalizedUri(), removedVolumes.get(0).getNormalizedUri());

        // The first original directory is untouched.
        List<StorageLocation> unchangedVolumes = changedVolumes.unchangedLocations;
        Assert.assertEquals(1, unchangedVolumes.size());
        Assert.assertEquals(oldLocations.get(0).getNormalizedUri(), unchangedVolumes.get(0).getNormalizedUri());
    }

    /**
     * Verifies that parsing an empty data-dir string is rejected with an
     * {@link IOException} mentioning that no directory was specified.
     */
    @Test
    public void testParseChangedVolumesFailures() throws IOException {
        startDFSCluster(1, 1);
        final DataNode dataNode = cluster.getDataNodes().get(0);
        try {
            dataNode.parseChangedVolumes("");
            Assert.fail("Should throw IOException: empty inputs.");
        } catch (IOException expected) {
            GenericTestUtils.assertExceptionContains("No directory is specified.", expected);
        }
    }

    /**
     * Verifies that re-declaring an existing volume with a different storage type
     * (here {@code SSD}) is rejected by {@code DataNode.parseChangedVolumes}.
     */
    @Test
    public void testParseStorageTypeChanges() throws IOException {
        startDFSCluster(1, 1);
        DataNode dn = cluster.getDataNodes().get(0);
        List<StorageLocation> oldLocations = DataNode.getStorageLocations(dn.getConf());

        // Keep the first location as is; prefix the second with an SSD storage type tag.
        String retypedLocation = String.format("[%s]%s", StorageType.SSD, oldLocations.get(1).getUri());
        String newDataDirs = oldLocations.get(0).toString() + "," + retypedLocation;

        try {
            dn.parseChangedVolumes(newDataDirs);
            Assert.fail("should throw IOE because storage type changes.");
        } catch (IOException e) {
            GenericTestUtils.assertExceptionContains("Changing storage type is not allowed", e);
        }
    }

    /**
     * Adds volumes to the first DataNode without waiting on any external latch.
     *
     * @param numNewVolumes number of volumes to add
     */
    private void addVolumes(int numNewVolumes) throws InterruptedException, IOException, ReconfigurationException {
        // A latch with count 0 makes await() return immediately.
        CountDownLatch alreadyOpen = new CountDownLatch(0);
        addVolumes(numNewVolumes, alreadyOpen);
    }

    /**
     * Adds {@code numNewVolumes} fresh storage directories to the first DataNode by
     * reconfiguring "dfs.datanode.data.dir", then verifies the effective
     * configuration and that each new directory received a "current" sub-directory.
     *
     * @param numNewVolumes number of volumes to add
     * @param waitLatch     latch awaited after the reconfiguration call and before
     *                      the effective configuration is verified
     */
    private void addVolumes(int numNewVolumes, CountDownLatch waitLatch) throws ReconfigurationException, IOException, InterruptedException {
        DataNode dn = cluster.getDataNodes().get(0);
        Configuration dnConf = dn.getConf();
        String oldDataDir = dnConf.get("dfs.datanode.data.dir");

        // Skip ahead to the first instance storage directory that does not exist yet.
        int firstFreeIdx = oldDataDir.split(",").length + 1;
        while (cluster.getInstanceStorageDir(0, firstFreeIdx).exists()) {
            firstFreeIdx++;
        }

        // Create the new directories and append them to the existing setting.
        List<File> createdDirs = new ArrayList<>();
        StringBuilder dataDirBuilder = new StringBuilder(oldDataDir);
        final int endIdx = firstFreeIdx + numNewVolumes;
        for (int idx = firstFreeIdx; idx < endIdx; idx++) {
            File dir = cluster.getInstanceStorageDir(0, idx);
            createdDirs.add(dir);
            dir.mkdirs();
            dataDirBuilder.append(",").append(StorageLocation.parse(dir.toString()).toString());
        }
        String newDataDir = dataDirBuilder.toString();

        String reportedValue = dn.reconfigurePropertyImpl("dfs.datanode.data.dir", newDataDir);
        Assert.assertThat("DN did not update its own config", reportedValue, Is.is(dnConf.get("dfs.datanode.data.dir")));
        waitLatch.await();

        // Compare effective and expected locations irrespective of their order.
        String[] effectiveDataDirs = dnConf.get("dfs.datanode.data.dir").split(",");
        String[] expectDataDirs = newDataDir.split(",");
        Assert.assertEquals(expectDataDirs.length, effectiveDataDirs.length);
        List<StorageLocation> expectedLocations = new ArrayList<>();
        List<StorageLocation> effectiveLocations = new ArrayList<>();
        for (int i = 0; i < expectDataDirs.length; i++) {
            expectedLocations.add(StorageLocation.parse(expectDataDirs[i]));
            effectiveLocations.add(StorageLocation.parse(effectiveDataDirs[i]));
        }
        Comparator<StorageLocation> byStringForm = Comparator.comparing(StorageLocation::toString);
        expectedLocations.sort(byStringForm);
        effectiveLocations.sort(byStringForm);
        Assert.assertEquals("Effective volumes doesnt match expected", expectedLocations, effectiveLocations);

        // Every added volume must now contain a "current" directory.
        for (File createdDir : createdDirs) {
            File currentDir = new File(createdDir, "current");
            Assert.assertTrue(currentDir.exists());
            Assert.assertTrue(currentDir.isDirectory());
        }
    }

    /**
     * Collects, for every DataNode, the number of blocks reported by each of its
     * storages for the block pool of the given namesystem.
     *
     * @param namesystemIdx index of the namesystem whose block pool is queried
     * @return one mutable list of per-storage block counts per DataNode
     */
    private List<List<Integer>> getNumBlocksReport(int namesystemIdx) {
        String blockPoolId = cluster.getNamesystem(namesystemIdx).getBlockPoolId();
        List<List<Integer>> countsPerDataNode = new ArrayList<>();
        for (Map<DatanodeStorage, BlockListAsLongs> report : cluster.getAllBlockReports(blockPoolId)) {
            // Callers sort the inner lists, so collect into a mutable ArrayList.
            List<Integer> countsPerStorage = report.values().stream()
                    .map(BlockListAsLongs::getNumberOfBlocks)
                    .collect(Collectors.toCollection(ArrayList::new));
            countsPerDataNode.add(countsPerStorage);
        }
        return countsPerDataNode;
    }

    /**
     * Adds one volume to a running DataNode, writes a file and checks that the
     * blocks are spread over all three volumes with at most one block of difference.
     */
    @Test(timeout=60000L)
    public void testAddOneNewVolume() throws IOException, ReconfigurationException, InterruptedException, TimeoutException {
        final int numBlocks = 10;
        startDFSCluster(1, 1);
        String bpid = cluster.getNamesystem().getBlockPoolId();

        addVolumes(1);

        Path testFile = new Path("/test");
        createFile(testFile, numBlocks);

        List<Map<DatanodeStorage, BlockListAsLongs>> blockReports = cluster.getAllBlockReports(bpid);
        Assert.assertEquals(1, blockReports.size());
        // The two initial storages plus the one just added.
        Assert.assertEquals(3, blockReports.get(0).size());

        int minNumBlocks = Integer.MAX_VALUE;
        int maxNumBlocks = Integer.MIN_VALUE;
        for (BlockListAsLongs blockList : blockReports.get(0).values()) {
            minNumBlocks = Math.min(minNumBlocks, blockList.getNumberOfBlocks());
            maxNumBlocks = Math.max(maxNumBlocks, blockList.getNumberOfBlocks());
        }
        Assert.assertTrue(Math.abs(maxNumBlocks - minNumBlocks) <= 1);

        verifyFileLength(cluster.getFileSystem(), testFile, numBlocks);
    }

    /**
     * Removes a volume that already holds blocks, writes more data, re-adds the
     * volume and checks the per-volume block counts after each step.
     */
    @Test(timeout=60000L)
    public void testReAddVolumeWithBlocks() throws IOException, ReconfigurationException, InterruptedException, TimeoutException {
        final int numBlocks = 10;
        startDFSCluster(1, 1);
        String bpid = cluster.getNamesystem().getBlockPoolId();

        Path testFile = new Path("/test");
        createFile(testFile, numBlocks);

        List<Map<DatanodeStorage, BlockListAsLongs>> blockReports = cluster.getAllBlockReports(bpid);
        Assert.assertEquals(1, blockReports.size());
        Assert.assertEquals(2, blockReports.get(0).size());

        // Reconfigure the DataNode to keep only its first data directory.
        DataNode dn = cluster.getDataNodes().get(0);
        List<String> oldDirs = getDataDirs(dn);
        String newDirs = oldDirs.iterator().next();
        Assert.assertThat("DN did not update its own config", dn.reconfigurePropertyImpl("dfs.datanode.data.dir", newDirs), Is.is(dn.getConf().get("dfs.datanode.data.dir")));
        assertFileLocksReleased(oldDirs.subList(1, oldDirs.size()));

        // Write a second file while only one volume is active.
        createFile(new Path("/test2"), numBlocks);
        dn.scheduleAllBlockReport(0L);
        blockReports = cluster.getAllBlockReports(bpid);
        Assert.assertEquals(1, blockReports.size());
        Assert.assertEquals(1, blockReports.get(0).size());
        for (BlockListAsLongs blockList : blockReports.get(0).values()) {
            Assert.assertEquals(15, blockList.getNumberOfBlocks());
        }

        // Restore the original directory list, bringing the removed volume back.
        Assert.assertThat("DN did not update its own config", dn.reconfigurePropertyImpl("dfs.datanode.data.dir", String.join(",", oldDirs)), Is.is(dn.getConf().get("dfs.datanode.data.dir")));
        dn.scheduleAllBlockReport(0L);
        blockReports = cluster.getAllBlockReports(bpid);
        Assert.assertEquals(1, blockReports.size());
        Assert.assertEquals(2, blockReports.get(0).size());

        int minNumBlocks = Integer.MAX_VALUE;
        int maxNumBlocks = Integer.MIN_VALUE;
        for (BlockListAsLongs blockList : blockReports.get(0).values()) {
            minNumBlocks = Math.min(minNumBlocks, blockList.getNumberOfBlocks());
            maxNumBlocks = Math.max(maxNumBlocks, blockList.getNumberOfBlocks());
        }
        Assert.assertEquals(5, minNumBlocks);
        Assert.assertEquals(15, maxNumBlocks);
    }

    /**
     * Writes a file, adds five volumes, appends to the file and then checks the
     * sorted per-volume block counts against the expected distribution.
     */
    @Test(timeout=60000L)
    public void testAddVolumesDuringWrite() throws IOException, InterruptedException, TimeoutException, ReconfigurationException {
        startDFSCluster(1, 1);
        final String blockPoolId = cluster.getNamesystem().getBlockPoolId();
        final Path testFile = new Path("/test");
        final int originalVolumes = cluster.getStoragesPerDatanode();

        final int initialBlockCount = originalVolumes * 2;
        createFile(testFile, initialBlockCount);

        final int addedVolumes = 5;
        addVolumes(addedVolumes);
        final int expectedVolumes = originalVolumes + addedVolumes;

        // Continue writing to the same file after the volumes were added.
        final int additionalBlockCount = 9;
        DFSTestUtil.appendFile(cluster.getFileSystem(), testFile, 512 * additionalBlockCount);
        verifyFileLength(cluster.getFileSystem(), testFile, initialBlockCount + additionalBlockCount);

        List<Map<DatanodeStorage, BlockListAsLongs>> blockReports = cluster.getAllBlockReports(blockPoolId);
        Assert.assertEquals(1, blockReports.size());
        Assert.assertEquals(expectedVolumes, blockReports.get(0).size());

        List<Integer> sortedCounts = blockReports.get(0).values().stream()
                .map(BlockListAsLongs::getNumberOfBlocks)
                .sorted()
                .collect(Collectors.toList());
        Assert.assertEquals(Arrays.asList(1, 1, 1, 1, 1, 4, 4), sortedCounts);
    }

    /**
     * Adds many volumes concurrently while another thread keeps listing the
     * DataNode's storage directories, and checks that neither side sees an error.
     *
     * The dataset is replaced by a Mockito spy whose {@code addVolume} runs the real
     * method on a separate thread, after a random delay in roughly half the calls.
     */
    @Test(timeout=180000L)
    public void testAddVolumesConcurrently() throws IOException, InterruptedException, TimeoutException, ReconfigurationException {
        startDFSCluster(1, 1, 10);
        int numVolumes = cluster.getStoragesPerDatanode();
        String blockPoolId = cluster.getNamesystem().getBlockPoolId();
        Path testFile = new Path("/test");

        int initialBlockCount = numVolumes * 2;
        createFile(testFile, initialBlockCount);

        final DataNode dn = cluster.getDataNodes().get(0);
        final FsDatasetSpi<? extends FsVolumeSpi> data = dn.data;
        dn.data = Mockito.spy(data);

        final int newVolumeCount = 40;
        final List<Thread> addVolumeDelayedThreads = Collections.synchronizedList(new ArrayList<Thread>());
        final AtomicBoolean addVolumeError = new AtomicBoolean(false);
        final AtomicBoolean listStorageError = new AtomicBoolean(false);
        // Counted down once per addVolume call, whether it succeeded or not.
        final CountDownLatch addVolumeCompletionLatch = new CountDownLatch(newVolumeCount);

        // List the storage directories for as long as volumes are still being added.
        // The loop must end when the latch reaches zero: comparing against the initial
        // count would either skip the loop entirely or never terminate.
        Thread listStorageThread = new Thread(new Runnable() {
            @Override
            public void run() {
                while (addVolumeCompletionLatch.getCount() > 0) {
                    for (int i = 0; i < 1000; i++) {
                        try {
                            dn.getStorage().listStorageDirectories();
                        } catch (Exception e) {
                            listStorageError.set(true);
                            LOG.error("Error listing storage: " + e);
                        }
                    }
                }
            }
        });
        listStorageThread.start();

        // Run every real addVolume call on its own thread, some of them delayed.
        Mockito.doAnswer(new Answer<Object>() {
            @Override
            public Object answer(final InvocationOnMock invocationOnMock) throws Throwable {
                final Random r = new Random();
                Thread addVolThread = new Thread(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            r.setSeed(Time.now());
                            // About half of the calls sleep 100-1000 ms first.
                            if (r.nextInt(10) > 4) {
                                int s = r.nextInt(10) + 1;
                                Thread.sleep(s * 100);
                            }
                            invocationOnMock.callRealMethod();
                        } catch (Throwable throwable) {
                            addVolumeError.set(true);
                            LOG.error("Error adding volume: " + throwable);
                        } finally {
                            addVolumeCompletionLatch.countDown();
                        }
                    }
                });
                addVolumeDelayedThreads.add(addVolThread);
                addVolThread.start();
                return null;
            }
        }).when(dn.data).addVolume(ArgumentMatchers.any(StorageLocation.class), ArgumentMatchers.any(List.class));

        addVolumes(newVolumeCount, addVolumeCompletionLatch);
        numVolumes += newVolumeCount;

        for (Thread t : addVolumeDelayedThreads) {
            t.join();
        }
        listStorageThread.join();

        Assert.assertFalse("Error adding volumes!", addVolumeError.get());
        Assert.assertFalse("Error listing storage!", listStorageError.get());

        // The DataNode must still accept writes and report every volume afterwards.
        int additionalBlockCount = 9;
        int totalBlockCount = initialBlockCount + additionalBlockCount;
        DFSTestUtil.appendFile(cluster.getFileSystem(), testFile, 512 * additionalBlockCount);
        verifyFileLength(cluster.getFileSystem(), testFile, totalBlockCount);

        List<Map<DatanodeStorage, BlockListAsLongs>> blockReports = cluster.getAllBlockReports(blockPoolId);
        Assert.assertEquals(1, blockReports.size());
        Assert.assertEquals(numVolumes, blockReports.get(0).size());
    }

    /**
     * Adds volumes to a DataNode serving two federated NameNodes and checks the
     * per-volume block counts reported to each block pool after appending to the
     * file of the first namespace only.
     */
    @Test(timeout=60000L)
    public void testAddVolumesToFederationNN() throws IOException, TimeoutException, InterruptedException, ReconfigurationException {
        final int numNameNodes = 2;
        final int numDataNodes = 1;
        startDFSCluster(numNameNodes, numDataNodes);

        Path testFile = new Path("/test");
        createFile(0, testFile, 4);
        createFile(1, testFile, 4);

        final int numNewVolumes = 2;
        addVolumes(numNewVolumes);

        // Append 4096 bytes (eight 512-byte blocks) to the file in the first namespace.
        DFSTestUtil.appendFile(cluster.getFileSystem(0), testFile, 4096);

        List<List<Integer>> actualNumBlocks = getNumBlocksReport(0);
        Assert.assertEquals(cluster.getDataNodes().size(), actualNumBlocks.size());
        List<Integer> blocksOnFirstDN = actualNumBlocks.get(0);
        Collections.sort(blocksOnFirstDN);
        Assert.assertEquals(Arrays.asList(2, 2, 4, 4), blocksOnFirstDN);

        // The second namespace was not written to, so two of its four volumes report no blocks.
        actualNumBlocks = getNumBlocksReport(1);
        Assert.assertEquals(4, actualNumBlocks.get(0).size());
        Assert.assertEquals(2, Collections.frequency(actualNumBlocks.get(0), 0));
    }

    /**
     * Removes one of two volumes from a DataNode and checks that the
     * single-replica file becomes unreadable while new files can still be written
     * to the remaining volume.
     */
    @Test(timeout=60000L)
    public void testRemoveOneVolume() throws ReconfigurationException, InterruptedException, TimeoutException, IOException {
        final short replFactor = 1;
        startDFSCluster(1, 1);
        Path testFile = new Path("/test");
        createFile(testFile, 10, replFactor);

        // Reconfigure the DataNode to keep only its first data directory.
        DataNode dn = cluster.getDataNodes().get(0);
        List<String> oldDirs = getDataDirs(dn);
        String newDirs = oldDirs.iterator().next();
        Assert.assertThat("DN did not update its own config", dn.reconfigurePropertyImpl("dfs.datanode.data.dir", newDirs), Is.is(dn.getConf().get("dfs.datanode.data.dir")));
        assertFileLocksReleased(oldDirs.subList(1, oldDirs.size()));
        dn.scheduleAllBlockReport(0L);

        // Reading the whole file must now fail.
        try {
            DFSTestUtil.readFile(cluster.getFileSystem(), testFile);
            Assert.fail("Expect to throw BlockMissingException.");
        } catch (BlockMissingException e) {
            GenericTestUtils.assertExceptionContains("Could not obtain block", e);
        }

        Path newFile = new Path("/newFile");
        createFile(newFile, 6);

        String bpid = cluster.getNamesystem().getBlockPoolId();
        List<Map<DatanodeStorage, BlockListAsLongs>> blockReports = cluster.getAllBlockReports(bpid);
        Assert.assertEquals(1, blockReports.size());

        BlockListAsLongs blocksForVolume1 = blockReports.get(0).values().iterator().next();
        Assert.assertEquals(11, blocksForVolume1.getNumberOfBlocks());
    }

    /**
     * Removes the volume holding one replica of a block and checks that the block
     * first drops to a single replica and the file then returns to full replication.
     */
    @Test(timeout=60000L)
    public void testReplicatingAfterRemoveVolume() throws InterruptedException, TimeoutException, IOException, ReconfigurationException {
        final short replFactor = 2;
        startDFSCluster(1, 2);
        DistributedFileSystem fs = cluster.getFileSystem();
        Path testFile = new Path("/test");
        createFile(testFile, 4, replFactor);

        DataNode dn = cluster.getDataNodes().get(0);
        List<String> oldDirs = getDataDirs(dn);

        // Locate the volume of the first DataNode that stores the file's second block.
        ExtendedBlock block = DFSTestUtil.getAllBlocks(fs, testFile).get(1).getBlock();
        FsVolumeSpi volumeWithBlock = dn.getFSDataset().getVolume(block);
        String dirWithBlock = "[" + volumeWithBlock.getStorageType() + "]" + volumeWithBlock.getStorageLocation().getUri();

        // Keep the first configured directory that is not the one holding the block;
        // fall back to the block's own directory if no other one is found.
        String newDirs = dirWithBlock;
        for (String dir : oldDirs) {
            if (!dirWithBlock.startsWith(dir)) {
                newDirs = dir;
                break;
            }
        }
        Assert.assertThat("DN did not update its own config", dn.reconfigurePropertyImpl("dfs.datanode.data.dir", newDirs), Is.is(dn.getConf().get("dfs.datanode.data.dir")));
        oldDirs.remove(newDirs);
        assertFileLocksReleased(oldDirs);

        triggerDeleteReport(dn);

        waitReplication(fs, testFile, 1, 1);
        DFSTestUtil.waitReplication(fs, testFile, replFactor);
    }

    /**
     * Tries to add four volumes of which two (indices 0 and 2) are pre-created as
     * regular files, and checks that only those two are reported as failures and
     * that they do not show up in the dataset, the storage or the effective
     * configuration.
     */
    @Test
    public void testAddVolumeFailures() throws IOException {
        final int numNewDirs = 4;
        startDFSCluster(1, 1);
        String dataDir = cluster.getDataDirectory();
        DataNode dn = cluster.getDataNodes().get(0);

        List<String> newDirs = Lists.newArrayList();
        for (int i = 0; i < numNewDirs; i++) {
            File newVolume = new File(dataDir, "new_vol" + i);
            newDirs.add(newVolume.toString());
            if (i % 2 == 0) {
                // Occupy the path with a regular file; the assertions below expect
                // exactly these volumes to be rejected.
                newVolume.createNewFile();
            }
        }

        String newValue = dn.getConf().get("dfs.datanode.data.dir") + "," + Joiner.on(",").join(newDirs);
        try {
            dn.reconfigurePropertyImpl("dfs.datanode.data.dir", newValue);
            Assert.fail("Expect to throw IOException.");
        } catch (ReconfigurationException e) {
            // One line per failed volume is expected in the cause's message.
            String errorMessage = e.getCause().getMessage();
            String[] messages = errorMessage.split("\\r?\\n");
            Assert.assertEquals(2, messages.length);
            Assert.assertThat(messages[0], CoreMatchers.containsString("new_vol0"));
            Assert.assertThat(messages[1], CoreMatchers.containsString("new_vol2"));
        }

        // Matches any path that is neither of the two rejected directories.
        Matcher<String> notFailedDir = Is.is(CoreMatchers.not(CoreMatchers.anyOf(Is.is(newDirs.get(0)), Is.is(newDirs.get(2)))));

        // The rejected directories must not be active volumes of the dataset.
        FsDatasetSpi<?> dataset = dn.getFSDataset();
        try (FsDatasetSpi.FsVolumeReferences volumes = dataset.getFsVolumeReferences()) {
            for (FsVolumeSpi volume : volumes) {
                Assert.assertThat(new File(volume.getStorageLocation().getUri()).toString(), notFailedDir);
            }
        }

        // Nor may they be registered as storage directories.
        DataStorage storage = dn.getStorage();
        for (int i = 0; i < storage.getNumStorageDirs(); i++) {
            Storage.StorageDirectory sd = storage.getStorageDir(i);
            Assert.assertThat(sd.getRoot().toString(), notFailedDir);
        }

        // The effective configuration holds four directories, none of them a rejected one.
        String[] effectiveVolumes = dn.getConf().get("dfs.datanode.data.dir").split(",");
        Assert.assertEquals(4, effectiveVolumes.length);
        for (String ev : effectiveVolumes) {
            Assert.assertThat(new File(StorageLocation.parse(ev).getUri()).getCanonicalPath(), notFailedDir);
        }
    }

    /**
     * Checks, on a best-effort basis, that the file lock of every given directory
     * has been released. An {@link IOException} raised while checking a directory
     * is logged and does not fail the test.
     *
     * @param dirs data directories whose locks should have been released
     * @throws IOException declared for callers; I/O failures of the check itself are only logged
     */
    private static void assertFileLocksReleased(Collection<String> dirs) throws IOException {
        for (String dir : dirs) {
            try {
                FsDatasetTestUtil.assertFileLockReleased(dir);
            } catch (IOException e) {
                // Name the directory; a bare "{}" with a Throwable argument logs the
                // placeholder literally.
                LOG.warn("Could not verify that the file lock of {} was released", dir, e);
            }
        }
    }

    /**
     * Runs the remove-volume-while-writing scenario once for each of the DataNode
     * indices 0, 1 and 2.
     */
    @Test(timeout=600000L)
    public void testRemoveVolumeBeingWritten() throws InterruptedException, TimeoutException, ReconfigurationException, IOException, BrokenBarrierException {
        final int dataNodeCount = 3;
        int dataNodeIdx = 0;
        while (dataNodeIdx < dataNodeCount) {
            testRemoveVolumeBeingWrittenForDatanode(dataNodeIdx);
            dataNodeIdx++;
        }
    }

    /**
     * Removes, by reconfiguring {@code dfs.datanode.data.dir}, the volume of one datanode that
     * holds the first block of {@code /test}, while that file is being written by this thread.
     *
     * <p>Sequence:
     * <ol>
     *   <li>Start a cluster with {@code startDFSCluster(1, 4)} and write 256 bytes to
     *       {@code /test} with replication 3, followed by {@code hflush()}.</li>
     *   <li>Pick the datanode whose transfer port matches entry {@code dataNodeIdx} of the names
     *       reported for the file's first block.</li>
     *   <li>Install a {@code DataNodeFaultInjector} whose
     *       {@code logDelaySendingAckToUpstream} waits on a four-party barrier and then sleeps
     *       one second, for as long as the reconfiguration has not completed.</li>
     *   <li>On a separate thread, reconfigure the datanode with every data directory except
     *       those containing the storage location of the volume holding the block, while this
     *       thread writes another 256 bytes, flushes and closes the file.</li>
     *   <li>Verify that the datanode has exactly one volume left, the file reaches replication 3
     *       and reads back as 512 bytes, ten more files can be written, and the remaining volume
     *       iterates more than one block for the block pool.</li>
     * </ol>
     *
     * @param dataNodeIdx index into the datanode names reported for the first block of
     *        {@code /test}; selects the datanode whose volume is removed
     * @throws IOException on file system failures, or carrying the exceptions collected from the
     *         reconfiguration thread
     * @throws ReconfigurationException declared by the original signature
     * @throws TimeoutException declared by the original signature
     * @throws InterruptedException if this thread is interrupted while waiting on a barrier or
     *         joining the reconfiguration thread
     * @throws BrokenBarrierException if the barrier shared with the reconfiguration thread breaks
     */
    private void testRemoveVolumeBeingWrittenForDatanode(int dataNodeIdx) throws IOException, ReconfigurationException, TimeoutException, InterruptedException, BrokenBarrierException {
        this.startDFSCluster(1, 4);
        final short replication = 3;
        DistributedFileSystem fs = this.cluster.getFileSystem();
        DFSClient client = fs.getClient();
        Path testFile = new Path("/test");
        FSDataOutputStream out = fs.create(testFile, replication);
        // Fixed seed keeps the written bytes reproducible between runs.
        Random rb = new Random(0L);
        byte[] writeBuf = new byte[256];
        rb.nextBytes(writeBuf);
        out.write(writeBuf);
        out.hflush();

        // Resolve the target datanode from the "host:port" name at position dataNodeIdx.
        BlockLocation[] blocks = fs.getFileBlockLocations(testFile, 0L, 512L);
        String[] dataNodeNames = blocks[0].getNames();
        String dataNodeName = dataNodeNames[dataNodeIdx];
        int xferPort = Integer.parseInt(dataNodeName.split(":")[1]);
        DataNode dn = null;
        for (DataNode candidate : this.cluster.getDataNodes()) {
            if (candidate.getXferPort() == xferPort) {
                dn = candidate;
                break;
            }
        }
        Assert.assertNotNull(dn);

        // Four parties: presumably the three datanodes of the write pipeline plus the
        // reconfiguration thread -- TODO confirm against DataNodeFaultInjector call sites.
        final CyclicBarrier barrier = new CyclicBarrier(4);
        final AtomicBoolean done = new AtomicBoolean(false);
        DataNodeFaultInjector newInjector = new DataNodeFaultInjector() {

            @Override
            public void logDelaySendingAckToUpstream(String upstreamAddr, long delayMs) throws IOException {
                try {
                    // Only hold callers back until the reconfiguration has completed.
                    if (!done.get()) {
                        barrier.await();
                        Thread.sleep(1000L);
                    }
                } catch (InterruptedException | BrokenBarrierException e) {
                    throw new IOException(e);
                }
            }
        };
        DataNodeFaultInjector oldInjector = DataNodeFaultInjector.get();
        try {
            DataNodeFaultInjector.set(newInjector);
            List<String> oldDirs = TestDataNodeHotSwapVolumes.getDataDirs(dn);
            LocatedBlocks lbs = client.getLocatedBlocks("/test", 0L);
            FsVolumeImpl volume = (FsVolumeImpl)dn.getFSDataset().getVolume(lbs.get(0).getBlock());
            // New configuration value: every old directory that does not contain the storage
            // location of the volume holding the block.
            final String removedLocation = volume.getStorageLocation().toString();
            final String newDirs = oldDirs.stream().filter(d -> !d.contains(removedLocation)).collect(Collectors.joining(","));
            // Written only by the reconfiguration thread and read only after join().
            final List<IOException> exceptions = new ArrayList<>();
            final DataNode dataNode = dn;
            final CyclicBarrier reconfigBarrier = new CyclicBarrier(2);
            Thread reconfigThread = new Thread(() -> {
                try {
                    reconfigBarrier.await();
                    barrier.await();
                    // NOTE(review): an AssertionError from this assertion is not one of the
                    // caught types, so it is not added to the exceptions list.
                    Assert.assertThat("DN did not update its own config", dataNode.reconfigurePropertyImpl("dfs.datanode.data.dir", newDirs), Is.is(dataNode.getConf().get("dfs.datanode.data.dir")));
                    done.set(true);
                } catch (InterruptedException | BrokenBarrierException | ReconfigurationException e) {
                    exceptions.add(new IOException(e));
                }
            });
            reconfigThread.start();

            // Second half of the file; the reconfiguration thread is released right after it.
            rb.nextBytes(writeBuf);
            out.write(writeBuf);
            reconfigBarrier.await();
            out.hflush();
            out.close();
            reconfigThread.join();
            if (!exceptions.isEmpty()) {
                throw MultipleIOException.createIOException(exceptions);
            }
        } finally {
            // Always restore the previous injector, even when the scenario above fails.
            DataNodeFaultInjector.set(oldInjector);
        }

        // The datanode must be left with exactly one volume.
        FsDatasetSpi<?> fsDatasetSpi = dn.getFSDataset();
        try (FsDatasetSpi.FsVolumeReferences volumeRefs = fsDatasetSpi.getFsVolumeReferences()) {
            for (int i = 0; i < volumeRefs.size(); ++i) {
                System.out.println("Vol: " + volumeRefs.get(i).getBaseURI().toString());
            }
            Assert.assertEquals("Volume remove wasn't successful.", 1, volumeRefs.size());
        }

        // The file must reach full replication and contain both 256-byte writes.
        DFSTestUtil.waitReplication(fs, testFile, replication);
        byte[] content = DFSTestUtil.readFileBuffer(fs, testFile);
        Assert.assertEquals(512, content.length);

        // Write more files after the removal.
        for (int i = 0; i < 10; ++i) {
            Path file = new Path("/after-" + i);
            try (FSDataOutputStream fout = fs.create(file, replication)) {
                rb.nextBytes(writeBuf);
                fout.write(writeBuf);
            }
        }

        // Count the blocks the remaining volume iterates for this block pool.
        try (FsDatasetSpi.FsVolumeReferences volumeRefs = fsDatasetSpi.getFsVolumeReferences()) {
            Assert.assertEquals("Volume remove wasn't successful.", 1, volumeRefs.size());
            FsVolumeSpi remainingVolume = volumeRefs.get(0);
            String bpid = this.cluster.getNamesystem().getBlockPoolId();
            FsVolumeSpi.BlockIterator blkIter = remainingVolume.newBlockIterator(bpid, "test");
            int blockCount = 0;
            while (!blkIter.atEnd()) {
                blkIter.nextBlock();
                ++blockCount;
            }
            Assert.assertTrue(String.format("DataNode(%d) should have more than 1 blocks", dataNodeIdx), blockCount > 1);
        }
    }

    /**
     * Removes the second configured data directory from the first datanode through
     * reconfiguration of {@code dfs.datanode.data.dir}, checks the block pool storage of every
     * namenode's block pool, and then reconfigures the datanode back to the original value.
     *
     * <p>After the removal, each block pool storage must have exactly one storage directory and
     * none of its storage directory roots may start with the removed directory's absolute path.
     *
     * @throws IOException on cluster or file system failures
     * @throws TimeoutException declared by the original signature
     * @throws InterruptedException declared by the original signature
     * @throws ReconfigurationException if either reconfiguration is rejected
     */
    @Test(timeout=60000L)
    public void testAddBackRemovedVolume() throws IOException, TimeoutException, InterruptedException, ReconfigurationException {
        this.startDFSCluster(1, 2);
        this.createFile(new Path("/test"), 32);
        DataNode dn = this.cluster.getDataNodes().get(0);
        String oldDataDir = dn.getConf().get("dfs.datanode.data.dir");
        String[] dataDirs = oldDataDir.split(",");
        String keepDataDir = dataDirs[0];
        String removeDataDir = dataDirs[1];

        // Drop the second directory by reconfiguring with only the first one.
        Assert.assertThat("DN did not update its own config", dn.reconfigurePropertyImpl("dfs.datanode.data.dir", keepDataDir), Is.is(dn.getConf().get("dfs.datanode.data.dir")));

        // NOTE(review): removeDataDir is used as the raw configuration entry. If entries carry
        // a storage-type prefix or URI scheme, this prefix comparison may never match -- confirm.
        String removedAbsolutePath = new File(removeDataDir).getAbsolutePath();
        for (int i = 0; i < this.cluster.getNumNameNodes(); ++i) {
            String bpid = this.cluster.getNamesystem(i).getBlockPoolId();
            BlockPoolSliceStorage bpsStorage = dn.getStorage().getBPStorage(bpid);
            for (int j = 0; j < bpsStorage.getNumStorageDirs(); ++j) {
                Storage.StorageDirectory sd = bpsStorage.getStorageDir(j);
                Assert.assertFalse(sd.getRoot().getAbsolutePath().startsWith(removedAbsolutePath));
            }
            // Expected value first, so a failure reports "expected 1 but was N".
            Assert.assertEquals(1, bpsStorage.getNumStorageDirs());
        }

        // Add the removed directory back by restoring the original configuration value.
        Assert.assertThat("DN did not update its own config", dn.reconfigurePropertyImpl("dfs.datanode.data.dir", oldDataDir), Is.is(dn.getConf().get("dfs.datanode.data.dir")));
    }

    /**
     * Fails one data directory of the first datanode, waits for the datanode to report the disk
     * error, restores the directory, and reconfigures {@code dfs.datanode.data.dir} with its
     * original value.
     *
     * <p>Checks that the failed volume's used space does not change while it is failed, and that
     * after the reconfiguration the directory is served by a different volume object whose used
     * space exceeds the value recorded before the failure.
     *
     * <p>Skipped on Windows via {@code PlatformAssumptions.assumeNotWindows()}.
     *
     * @throws Exception on any cluster, file system or reconfiguration failure
     */
    @Test(timeout=60000L)
    public void testDirectlyReloadAfterCheckDiskError() throws Exception {
        PlatformAssumptions.assumeNotWindows();
        this.startDFSCluster(1, 2);
        this.createFile(new Path("/test"), 32, (short)2);

        DataNode dataNode = this.cluster.getDataNodes().get(0);
        String originalDataDirs = dataNode.getConf().get("dfs.datanode.data.dir");
        File failingDir = this.cluster.getInstanceStorageDir(0, 0);
        FsVolumeImpl volumeBeforeFailure = DataNodeTestUtils.getVolume(dataNode, failingDir);
        Assert.assertTrue("No FsVolume was found for " + failingDir, volumeBeforeFailure != null);
        long usedBeforeFailure = volumeBeforeFailure.getDfsUsed();

        // Break the directory and wait until the datanode has noticed.
        DataNodeTestUtils.injectDataDirFailure(failingDir);
        DataNodeTestUtils.waitForDiskError(dataNode, volumeBeforeFailure);

        // A write made while the volume is failed must not change its usage.
        this.createFile(new Path("/test1"), 32, (short)2);
        Assert.assertEquals(usedBeforeFailure, volumeBeforeFailure.getDfsUsed());

        // Repair the directory and reload it by re-applying the unchanged configuration.
        DataNodeTestUtils.restoreDataDirFromFailure(failingDir);
        LOG.info("reconfiguring DN ");
        String reconfigured = dataNode.reconfigurePropertyImpl("dfs.datanode.data.dir", originalDataDirs);
        Assert.assertThat("DN did not update its own config", reconfigured, Is.is(dataNode.getConf().get("dfs.datanode.data.dir")));

        // The reloaded directory must be backed by a new volume that receives data.
        this.createFile(new Path("/test2"), 32, (short)2);
        FsVolumeImpl volumeAfterReload = DataNodeTestUtils.getVolume(dataNode, failingDir);
        Assert.assertTrue(volumeAfterReload != null);
        Assert.assertTrue(volumeAfterReload != volumeBeforeFailure);
        Assert.assertTrue(volumeAfterReload.getDfsUsed() > usedBeforeFailure);
    }

    /**
     * Reconfigures the first datanode of a two-datanode cluster down to a single data directory
     * and verifies, on a spy of the datanode-to-namenode protocol, that {@code blockReport} is
     * invoked exactly once within the 60000 ms verification timeout.
     *
     * @throws IOException on cluster start-up or protocol failures
     * @throws ReconfigurationException if the data directory reconfiguration is rejected
     */
    @Test(timeout=100000L)
    public void testFullBlockReportAfterRemovingVolumes() throws IOException, ReconfigurationException {
        Configuration clusterConf = new Configuration();
        clusterConf.setLong("dfs.blocksize", 512L);
        // Large report and heartbeat intervals; presumably chosen so that no periodic
        // report is sent during the test -- TODO confirm.
        clusterConf.setLong("dfs.blockreport.intervalMsec", 10800000L);
        clusterConf.setLong("dfs.heartbeat.interval", 1080L);

        MiniDFSCluster.Builder clusterBuilder = new MiniDFSCluster.Builder(clusterConf).numDataNodes(2);
        this.cluster = clusterBuilder.build();
        this.cluster.waitActive();

        DataNode firstDataNode = this.cluster.getDataNodes().get(0);
        DatanodeProtocolClientSideTranslatorPB nameNodeSpy = InternalDataNodeTestUtils.spyOnBposToNN(firstDataNode, this.cluster.getNameNode());

        // Keep only the first storage directory of the first datanode.
        File keptDataDir = this.cluster.getInstanceStorageDir(0, 0);
        String reconfigured = firstDataNode.reconfigurePropertyImpl("dfs.datanode.data.dir", keptDataDir.toString());
        Assert.assertThat("DN did not update its own config", reconfigured, Is.is(firstDataNode.getConf().get("dfs.datanode.data.dir")));

        // Exactly one blockReport call is expected on the spy.
        Mockito.verify(nameNodeSpy, Mockito.timeout(60000L).times(1)).blockReport(
                ArgumentMatchers.any(DatanodeRegistration.class),
                ArgumentMatchers.anyString(),
                ArgumentMatchers.any(StorageBlockReport[].class),
                ArgumentMatchers.any(BlockReportContext.class));
    }
}

