/*
 * Decompiled with CFR 0.152.
 */
package org.visallo.core.externalResource;

import com.codahale.metrics.Counter;
import com.codahale.metrics.Metric;
import com.codahale.metrics.Timer;
import com.google.inject.Inject;
import java.util.ArrayList;
import java.util.Collection;
import org.json.JSONObject;
import org.vertexium.Authorizations;
import org.visallo.core.externalResource.ExternalResourceWorker;
import org.visallo.core.ingest.WorkerSpout;
import org.visallo.core.ingest.WorkerTuple;
import org.visallo.core.model.user.AuthorizationRepository;
import org.visallo.core.model.user.UserRepository;
import org.visallo.core.model.workQueue.WorkQueueRepository;
import org.visallo.core.status.MetricEntry;
import org.visallo.core.user.User;
import org.visallo.core.util.VisalloLogger;
import org.visallo.core.util.VisalloLoggerFactory;

/**
 * Base class for external resource workers that are driven by a work queue.
 *
 * <p>{@link #run()} opens a {@link WorkerSpout} for the queue named by
 * {@link #getQueueName()}, then repeatedly pulls tuples from it, parses each
 * tuple's payload as a JSON object and hands it to
 * {@link #process(Object, JSONObject, Authorizations)}. Successful tuples are
 * acknowledged on the spout; tuples whose processing throws are logged,
 * counted as errors and reported to the spout as failed.
 *
 * <p>The repositories are supplied through the {@code @Inject} setters, so they
 * must have been set before {@link #run()} is called.
 */
public abstract class QueueExternalResourceWorker extends ExternalResourceWorker {
    /**
     * Prefix string for external resource queue names. Not used by this class
     * itself; presumably subclasses use it to build {@link #getQueueName()}.
     */
    public static final String QUEUE_NAME_PREFIX = "externalResource-";

    /** Idle delay, in milliseconds, applied when the spout has no tuple available. */
    private static final long EMPTY_QUEUE_SLEEP_MILLIS = 100L;

    private final AuthorizationRepository authorizationRepository;
    private WorkQueueRepository workQueueRepository;
    private UserRepository userRepository;
    // volatile: written by stop() and read by the polling loop in run().
    private volatile boolean shouldRun;
    private Timer processingTimeTimer;
    private Counter totalProcessedCounter;
    private Counter totalErrorCounter;
    private Collection<MetricEntry> metrics;

    /**
     * @param authorizationRepository repository used by {@link #run()} to obtain
     *                                the graph authorizations of the system user
     */
    protected QueueExternalResourceWorker(AuthorizationRepository authorizationRepository) {
        this.authorizationRepository = authorizationRepository;
    }

    /**
     * Delegates to the superclass, then creates the processed/error counters and
     * the processing-time timer and exposes them through {@link #getMetrics()}.
     *
     * @param user passed through to the superclass implementation
     */
    @Override
    protected void prepare(User user) {
        super.prepare(user);
        this.totalProcessedCounter = this.getMetricsManager().counter(this, "total-processed");
        this.totalErrorCounter = this.getMetricsManager().counter(this, "total-errors");
        this.processingTimeTimer = this.getMetricsManager().timer(this, "processing-time");
        this.metrics = new ArrayList<MetricEntry>();
        this.metrics.add(new MetricEntry("totalProcessed", (Metric)this.totalProcessedCounter));
        this.metrics.add(new MetricEntry("totalErrors", (Metric)this.totalErrorCounter));
        this.metrics.add(new MetricEntry("processingTime", (Metric)this.processingTimeTimer));
    }

    /**
     * Polls the work queue until {@link #stop()} is called, processing one tuple
     * per iteration and sleeping briefly whenever the queue is empty.
     *
     * <p>Failures while handling an individual tuple do not end the loop; they
     * are handled per tuple (logged, counted, and failed on the spout).
     *
     * @throws Exception if the spout cannot be created or polled, or if the
     *                   thread is interrupted while idling on an empty queue
     */
    @Override
    protected void run() throws Exception {
        VisalloLogger logger = VisalloLoggerFactory.getLogger(this.getClass());
        Authorizations authorizations = this.authorizationRepository.getGraphAuthorizations(this.getUserRepository().getSystemUser(), new String[0]);
        // NOTE(review): the spout is opened here but never closed by this class;
        // confirm whether WorkerSpout needs an explicit shutdown when the loop ends.
        WorkerSpout workerSpout = this.workQueueRepository.createWorkerSpout(this.getQueueName());
        workerSpout.open();
        this.shouldRun = true;
        while (this.shouldRun) {
            WorkerTuple tuple = workerSpout.nextTuple();
            if (tuple == null) {
                Thread.sleep(EMPTY_QUEUE_SLEEP_MILLIS);
                continue;
            }
            this.processTuple(workerSpout, tuple, authorizations, logger);
        }
        logger.debug("end runner", new Object[0]);
    }

    /**
     * Handles a single tuple: times the work, parses the payload, delegates to
     * {@link #process(Object, JSONObject, Authorizations)} and acks the tuple, or
     * on any failure logs it, increments the error counter and fails the tuple.
     */
    private void processTuple(WorkerSpout workerSpout, WorkerTuple tuple, Authorizations authorizations, VisalloLogger logger) {
        try {
            // try-with-resources stops the timer on both success and failure while
            // still letting the exception reach the catch block below. (A manual
            // finally block that exits via 'continue' would discard the exception
            // and skip the error handling entirely.)
            try (Timer.Context ignored = this.processingTimeTimer.time()) {
                long startTime = System.currentTimeMillis();
                // NOTE(review): decodes with the platform default charset; confirm
                // which encoding the queue producers use before pinning one here.
                JSONObject json = new JSONObject(new String(tuple.getData()));
                this.process(tuple.getMessageId(), json, authorizations);
                long endTime = System.currentTimeMillis();
                logger.debug("completed processing in (%dms)", endTime - startTime);
                workerSpout.ack(tuple);
                this.totalProcessedCounter.inc();
            }
        }
        catch (Throwable ex) {
            logger.error("Could not process tuple: %s", tuple, ex);
            this.totalErrorCounter.inc();
            workerSpout.fail(tuple);
        }
    }

    /**
     * Requests that the polling loop in {@link #run()} exit. The flag is checked
     * at the top of each loop iteration, so a tuple that is currently being
     * processed is finished first.
     */
    @Override
    public void stop() {
        this.shouldRun = false;
    }

    /**
     * Processes one queue message.
     *
     * @param messageId      the message id reported by the tuple
     * @param json           the tuple payload parsed as a JSON object
     * @param authorizations graph authorizations obtained for the system user
     * @throws Exception on any processing failure; the tuple is then failed on
     *                   the spout and counted as an error
     */
    protected abstract void process(Object messageId, JSONObject json, Authorizations authorizations) throws Exception;

    /**
     * @return the name of the queue this worker reads from
     */
    public abstract String getQueueName();

    @Inject
    public final void setWorkQueueRepository(WorkQueueRepository workQueueRepository) {
        this.workQueueRepository = workQueueRepository;
    }

    public WorkQueueRepository getWorkQueueRepository() {
        return this.workQueueRepository;
    }

    public UserRepository getUserRepository() {
        return this.userRepository;
    }

    @Inject
    public final void setUserRepository(UserRepository userRepository) {
        this.userRepository = userRepository;
    }

    /**
     * @return the metric entries created in {@link #prepare(User)}; {@code null}
     *         if {@code prepare} has not been called yet
     */
    @Override
    public Collection<MetricEntry> getMetrics() {
        return this.metrics;
    }
}

