/*
 * Decompiled with CFR 0.152.
 */
package org.spf4j.io.tcp;

import com.google.common.annotations.Beta;
import com.google.common.base.Supplier;
import com.google.common.util.concurrent.AbstractExecutionThreadService;
import com.google.common.util.concurrent.Service;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.spf4j.base.Closeables;
import org.spf4j.base.TimeSource;
import org.spf4j.concurrent.RestartableServiceImpl;
import org.spf4j.ds.UpdateablePriorityQueue;
import org.spf4j.failsafe.RetryPolicy;
import org.spf4j.io.tcp.AcceptorSelectorEventHandler;
import org.spf4j.io.tcp.ClientHandler;
import org.spf4j.io.tcp.DeadlineAction;
import org.spf4j.io.tcp.SelectorEventHandler;

@SuppressFBWarnings(value={"HES_EXECUTOR_NEVER_SHUTDOWN"})
@Beta
public final class TcpServer
extends RestartableServiceImpl {
    private static final Logger LOG = LoggerFactory.getLogger(TcpServer.class);
    private final int serverPort;

    public TcpServer(ExecutorService executor, ClientHandler handlerFactory, int serverPort, int acceptBacklog) {
        this(executor, handlerFactory, serverPort, acceptBacklog, 60000);
    }

    public TcpServer(final ExecutorService executor, final ClientHandler handlerFactory, final int serverPort, final int acceptBacklog, final int bindTimeoutMillis) {
        super(new Supplier<Service>(){

            public Service get() {
                return new TcpServerGuavaService(executor, handlerFactory, serverPort, acceptBacklog, bindTimeoutMillis);
            }
        });
        this.serverPort = serverPort;
    }

    @Override
    public String getServiceName() {
        return "TCP:LISTEN:" + this.serverPort;
    }

    public static final class TcpServerGuavaService
    extends AbstractExecutionThreadService
    implements Closeable {
        private final ExecutorService executor;
        private final ClientHandler handlerFactory;
        private final int serverPort;
        private final int acceptBacklog;
        private final int bindTimeoutMillis;
        private volatile boolean shouldRun;
        private volatile Selector selector;
        private volatile ServerSocketChannel serverCh;

        public TcpServerGuavaService(ExecutorService executor, ClientHandler handlerFactory, int serverPort, int acceptBacklog, int bindTimeoutMillis) {
            this.executor = executor;
            this.handlerFactory = handlerFactory;
            this.acceptBacklog = acceptBacklog;
            this.serverPort = serverPort;
            this.shouldRun = true;
            this.selector = null;
            this.bindTimeoutMillis = bindTimeoutMillis;
        }

        protected void startUp() throws Exception {
            this.selector = Selector.open();
            try {
                this.serverCh = (ServerSocketChannel)RetryPolicy.defaultPolicy().call(() -> {
                    ServerSocketChannel sc = ServerSocketChannel.open();
                    try {
                        sc.bind(new InetSocketAddress(this.serverPort), this.acceptBacklog);
                        sc.configureBlocking(false);
                        return sc;
                    }
                    catch (IOException | RuntimeException e) {
                        sc.close();
                        throw e;
                    }
                }, IOException.class, (long)this.bindTimeoutMillis, TimeUnit.MILLISECONDS);
            }
            catch (IOException | RuntimeException e) {
                this.selector.close();
                throw e;
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @SuppressFBWarnings(value={"AFBR_ABNORMAL_FINALLY_BLOCK_RETURN"})
        public void run() throws IOException {
            Selector sel = this.selector;
            try {
                ArrayBlockingQueue<Runnable> tasksToRunBySelector = new ArrayBlockingQueue<Runnable>(64);
                UpdateablePriorityQueue<DeadlineAction> deadlineActions = new UpdateablePriorityQueue<DeadlineAction>(64, DeadlineAction.COMPARATOR);
                new AcceptorSelectorEventHandler(this.serverCh, this.handlerFactory, sel, this.executor, tasksToRunBySelector, deadlineActions).initialInterestRegistration();
                while (this.shouldRun) {
                    Runnable task;
                    DeadlineAction peek;
                    int nrSelectors = sel.select(100L);
                    if (nrSelectors > 0) {
                        Set<SelectionKey> selectedKeys = sel.selectedKeys();
                        Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
                        while (keyIterator.hasNext()) {
                            SelectionKey skey = keyIterator.next();
                            Object attachment = skey.attachment();
                            if (attachment instanceof SelectorEventHandler) {
                                SelectorEventHandler seh = (SelectorEventHandler)attachment;
                                try {
                                    if (seh.canRunAsync()) {
                                        seh.runAsync(skey);
                                    } else {
                                        seh.run(skey);
                                    }
                                }
                                catch (CancelledKeyException ex) {
                                    LOG.debug("Canceled key {}", (Object)skey, (Object)ex);
                                }
                            }
                            keyIterator.remove();
                        }
                    }
                    long currentTime = TimeSource.nanoTime();
                    while ((peek = deadlineActions.peek()) != null && peek.getDeadline() - currentTime <= 0L) {
                        deadlineActions.poll().getAction().run();
                    }
                    while ((task = (Runnable)tasksToRunBySelector.poll()) != null) {
                        task.run();
                    }
                }
            }
            catch (Throwable throwable) {
                IOException closeAll = Closeables.closeAll((Exception)Closeables.closeSelectorChannels(sel), sel, this.serverCh);
                if (closeAll != null) {
                    throw closeAll;
                }
                throw throwable;
            }
            IOException closeAll = Closeables.closeAll((Exception)Closeables.closeSelectorChannels(sel), sel, this.serverCh);
            if (closeAll != null) {
                throw closeAll;
            }
        }

        protected Executor executor() {
            return this.executor;
        }

        protected String serviceName() {
            return "TCP:LISTEN:" + this.serverPort;
        }

        protected void triggerShutdown() {
            this.shouldRun = false;
            this.selector.wakeup();
        }

        @Override
        public void close() throws IOException {
            this.stopAsync().awaitTerminated();
        }

        public String toString() {
            return "TcpServer{executor=" + this.executor + ", handlerFactory=" + this.handlerFactory + ", serverPort=" + this.serverPort + ", acceptBacklog=" + this.acceptBacklog + ", shouldRun=" + this.shouldRun + ", selector=" + this.selector + '}';
        }
    }
}

