/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.io.asyncfs;

import com.google.protobuf.CodedOutputStream;
import com.google.protobuf.MessageLite;
import java.io.IOException;
import java.io.OutputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.CryptoProtocolVersion;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystemLinkResolver;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutput;
import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSOutputStream;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.proto.SecurityProtos;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufAllocator;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufOutputStream;
import org.apache.hbase.thirdparty.io.netty.buffer.PooledByteBufAllocator;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelFuture;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelFutureListener;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandler;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelInitializer;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelOption;
import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufDecoder;
import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState;
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent;
import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.Future;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.FutureListener;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.GenericFutureListener;
import org.apache.hbase.thirdparty.io.netty.util.concurrent.Promise;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
public final class FanOutOneBlockAsyncDFSOutputHelper {
    // Class-wide SLF4J logger.
    private static final Logger LOG = LoggerFactory.getLogger(FanOutOneBlockAsyncDFSOutputHelper.class);
    // Configuration key and its default value. NOTE(review): presumably the retry budget used when
    // creating an output; the code that reads it is not visible in this part of the file -- confirm.
    public static final String ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES = "hbase.fs.async.create.retries";
    public static final int DEFAULT_ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES = 10;
    // Shared Netty allocator (the pooled default instance).
    private static final ByteBufAllocator ALLOC = PooledByteBufAllocator.DEFAULT;
    // NOTE(review): looks like the sequence number reserved for heartbeat packets -- confirm at use site.
    public static final long HEART_BEAT_SEQNO = -1L;
    // Timeout in milliseconds. NOTE(review): connectToDataNodes uses the literal 60000 as the default
    // for "dfs.client.socket-timeout", which is probably this constant inlined by the compiler.
    public static final int READ_TIMEOUT = 60000;
    // Reusable zero-length array, avoids allocating a new empty DatanodeInfo[] each time.
    private static final DatanodeInfo[] EMPTY_DN_ARRAY = new DatanodeInfo[0];
    // The fields below are blank finals, so they are assigned exactly once in the static initializer.
    // Each one wraps a Hadoop API that this class resolves through reflection in the matching
    // create* factory method.
    private static final PipelineAckStatusGetter PIPELINE_ACK_STATUS_GETTER;
    private static final StorageTypeSetter STORAGE_TYPE_SETTER;
    private static final BlockAdder BLOCK_ADDER;
    private static final LeaseManager LEASE_MANAGER;
    private static final DFSClientAdaptor DFS_CLIENT_ADAPTOR;
    private static final PBHelper PB_HELPER;
    private static final ChecksumCreater CHECKSUM_CREATER;
    private static final FileCreator FILE_CREATOR;

    /** Private constructor: prevents instantiation of this helper from outside the class. */
    private FanOutOneBlockAsyncDFSOutputHelper() {
    }

    /**
     * Builds a {@link DFSClientAdaptor} that calls {@code DFSClient.isClientRunning()} reflectively.
     * The method is looked up with {@code getDeclaredMethod} and made accessible first.
     *
     * @return an adaptor delegating to the reflected method
     * @throws NoSuchMethodException if {@code DFSClient} declares no such method
     */
    private static DFSClientAdaptor createDFSClientAdaptor() throws NoSuchMethodException {
        final Method runningCheck = DFSClient.class.getDeclaredMethod("isClientRunning");
        runningCheck.setAccessible(true);
        return new DFSClientAdaptor() {

            @Override
            public boolean isClientRunning(DFSClient client) {
                Object running;
                try {
                    running = runningCheck.invoke(client);
                } catch (IllegalAccessException | InvocationTargetException e) {
                    // Reflection failures are surfaced unchecked, with the cause preserved.
                    throw new RuntimeException(e);
                }
                return (Boolean) running;
            }
        };
    }

    /**
     * Builds a {@link LeaseManager} backed by the reflected {@code DFSClient.beginFileLease(long,
     * DFSOutputStream)} and {@code DFSClient.endFileLease(long)} methods.
     *
     * @return a lease manager delegating to the reflected methods
     * @throws NoSuchMethodException if either method is not declared on {@code DFSClient}
     */
    private static LeaseManager createLeaseManager() throws NoSuchMethodException {
        final Method beginLease = DFSClient.class.getDeclaredMethod("beginFileLease", long.class, DFSOutputStream.class);
        beginLease.setAccessible(true);
        final Method endLease = DFSClient.class.getDeclaredMethod("endFileLease", long.class);
        endLease.setAccessible(true);
        return new LeaseManager() {

            @Override
            public void begin(DFSClient client, long inodeId) {
                try {
                    // The DFSOutputStream argument is always passed as null.
                    beginLease.invoke(client, inodeId, null);
                } catch (IllegalAccessException | InvocationTargetException e) {
                    throw new RuntimeException(e);
                }
            }

            @Override
            public void end(DFSClient client, long inodeId) {
                try {
                    endLease.invoke(client, inodeId);
                } catch (IllegalAccessException | InvocationTargetException e) {
                    throw new RuntimeException(e);
                }
            }
        };
    }

