/*
 * Decompiled with CFR 0.152.
 */
package net.sf.appia.protocols.tcpcomplete;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Enumeration;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.Random;
import javax.management.Attribute;
import javax.management.MBeanAttributeInfo;
import javax.management.MBeanOperationInfo;
import net.sf.appia.core.AppiaEventException;
import net.sf.appia.core.AppiaException;
import net.sf.appia.core.Channel;
import net.sf.appia.core.Direction;
import net.sf.appia.core.Event;
import net.sf.appia.core.Layer;
import net.sf.appia.core.Session;
import net.sf.appia.core.events.AppiaMulticast;
import net.sf.appia.core.events.SendableEvent;
import net.sf.appia.core.events.channel.ChannelClose;
import net.sf.appia.core.events.channel.ChannelInit;
import net.sf.appia.core.message.Message;
import net.sf.appia.core.message.MsgBuffer;
import net.sf.appia.management.AppiaManagementException;
import net.sf.appia.management.ManagedSession;
import net.sf.appia.protocols.common.RegisterSocketEvent;
import net.sf.appia.protocols.tcpcomplete.AcceptReader;
import net.sf.appia.protocols.tcpcomplete.CloseTcpSocket;
import net.sf.appia.protocols.tcpcomplete.Measures;
import net.sf.appia.protocols.tcpcomplete.SenderQueue;
import net.sf.appia.protocols.tcpcomplete.SocketInfoContainer;
import net.sf.appia.protocols.tcpcomplete.TcpReader;
import net.sf.appia.protocols.tcpcomplete.TcpTimer;
import net.sf.appia.protocols.tcpcomplete.TcpUndeliveredEvent;
import net.sf.appia.protocols.utils.HostUtils;
import net.sf.appia.protocols.utils.ParseUtils;
import net.sf.appia.xml.interfaces.InitializableSession;
import net.sf.appia.xml.utils.SessionProperties;
import org.apache.log4j.Logger;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
public class TcpCompleteSession
extends Session
implements InitializableSession,
ManagedSession {
    private static Logger log = Logger.getLogger(TcpCompleteSession.class);
    private static final int DEST_TIMEOUT = 150000;
    private static final int MAX_INACTIVITY = 2;
    private static final int SOTIMEOUT = 5000;
    protected int param_DEST_TIMEOUT = 150000;
    protected int param_MAX_INACTIVITY = 2;
    protected int param_SOTIMEOUT = 5000;
    protected boolean param_CLOSE_INACTIVE_SOCKETS = true;
    protected Hashtable<String, Channel> channels = new Hashtable();
    protected Hashtable<InetSocketAddress, SocketInfoContainer> ourReaders = new Hashtable();
    protected Hashtable<InetSocketAddress, SocketInfoContainer> otherReaders = new Hashtable();
    protected AcceptReader acceptThread;
    protected int ourPort = -1;
    protected Object socketLock = new Object();
    protected Object channelLock = new Object();
    private Channel timerChannel = null;
    private Measures measures = new Measures(this);

    public TcpCompleteSession(Layer layer) {
        super(layer);
    }

    @Override
    public void init(SessionProperties params) {
        if (params.containsKey("reader_sotimeout")) {
            this.param_SOTIMEOUT = params.getInt("reader_sotimeout");
        }
        if (params.containsKey("dest_timeout")) {
            this.param_DEST_TIMEOUT = params.getInt("dest_timeout");
        }
        if (params.containsKey("max_inactivity")) {
            this.param_MAX_INACTIVITY = params.getInt("max_inactivity");
        }
        if (params.containsKey("close_inactive_sockets")) {
            this.param_CLOSE_INACTIVE_SOCKETS = params.getBoolean("close_inactive_sockets");
        }
    }

    @Override
    public void handle(Event e) {
        if (e instanceof SendableEvent) {
            this.handleSendable((SendableEvent)e);
        } else if (e instanceof RegisterSocketEvent) {
            this.handleRegisterSocket((RegisterSocketEvent)e);
        } else if (e instanceof ChannelInit) {
            this.handleChannelInit((ChannelInit)e);
        } else if (e instanceof ChannelClose) {
            this.handleChannelClose((ChannelClose)e);
        } else if (e instanceof TcpTimer) {
            this.handleTcpTimer((TcpTimer)e);
        } else if (e instanceof CloseTcpSocket) {
            this.handleCloseSocket((CloseTcpSocket)e);
        }
    }

    private void handleSendable(SendableEvent e) {
        if (e.getDir() == 1) {
            if (e.getChannel().isStarted()) {
                try {
                    e.go();
                }
                catch (AppiaEventException e1) {
                    e1.printStackTrace();
                }
            }
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug((Object)("preparing to send ::" + e + " CHANNEL: " + e.getChannel().getChannelID()));
        }
        byte[] data = this.format(e);
        if (e.dest instanceof AppiaMulticast) {
            Object[] dests = ((AppiaMulticast)e.dest).getDestinations();
            int i = 0;
            while (i < dests.length) {
                if (dests[i] instanceof InetSocketAddress) {
                    this.send(data, (InetSocketAddress)dests[i], e.getChannel());
                } else {
                    this.sendUndelivered(e.getChannel(), (InetSocketAddress)dests[i]);
                }
                ++i;
            }
        } else if (e.dest instanceof InetSocketAddress) {
            this.send(data, (InetSocketAddress)e.dest, e.getChannel());
        } else {
            this.sendUndelivered(e.getChannel(), (InetSocketAddress)e.dest);
        }
        try {
            e.go();
        }
        catch (AppiaEventException ex) {
            ex.printStackTrace();
        }
    }

    protected void handleRegisterSocket(RegisterSocketEvent e) {
        if (log.isDebugEnabled()) {
            log.debug((Object)("TCP Session received RegisterSocketEvent to register a socket in port " + e.port));
        }
        ServerSocket ss = null;
        if (e.localHost == null) {
            e.localHost = HostUtils.getLocalAddress();
        }
        if (this.ourPort < 0) {
            if (e.port == 0) {
                try {
                    ss = new ServerSocket(0, 50, e.localHost);
                }
                catch (IOException ex) {
                    log.debug((Object)("Exception when trying to create a server socket in First Available mode: " + ex));
                }
            } else if (e.port == -1) {
                Random rand = new Random();
                boolean done = false;
                while (!done) {
                    int p = rand.nextInt(Short.MAX_VALUE);
                    try {
                        ss = new ServerSocket(p, 50, e.localHost);
                        done = true;
                    }
                    catch (IllegalArgumentException ex) {
                        log.debug((Object)("Exception when trying to create a server socket in Randomly Available mode: " + ex));
                    }
                    catch (IOException ex) {
                        log.debug((Object)("Exception when trying to create a server socket in Randomly Available mode: " + ex));
                    }
                }
            } else {
                try {
                    ss = new ServerSocket(e.port, 50, e.localHost);
                }
                catch (IOException ex) {
                    log.debug((Object)("Exception when trying to create a server socket using the port: " + e.port + "\nException: " + ex));
                }
            }
        }
        if (ss != null) {
            this.ourPort = ss.getLocalPort();
            if (log.isDebugEnabled()) {
                log.debug((Object)("TCP Session registered a socket in port " + this.ourPort));
            }
            this.acceptThread = new AcceptReader(ss, this, e.getChannel(), this.socketLock);
            Thread t = e.getChannel().getThreadFactory().newThread(this.acceptThread);
            t.setName("TCP Accept thread from port " + this.ourPort);
            t.start();
            e.localHost = ss.getInetAddress();
            e.port = this.ourPort;
            e.error = false;
        } else {
            e.error = true;
            if (this.acceptThread != null && this.acceptThread.getPort() == e.port) {
                e.setErrorCode(-3);
                e.setErrorDescription("Socket already bound in port " + e.port);
            } else {
                e.setErrorCode(-2);
                e.setErrorDescription("Could not create socket. Resource is busy.");
            }
        }
        e.setDir(Direction.invert(e.getDir()));
        e.setSourceSession(this);
        try {
            e.init();
            e.go();
        }
        catch (AppiaEventException ex) {
            ex.printStackTrace();
        }
    }

    private void handleCloseSocket(CloseTcpSocket e) {
        InetSocketAddress dest = (InetSocketAddress)e.getAddress();
        if (this.existsSocket(this.otherReaders, dest)) {
            this.otherReaders.remove(dest).close();
            if (log.isDebugEnabled()) {
                log.debug((Object)("Closing TCP socket for destination: " + dest));
            }
        } else if (this.existsSocket(this.ourReaders, dest)) {
            this.ourReaders.remove(dest).close();
            if (log.isDebugEnabled()) {
                log.debug((Object)("Closing TCP socket for destination: " + dest));
            }
        } else {
            log.debug((Object)("Requested to close socket " + dest + " but the socket does not exist."));
        }
    }

    private void handleChannelInit(ChannelInit e) {
        this.putChannel(e.getChannel());
        this.measures.setTimeProvider(e.getChannel().getTimeProvider());
        try {
            e.go();
        }
        catch (AppiaEventException ex) {
            ex.printStackTrace();
        }
        if (this.timerChannel == null && this.param_CLOSE_INACTIVE_SOCKETS) {
            try {
                TcpTimer timer = new TcpTimer(this.param_DEST_TIMEOUT, e.getChannel(), (Session)this, 0);
                timer.go();
                this.timerChannel = timer.getChannel();
            }
            catch (AppiaEventException ex) {
                ex.printStackTrace();
            }
            catch (AppiaException ex) {
                ex.printStackTrace();
            }
        }
    }

    private void handleChannelClose(ChannelClose e) {
        this.removeChannel(e.getChannel());
        if (this.channels.size() == 0) {
            log.warn((Object)"No more channels. Cleaning sockets.");
            this.acceptThread.setRunning(false);
            for (SocketInfoContainer comm : this.ourReaders.values()) {
                comm.close();
            }
            this.ourReaders.clear();
            for (SocketInfoContainer comm : this.otherReaders.values()) {
                comm.close();
            }
            this.otherReaders.clear();
        } else if (this.timerChannel != null && e.getChannel().getChannelID().equals(this.timerChannel.getChannelID())) {
            try {
                this.timerChannel = this.channels.values().iterator().next();
                TcpTimer timer = new TcpTimer(this.param_DEST_TIMEOUT, this.timerChannel, (Session)this, 0);
                timer.go();
            }
            catch (Exception ex) {
                this.timerChannel = null;
                ex.printStackTrace();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void handleTcpTimer(TcpTimer e) {
        try {
            e.go();
        }
        catch (AppiaEventException e1) {
            e1.printStackTrace();
        }
        Object object = this.socketLock;
        synchronized (object) {
            TcpReader reader;
            Iterator<SocketInfoContainer> it = this.ourReaders.values().iterator();
            while (it.hasNext()) {
                reader = it.next().reader;
                if (reader.sumInactiveCounter() <= this.param_MAX_INACTIVITY) continue;
                reader.setRunning(false);
                it.remove();
            }
            it = this.otherReaders.values().iterator();
            while (it.hasNext()) {
                reader = it.next().reader;
                if (reader.sumInactiveCounter() <= this.param_MAX_INACTIVITY) continue;
                reader.setRunning(false);
                it.remove();
            }
        }
    }

    protected void send(byte[] data, InetSocketAddress dest, Channel channel) {
        block10: {
            SocketInfoContainer container = null;
            try {
                if (this.existsSocket(this.ourReaders, dest)) {
                    container = this.getSocket(this.ourReaders, dest);
                    if (log.isDebugEnabled()) {
                        log.debug((Object)"our socket, sending...");
                    }
                } else if (this.existsSocket(this.otherReaders, dest)) {
                    container = this.getSocket(this.otherReaders, dest);
                    if (log.isDebugEnabled()) {
                        log.debug((Object)"other socket, sending...");
                    }
                } else {
                    container = this.createSocket(this.ourReaders, dest, channel);
                    if (log.isDebugEnabled()) {
                        log.debug((Object)"created new socket, sending...");
                    }
                }
                if (log.isDebugEnabled()) {
                    log.debug((Object)("Adding to socket Queue of " + container.sender + " Queue has now #Items: " + container.sender.getQueue().getSize()));
                }
                this.measures.countBytesDown(data.length);
                this.measures.countMessagesDown(1);
                container.sender.getQueue().add(new MessageContainer(data, dest, channel));
            }
            catch (IOException ex) {
                if (!log.isDebugEnabled()) break block10;
                ex.printStackTrace();
                log.debug((Object)("Node " + dest + " failed."));
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected boolean existsSocket(Hashtable<InetSocketAddress, SocketInfoContainer> hr, InetSocketAddress iwp) {
        Object object = this.socketLock;
        synchronized (object) {
            block4: {
                if (!hr.containsKey(iwp)) break block4;
                return true;
            }
            return false;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected SocketInfoContainer getSocket(Hashtable<InetSocketAddress, SocketInfoContainer> hm, InetSocketAddress iwp) {
        Object object = this.socketLock;
        synchronized (object) {
            SocketInfoContainer container = hm.get(iwp);
            return container;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected SocketInfoContainer createSocket(Hashtable<InetSocketAddress, SocketInfoContainer> hr, InetSocketAddress iwp, Channel channel) throws IOException {
        Object object = this.socketLock;
        synchronized (object) {
            Socket newSocket = null;
            newSocket = new Socket(iwp.getAddress(), iwp.getPort());
            newSocket.setTcpNoDelay(true);
            byte[] bPort = ParseUtils.intToByteArray(this.ourPort);
            newSocket.getOutputStream().write(bPort);
            if (log.isDebugEnabled()) {
                log.debug((Object)("Sending our original port " + this.ourPort));
            }
            return this.addSocket(hr, iwp, newSocket, channel);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected SocketInfoContainer addSocket(Hashtable<InetSocketAddress, SocketInfoContainer> hr, InetSocketAddress iwp, Socket socket, Channel channel) {
        Object object = this.socketLock;
        synchronized (object) {
            TcpReader reader = new TcpReader(socket, this, this.ourPort, iwp.getPort(), channel, this.measures);
            Thread tr = channel.getThreadFactory().newThread(reader);
            TcpSender sender = new TcpSender(socket, new SenderQueue<MessageContainer>());
            Thread ts = channel.getThreadFactory().newThread(sender);
            SocketInfoContainer container = new SocketInfoContainer(reader, sender);
            tr.setName("TCP reader thread [" + iwp + "]");
            tr.start();
            ts.setName("TCP sender thread [" + iwp + "]");
            ts.start();
            hr.put(iwp, container);
            return container;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void removeSocket(InetSocketAddress iwp) {
        Object object = this.socketLock;
        synchronized (object) {
            if (this.existsSocket(this.ourReaders, iwp)) {
                this.ourReaders.remove(iwp).close();
            } else if (this.existsSocket(this.otherReaders, iwp)) {
                this.otherReaders.remove(iwp).close();
            } else if (log.isDebugEnabled()) {
                log.debug((Object)"No socket to remove.");
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected Channel getChannel(String channelName) {
        Object object = this.channelLock;
        synchronized (object) {
            return this.channels.get(channelName);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void putChannel(Channel channel) {
        Object object = this.channelLock;
        synchronized (object) {
            this.channels.put(channel.getChannelID(), channel);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void removeChannel(Channel channel) {
        Object object = this.channelLock;
        synchronized (object) {
            this.channels.remove(channel.getChannelID());
        }
    }

    protected byte[] format(SendableEvent e) {
        MsgBuffer mbuf = new MsgBuffer();
        Message msg = e.getMessage();
        byte[] eventType = e.getClass().getName().getBytes();
        byte[] channelID = e.getChannel().getChannelID().getBytes();
        mbuf.len = channelID.length;
        msg.push(mbuf);
        System.arraycopy(channelID, 0, mbuf.data, mbuf.off, mbuf.len);
        mbuf.len = 4;
        msg.push(mbuf);
        ParseUtils.intToByteArray(channelID.length, mbuf.data, mbuf.off);
        mbuf.len = eventType.length;
        msg.push(mbuf);
        System.arraycopy(eventType, 0, mbuf.data, mbuf.off, mbuf.len);
        mbuf.len = 4;
        msg.push(mbuf);
        ParseUtils.intToByteArray(eventType.length, mbuf.data, mbuf.off);
        mbuf.len = 4;
        msg.push(mbuf);
        ParseUtils.intToByteArray(msg.length() - 4, mbuf.data, mbuf.off);
        return msg.toByteArray();
    }

    protected void sendASyncUndelivered(Channel channel, InetSocketAddress who) {
        try {
            new TcpUndeliveredEvent(channel, 1, this, who).asyncGo(channel, 1);
        }
        catch (AppiaEventException exception) {
            exception.printStackTrace();
        }
        this.removeSocket(who);
    }

    protected void sendUndelivered(Channel channel, InetSocketAddress who) {
        try {
            new TcpUndeliveredEvent(channel, 1, this, who).go();
        }
        catch (AppiaEventException exception) {
            exception.printStackTrace();
        }
        this.removeSocket(who);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected int getGlobalQueueSize() {
        int sum = 0;
        Object object = this.socketLock;
        synchronized (object) {
            Enumeration<SocketInfoContainer> e = this.ourReaders.elements();
            while (e.hasMoreElements()) {
                sum += e.nextElement().sender.getQueue().getSize();
            }
            e = this.otherReaders.elements();
            while (e.hasMoreElements()) {
                sum += e.nextElement().sender.getQueue().getSize();
            }
        }
        return sum;
    }

    @Override
    public Object attributeGetter(String attribute, MBeanAttributeInfo info) throws AppiaManagementException {
        return this.measures.attributeGetter(attribute, info);
    }

    @Override
    public void attributeSetter(Attribute attribute, MBeanAttributeInfo info) throws AppiaManagementException {
        this.measures.attributeSetter(attribute, info);
    }

    @Override
    public MBeanAttributeInfo[] getAttributes(String sessionID) {
        return this.measures.getAttributes(sessionID);
    }

    @Override
    public MBeanOperationInfo[] getOperations(String sessionID) {
        return null;
    }

    @Override
    public Object invoke(String action, MBeanOperationInfo info, Object[] params, String[] signature) throws AppiaManagementException {
        return this.measures.invoke(action, info, params, signature);
    }

    class MessageContainer {
        byte[] data;
        InetSocketAddress who;
        Channel channel;

        MessageContainer(byte[] b, InetSocketAddress sa, Channel c) {
            this.data = b;
            this.who = sa;
            this.channel = c;
        }
    }

    /*
     * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
     */
    class TcpSender
    implements Runnable {
        private Socket socket;
        private SenderQueue<MessageContainer> queue;
        private boolean running = true;

        TcpSender(Socket s, SenderQueue<MessageContainer> sq) {
            this.socket = s;
            this.queue = sq;
        }

        @Override
        public void run() {
            MessageContainer container = null;
            while (this.isRunning()) {
                container = this.queue.removeNext();
                if (container == null) continue;
                try {
                    if (log.isDebugEnabled()) {
                        log.debug((Object)("Sending message to the socket for " + container.who + " with " + container.data.length + " bytes"));
                    }
                    this.socket.getOutputStream().write(container.data);
                    if (log.isDebugEnabled()) {
                        log.debug((Object)"Flushing data...");
                    }
                    this.socket.getOutputStream().flush();
                    if (!log.isDebugEnabled()) continue;
                    log.debug((Object)"Flushing done...");
                }
                catch (IOException e) {
                    if (!this.isRunning()) continue;
                    TcpCompleteSession.this.sendASyncUndelivered(container.channel, container.who);
                    if (!log.isDebugEnabled()) continue;
                    log.debug((Object)"Exception when send ASyncUndelivered:\n");
                    e.printStackTrace();
                }
            }
            try {
                if (log.isDebugEnabled()) {
                    log.debug((Object)("Closing socket " + this.socket));
                }
                this.socket.close();
            }
            catch (IOException e) {
                e.printStackTrace();
            }
        }

        SenderQueue<MessageContainer> getQueue() {
            return this.queue;
        }

        public synchronized void setRunning(boolean r) {
            this.running = r;
            if (!this.running && !this.socket.isClosed()) {
                try {
                    this.socket.shutdownOutput();
                }
                catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

        private synchronized boolean isRunning() {
            return this.running;
        }
    }
}

