/*
 * Decompiled with CFR 0.152.
 */
package org.vanilladb.comm.protocols.zabproposal;

import java.io.Serializable;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Queue;
import java.util.logging.Level;
import java.util.logging.Logger;
import net.sf.appia.core.AppiaEventException;
import net.sf.appia.core.Channel;
import net.sf.appia.core.Event;
import net.sf.appia.core.Layer;
import net.sf.appia.core.Session;
import org.vanilladb.comm.process.ProcessList;
import org.vanilladb.comm.process.ProcessState;
import org.vanilladb.comm.protocols.events.ProcessListInit;
import org.vanilladb.comm.protocols.tcpfd.AllProcessesReady;
import org.vanilladb.comm.protocols.totalorderappl.TotalOrderMessages;
import org.vanilladb.comm.protocols.totalorderappl.TotalOrderRequest;
import org.vanilladb.comm.protocols.zabacceptance.ZabAccept;
import org.vanilladb.comm.protocols.zabacceptance.ZabCacheProposal;
import org.vanilladb.comm.protocols.zabacceptance.ZabCommit;
import org.vanilladb.comm.protocols.zabacceptance.ZabDeny;
import org.vanilladb.comm.protocols.zabelection.LeaderChanged;
import org.vanilladb.comm.protocols.zabelection.LeaderInit;
import org.vanilladb.comm.protocols.zabproposal.ZabProposal;
import org.vanilladb.comm.protocols.zabproposal.ZabProposalId;
import org.vanilladb.comm.protocols.zabproposal.ZabPropose;

/**
 * Appia session for the proposal phase of the Zab-style total-order protocol.
 *
 * <p>When this process is the leader, incoming {@link TotalOrderRequest}s are
 * queued and batched into a {@link ZabProposal}, which is sent out as a
 * {@link ZabPropose}. {@link ZabAccept} votes are counted and, once more than
 * half of the correct processes have accepted, a {@link ZabCommit} is sent.
 * When this process is not the leader, requests are redirected to the leader.
 *
 * <p>On every process, the latest proposal of the current epoch is cached
 * (via {@link ZabCacheProposal}) and delivered upward as
 * {@link TotalOrderMessages} when the matching {@link ZabCommit} arrives.
 *
 * <p>NOTE(review): the serial / message-start counters and the cached proposal
 * are not reset when the leader or epoch changes. Whether that is safe depends
 * on the election and acceptance layers, which are not visible here - confirm.
 */
public class ZabProposalSession extends Session {
    private static final Logger logger = Logger.getLogger(ZabProposalSession.class.getName());

    /** Local copy of the process list, set by {@link ProcessListInit}. */
    private ProcessList processList;
    /** Id of the process currently considered the leader. */
    private int leaderId;
    /** Current epoch; updated on every {@link LeaderChanged}. */
    private int epochId = 0;

    // ---- State used by every process (proposal receiver side) ----

    /** Serial number of the most recently cached proposal. */
    private long lastReceivedProposalSerial = 0L;
    /** The most recently cached proposal; delivered when its commit arrives. */
    private ZabProposal cachedProposal;

    // ---- State used only while this process acts as the leader ----

    /** True between a successful propose and the corresponding commit. */
    private boolean hasOngoingProposal;
    /** Messages waiting to be batched into the next proposal. */
    private Queue<Serializable> messageQueue = new ArrayDeque<Serializable>();
    /** Serial number that the next proposal will carry. */
    private long nextProposalSerial = 1L;
    /** Message start id that the next proposal will carry. */
    private long nextMessageStart = 1L;
    /** Serial number of the proposal currently being voted on. */
    private long currentProposingSerial = 1L;
    /** Number of accepts counted for the proposal currently being voted on. */
    private int voteCount = 0;

    ZabProposalSession(Layer layer) {
        super(layer);
    }

    /**
     * Dispatches an incoming event to its type-specific handler. Events of any
     * other type are ignored by this session.
     *
     * @param event the event delivered by the channel
     */
    @Override
    public void handle(Event event) {
        if (event instanceof ProcessListInit) {
            this.handleProcessListInit((ProcessListInit) event);
        } else if (event instanceof AllProcessesReady) {
            this.handleAllProcessesReady((AllProcessesReady) event);
        } else if (event instanceof LeaderInit) {
            this.handleLeaderInit((LeaderInit) event);
        } else if (event instanceof LeaderChanged) {
            this.handleLeaderChanged((LeaderChanged) event);
        } else if (event instanceof TotalOrderRequest) {
            this.handleTotalOrderRequest((TotalOrderRequest) event);
        } else if (event instanceof ZabCacheProposal) {
            this.handleZabCacheProposal((ZabCacheProposal) event);
        } else if (event instanceof ZabAccept) {
            this.handleZabAccept((ZabAccept) event);
        } else if (event instanceof ZabDeny) {
            this.handleZabDeny((ZabDeny) event);
        } else if (event instanceof ZabCommit) {
            this.handleZabCommit((ZabCommit) event);
        }
    }

