/*
 * Decompiled with CFR 0.152.
 */
package com.xebialabs.deployit.plugin.python;

import com.xebialabs.deployit.booter.local.utils.Closeables;
import com.xebialabs.deployit.engine.spi.execution.ExecutionStateListener;
import com.xebialabs.deployit.engine.spi.execution.StepExecutionStateEvent;
import com.xebialabs.deployit.engine.spi.execution.TaskExecutionStateEvent;
import com.xebialabs.deployit.plugin.api.flow.ExecutionContext;
import com.xebialabs.deployit.plugin.python.PythonManagingContainer;
import com.xebialabs.deployit.plugin.python.PythonStep;
import com.xebialabs.deployit.plugin.python.PythonVarsConverter;
import com.xebialabs.deployit.plugin.remoting.scripts.ScriptUtils;
import com.xebialabs.overthere.OverthereConnection;
import com.xebialabs.overthere.OverthereFile;
import com.xebialabs.overthere.OverthereProcess;
import com.xebialabs.overthere.RuntimeIOException;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.Reader;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

public class PythonDaemon
implements ExecutionStateListener {
    private static final String DAEMON_SCRIPT_NAME = "daemon.py";
    private static final String DAEMON_SCRIPT_PATH = "python/daemon/daemon.py";
    private static final String DAEMON_CHECKPOINT = "DEPLOYIT-DAEMON-CHECKPOINT";
    private static final String DAEMON_EXIT_CODE_MARKER = "DEPLOYIT-DAEMON-EXIT-CODE:";
    private static final int FLUSH_DELAY_MS = 5000;
    private static final int FLUSH_CHECK_INTERVAL_MS = 1000;
    private static final Timer flushTimer = new Timer("Daemon-AutoFlushTimer", true);
    private final PythonManagingContainer container;
    private transient OverthereConnection connection;
    private transient OverthereProcess process;
    private transient OutputStream stdin;
    private transient Thread stdoutT;
    private transient Thread stderrT;
    private transient AtomicReference<CountDownLatch> checkpoint = new AtomicReference<CountDownLatch>(new CountDownLatch(2));
    private transient AtomicReference<ExecutionContext> currentContext = new AtomicReference();
    private transient AtomicInteger lastExitCode = new AtomicInteger(-1);
    private static Logger logger = LoggerFactory.getLogger(PythonDaemon.class);

    public PythonDaemon(PythonManagingContainer container) {
        this.container = container;
    }

    public boolean isAlive() {
        return this.connection != null && this.process != null && this.stdin != null && this.stdoutT != null && this.stderrT != null;
    }

    public void start(ExecutionContext context) {
        context.logOutput("Starting the daemon on [" + this.container.getHost() + "]");
        this.connection = this.container.getHost().getConnection();
        OverthereFile uploadedDaemonScript = this.uploadDaemonScript();
        this.waitForDaemonStart(context, uploadedDaemonScript);
    }

    private OverthereFile uploadDaemonScript() {
        String daemonScript = this.generateDaemonScript();
        PythonStep.dumpPythonScript(DAEMON_SCRIPT_NAME, daemonScript);
        logger.debug("Uploading the daemon script daemon.py");
        return ScriptUtils.uploadScript((OverthereConnection)this.connection, (String)DAEMON_SCRIPT_NAME, (String)daemonScript);
    }

    private String generateDaemonScript() {
        StringBuilder b = new StringBuilder();
        PythonStep.appendRuntimeScripts(this.container, b);
        b.append("#\nconnectFromDaemon()\n");
        b.append(ScriptUtils.loadScript((String)DAEMON_SCRIPT_PATH));
        b.append("#\ndisconnectFromDaemon()\n");
        return b.toString();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void waitForDaemonStart(ExecutionContext ctx, OverthereFile uploadedDaemonScript) {
        this.currentContext.set(ctx);
        try {
            logger.debug("Starting the daemon");
            this.process = this.connection.startProcess(this.container.getScriptCommandLine(uploadedDaemonScript));
            logger.debug("Starting the daemon stream stdout and stderr handler threads");
            this.stdin = this.process.getStdin();
            this.startStdoutReaderThread();
            this.startStderrReaderThread();
            logger.debug("Waiting for the daemon to finish starting");
            this.waitForCheckpoints();
            logger.debug("The daemon has started");
        }
        finally {
            this.currentContext.set(null);
        }
    }

    private void startStdoutReaderThread() {
        String stdoutTname = "Daemon stdout";
        if (logger.isDebugEnabled()) {
            stdoutTname = stdoutTname + " for " + this.container.getId();
        }
        this.stdoutT = new Thread((Runnable)new StdoutThread(this.process.getStdout()), stdoutTname);
        this.stdoutT.setDaemon(true);
        this.stdoutT.start();
    }

    private void startStderrReaderThread() {
        String stderrTname = "Daemon stderr";
        if (logger.isDebugEnabled()) {
            stderrTname = stderrTname + " for " + this.container.getId();
        }
        this.stderrT = new Thread((Runnable)new StderrThread(this.process.getStderr()), stderrTname);
        this.stderrT.setDaemon(true);
        this.stderrT.start();
    }

    public int executePythonScript(ExecutionContext ctx, OverthereFile script) {
        this.currentContext.set(ctx);
        try {
            logger.debug("Resetting countdown latch to 2");
            this.checkpoint.set(new CountDownLatch(2));
            logger.debug("Resetting last exit code to -1");
            this.lastExitCode.set(-1);
            logger.info("Executing uploaded script [{}] on [{}] (with daemon)", (Object)script.getPath(), (Object)this.connection);
            this.sendLine("runScriptFromDaemon(" + PythonVarsConverter.toPythonString(script.getPath()) + ")");
            logger.debug("Waiting for the daemon to finish executing the command");
            this.waitForCheckpoints();
            int exitCode = this.lastExitCode.get();
            logger.debug("Returning last exit code [{}]", (Object)exitCode);
            int n = exitCode;
            return n;
        }
        catch (IOException exc) {
            throw new RuntimeIOException(String.format("Cannot execute script [%s] on [%s]", script.getPath(), this.container.getHost()), (Throwable)exc);
        }
        finally {
            this.currentContext.set(null);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void stop() {
        logger.info("Stopping the daemon on [{}]", (Object)this.container.getHost());
        try {
            this.sendLine("EXIT");
            this.process.waitFor();
            this.process = null;
        }
        catch (IOException | InterruptedException | RuntimeException exc) {
            logger.error("Error stopping daemon", (Throwable)exc);
        }
        finally {
            Closeables.closeQuietly((Closeable)this.connection);
            this.connection = null;
        }
    }

    private void sendLine(String line) throws IOException {
        logger.debug("Sending line [{}] to the daemon", (Object)line);
        this.stdin.write((line + this.container.getHost().getOs().getLineSeparator()).getBytes());
        this.stdin.flush();
    }

    private void countdownCheckpointLatch() {
        logger.debug("Counting down the checkpoint latch");
        this.checkpoint.get().countDown();
        logger.debug("Done counting down the checkpoint latch");
    }

    private void waitForCheckpoints() {
        try {
            logger.debug("Waiting for the checkpoint latch");
            this.checkpoint.get().await();
            logger.debug("Done waiting for the checkpoint latch");
        }
        catch (InterruptedException exc) {
            throw new RuntimeException("Interrupted waiting for checkpoints", exc);
        }
    }

    public void stepStateChanged(StepExecutionStateEvent event) {
    }

    public void taskStateChanged(TaskExecutionStateEvent event) {
        if (this.isAlive() && event.currentState().isPassiveAfterExecuting()) {
            this.stop();
        }
    }

    private class StderrThread
    extends StreamThread {
        StderrThread(InputStream out) {
            super(out);
        }

        @Override
        void log(String line) {
            ExecutionContext c = (ExecutionContext)PythonDaemon.this.currentContext.get();
            if (c != null) {
                c.logError(line);
            }
        }
    }

    private class StdoutThread
    extends StreamThread {
        StdoutThread(InputStream out) {
            super(out);
        }

        @Override
        void log(String line) {
            ExecutionContext c = (ExecutionContext)PythonDaemon.this.currentContext.get();
            if (c != null) {
                c.logOutput(line);
            }
        }
    }

    private abstract class StreamThread
    implements Runnable {
        private Reader out;
        private final StringBuffer lineBuffer;

        StreamThread(InputStream out) {
            this.out = new InputStreamReader(out);
            this.lineBuffer = new StringBuffer();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            block20: {
                this.cleanMDC();
                final long[] flushAfter = new long[1];
                TimerTask flushTimerTask = new TimerTask(){

                    /*
                     * WARNING - Removed try catching itself - possible behaviour change.
                     */
                    @Override
                    public void run() {
                        StringBuffer stringBuffer = StreamThread.this.lineBuffer;
                        synchronized (stringBuffer) {
                            if (flushAfter[0] < System.currentTimeMillis()) {
                                if (StreamThread.this.lineBuffer.length() > 0) {
                                    String line = StreamThread.this.lineBuffer.toString();
                                    StreamThread.this.lineBuffer.setLength(0);
                                    logger.debug("Partial line has not been updated for 5000ms, flushing line: [{}]", (Object)line);
                                    StreamThread.this.lineReceived(line);
                                }
                                flushAfter[0] = System.currentTimeMillis() + 5000L;
                            }
                        }
                    }
                };
                flushTimer.schedule(flushTimerTask, 5000L, 1000L);
                try {
                    char prevC = '\u0000';
                    while (true) {
                        try {
                            while (true) {
                                int cInt;
                                if ((cInt = this.out.read()) == -1) {
                                    this.connectionLost();
                                    break block20;
                                }
                                char c = (char)cInt;
                                try {
                                    StringBuffer stringBuffer;
                                    if (c != '\r' && c != '\n') {
                                        logger.trace("Adding char [{}] to line buffer", (Object)Character.valueOf(c));
                                        stringBuffer = this.lineBuffer;
                                        synchronized (stringBuffer) {
                                            this.lineBuffer.append(c);
                                            continue;
                                        }
                                    }
                                    if (c == '\n' && prevC == '\r') {
                                        logger.trace("Skipping LF after CR");
                                        continue;
                                    }
                                    stringBuffer = this.lineBuffer;
                                    synchronized (stringBuffer) {
                                        flushAfter[0] = System.currentTimeMillis() + 5000L;
                                        String line = this.lineBuffer.toString();
                                        this.lineBuffer.setLength(0);
                                        logger.trace("Newline found, flushing line: [{}]", (Object)line);
                                        this.lineReceived(line);
                                        continue;
                                    }
                                }
                                finally {
                                    prevC = c;
                                    continue;
                                }
                                break;
                            }
                        }
                        catch (IOException exc) {
                            this.exceptionReceived(exc);
                            continue;
                        }
                        break;
                    }
                }
                finally {
                    flushTimerTask.cancel();
                }
            }
        }

        private void cleanMDC() {
            String username = MDC.get((String)"username");
            String taskId = MDC.get((String)"taskId");
            MDC.clear();
            if (username != null) {
                MDC.put((String)"username", (String)username);
            }
            if (taskId != null) {
                MDC.put((String)"taskId", (String)taskId);
            }
        }

        void lineReceived(String line) {
            if (line.startsWith(PythonDaemon.DAEMON_CHECKPOINT)) {
                logger.debug("Detected checkpoint.");
                PythonDaemon.this.countdownCheckpointLatch();
            } else if (line.startsWith(PythonDaemon.DAEMON_EXIT_CODE_MARKER)) {
                logger.debug("Detected exit code. Parsing it.");
                String exitCodeStr = line.substring(PythonDaemon.DAEMON_EXIT_CODE_MARKER.length());
                try {
                    int exitCode = Integer.parseInt(exitCodeStr);
                    logger.debug("Exit code parsed: {}", (Object)exitCode);
                    PythonDaemon.this.lastExitCode.set(exitCode);
                }
                catch (NumberFormatException ignored) {
                    logger.debug("Cannot parse exit code [{}]", (Object)exitCodeStr);
                }
            } else {
                this.log(line);
            }
        }

        abstract void log(String var1);

        void exceptionReceived(IOException exc) {
            logger.debug("Error in connection to the daemon", (Throwable)exc);
            ExecutionContext c = (ExecutionContext)PythonDaemon.this.currentContext.get();
            if (c != null) {
                c.logError("Error in connection to the daemon", (Throwable)exc);
            }
            this.connectionLost();
        }

        void connectionLost() {
            logger.debug("Lost connection to the daemon");
            PythonDaemon.this.countdownCheckpointLatch();
        }
    }
}

