/*
 * Decompiled with CFR 0.152.
 */
package org.apache.geode.distributed.internal.tcpserver;

import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.StreamCorruptedException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import javax.net.ssl.SSLException;
import org.apache.geode.annotations.internal.MutableForTesting;
import org.apache.geode.distributed.internal.tcpserver.ProtocolChecker;
import org.apache.geode.distributed.internal.tcpserver.ShutdownRequest;
import org.apache.geode.distributed.internal.tcpserver.ShutdownResponse;
import org.apache.geode.distributed.internal.tcpserver.TcpHandler;
import org.apache.geode.distributed.internal.tcpserver.TcpSocketCreator;
import org.apache.geode.distributed.internal.tcpserver.TcpSocketCreatorImpl;
import org.apache.geode.distributed.internal.tcpserver.VersionRequest;
import org.apache.geode.distributed.internal.tcpserver.VersionResponse;
import org.apache.geode.internal.serialization.ObjectDeserializer;
import org.apache.geode.internal.serialization.ObjectSerializer;
import org.apache.geode.internal.serialization.UnsupportedSerializationVersionException;
import org.apache.geode.internal.serialization.Version;
import org.apache.geode.internal.serialization.VersionedDataInputStream;
import org.apache.geode.internal.serialization.VersionedDataOutputStream;
import org.apache.geode.logging.internal.executors.LoggingThread;
import org.apache.geode.logging.internal.log4j.api.LogService;
import org.apache.logging.log4j.Logger;

public class TcpServer {
    public static final int GOSSIPVERSION = 1002;
    public static final int OLDGOSSIPVERSION = 1001;
    @MutableForTesting(value="The map used here is mutable, because some tests modify it")
    private static final Map<Integer, Short> GOSSIP_TO_GEMFIRE_VERSION_MAP = TcpServer.createGossipToVersionMap();
    public static final int GOSSIP_BYTE = 0;
    private static final String P2P_BACKLOG_PROPERTY_NAME = "p2p.backlog";
    public static final long SHUTDOWN_WAIT_TIME = 60000L;
    private static final Logger logger = LogService.getLogger();
    private final int readTimeout;
    private final int backlogLimit;
    private final ProtocolChecker protocolChecker;
    private final ObjectDeserializer objectDeserializer;
    private final ObjectSerializer objectSerializer;
    private int port;
    private ServerSocket srv_sock = null;
    private InetAddress bind_address;
    private volatile boolean shuttingDown = false;
    private final TcpHandler handler;
    private ExecutorService executor;
    private final Supplier<ExecutorService> executorServiceSupplier;
    private final String threadName;
    private volatile Thread serverThread;
    protected TcpSocketCreator socketCreator;
    private final LongSupplier nanoTimeSupplier;

    private static Map<Integer, Short> createGossipToVersionMap() {
        HashMap<Integer, Short> map = new HashMap<Integer, Short>();
        map.put(1002, Version.GFE_71.ordinal());
        map.put(1001, Version.GFE_57.ordinal());
        return map;
    }

    public TcpServer(int port, InetAddress bind_address, TcpHandler handler, String threadName, ProtocolChecker protocolChecker, LongSupplier nanoTimeSupplier, Supplier<ExecutorService> executorServiceSupplier, TcpSocketCreator socketCreator, ObjectSerializer objectSerializer, ObjectDeserializer objectDeserializer, String readTimeoutPropertyName, String backlogLimitPropertyName) {
        this.port = port;
        this.bind_address = bind_address;
        this.handler = handler;
        this.protocolChecker = protocolChecker;
        this.executorServiceSupplier = executorServiceSupplier;
        this.executor = executorServiceSupplier.get();
        this.threadName = threadName;
        this.nanoTimeSupplier = nanoTimeSupplier;
        this.socketCreator = socketCreator == null ? new TcpSocketCreatorImpl() : socketCreator;
        this.objectSerializer = objectSerializer;
        this.objectDeserializer = objectDeserializer;
        this.readTimeout = Integer.getInteger(readTimeoutPropertyName, 60000);
        int p2pBacklog = Integer.getInteger(P2P_BACKLOG_PROPERTY_NAME, 1000);
        this.backlogLimit = Integer.getInteger(backlogLimitPropertyName, p2pBacklog);
    }