    /**
     * Builds the {@link PipelineAckStatusGetter} for Hadoop versions whose {@code PipelineAckProto}
     * exposes {@code getFlagList()} and whose {@code PipelineAck} has the nested {@code ECN} enum.
     *
     * <p>The returned getter reads the first header flag of the ack. When the flag list is empty it
     * derives one by combining reply 0 with {@code ECN.DISABLED}. The status is then extracted from
     * that header flag via {@code PipelineAck.getStatusFromHeader(int)}.
     *
     * @return a getter implemented entirely through reflection
     * @throws NoSuchMethodException if any required method is missing; the caller uses this as the
     *         signal to fall back to the older API
     * @throws Error if the {@code PipelineAck$ECN} class cannot be loaded
     */
    private static PipelineAckStatusGetter createPipelineAckStatusGetter27() throws NoSuchMethodException {
        final Method getFlagListMethod = DataTransferProtos.PipelineAckProto.class.getMethod("getFlagList");
        // The ECN enum is only known by name, so it has to be handled as a wildcard enum class.
        @SuppressWarnings("rawtypes")
        Class<? extends Enum> ecnClass;
        try {
            ecnClass = Class.forName("org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck$ECN").asSubclass(Enum.class);
        } catch (ClassNotFoundException e) {
            String msg = "Couldn't properly initialize the PipelineAck.ECN class. Please update your WAL Provider to not make use of the 'asyncfs' provider. See HBASE-16110 for more information.";
            LOG.error(msg, e);
            // Kept as Error to preserve the existing failure type seen by callers.
            throw new Error(msg, e);
        }
        @SuppressWarnings("unchecked")
        final Enum<?> disabledECN = Enum.valueOf(ecnClass, "DISABLED");
        final Method getReplyMethod = DataTransferProtos.PipelineAckProto.class.getMethod("getReply", int.class);
        final Method combineHeaderMethod = PipelineAck.class.getMethod("combineHeader", ecnClass, DataTransferProtos.Status.class);
        final Method getStatusFromHeaderMethod = PipelineAck.class.getMethod("getStatusFromHeader", int.class);
        return new PipelineAckStatusGetter() {

            @Override
            public DataTransferProtos.Status get(DataTransferProtos.PipelineAckProto ack) {
                try {
                    Integer headerFlag;
                    List<?> flagList = (List<?>) getFlagListMethod.invoke(ack);
                    if (flagList.isEmpty()) {
                        // No flags in the ack: synthesize a header from reply 0 with ECN disabled.
                        DataTransferProtos.Status reply = (DataTransferProtos.Status) getReplyMethod.invoke(ack, 0);
                        headerFlag = (Integer) combineHeaderMethod.invoke(null, disabledECN, reply);
                    } else {
                        headerFlag = (Integer) flagList.get(0);
                    }
                    return (DataTransferProtos.Status) getStatusFromHeaderMethod.invoke(null, headerFlag);
                } catch (IllegalAccessException | InvocationTargetException e) {
                    throw new RuntimeException(e);
                }
            }
        };
    }

    /**
     * Builds the {@link PipelineAckStatusGetter} for the older API, where the status is read
     * directly through {@code PipelineAckProto.getStatus(int)} at index 0.
     *
     * @return a getter implemented through reflection
     * @throws NoSuchMethodException if {@code getStatus(int)} is not available
     */
    private static PipelineAckStatusGetter createPipelineAckStatusGetter26() throws NoSuchMethodException {
        final Method statusByIndex = DataTransferProtos.PipelineAckProto.class.getMethod("getStatus", int.class);
        return new PipelineAckStatusGetter() {

            @Override
            public DataTransferProtos.Status get(DataTransferProtos.PipelineAckProto ack) {
                Object firstStatus;
                try {
                    firstStatus = statusByIndex.invoke(ack, 0);
                } catch (IllegalAccessException | InvocationTargetException e) {
                    throw new RuntimeException(e);
                }
                return (DataTransferProtos.Status) firstStatus;
            }
        };
    }

    /**
     * Selects the {@link PipelineAckStatusGetter} implementation: the flag-list based variant is
     * tried first, and the direct {@code getStatus(int)} variant is used when a required method is
     * missing.
     *
     * @return the first variant that can be constructed
     * @throws NoSuchMethodException if the fallback variant cannot be constructed either
     */
    private static PipelineAckStatusGetter createPipelineAckStatusGetter() throws NoSuchMethodException {
        PipelineAckStatusGetter getter;
        try {
            getter = createPipelineAckStatusGetter27();
        } catch (NoSuchMethodException missing) {
            LOG.debug("Can not get expected method " + missing.getMessage() + ", this usually because your Hadoop is pre 2.7.0, try the methods in Hadoop 2.6.x instead.");
            getter = createPipelineAckStatusGetter26();
        }
        return getter;
    }

