/*
 * Decompiled with CFR 0.152.
 */
package org.vanilladb.comm.protocols.urb;

import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
import net.sf.appia.core.AppiaEventException;
import net.sf.appia.core.Event;
import net.sf.appia.core.Layer;
import net.sf.appia.core.Session;
import org.vanilladb.comm.process.ProcessList;
import org.vanilladb.comm.process.ProcessState;
import org.vanilladb.comm.protocols.beb.Broadcast;
import org.vanilladb.comm.protocols.events.ProcessListInit;
import org.vanilladb.comm.protocols.rb.MessageId;
import org.vanilladb.comm.protocols.tcpfd.AllProcessesReady;
import org.vanilladb.comm.protocols.tcpfd.FailureDetected;
import org.vanilladb.comm.protocols.urb.UniformReliableBroadcast;

public class UniformReliableBroadcastSession
extends Session {
    private static Logger logger = Logger.getLogger(UniformReliableBroadcastSession.class.getName());
    private ProcessList processList;
    private Map<Integer, Set<Integer>> delivered = new HashMap<Integer, Set<Integer>>();
    private Map<Integer, Map<Integer, UniformReliableBroadcast>> pending = new HashMap<Integer, Map<Integer, UniformReliableBroadcast>>();
    private Map<Integer, Map<Integer, Set<Integer>>> acks = new HashMap<Integer, Map<Integer, Set<Integer>>>();
    private int sequenceNumber = 0;

    UniformReliableBroadcastSession(Layer layer) {
        super(layer);
    }

    @Override
    public void handle(Event event) {
        if (event instanceof ProcessListInit) {
            this.handleProcessListInit((ProcessListInit)event);
        } else if (event instanceof AllProcessesReady) {
            this.handleAllProcessesReady((AllProcessesReady)event);
        } else if (event instanceof FailureDetected) {
            this.handleFailureDetected((FailureDetected)event);
        } else if (event instanceof UniformReliableBroadcast) {
            if (event.getDir() == -1) {
                this.handleBroadcastRequest((UniformReliableBroadcast)event);
            } else {
                this.handleBroadcastDeliver((UniformReliableBroadcast)event);
            }
        }
        this.tryDeliver();
    }

    private void handleProcessListInit(ProcessListInit event) {
        if (logger.isLoggable(Level.FINE)) {
            logger.fine("Received ProcessListInit");
        }
        this.processList = event.copyProcessList();
        try {
            event.go();
        }
        catch (AppiaEventException e) {
            e.printStackTrace();
        }
        for (int pid = 0; pid < this.processList.getSize(); ++pid) {
            this.delivered.put(pid, new HashSet());
            this.pending.put(pid, new HashMap());
            this.acks.put(pid, new HashMap());
        }
    }

    private void handleAllProcessesReady(AllProcessesReady event) {
        if (logger.isLoggable(Level.FINE)) {
            logger.fine("Received AllProcessesReady");
        }
        for (int i = 0; i < this.processList.getSize(); ++i) {
            this.processList.getProcess(i).setState(ProcessState.CORRECT);
        }
        try {
            event.go();
        }
        catch (AppiaEventException e) {
            e.printStackTrace();
        }
    }

    private void handleFailureDetected(FailureDetected event) {
        if (logger.isLoggable(Level.FINE)) {
            logger.fine("Received FailureDetected (failed id = " + event.getFailedProcessId() + ")");
        }
        this.processList.getProcess(event.getFailedProcessId()).setState(ProcessState.FAILED);
        try {
            event.go();
        }
        catch (AppiaEventException e) {
            e.printStackTrace();
        }
    }

    private void handleBroadcastRequest(UniformReliableBroadcast event) {
        if (logger.isLoggable(Level.FINE)) {
            logger.fine("Received a Broadcast request from a upper layer");
        }
        int seqNum = this.sequenceNumber++;
        int selfPid = this.processList.getSelfId();
        MessageId id = new MessageId(selfPid, seqNum);
        event.getMessage().pushObject(id);
        try {
            this.pending.get(selfPid).put(seqNum, (UniformReliableBroadcast)event.cloneEvent());
        }
        catch (CloneNotSupportedException e) {
            e.printStackTrace();
        }
        try {
            event.setSourceSession(this);
            event.setDir(-1);
            event.init();
            event.go();
        }
        catch (AppiaEventException e) {
            e.printStackTrace();
        }
    }

    private void handleBroadcastDeliver(UniformReliableBroadcast event) {
        if (logger.isLoggable(Level.FINE)) {
            logger.fine("Received a delivered broadcast from a lower layer");
        }
        MessageId id = (MessageId)event.getMessage().popObject();
        int sourcePid = id.getSourceProcessId();
        int seqNum = id.getSequenceNumber();
        int senderPid = this.processList.getId((SocketAddress)event.source);
        this.receiveAck(id, senderPid);
        if (this.pending.get(sourcePid).containsKey(seqNum)) {
            try {
                this.pending.get(sourcePid).put(seqNum, (UniformReliableBroadcast)event.cloneEvent());
            }
            catch (CloneNotSupportedException e) {
                e.printStackTrace();
            }
            try {
                event.setSourceSession(this);
                event.setDir(-1);
                event.init();
                event.go();
            }
            catch (AppiaEventException e) {
                e.printStackTrace();
            }
        }
    }

    private void receiveAck(MessageId id, int senderPid) {
        Set<Integer> ackSet = this.acks.get(id.getSourceProcessId()).get(id.getSequenceNumber());
        if (ackSet == null) {
            ackSet = new HashSet<Integer>();
            this.acks.get(id.getSourceProcessId()).put(id.getSequenceNumber(), ackSet);
        }
        ackSet.add(senderPid);
    }

    private void tryDeliver() {
        ArrayList<Integer> deletedFromPending = new ArrayList<Integer>();
        for (Map.Entry<Integer, Map<Integer, UniformReliableBroadcast>> entry : this.pending.entrySet()) {
            int sourcePid = entry.getKey();
            Map<Integer, UniformReliableBroadcast> messages = entry.getValue();
            deletedFromPending.clear();
            for (Map.Entry<Integer, UniformReliableBroadcast> messageEntry : messages.entrySet()) {
                int seqNum = messageEntry.getKey();
                Broadcast message = messageEntry.getValue();
                if (!this.canDeliver(sourcePid, seqNum)) continue;
                this.deliver(sourcePid, seqNum, message);
                deletedFromPending.add(seqNum);
            }
            for (Integer sn : deletedFromPending) {
                messages.remove(sn);
            }
        }
    }

    private boolean canDeliver(int sourcePid, int seqNum) {
        Set<Integer> ackProcesses = this.acks.get(sourcePid).get(seqNum);
        return ackProcesses != null && ackProcesses.containsAll(this.processList.getCorrectProcessIds());
    }

    private void deliver(int sourcePid, int seqNum, Broadcast message) {
        this.delivered.get(sourcePid).add(seqNum);
        try {
            message.setSourceSession(this);
            message.setDir(1);
            message.init();
            message.go();
        }
        catch (AppiaEventException e) {
            e.printStackTrace();
        }
    }
}