    public void restarting() throws IOException {
        this.shuttingDown = false;
        this.startServerThread();
        if (this.executor == null || this.executor.isShutdown()) {
            this.executor = this.executorServiceSupplier.get();
        }
        logger.info("TcpServer@" + System.identityHashCode(this) + " restarting: completed.  Server thread=" + this.serverThread + '@' + System.identityHashCode(this.serverThread) + ";alive=" + this.serverThread.isAlive());
    }

    public void start() throws IOException {
        this.shuttingDown = false;
        this.startServerThread();
        this.handler.init(this);
    }

    private void startServerThread() throws IOException {
        this.initializeServerSocket();
        if (this.serverThread == null || !this.serverThread.isAlive()) {
            this.serverThread = new LoggingThread(this.threadName, this::run);
            this.serverThread.start();
        }
    }

    private void initializeServerSocket() throws IOException {
        if (this.srv_sock == null || this.srv_sock.isClosed()) {
            if (this.bind_address == null) {
                this.srv_sock = this.socketCreator.forCluster().createServerSocket(this.port, this.backlogLimit);
                this.bind_address = this.srv_sock.getInetAddress();
            } else {
                this.srv_sock = this.socketCreator.forCluster().createServerSocket(this.port, this.backlogLimit, this.bind_address);
            }
            if (this.port <= 0) {
                this.port = this.srv_sock.getLocalPort();
            }
            if (logger.isInfoEnabled()) {
                logger.info("Locator was created at " + new Date());
                logger.info("Listening on port " + this.getPort() + " bound on address " + this.bind_address);
            }
            this.srv_sock.setReuseAddress(true);
        }
    }

    public void join(long millis) throws InterruptedException {
        if (this.isAlive()) {
            this.serverThread.join(millis);
        }
    }

    public void join() throws InterruptedException {
        if (this.isAlive()) {
            this.serverThread.join();
        }
    }

    public boolean isAlive() {
        return this.serverThread != null && this.serverThread.isAlive();
    }

    public boolean isShuttingDown() {
        return this.shuttingDown;
    }

    public SocketAddress getSocketAddress() {
        return this.srv_sock.getLocalSocketAddress();
    }