    /** Stores a private copy of the process list and forwards the event. */
    private void handleProcessListInit(ProcessListInit event) {
        if (logger.isLoggable(Level.FINE)) {
            logger.fine("Received ProcessListInit");
        }
        this.processList = event.copyProcessList();
        try {
            event.go();
        } catch (AppiaEventException e) {
            logger.log(Level.SEVERE, "Failed to forward ProcessListInit", e);
        }
    }

    /** Marks every known process as {@code CORRECT} and forwards the event. */
    private void handleAllProcessesReady(AllProcessesReady event) {
        if (logger.isLoggable(Level.FINE)) {
            logger.fine("Received AllProcessesReady");
        }
        for (int i = 0; i < this.processList.getSize(); ++i) {
            this.processList.getProcess(i).setState(ProcessState.CORRECT);
        }
        try {
            event.go();
        } catch (AppiaEventException e) {
            logger.log(Level.SEVERE, "Failed to forward AllProcessesReady", e);
        }
    }

    /** Records the initial leader. The event is not forwarded by this session. */
    private void handleLeaderInit(LeaderInit event) {
        if (logger.isLoggable(Level.FINE)) {
            logger.fine("Received LeaderInit, the leader is " + event.getLeaderId());
        }
        this.leaderId = event.getLeaderId();
    }

    /**
     * Records the new leader and epoch. An epoch that is not exactly one ahead
     * of the current one is reported as severe but still adopted. The event is
     * not forwarded by this session.
     */
    private void handleLeaderChanged(LeaderChanged event) {
        if (logger.isLoggable(Level.FINE)) {
            logger.fine("Received LeaderChanged, the new leader is " + event.getNewLeaderId() + ", new epoch " + event.getNewEpochId());
        }
        this.leaderId = event.getNewLeaderId();
        if (event.getNewEpochId() != this.epochId + 1 && logger.isLoggable(Level.SEVERE)) {
            logger.severe("The epoch id is not as we expected. Do we miss something?");
        }
        this.epochId = event.getNewEpochId();
    }

    /**
     * Handles a client request: the leader queues the carried messages and
     * starts a proposal if none is in flight; any other process redirects the
     * request to the leader.
     */
    private void handleTotalOrderRequest(TotalOrderRequest event) {
        if (logger.isLoggable(Level.FINE)) {
            logger.fine("Received TotalOrderRequest");
        }
        if (this.processList.getSelfId() == this.leaderId) {
            this.messageQueue.addAll(event.getCarriedMessages());
            // Only one proposal is in flight at a time; queued messages are
            // picked up after the current proposal commits.
            if (!this.hasOngoingProposal) {
                this.propose(event.getChannel());
            }
        } else {
            this.redirectToLeader(event);
        }
    }

    /**
     * Caches the carried proposal if it belongs to the current epoch and its
     * serial number is newer than the last one seen; otherwise it is ignored.
     */
    private void handleZabCacheProposal(ZabCacheProposal event) {
        ZabProposal proposal = event.getProposal();
        ZabProposalId id = proposal.getId();
        if (logger.isLoggable(Level.FINE)) {
            logger.fine(String.format("Received ZabCacheProposal (epoch id: %d, proposal serial #: %d)", id.getEpochId(), id.getSerialNumber()));
        }
        if (id.getEpochId() == this.epochId && id.getSerialNumber() > this.lastReceivedProposalSerial) {
            this.lastReceivedProposalSerial = id.getSerialNumber();
            this.cachedProposal = proposal;
        }
    }

    /**
     * Counts an accept vote for the proposal currently being voted on. Once
     * the count exceeds half of the correct processes and the proposal has
     * not been committed yet, commits it and proposes the next batch if any
     * messages are queued. Votes for another epoch or serial are ignored.
     */
    private void handleZabAccept(ZabAccept event) {
        ZabProposalId id = (ZabProposalId) event.getMessage().popObject();
        if (logger.isLoggable(Level.FINE)) {
            // The logged vote # is the count before this vote is added.
            logger.fine(String.format("Received ZabAccept from %s (epoch id: %d, proposal serial #: %d, vote #: %d)", event.source, id.getEpochId(), id.getSerialNumber(), this.voteCount));
        }
        if (id.getEpochId() == this.epochId && id.getSerialNumber() == this.currentProposingSerial) {
            ++this.voteCount;
            // hasOngoingProposal guards against committing twice when further
            // accepts for the same proposal arrive after the majority.
            if (this.voteCount > this.processList.getCorrectCount() / 2 && this.hasOngoingProposal) {
                this.commit(event.getChannel());
                if (!this.messageQueue.isEmpty()) {
                    this.propose(event.getChannel());
                }
            }
        }
    }

