/*
 * Decompiled with CFR 0.152.
 */
package net.sf.appia.protocols.nakfifo;

import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Iterator;
import java.util.ListIterator;
import net.sf.appia.core.AppiaError;
import net.sf.appia.core.AppiaEventException;
import net.sf.appia.core.AppiaException;
import net.sf.appia.core.Channel;
import net.sf.appia.core.Event;
import net.sf.appia.core.Layer;
import net.sf.appia.core.Session;
import net.sf.appia.core.events.AppiaMulticast;
import net.sf.appia.core.events.SendableEvent;
import net.sf.appia.core.events.channel.ChannelClose;
import net.sf.appia.core.events.channel.ChannelInit;
import net.sf.appia.protocols.common.FIFOUndeliveredEvent;
import net.sf.appia.protocols.common.SendableNotDeliveredEvent;
import net.sf.appia.protocols.frag.MaxPDUSizeEvent;
import net.sf.appia.protocols.nakfifo.IgnoreEvent;
import net.sf.appia.protocols.nakfifo.MessageUtils;
import net.sf.appia.protocols.nakfifo.NackEvent;
import net.sf.appia.protocols.nakfifo.Nacked;
import net.sf.appia.protocols.nakfifo.NakFifoTimer;
import net.sf.appia.protocols.nakfifo.Peer;
import net.sf.appia.protocols.nakfifo.PingEvent;
import net.sf.appia.xml.interfaces.InitializableSession;
import net.sf.appia.xml.utils.SessionProperties;
import org.apache.log4j.Logger;

