/*
 * Decompiled with CFR 0.152.
 */
package io.nats.streaming.examples;

import io.nats.client.Connection;
import io.nats.client.Consumer;
import io.nats.client.ErrorListener;
import io.nats.client.NUID;
import io.nats.client.Nats;
import io.nats.client.Options;
import io.nats.streaming.AckHandler;
import io.nats.streaming.Message;
import io.nats.streaming.MessageHandler;
import io.nats.streaming.NatsStreaming;
import io.nats.streaming.Options;
import io.nats.streaming.StreamingConnection;
import io.nats.streaming.Subscription;
import io.nats.streaming.SubscriptionOptions;
import io.nats.streaming.examples.benchmark.Benchmark;
import io.nats.streaming.examples.benchmark.Sample;
import io.nats.streaming.examples.benchmark.Utils;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class StanBench {
    private int numMsgs = 100000;
    private int numPubs = 1;
    private int numSubs = 0;
    private boolean async = false;
    private int size = 128;
    private boolean ignoreOld = false;
    private int maxPubAcksInFlight = 1000;
    private String clientId = "benchmark";
    private String clusterId = "test-cluster";
    private String urls = "nats://localhost:4222";
    private String subject;
    private final AtomicInteger published = new AtomicInteger();
    private final AtomicInteger received = new AtomicInteger();
    private String csvFileName;
    private io.nats.client.Options natsOptions;
    private Thread shutdownHook;
    private final AtomicBoolean shutdown = new AtomicBoolean(false);
    private boolean secure;
    private Benchmark bench;
    private static final String usageString = "\nUsage: nats-bench [-s server] [--tls] [-c clusterid] [-id clientid] [-np #pubs] [-ns #subs] [-n #msg] [-mpa #pubacks] [-ms size] [-io] [-a] [-csv file] <subject>\n\nOptions:\n    -s   <urls>                     NATS Streaming server URLs (separated by comma)\n    -cid                            NATS Streaming cluster ID\n    -id                             Benchmark process base client ID\n    -tls                            Use TLS secure connection\n    -np                             Number of concurrent publishers\n    -ns                             Number of concurrent subscribers\n    -n                              Number of messages to publish\n    -a                              Async message publishing\n    -ms                             Message size in bytes\n    -io                             Subscribers ignore old messages\n    -ms                             Message size in bytes\n    -mpa                            Max number of published acks in flight\n    -csv                            Save bench data to csv file\n";

    public StanBench(String[] args) {
        if (args == null || args.length < 1) {
            this.usage();
            return;
        }
        this.parseArgs(args);
    }

    public StanBench(Properties properties) {
        this.urls = properties.getProperty("bench.stan.servers", this.urls);
        this.clientId = properties.getProperty("bench.streaming.client.id", this.clientId);
        this.clusterId = properties.getProperty("bench.stan.cluster.id", this.clusterId);
        this.secure = Boolean.parseBoolean(properties.getProperty("bench.stan.secure", Boolean.toString(this.secure)));
        this.numMsgs = Integer.parseInt(properties.getProperty("bench.stan.msg.count", Integer.toString(this.numMsgs)));
        this.maxPubAcksInFlight = Integer.parseInt(properties.getProperty("bench.stan.pub.maxpubacks", Integer.toString(this.maxPubAcksInFlight)));
        this.size = Integer.parseInt(properties.getProperty("bench.stan.msg.size", Integer.toString(this.numSubs)));
        this.numPubs = Integer.parseInt(properties.getProperty("bench.stan.pubs", Integer.toString(this.numPubs)));
        this.numSubs = Integer.parseInt(properties.getProperty("bench.stan.subs", Integer.toString(this.numSubs)));
        this.csvFileName = properties.getProperty("bench.stan.csv.filename", null);
        this.subject = properties.getProperty("bench.stan.subject", NUID.nextGlobal());
        this.async = Boolean.parseBoolean(properties.getProperty("bench.stan.pub.async", Boolean.toString(this.async)));
        this.ignoreOld = Boolean.parseBoolean(properties.getProperty("bench.stan.sub.ignoreold", Boolean.toString(this.ignoreOld)));
    }

    public void run() throws Exception {
        ExecutorService exec = Executors.newCachedThreadPool();
        Phaser phaser = new Phaser();
        this.installShutdownHook();
        phaser.register();
        String[] servers = this.urls.split(",");
        Options.Builder builder = new Options.Builder();
        builder.noReconnect();
        builder.connectionName("StanBench");
        builder.servers(servers);
        if (this.secure) {
            builder.secure();
        }
        builder.errorListener(new ErrorListener(){

            public void slowConsumerDetected(Connection conn, Consumer consumer) {
                System.err.println("Slow consumer detected on client side.");
            }

            public void exceptionOccurred(Connection conn, Exception ex) {
                System.err.printf("Connection exception %s, connection status is %s\n", ex, conn.getStatus());
                System.err.printf("Sent=%d, Received=%d\n", StanBench.this.published.get(), StanBench.this.received.get());
            }

            public void errorOccurred(Connection conn, String err) {
                System.err.println("Error message from server " + err);
            }
        });
        this.natsOptions = builder.build();
        this.bench = new Benchmark("NATS Streaming");
        for (int i = 0; i < this.numSubs; ++i) {
            phaser.register();
            String subId = String.format("%s-sub-%d", this.clientId, i);
            exec.execute(new SubWorker(phaser, this.numMsgs, this.size, this.ignoreOld, subId));
        }
        phaser.arriveAndAwaitAdvance();
        List<Integer> pubCounts = Utils.msgsPerClient(this.numMsgs, this.numPubs);
        for (int i = 0; i < this.numPubs; ++i) {
            phaser.register();
            String pubId = String.format("%s-pub-%d", this.clientId, i);
            exec.execute(new PubWorker(phaser, pubCounts.get(i), this.size, this.async, pubId));
        }
        System.out.printf("Starting benchmark [msgs=%d, msgsize=%d, pubs=%d, subs=%d]\n", this.numMsgs, this.size, this.numPubs, this.numSubs);
        phaser.arriveAndAwaitAdvance();
        Runtime.getRuntime().removeShutdownHook(this.shutdownHook);
        this.bench.close();
        System.out.println(this.bench.report());
        if (this.csvFileName != null) {
            String csv = this.bench.csv();
            Path csvFile = Paths.get(this.csvFileName, new String[0]);
            Files.write(csvFile, Collections.singletonList(csv), Charset.forName("UTF-8"), new OpenOption[0]);
        }
        exec.shutdown();
    }

    private void installShutdownHook() {
        this.shutdownHook = new Thread(new Runnable(){

            @Override
            public void run() {
                System.err.println("\nCaught CTRL-C, shutting down gracefully...\n");
                StanBench.this.shutdown.set(true);
                System.err.printf("Sent=%d\n", StanBench.this.published.get());
                System.err.printf("Received=%d\n", StanBench.this.received.get());
            }
        });
        Runtime.getRuntime().addShutdownHook(this.shutdownHook);
    }

    private void usage() {
        System.err.println(usageString);
        System.exit(-1);
    }

    private void parseArgs(String[] args) {
        ArrayList<String> argList = new ArrayList<String>(Arrays.asList(args));
        this.subject = (String)argList.get(argList.size() - 1);
        argList.remove(argList.size() - 1);
        if (this.subject.startsWith("-")) {
            this.usage();
        }
        Iterator it = argList.iterator();
        block30: while (it.hasNext()) {
            String arg;
            switch (arg = (String)it.next()) {
                case "-s": 
                case "--server": {
                    if (!it.hasNext()) {
                        this.usage();
                    }
                    it.remove();
                    this.urls = (String)it.next();
                    it.remove();
                    continue block30;
                }
                case "--tls": {
                    this.secure = true;
                    it.remove();
                    continue block30;
                }
                case "-np": {
                    if (!it.hasNext()) {
                        this.usage();
                    }
                    it.remove();
                    this.numPubs = Integer.parseInt((String)it.next());
                    it.remove();
                    continue block30;
                }
                case "-ns": {
                    if (!it.hasNext()) {
                        this.usage();
                    }
                    it.remove();
                    this.numSubs = Integer.parseInt((String)it.next());
                    it.remove();
                    continue block30;
                }
                case "-n": {
                    if (!it.hasNext()) {
                        this.usage();
                    }
                    it.remove();
                    this.numMsgs = Integer.parseInt((String)it.next());
                    it.remove();
                    continue block30;
                }
                case "-a": {
                    this.async = true;
                    it.remove();
                    continue block30;
                }
                case "-ms": {
                    if (!it.hasNext()) {
                        this.usage();
                    }
                    it.remove();
                    this.size = Integer.parseInt((String)it.next());
                    it.remove();
                    continue block30;
                }
                case "-io": {
                    this.ignoreOld = true;
                    it.remove();
                    continue block30;
                }
                case "-mpa": {
                    if (!it.hasNext()) {
                        this.usage();
                    }
                    it.remove();
                    this.maxPubAcksInFlight = Integer.parseInt((String)it.next());
                    it.remove();
                    continue block30;
                }
                case "-id": {
                    if (!it.hasNext()) {
                        this.usage();
                    }
                    it.remove();
                    this.clientId = (String)it.next();
                    it.remove();
                    continue block30;
                }
                case "-c": 
                case "-cid": {
                    if (!it.hasNext()) {
                        this.usage();
                    }
                    it.remove();
                    this.clusterId = (String)it.next();
                    it.remove();
                    continue block30;
                }
                case "-csv": {
                    if (!it.hasNext()) {
                        this.usage();
                    }
                    it.remove();
                    this.csvFileName = (String)it.next();
                    it.remove();
                    continue block30;
                }
            }
            System.err.printf("Unexpected token: '%s'\n", arg);
            this.usage();
        }
    }

    private static Properties loadProperties(String configPath) {
        try {
            FileInputStream is = new FileInputStream(configPath);
            Properties prop = new Properties();
            prop.load(is);
            return prop;
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        try {
            if (args.length == 1 && args[0].endsWith(".properties")) {
                Properties properties = StanBench.loadProperties(args[0]);
                new StanBench(properties).run();
            } else {
                new StanBench(args).run();
            }
        }
        catch (Exception e) {
            e.printStackTrace();
            System.exit(-1);
        }
        System.exit(0);
    }

    class PubWorker
    extends Worker {
        private final boolean async;

        PubWorker(Phaser phaser, int numMsgs, int size, boolean async, String pubId) {
            super(phaser, numMsgs, size, pubId);
            this.async = async;
        }

        @Override
        public void run() {
            try {
                this.runPublisher();
                this.phaser.arrive();
            }
            catch (Exception e) {
                e.printStackTrace();
                this.phaser.arrive();
            }
        }

        public void runPublisher() throws Exception {
            try (Connection nc = Nats.connect((io.nats.client.Options)StanBench.this.natsOptions);){
                Options pubOpts = StanBench.this.maxPubAcksInFlight > 0 ? new Options.Builder().maxPubAcksInFlight(StanBench.this.maxPubAcksInFlight).natsConn(nc).build() : new Options.Builder().natsConn(nc).build();
                try (StreamingConnection sc = NatsStreaming.connect((String)StanBench.this.clusterId, (String)this.workerClientId, (Options)pubOpts);){
                    byte[] msg = null;
                    if (this.size > 0) {
                        msg = new byte[this.size];
                    }
                    long start = System.nanoTime();
                    if (this.async) {
                        final CountDownLatch latch = new CountDownLatch(1);
                        AckHandler acb = new AckHandler(){

                            public void onAck(String nuid, Exception ex) {
                                if (StanBench.this.published.incrementAndGet() >= PubWorker.this.numMsgs) {
                                    latch.countDown();
                                }
                            }
                        };
                        for (int i = 0; i < this.numMsgs; ++i) {
                            try {
                                sc.publish(StanBench.this.subject, msg, acb);
                                continue;
                            }
                            catch (Exception e) {
                                System.err.printf("streaming-bench: error during publish", e);
                            }
                        }
                        latch.await();
                    } else {
                        for (int i = 0; i < this.numMsgs; ++i) {
                            try {
                                sc.publish(StanBench.this.subject, msg);
                                StanBench.this.published.incrementAndGet();
                                continue;
                            }
                            catch (Exception e) {
                                System.err.printf("streaming-bench: error during publish", e);
                            }
                        }
                    }
                    StanBench.this.bench.addPubSample(new Sample(this.numMsgs, this.size, start, System.nanoTime(), nc.getStatistics()));
                    System.out.printf("Publisher connection stats: \n" + nc.getStatistics(), new Object[0]);
                }
            }
        }
    }

    class SubWorker
    extends Worker {
        private final boolean ignoreOld;

        SubWorker(Phaser phaser, int numMsgs, int size, boolean ignoreOld, String subId) {
            super(phaser, numMsgs, size, subId);
            this.ignoreOld = ignoreOld;
        }

        @Override
        public void run() {
            try {
                this.runSubscriber();
            }
            catch (Exception e) {
                e.printStackTrace();
                this.phaser.arrive();
            }
        }

        public void runSubscriber() throws Exception {
            try (final Connection nc = Nats.connect((io.nats.client.Options)StanBench.this.natsOptions);){
                Options opts = new Options.Builder().natsConn(nc).build();
                final StreamingConnection sc = NatsStreaming.connect((String)StanBench.this.clusterId, (String)StanBench.this.clientId, (Options)opts);
                SubscriptionOptions sopts = this.ignoreOld ? new SubscriptionOptions.Builder().deliverAllAvailable().build() : new SubscriptionOptions.Builder().build();
                final long start = System.nanoTime();
                Subscription sub = sc.subscribe(StanBench.this.subject, new MessageHandler(){

                    public void onMessage(Message msg) {
                        StanBench.this.received.incrementAndGet();
                        if (StanBench.this.received.get() >= SubWorker.this.numMsgs) {
                            StanBench.this.bench.addSubSample(new Sample(SubWorker.this.numMsgs, SubWorker.this.size, start, System.nanoTime(), nc.getStatistics()));
                            System.out.printf("Subscriber connection stats: " + nc.getStatistics(), new Object[0]);
                            SubWorker.this.phaser.arrive();
                            try {
                                sc.close();
                            }
                            catch (IOException | TimeoutException e) {
                                System.err.printf("streaming-bench: exception thrown during subscriber connection close", e);
                            }
                            catch (InterruptedException e) {
                                System.err.printf("Interrupted during subscriber connection close", e);
                                Thread.currentThread().interrupt();
                            }
                        }
                    }
                }, sopts);
                this.phaser.arrive();
                while (StanBench.this.received.get() < this.numMsgs) {
                    try {
                        Thread.sleep(100L);
                    }
                    catch (InterruptedException e) {
                        // empty catch block
                        break;
                    }
                }
            }
        }
    }

    class Worker
    implements Runnable {
        final Phaser phaser;
        final int numMsgs;
        final int size;
        final String workerClientId;

        Worker(Phaser phaser, int numMsgs, int size, String workerClientId) {
            Thread.currentThread().setName(workerClientId);
            this.phaser = phaser;
            this.numMsgs = numMsgs;
            this.size = size;
            this.workerClientId = workerClientId;
        }

        @Override
        public void run() {
        }
    }
}

