/*
 * Decompiled with CFR 0.152.
 */
package org.apache.qpid.client;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.jms.Destination;
import javax.jms.JMSException;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQUndeliveredException;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQNoConsumersException;
import org.apache.qpid.client.AMQNoRouteException;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.AMQTopic;
import org.apache.qpid.client.BasicMessageConsumer_0_8;
import org.apache.qpid.client.BasicMessageProducer_0_8;
import org.apache.qpid.client.JMSAMQException;
import org.apache.qpid.client.RejectBehaviour;
import org.apache.qpid.client.TemporaryDestination;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverNoopSupport;
import org.apache.qpid.client.failover.FailoverProtectedOperation;
import org.apache.qpid.client.failover.FailoverRetrySupport;
import org.apache.qpid.client.message.AMQMessageDelegateFactory;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.ReturnMessage;
import org.apache.qpid.client.message.UnprocessedMessage;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicAckBody;
import org.apache.qpid.framing.BasicConsumeBody;
import org.apache.qpid.framing.BasicConsumeOkBody;
import org.apache.qpid.framing.BasicQosBody;
import org.apache.qpid.framing.BasicQosOkBody;
import org.apache.qpid.framing.BasicRecoverBody;
import org.apache.qpid.framing.BasicRecoverOkBody;
import org.apache.qpid.framing.BasicRecoverSyncBody;
import org.apache.qpid.framing.BasicRecoverSyncOkBody;
import org.apache.qpid.framing.BasicRejectBody;
import org.apache.qpid.framing.ChannelCloseOkBody;
import org.apache.qpid.framing.ChannelFlowBody;
import org.apache.qpid.framing.ChannelFlowOkBody;
import org.apache.qpid.framing.ExchangeBoundOkBody;
import org.apache.qpid.framing.ExchangeDeclareBody;
import org.apache.qpid.framing.ExchangeDeclareOkBody;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.framing.QueueBindOkBody;
import org.apache.qpid.framing.QueueDeclareBody;
import org.apache.qpid.framing.QueueDeclareOkBody;
import org.apache.qpid.framing.QueueDeleteBody;
import org.apache.qpid.framing.QueueDeleteOkBody;
import org.apache.qpid.framing.TxCommitOkBody;
import org.apache.qpid.framing.TxRollbackBody;
import org.apache.qpid.framing.TxRollbackOkBody;
import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9;
import org.apache.qpid.framing.amqp_0_91.MethodRegistry_0_91;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.transport.TransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
public class AMQSession_0_8
extends AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8> {
    private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class);
    public static final String QPID_SYNC_AFTER_CLIENT_ACK = "qpid.sync_after_client.ack";
    private final boolean _syncAfterClientAck = Boolean.parseBoolean(System.getProperty("qpid.sync_after_client.ack", "true"));
    private final long _flowControlWaitPeriod = Long.getLong("qpid.flow_control_wait_notify_period", 5000L);
    private final long _flowControlWaitFailure = Long.getLong("qpid.flow_control_wait_failure", 60000L);
    private FlowControlIndicator _flowControl = new FlowControlIndicator();
    private final TopicDestinationCache _topicDestinationCache = new TopicDestinationCache();
    private final QueueDestinationCache _queueDestinationCache = new QueueDestinationCache();

    protected AMQSession_0_8(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark, int defaultPrefetchLowMark) {
        super(con, channelId, transacted, acknowledgeMode, messageFactoryRegistry, defaultPrefetchHighMark, defaultPrefetchLowMark);
    }

    AMQSession_0_8(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, int defaultPrefetchHigh, int defaultPrefetchLow) {
        this(con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetchHigh, defaultPrefetchLow);
    }

    private ProtocolVersion getProtocolVersion() {
        return this.getProtocolHandler().getProtocolVersion();
    }

    @Override
    protected void acknowledgeImpl() throws JMSException {
        Long tag;
        boolean syncRequired = false;
        while ((tag = this.getUnacknowledgedMessageTags().poll()) != null) {
            this.acknowledgeMessage(tag, false);
            syncRequired = true;
        }
        try {
            if (syncRequired && this._syncAfterClientAck) {
                this.sync();
            }
        }
        catch (AMQException a) {
            throw new JMSAMQException("Failed to sync after acknowledge", (Exception)((Object)a));
        }
    }

    @Override
    public void acknowledgeMessage(long deliveryTag, boolean multiple) {
        BasicAckBody body = this.getMethodRegistry().createBasicAckBody(deliveryTag, multiple);
        AMQFrame ackFrame = body.generateFrame(this.getChannelId());
        if (_logger.isDebugEnabled()) {
            _logger.debug("Sending ack for delivery tag " + deliveryTag + " on channel " + this.getChannelId());
        }
        this.getProtocolHandler().writeFrame((AMQDataBlock)ackFrame, !this.isTransacted());
        this.getUnacknowledgedMessageTags().remove(deliveryTag);
    }

    @Override
    public void sendQueueBind(AMQShortString queueName, AMQShortString routingKey, FieldTable arguments, AMQShortString exchangeName, AMQDestination dest, boolean nowait) throws AMQException, FailoverException {
        this.getProtocolHandler().syncWrite(this.getProtocolHandler().getMethodRegistry().createQueueBindBody(this.getTicket(), queueName, exchangeName, routingKey, false, arguments).generateFrame(this.getChannelId()), QueueBindOkBody.class);
    }

    @Override
    public void sendClose(long timeout) throws AMQException, FailoverException {
        if (!this.getProtocolHandler().getStateManager().getCurrentState().equals((Object)AMQState.CONNECTION_CLOSED) && !this.getProtocolHandler().getStateManager().getCurrentState().equals((Object)AMQState.CONNECTION_CLOSING)) {
            this.getProtocolHandler().closeSession(this);
            this.getProtocolHandler().syncWrite(this.getProtocolHandler().getMethodRegistry().createChannelCloseBody(AMQConstant.REPLY_SUCCESS.getCode(), new AMQShortString("JMS client closing channel"), 0, 0).generateFrame(this.getChannelId()), ChannelCloseOkBody.class, timeout);
        }
    }

    @Override
    public void commitImpl() throws AMQException, FailoverException, TransportException {
        Long tag;
        while ((tag = this.getDeliveredMessageTags().poll()) != null) {
            this.acknowledgeMessage(tag, false);
        }
        AMQProtocolHandler handler = this.getProtocolHandler();
        handler.syncWrite(this.getProtocolHandler().getMethodRegistry().createTxCommitBody().generateFrame(this.getChannelId()), TxCommitOkBody.class);
    }

    @Override
    public void sendCreateQueue(AMQShortString name, boolean autoDelete, boolean durable, boolean exclusive, Map<String, Object> arguments) throws AMQException, FailoverException {
        FieldTable table = null;
        if (arguments != null && !arguments.isEmpty()) {
            table = new FieldTable();
            for (Map.Entry<String, Object> entry : arguments.entrySet()) {
                table.setObject(entry.getKey(), entry.getValue());
            }
        }
        QueueDeclareBody body = this.getMethodRegistry().createQueueDeclareBody(this.getTicket(), name, false, durable, exclusive, autoDelete, false, table);
        AMQFrame queueDeclare = body.generateFrame(this.getChannelId());
        this.getProtocolHandler().syncWrite(queueDeclare, QueueDeclareOkBody.class);
    }

    @Override
    public void sendRecover() throws AMQException, FailoverException {
        this.enforceRejectBehaviourDuringRecover();
        this.getPrefetchedMessageTags().clear();
        this.getUnacknowledgedMessageTags().clear();
        if (this.isStrictAMQP()) {
            BasicRecoverBody body = this.getMethodRegistry().createBasicRecoverBody(false);
            this.getAMQConnection().getProtocolHandler().writeFrame((AMQDataBlock)body.generateFrame(this.getChannelId()));
            _logger.warn("Session Recover cannot be guaranteed with STRICT_AMQP. Messages may arrive out of order.");
        } else if (this.getProtocolHandler().getProtocolVersion().equals((Object)ProtocolVersion.v8_0)) {
            BasicRecoverBody body = this.getMethodRegistry().createBasicRecoverBody(false);
            this.getAMQConnection().getProtocolHandler().syncWrite(body.generateFrame(this.getChannelId()), BasicRecoverOkBody.class);
        } else if (this.getProtocolVersion().equals((Object)ProtocolVersion.v0_9)) {
            BasicRecoverSyncBody body = ((MethodRegistry_0_9)this.getMethodRegistry()).createBasicRecoverSyncBody(false);
            this.getAMQConnection().getProtocolHandler().syncWrite(body.generateFrame(this.getChannelId()), BasicRecoverSyncOkBody.class);
        } else if (this.getProtocolVersion().equals((Object)ProtocolVersion.v0_91)) {
            BasicRecoverSyncBody body = ((MethodRegistry_0_91)this.getMethodRegistry()).createBasicRecoverSyncBody(false);
            this.getAMQConnection().getProtocolHandler().syncWrite(body.generateFrame(this.getChannelId()), BasicRecoverSyncOkBody.class);
        } else {
            throw new RuntimeException("Unsupported version of the AMQP Protocol: " + this.getProtocolVersion());
        }
    }

    private void enforceRejectBehaviourDuringRecover() {
        if (_logger.isDebugEnabled()) {
            _logger.debug("Prefetched message: _unacknowledgedMessageTags :" + this.getUnacknowledgedMessageTags());
        }
        ArrayList consumersToCheck = new ArrayList(this.getConsumers().values());
        boolean messageListenerFound = false;
        boolean serverRejectBehaviourFound = false;
        for (BasicMessageConsumer_0_8 consumer : consumersToCheck) {
            if (consumer.isMessageListenerSet()) {
                messageListenerFound = true;
            }
            if (!RejectBehaviour.SERVER.equals((Object)consumer.getRejectBehaviour())) continue;
            serverRejectBehaviourFound = true;
        }
        _logger.debug("about to pre-reject messages for " + consumersToCheck.size() + " consumer(s)");
        if (serverRejectBehaviourFound) {
            switch (this.getAcknowledgeMode()) {
                case 1: 
                case 3: {
                    if (!messageListenerFound) break;
                }
                case 2: {
                    for (Long tag : this.getUnacknowledgedMessageTags()) {
                        this.rejectMessage(tag, false);
                    }
                    break;
                }
            }
        }
    }

    @Override
    public void releaseForRollback() {
        Long tag;
        boolean normalRejectBehaviour = true;
        for (BasicMessageConsumer_0_8 consumer : this.getConsumers().values()) {
            if (!RejectBehaviour.SERVER.equals((Object)consumer.getRejectBehaviour())) continue;
            normalRejectBehaviour = false;
            break;
        }
        while ((tag = this.getDeliveredMessageTags().poll()) != null) {
            this.rejectMessage(tag, normalRejectBehaviour);
        }
    }

    @Override
    public void rejectMessage(long deliveryTag, boolean requeue) {
        if (this.getAcknowledgeMode() == 2 || this.getAcknowledgeMode() == 0 || (this.getAcknowledgeMode() == 1 || this.getAcknowledgeMode() == 3) && this.hasMessageListeners()) {
            if (_logger.isDebugEnabled()) {
                _logger.debug("Rejecting delivery tag:" + deliveryTag + ":SessionHC:" + this.hashCode());
            }
            BasicRejectBody body = this.getMethodRegistry().createBasicRejectBody(deliveryTag, requeue);
            AMQFrame frame = body.generateFrame(this.getChannelId());
            this.getAMQConnection().getProtocolHandler().writeFrame((AMQDataBlock)frame);
        }
    }

    @Override
    public boolean isQueueBound(AMQDestination destination) throws JMSException {
        return this.isQueueBound(destination.getExchangeName(), destination.getAMQQueueName(), destination.getAMQQueueName());
    }

    @Override
    public boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey) throws JMSException {
        try {
            AMQMethodEvent response = new FailoverRetrySupport<AMQMethodEvent, AMQException>(new FailoverProtectedOperation<AMQMethodEvent, AMQException>(){

                @Override
                public AMQMethodEvent execute() throws AMQException, FailoverException {
                    AMQFrame boundFrame = AMQSession_0_8.this.getProtocolHandler().getMethodRegistry().createExchangeBoundBody(exchangeName, routingKey, queueName).generateFrame(AMQSession_0_8.this.getChannelId());
                    return AMQSession_0_8.this.getProtocolHandler().syncWrite(boundFrame, ExchangeBoundOkBody.class);
                }
            }, this.getAMQConnection()).execute();
            ExchangeBoundOkBody responseBody = (ExchangeBoundOkBody)response.getMethod();
            return responseBody.getReplyCode() == 0;
        }
        catch (AMQException e) {
            throw new JMSAMQException("Queue bound query failed: " + e.getMessage(), (Exception)((Object)e));
        }
    }

    @Override
    public void sendConsume(BasicMessageConsumer_0_8 consumer, AMQShortString queueName, boolean nowait, int tag) throws AMQException, FailoverException {
        BasicConsumeBody body = this.getMethodRegistry().createBasicConsumeBody(this.getTicket(), queueName, new AMQShortString(String.valueOf(tag)), consumer.isNoLocal(), consumer.getAcknowledgeMode() == 257, consumer.isExclusive(), nowait, consumer.getArguments());
        AMQFrame jmsConsume = body.generateFrame(this.getChannelId());
        if (nowait) {
            this.getProtocolHandler().writeFrame((AMQDataBlock)jmsConsume);
        } else {
            this.getProtocolHandler().syncWrite(jmsConsume, BasicConsumeOkBody.class);
        }
    }

    @Override
    public void sendExchangeDeclare(AMQShortString name, AMQShortString type, boolean nowait, boolean durable, boolean autoDelete, boolean internal) throws AMQException, FailoverException {
        ExchangeDeclareBody body = this.getMethodRegistry().createExchangeDeclareBody(this.getTicket(), name, type, name.toString().startsWith("amq."), durable, autoDelete, internal, false, null);
        AMQFrame exchangeDeclare = body.generateFrame(this.getChannelId());
        this.getProtocolHandler().syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class);
    }

    private void sendQueueDeclare(AMQDestination amqd, boolean passive) throws AMQException, FailoverException {
        QueueDeclareBody body = this.getMethodRegistry().createQueueDeclareBody(this.getTicket(), amqd.getAMQQueueName(), passive, amqd.isDurable(), amqd.isExclusive(), amqd.isAutoDelete(), false, null);
        AMQFrame queueDeclare = body.generateFrame(this.getChannelId());
        this.getProtocolHandler().syncWrite(queueDeclare, QueueDeclareOkBody.class);
    }

    @Override
    protected AMQShortString declareQueue(final AMQDestination amqd, boolean noLocal, boolean nowait, final boolean passive) throws AMQException {
        final AMQProtocolHandler protocolHandler = this.getProtocolHandler();
        return new FailoverNoopSupport<AMQShortString, AMQException>(new FailoverProtectedOperation<AMQShortString, AMQException>(){

            @Override
            public AMQShortString execute() throws AMQException, FailoverException {
                if (amqd.isNameRequired()) {
                    amqd.setQueueName(protocolHandler.generateQueueName());
                }
                AMQSession_0_8.this.sendQueueDeclare(amqd, passive);
                return amqd.getAMQQueueName();
            }
        }, this.getAMQConnection()).execute();
    }

    @Override
    public void sendQueueDelete(AMQShortString queueName) throws AMQException, FailoverException {
        QueueDeleteBody body = this.getMethodRegistry().createQueueDeleteBody(this.getTicket(), queueName, false, false, true);
        AMQFrame queueDeleteFrame = body.generateFrame(this.getChannelId());
        this.getProtocolHandler().syncWrite(queueDeleteFrame, QueueDeleteOkBody.class);
    }

    @Override
    public void sendSuspendChannel(boolean suspend) throws AMQException, FailoverException {
        ChannelFlowBody body = this.getMethodRegistry().createChannelFlowBody(!suspend);
        AMQFrame channelFlowFrame = body.generateFrame(this.getChannelId());
        this.getAMQConnection().getProtocolHandler().syncWrite(channelFlowFrame, ChannelFlowOkBody.class);
    }

    @Override
    public BasicMessageConsumer_0_8 createMessageConsumer(AMQDestination destination, int prefetchHigh, int prefetchLow, boolean noLocal, boolean exclusive, String messageSelector, FieldTable arguments, boolean noConsume, boolean autoClose) throws JMSException {
        return new BasicMessageConsumer_0_8(this.getChannelId(), this.getAMQConnection(), destination, messageSelector, noLocal, this.getMessageFactoryRegistry(), this, arguments, prefetchHigh, prefetchLow, exclusive, this.getAcknowledgeMode(), noConsume, autoClose);
    }

    @Override
    public BasicMessageProducer_0_8 createMessageProducer(Destination destination, Boolean mandatory, Boolean immediate, long producerId) throws JMSException {
        try {
            return new BasicMessageProducer_0_8(this.getAMQConnection(), (AMQDestination)destination, this.isTransacted(), this.getChannelId(), this, this.getProtocolHandler(), producerId, immediate, mandatory);
        }
        catch (AMQException e) {
            JMSException ex = new JMSException("Error creating producer");
            ex.initCause((Throwable)e);
            ex.setLinkedException((Exception)((Object)e));
            throw ex;
        }
    }

    @Override
    public void messageReceived(UnprocessedMessage message) {
        if (message instanceof ReturnMessage) {
            this.returnBouncedMessage((ReturnMessage)message);
        } else {
            super.messageReceived(message);
        }
    }

    private void returnBouncedMessage(final ReturnMessage msg) {
        this.getAMQConnection().performConnectionTask(new Runnable(){

            public void run() {
                try {
                    AbstractJMSMessage bouncedMessage = AMQSession_0_8.this.getMessageFactoryRegistry().createMessage(0L, false, msg.getExchange(), msg.getRoutingKey(), msg.getContentHeader(), msg.getBodies(), AMQSession_0_8.this._queueDestinationCache, AMQSession_0_8.this._topicDestinationCache);
                    AMQConstant errorCode = AMQConstant.getConstant((int)msg.getReplyCode());
                    AMQShortString reason = msg.getReplyText();
                    _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")");
                    if (errorCode == AMQConstant.NO_CONSUMERS) {
                        AMQSession_0_8.this.getAMQConnection().exceptionReceived((Throwable)((Object)new AMQNoConsumersException("Error: " + reason, bouncedMessage, null)));
                    } else if (errorCode == AMQConstant.NO_ROUTE) {
                        AMQSession_0_8.this.getAMQConnection().exceptionReceived((Throwable)((Object)new AMQNoRouteException("Error: " + reason, bouncedMessage, null)));
                    } else {
                        AMQSession_0_8.this.getAMQConnection().exceptionReceived((Throwable)new AMQUndeliveredException(errorCode, "Error: " + reason, (Object)bouncedMessage, null));
                    }
                }
                catch (Exception e) {
                    _logger.error("Caught exception trying to raise undelivered message exception (dump follows) - ignoring...", (Throwable)e);
                }
            }
        });
    }

    @Override
    public void sendRollback() throws AMQException, FailoverException {
        TxRollbackBody body = this.getMethodRegistry().createTxRollbackBody();
        AMQFrame frame = body.generateFrame(this.getChannelId());
        this.getProtocolHandler().syncWrite(frame, TxRollbackOkBody.class);
    }

    public void setPrefetchLimits(final int messagePrefetch, final long sizePrefetch) throws AMQException {
        new FailoverRetrySupport<Object, AMQException>(new FailoverProtectedOperation<Object, AMQException>(){

            @Override
            public Object execute() throws AMQException, FailoverException {
                BasicQosBody basicQosBody = AMQSession_0_8.this.getProtocolHandler().getMethodRegistry().createBasicQosBody(sizePrefetch, messagePrefetch, false);
                AMQSession_0_8.this.getProtocolHandler().syncWrite(basicQosBody.generateFrame(AMQSession_0_8.this.getChannelId()), BasicQosOkBody.class);
                return null;
            }
        }, this.getAMQConnection()).execute();
    }

    public DestinationCache<AMQQueue> getQueueDestinationCache() {
        return this._queueDestinationCache;
    }

    public DestinationCache<AMQTopic> getTopicDestinationCache() {
        return this._topicDestinationCache;
    }

    @Override
    protected Long requestQueueDepth(AMQDestination amqd, boolean sync) throws AMQException, FailoverException {
        AMQFrame queueDeclare = this.getMethodRegistry().createQueueDeclareBody(this.getTicket(), amqd.getAMQQueueName(), true, amqd.isDurable(), amqd.isExclusive(), amqd.isAutoDelete(), false, null).generateFrame(this.getChannelId());
        QueueDeclareOkHandler okHandler = new QueueDeclareOkHandler();
        this.getProtocolHandler().writeCommandFrameAndWaitForReply(queueDeclare, okHandler);
        return okHandler.getMessageCount();
    }

    @Override
    protected boolean tagLE(long tag1, long tag2) {
        return tag1 <= tag2;
    }

    @Override
    protected boolean updateRollbackMark(long currentMark, long deliveryTag) {
        return false;
    }

    @Override
    public AMQMessageDelegateFactory getMessageDelegateFactory() {
        return AMQMessageDelegateFactory.FACTORY_0_8;
    }

    @Override
    public void sync() throws AMQException {
        this.declareExchange(new AMQShortString("amq.direct"), new AMQShortString("direct"), false);
    }

    @Override
    public void resolveAddress(AMQDestination dest, boolean isConsumer, boolean noLocal) throws AMQException {
        throw new UnsupportedOperationException("The new addressing based syntax is not supported for AMQP 0-8/0-9 versions");
    }

    @Override
    protected void flushAcknowledgments() {
    }

    @Override
    protected void deleteTemporaryDestination(TemporaryDestination amqQueue) throws JMSException {
    }

    @Override
    public boolean isQueueBound(String exchangeName, String queueName, String bindingKey, Map<String, Object> args) throws JMSException {
        return this.isQueueBound(exchangeName == null ? null : new AMQShortString(exchangeName), queueName == null ? null : new AMQShortString(queueName), bindingKey == null ? null : new AMQShortString(bindingKey));
    }

    private AMQProtocolHandler getProtocolHandler() {
        return this.getAMQConnection().getProtocolHandler();
    }

    public MethodRegistry getMethodRegistry() {
        MethodRegistry methodRegistry = this.getProtocolHandler().getMethodRegistry();
        return methodRegistry;
    }

    @Override
    public AMQException getLastException() {
        AMQStateManager manager = this.getProtocolHandler().getStateManager();
        Exception e = manager.getLastException();
        if (manager.getCurrentState().equals((Object)AMQState.CONNECTION_CLOSED) && e != null) {
            if (e instanceof AMQException) {
                return (AMQException)((Object)e);
            }
            AMQException amqe = new AMQException(AMQConstant.getConstant((int)AMQConstant.INTERNAL_ERROR.getCode()), e.getMessage(), e.getCause());
            return amqe;
        }
        return null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public boolean isFlowBlocked() {
        FlowControlIndicator flowControlIndicator = this._flowControl;
        synchronized (flowControlIndicator) {
            return !this._flowControl.getFlowControl();
        }
    }

    @Override
    public void setFlowControl(boolean active) {
        this._flowControl.setFlowControl(active);
        if (_logger.isInfoEnabled()) {
            _logger.info("Broker enforced flow control " + (active ? "no longer in effect" : "has been enforced"));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void checkFlowControl() throws InterruptedException, JMSException {
        long expiryTime = 0L;
        FlowControlIndicator flowControlIndicator = this._flowControl;
        synchronized (flowControlIndicator) {
            while (!this._flowControl.getFlowControl() && (expiryTime == 0L ? System.currentTimeMillis() + this._flowControlWaitFailure : expiryTime) >= System.currentTimeMillis()) {
                this._flowControl.wait(this._flowControlWaitPeriod);
                if (!_logger.isInfoEnabled()) continue;
                _logger.info("Message send delayed by " + (System.currentTimeMillis() + this._flowControlWaitFailure - expiryTime) / 1000L + "s due to broker enforced flow control");
            }
            if (!this._flowControl.getFlowControl()) {
                _logger.error("Message send failed due to timeout waiting on broker enforced flow control");
                throw new JMSException("Unable to send message for " + this._flowControlWaitFailure / 1000L + " seconds due to broker enforced flow control");
            }
        }
    }

    private static final class FlowControlIndicator {
        private volatile boolean _flowControl = true;

        private FlowControlIndicator() {
        }

        public synchronized void setFlowControl(boolean flowControl) {
            this._flowControl = flowControl;
            this.notify();
        }

        public boolean getFlowControl() {
            return this._flowControl;
        }
    }

    /*
     * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
     */
    private static class QueueDestinationCache
    extends DestinationCache<AMQQueue> {
        private QueueDestinationCache() {
        }

        @Override
        protected AMQQueue newDestination(AMQShortString exchangeName, AMQShortString routingKey) {
            return new AMQQueue(exchangeName, routingKey, routingKey);
        }
    }

    /*
     * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
     */
    private static class TopicDestinationCache
    extends DestinationCache<AMQTopic> {
        private TopicDestinationCache() {
        }

        @Override
        protected AMQTopic newDestination(AMQShortString exchangeName, AMQShortString routingKey) {
            return new AMQTopic(exchangeName, routingKey, null);
        }
    }

    /*
     * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
     */
    public static abstract class DestinationCache<T extends AMQDestination> {
        private final Map<AMQShortString, Map<AMQShortString, T>> cache = new HashMap<AMQShortString, Map<AMQShortString, T>>();

        public T getDestination(AMQShortString exchangeName, AMQShortString routingKey) {
            AMQDestination destination;
            LinkedHashMap routingMap = this.cache.get(exchangeName);
            if (routingMap == null) {
                routingMap = new LinkedHashMap<AMQShortString, T>(){

                    @Override
                    protected boolean removeEldestEntry(Map.Entry<AMQShortString, T> eldest) {
                        return this.size() >= 200;
                    }
                };
                this.cache.put(exchangeName, routingMap);
            }
            if ((destination = (AMQDestination)routingMap.get(routingKey)) == null) {
                destination = this.newDestination(exchangeName, routingKey);
                routingMap.put(routingKey, destination);
            }
            return (T)destination;
        }

        protected abstract T newDestination(AMQShortString var1, AMQShortString var2);
    }

    class QueueDeclareOkHandler
    extends SpecificMethodFrameListener {
        private long _messageCount;
        private long _consumerCount;

        public QueueDeclareOkHandler() {
            super(AMQSession_0_8.this.getChannelId(), QueueDeclareOkBody.class);
        }

        public boolean processMethod(int channelId, AMQMethodBody frame) {
            boolean matches = super.processMethod(channelId, frame);
            if (matches) {
                QueueDeclareOkBody declareOk = (QueueDeclareOkBody)frame;
                this._messageCount = declareOk.getMessageCount();
                this._consumerCount = declareOk.getConsumerCount();
            }
            return matches;
        }

        public long getMessageCount() {
            return this._messageCount;
        }

        public long getConsumerCount() {
            return this._consumerCount;
        }
    }
}

