/*
 * Decompiled with CFR 0.152.
 */
package org.spf4j.failsafe.concurrent;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.spf4j.base.AbstractRunnable;
import org.spf4j.base.UncheckedExecutionException;
import org.spf4j.concurrent.DefaultExecutor;
import org.spf4j.concurrent.InterruptibleCompletableFuture;
import org.spf4j.failsafe.RetryPredicate;
import org.spf4j.failsafe.concurrent.ConditionalConsumer;
import org.spf4j.failsafe.concurrent.DelayedTask;
import org.spf4j.failsafe.concurrent.FailSafeExecutor;
import org.spf4j.failsafe.concurrent.RetryFutureTask;

public final class FailSafeExecutorImpl
implements FailSafeExecutor {
    /** Logger used by this executor and its retry manager. */
    private static final Logger LOG = LoggerFactory.getLogger(FailSafeExecutorImpl.class);

    /**
     * Sentinel stored in {@link #retryManagerFuture} once the retry manager has been shut down.
     * The code in this class only compares it by identity; every {@link Future} operation on it
     * throws {@link UnsupportedOperationException}.
     */
    private static final Future<?> SHUTDOWN = new Future<Object>() {

        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            throw new UnsupportedOperationException();
        }

        @Override
        public boolean isCancelled() {
            throw new UnsupportedOperationException();
        }

        @Override
        public boolean isDone() {
            throw new UnsupportedOperationException();
        }

        @Override
        public Object get() {
            throw new UnsupportedOperationException();
        }

        @Override
        public Object get(long timeout, TimeUnit unit) {
            throw new UnsupportedOperationException();
        }
    };

    /** Executor that runs the submitted tasks and the executions handed over by the retry manager. */
    private final ExecutorService executionService;

    /** Delayed executions waiting to be handed to {@link #executionService} by the retry manager. */
    private final DelayQueue<DelayedTask<RetryFutureTask<?>>> executionEvents = new DelayQueue<>();

    /** {@code null} until the retry manager is started; {@link #SHUTDOWN} once it was shut down. */
    private volatile Future<?> retryManagerFuture;

    /** Monitor guarding the start/shutdown transitions of {@link #retryManagerFuture}. */
    private final Object sync = new Object();

    /**
     * Starts the retry manager if no retry manager future has been recorded yet.
     *
     * <p>The volatile field is checked once without the lock and once more while holding
     * {@link #sync}, so at most one {@link RetryManager} is submitted. Nothing happens when the
     * field is already set, which includes the {@link #SHUTDOWN} sentinel.
     */
    private void startRetryManager() {
        if (this.retryManagerFuture != null) {
            return;
        }
        synchronized (this.sync) {
            if (this.retryManagerFuture == null) {
                Future<?> started = DefaultExecutor.INSTANCE.submit(new RetryManager());
                this.retryManagerFuture = started;
                LOG.debug("Retry manager started {}", started);
            }
        }
    }

    /**
     * Cancels a running retry manager (interrupting it) and marks this executor as shut down.
     * Does nothing when the retry manager was never started or was already shut down; it does
     * not wait for the retry manager to finish.
     */
    private void shutdownRetryManager() {
        synchronized (this.sync) {
            Future<?> current = this.retryManagerFuture;
            if (current == null || current == SHUTDOWN) {
                return;
            }
            current.cancel(true);
            this.retryManagerFuture = SHUTDOWN;
        }
    }

    /**
     * Creates a fail-safe executor on top of the given executor service.
     *
     * @param exec the executor that runs submitted tasks, hedges and scheduled re-executions.
     */
    public FailSafeExecutorImpl(ExecutorService exec) {
        executionService = exec;
    }

    /**
     * Shuts the retry manager down and waits for its future to complete.
     *
     * <p>Does nothing when the retry manager was never started or was already shut down.
     * A {@link CancellationException} from the cancelled future is the expected outcome and
     * is ignored.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting.
     * @throws UncheckedExecutionException if the retry manager task terminated with an exception.
     */
    @Override
    public void close() throws InterruptedException {
        synchronized (this.sync) {
            Future<?> manager = this.retryManagerFuture;
            if (manager == null || manager == SHUTDOWN) {
                return;
            }
            // Mark as shut down before interrupting so the manager loop sees the sentinel.
            this.retryManagerFuture = SHUTDOWN;
            manager.cancel(true);
            try {
                manager.get();
            }
            catch (CancellationException expected) {
                // the future was just cancelled above; nothing to report
            }
            catch (ExecutionException ex) {
                throw new UncheckedExecutionException(ex);
            }
        }
    }

    /**
     * Requests shutdown of the retry manager without waiting for it to terminate
     * (see {@link #close()} for the waiting variant).
     */
    public void initiateClose() {
        shutdownRetryManager();
    }

    /**
     * Wraps {@code task} in a {@link RetryFutureTask} and hands it to the execution service.
     *
     * @param task the operation to run.
     * @param predicate the retry predicate passed to the retry task.
     * @return the retry task, as the future of the overall (retried) execution.
     */
    @Override
    public <A> Future<A> submit(Callable<? extends A> task, RetryPredicate<A, ? extends Callable<? extends A>> predicate) {
        // startRetryManager is passed as the task's callback so the retry manager can be started on demand.
        RetryFutureTask<? extends A> retryTask = new RetryFutureTask<A>(task, predicate, this.executionEvents, this::startRetryManager);
        this.executionService.execute(retryTask);
        return retryTask;
    }

    /**
     * Runs {@code task} with retries and reports the outcome through a completable future
     * obtained from {@code cfSupplier}.
     *
     * <p>When the retry task is done its result completes the returned future; an
     * {@link ExecutionException} is unwrapped to its cause, any other throwable raised while
     * reading the result is propagated as is.
     *
     * @param task the operation to run.
     * @param predicate the retry predicate passed to the retry task.
     * @param cfSupplier factory for the future returned to the caller.
     * @return the future created by {@code cfSupplier}; the retry task is registered on it
     *         via {@code setToCancel}.
     */
    @Override
    public <A> CompletableFuture<A> submitRx(Callable<? extends A> task, RetryPredicate<A, ? extends Callable<? extends A>> predicate, Supplier<InterruptibleCompletableFuture<A>> cfSupplier) {
        InterruptibleCompletableFuture<A> result = cfSupplier.get();
        ConditionalConsumer<Future<A>> completer = finished -> {
            A value;
            try {
                value = finished.get();
            }
            catch (ExecutionException ex) {
                return result.completeExceptionally(ex.getCause());
            }
            catch (Throwable ex) {
                return result.completeExceptionally(ex);
            }
            // completed outside the try so failures of complete() are not turned into a result
            return result.complete(value);
        };
        ConsumableRetryFutureTask<? extends A> rft = new ConsumableRetryFutureTask<A>(completer, task, predicate, this.executionEvents, this::startRetryManager);
        result.setToCancel(rft);
        this.executionService.execute(rft);
        return result;
    }

    /**
     * Submits {@code task} plus {@code nrHedges} hedge executions of the same task; the returned
     * future yields the outcome of whichever execution finishes first, and the remaining ones
     * are cancelled by {@link FirstFuture}.
     *
     * <p>With {@code nrHedges <= 0} this is the plain {@link #submit(Callable, RetryPredicate)}.
     * With a positive {@code hedgeDelay} the hedges are queued in the delay queue, otherwise they
     * are executed right away.
     *
     * @param task the operation to run.
     * @param predicate the retry predicate passed to every retry task.
     * @param nrHedges number of additional (hedge) executions.
     * @param hedgeDelay delay before the hedges are started; not positive means no delay.
     * @param unit unit of {@code hedgeDelay}.
     * @return a future for the first finished execution.
     */
    @Override
    public <A> Future<A> submit(Callable<? extends A> task, RetryPredicate<A, ? extends Callable<? extends A>> predicate, int nrHedges, long hedgeDelay, TimeUnit unit) {
        if (nrHedges <= 0) {
            return this.submit(task, predicate);
        }
        int total = nrHedges + 1;
        Future[] futures = new Future[total];
        ArrayBlockingQueue queue = new ArrayBlockingQueue(1);
        FirstFuture result = new FirstFuture(futures, queue);
        ConsumableRetryFutureTask primary = new ConsumableRetryFutureTask(result, task, predicate, this.executionEvents, this::startRetryManager);
        this.startRetryManager();
        futures[0] = primary;
        // Starting is deferred until every slot of `futures` is filled, so the first
        // finisher can cancel all of its competitors.
        Runnable[] starters = new Runnable[total];
        starters[0] = () -> this.executionService.execute(primary);
        for (int idx = 1; idx < total; ++idx) {
            ConsumableRetryFutureTask hedge = new ConsumableRetryFutureTask(result, task, predicate, this.executionEvents, this::startRetryManager);
            futures[idx] = hedge;
            if (hedgeDelay > 0L) {
                DelayedTask delayedExecution = new DelayedTask(hedge, unit.toNanos(hedgeDelay));
                hedge.setExec(delayedExecution);
                starters[idx] = () -> this.executionEvents.add(delayedExecution);
            } else {
                starters[idx] = () -> this.executionService.execute(hedge);
            }
        }
        for (Runnable starter : starters) {
            starter.run();
        }
        return result;
    }

    /**
     * Submits {@code task} plus {@code nrHedges} hedge executions and reports the outcome of the
     * first finished execution through a completable future obtained from {@code cfSupplier}.
     *
     * <p>With {@code nrHedges <= 0} the call is forwarded to
     * {@link #submitRx(Callable, RetryPredicate, Supplier)} with the same {@code cfSupplier}.
     * With a positive {@code hedgeDelay} the hedges are queued in the delay queue, otherwise they
     * are executed right away.
     *
     * @param task the operation to run.
     * @param predicate the retry predicate passed to every retry task.
     * @param nrHedges number of additional (hedge) executions.
     * @param hedgeDelay delay before the hedges are started; not positive means no delay.
     * @param unit unit of {@code hedgeDelay}.
     * @param cfSupplier factory for the future returned to the caller.
     * @return the future created by {@code cfSupplier}.
     */
    @Override
    public <A> CompletableFuture<A> submitRx(Callable<? extends A> task, RetryPredicate<A, ? extends Callable<? extends A>> predicate, int nrHedges, long hedgeDelay, TimeUnit unit, Supplier<InterruptibleCompletableFuture<A>> cfSupplier) {
        if (nrHedges <= 0) {
            // Forward the caller's supplier; the two-argument overload would ignore it.
            return this.submitRx(task, predicate, cfSupplier);
        }
        final InterruptibleCompletableFuture<A> result = cfSupplier.get();
        int nrFut = nrHedges + 1;
        Future[] futures = new Future[nrFut];
        ArrayBlockingQueue queue = new ArrayBlockingQueue(1);
        FirstFuture resultX = new FirstFuture<A>(futures, queue){

            /**
             * Completes {@code result} with the outcome of the first finished execution.
             * Throws IllegalStateException when {@code result} was already completed.
             */
            @Override
            @SuppressFBWarnings(value={"NOS_NON_OWNED_SYNCHRONIZATION", "EXS_EXCEPTION_SOFTENING_NO_CHECKED"})
            public boolean accept(Future<A> finished) {
                boolean accepted = super.accept(finished);
                if (accepted) {
                    A r;
                    try {
                        r = finished.get();
                    }
                    catch (ExecutionException ex) {
                        if (!result.completeExceptionally(ex.getCause())) {
                            throw new IllegalStateException(ex);
                        }
                        return true;
                    }
                    catch (Throwable ex) {
                        if (!result.completeExceptionally(ex)) {
                            throw new IllegalStateException(ex);
                        }
                        return true;
                    }
                    if (!result.complete(r)) {
                        throw new IllegalStateException();
                    }
                    return true;
                }
                return false;
            }
        };
        result.setToCancel(resultX);
        ConsumableRetryFutureTask future = new ConsumableRetryFutureTask(resultX, task, predicate, this.executionEvents, this::startRetryManager);
        this.startRetryManager();
        futures[0] = future;
        // Starting is deferred until every slot of `futures` is filled, so the first
        // finisher can cancel all of its competitors.
        Runnable[] submits = new Runnable[nrFut];
        submits[0] = () -> this.executionService.execute(future);
        for (int i = 1; i < nrFut; ++i) {
            ConsumableRetryFutureTask f;
            futures[i] = f = new ConsumableRetryFutureTask(resultX, task, predicate, this.executionEvents, this::startRetryManager);
            if (hedgeDelay > 0L) {
                DelayedTask delayedExecution = new DelayedTask(f, unit.toNanos(hedgeDelay));
                f.setExec(delayedExecution);
                submits[i] = () -> this.executionEvents.add(delayedExecution);
                continue;
            }
            submits[i] = () -> this.executionService.execute(f);
        }
        for (Runnable submit : submits) {
            submit.run();
        }
        return result;
    }

    /**
     * Fire-and-forget variant of {@link #submit(Callable, RetryPredicate)}: the retry task is
     * handed to the execution service and no future is returned.
     *
     * @param task the operation to run.
     * @param predicate the retry predicate passed to the retry task.
     */
    @Override
    public <A> void execute(Callable<? extends A> task, RetryPredicate<A, ? extends Callable<? extends A>> predicate) {
        this.executionService.execute(
                new RetryFutureTask<A>(task, predicate, this.executionEvents, this::startRetryManager));
    }

    /** Returns a diagnostic description listing the executor, the pending events and the retry manager state. */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("RetryExecutor{executionService=");
        text.append(this.executionService);
        text.append(", executionEvents=").append(this.executionEvents);
        text.append(", retryManagerFuture=").append(this.retryManagerFuture);
        text.append(", sync=").append(this.sync);
        text.append('}');
        return text.toString();
    }

    /**
     * A {@link RetryFutureTask} that hands itself to a {@link ConditionalConsumer} from its
     * {@link #done()} callback.
     */
    private static class ConsumableRetryFutureTask<T>
    extends RetryFutureTask<T> {
        /** Receiver notified with this task when {@link #done()} is invoked. */
        private final ConditionalConsumer<Future<T>> consumer;

        ConsumableRetryFutureTask(ConditionalConsumer<Future<T>> consumer, Callable<T> callable, RetryPredicate<T, Callable<? extends T>> retryPredicate, DelayQueue<DelayedTask<RetryFutureTask<?>>> delayedTasks, Runnable onRetry) {
            super(callable, retryPredicate, delayedTasks, onRetry);
            this.consumer = consumer;
        }

        /** Passes this task to the consumer; the consumer's boolean answer is not used. */
        @Override
        public void done() {
            consumer.accept(this);
        }
    }

    /**
     * Future over a group of competing executions: the first execution passed to
     * {@link #accept(Future)} wins, its competitors are cancelled, and its outcome becomes the
     * outcome of this future.
     *
     * <p>The winner is kept in a field and also left in the queue, so {@code get} can be called
     * repeatedly and from several threads, and {@link #isCancelled()} reflects the winner's
     * state rather than the cleared slots.
     */
    @SuppressFBWarnings(value={"NOS_NON_OWNED_SYNCHRONIZATION"})
    private static class FirstFuture<T>
    implements Future<T>,
    ConditionalConsumer<Future<T>> {
        /** Competing executions; every slot is cleared once a winner was accepted. */
        private final Future<T>[] futures;
        /** Hand-off queue on which {@code get} callers wait for the winner. */
        private final BlockingQueue<Future<T>> queue;
        /** True until the first execution was accepted; guarded by {@code this}. */
        private boolean first = true;
        /** The accepted execution, {@code null} until one was accepted. */
        private volatile Future<T> winner;

        FirstFuture(Future<T>[] futures, BlockingQueue<Future<T>> queue) {
            this.futures = futures;
            this.queue = queue;
        }

        /**
         * Offers a finished execution as the result of this future.
         *
         * @param finished the execution that finished.
         * @return true if {@code finished} was the first one and has been accepted (all other
         *         executions are then cancelled with interruption), false otherwise.
         */
        @Override
        public boolean accept(Future<T> finished) {
            synchronized (this) {
                if (!this.first) {
                    return false;
                }
                this.first = false;
                // Published before the queue hand-off so get() callers can use the fast path.
                this.winner = finished;
                this.queue.add(finished);
                for (int i = 0; i < this.futures.length; ++i) {
                    Future<T> f = this.futures[i];
                    if (f != null && f != finished) {
                        f.cancel(true);
                    }
                    this.futures[i] = null;
                }
                return true;
            }
        }

        /**
         * Cancels every execution that is still referenced.
         *
         * @return false if any referenced execution reported that it could not be cancelled,
         *         true otherwise (including when no execution is referenced any more).
         */
        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            synchronized (this) {
                boolean result = true;
                for (Future<T> f : this.futures) {
                    if (f == null || f.cancel(mayInterruptIfRunning)) continue;
                    result = false;
                }
                return result;
            }
        }

        /**
         * @return once an execution was accepted, whether that execution was cancelled; before
         *         that, whether every execution still referenced reports being cancelled.
         */
        @Override
        public boolean isCancelled() {
            synchronized (this) {
                Future<T> accepted = this.winner;
                if (accepted != null) {
                    // The slots are all cleared at this point; only the winner knows the outcome.
                    return accepted.isCancelled();
                }
                boolean result = true;
                for (Future<T> f : this.futures) {
                    if (f == null || f.isCancelled()) continue;
                    result = false;
                }
                return result;
            }
        }

        /** @return true when every execution still referenced is done (or none is referenced). */
        @Override
        public boolean isDone() {
            boolean result = true;
            for (Future<T> f : this.futures) {
                if (f == null || f.isDone()) continue;
                result = false;
            }
            return result;
        }

        /** Waits for the first finished execution and returns (or rethrows) its outcome. */
        @Override
        public T get() throws InterruptedException, ExecutionException {
            Future<T> done = this.winner;
            if (done == null) {
                done = this.queue.take();
                // Put the winner back so other or later waiters are released as well.
                this.queue.offer(done);
            }
            return done.get();
        }

        /**
         * Timed variant of {@link #get()}.
         *
         * @throws TimeoutException if no execution finished within the given time.
         */
        @Override
        public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
            Future<T> done = this.winner;
            if (done == null) {
                done = this.queue.poll(timeout, unit);
                if (done == null) {
                    throw new TimeoutException("Timed out after " + timeout + " " + unit);
                }
                // Put the winner back so other or later waiters are released as well.
                this.queue.offer(done);
            }
            return done.get();
        }
    }

    /**
     * Background task that moves due entries from the delay queue to the execution service
     * until the executor is marked as shut down or the task is interrupted.
     */
    private class RetryManager
    extends AbstractRunnable {
        RetryManager() {
            super("RetryManager");
        }

        /**
         * Polls the delay queue (waking up at least once a minute to re-check the shutdown
         * marker) and executes every entry that became due. An interrupt ends the loop.
         */
        @Override
        public void doRun() {
            try {
                while (FailSafeExecutorImpl.this.retryManagerFuture != SHUTDOWN) {
                    DelayedTask due = (DelayedTask)FailSafeExecutorImpl.this.executionEvents.poll(1L, TimeUnit.MINUTES);
                    if (due != null) {
                        RetryFutureTask toRun = (RetryFutureTask)due.getRunnable();
                        FailSafeExecutorImpl.this.executionService.execute(toRun);
                    }
                }
            }
            catch (InterruptedException ex) {
                LOG.debug("Interrupted Retry manager, shuting down, events scheduled: {}", (Object)FailSafeExecutorImpl.this.executionEvents, (Object)ex);
            }
        }
    }
}

