/*
 * Decompiled with CFR 0.152.
 */
package org.visallo.model.queue.inmemory;

import com.google.inject.Inject;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.json.JSONObject;
import org.vertexium.Graph;
import org.visallo.core.config.Configuration;
import org.visallo.core.ingest.WorkerSpout;
import org.visallo.core.ingest.WorkerTuple;
import org.visallo.core.model.WorkQueueNames;
import org.visallo.core.model.workQueue.Priority;
import org.visallo.core.model.workQueue.WorkQueueRepository;
import org.visallo.core.status.model.QueueStatus;
import org.visallo.core.status.model.Status;

/**
 * {@link WorkQueueRepository} implementation that keeps every queue in a
 * process-wide, in-memory map of queue name to list of raw messages.
 *
 * <p>Locking protocol: the {@code queues} map is guarded by its own monitor and
 * each queue list is guarded by the list's monitor. When both are needed the
 * map monitor is always taken first.
 */
public class InMemoryWorkQueueRepository extends WorkQueueRepository {
    /** Queue name to pending messages; shared by all instances. Guarded by {@code synchronized (queues)}. */
    private static final Map<String, List<byte[]>> queues = new HashMap<String, List<byte[]>>();

    /** Consumers registered through {@link #subscribeToBroadcastMessages}. */
    private final List<WorkQueueRepository.BroadcastConsumer> broadcastConsumers = new ArrayList<WorkQueueRepository.BroadcastConsumer>();

    @Inject
    public InMemoryWorkQueueRepository(Graph graph, WorkQueueNames workQueueNames, Configuration configuration) {
        super(graph, workQueueNames, configuration);
    }

    /**
     * Delivers {@code json} synchronously to every subscribed broadcast consumer,
     * in subscription order.
     *
     * @param json the message handed to each consumer
     */
    protected void broadcastJson(JSONObject json) {
        for (WorkQueueRepository.BroadcastConsumer consumer : this.broadcastConsumers) {
            consumer.broadcastReceived(json);
        }
    }

    /**
     * Logs the push at debug level and then enqueues {@code data}.
     *
     * @param queueName name of the target queue (created on demand)
     * @param data      raw message bytes
     * @param priority  {@link Priority#HIGH} jumps to the head of the queue
     */
    public void pushOnQueue(String queueName, byte[] data, Priority priority) {
        LOGGER.debug("push on queue: %s: %s", queueName, data);
        this.addToQueue(queueName, data, priority);
    }

    /**
     * Enqueues {@code data} on the named queue. {@link Priority#HIGH} messages are
     * inserted at the head; everything else is appended to the tail.
     *
     * @param queueName name of the target queue (created on demand)
     * @param data      raw message bytes
     * @param priority  controls whether the message goes to the head or the tail
     */
    public void addToQueue(String queueName, byte[] data, Priority priority) {
        final List<byte[]> queue = InMemoryWorkQueueRepository.getQueue(queueName);
        synchronized (queue) {
            if (priority == Priority.HIGH) {
                queue.add(0, data);
            } else {
                queue.add(data);
            }
            // Wake any thread waiting on this list's monitor.
            queue.notifyAll();
        }
    }

    /** No-op: messages are visible as soon as they are added. */
    public void flush() {
    }

    /** Drops every queue by delegating to {@link #clearQueue()}. */
    public void format() {
        InMemoryWorkQueueRepository.clearQueue();
    }

    /**
     * Registers a consumer that will receive every subsequent broadcast.
     *
     * @param broadcastConsumer the consumer to add
     */
    public void subscribeToBroadcastMessages(WorkQueueRepository.BroadcastConsumer broadcastConsumer) {
        this.broadcastConsumers.add(broadcastConsumer);
    }

    /**
     * Creates a spout that polls the named queue.
     *
     * <p>The spout captures the list that backs the queue at creation time. If the
     * queue is later removed by {@link #clearQueue()} or {@link #deleteQueue}, the
     * spout keeps polling the captured list.
     *
     * @param queueName name of the queue to consume (created on demand)
     * @return a spout whose {@code nextTuple()} returns the head message, or
     *         {@code null} after a 100 ms pause when nothing is available
     */
    public WorkerSpout createWorkerSpout(String queueName) {
        final List<byte[]> queue = InMemoryWorkQueueRepository.getQueue(queueName);
        return new WorkerSpout(){

            public WorkerTuple nextTuple() throws Exception {
                byte[] entry = null;
                synchronized (queue) {
                    if (!queue.isEmpty()) {
                        entry = queue.remove(0);
                    }
                }
                if (entry == null) {
                    // Back off outside the monitor: Thread.sleep does not release
                    // locks, so sleeping inside would stall every producer.
                    Thread.sleep(100L);
                    return null;
                }
                return new WorkerTuple((Object)"", entry);
            }
        };
    }

    /**
     * Reports the current number of pending messages for each known queue.
     *
     * @return a new map from queue name to a {@link QueueStatus} holding its size
     */
    public Map<String, Status> getQueuesStatus() {
        final Map<String, Status> results = new HashMap<String, Status>();
        synchronized (queues) {
            for (Map.Entry<String, List<byte[]>> entry : queues.entrySet()) {
                final List<byte[]> queue = entry.getValue();
                final int size;
                synchronized (queue) {
                    size = queue.size();
                }
                results.put(entry.getKey(), (Status)new QueueStatus(size));
            }
        }
        return results;
    }

    /**
     * Removes every queue from the shared map. Lists already handed out by
     * {@link #getQueue} are not emptied; they are only detached from the map.
     */
    public static void clearQueue() {
        synchronized (queues) {
            queues.clear();
        }
    }

    /**
     * Removes the named queue from the shared map, if present.
     *
     * @param queueName name of the queue to remove
     */
    protected void deleteQueue(String queueName) {
        synchronized (queues) {
            queues.remove(queueName);
        }
    }

    /**
     * Returns the live list backing the named queue, creating and registering an
     * empty one if none exists. Callers that read or mutate the returned list must
     * synchronize on it.
     *
     * @param queueName name of the queue to look up
     * @return the shared, mutable list for {@code queueName}; never {@code null}
     */
    public static List<byte[]> getQueue(String queueName) {
        // Lookup and creation happen under one lock so concurrent callers can
        // never end up holding different lists for the same queue name.
        synchronized (queues) {
            List<byte[]> queue = queues.get(queueName);
            if (queue == null) {
                queue = new LinkedList<byte[]>();
                queues.put(queueName, queue);
            }
            return queue;
        }
    }
}

