/*
 * Decompiled with CFR 0.152.
 */
package org.vanilladb.comm.protocols.allAckURB;

import java.net.SocketAddress;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.logging.Logger;
import net.sf.appia.core.AppiaEventException;
import net.sf.appia.core.Event;
import net.sf.appia.core.Layer;
import net.sf.appia.core.Session;
import net.sf.appia.core.events.SendableEvent;
import net.sf.appia.core.events.channel.ChannelInit;
import org.vanilladb.comm.protocols.events.Crash;
import org.vanilladb.comm.protocols.events.ProcessInitEvent;
import org.vanilladb.comm.protocols.utils.Debug;
import org.vanilladb.comm.protocols.utils.MessageEntry;
import org.vanilladb.comm.protocols.utils.MessageID;
import org.vanilladb.comm.protocols.utils.ProcessSet;
import org.vanilladb.comm.protocols.utils.SampleProcess;

/**
 * Appia session for the "all-ack" uniform reliable broadcast (URB) layer.
 *
 * <p>Every outgoing message is tagged with a {@link MessageID} (sender process
 * number plus a per-sender sequence number). A message seen for the first time
 * is relayed back down, and each copy that arrives is recorded as an
 * acknowledgement from its source process. A message is handed to the layer
 * above once every process that is still marked correct has acknowledged it.
 *
 * <p>To bound memory, delivered ids are compacted per sender into a watermark
 * ({@code old_delivered}): every sequence number less than or equal to the
 * watermark is treated as already delivered.
 *
 * <p>The {@code received}, {@code delivered} and {@code ack} collections are
 * accessed under {@code synchronized (this)} in {@link #urbTryDeliver()},
 * {@link #urbBroadcast(SendableEvent)} and {@link #bebDeliver(SendableEvent)}.
 * {@code handleChannelInit} and {@code handleCrash} do not take the lock.
 */
