/*
 * Decompiled with CFR 0.152.
 */
package net.greghaines.jesque.worker;

import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.JsonMappingException;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.text.SimpleDateFormat;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import net.greghaines.jesque.Config;
import net.greghaines.jesque.Job;
import net.greghaines.jesque.JobFailure;
import net.greghaines.jesque.WorkerStatus;
import net.greghaines.jesque.json.ObjectMapperFactory;
import net.greghaines.jesque.utils.JedisUtils;
import net.greghaines.jesque.utils.JesqueUtils;
import net.greghaines.jesque.utils.ScriptUtils;
import net.greghaines.jesque.utils.VersionUtils;
import net.greghaines.jesque.worker.DefaultExceptionHandler;
import net.greghaines.jesque.worker.DefaultFailQueueStrategy;
import net.greghaines.jesque.worker.ExceptionHandler;
import net.greghaines.jesque.worker.FailQueueStrategy;
import net.greghaines.jesque.worker.JobExecutor;
import net.greghaines.jesque.worker.JobFactory;
import net.greghaines.jesque.worker.NextQueueStrategy;
import net.greghaines.jesque.worker.RecoveryStrategy;
import net.greghaines.jesque.worker.Worker;
import net.greghaines.jesque.worker.WorkerAware;
import net.greghaines.jesque.worker.WorkerEvent;
import net.greghaines.jesque.worker.WorkerEventEmitter;
import net.greghaines.jesque.worker.WorkerListenerDelegate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Transaction;
import redis.clients.jedis.exceptions.JedisException;

