/*
 * Decompiled with CFR 0.152.
 */
package org.restcomm.chain.impl;

import java.util.HashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
import org.restcomm.chain.ParallelProcessorChain;
import org.restcomm.chain.impl.MalformedProcessorChainException;
import org.restcomm.chain.processor.Processor;
import org.restcomm.chain.processor.impl.DefaultDPIProcessor;
import org.restcomm.chain.processor.impl.DefaultProcessor;
import org.restcomm.chain.processor.impl.ImmutableMessage;
import org.restcomm.chain.processor.impl.MutableMessage;
import org.restcomm.chain.processor.impl.ProcessorParsingException;

public abstract class DefaultParallelProcessorChain
extends DefaultDPIProcessor
implements ParallelProcessorChain {
    private static transient Logger LOG = Logger.getLogger(DefaultParallelProcessorChain.class);
    private Processor nextLink;
    private HashMap<Integer, Processor> processors = new HashMap();

    public DefaultParallelProcessorChain() {
    }

    public DefaultParallelProcessorChain(String name) {
        super(name);
    }

    @Override
    public void process(MutableMessage message) throws ProcessorParsingException {
        MutableMessage immutableMessage = message;
        this.fireProcessingEvent(immutableMessage, (Processor)((Object)this.getCallback()));
        ExecutorService taskExecutor = Executors.newFixedThreadPool(this.processors.size());
        for (Processor processor : this.processors.values()) {
            taskExecutor.execute(new ProcessorTask(immutableMessage, processor));
        }
        taskExecutor.shutdown();
        try {
            taskExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            LOG.error((Object)e.getMessage());
        }
        if (message != null && LOG.isDebugEnabled()) {
            LOG.debug((Object)("<< DPC " + (Object)((Object)this.type) + " output message [" + message + "]"));
        }
        this.fireEndEvent(message, (Processor)((Object)this.getCallback()));
        Processor nextLink = null;
        if (LOG.isDebugEnabled()) {
            LOG.debug((Object)("DPC " + (Object)((Object)this.type) + " from callback " + this.getName() + " chain " + this.getCallback()));
        }
        if ((nextLink = this.getNextLink((DefaultProcessor)((Object)this.getCallback()))) != null) {
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)("DPC " + (Object)((Object)this.type) + " from callback " + this.getName() + " nextlink " + nextLink));
            }
            nextLink.process(message);
        }
    }

    @Override
    public String getName() {
        return "Raw Default Parallel Chain Implementation";
    }

    @Override
    public int getId() {
        return this.hashCode();
    }

    @Override
    public void link(Processor processor) throws MalformedProcessorChainException {
        if (processor == null) {
            throw new MalformedProcessorChainException("Processors could not be null");
        }
        this.processors.put(processor.getId(), processor);
    }

    @Override
    public Processor getNextLink(Processor processor) {
        return this.nextLink;
    }

    @Override
    public void setNextLink(Processor processor) throws MalformedProcessorChainException {
        this.nextLink = processor;
    }

    class ProcessorTask
    implements Runnable {
        private ImmutableMessage message;
        private Processor processor;

        ProcessorTask(ImmutableMessage message, Processor processor) {
            this.message = message;
            this.processor = processor;
        }

        @Override
        public void run() {
            try {
                DefaultParallelProcessorChain.this.fireProcessingEvent(this.message, this.processor);
                this.processor.getCallback().doProcess(this.message);
                if (DefaultParallelProcessorChain.this.chain != null && LOG.isDebugEnabled()) {
                    LOG.debug((Object)("DPC " + (Object)((Object)DefaultParallelProcessorChain.this.type) + " from callback " + DefaultParallelProcessorChain.this.getCallback() + " chain " + DefaultParallelProcessorChain.this.chain));
                }
                DefaultParallelProcessorChain.this.fireEndEvent(this.message, this.processor);
            }
            catch (ProcessorParsingException e) {
                LOG.error((Object)e.getMessage());
            }
        }
    }
}

