/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.unsafe.impl.batchimport.staging;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import org.neo4j.test.OtherThreadExecutor;
import org.neo4j.test.rule.concurrent.OtherThreadRule;
import org.neo4j.unsafe.impl.batchimport.Configuration;
import org.neo4j.unsafe.impl.batchimport.staging.BatchSender;
import org.neo4j.unsafe.impl.batchimport.staging.ProcessorStep;
import org.neo4j.unsafe.impl.batchimport.staging.ProducerStep;
import org.neo4j.unsafe.impl.batchimport.staging.Stage;
import org.neo4j.unsafe.impl.batchimport.staging.StageControl;
import org.neo4j.unsafe.impl.batchimport.staging.StageExecution;
import org.neo4j.unsafe.impl.batchimport.staging.Step;
import org.neo4j.unsafe.impl.batchimport.stats.Key;
import org.neo4j.unsafe.impl.batchimport.stats.Keys;
import org.neo4j.unsafe.impl.batchimport.stats.StatsProvider;

public class ProcessorStepTest {
    @Rule
    public final OtherThreadRule<Void> t2 = new OtherThreadRule();

    @Test
    public void shouldUpholdProcessOrderingGuarantee() throws Exception {
        StageControl control = (StageControl)Mockito.mock(StageControl.class);
        MyProcessorStep step = new MyProcessorStep(control, 0);
        step.start(1);
        step.processors(4);
        int batches = 10;
        for (int i = 0; i < batches; ++i) {
            step.receive(i, i);
        }
        step.endOfUpstream();
        step.awaitCompleted();
        Assert.assertEquals((long)batches, (long)step.nextExpected.get());
        step.close();
    }

    @Test
    public void shouldHaveTaskQueueSizeEqualToMaxNumberOfProcessors() throws Exception {
        StageControl control = (StageControl)Mockito.mock(StageControl.class);
        CountDownLatch latch = new CountDownLatch(1);
        int processors = 2;
        final int maxProcessors = 5;
        Configuration configuration = new Configuration(){

            public int maxNumberOfProcessors() {
                return maxProcessors;
            }
        };
        BlockingProcessorStep<Void> step = new BlockingProcessorStep<Void>(control, configuration, 2, latch);
        step.start(1);
        step.processors(1);
        for (int i = 0; i < 2 + maxProcessors; ++i) {
            step.receive(i, null);
        }
        Future receiveFuture = this.t2.execute(this.receive(2, step));
        this.t2.get().waitUntilThreadState(new Thread.State[]{Thread.State.TIMED_WAITING});
        latch.countDown();
        receiveFuture.get();
    }

    @Test
    public void shouldRecycleDoneBatches() throws Exception {
        StageControl control = (StageControl)Mockito.mock(StageControl.class);
        MyProcessorStep step = new MyProcessorStep(control, 0);
        step.start(1);
        int batches = 10;
        for (int i = 0; i < batches; ++i) {
            step.receive(i, i);
        }
        step.endOfUpstream();
        step.awaitCompleted();
        ((StageControl)Mockito.verify((Object)control, (VerificationMode)Mockito.times((int)batches))).recycle(Matchers.any());
        step.close();
    }

    @Test
    public void shouldBeAbleToPropagatePanicOnBlockedProcessorsWhenLast() throws InterruptedException {
        this.shouldBeAbleToPropagatePanicOnBlockedProcessors(2, 1);
    }

    @Test
    public void shouldBeAbleToPropagatePanicOnBlockedProcessorsWhenNotLast() throws InterruptedException {
        this.shouldBeAbleToPropagatePanicOnBlockedProcessors(3, 1);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void shouldBeAbleToPropagatePanicOnBlockedProcessors(int numProcessors, int failingProcessorIndex) throws InterruptedException {
        final String exceptionMessage = "Failing just for fun";
        Configuration configuration = Configuration.DEFAULT;
        CountDownLatch latch = new CountDownLatch(1);
        Stage stage = new Stage("Test", "Part", configuration, 1);
        stage.add((Step)ProcessorStepTest.intProducer(configuration, stage, configuration.maxNumberOfProcessors() * 2));
        BlockingProcessorStep<Integer> failingProcessor = null;
        for (int i = 0; i < numProcessors; ++i) {
            if (failingProcessorIndex == i) {
                failingProcessor = new BlockingProcessorStep<Integer>(stage.control(), configuration, 1, latch){

                    @Override
                    protected void process(Integer batch, BatchSender sender) throws Throwable {
                        super.process(batch, sender);
                        throw new RuntimeException(exceptionMessage);
                    }
                };
                stage.add((Step)failingProcessor);
                continue;
            }
            stage.add(ProcessorStepTest.intProcessor(configuration, stage));
        }
        try {
            StageExecution execution = stage.execute();
            while (failingProcessor.stats().stat((Key)Keys.received_batches).asLong() < (long)(configuration.maxNumberOfProcessors() + 1)) {
                Thread.sleep(10L);
            }
            latch.countDown();
            execution.awaitCompletion();
            try {
                execution.assertHealthy();
                Assert.fail((String)"Should have failed");
            }
            catch (RuntimeException e) {
                Assert.assertEquals((Object)exceptionMessage, (Object)e.getMessage());
            }
        }
        finally {
            stage.close();
        }
    }

    private static ProducerStep intProducer(Configuration configuration, Stage stage, final int batches) {
        return new ProducerStep(stage.control(), configuration){

            protected void process() {
                for (int i = 0; i < batches; ++i) {
                    this.sendDownstream(i);
                }
            }

            protected long position() {
                return 0L;
            }
        };
    }

    private static ProcessorStep<Integer> intProcessor(Configuration configuration, Stage stage) {
        return new ProcessorStep<Integer>(stage.control(), "processor", configuration, 1, new StatsProvider[0]){

            protected void process(Integer batch, BatchSender sender) {
                sender.send((Object)batch);
            }
        };
    }

    private OtherThreadExecutor.WorkerCommand<Void, Void> receive(int processors, ProcessorStep<Void> step) {
        return state -> {
            step.receive((long)processors, null);
            return null;
        };
    }

    private static class MyProcessorStep
    extends ProcessorStep<Integer> {
        private final AtomicInteger nextExpected = new AtomicInteger();

        private MyProcessorStep(StageControl control, int maxProcessors) {
            super(control, "test", Configuration.DEFAULT, maxProcessors, new StatsProvider[0]);
        }

        protected void process(Integer batch, BatchSender sender) {
            this.nextExpected.incrementAndGet();
        }
    }

    private static class BlockingProcessorStep<T>
    extends ProcessorStep<T> {
        private final CountDownLatch latch;

        BlockingProcessorStep(StageControl control, Configuration configuration, int maxProcessors, CountDownLatch latch) {
            super(control, "test", configuration, maxProcessors, new StatsProvider[0]);
            this.latch = latch;
        }

        protected void process(T batch, BatchSender sender) throws Throwable {
            this.latch.await();
        }
    }
}