    /**
     * Builds a {@link StorageTypeSetter} that maps an arbitrary enum constant to the
     * {@code StorageTypeProto} constant with the same name and applies it to the write-block builder
     * via the reflected {@code setStorageType} method.
     *
     * @return a setter that returns the same builder instance it was given
     * @throws NoSuchMethodException if {@code OpWriteBlockProto.Builder.setStorageType} is missing
     */
    private static StorageTypeSetter createStorageTypeSetter() throws NoSuchMethodException {
        final Method setStorageTypeMethod = DataTransferProtos.OpWriteBlockProto.Builder.class.getMethod("setStorageType", HdfsProtos.StorageTypeProto.class);
        // Index every proto constant by name so the lookup at call time is a plain map get.
        ImmutableMap.Builder<String, HdfsProtos.StorageTypeProto> mapBuilder = ImmutableMap.builder();
        for (HdfsProtos.StorageTypeProto proto : HdfsProtos.StorageTypeProto.values()) {
            mapBuilder.put(proto.name(), proto);
        }
        final ImmutableMap<String, HdfsProtos.StorageTypeProto> protoByName = mapBuilder.build();
        return new StorageTypeSetter() {

            @Override
            public DataTransferProtos.OpWriteBlockProto.Builder set(DataTransferProtos.OpWriteBlockProto.Builder builder, Enum<?> storageType) {
                // A name with no matching proto constant yields null, which is passed through as-is.
                Object protoEnum = protoByName.get(storageType.name());
                try {
                    setStorageTypeMethod.invoke(builder, protoEnum);
                } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
                    throw new RuntimeException(e);
                }
                return builder;
            }
        };
    }

    /**
     * Builds a {@link BlockAdder} around the first public {@code ClientProtocol} method named
     * {@code addBlock}.
     *
     * <p>Two signatures are handled. When the last parameter is {@code String[]}, the method is
     * invoked with six arguments ending in {@code favoredNodes}. Otherwise it is invoked with one
     * additional trailing {@code null} argument.
     *
     * @return an adder delegating to the reflected method
     * @throws NoSuchMethodException if {@code ClientProtocol} has no method named {@code addBlock}
     */
    private static BlockAdder createBlockAdder() throws NoSuchMethodException {
        Method located = null;
        for (Method candidate : ClientProtocol.class.getMethods()) {
            if (candidate.getName().equals("addBlock")) {
                located = candidate;
                break;
            }
        }
        if (located == null) {
            throw new NoSuchMethodException("Can not find addBlock method in ClientProtocol");
        }
        final Method addBlockMethod = located;
        Class<?>[] paramTypes = addBlockMethod.getParameterTypes();
        final boolean favoredNodesIsLast = paramTypes[paramTypes.length - 1] == String[].class;
        return new BlockAdder() {

            @Override
            public LocatedBlock addBlock(ClientProtocol namenode, String src, String clientName, ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId, String[] favoredNodes) throws IOException {
                Object[] args = favoredNodesIsLast
                    ? new Object[] {src, clientName, previous, excludeNodes, fileId, favoredNodes}
                    : new Object[] {src, clientName, previous, excludeNodes, fileId, favoredNodes, null};
                try {
                    return (LocatedBlock) addBlockMethod.invoke(namenode, args);
                } catch (IllegalAccessException e) {
                    throw new RuntimeException(e);
                } catch (InvocationTargetException e) {
                    // Rethrow the target's IOException, RuntimeException or Error unchanged;
                    // anything else is wrapped.
                    Throwables.propagateIfPossible(e.getTargetException(), IOException.class);
                    throw new RuntimeException(e);
                }
            }
        };
    }

    /**
     * Builds a {@link PBHelper} that converts {@link ExtendedBlock} and {@link Token} instances to
     * their protobuf forms through reflected static {@code convert} methods.
     *
     * <p>{@code org.apache.hadoop.hdfs.protocolPB.PBHelperClient} is preferred. When that class is
     * absent, {@code org.apache.hadoop.hdfs.protocolPB.PBHelper} is used instead.
     *
     * @return a helper delegating to the reflected static methods
     * @throws NoSuchMethodException if the chosen class lacks either {@code convert} overload
     */
    private static PBHelper createPBHelper() throws NoSuchMethodException {
        // Class.forName returns Class<?>, so the local must be a wildcard type to hold either class.
        Class<?> helperClass;
        String clazzName = "org.apache.hadoop.hdfs.protocolPB.PBHelperClient";
        try {
            helperClass = Class.forName(clazzName);
        } catch (ClassNotFoundException e) {
            helperClass = org.apache.hadoop.hdfs.protocolPB.PBHelper.class;
            LOG.debug("{} not found (Hadoop is pre-2.8.0?); using {} instead.", clazzName, helperClass);
        }
        final Method convertEBMethod = helperClass.getMethod("convert", ExtendedBlock.class);
        final Method convertTokenMethod = helperClass.getMethod("convert", Token.class);
        return new PBHelper() {

            @Override
            public HdfsProtos.ExtendedBlockProto convert(ExtendedBlock b) {
                try {
                    // Static method: the receiver argument is null.
                    return (HdfsProtos.ExtendedBlockProto) convertEBMethod.invoke(null, b);
                } catch (IllegalAccessException | InvocationTargetException e) {
                    throw new RuntimeException(e);
                }
            }

            @Override
            public SecurityProtos.TokenProto convert(Token<?> tok) {
                try {
                    return (SecurityProtos.TokenProto) convertTokenMethod.invoke(null, tok);
                } catch (IllegalAccessException | InvocationTargetException e) {
                    throw new RuntimeException(e);
                }
            }
        };
    }

    /**
     * Builds a {@link ChecksumCreater} using the first public method named {@code createChecksum}
     * found on {@code confClass}. At call time the configuration object is obtained from the client
     * through {@code getConfMethod}, and {@code createChecksum} is invoked on it with a single
     * {@code null} argument.
     *
     * @param getConfMethod reflected accessor returning the client's configuration object
     * @param confClass class to search for {@code createChecksum}
     * @return a creater delegating to the reflected method
     * @throws NoSuchMethodException if {@code confClass} has no public {@code createChecksum}
     */
    private static ChecksumCreater createChecksumCreater28(final Method getConfMethod, Class<?> confClass) throws NoSuchMethodException {
        Method located = null;
        for (Method candidate : confClass.getMethods()) {
            if (candidate.getName().equals("createChecksum")) {
                located = candidate;
                break;
            }
        }
        if (located == null) {
            throw new NoSuchMethodException("Can not find createChecksum method in DfsClientConf");
        }
        final Method createChecksumMethod = located;
        return new ChecksumCreater() {

            @Override
            public DataChecksum createChecksum(DFSClient client) {
                try {
                    Object clientConf = getConfMethod.invoke(client);
                    // Exactly one argument, and that argument is null.
                    return (DataChecksum) createChecksumMethod.invoke(clientConf, new Object[] {null});
                } catch (IllegalAccessException | InvocationTargetException e) {
                    throw new RuntimeException(e);
                }
            }
        };
    }

    /**
     * Builds a {@link ChecksumCreater} using the no-argument {@code createChecksum()} declared on
     * {@code confClass}. The method is made accessible before use.
     *
     * @param getConfMethod reflected accessor returning the client's configuration object
     * @param confClass class declaring {@code createChecksum()}
     * @return a creater delegating to the reflected method
     * @throws NoSuchMethodException if {@code confClass} does not declare {@code createChecksum()}
     */
    private static ChecksumCreater createChecksumCreater27(final Method getConfMethod, Class<?> confClass) throws NoSuchMethodException {
        final Method noArgCreate = confClass.getDeclaredMethod("createChecksum");
        noArgCreate.setAccessible(true);
        return new ChecksumCreater() {

            @Override
            public DataChecksum createChecksum(DFSClient client) {
                try {
                    Object clientConf = getConfMethod.invoke(client);
                    return (DataChecksum) noArgCreate.invoke(clientConf);
                } catch (IllegalAccessException | InvocationTargetException e) {
                    throw new RuntimeException(e);
                }
            }
        };
    }

    /**
     * Selects the {@link ChecksumCreater} implementation. When
     * {@code org.apache.hadoop.hdfs.client.impl.DfsClientConf} can be loaded it is used; otherwise
     * the nested {@code org.apache.hadoop.hdfs.DFSClient$Conf} class is used.
     *
     * @return the creater matching the configuration class that is present
     * @throws NoSuchMethodException if the required methods are missing
     * @throws ClassNotFoundException if neither configuration class can be loaded
     */
    private static ChecksumCreater createChecksumCreater() throws NoSuchMethodException, ClassNotFoundException {
        Method getConfMethod = DFSClient.class.getMethod("getConf");
        Class<?> newerConfClass;
        try {
            newerConfClass = Class.forName("org.apache.hadoop.hdfs.client.impl.DfsClientConf");
        } catch (ClassNotFoundException e) {
            LOG.debug("No DfsClientConf class found, should be hadoop 2.7-", e);
            return createChecksumCreater27(getConfMethod, Class.forName("org.apache.hadoop.hdfs.DFSClient$Conf"));
        }
        return createChecksumCreater28(getConfMethod, newerConfClass);
    }

    /**
     * Builds a {@link FileCreator} for the {@code ClientProtocol.create} overload that takes a
     * trailing {@code String} parameter. That extra parameter is always passed as {@code null}.
     *
     * @return a creator delegating to the reflected method
     * @throws NoSuchMethodException if this overload does not exist
     */
    private static FileCreator createFileCreator3() throws NoSuchMethodException {
        Method createMethod = ClientProtocol.class.getMethod("create", String.class, FsPermission.class, String.class, EnumSetWritable.class, boolean.class, short.class, long.class, CryptoProtocolVersion[].class, String.class);
        return (instance, src, masked, clientName, flag, createParent, replication, blockSize, supportedVersions) -> {
            return (HdfsFileStatus) createMethod.invoke(instance, src, masked, clientName, flag, createParent, replication, blockSize, supportedVersions, null);
        };
    }

    /**
     * Builds a {@link FileCreator} for the {@code ClientProtocol.create} overload whose last
     * parameter is {@code CryptoProtocolVersion[]}.
     *
     * @return a creator delegating to the reflected method
     * @throws NoSuchMethodException if this overload does not exist
     */
    private static FileCreator createFileCreator2() throws NoSuchMethodException {
        Method createMethod = ClientProtocol.class.getMethod("create", String.class, FsPermission.class, String.class, EnumSetWritable.class, boolean.class, short.class, long.class, CryptoProtocolVersion[].class);
        return (instance, src, masked, clientName, flag, createParent, replication, blockSize, supportedVersions) -> {
            return (HdfsFileStatus) createMethod.invoke(instance, src, masked, clientName, flag, createParent, replication, blockSize, supportedVersions);
        };
    }

    /**
     * Selects the {@link FileCreator} implementation: the overload with the extra {@code String}
     * parameter is tried first, then the shorter overload.
     *
     * @return the first variant that can be constructed
     * @throws NoSuchMethodException if neither {@code create} overload exists
     */
    private static FileCreator createFileCreator() throws NoSuchMethodException {
        FileCreator creator;
        try {
            creator = createFileCreator3();
        } catch (NoSuchMethodException missing) {
            LOG.debug("ClientProtocol::create wrong number of arguments, should be hadoop 2.x");
            creator = createFileCreator2();
        }
        return creator;
    }

    /**
     * Starts the file lease for {@code inodeId} on {@code client} by delegating to
     * {@code LEASE_MANAGER}.
     */
    static void beginFileLease(DFSClient client, long inodeId) {
        LEASE_MANAGER.begin(client, inodeId);
    }

    /**
     * Ends the file lease for {@code inodeId} on {@code client} by delegating to
     * {@code LEASE_MANAGER}.
     */
    static void endFileLease(DFSClient client, long inodeId) {
        LEASE_MANAGER.end(client, inodeId);
    }

    /**
     * Creates a {@link DataChecksum} for {@code client} by delegating to {@code CHECKSUM_CREATER}.
     */
    static DataChecksum createChecksum(DFSClient client) {
        return CHECKSUM_CREATER.createChecksum(client);
    }

    /**
     * Extracts the status from a pipeline ack by delegating to {@code PIPELINE_ACK_STATUS_GETTER}.
     */
    static DataTransferProtos.Status getStatus(DataTransferProtos.PipelineAckProto ack) {
        return PIPELINE_ACK_STATUS_GETTER.get(ack);
    }

    /**
     * Appends the handlers that wait for the datanode's response to the write-block request and
     * complete {@code promise} accordingly.
     *
     * <p>Four handlers are added at the tail of the pipeline: a read-idle timer, a varint32 frame
     * decoder, a protobuf decoder for {@code BlockOpResponseProto}, and the response handler below.
     *
     * @param channel channel connected to the datanode
     * @param dnInfo datanode description, used only in error messages
     * @param promise completed with the channel on success, or failed on close, timeout or error
     * @param timeoutMs read-idle timeout in milliseconds
     */
    private static void processWriteBlockResponse(Channel channel, final DatanodeInfo dnInfo, final Promise<Channel> promise, final int timeoutMs) {
        channel.pipeline().addLast(new ChannelHandler[]{new IdleStateHandler((long)timeoutMs, 0L, 0L, TimeUnit.MILLISECONDS), new ProtobufVarint32FrameDecoder(), new ProtobufDecoder((MessageLite)DataTransferProtos.BlockOpResponseProto.getDefaultInstance()), new SimpleChannelInboundHandler<DataTransferProtos.BlockOpResponseProto>(){

            // Handles the single decoded response from the datanode.
            protected void channelRead0(ChannelHandlerContext ctx, DataTransferProtos.BlockOpResponseProto resp) throws Exception {
                ChannelHandler handler;
                DataTransferProtos.Status pipelineStatus = resp.getStatus();
                if (PipelineAck.isRestartOOBStatus((DataTransferProtos.Status)pipelineStatus)) {
                    throw new IOException("datanode " + dnInfo + " is restarting");
                }
                String logInfo = "ack with firstBadLink as " + resp.getFirstBadLink();
                if (resp.getStatus() != DataTransferProtos.Status.SUCCESS) {
                    if (resp.getStatus() == DataTransferProtos.Status.ERROR_ACCESS_TOKEN) {
                        throw new InvalidBlockTokenException("Got access token error, status message " + resp.getMessage() + ", " + logInfo);
                    }
                    throw new IOException("Got error, status=" + resp.getStatus().name() + ", status message " + resp.getMessage() + ", " + logInfo);
                }
                // Success: strip handlers from the tail of the pipeline, stopping once the
                // IdleStateHandler itself has been removed. Handlers ahead of it are left in place.
                ChannelPipeline p = ctx.pipeline();
                while ((handler = p.removeLast()) != null && !(handler instanceof IdleStateHandler)) {
                }
                // Auto-read is switched off before the channel is handed to the promise.
                // NOTE(review): presumably the consumer re-enables it after installing its own
                // handlers -- confirm.
                ctx.channel().config().setAutoRead(false);
                promise.trySuccess((Object)ctx.channel());
            }

            // The connection closed before a response arrived: fail the promise.
            public void channelInactive(ChannelHandlerContext ctx) throws Exception {
                promise.tryFailure((Throwable)new IOException("connection to " + dnInfo + " is closed"));
            }

            // A READER_IDLE event from the IdleStateHandler is treated as a response timeout;
            // every other event goes to the default handling.
            public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
                if (evt instanceof IdleStateEvent && ((IdleStateEvent)evt).state() == IdleState.READER_IDLE) {
                    promise.tryFailure((Throwable)new IOException("Timeout(" + timeoutMs + "ms) waiting for response"));
                } else {
                    super.userEventTriggered(ctx, evt);
                }
            }

            // Any pipeline exception fails the promise with the original cause.
            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                promise.tryFailure(cause);
            }
        }});
    }

    /**
     * Serializes and sends the write-block request on {@code channel}.
     *
     * <p>Wire layout: a 2-byte version ({@code 28}), the 1-byte {@code Op.WRITE_BLOCK} opcode, then
     * the varint-length-delimited {@code OpWriteBlockProto}. The buffer is sized for exactly these
     * three parts.
     *
     * @param channel channel connected to the datanode
     * @param storageType storage type applied to the builder before building the message
     * @param writeBlockProtoBuilder builder for the request; it is mutated by the storage type setter
     * @throws IOException if the message cannot be written into the buffer
     */
    private static void requestWriteBlock(Channel channel, Enum<?> storageType, DataTransferProtos.OpWriteBlockProto.Builder writeBlockProtoBuilder) throws IOException {
        DataTransferProtos.OpWriteBlockProto proto = STORAGE_TYPE_SETTER.set(writeBlockProtoBuilder, storageType).build();
        int protoLen = proto.getSerializedSize();
        ByteBuf buffer = channel.alloc().buffer(3 + CodedOutputStream.computeRawVarint32Size(protoLen) + protoLen);
        try {
            // NOTE(review): 28 is presumably Hadoop's data transfer protocol version constant,
            // inlined by the compiler -- confirm against the targeted Hadoop release.
            buffer.writeShort(28);
            buffer.writeByte(Op.WRITE_BLOCK.code);
            proto.writeDelimitedTo(new ByteBufOutputStream(buffer));
        } catch (IOException | RuntimeException e) {
            // The buffer is still owned here, so it must be released or the allocation leaks.
            buffer.release();
            throw e;
        }
        // Ownership of the buffer passes to the channel.
        channel.writeAndFlush(buffer);
    }

    /**
     * Starts SASL negotiation on a freshly connected channel and, once it succeeds, installs the
     * response handlers and sends the write-block request. A failed negotiation fails
     * {@code promise} with the negotiation's cause.
     *
     * @param conf configuration passed to the SASL helper
     * @param channel connected channel to the datanode
     * @param dnInfo datanode the channel is connected to
     * @param storageType storage type to put into the write-block request
     * @param writeBlockProtoBuilder builder for the write-block request
     * @param timeoutMs timeout in milliseconds for negotiation and for the response
     * @param client DFS client passed to the SASL helper
     * @param accessToken block access token passed to the SASL helper
     * @param promise completed by the response handlers, or failed here on negotiation failure
     * @throws IOException if starting the SASL negotiation fails
     */
    private static void initialize(Configuration conf, final Channel channel, final DatanodeInfo dnInfo, final Enum<?> storageType, final DataTransferProtos.OpWriteBlockProto.Builder writeBlockProtoBuilder, final int timeoutMs, DFSClient client, Token<BlockTokenIdentifier> accessToken, final Promise<Channel> promise) throws IOException {
        Promise<Void> negotiated = channel.eventLoop().newPromise();
        FanOutOneBlockAsyncDFSOutputSaslHelper.trySaslNegotiate(conf, channel, dnInfo, timeoutMs, client, accessToken, negotiated);
        negotiated.addListener(new FutureListener<Void>() {

            @Override
            public void operationComplete(Future<Void> future) throws Exception {
                if (!future.isSuccess()) {
                    promise.tryFailure(future.cause());
                    return;
                }
                // Install the response handlers before sending, so the reply cannot be missed.
                FanOutOneBlockAsyncDFSOutputHelper.processWriteBlockResponse(channel, dnInfo, promise, timeoutMs);
                FanOutOneBlockAsyncDFSOutputHelper.requestWriteBlock(channel, storageType, writeBlockProtoBuilder);
            }
        });
    }

    /**
     * Opens one connection per datanode location of {@code locatedBlock} and returns a future per
     * connection, in location order.
     *
     * <p>All connections share one write-block request builder, populated here with the block
     * header, stage, byte counts, generation stamp, checksum and caching strategy. Each future
     * completes with the channel once {@code initialize} has finished its handshake, and fails with
     * the connect or handshake error otherwise.
     *
     * @param conf configuration; {@code dfs.client.use.datanode.hostname} and
     *        {@code dfs.client.socket-timeout} are read from it
     * @param client DFS client passed on to {@code initialize}
     * @param clientName client name placed in the operation header
     * @param locatedBlock block whose locations, storage types and token are used
     * @param maxBytesRcvd value for the request's {@code maxBytesRcvd} field
     * @param latestGS value for the request's latest generation stamp field
     * @param stage block construction stage, mapped to the proto enum by name
     * @param summer checksum whose proto form is requested from the datanodes
     * @param eventLoopGroup event loop group used for the promises and the bootstrap
     * @param channelClass channel implementation to bootstrap
     * @return one future per datanode
     */
    private static List<Future<Channel>> connectToDataNodes(final Configuration conf, DFSClient client, String clientName, LocatedBlock locatedBlock, long maxBytesRcvd, long latestGS, BlockConstructionStage stage, DataChecksum summer, EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass) {
        StorageType[] storageTypes = locatedBlock.getStorageTypes();
        DatanodeInfo[] datanodeInfos = locatedBlock.getLocations();
        boolean connectToDnViaHostname = conf.getBoolean("dfs.client.use.datanode.hostname", false);
        int timeoutMs = conf.getInt("dfs.client.socket-timeout", 60000);
        // The header carries a copy of the block with numBytes set to the located block's size;
        // minBytesRcvd below still uses the original block's numBytes.
        ExtendedBlock blockCopy = new ExtendedBlock(locatedBlock.getBlock());
        blockCopy.setNumBytes(locatedBlock.getBlockSize());
        DataTransferProtos.ClientOperationHeaderProto header = DataTransferProtos.ClientOperationHeaderProto.newBuilder()
            .setBaseHeader(DataTransferProtos.BaseHeaderProto.newBuilder()
                .setBlock(PB_HELPER.convert(blockCopy))
                .setToken(PB_HELPER.convert(locatedBlock.getBlockToken())))
            .setClientName(clientName)
            .build();
        DataTransferProtos.ChecksumProto checksumProto = DataTransferProtoUtil.toProto(summer);
        DataTransferProtos.OpWriteBlockProto.Builder writeBlockProtoBuilder = DataTransferProtos.OpWriteBlockProto.newBuilder()
            .setHeader(header)
            .setStage(DataTransferProtos.OpWriteBlockProto.BlockConstructionStage.valueOf(stage.name()))
            .setPipelineSize(1)
            .setMinBytesRcvd(locatedBlock.getBlock().getNumBytes())
            .setMaxBytesRcvd(maxBytesRcvd)
            .setLatestGenerationStamp(latestGS)
            .setRequestedChecksum(checksumProto)
            .setCachingStrategy(DataTransferProtos.CachingStrategyProto.newBuilder().setDropBehind(true).build());
        List<Future<Channel>> futureList = new ArrayList<>(datanodeInfos.length);
        for (int i = 0; i < datanodeInfos.length; ++i) {
            DatanodeInfo dnInfo = datanodeInfos[i];
            StorageType storageType = storageTypes[i];
            Promise<Channel> promise = eventLoopGroup.next().newPromise();
            futureList.add(promise);
            String dnAddr = dnInfo.getXferAddr(connectToDnViaHostname);
            new Bootstrap()
                .group(eventLoopGroup)
                .channel(channelClass)
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeoutMs)
                .handler(new ChannelInitializer<Channel>() {

                    @Override
                    protected void initChannel(Channel ch) throws Exception {
                        // Intentionally empty: handlers are installed after the connect
                        // completes, from initialize().
                    }
                })
                .connect(NetUtils.createSocketAddr(dnAddr))
                .addListener(new ChannelFutureListener() {

                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (future.isSuccess()) {
                            FanOutOneBlockAsyncDFSOutputHelper.initialize(conf, future.channel(), dnInfo, storageType, writeBlockProtoBuilder, timeoutMs, client, locatedBlock.getBlockToken(), promise);
                        } else {
                            promise.tryFailure(future.cause());
                        }
                    }
                });
        }
        return futureList;
    }

    /*
     * Exception decompiling
     *
     * NOTE(review): this is NOT the real implementation. The decompiler (CFR) failed on this method,
     * so the body below is a placeholder that unconditionally throws IllegalStateException. The
     * public createOutput(DistributedFileSystem, Path, ...) overload delegates here, so it fails
     * the same way. The original logic must be recovered from the class file or the upstream
     * source before this file can be used.
     */
    private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, String src, boolean overwrite, boolean createParent, short replication, long blockSize, EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass) throws IOException {
        /*
         * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
         * 
         * org.benf.cfr.reader.util.ConfusedCFRException: Tried to end blocks [9[CATCHBLOCK]], but top level block is 4[TRYBLOCK]
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.processEndingBlocks(Op04StructuredStatement.java:435)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.buildNestedBlocks(Op04StructuredStatement.java:484)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op03SimpleStatement.createInitialStructuredBlock(Op03SimpleStatement.java:736)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:850)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
         *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
         *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
         *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
         *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
         *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
         *     at org.benf.cfr.reader.Main.main(Main.java:54)
         */
        throw new IllegalStateException("Decompilation failed");
    }

    /**
     * Creates a {@link FanOutOneBlockAsyncDFSOutput} for path {@code f}, resolving the path through
     * a {@link FileSystemLinkResolver}. The resolved path's URI path component is passed to the
     * private string-based overload. Continuing resolution on another file system is not supported.
     *
     * @param dfs file system to create the output on
     * @param f path of the file to create
     * @param overwrite forwarded unchanged to the private overload
     * @param createParent forwarded unchanged to the private overload
     * @param replication forwarded unchanged to the private overload
     * @param blockSize forwarded unchanged to the private overload
     * @param eventLoopGroup forwarded unchanged to the private overload
     * @param channelClass forwarded unchanged to the private overload
     * @return the created output
     * @throws IOException if creation fails
     */
    public static FanOutOneBlockAsyncDFSOutput createOutput(final DistributedFileSystem dfs, Path f, final boolean overwrite, final boolean createParent, final short replication, final long blockSize, final EventLoopGroup eventLoopGroup, final Class<? extends Channel> channelClass) throws IOException {
        FileSystemLinkResolver<FanOutOneBlockAsyncDFSOutput> resolver = new FileSystemLinkResolver<FanOutOneBlockAsyncDFSOutput>() {

            @Override
            public FanOutOneBlockAsyncDFSOutput doCall(Path resolved) throws IOException, UnresolvedLinkException {
                String src = resolved.toUri().getPath();
                return FanOutOneBlockAsyncDFSOutputHelper.createOutput(dfs, src, overwrite, createParent, replication, blockSize, eventLoopGroup, channelClass);
            }

            @Override
            public FanOutOneBlockAsyncDFSOutput next(FileSystem fs, Path p) throws IOException {
                throw new UnsupportedOperationException();
            }
        };
        return resolver.resolve(dfs, f);
    }

    /**
     * Decides whether a create call that failed with the given remote exception should be retried.
     * <p>
     * The decision is a suffix match on the class name reported by the exception, not an
     * {@code instanceof} test.
     * NOTE(review): presumably the name is matched because the exception class is not available on
     * every supported Hadoop version — confirm.
     *
     * @param e the remote exception to inspect
     * @return {@code true} if the reported class name ends with {@code "RetryStartFileException"}
     */
    public static boolean shouldRetryCreate(RemoteException e) {
        String remoteClassName = e.getClassName();
        return remoteClassName.endsWith("RetryStartFileException");
    }

    /**
     * Keeps calling {@code namenode.complete} for {@code src} until it reports success or the lease
     * turns out to be expired.
     * <p>
     * On success the file lease is ended through {@code endFileLease} and the method returns. A
     * {@code RemoteException} that unwraps to a {@code LeaseExpiredException} makes the method give
     * up and return. Every other outcome (a {@code false} result or any other exception) is logged
     * at warn level and followed by a pause from {@code sleepIgnoreInterrupt} before the next
     * attempt. There is no upper bound on the number of attempts.
     *
     * @param client the client whose lease on the file is ended once completion succeeds
     * @param namenode the namenode protocol instance used to issue the {@code complete} call
     * @param src the file being completed; also used in the log messages
     * @param clientName passed through to {@code complete}
     * @param block passed through to {@code complete}
     * @param fileId passed through to {@code complete} and to {@code endFileLease}
     */
    static void completeFile(DFSClient client, ClientProtocol namenode, String src, String clientName, ExtendedBlock block, long fileId) {
        for (int retry = 0;; retry++) {
            try {
                boolean completed = namenode.complete(src, clientName, block, fileId);
                if (!completed) {
                    LOG.warn("complete file " + src + " not finished, retry = " + retry);
                } else {
                    FanOutOneBlockAsyncDFSOutputHelper.endFileLease(client, fileId);
                    return;
                }
            } catch (RemoteException e) {
                boolean leaseExpired = e.unwrapRemoteException() instanceof LeaseExpiredException;
                if (leaseExpired) {
                    // Retrying cannot succeed once the lease is gone, so stop here.
                    LOG.warn("lease for file " + src + " is expired, give up", e);
                    return;
                }
                LOG.warn("complete file " + src + " failed, retry = " + retry, e);
            } catch (Exception e) {
                LOG.warn("complete file " + src + " failed, retry = " + retry, e);
            }
            // Reached only when this attempt neither succeeded nor gave up.
            FanOutOneBlockAsyncDFSOutputHelper.sleepIgnoreInterrupt(retry);
        }
    }

    /**
     * Sleeps for the pause that {@code ConnectionUtils.getPauseTime} computes from a base value of
     * {@code 100} and the given retry count.
     * <p>
     * An interrupt during the sleep ends it early and is otherwise ignored: the exception is
     * swallowed and the thread's interrupt status is not restored.
     *
     * @param retry the retry count handed to {@code ConnectionUtils.getPauseTime}
     */
    static void sleepIgnoreInterrupt(int retry) {
        long pauseMillis = ConnectionUtils.getPauseTime(100L, retry);
        try {
            Thread.sleep(pauseMillis);
        } catch (InterruptedException ignored) {
            // Intentionally ignored, as the method name says; the caller simply carries on.
        }
    }

    static {
        try {
            PIPELINE_ACK_STATUS_GETTER = FanOutOneBlockAsyncDFSOutputHelper.createPipelineAckStatusGetter();
            STORAGE_TYPE_SETTER = FanOutOneBlockAsyncDFSOutputHelper.createStorageTypeSetter();
            BLOCK_ADDER = FanOutOneBlockAsyncDFSOutputHelper.createBlockAdder();
            LEASE_MANAGER = FanOutOneBlockAsyncDFSOutputHelper.createLeaseManager();
            DFS_CLIENT_ADAPTOR = FanOutOneBlockAsyncDFSOutputHelper.createDFSClientAdaptor();
            PB_HELPER = FanOutOneBlockAsyncDFSOutputHelper.createPBHelper();
            CHECKSUM_CREATER = FanOutOneBlockAsyncDFSOutputHelper.createChecksumCreater();
            FILE_CREATOR = FanOutOneBlockAsyncDFSOutputHelper.createFileCreator();
        }
        catch (Exception e) {
            String msg = "Couldn't properly initialize access to HDFS internals. Please update your WAL Provider to not make use of the 'asyncfs' provider. See HBASE-16110 for more information.";
            LOG.error(msg, (Throwable)e);
            throw new Error(msg, e);
        }
    }

    /**
     * An {@link IOException} whose only constructor wraps another throwable as its cause.
     * NOTE(review): the name suggests it marks failures reported by the namenode — confirm against
     * the places that throw it.
     */
    public static class NameNodeException extends IOException {

        private static final long serialVersionUID = 3143237406477095390L;

        /**
         * Wraps the given throwable.
         *
         * @param cause the underlying failure, retrievable later through {@link #getCause()}
         */
        public NameNodeException(Throwable cause) {
            super(cause);
        }
    }

    /**
     * A {@link CancelableProgressable} tied to one {@link DFSClient}: {@link #progress()} returns
     * whatever {@code DFS_CLIENT_ADAPTOR.isClientRunning} reports for that client.
     */
    static final class CancelOnClose implements CancelableProgressable {

        /** The client whose running state is queried on every {@link #progress()} call. */
        private final DFSClient client;

        /**
         * @param client the client to watch
         */
        public CancelOnClose(DFSClient client) {
            this.client = client;
        }

        /**
         * @return the result of {@code DFS_CLIENT_ADAPTOR.isClientRunning} for the watched client
         */
        @Override
        public boolean progress() {
            boolean running = DFS_CLIENT_ADAPTOR.isClientRunning(client);
            return running;
        }
    }

    private static interface FileCreator {
        default public HdfsFileStatus create(ClientProtocol instance, String src, FsPermission masked, String clientName, EnumSetWritable<CreateFlag> flag, boolean createParent, short replication, long blockSize, CryptoProtocolVersion[] supportedVersions) throws Exception {
            try {
                return (HdfsFileStatus)this.createObject(instance, src, masked, clientName, flag, createParent, replication, blockSize, supportedVersions);
            }
            catch (InvocationTargetException e) {
                if (e.getCause() instanceof Exception) {
                    throw (Exception)e.getCause();
                }
                throw new RuntimeException(e.getCause());
            }
        }

        public Object createObject(ClientProtocol var1, String var2, FsPermission var3, String var4, EnumSetWritable<CreateFlag> var5, boolean var6, short var7, long var8, CryptoProtocolVersion[] var10) throws Exception;
    }

    private static interface ChecksumCreater {
        public DataChecksum createChecksum(DFSClient var1);
    }

    private static interface PBHelper {
        public HdfsProtos.ExtendedBlockProto convert(ExtendedBlock var1);

        public SecurityProtos.TokenProto convert(Token<?> var1);
    }

    private static interface DFSClientAdaptor {
        public boolean isClientRunning(DFSClient var1);
    }

    private static interface LeaseManager {
        public void begin(DFSClient var1, long var2);

        public void end(DFSClient var1, long var2);
    }

    private static interface BlockAdder {
        public LocatedBlock addBlock(ClientProtocol var1, String var2, String var3, ExtendedBlock var4, DatanodeInfo[] var5, long var6, String[] var8) throws IOException;
    }

    private static interface StorageTypeSetter {
        public DataTransferProtos.OpWriteBlockProto.Builder set(DataTransferProtos.OpWriteBlockProto.Builder var1, Enum<?> var2);
    }

    private static interface PipelineAckStatusGetter {
        public DataTransferProtos.Status get(DataTransferProtos.PipelineAckProto var1);
    }
}

