/*
 * Decompiled with CFR 0.152.
 */
package org.vanilladb.comm.protocols.zabTotalOrder;

import java.util.ArrayDeque;
import java.util.LinkedList;
import java.util.Queue;
import java.util.logging.Level;
import java.util.logging.Logger;
import net.sf.appia.core.AppiaEventException;
import net.sf.appia.core.Channel;
import net.sf.appia.core.Event;
import net.sf.appia.core.Layer;
import net.sf.appia.core.Session;
import net.sf.appia.core.events.channel.ChannelInit;
import org.vanilladb.comm.messages.TotalOrderMessage;
import org.vanilladb.comm.protocols.consensusUtils.PaxosObjectProposal;
import org.vanilladb.comm.protocols.consensusUtils.PaxosProposal;
import org.vanilladb.comm.protocols.events.Crash;
import org.vanilladb.comm.protocols.events.PaxosPropose;
import org.vanilladb.comm.protocols.events.PaxosReturn;
import org.vanilladb.comm.protocols.events.ProcessInitEvent;
import org.vanilladb.comm.protocols.events.ZabCacheTom;
import org.vanilladb.comm.protocols.events.ZabCommit;
import org.vanilladb.comm.protocols.events.ZabRequest;
import org.vanilladb.comm.protocols.events.ZabTomResult;
import org.vanilladb.comm.protocols.utils.ProcessSet;
import org.vanilladb.comm.protocols.zabTotalOrder.ZabTOBEvent;

public class ZabTOBSession
extends Session {
    private ProcessSet processes;
    private Queue<Object> msg_queue;
    private int leader;
    private boolean proposed;
    private int epoch;
    private int sn;
    private long toid = 0L;
    private long tosn = 0L;
    private Channel channel;
    private TotalOrderMessage cachedTom;
    private long lastTime = 0L;

    public ZabTOBSession(Layer layer) {
        super(layer);
    }

    public void handle(Event event) {
        if (event instanceof ZabRequest) {
            this.handleZabRequest((ZabRequest)event);
        } else if (event instanceof ZabTOBEvent) {
            this.handleZabTOBEvent((ZabTOBEvent)event);
        } else if (event instanceof PaxosReturn) {
            this.handleConsensusDecide((PaxosReturn)event);
        } else if (event instanceof ChannelInit) {
            this.handleChannelInit((ChannelInit)event);
        } else if (event instanceof ProcessInitEvent) {
            this.handleProcessInitEvent((ProcessInitEvent)event);
        } else if (event instanceof ZabCommit) {
            this.handleZabCommit((ZabCommit)event);
        } else if (event instanceof ZabCacheTom) {
            this.handleZabCacheTom((ZabCacheTom)event);
        } else if (event instanceof Crash) {
            this.handleCrash((Crash)event);
        }
    }

    private void handleProcessInitEvent(ProcessInitEvent event) {
        this.processes = event.getProcessSet();
        this.msg_queue = new ArrayDeque<Object>();
        this.leader = event.getProcessSet().getSize() - 1;
        this.proposed = false;
        this.epoch = 0;
        this.sn = 0;
        try {
            event.go();
        }
        catch (AppiaEventException e) {
            e.printStackTrace();
        }
    }

    private void handleChannelInit(ChannelInit init) {
        this.channel = init.getChannel();
        try {
            init.go();
        }
        catch (AppiaEventException e) {
            e.printStackTrace();
        }
    }

    private void handleZabRequest(ZabRequest event) {
        if (this.leader == this.processes.getSelfRank()) {
            this.msg_queue.add(event.getObject());
            this.zabPropose();
        } else {
            try {
                ZabTOBEvent ev = new ZabTOBEvent(this.channel, -1, this);
                ev.getMessage().pushObject(event.getObject());
                ev.source = this.processes.getSelfProcess().getSocketAddress();
                ev.dest = this.processes.getProcess(this.leader).getSocketAddress();
                ev.init();
                ev.go();
            }
            catch (AppiaEventException e) {
                e.printStackTrace();
            }
        }
    }

    private void handleZabTOBEvent(ZabTOBEvent event) {
        this.msg_queue.add(event.getMessage().popObject());
        this.zabPropose();
    }

    private void handleConsensusDecide(PaxosReturn pr) {
        this.proposed = false;
        PaxosProposal pp = pr.decision;
        PaxosObjectProposal pop = (PaxosObjectProposal)pp;
        if (!pp.abort) {
            if (Logger.getLogger(ZabTOBSession.class.getName()).isLoggable(Level.FINE)) {
                Logger.getLogger(ZabTOBSession.class.getName()).fine("consensus decided " + this.tosn);
                Logger.getLogger(ZabTOBSession.class.getName()).fine("TOB Queue size = " + this.msg_queue.size());
            }
            try {
                ZabCommit zc = new ZabCommit(this.channel, -1, this);
                zc.init();
                zc.go();
            }
            catch (AppiaEventException ex) {
                ex.printStackTrace();
            }
        } else {
            this.msg_queue.add(pop.obj);
        }
        this.zabPropose();
    }

    private void zabPropose() {
        if (!this.proposed && !this.msg_queue.isEmpty()) {
            this.proposed = true;
            LinkedList<Object> list = new LinkedList<Object>();
            while (!this.msg_queue.isEmpty()) {
                Object[] objectArray = ((TotalOrderMessage)this.msg_queue.poll()).getMessages();
                int n = objectArray.length;
                int n2 = 0;
                while (n2 < n) {
                    Object o = objectArray[n2];
                    list.add(o);
                    ++n2;
                }
            }
            TotalOrderMessage tom = new TotalOrderMessage(list.toArray(new Object[0]));
            tom.setTotalOrderIdStart(this.toid);
            tom.setTotalOrderSequenceNumber(this.tosn++);
            this.toid += (long)tom.getMessages().length;
            PaxosObjectProposal proposal = new PaxosObjectProposal(tom);
            try {
                PaxosPropose event = new PaxosPropose(this.channel, -1, this);
                event.value = proposal;
                event.epoch = this.epoch;
                event.sn = ++this.sn;
                event.init();
                event.go();
            }
            catch (AppiaEventException e) {
                e.printStackTrace();
            }
        }
    }

    private void handleZabCommit(ZabCommit zc) {
        try {
            ZabTomResult ztr = new ZabTomResult(this.channel, 1, this, this.cachedTom);
            ztr.init();
            ztr.go();
        }
        catch (AppiaEventException e) {
            e.printStackTrace();
        }
    }

    private void handleZabCacheTom(ZabCacheTom zct) {
        this.cachedTom = zct.getTom();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void handleCrash(Crash crash) {
        int crashedProcess = crash.getCrashedProcess();
        Logger.getLogger(ZabTOBSession.class.getName()).fine("Process " + crashedProcess + " failed.");
        ZabTOBSession zabTOBSession = this;
        synchronized (zabTOBSession) {
            this.processes.getProcess(crashedProcess).setCorrect(false);
            if (crashedProcess == this.leader) {
                int i = 0;
                while (i < this.processes.getSize()) {
                    if (this.processes.getProcess(i).isCorrect()) {
                        this.leader = i;
                    }
                    ++i;
                }
                ++this.epoch;
                this.sn = 0;
            }
            if (this.processes.getSelfRank() == this.leader) {
                this.proposed = false;
                this.msg_queue.clear();
            }
        }
        try {
            crash.go();
        }
        catch (AppiaEventException e) {
            e.printStackTrace();
        }
    }
}

