/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

/**
 * Shared test cases for {@link CheckpointIDCounter} implementations.
 *
 * <p>Concrete subclasses supply the counter under test through
 * {@link #createCheckpointIdCounter()}. Every test starts the counter and always shuts it down
 * with {@link JobStatus#FINISHED} in a {@code finally} block, so a failing assertion cannot leave
 * a started counter behind.
 */
public abstract class CheckpointIDCounterTestBase extends TestLogger {

    /**
     * Creates the counter instance exercised by a single test.
     *
     * @return a counter that has not been started yet; each test calls {@code start()} itself
     * @throws Exception if the counter cannot be created
     */
    protected abstract CheckpointIDCounter createCheckpointIdCounter() throws Exception;

    /**
     * Verifies that a freshly started counter reports a non-negative value.
     */
    @Test
    public void testCounterIsNeverNegative() throws Exception {
        final CheckpointIDCounter counter = this.createCheckpointIdCounter();
        try {
            counter.start();
            Assert.assertThat(counter.get(), Matchers.greaterThanOrEqualTo(0L));
        } finally {
            counter.shutdown(JobStatus.FINISHED);
        }
    }

    /**
     * Verifies single-threaded {@code getAndIncrement()} / {@code get()} sequencing: the first
     * {@code getAndIncrement()} is expected to return 1, and every following call returns the
     * value previously reported by {@code get()}.
     */
    @Test
    public void testSerialIncrementAndGet() throws Exception {
        final CheckpointIDCounter counter = this.createCheckpointIdCounter();
        try {
            counter.start();
            Assert.assertEquals(1L, counter.getAndIncrement());
            Assert.assertEquals(2L, counter.get());
            Assert.assertEquals(2L, counter.getAndIncrement());
            Assert.assertEquals(3L, counter.get());
            Assert.assertEquals(3L, counter.getAndIncrement());
            Assert.assertEquals(4L, counter.get());
            Assert.assertEquals(4L, counter.getAndIncrement());
        } finally {
            counter.shutdown(JobStatus.FINISHED);
        }
    }

    /**
     * Verifies that concurrent {@code getAndIncrement()} calls hand out every ID exactly once.
     *
     * <p>Several {@link Incrementer} tasks are released simultaneously through a latch. The union
     * of all IDs they obtained must be exactly {@code 1..expectedTotal}, with no gaps and no
     * duplicates, and the counter must afterwards point at {@code expectedTotal + 1}.
     */
    @Test
    public void testConcurrentGetAndIncrement() throws Exception {
        final int numThreads = 8;
        final int expectedTotal = numThreads * Incrementer.NUM_INCREMENTS;
        final long expectedNextId = expectedTotal + 1L;

        final CountDownLatch startLatch = new CountDownLatch(1);
        final CheckpointIDCounter counter = this.createCheckpointIdCounter();
        ExecutorService executor = null;
        try {
            // Started inside the try block so that a failing start() still triggers shutdown(),
            // matching the other tests in this class.
            counter.start();

            executor = Executors.newFixedThreadPool(numThreads);
            final List<Future<List<Long>>> resultFutures = new ArrayList<>(numThreads);
            for (int i = 0; i < numThreads; ++i) {
                resultFutures.add(executor.submit(new Incrementer(startLatch, counter)));
            }

            // Release all incrementers at once to maximise contention.
            startLatch.countDown();

            final List<Long> all = new ArrayList<>(expectedTotal);
            for (Future<List<Long>> future : resultFutures) {
                all.addAll(future.get());
            }

            // After sorting, the collected IDs must form the contiguous sequence 1..expectedTotal.
            Collections.sort(all);
            Assert.assertEquals(expectedTotal, all.size());
            long expectedId = 0L;
            for (long actualId : all) {
                Assert.assertEquals(++expectedId, actualId);
            }

            Assert.assertEquals(expectedNextId, counter.get());
            Assert.assertEquals(expectedNextId, counter.getAndIncrement());
        } finally {
            if (executor != null) {
                executor.shutdown();
            }
            counter.shutdown(JobStatus.FINISHED);
        }
    }

    /**
     * Verifies that {@code setCount(long)} repositions the counter and that subsequent
     * {@code get()} / {@code getAndIncrement()} calls continue from the new value.
     */
    @Test
    public void testSetCount() throws Exception {
        final CheckpointIDCounter counter = this.createCheckpointIdCounter();
        try {
            counter.start();
            counter.setCount(1337L);
            Assert.assertEquals(1337L, counter.get());
            Assert.assertEquals(1337L, counter.getAndIncrement());
            Assert.assertEquals(1338L, counter.get());
            Assert.assertEquals(1338L, counter.getAndIncrement());
        } finally {
            // Shut down even when an assertion fails, so the counter is never leaked.
            counter.shutdown(JobStatus.FINISHED);
        }
    }

    /**
     * Task that waits for a start signal and then repeatedly calls
     * {@code getAndIncrement()} on a shared counter, recording every value it receives.
     */
    private static class Incrementer implements Callable<List<Long>> {

        /** Number of {@code getAndIncrement()} calls performed by each task. */
        private static final int NUM_INCREMENTS = 128;

        /** Upper bound (exclusive), in milliseconds, of the random pause between increments. */
        private static final int MAX_SLEEP_MILLIS = 20;

        private final CountDownLatch startLatch;
        private final CheckpointIDCounter counter;

        /**
         * @param startLatch latch the task awaits before performing its first increment
         * @param counter the shared counter to increment
         */
        public Incrementer(CountDownLatch startLatch, CheckpointIDCounter counter) {
            this.startLatch = startLatch;
            this.counter = counter;
        }

        /**
         * Waits for the start latch, then performs {@link #NUM_INCREMENTS} increments with a
         * short random sleep after each one.
         *
         * @return the values returned by {@code getAndIncrement()}, in call order
         * @throws Exception if waiting, sleeping, or incrementing fails
         */
        @Override
        public List<Long> call() throws Exception {
            final Random rand = new Random();
            final List<Long> counts = new ArrayList<>(NUM_INCREMENTS);
            this.startLatch.await();
            for (int i = 0; i < NUM_INCREMENTS; ++i) {
                counts.add(this.counter.getAndIncrement());
                // Random pause so the tasks interleave differently on every run.
                Thread.sleep(rand.nextInt(MAX_SLEEP_MILLIS));
            }
            return counts;
        }
    }
}