public class NakFifoSession
extends Session
implements InitializableSession {
    /** Logger shared by all instances of this session class. */
    private static Logger log = Logger.getLogger(NakFifoSession.class);
    // Default timing values. The unit is not stated in this file;
    // presumably milliseconds — TODO confirm against NakFifoTimer.
    /** Default period of the session timer. */
    public static final long DEFAULT_TIMER_PERIOD = 5000L;
    /** Default time a pending nack waits before it is sent again. */
    public static final long DEFAULT_RESEND_TIME = 10000L;
    /** Default time without application (non-ping) traffic after which a peer is forgotten. */
    public static final long DEFAULT_MAX_APPL_TIME = 180000L;
    /** Default time without receiving anything from a peer after which it is dropped. */
    public static final long DEFAULT_MAX_RECV_TIME = 60000L;
    /** Default time without sending anything to a peer after which a ping is sent. */
    public static final long DEFAULT_MAX_SENT_TIME = 45000L;
    /** Period used when arming the session timer (see sendTimer). */
    private long param_TIMER_PERIOD = 5000L;
    // The limits below are expressed in timer rounds: a time value divided by the
    // timer period. The literals repeat the DEFAULT_* values above.
    /** Rounds a nack may stay pending before handleNakFifoTimer re-sends it. */
    private long param_RESEND_NACK_ROUNDS = 10000L / this.param_TIMER_PERIOD;
    /** Rounds without application messages after which a peer with no pending nack is removed. */
    private long param_MAX_APPL_ROUNDS = 180000L / this.param_TIMER_PERIOD;
    /** Rounds without received messages after which a peer is removed and its unconfirmed messages reported. */
    private long param_MAX_RECV_ROUNDS = 60000L / this.param_TIMER_PERIOD;
    /** Rounds without sent messages after which a ping is sent to the peer. */
    private long param_MAX_SENT_ROUNDS = 45000L / this.param_TIMER_PERIOD;
    /** Known peers, keyed by peer address (values are Peer instances). */
    private HashMap peers = new HashMap();
    /** Channel on which the periodic timer was last armed, or null when none is armed. */
    private Channel timerChannel = null;
    /** Helper used to push and pop sequence numbers on message headers. */
    private MessageUtils utils = new MessageUtils();
    // NOTE(review): the two debug constants below are not referenced by name in this
    // class as shown; debugPeer uses a literal 10 that matches debugListLimit.
    public static final boolean debugFull = true;
    public static final int debugListLimit = 10;

    /**
     * Creates the session and hands the owning layer to the {@code Session} superclass.
     *
     * @param layer the layer this session belongs to
     */
    public NakFifoSession(Layer layer) {
        super(layer);
    }

    /**
     * Configures the session timing from the supplied properties.
     *
     * <p>Recognised keys: {@code timer_period}, {@code resend_nack_time},
     * {@code max_appl_time}, {@code max_recv_time} and {@code max_sent_time}.
     * Every time value is converted to a number of timer rounds by dividing it
     * by the effective timer period.
     *
     * <p>All round counts are recomputed here, falling back to the
     * {@code DEFAULT_*} times for absent keys. Previously a round count was only
     * recomputed when its own key was present, so overriding just
     * {@code timer_period} left the other limits scaled against the old period.
     * A non-positive {@code timer_period} is rejected (it would cause a division
     * by zero or negative round counts) and the current period is kept.
     *
     * @param params session properties, typically read from the XML configuration
     */
    public void init(SessionProperties params) {
        if (params.containsKey("timer_period")) {
            long period = params.getLong("timer_period");
            if (period > 0L) {
                this.param_TIMER_PERIOD = period;
            } else {
                log.warn((Object)("Ignoring non-positive timer_period (" + period + "). Keeping " + this.param_TIMER_PERIOD + "."));
            }
        }
        long resendTime = params.containsKey("resend_nack_time") ? params.getLong("resend_nack_time") : DEFAULT_RESEND_TIME;
        long maxApplTime = params.containsKey("max_appl_time") ? params.getLong("max_appl_time") : DEFAULT_MAX_APPL_TIME;
        long maxRecvTime = params.containsKey("max_recv_time") ? params.getLong("max_recv_time") : DEFAULT_MAX_RECV_TIME;
        long maxSentTime = params.containsKey("max_sent_time") ? params.getLong("max_sent_time") : DEFAULT_MAX_SENT_TIME;
        // Convert every duration to rounds against the period that will actually be used.
        this.param_RESEND_NACK_ROUNDS = resendTime / this.param_TIMER_PERIOD;
        this.param_MAX_APPL_ROUNDS = maxApplTime / this.param_TIMER_PERIOD;
        this.param_MAX_RECV_ROUNDS = maxRecvTime / this.param_TIMER_PERIOD;
        this.param_MAX_SENT_ROUNDS = maxSentTime / this.param_TIMER_PERIOD;
    }

    /**
     * Dispatches an incoming event to the handler for its runtime type.
     *
     * <p>The protocol's own events are tested before the generic
     * {@code SendableEvent}: {@code PingEvent} is itself a {@code SendableEvent}
     * (it is passed to {@code receive} and {@code send}), so the specific checks
     * have to come first. Events of any other type are logged and forwarded
     * unchanged.
     *
     * @param event the event delivered to this session
     */
    public void handle(Event event) {
        if (event instanceof NackEvent) {
            this.handleNack((NackEvent)event);
        } else if (event instanceof IgnoreEvent) {
            this.handleIgnore((IgnoreEvent)event);
        } else if (event instanceof PingEvent) {
            this.handlePing((PingEvent)event);
        } else if (event instanceof NakFifoTimer) {
            this.handleNakFifoTimer((NakFifoTimer)event);
        } else if (event instanceof SendableEvent) {
            this.handleSendable((SendableEvent)event);
        } else if (event instanceof SendableNotDeliveredEvent) {
            this.handleSendableNotDelivered((SendableNotDeliveredEvent)event);
        } else if (event instanceof ChannelInit) {
            this.handleChannelInit((ChannelInit)event);
        } else if (event instanceof ChannelClose) {
            this.handleChannelClose((ChannelClose)event);
        } else if (event instanceof MaxPDUSizeEvent) {
            this.handleMaxPDUSize((MaxPDUSizeEvent)event);
        } else {
            // Not one of ours: report it and let it continue along the channel.
            String typeName = event.getClass().getName();
            log.error((Object)("Unwanted event (\"" + typeName + "\") received. Continued..."));
            try {
                event.go();
            }
            catch (AppiaEventException failure) {
                failure.printStackTrace();
            }
        }
    }

    /**
     * Forwards the channel initialisation, arms the periodic timer on this
     * channel if no timer is currently armed, and logs the effective parameters.
     *
     * @param ev the channel initialisation event
     */
    private void handleChannelInit(ChannelInit ev) {
        try {
            ev.go();
        }
        catch (AppiaEventException failure) {
            failure.printStackTrace();
        }
        boolean timerArmed = this.timerChannel != null;
        if (!timerArmed) {
            this.sendTimer(ev.getChannel());
        }
        StringBuilder summary = new StringBuilder("Params:");
        summary.append("\n\tTIMER_PERIOD=").append(this.param_TIMER_PERIOD);
        summary.append("\n\tMAX_APPL_ROUNDS=").append(this.param_MAX_APPL_ROUNDS);
        summary.append("\n\tMAX_RECV_ROUNDS=").append(this.param_MAX_RECV_ROUNDS);
        summary.append("\n\tMAX_SENT_ROUNDS=").append(this.param_MAX_SENT_ROUNDS);
        summary.append("\n\tRESEND_NACK_ROUNDS=").append(this.param_RESEND_NACK_ROUNDS);
        log.debug((Object)summary.toString());
    }

    /**
     * Forwards the channel close and, when the closing channel is the one that
     * carries the periodic timer, tries to re-arm the timer on another channel.
     *
     * <p>Candidate channels are taken from the {@code last_channel} of the known
     * peers. The first one on which {@code sendTimer} succeeds (it then sets
     * {@code timerChannel}) is kept. If none works a warning is logged.
     *
     * <p>{@code peers} is a raw map, so its values are iterated as objects and
     * cast explicitly. The channel being closed is skipped as a candidate: a
     * timer armed on it would presumably never fire.
     *
     * @param ev the channel close event
     */
    private void handleChannelClose(ChannelClose ev) {
        try {
            ev.go();
        }
        catch (AppiaEventException ex) {
            ex.printStackTrace();
        }
        Channel closing = ev.getChannel();
        if (closing != this.timerChannel) {
            return;
        }
        this.timerChannel = null;
        Iterator candidates = this.peers.values().iterator();
        while (candidates.hasNext()) {
            Peer peer = (Peer)candidates.next();
            if (peer.last_channel == null || peer.last_channel == closing) {
                continue;
            }
            this.sendTimer(peer.last_channel);
            if (this.timerChannel != null) {
                // sendTimer succeeded; the timer now lives on this peer's channel.
                return;
            }
        }
        log.warn((Object)"Unable to send timer. Corret operation is not garanteed");
    }

    /**
     * Reserves room for this layer's header in the advertised maximum PDU size
     * and forwards the event.
     *
     * <p>The size is only reduced for direction value 1, the same value that
     * {@code handleSendable} treats as incoming.
     *
     * @param event the PDU size announcement
     */
    private void handleMaxPDUSize(MaxPDUSizeEvent event) {
        // Presumably 1 flag byte plus two 4-byte sequence numbers, matching what
        // send() pushes — TODO confirm against MessageUtils.pushSeq.
        final int headerBytes = 9;
        boolean reduceSize = event.getDir() == 1;
        if (reduceSize) {
            event.pduSize -= headerBytes;
        }
        try {
            event.go();
        }
        catch (AppiaEventException failure) {
            failure.printStackTrace();
        }
    }

    /**
     * Translates a {@code SendableNotDeliveredEvent} into a
     * {@code FIFOUndeliveredEvent} carrying the same undelivered event, created
     * on the same channel with this session as source.
     *
     * @param ev the notification received
     */
    private void handleSendableNotDelivered(SendableNotDeliveredEvent ev) {
        try {
            new FIFOUndeliveredEvent(ev.getChannel(), this, ev.getEvent()).go();
        }
        catch (AppiaEventException failure) {
            failure.printStackTrace();
        }
    }

    /**
     * Routes a sendable event according to its direction.
     *
     * <ul>
     *   <li>Direction 1: the event is incoming and goes to {@code receive}.</li>
     *   <li>Direction -1: the event is outgoing. An {@code AppiaMulticast}
     *       destination is expanded into one reliable send per listed address.
     *       An IP multicast socket address bypasses the protocol: the event is
     *       forwarded with the flag byte 1, which {@code receive} recognises and
     *       passes through. Anything else is sent to the single destination.</li>
     *   <li>Any other direction: the event is logged and discarded.</li>
     * </ul>
     *
     * <p>An unresolved {@code InetSocketAddress} has no {@code InetAddress}, so it
     * is checked before asking whether the address is multicast. Such a
     * destination is handled as an ordinary unicast peer instead of raising a
     * {@code NullPointerException}.
     *
     * @param ev the event to route
     */
    private void handleSendable(SendableEvent ev) {
        if (ev.getDir() == 1) {
            this.receive(ev);
            return;
        }
        if (ev.getDir() != -1) {
            log.warn((Object)("Direction is wrong. Discarding event " + ev));
            return;
        }
        if (ev.dest instanceof AppiaMulticast) {
            Object[] dests = ((AppiaMulticast)ev.dest).getDestinations();
            for (int i = 0; i < dests.length; ++i) {
                this.send(ev, dests[i]);
            }
            return;
        }
        if (ev.dest instanceof InetSocketAddress) {
            InetSocketAddress socketAddr = (InetSocketAddress)ev.dest;
            if (!socketAddr.isUnresolved() && socketAddr.getAddress().isMulticastAddress()) {
                log.debug((Object)"Destination is a IP Multicast address. Ignored.");
                // Flag byte 1 tells the receiving session to skip the FIFO header handling.
                ev.getMessage().pushByte((byte)1);
                try {
                    ev.go();
                }
                catch (AppiaEventException ex) {
                    ex.printStackTrace();
                }
                return;
            }
        }
        this.send(ev, ev.dest);
    }

    /**
     * Processes a ping. Only pings with direction 1 are accepted and run
     * through {@code receive}; any other direction is logged and dropped.
     *
     * @param ev the ping event
     */
    private void handlePing(PingEvent ev) {
        boolean incoming = ev.getDir() == 1;
        if (incoming) {
            this.receive(ev);
            return;
        }
        log.warn((Object)"Discarding Ping event due to wrong diretion.");
    }

    /**
     * Handles a negative acknowledgement asking for messages
     * {@code first..last} to be sent again.
     *
     * <p>A nack from an unknown source only creates the peer (which sends it an
     * Ignore) and is otherwise dropped. The range is read from the message as
     * two longs, first then last, and validated. Negative or inverted bounds are
     * ignored. A range outside what was actually sent triggers an Ignore to
     * restart the exchange. The part of the range already confirmed is trimmed
     * off before the remaining messages are re-sent.
     *
     * @param ev the nack event
     */
    private void handleNack(NackEvent ev) {
        Peer peer = (Peer)this.peers.get(ev.source);
        if (peer == null) {
            this.createPeer(ev.source, ev.getChannel());
            return;
        }
        long from = ev.getMessage().popLong();
        if (from < 0L) {
            log.debug((Object)"Ignoring Nack due to wrong first seq number.");
            return;
        }
        long to = ev.getMessage().popLong();
        if (to < 0L) {
            log.debug((Object)"Ignoring Nack due to wrong last seq number.");
            return;
        }
        if (from > to) {
            log.debug((Object)("Ignoring Nack due to wrong seq numbers (first=" + from + ",last=" + to + ",confirmed=" + peer.last_msg_confirmed + ")."));
            return;
        }
        boolean withinSent = from >= peer.first_msg_sent && to <= peer.last_msg_sent;
        if (!withinSent) {
            log.debug((Object)"Received Nack for message not sent. Restarting communication.");
            this.ignore(peer, ev.getChannel());
            return;
        }
        this.debugPeer(peer, "handleNack(" + from + "," + to + ")");
        // from <= to holds here, so a confirmed upper bound means the whole range is confirmed.
        if (to <= peer.last_msg_confirmed) {
            log.debug((Object)"Received Nack for messages already confirmed. Discarding.");
            return;
        }
        if (from <= peer.last_msg_confirmed) {
            from = peer.last_msg_confirmed + 1L;
            log.debug((Object)("Received Nack for message already confirmed. Changig first to " + from));
        }
        this.resend(peer, from, to);
    }

    /**
     * Periodic housekeeping, run once per timer round for every known peer.
     *
     * <p>For each peer the three round counters are advanced, then:
     * <ol>
     *   <li>if a nack is pending and has waited more than
     *       {@code param_RESEND_NACK_ROUNDS} rounds, it is sent again for the part
     *       of the range still missing;</li>
     *   <li>otherwise, if no application message was exchanged for more than
     *       {@code param_MAX_APPL_ROUNDS} rounds, the peer is forgotten;</li>
     *   <li>if nothing was received for more than {@code param_MAX_RECV_ROUNDS}
     *       rounds, every unconfirmed message is reported as undelivered and the
     *       peer is removed;</li>
     *   <li>if nothing was sent for more than {@code param_MAX_SENT_ROUNDS}
     *       rounds, a ping is sent to the peer.</li>
     * </ol>
     *
     * @param ev the timer event
     */
    private void handleNakFifoTimer(NakFifoTimer ev) {
        // Only qualifier mode 2 is processed; presumably the timer's NOTIFY
        // qualifier — TODO confirm against EventQualifier.
        if (ev.getQualifierMode() != 2) {
            return;
        }
        try {
            ev.go();
        }
        catch (AppiaEventException ex) {
            ex.printStackTrace();
        }
        // An explicit iterator is used so peers can be removed while iterating.
        Iterator peers_iter = this.peers.values().iterator();
        while (peers_iter.hasNext()) {
            Peer peer = (Peer)peers_iter.next();
            ++peer.rounds_appl_msg;
            ++peer.rounds_msg_recv;
            ++peer.rounds_msg_sent;
            this.debugPeer(peer, "Timer");
            if (peer.nacked != null) {
                ++peer.nacked.rounds;
                if ((long)peer.nacked.rounds > this.param_RESEND_NACK_ROUNDS) {
                    // Re-request only what is still missing: start after the last delivered
                    // message when delivery has already moved into the nacked range.
                    // The channel is taken from the first stored out-of-order message.
                    this.nack(peer, peer.last_msg_delivered >= peer.nacked.first_msg ? peer.last_msg_delivered + 1L : peer.nacked.first_msg, peer.nacked.last_msg, ((SendableEvent)peer.undelivered_msgs.getFirst()).getChannel());
                    // nack() replaces peer.nacked on success; either way the wait restarts.
                    peer.nacked.rounds = 0;
                }
            } else if ((long)peer.rounds_appl_msg > this.param_MAX_APPL_ROUNDS) {
                // Idle peer with nothing pending: forget it. peer is nulled so the
                // remaining checks in this iteration are skipped.
                peers_iter.remove();
                peer = null;
            }
            if (peer != null && (long)peer.rounds_msg_recv > this.param_MAX_RECV_ROUNDS) {
                // Nothing heard for too long: report every unconfirmed message as
                // undelivered, then drop the peer.
                Iterator msgs = peer.unconfirmed_msgs.iterator();
                while (msgs.hasNext()) {
                    this.sendFIFOUndelivered((SendableEvent)msgs.next(), peer.addr);
                }
                peers_iter.remove();
                peer = null;
            }
            if (peer == null || (long)peer.rounds_msg_sent <= this.param_MAX_SENT_ROUNDS) continue;
            // Nothing sent for too long: send a ping through the normal send path,
            // so it gets a sequence number like any other message.
            try {
                PingEvent e = new PingEvent(peer.last_channel, this);
                e.dest = peer.addr;
                this.send(e, peer.addr);
            }
            catch (AppiaEventException ex) {
                ex.printStackTrace();
                log.warn((Object)"Impossible to send ping.");
            }
        }
    }

    /**
     * Handles an Ignore from a peer by resetting the receiving side of the
     * exchange with it.
     *
     * <p>The long carried by the message becomes the last delivered sequence
     * number. Stored out-of-order messages and any pending nack are dropped, and
     * the receive round counter restarts. An unknown source is registered as a
     * new peer first.
     *
     * @param ev the ignore event
     */
    private void handleIgnore(IgnoreEvent ev) {
        Peer sender = (Peer)this.peers.get(ev.source);
        if (sender == null) {
            sender = this.createPeer(ev.source, ev.getChannel());
        }
        // Dump the state as it was before the reset.
        this.debugPeer(sender, "handleIgnore");
        long restartFrom = ev.getMessage().popLong();
        sender.last_msg_delivered = restartFrom;
        sender.undelivered_msgs.clear();
        sender.nacked = null;
        sender.rounds_msg_recv = 0;
        sender.last_channel = ev.getChannel();
        log.debug((Object)("Received Ignore from " + sender.addr.toString() + " with value " + sender.last_msg_delivered));
    }

    /**
     * Sends one event reliably to a single peer.
     *
     * <p>A clone of the event is sent, with this layer's header pushed in this
     * order: the last sequence number delivered from the peer, the sequence
     * number of this message, and a flag byte of 0. The original, header-free
     * event is kept as unconfirmed so it can be re-sent later. If cloning or
     * sending fails, the event is reported as undelivered instead.
     *
     * @param event the event to send; it is cloned, not forwarded itself
     * @param addr  the destination peer address
     */
    private void send(SendableEvent event, Object addr) {
        Peer target = (Peer)this.peers.get(addr);
        if (target == null) {
            target = this.createPeer(addr, event.getChannel());
        }
        try {
            SendableEvent copy = (SendableEvent)event.cloneEvent();
            copy.setSourceSession(this);
            copy.init();
            this.utils.pushSeq(copy.getMessage(), target.last_msg_delivered);
            this.utils.pushSeq(copy.getMessage(), target.last_msg_sent + 1L);
            copy.getMessage().pushByte((byte)0);
            copy.dest = addr;
            copy.go();
            // Bookkeeping happens only after the clone was accepted for sending.
            ++target.last_msg_sent;
            this.storeUnconfirmed(target, event);
            target.rounds_msg_sent = 0;
            boolean isPing = copy instanceof PingEvent;
            if (!isPing) {
                target.rounds_appl_msg = 0;
            }
            target.last_channel = copy.getChannel();
        }
        catch (AppiaEventException sendFailure) {
            sendFailure.printStackTrace();
            log.warn((Object)"To mantain coerence, sending undelivered.");
            this.sendFIFOUndelivered(event, target.addr);
        }
        catch (CloneNotSupportedException cloneFailure) {
            cloneFailure.printStackTrace();
            log.warn((Object)"To mantain coerence, sending undelivered.");
            this.sendFIFOUndelivered(event, target.addr);
        }
    }

    /**
     * Processes an incoming sendable event (application message or ping).
     *
     * <p>The header is read in the reverse of the order {@code send} pushes it:
     * the flag byte, this message's sequence number, then the last sequence
     * number the sender has delivered from us. A message whose flag has bit 1
     * set bypasses the protocol and is forwarded as is.
     *
     * <p>The confirmation releases stored unconfirmed messages, or triggers an
     * Ignore when it is out of range. The message itself is then handled by its
     * sequence number:
     * <ul>
     *   <li>the next expected one is delivered (pings are consumed), followed by
     *       any stored messages that have become deliverable;</li>
     *   <li>an old one is discarded;</li>
     *   <li>a future one is stored and, if no nack is pending, the gap is
     *       nacked.</li>
     * </ul>
     *
     * <p>The discard log lines name the sender using {@code ev.source}. They used
     * to dereference {@code ev.dest}, which reported the wrong endpoint and could
     * throw a {@code NullPointerException} when {@code dest} was unset.
     *
     * @param ev the received event
     */
    private void receive(SendableEvent ev) {
        byte flags = ev.getMessage().popByte();
        if ((flags & 1) != 0) {
            log.debug((Object)"Received msg with ignore flag. Ignoring.");
            try {
                ev.go();
            }
            catch (AppiaEventException ex) {
                ex.printStackTrace();
            }
            return;
        }
        Peer peer = (Peer)this.peers.get(ev.source);
        if (peer == null) {
            peer = this.createPeer(ev.source, ev.getChannel());
        }
        long seq = this.utils.popSeq(ev.getMessage(), peer.last_msg_delivered, false);
        if (seq < 0L) {
            log.debug((Object)("Problems reading sequence number discarding event " + ev + " from " + ev.source));
            return;
        }
        long peer_confirmed = this.utils.popSeq(ev.getMessage(), peer.last_msg_confirmed, false);
        if (peer_confirmed < 0L) {
            log.debug((Object)("Problems reading last message received by peer, discarding event " + ev + " from " + ev.source));
            return;
        }
        peer.last_channel = ev.getChannel();
        peer.rounds_msg_recv = 0;
        if (!(ev instanceof PingEvent)) {
            peer.rounds_appl_msg = 0;
        }
        // Use the piggybacked confirmation to release messages the peer has delivered.
        if (peer_confirmed >= peer.first_msg_sent && peer_confirmed <= peer.last_msg_sent) {
            if (peer_confirmed > peer.last_msg_confirmed) {
                this.removeUnconfirmed(peer, peer_confirmed);
            }
        } else if (peer_confirmed > 0L && peer_confirmed != peer.last_msg_confirmed) {
            log.debug((Object)("Received wrong peer confirmed number (expected between " + peer.first_msg_sent + " and " + peer.last_msg_sent + ", received " + peer_confirmed + ". Sending Ignore."));
            this.ignore(peer, ev.getChannel());
        }
        if (seq == peer.last_msg_delivered + 1L) {
            // In-order message: deliver it (pings stop here) and advance.
            try {
                if (!(ev instanceof PingEvent)) {
                    ev.go();
                }
            }
            catch (AppiaEventException ex) {
                ex.printStackTrace();
                return;
            }
            peer.last_msg_delivered = seq;
            if (peer.undelivered_msgs.size() > 0) {
                // Flush stored messages that are now in order. A non-negative result is
                // the sequence number of the first one still blocked by a gap.
                long undelivered = this.deliverUndelivered(peer);
                this.debugPeer(peer, "receive1(" + seq + "," + undelivered + ")");
                if (peer.nacked != null && peer.last_msg_delivered >= peer.nacked.last_msg) {
                    peer.nacked = null;
                }
                if (peer.nacked == null && undelivered >= 0L) {
                    this.nack(peer, peer.last_msg_delivered + 1L, undelivered - 1L, ev.getChannel());
                }
            }
        } else {
            if (seq <= peer.last_msg_delivered) {
                log.debug((Object)("Received old message from " + peer.addr.toString() + ". Discarding."));
                return;
            }
            // Message from the future: keep it and ask for the missing range.
            this.storeUndelivered(peer, ev, seq);
            if (peer.nacked == null) {
                this.nack(peer, peer.last_msg_delivered + 1L, seq - 1L, ev.getChannel());
            }
        }
    }

    /**
     * Asks a peer to send messages {@code first..last} again and records the
     * request as the peer's pending nack.
     *
     * <p>The bounds are pushed last then first, so the receiver pops first then
     * last. The pending nack is only recorded when the event was sent. A send
     * failure is logged and left for a later attempt.
     *
     * @param peer    the peer to ask
     * @param first   first missing sequence number
     * @param last    last missing sequence number
     * @param channel channel on which the nack is sent
     * @throws AppiaError if {@code first} is greater than {@code last}
     */
    private void nack(Peer peer, long first, long last, Channel channel) {
        if (last < first) {
            this.debugPeer(peer, "nack error");
            throw new AppiaError("first(" + first + ") > last(" + last + ")");
        }
        try {
            NackEvent request = new NackEvent(channel, this);
            request.getMessage().pushLong(last);
            request.getMessage().pushLong(first);
            request.dest = peer.addr;
            request.go();
            peer.nacked = new Nacked(first, last);
        }
        catch (AppiaEventException failure) {
            failure.printStackTrace();
            log.warn((Object)"Impossible to send Nack. Maybe next time.");
        }
        this.debugPeer(peer, "nack");
    }

    /**
     * Sends an Ignore to a peer, carrying the last sequence number the peer has
     * confirmed from us. On success the peer's sent-round counter is reset. A
     * send failure is only logged.
     *
     * @param peer    the peer to notify
     * @param channel channel on which the Ignore is sent
     */
    private void ignore(Peer peer, Channel channel) {
        try {
            IgnoreEvent notice = new IgnoreEvent(channel, this);
            notice.getMessage().pushLong(peer.last_msg_confirmed);
            notice.dest = peer.addr;
            notice.go();
            peer.rounds_msg_sent = 0;
            String report = "Sent Ignore with " + peer.last_msg_confirmed + " to " + peer.addr;
            log.debug((Object)report);
        }
        catch (AppiaEventException failure) {
            failure.printStackTrace();
            log.warn((Object)"Unable to send Ignore later it will be retransmited.");
        }
    }

    /**
     * Appends a sent event to the end of the peer's list of messages awaiting
     * confirmation, so the list stays in sending order.
     *
     * @param peer the destination peer
     * @param ev   the event as handed to {@code send}, without this layer's header
     */
    private void storeUnconfirmed(Peer peer, SendableEvent ev) {
        peer.unconfirmed_msgs.addLast(ev);
    }

    /**
     * Drops stored messages from the head of the peer's unconfirmed list until
     * the confirmed counter reaches {@code last}. One message is removed for
     * each sequence number confirmed.
     *
     * @param peer the peer whose confirmation advanced
     * @param last the highest sequence number the peer has confirmed
     */
    private void removeUnconfirmed(Peer peer, long last) {
        while (last > peer.last_msg_confirmed) {
            peer.unconfirmed_msgs.removeFirst();
            peer.last_msg_confirmed++;
        }
    }

    /**
     * Sends again the unconfirmed messages with sequence numbers in
     * {@code first..last}.
     *
     * <p>The unconfirmed list is walked from its head. Its first element has
     * sequence number {@code last_msg_confirmed + 1}, so the sequence number of
     * each element is obtained by counting. Every selected message is cloned and
     * given a fresh header, as in {@code send}, but with its original sequence
     * number. Failures for individual messages are printed and skipped.
     *
     * @param peer  the peer that asked for the messages
     * @param first first sequence number to re-send
     * @param last  last sequence number to re-send
     */
    private void resend(Peer peer, long first, long last) {
        ListIterator stored = peer.unconfirmed_msgs.listIterator();
        long seq = peer.last_msg_confirmed;
        while (stored.hasNext() && seq <= last) {
            SendableEvent original = (SendableEvent)stored.next();
            seq++;
            boolean requested = seq >= first && seq <= last;
            if (!requested) {
                continue;
            }
            try {
                SendableEvent copy = (SendableEvent)original.cloneEvent();
                copy.setSourceSession(this);
                copy.init();
                this.utils.pushSeq(copy.getMessage(), peer.last_msg_delivered);
                this.utils.pushSeq(copy.getMessage(), seq);
                copy.getMessage().pushByte((byte)0);
                copy.dest = peer.addr;
                copy.go();
                peer.rounds_msg_sent = 0;
            }
            catch (AppiaEventException sendFailure) {
                sendFailure.printStackTrace();
            }
            catch (CloneNotSupportedException cloneFailure) {
                cloneFailure.printStackTrace();
            }
        }
    }

    /**
     * Keeps an out-of-order message until the messages before it arrive.
     *
     * <p>The sequence number is pushed back onto the message so it can be read
     * again later. The list is kept sorted by sequence number: it is scanned
     * from the tail and the message is inserted right after the first stored
     * message with a smaller number, or at the head if there is none. A message
     * whose number is already stored is discarded.
     *
     * @param peer the sending peer
     * @param ev   the message received out of order
     * @param seq  its sequence number
     */
    private void storeUndelivered(Peer peer, SendableEvent ev, long seq) {
        this.utils.pushSeq(ev.getMessage(), seq);
        ListIterator<SendableEvent> cursor = peer.undelivered_msgs.listIterator(peer.undelivered_msgs.size());
        while (cursor.hasPrevious()) {
            SendableEvent stored = (SendableEvent)cursor.previous();
            long storedSeq = this.utils.popSeq(stored.getMessage(), peer.last_msg_delivered, true);
            if (storedSeq == seq) {
                log.debug((Object)"Received undelivered message already stored. Discarding new copy.");
                return;
            }
            if (storedSeq < seq) {
                // previous() left the cursor before 'stored'; step over it and insert after.
                cursor.next();
                cursor.add(ev);
                return;
            }
        }
        peer.undelivered_msgs.addFirst(ev);
    }

    /**
     * Delivers, in order, the stored messages that directly follow the last
     * delivered one, removing each from the list as it goes.
     *
     * <p>Pings are consumed without being forwarded. A message whose forwarding
     * fails is still counted as delivered and removed.
     *
     * @param peer the peer whose stored messages are examined
     * @return the sequence number of the first stored message that is still
     *         blocked by a gap, or {@code -1} if the list was emptied
     */
    private long deliverUndelivered(Peer peer) {
        ListIterator stored = peer.undelivered_msgs.listIterator();
        while (stored.hasNext()) {
            SendableEvent candidate = (SendableEvent)stored.next();
            long candidateSeq = this.utils.popSeq(candidate.getMessage(), peer.last_msg_delivered, true);
            if (candidateSeq != peer.last_msg_delivered + 1L) {
                return candidateSeq;
            }
            try {
                boolean isPing = candidate instanceof PingEvent;
                if (!isPing) {
                    // Drops 4 bytes from the message before forwarding; presumably the sequence
                    // number pushed back by storeUndelivered — TODO confirm against MessageUtils.
                    candidate.getMessage().discard(4);
                    candidate.go();
                }
            }
            catch (AppiaEventException failure) {
                failure.printStackTrace();
                log.debug((Object)("Discarding event " + candidate + ". This may lead to incoherence."));
            }
            peer.last_msg_delivered = candidateSeq;
            stored.remove();
        }
        return -1L;
    }

    /**
     * Registers a new peer and immediately sends it an Ignore on the given
     * channel.
     *
     * @param addr    the peer's address
     * @param channel channel supplying the time provider and carrying the Ignore
     * @return the newly registered peer
     */
    private Peer createPeer(Object addr, Channel channel) {
        Peer created = new Peer(addr, channel.getTimeProvider());
        this.peers.put(created.addr, created);
        this.ignore(created, channel);
        return created;
    }

    /**
     * Reports that an event could not be delivered to {@code addr} by sending a
     * {@code FIFOUndeliveredEvent} that carries a clone of the event with its
     * destination set to that address. Pings are never reported.
     *
     * @param ev   the event that could not be delivered
     * @param addr the destination it was meant for
     */
    private void sendFIFOUndelivered(SendableEvent ev, Object addr) {
        if (!(ev instanceof PingEvent)) {
            try {
                SendableEvent undelivered = (SendableEvent)ev.cloneEvent();
                undelivered.dest = addr;
                new FIFOUndeliveredEvent(ev.getChannel(), this, undelivered).go();
            }
            catch (AppiaEventException sendFailure) {
                sendFailure.printStackTrace();
                log.warn((Object)"Unable to send Undelivered notification. Continuing but problems may happen.");
            }
            catch (CloneNotSupportedException cloneFailure) {
                cloneFailure.printStackTrace();
                log.warn((Object)"Unable to send Undelivered notification. Continuing but problems may happen.");
            }
        }
    }

    /**
     * Arms the periodic timer on the given channel.
     *
     * <p>{@code timerChannel} is only updated when the timer event was sent, so
     * callers can check it to see whether arming succeeded. On failure the
     * exception is now passed to the logger together with the message; it used
     * to be dropped, which hid the cause.
     *
     * @param channel the channel that will carry the timer
     */
    private void sendTimer(Channel channel) {
        try {
            // The trailing 0 is the event qualifier; presumably "set timer" — TODO confirm.
            NakFifoTimer timer = new NakFifoTimer(this.param_TIMER_PERIOD, channel, (Session)this, 0);
            timer.go();
            this.timerChannel = channel;
        }
        catch (AppiaException ex) {
            log.error((Object)"Unable to send timer. Corrcet operation of session is not guaranteed.", (Throwable)ex);
        }
    }

    /**
     * Logs a multi-line dump of a peer's state at debug level. The dump covers
     * the sequence counters, the round counters, the first entries of the
     * unconfirmed and undelivered lists, the pending nack and the last channel.
     * Nothing is built when debug logging is disabled.
     *
     * @param peer the peer to describe
     * @param s    label identifying the call site, printed at the top of the dump
     */
    private void debugPeer(Peer peer, String s) {
        if (!log.isDebugEnabled()) {
            return;
        }
        StringBuilder dump = new StringBuilder();
        dump.append("@").append(s).append(" Peer: ").append(peer.addr.toString()).append("\n");
        dump.append("\t First Msg Sent: ").append(peer.first_msg_sent).append("\n");
        dump.append("\t Last Msg Sent/Confirmed: ").append(peer.last_msg_sent).append("/").append(peer.last_msg_confirmed).append("\n");
        dump.append("\t Last Msg Delivered: ").append(peer.last_msg_delivered).append("\n");
        dump.append("\t Rounds Appl/Sent/Recv: ").append(peer.rounds_appl_msg).append("/").append(peer.rounds_msg_sent).append("/").append(peer.rounds_msg_recv).append("\n");

        // Unconfirmed messages are numbered by counting from the last confirmed one.
        dump.append("\t Unconfirmed Msgs:\n");
        int remaining = 10;
        long number = peer.last_msg_confirmed;
        ListIterator entries = peer.unconfirmed_msgs.listIterator();
        while (entries.hasNext()) {
            SendableEvent unconfirmed = (SendableEvent)entries.next();
            number++;
            dump.append("\t\t ").append(number).append(": ").append(unconfirmed).append("\n");
            remaining--;
            if (remaining <= 0) {
                dump.append("\t\t  ...\n");
                break;
            }
        }

        // Undelivered messages carry their own sequence number, read without consuming it.
        dump.append("\t Undelivered Msgs:\n");
        remaining = 10;
        entries = peer.undelivered_msgs.listIterator();
        while (entries.hasNext()) {
            SendableEvent undelivered = (SendableEvent)entries.next();
            number = this.utils.popSeq(undelivered.getMessage(), peer.last_msg_delivered, true);
            dump.append("\t\t ").append(number).append(": ").append(undelivered).append("\n");
            remaining--;
            if (remaining <= 0) {
                dump.append("\t\t  ...\n");
                break;
            }
        }

        dump.append("\t Nacked First/Last/Rounds: ");
        if (peer.nacked == null) {
            dump.append("null\n");
        } else {
            dump.append(peer.nacked.first_msg).append("/").append(peer.nacked.last_msg).append("/").append(peer.nacked.rounds).append("\n");
        }
        dump.append("\t Channel: ").append(peer.last_channel).append("\n");
        log.debug((Object)dump.toString());
    }
}

