/*
 * Decompiled with CFR 0.152.
 */
package ru.fix.stdlib.concurrency.threads;

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ru.fix.aggregating.profiler.Profiler;
import ru.fix.dynamic.property.api.DynamicProperty;
import ru.fix.dynamic.property.api.DynamicPropertyListener;
import ru.fix.stdlib.concurrency.threads.ProfiledScheduledThreadPoolExecutor;
import ru.fix.stdlib.concurrency.threads.Schedule;

public class ReschedulableScheduler
implements AutoCloseable {
    private static final long DEFAULT_START_DELAY = 0L;
    private final ScheduledExecutorService executorService;
    private final Set<SelfSchedulableTaskWrapper> activeTasks;
    private final Profiler profiler;
    private final AtomicBoolean isShutdown = new AtomicBoolean(false);
    private final Logger log;
    private final String scheduledTasksIndicatorName;

    public ReschedulableScheduler(String poolName, DynamicProperty<Integer> maxPoolSize, Profiler profiler) {
        this.log = LoggerFactory.getLogger((String)(ReschedulableScheduler.class.getName() + "." + poolName));
        this.executorService = new ProfiledScheduledThreadPoolExecutor(poolName, maxPoolSize, profiler);
        this.activeTasks = ConcurrentHashMap.newKeySet();
        this.profiler = profiler;
        this.scheduledTasksIndicatorName = "scheduled.pool." + poolName + ".tasks.count";
        profiler.attachIndicator(this.scheduledTasksIndicatorName, () -> this.activeTasks.size());
    }

    private void detachIndicators() {
        this.profiler.detachIndicator(this.scheduledTasksIndicatorName);
    }

    public ScheduledFuture<?> schedule(DynamicProperty<Schedule> scheduleSupplier, long startDelay, Runnable task) {
        if (this.isShutdown.get()) {
            throw new IllegalStateException("ReschedulableScheduler is shutdown and can not schedule new task. Task: " + task);
        }
        SelfSchedulableTaskWrapper taskWrapper = new SelfSchedulableTaskWrapper(scheduleSupplier, startDelay, task, this.executorService, this.activeTasks::remove, this.log);
        this.activeTasks.add(taskWrapper);
        return taskWrapper.launch();
    }

    public ScheduledFuture<?> schedule(DynamicProperty<Schedule> schedule, Runnable task) {
        return this.schedule(schedule, 0L, task);
    }

    public void shutdown() {
        this.cancelAllTasks(false);
        this.executorService.shutdown();
        this.detachIndicators();
        this.isShutdown.set(true);
    }

    public void shutdownNow() {
        this.cancelAllTasks(true);
        this.executorService.shutdownNow();
        this.detachIndicators();
        this.isShutdown.set(true);
    }

    private void cancelAllTasks(boolean mayInterruptIfRunning) {
        for (SelfSchedulableTaskWrapper task : this.activeTasks.toArray(new SelfSchedulableTaskWrapper[0])) {
            task.cancel(mayInterruptIfRunning);
        }
    }

    public boolean awaitTermination(long timeout, TimeUnit timeUnit) throws InterruptedException {
        return this.executorService.awaitTermination(timeout, timeUnit);
    }

    @Override
    public void close() {
        this.shutdown();
    }

    private static class ScheduleSettings {
        final Schedule.Type type;
        final long periodValue;

        public ScheduleSettings(Schedule.Type type, long periodValue) {
            this.type = type;
            this.periodValue = periodValue;
        }

        long safeDelay() {
            long safeDelay = 15L * this.periodValue / 100L;
            if (safeDelay < 1000L) {
                safeDelay = 1000L;
            } else if (safeDelay > 30000L) {
                safeDelay = 30000L;
            }
            return safeDelay;
        }

        public String toString() {
            return "ScheduleSettings{type=" + this.type + ", periodValue=" + this.periodValue + "}";
        }
    }

    private static class ReschedulableSchedullerFuture
    implements ScheduledFuture<Object> {
        final SelfSchedulableTaskWrapper taskWrapper;

        public ReschedulableSchedullerFuture(SelfSchedulableTaskWrapper taskWrapper) {
            this.taskWrapper = taskWrapper;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return this.taskWrapper.getSchedullerFuture().getDelay(unit);
        }

        @Override
        public int compareTo(Delayed o) {
            return this.taskWrapper.getSchedullerFuture().compareTo(o);
        }

        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            this.taskWrapper.cancel(mayInterruptIfRunning);
            return true;
        }

        @Override
        public boolean isCancelled() {
            return this.taskWrapper.getSchedullerFuture().isCancelled();
        }

        @Override
        public boolean isDone() {
            return this.taskWrapper.getSchedullerFuture().isDone();
        }

        @Override
        public Object get() throws InterruptedException, ExecutionException {
            return this.taskWrapper.getSchedullerFuture().get();
        }

        @Override
        public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
            return this.taskWrapper.getSchedullerFuture().get(timeout, unit);
        }
    }

    private static class SelfSchedulableTaskWrapper
    implements Runnable {
        private final Logger log;
        private Schedule previousSchedule;
        private DynamicProperty<Schedule> schedule;
        private DynamicPropertyListener<Schedule> scheduleListener;
        private final long startDelay;
        private ScheduledFuture<?> scheduledFuture;
        private final Runnable task;
        private final ReschedulableSchedullerFuture reschedulableFuture = new ReschedulableSchedullerFuture(this);
        private Consumer<SelfSchedulableTaskWrapper> cancelHandler;
        private final ScheduledExecutorService executorService;
        private volatile ScheduleSettings settings;
        private volatile long lastExecutedTs = 0L;
        private AtomicBoolean taskIsRunning = new AtomicBoolean(false);

        public SelfSchedulableTaskWrapper(DynamicProperty<Schedule> schedule, long startDelay, Runnable task, ScheduledExecutorService executorService, Consumer<SelfSchedulableTaskWrapper> cancelHandler, Logger log) {
            this.schedule = schedule;
            this.startDelay = startDelay;
            this.task = task;
            this.executorService = executorService;
            this.cancelHandler = cancelHandler;
            this.log = log;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            ScheduledFuture<?> scheduledFuture;
            SelfSchedulableTaskWrapper selfSchedulableTaskWrapper = this;
            synchronized (selfSchedulableTaskWrapper) {
                scheduledFuture = this.scheduledFuture;
            }
            if (!this.taskIsRunning.compareAndSet(false, true)) {
                this.log.trace("Preventing concurrent task launch; scheduledFuture={} with hash={}", scheduledFuture, (Object)System.identityHashCode(scheduledFuture));
                return;
            }
            try {
                ScheduleSettings currSettings = this.settings;
                if (currSettings.type == Schedule.Type.RATE) {
                    long now = System.currentTimeMillis();
                    if (now < this.lastExecutedTs + currSettings.periodValue - currSettings.safeDelay()) {
                        this.log.trace("skip wrong invocation; now={}, lastExecutedTs={}, currSettings={}; scheduledFuture={} with hash={}", new Object[]{now, this.lastExecutedTs, currSettings, scheduledFuture, System.identityHashCode(scheduledFuture)});
                        return;
                    }
                    this.lastExecutedTs = now;
                }
                this.log.trace("running task; scheduledFuture={} with hash={}", scheduledFuture, (Object)System.identityHashCode(scheduledFuture));
                this.task.run();
            }
            catch (Throwable exc) {
                this.log.error("ReschedulableScheduler task failed due to: " + exc.getMessage(), exc);
            }
            finally {
                this.log.trace("Set taskIsRunning flag to false; scheduledFuture={} with hash={}", scheduledFuture, (Object)System.identityHashCode(scheduledFuture));
                this.taskIsRunning.compareAndSet(true, false);
                this.checkPreviousScheduleAndRestartTask((Schedule)this.schedule.get());
            }
        }

        private synchronized void checkPreviousScheduleAndRestartTask(Schedule schedule) {
            if (this.reschedulableFuture.isCancelled()) {
                return;
            }
            if (!this.previousSchedule.equals(schedule)) {
                this.previousSchedule = schedule;
                this.log.trace("checkPreviousScheduleAndRestartTask cancelling  scheduledFuture {} with hash={}", this.scheduledFuture, (Object)System.identityHashCode(this.scheduledFuture));
                this.scheduledFuture.cancel(false);
                this.scheduledFuture = this.schedule(this, schedule, this.startDelay);
                this.log.trace("checkPreviousScheduleAndRestartTask new scheduledFuture {} with hash={} is scheduled", this.scheduledFuture, (Object)System.identityHashCode(this.scheduledFuture));
            }
        }

        public synchronized ScheduledFuture<?> launch() {
            this.scheduleListener = (oldVal, newVal) -> this.checkPreviousScheduleAndRestartTask((Schedule)newVal);
            this.schedule.addListener(this.scheduleListener);
            Schedule schedule = (Schedule)this.schedule.get();
            this.scheduledFuture = this.schedule(this, schedule, this.startDelay);
            this.previousSchedule = schedule;
            this.log.trace("scheduledFuture={} with hash={} is launched", this.scheduledFuture, (Object)System.identityHashCode(this.scheduledFuture));
            return this.reschedulableFuture;
        }

        private synchronized ScheduledFuture<?> schedule(SelfSchedulableTaskWrapper taskWrapper, Schedule schedule, long startDelay) {
            long periodValue = schedule.getValue();
            Schedule.Type type = schedule.getType();
            this.settings = new ScheduleSettings(type, periodValue);
            switch (type) {
                case RATE: {
                    return this.executorService.scheduleAtFixedRate(taskWrapper, startDelay, periodValue, TimeUnit.MILLISECONDS);
                }
                case DELAY: {
                    return this.executorService.scheduleWithFixedDelay(taskWrapper, startDelay, periodValue, TimeUnit.MILLISECONDS);
                }
            }
            throw new IllegalArgumentException("Invalid schedule type: " + type);
        }

        synchronized ScheduledFuture<?> getSchedullerFuture() {
            return this.scheduledFuture;
        }

        public synchronized void cancel(boolean mayInterruptIfRunning) {
            this.log.trace("cancelling scheduledFuture {} with hash={}", this.scheduledFuture, (Object)System.identityHashCode(this.scheduledFuture));
            this.schedule.removeListener(this.scheduleListener);
            this.scheduledFuture.cancel(mayInterruptIfRunning);
            this.cancelHandler.accept(this);
        }
    }
}