    public int getPort() {
        return this.port;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void run() {
        Socket sock = null;
        while (!this.shuttingDown) {
            if (this.srv_sock.isClosed()) {
                this.shuttingDown = true;
                break;
            }
            try {
                try {
                    sock = this.srv_sock.accept();
                }
                catch (SSLException ex) {
                    logger.error("Locator stopping due to SSL configuration problem.", (Throwable)ex);
                    this.shuttingDown = true;
                    continue;
                }
                this.processRequest(sock);
            }
            catch (Exception ex) {
                if (this.shuttingDown) continue;
                logger.error("exception=", (Throwable)ex);
            }
        }
        if (!this.srv_sock.isClosed()) {
            try {
                this.srv_sock.close();
            }
            catch (IOException ex) {
                logger.warn("exception closing server socket during shutdown", (Throwable)ex);
            }
        }
        if (this.shuttingDown) {
            logger.info("locator shutting down");
            this.executor.shutdown();
            try {
                this.executor.awaitTermination(60000L, TimeUnit.MILLISECONDS);
            }
            catch (InterruptedException ignore) {
                Thread.currentThread().interrupt();
            }
            this.handler.shutDown();
            TcpServer tcpServer = this;
            synchronized (tcpServer) {
                this.notifyAll();
            }
        }
    }

    private void processRequest(Socket socket) {
        this.executor.execute(() -> {
            long startTime = this.nanoTimeSupplier.getAsLong();
            DataInputStream input = null;
            try {
                socket.setSoTimeout(this.readTimeout);
                this.socketCreator.forCluster().handshakeIfSocketIsSSL(socket, this.readTimeout);
                try {
                    input = new DataInputStream(socket.getInputStream());
                }
                catch (StreamCorruptedException e) {
                    logger.debug("Discarding illegal request from " + socket.getInetAddress().getHostAddress() + ":" + socket.getPort(), (Throwable)e);
                    try {
                        socket.close();
                    }
                    catch (IOException iOException) {
                        // empty catch block
                    }
                    return;
                }
                int firstByte = input.readUnsignedByte();
                boolean handled = this.protocolChecker.checkProtocol(socket, input, firstByte);
                if (!handled) {
                    if (firstByte == 0) {
                        this.processOneConnection(socket, startTime, input);
                    } else {
                        this.rejectUnknownProtocolConnection(socket, firstByte);
                    }
                }
            }
            catch (EOFException | SocketException firstByte) {
            }
            catch (SocketTimeoutException ex) {
                String sender = null;
                if (socket != null) {
                    sender = socket.getInetAddress().getHostAddress();
                }
                logger.info("Exception in processing request from " + sender + ": " + ex.getMessage());
            }
            catch (ClassNotFoundException ex) {
                String sender = null;
                if (socket != null) {
                    sender = socket.getInetAddress().getHostAddress();
                }
                logger.info("Unable to process request from " + sender + " exception=" + ex.getMessage());
            }
            catch (Exception ex) {
                String sender = null;
                if (socket != null) {
                    sender = socket.getInetAddress().getHostAddress();
                }
                if (ex instanceof IOException) {
                    if (!socket.isClosed()) {
                        logger.info("Exception in processing request from " + sender, (Throwable)ex);
                    }
                } else {
                    logger.fatal("Exception in processing request from " + sender, (Throwable)ex);
                }
            }
            catch (Throwable ex) {
                String sender = null;
                if (socket != null) {
                    sender = socket.getInetAddress().getHostAddress();
                }
                try {
                    logger.fatal("Exception in processing request from " + sender, ex);
                }
                catch (Throwable t) {
                    t.printStackTrace();
                }
            }
            finally {
                try {
                    socket.close();
                }
                catch (IOException ex) {}
            }
        });
    }

    private void processOneConnection(Socket socket, long startTime, DataInputStream input) throws IOException, UnsupportedSerializationVersionException, ClassNotFoundException {
        int gossipVersion = 0;
        for (int i = 0; i < 3; ++i) {
            gossipVersion = (gossipVersion << 8) + (0xFF & input.readUnsignedByte());
        }
        if (gossipVersion <= TcpServer.getCurrentGossipVersion() && GOSSIP_TO_GEMFIRE_VERSION_MAP.containsKey(gossipVersion)) {
            Object response;
            short versionOrdinal = GOSSIP_TO_GEMFIRE_VERSION_MAP.get(gossipVersion);
            if (Version.GFE_71.compareTo(versionOrdinal) <= 0) {
                versionOrdinal = input.readShort();
            }
            if (logger.isDebugEnabled() && versionOrdinal != Version.CURRENT_ORDINAL) {
                logger.debug("Locator reading request from " + socket.getInetAddress() + " with version " + Version.fromOrdinal((short)versionOrdinal));
            }
            input = new VersionedDataInputStream((InputStream)input, Version.fromOrdinal((short)versionOrdinal));
            Object request = this.objectDeserializer.readObject((DataInput)input);
            if (logger.isDebugEnabled()) {
                logger.debug("Locator received request " + request + " from " + socket.getInetAddress());
            }
            if (request instanceof ShutdownRequest) {
                this.shuttingDown = true;
                this.srv_sock.close();
                response = new ShutdownResponse();
            } else {
                response = request instanceof VersionRequest ? this.handleVersionRequest(request) : this.handler.processRequest(request);
            }
            this.handler.endRequest(request, startTime);
            long startTime2 = this.nanoTimeSupplier.getAsLong();
            if (response != null) {
                DataOutputStream output = new DataOutputStream(socket.getOutputStream());
                if (versionOrdinal != Version.CURRENT_ORDINAL) {
                    output = new VersionedDataOutputStream((OutputStream)output, Version.fromOrdinal((short)versionOrdinal));
                }
                this.objectSerializer.writeObject(response, (DataOutput)output);
                output.flush();
            }
            this.handler.endResponse(request, startTime2);
        } else {
            this.rejectUnknownProtocolConnection(socket, gossipVersion);
        }
    }

    private void rejectUnknownProtocolConnection(Socket socket, int gossipVersion) {
        try {
            socket.getOutputStream().write("unknown protocol version".getBytes());
            socket.getOutputStream().flush();
            socket.close();
        }
        catch (IOException e) {
            logger.debug("exception in sending reply to process using unknown protocol " + gossipVersion, (Throwable)e);
        }
    }

    protected Object handleVersionRequest(Object request) {
        VersionResponse response = new VersionResponse();
        response.setVersionOrdinal(Version.CURRENT_ORDINAL);
        return response;
    }

    public static int getCurrentGossipVersion() {
        return 1002;
    }

    public static int getOldGossipVersion() {
        return 1001;
    }
}

