/*
 * Decompiled with CFR 0.152.
 */
package org.spf4j.perf.impl;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.DatagramChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.spf4j.base.AbstractRunnable;
import org.spf4j.concurrent.DefaultExecutor;
import org.spf4j.perf.CloseableMeasurementRecorder;
import org.spf4j.perf.MeasurementsInfo;
import org.spf4j.perf.impl.MeasurementsInfoImpl;
import org.spf4j.perf.impl.RecorderFactory;
import org.spf4j.perf.impl.ms.graphite.GraphiteUdpStore;
import org.spf4j.recyclable.ObjectCreationException;

public final class GraphiteUdpStoreTest {
    private static final Logger LOG;
    private static volatile boolean terminated;
    private static volatile Future<?> server;
    private static final BlockingQueue<String> QUEUE;
    private static final File TSDB_TXT;

    @Test
    public void testGraphiteUdpStore() throws IOException, ObjectCreationException, InterruptedException {
        GraphiteUdpStore store = new GraphiteUdpStore("127.0.0.1", 1976);
        long id = store.alocateMeasurements((MeasurementsInfo)new MeasurementsInfoImpl((Object)"bla", "ms", new String[]{"val1", "val2", "val3"}, new String[]{"ms", "ms", "ms"}), 0);
        store.saveMeasurements(id, 1L, new long[]{2L, 3L, 5L});
        LOG.debug("measurements sent: {} {} {} {}", new Object[]{1L, 2L, 3L, 5L});
        String line = QUEUE.poll(5L, TimeUnit.SECONDS);
        LOG.debug("measurements received: {} ", (Object)line);
        Assert.assertEquals((Object)"bla/val1 2 1", (Object)line);
        line = QUEUE.poll(5L, TimeUnit.SECONDS);
        LOG.debug("measurements received: {} ", (Object)line);
        Assert.assertEquals((Object)"bla/val2 3 1", (Object)line);
        line = QUEUE.poll(5L, TimeUnit.SECONDS);
        LOG.debug("measurements received: {} ", (Object)line);
        Assert.assertEquals((Object)"bla/val3 5 1", (Object)line);
    }

    @Before
    public void beforeTest() {
        QUEUE.drainTo(new ArrayList());
    }

    @Test
    @SuppressFBWarnings(value={"MDM_THREAD_YIELD"})
    public void testStore() throws InterruptedException, IOException {
        CloseableMeasurementRecorder recorder = RecorderFactory.createScalableQuantizedRecorder2((Object)"test measurement", (String)"ms", (int)1000, (int)10, (int)0, (int)6, (int)10);
        for (int i = 0; i < 100; ++i) {
            recorder.record((long)i);
            Thread.sleep(100L);
        }
        recorder.close();
        RecorderFactory.MEASUREMENT_STORE.flush();
        List<String> lines = Files.readAllLines(TSDB_TXT.toPath(), StandardCharsets.UTF_8);
        LOG.debug("measurements = {}", lines);
        Assert.assertThat(lines, (Matcher)Matchers.hasItem((Matcher)Matchers.allOf((Matcher)Matchers.containsString((String)"Q6_7"), (Matcher)Matchers.containsString((String)"test measurement"))));
        String line = QUEUE.poll(5L, TimeUnit.SECONDS);
        Assert.assertThat((Object)line, (Matcher)Matchers.containsString((String)"test-measurement"));
    }

    @BeforeClass
    public static void runUdpServer() {
        server = DefaultExecutor.INSTANCE.submit((Runnable)new AbstractRunnable(true){

            public void doRun() throws IOException, InterruptedException {
                DatagramChannel channel = DatagramChannel.open();
                InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 1976);
                channel.socket().bind(inetSocketAddress);
                ByteBuffer bb = ByteBuffer.allocate(512);
                while (!terminated) {
                    bb.rewind();
                    try {
                        channel.receive(bb);
                    }
                    catch (ClosedByInterruptException ex) {
                        break;
                    }
                    byte[] rba = new byte[bb.position()];
                    bb.rewind();
                    bb.get(rba);
                    String receivedString = new String(rba, StandardCharsets.UTF_8);
                    Object[] lines = receivedString.split("\n");
                    LOG.debug("Received = {}", lines);
                    for (Object line : lines) {
                        QUEUE.put(line);
                    }
                }
            }
        });
    }

    @AfterClass
    public static void stopUdpServer() {
        terminated = true;
        server.cancel(true);
    }

    static {
        File tsdb;
        LOG = LoggerFactory.getLogger(GraphiteUdpStoreTest.class);
        terminated = false;
        QUEUE = new LinkedBlockingQueue<String>();
        try {
            tsdb = File.createTempFile("ttt", "tsdb");
            TSDB_TXT = File.createTempFile("ttt", "tsdbtxt");
        }
        catch (IOException ex) {
            throw new ExceptionInInitializerError(ex);
        }
        System.setProperty("spf4j.perf.ms.config", "TSDB@" + tsdb.getAbsolutePath() + ",TSDB_TXT@" + TSDB_TXT.getAbsolutePath() + ",GRAPHITE_UDP@127.0.0.1:1976");
    }
}