public class WorkerImpl
implements Worker {
    private static final Logger LOG = LoggerFactory.getLogger(WorkerImpl.class);
    private static final AtomicLong WORKER_COUNTER = new AtomicLong(0L);
    protected static final long EMPTY_QUEUE_SLEEP_TIME = 500L;
    protected static final long RECONNECT_SLEEP_TIME = 5000L;
    protected static final int RECONNECT_ATTEMPTS = 120;
    private static final String LPOPLPUSH_LUA = "/workerScripts/jesque_lpoplpush.lua";
    private static final String POP_LUA = "/workerScripts/jesque_pop.lua";
    private static final String POP_FROM_MULTIPLE_PRIO_QUEUES = "/workerScripts/fromMultiplePriorityQueues.lua";
    private static volatile boolean threadNameChangingEnabled = false;
    private final NextQueueStrategy nextQueueStrategy;
    protected final Config config;
    protected final Jedis jedis;
    protected final String namespace;
    protected final BlockingDeque<String> queueNames = new LinkedBlockingDeque<String>();
    private final String name;
    protected final WorkerListenerDelegate listenerDelegate = new WorkerListenerDelegate();
    protected final AtomicReference<JobExecutor.State> state = new AtomicReference<JobExecutor.State>(JobExecutor.State.NEW);
    private final AtomicBoolean paused = new AtomicBoolean(false);
    private final AtomicBoolean processingJob = new AtomicBoolean(false);
    private final AtomicReference<String> popScriptHash = new AtomicReference<Object>(null);
    private final AtomicReference<String> lpoplpushScriptHash = new AtomicReference<Object>(null);
    private final AtomicReference<String> multiPriorityQueuesScriptHash = new AtomicReference<Object>(null);
    private final long workerId = WORKER_COUNTER.getAndIncrement();
    private final String threadNameBase = "Worker-" + this.workerId + " Jesque-" + VersionUtils.getVersion() + ": ";
    private final AtomicReference<Thread> threadRef = new AtomicReference<Object>(null);
    private final AtomicReference<ExceptionHandler> exceptionHandlerRef = new AtomicReference<DefaultExceptionHandler>(new DefaultExceptionHandler());
    private final AtomicReference<FailQueueStrategy> failQueueStrategyRef;
    private final JobFactory jobFactory;

    public static boolean isThreadNameChangingEnabled() {
        return threadNameChangingEnabled;
    }

    public static void setThreadNameChangingEnabled(boolean enabled) {
        threadNameChangingEnabled = enabled;
    }

    protected static void checkQueues(Iterable<String> queues) {
        if (queues == null) {
            throw new IllegalArgumentException("queues must not be null");
        }
        for (String queue : queues) {
            if (queue != null && !"".equals(queue)) continue;
            throw new IllegalArgumentException("queues' members must not be null: " + queues);
        }
    }

    public WorkerImpl(Config config, Collection<String> queues, JobFactory jobFactory) {
        this(config, queues, jobFactory, new Jedis(config.getHost(), config.getPort(), config.getTimeout()));
    }

    public WorkerImpl(Config config, Collection<String> queues, JobFactory jobFactory, Jedis jedis) {
        this(config, queues, jobFactory, jedis, NextQueueStrategy.DRAIN_WHILE_MESSAGES_EXISTS);
    }

    public WorkerImpl(Config config, Collection<String> queues, JobFactory jobFactory, Jedis jedis, NextQueueStrategy nextQueueStrategy) {
        if (config == null) {
            throw new IllegalArgumentException("config must not be null");
        }
        if (jobFactory == null) {
            throw new IllegalArgumentException("jobFactory must not be null");
        }
        if (jedis == null) {
            throw new IllegalArgumentException("jedis must not be null");
        }
        if (nextQueueStrategy == null) {
            throw new IllegalArgumentException("nextQueueStrategy must not be null");
        }
        WorkerImpl.checkQueues(queues);
        this.nextQueueStrategy = nextQueueStrategy;
        this.config = config;
        this.jobFactory = jobFactory;
        this.namespace = config.getNamespace();
        this.jedis = jedis;
        this.failQueueStrategyRef = new AtomicReference<DefaultFailQueueStrategy>(new DefaultFailQueueStrategy(this.namespace));
        this.authenticateAndSelectDB();
        this.setQueues(queues);
        this.name = this.createName();
    }

    public long getWorkerId() {
        return this.workerId;
    }

    @Override
    public void run() {
        block5: {
            block6: {
                if (!this.state.compareAndSet(JobExecutor.State.NEW, JobExecutor.State.RUNNING)) break block6;
                try {
                    this.renameThread("RUNNING");
                    this.threadRef.set(Thread.currentThread());
                    this.jedis.sadd(this.key("workers"), new String[]{this.name});
                    this.jedis.set(this.key("worker", this.name, "started"), new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ").format(new Date()));
                    this.listenerDelegate.fireEvent(WorkerEvent.WORKER_START, this, null, null, null, null, null);
                    this.loadRedisScripts();
                    this.poll();
                    this.renameThread("STOPPING");
                    this.listenerDelegate.fireEvent(WorkerEvent.WORKER_STOP, this, null, null, null, null, null);
                }
                catch (Exception ex) {
                    try {
                        LOG.error("Uncaught exception in worker run-loop!", (Throwable)ex);
                        this.listenerDelegate.fireEvent(WorkerEvent.WORKER_ERROR, this, null, null, null, null, ex);
                        this.renameThread("STOPPING");
                        this.listenerDelegate.fireEvent(WorkerEvent.WORKER_STOP, this, null, null, null, null, null);
                    }
                    catch (Throwable throwable) {
                        this.renameThread("STOPPING");
                        this.listenerDelegate.fireEvent(WorkerEvent.WORKER_STOP, this, null, null, null, null, null);
                        this.jedis.srem(this.key("workers"), new String[]{this.name});
                        this.jedis.del(new String[]{this.key("worker", this.name), this.key("worker", this.name, "started"), this.key("stat", "failed", this.name), this.key("stat", "processed", this.name)});
                        this.jedis.quit();
                        this.threadRef.set(null);
                        throw throwable;
                    }
                    this.jedis.srem(this.key("workers"), new String[]{this.name});
                    this.jedis.del(new String[]{this.key("worker", this.name), this.key("worker", this.name, "started"), this.key("stat", "failed", this.name), this.key("stat", "processed", this.name)});
                    this.jedis.quit();
                    this.threadRef.set(null);
                    break block5;
                }
                this.jedis.srem(this.key("workers"), new String[]{this.name});
                this.jedis.del(new String[]{this.key("worker", this.name), this.key("worker", this.name, "started"), this.key("stat", "failed", this.name), this.key("stat", "processed", this.name)});
                this.jedis.quit();
                this.threadRef.set(null);
                break block5;
            }
            if (JobExecutor.State.RUNNING.equals((Object)this.state.get())) {
                throw new IllegalStateException("This WorkerImpl is already running");
            }
            throw new IllegalStateException("This WorkerImpl is shutdown");
        }
    }

    @Override
    public void end(boolean now) {
        if (now) {
            this.state.set(JobExecutor.State.SHUTDOWN_IMMEDIATE);
            Thread workerThread = this.threadRef.get();
            if (workerThread != null) {
                workerThread.interrupt();
            }
        } else {
            this.state.set(JobExecutor.State.SHUTDOWN);
        }
        this.togglePause(false);
    }

    @Override
    public boolean isShutdown() {
        return JobExecutor.State.SHUTDOWN.equals((Object)this.state.get()) || JobExecutor.State.SHUTDOWN_IMMEDIATE.equals((Object)this.state.get());
    }

    @Override
    public boolean isPaused() {
        return this.paused.get();
    }

    @Override
    public boolean isProcessingJob() {
        return this.processingJob.get();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void togglePause(boolean paused) {
        this.paused.set(paused);
        AtomicBoolean atomicBoolean = this.paused;
        synchronized (atomicBoolean) {
            this.paused.notifyAll();
        }
    }

    @Override
    public String getName() {
        return this.name;
    }

    @Override
    public WorkerEventEmitter getWorkerEventEmitter() {
        return this.listenerDelegate;
    }

    @Override
    public Collection<String> getQueues() {
        return Collections.unmodifiableCollection(this.queueNames);
    }

    @Override
    public void addQueue(String queueName) {
        if (queueName == null || "".equals(queueName)) {
            throw new IllegalArgumentException("queueName must not be null or empty: " + queueName);
        }
        this.queueNames.add(queueName);
    }

    @Override
    public void removeQueue(String queueName, boolean all) {
        if (queueName == null || "".equals(queueName)) {
            throw new IllegalArgumentException("queueName must not be null or empty: " + queueName);
        }
        if (all) {
            boolean tryAgain = true;
            while (tryAgain) {
                tryAgain = this.queueNames.remove(queueName);
            }
        } else {
            this.queueNames.remove(queueName);
        }
    }

    @Override
    public void removeAllQueues() {
        this.queueNames.clear();
    }

    @Override
    public void setQueues(Collection<String> queues) {
        WorkerImpl.checkQueues(queues);
        this.queueNames.clear();
        this.queueNames.addAll(queues == ALL_QUEUES ? this.jedis.smembers(this.key("queues")) : queues);
    }

    @Override
    public JobFactory getJobFactory() {
        return this.jobFactory;
    }

    @Override
    public ExceptionHandler getExceptionHandler() {
        return this.exceptionHandlerRef.get();
    }

    @Override
    public void setExceptionHandler(ExceptionHandler exceptionHandler) {
        if (exceptionHandler == null) {
            throw new IllegalArgumentException("exceptionHandler must not be null");
        }
        this.exceptionHandlerRef.set(exceptionHandler);
    }

    public FailQueueStrategy getFailQueueStrategy() {
        return this.failQueueStrategyRef.get();
    }

    public void setFailQueueStrategy(FailQueueStrategy failQueueStrategy) {
        if (failQueueStrategy == null) {
            throw new IllegalArgumentException("failQueueStrategy must not be null");
        }
        this.failQueueStrategyRef.set(failQueueStrategy);
    }

    @Override
    public void join(long millis) throws InterruptedException {
        Thread workerThread = this.threadRef.get();
        if (workerThread != null && workerThread.isAlive()) {
            workerThread.join(millis);
        }
    }

    protected int getReconnectAttempts() {
        return 120;
    }

    protected void poll() {
        int missCount = 0;
        String curQueue = null;
        while (JobExecutor.State.RUNNING.equals((Object)this.state.get())) {
            try {
                if (threadNameChangingEnabled) {
                    this.renameThread("Waiting for " + JesqueUtils.join(",", this.queueNames));
                }
                if ((curQueue = this.getNextQueue()) == null) continue;
                this.checkPaused();
                if (!JobExecutor.State.RUNNING.equals((Object)this.state.get())) continue;
                this.listenerDelegate.fireEvent(WorkerEvent.WORKER_POLL, this, curQueue, null, null, null, null);
                String payload = this.pop(curQueue);
                if (payload != null) {
                    this.process((Job)ObjectMapperFactory.get().readValue(payload, Job.class), curQueue);
                    missCount = 0;
                    continue;
                }
                if (!this.shouldSleep(++missCount) || !JobExecutor.State.RUNNING.equals((Object)this.state.get())) continue;
                missCount = 0;
                Thread.sleep(500L);
            }
            catch (InterruptedException ie) {
                if (this.isShutdown()) continue;
                this.recoverFromException(curQueue, ie);
            }
            catch (JsonParseException | JsonMappingException e) {
                this.removeInFlight(curQueue, true);
                this.recoverFromException(curQueue, (Exception)e);
            }
            catch (Exception e) {
                this.recoverFromException(curQueue, e);
            }
        }
    }

    private boolean shouldSleep(int missCount) {
        return NextQueueStrategy.RESET_TO_HIGHEST_PRIORITY.equals((Object)this.nextQueueStrategy) || missCount >= this.queueNames.size();
    }

    protected String getNextQueue() throws InterruptedException {
        String nextQueue;
        switch (this.nextQueueStrategy) {
            case DRAIN_WHILE_MESSAGES_EXISTS: {
                String nextPollQueue = this.queueNames.poll(500L, TimeUnit.MILLISECONDS);
                if (nextPollQueue != null) {
                    this.queueNames.add(nextPollQueue);
                }
                nextQueue = nextPollQueue;
                break;
            }
            case RESET_TO_HIGHEST_PRIORITY: {
                nextQueue = JesqueUtils.join(",", this.queueNames);
                break;
            }
            default: {
                throw new RuntimeException("Unimplemented 'nextQueueStrategy'");
            }
        }
        return nextQueue;
    }

    protected String pop(String curQueue) {
        String key = this.key("queue", curQueue);
        String now = Long.toString(System.currentTimeMillis());
        String inflightKey = this.key("inflight", this.name, curQueue);
        switch (this.nextQueueStrategy) {
            case DRAIN_WHILE_MESSAGES_EXISTS: {
                return (String)this.jedis.evalsha(this.popScriptHash.get(), 3, new String[]{key, inflightKey, JesqueUtils.createRecurringHashKey(key), now});
            }
            case RESET_TO_HIGHEST_PRIORITY: {
                return (String)this.jedis.evalsha(this.multiPriorityQueuesScriptHash.get(), 3, new String[]{curQueue, inflightKey, this.namespace, now});
            }
        }
        throw new RuntimeException("Unimplemented 'nextQueueStrategy'");
    }

    protected void recoverFromException(String curQueue, Exception ex) {
        RecoveryStrategy recoveryStrategy = this.exceptionHandlerRef.get().onException(this, ex, curQueue);
        switch (recoveryStrategy) {
            case RECONNECT: {
                LOG.info("Reconnecting to Redis in response to exception", (Throwable)ex);
                int reconAttempts = this.getReconnectAttempts();
                if (!JedisUtils.reconnect(this.jedis, reconAttempts, 5000L)) {
                    LOG.warn("Terminating in response to exception after " + reconAttempts + " to reconnect", (Throwable)ex);
                    this.end(false);
                    break;
                }
                this.authenticateAndSelectDB();
                LOG.info("Reconnected to Redis");
                try {
                    this.loadRedisScripts();
                }
                catch (IOException e) {
                    LOG.error("Failed to reload Lua scripts after reconnect", (Throwable)e);
                }
                break;
            }
            case TERMINATE: {
                LOG.warn("Terminating in response to exception", (Throwable)ex);
                this.end(false);
                break;
            }
            case PROCEED: {
                this.listenerDelegate.fireEvent(WorkerEvent.WORKER_ERROR, this, curQueue, null, null, null, ex);
                break;
            }
            default: {
                LOG.error("Unknown RecoveryStrategy: " + (Object)((Object)recoveryStrategy) + " while attempting to recover from the following exception; worker proceeding...", (Throwable)ex);
            }
        }
    }

    private void authenticateAndSelectDB() {
        if (this.config.getPassword() != null) {
            this.jedis.auth(this.config.getPassword());
        }
        this.jedis.select(this.config.getDatabase());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void checkPaused() throws IOException {
        if (this.paused.get()) {
            AtomicBoolean atomicBoolean = this.paused;
            synchronized (atomicBoolean) {
                if (this.paused.get()) {
                    this.jedis.set(this.key("worker", this.name), this.pauseMsg());
                }
                while (this.paused.get()) {
                    try {
                        this.paused.wait();
                    }
                    catch (InterruptedException ie) {
                        LOG.warn("Worker interrupted", (Throwable)ie);
                    }
                }
                this.jedis.del(this.key("worker", this.name));
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void process(Job job, String curQueue) {
        boolean success = false;
        try {
            this.processingJob.set(true);
            if (threadNameChangingEnabled) {
                this.renameThread("Processing " + curQueue + " since " + System.currentTimeMillis());
            }
            this.listenerDelegate.fireEvent(WorkerEvent.JOB_PROCESS, this, curQueue, job, null, null, null);
            this.jedis.set(this.key("worker", this.name), this.statusMsg(curQueue, job));
            Object instance = this.jobFactory.materializeJob(job);
            Object result = this.execute(job, curQueue, instance);
            this.success(job, instance, result, curQueue);
            success = true;
            this.removeInFlight(curQueue, success);
        }
        catch (Throwable thrwbl) {
            try {
                this.failure(thrwbl, job, curQueue);
                this.removeInFlight(curQueue, success);
            }
            catch (Throwable throwable) {
                this.removeInFlight(curQueue, success);
                this.jedis.del(this.key("worker", this.name));
                this.processingJob.set(false);
                throw throwable;
            }
            this.jedis.del(this.key("worker", this.name));
            this.processingJob.set(false);
        }
        this.jedis.del(this.key("worker", this.name));
        this.processingJob.set(false);
    }

    private void removeInFlight(String curQueue, boolean skipRequeue) {
        if (JobExecutor.State.SHUTDOWN_IMMEDIATE.equals((Object)this.state.get()) && !skipRequeue) {
            this.lpoplpush(this.key("inflight", this.name, curQueue), this.key("queue", curQueue));
        } else {
            this.jedis.lpop(this.key("inflight", this.name, curQueue));
        }
    }

    protected Object execute(Job job, String curQueue, Object instance) throws Exception {
        Object result;
        if (instance instanceof WorkerAware) {
            ((WorkerAware)instance).setWorker(this);
        }
        this.listenerDelegate.fireEvent(WorkerEvent.JOB_EXECUTE, this, curQueue, job, instance, null, null);
        if (instance instanceof Callable) {
            result = ((Callable)instance).call();
        } else if (instance instanceof Runnable) {
            ((Runnable)instance).run();
            result = null;
        } else {
            throw new ClassCastException("Instance must be a Runnable or a Callable: " + instance.getClass().getName() + " - " + instance);
        }
        return result;
    }

    protected void success(Job job, Object runner, Object result, String curQueue) {
        JedisUtils.ensureJedisConnection(this.jedis);
        try {
            this.jedis.incr(this.key("stat", "processed"));
            this.jedis.incr(this.key("stat", "processed", this.name));
        }
        catch (JedisException je) {
            LOG.warn("Error updating success stats for job=" + job, (Throwable)je);
        }
        this.listenerDelegate.fireEvent(WorkerEvent.JOB_SUCCESS, this, curQueue, job, runner, result, null);
    }

    protected void failure(Throwable thrwbl, Job job, String curQueue) {
        JedisUtils.ensureJedisConnection(this.jedis);
        try {
            this.jedis.incr(this.key("stat", "failed"));
            this.jedis.incr(this.key("stat", "failed", this.name));
            FailQueueStrategy strategy = this.failQueueStrategyRef.get();
            String failQueueKey = strategy.getFailQueueKey(thrwbl, job, curQueue);
            if (failQueueKey != null) {
                int failQueueMaxItems = strategy.getFailQueueMaxItems(curQueue);
                if (failQueueMaxItems > 0) {
                    Long currentItems = this.jedis.llen(failQueueKey);
                    if (currentItems >= (long)failQueueMaxItems) {
                        Transaction tx = this.jedis.multi();
                        tx.ltrim(failQueueKey, 1L, -1L);
                        tx.rpush(failQueueKey, new String[]{this.failMsg(thrwbl, curQueue, job)});
                        tx.exec();
                    } else {
                        this.jedis.rpush(failQueueKey, new String[]{this.failMsg(thrwbl, curQueue, job)});
                    }
                } else {
                    this.jedis.rpush(failQueueKey, new String[]{this.failMsg(thrwbl, curQueue, job)});
                }
            }
        }
        catch (JedisException je) {
            LOG.warn("Error updating failure stats for throwable=" + thrwbl + " job=" + job, (Throwable)je);
        }
        catch (IOException ioe) {
            LOG.warn("Error serializing failure payload for throwable=" + thrwbl + " job=" + job, (Throwable)ioe);
        }
        this.listenerDelegate.fireEvent(WorkerEvent.JOB_FAILURE, this, curQueue, job, null, null, thrwbl);
    }

    protected String failMsg(Throwable thrwbl, String queue, Job job) throws IOException {
        JobFailure failure = new JobFailure();
        failure.setFailedAt(new Date());
        failure.setWorker(this.name);
        failure.setQueue(queue);
        failure.setPayload(job);
        failure.setThrowable(thrwbl);
        return ObjectMapperFactory.get().writeValueAsString((Object)failure);
    }

    protected String statusMsg(String queue, Job job) throws IOException {
        WorkerStatus status = new WorkerStatus();
        status.setRunAt(new Date());
        status.setQueue(queue);
        status.setPayload(job);
        return ObjectMapperFactory.get().writeValueAsString((Object)status);
    }

    protected String pauseMsg() throws IOException {
        WorkerStatus status = new WorkerStatus();
        status.setRunAt(new Date());
        status.setPaused(this.isPaused());
        return ObjectMapperFactory.get().writeValueAsString((Object)status);
    }

    protected String createName() {
        StringBuilder buf = new StringBuilder(128);
        try {
            buf.append(InetAddress.getLocalHost().getHostName()).append(":").append(ManagementFactory.getRuntimeMXBean().getName().split("@")[0]).append('-').append(this.workerId).append(":").append("JAVA_DYNAMIC_QUEUES");
            for (String queueName : this.queueNames) {
                buf.append(',').append(queueName);
            }
        }
        catch (UnknownHostException uhe) {
            throw new RuntimeException(uhe);
        }
        return buf.toString();
    }

    protected String key(String ... parts) {
        return JesqueUtils.createKey(this.namespace, parts);
    }

    protected void renameThread(String msg) {
        Thread.currentThread().setName(this.threadNameBase + msg);
    }

    protected String lpoplpush(String from, String to) {
        return (String)this.jedis.evalsha(this.lpoplpushScriptHash.get(), 2, new String[]{from, to});
    }

    private void loadRedisScripts() throws IOException {
        this.popScriptHash.set(this.jedis.scriptLoad(ScriptUtils.readScript(POP_LUA)));
        this.lpoplpushScriptHash.set(this.jedis.scriptLoad(ScriptUtils.readScript(LPOPLPUSH_LUA)));
        this.multiPriorityQueuesScriptHash.set(this.jedis.scriptLoad(ScriptUtils.readScript(POP_FROM_MULTIPLE_PRIO_QUEUES)));
    }

    public String toString() {
        return this.namespace + ":" + "worker" + ":" + this.name;
    }
}