    /** Logs a deny vote at warning level; no state is changed. */
    private void handleZabDeny(ZabDeny event) {
        ZabProposalId id = (ZabProposalId) event.getMessage().popObject();
        if (logger.isLoggable(Level.WARNING)) {
            logger.warning(String.format("Received ZabDeny from %s (epoch id: %d, proposal serial #: %d, vote #: %d)", event.source, id.getEpochId(), id.getSerialNumber(), this.voteCount));
        }
    }

    /**
     * Delivers the cached proposal's messages as {@link TotalOrderMessages}
     * when the commit refers to the current epoch and to the most recently
     * cached proposal. Any other commit is ignored.
     */
    private void handleZabCommit(ZabCommit event) {
        ZabProposalId id = (ZabProposalId) event.getMessage().popObject();
        if (logger.isLoggable(Level.FINE)) {
            logger.fine(String.format("Received ZabCommit (epoch id: %d, serial #: %d)", id.getEpochId(), id.getSerialNumber()));
        }
        if (id.getEpochId() == this.epochId && id.getSerialNumber() == this.lastReceivedProposalSerial) {
            try {
                TotalOrderMessages messages = new TotalOrderMessages(event.getChannel(), this, this.cachedProposal.getMessages(), this.cachedProposal.getMessageStartId());
                messages.init();
                messages.go();
            } catch (AppiaEventException e) {
                logger.log(Level.SEVERE, "Failed to deliver TotalOrderMessages for committed proposal", e);
            }
        }
    }

    /**
     * Drains the message queue into a single proposal and sends it as a
     * {@link ZabPropose}. Does nothing when the queue is empty.
     *
     * <p>NOTE(review): the queue is drained and the counters are advanced
     * before the event is sent, so if sending fails the drained messages are
     * not re-queued and {@code hasOngoingProposal} stays unchanged.
     *
     * @param channel the channel on which the proposal event is created
     */
    private void propose(Channel channel) {
        // Copying preserves the queue's head-to-tail order.
        ArrayList<Serializable> messageList = new ArrayList<Serializable>(this.messageQueue);
        this.messageQueue.clear();
        if (messageList.isEmpty()) {
            return;
        }
        if (logger.isLoggable(Level.FINE)) {
            // Log the proposal serial, matching the id carried by the proposal.
            logger.fine(String.format("Leader proposes (epoch id: %d, serial #: %d)", this.epochId, this.nextProposalSerial));
        }
        try {
            ZabProposalId id = new ZabProposalId(this.epochId, this.nextProposalSerial);
            ZabProposal proposal = new ZabProposal(id, this.nextMessageStart, messageList.toArray(new Serializable[messageList.size()]));

            // Start a fresh vote for this serial and advance the counters.
            this.currentProposingSerial = this.nextProposalSerial;
            ++this.nextProposalSerial;
            this.voteCount = 0;
            this.nextMessageStart += (long) messageList.size();

            ZabPropose propose = new ZabPropose(channel, this);
            propose.getMessage().pushObject(proposal);
            propose.init();
            propose.go();
            this.hasOngoingProposal = true;
        } catch (AppiaEventException e) {
            logger.log(Level.SEVERE, "Failed to send ZabPropose", e);
        }
    }

    /**
     * Re-addresses the request from this process to the current leader and
     * sends it on.
     */
    private void redirectToLeader(TotalOrderRequest request) {
        if (logger.isLoggable(Level.FINE)) {
            logger.fine("Redirect the message to the leader (id = " + this.leaderId + ")");
        }
        try {
            request.source = this.processList.getSelfProcess().getAddress();
            request.dest = this.processList.getProcess(this.leaderId).getAddress();
            request.go();
        } catch (AppiaEventException e) {
            logger.log(Level.SEVERE, "Failed to redirect TotalOrderRequest to the leader", e);
        }
    }

    /**
     * Sends a {@link ZabCommit} for the proposal currently being voted on and,
     * if sending succeeds, marks that no proposal is in flight any more.
     *
     * @param channel the channel on which the commit event is created
     */
    private void commit(Channel channel) {
        if (logger.isLoggable(Level.FINE)) {
            logger.fine(String.format("Commit the message (epoch id: %d, proposal serial #: %d, vote #: %d)", this.epochId, this.currentProposingSerial, this.voteCount));
        }
        try {
            ZabCommit commit = new ZabCommit(channel, this);
            commit.getMessage().pushObject(new ZabProposalId(this.epochId, this.currentProposingSerial));
            commit.init();
            commit.go();
            this.hasOngoingProposal = false;
        } catch (AppiaEventException e) {
            logger.log(Level.SEVERE, "Failed to send ZabCommit", e);
        }
    }
}