public class AllAckURBSession
extends Session {
    /** Minimum size of {@code delivered} before compaction is attempted. */
    private static final int DELIVERED_SHRINK_SIZE = 100;
    /** Direction value used for events travelling down towards the beb layer. */
    private static final int DIRECTION_DOWN = -1;
    /** Direction value used for events travelling up to the protocol above. */
    private static final int DIRECTION_UP = 1;
    private static final Logger LOGGER = Logger.getLogger(AllAckURBSession.class.getName());

    /** Group membership; set by {@link ProcessInitEvent}. */
    private ProcessSet processes;
    /** Sequence number assigned to the next locally broadcast message. */
    private long seqNumber;
    /** Ids of messages seen (or broadcast locally) but not yet delivered. */
    private Set<MessageID> received;
    /** Ids of delivered messages that have not been compacted yet. */
    private Set<MessageID> delivered;
    /** Per-sender watermark: sequence numbers up to and including it count as delivered. */
    private long[] old_delivered;
    /** Pending messages together with the acknowledgements collected so far. */
    private Map<MessageID, MessageEntry> ack;
    /** Scratch list of keys to drop from {@code ack} once an iteration over it has finished. */
    private List<MessageID> toBeDeletedAck;

    public AllAckURBSession(Layer layer) {
        super(layer);
    }

    /**
     * Dispatches an incoming event to its handler and then re-checks whether
     * any pending message has become deliverable.
     *
     * @param event the event handed to this session
     */
    public void handle(Event event) {
        if (event instanceof ChannelInit) {
            this.handleChannelInit((ChannelInit)event);
        } else if (event instanceof ProcessInitEvent) {
            this.handleProcessInitEvent((ProcessInitEvent)event);
        } else if (event instanceof SendableEvent) {
            SendableEvent sendable = (SendableEvent)event;
            if (sendable.getDir() == DIRECTION_DOWN) {
                this.urbBroadcast(sendable);
            } else {
                this.bebDeliver(sendable);
            }
        } else if (event instanceof Crash) {
            this.handleCrash((Crash)event);
        } else {
            // Not an event this layer cares about: pass it along untouched.
            this.forward(event);
        }
        // Any event (an ack, a crash, ...) may have made a message deliverable.
        this.urbTryDeliver();
    }

    /**
     * Lets an event continue along its current route. Failures are reported
     * on stderr and otherwise ignored (best effort).
     */
    private void forward(Event event) {
        try {
            event.go();
        }
        catch (AppiaEventException e) {
            e.printStackTrace();
        }
    }

    /**
     * Delivers every pending message that satisfies {@link #canDeliver(MessageEntry)}
     * and drops its bookkeeping entry.
     */
    private void urbTryDeliver() {
        synchronized (this) {
            for (MessageEntry entry : this.ack.values()) {
                if (!this.canDeliver(entry)) {
                    continue;
                }
                this.delivered.add(entry.messageID);
                this.received.remove(entry.messageID);
                // Removal from 'ack' is deferred: the map is being iterated.
                this.toBeDeletedAck.add(entry.messageID);
                this.shrinkDelivered(entry.messageID);
                this.urbDeliver(entry.event, entry.messageID.process);
            }
            for (MessageID key : this.toBeDeletedAck) {
                this.ack.remove(key);
            }
            this.toBeDeletedAck.clear();
        }
    }

    /**
     * Tells whether a pending message may be delivered.
     *
     * @param entry the pending message and its acknowledgement flags
     * @return {@code true} when every process still marked correct has
     *         acknowledged the message, it lies above the sender's watermark,
     *         it has not been delivered and it is in the received set
     */
    private boolean canDeliver(MessageEntry entry) {
        int procSize = this.processes.getSize();
        for (int i = 0; i < procSize; ++i) {
            boolean stillCorrect = this.processes.getProcess(i).isCorrect();
            if (stillCorrect && !entry.acks[i]) {
                return false;
            }
        }
        MessageID id = entry.messageID;
        return this.old_delivered[id.process] < id.seqNumber
                && !this.delivered.contains(id)
                && this.received.contains(id);
    }

    /** Forwards the channel-init event, then creates the empty bookkeeping collections. */
    private void handleChannelInit(ChannelInit init) {
        this.forward(init);
        this.received = new HashSet<MessageID>();
        this.delivered = new HashSet<MessageID>();
        this.ack = new HashMap<MessageID, MessageEntry>();
        this.toBeDeletedAck = new LinkedList<MessageID>();
    }

    /**
     * Records the process set, resets every per-sender watermark to -1
     * (nothing delivered yet) and forwards the event.
     */
    private void handleProcessInitEvent(ProcessInitEvent event) {
        this.processes = event.getProcessSet();
        int size = this.processes.getSize();
        this.old_delivered = new long[size];
        for (int i = 0; i < size; ++i) {
            this.old_delivered[i] = -1L;
        }
        this.forward(event);
    }

    /**
     * Handles a broadcast request coming from above: assigns the next local
     * sequence number, marks the message as received, pushes its id onto the
     * message and sends it on.
     */
    private void urbBroadcast(SendableEvent event) {
        SampleProcess self = this.processes.getSelfProcess();
        MessageID msgID = new MessageID(self.getProcessNumber(), this.seqNumber);
        Debug.print("URB: broadcasting message from " + msgID.process + "with seqNumber = " + msgID.seqNumber);
        ++this.seqNumber;
        synchronized (this) {
            this.received.add(msgID);
        }
        event.getMessage().pushObject((Object)msgID);
        this.forward(event);
    }

    /**
     * Handles a message arriving from below: records the acknowledgement of
     * its source and, if the message is new, relays it back down.
     *
     * <p>The id is popped from a clone so the original event still carries it
     * when it is relayed.
     */
    private void bebDeliver(SendableEvent event) {
        Debug.print("URB: Received message from beb.");
        SendableEvent clone;
        try {
            clone = (SendableEvent)event.cloneEvent();
        }
        catch (CloneNotSupportedException e) {
            e.printStackTrace();
            return;
        }
        MessageID msgID = (MessageID)clone.getMessage().popObject();
        synchronized (this) {
            this.addAck(clone, msgID);
            boolean aboveWatermark = this.old_delivered[msgID.process] < msgID.seqNumber;
            if (aboveWatermark && !this.received.contains(msgID)) {
                Debug.print("URB: Message is not on the received set.");
                this.received.add(msgID);
                this.bebBroadcast(event);
            }
        }
    }

    /** Re-initialises the event with this session as its source and sends it downwards. */
    private void bebBroadcast(SendableEvent event) {
        Debug.print("URB: sending message to beb.");
        try {
            event.setDir(DIRECTION_DOWN);
            event.setSourceSession((Session)this);
            event.init();
            event.go();
        }
        catch (AppiaEventException e) {
            e.printStackTrace();
        }
    }

    /**
     * Sends the event upwards, with its source set to the socket address of
     * the process that originally broadcast it.
     *
     * @param event  the event to deliver
     * @param sender process number of the original broadcaster
     */
    private void urbDeliver(SendableEvent event, int sender) {
        Debug.print("URB: delivering message to above protocol.");
        try {
            event.setDir(DIRECTION_UP);
            event.setSourceSession((Session)this);
            event.source = this.processes.getProcess(sender).getSocketAddress();
            event.init();
            event.go();
        }
        catch (AppiaEventException e) {
            e.printStackTrace();
        }
    }

    /**
     * Marks the crashed process as no longer correct, so its acknowledgement
     * is no longer required by {@link #canDeliver(MessageEntry)}.
     * NOTE(review): the Crash event is not forwarded from here - confirm that
     * no other layer is expected to see it.
     */
    private void handleCrash(Crash crash) {
        int crashedProcess = crash.getCrashedProcess();
        LOGGER.fine("Process " + crashedProcess + " failed.");
        this.processes.getProcess(crashedProcess).setCorrect(false);
    }

    /**
     * Records that the source of {@code event} has acknowledged {@code msgID},
     * creating the pending entry on first sight. Ids at or below the sender's
     * watermark are ignored.
     * NOTE(review): assumes the event source is a member of the process set;
     * an unknown address presumably makes the lookup fail - verify.
     */
    private void addAck(SendableEvent event, MessageID msgID) {
        Debug.print("URB: adding ack.");
        int pi = this.processes.getProcess((SocketAddress)event.source).getProcessNumber();
        MessageEntry entry = this.ack.get(msgID);
        if (this.old_delivered[msgID.process] >= msgID.seqNumber) {
            return;
        }
        if (entry == null) {
            Debug.print("URB: first time that the message is seen.");
            entry = new MessageEntry(event, msgID, this.processes.getSize());
            this.ack.put(msgID, entry);
        }
        entry.acks[pi] = true;
    }

    /**
     * Compacts the delivered set for the sender of {@code mid}: once the set
     * holds at least {@link #DELIVERED_SHRINK_SIZE} ids, consecutive delivered
     * sequence numbers directly above the sender's watermark (up to
     * {@code mid.seqNumber}) are removed from the set and folded into the
     * watermark.
     *
     * <p>The watermark only advances over ids that were actually found in the
     * delivered set. Compaction stops at the first gap, so a message that has
     * not been delivered yet is never marked as delivered.
     *
     * @param mid id of the message that has just been delivered
     */
    private void shrinkDelivered(MessageID mid) {
        if (this.delivered.size() < DELIVERED_SHRINK_SIZE) {
            return;
        }
        int pid = mid.process;
        long watermark = this.old_delivered[pid];
        // Reusable lookup key; it is never stored in a collection.
        MessageID probe = new MessageID(pid, watermark);
        for (long next = watermark + 1L; next <= mid.seqNumber; ++next) {
            probe.seqNumber = next;
            if (!this.delivered.remove(probe)) {
                // Gap: 'next' has not been delivered, keep the watermark below it.
                break;
            }
            watermark = next;
        }
        this.old_delivered[pid] = watermark;
    }
}

