/*
 * Decompiled with CFR 0.152.
 */
package io.aeron.driver.media;

import io.aeron.driver.DataPacketDispatcher;
import io.aeron.driver.DriverConductorProxy;
import io.aeron.driver.MediaDriver;
import io.aeron.driver.PublicationImage;
import io.aeron.driver.media.DestinationImageControlAddress;
import io.aeron.driver.media.MultiRcvDestination;
import io.aeron.driver.media.ReceiveChannelEndpointThreadLocals;
import io.aeron.driver.media.ReceiveDestinationUdpTransport;
import io.aeron.driver.media.UdpChannel;
import io.aeron.driver.media.UdpChannelTransport;
import io.aeron.driver.status.SystemCounterDescriptor;
import io.aeron.exceptions.AeronException;
import io.aeron.protocol.DataHeaderFlyweight;
import io.aeron.protocol.NakFlyweight;
import io.aeron.protocol.RttMeasurementFlyweight;
import io.aeron.protocol.SetupFlyweight;
import io.aeron.protocol.StatusMessageFlyweight;
import io.aeron.status.ChannelEndpointStatus;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
import org.agrona.collections.Hashing;
import org.agrona.collections.Int2IntCounterMap;
import org.agrona.collections.Long2LongCounterMap;
import org.agrona.concurrent.UnsafeBuffer;
import org.agrona.concurrent.status.AtomicCounter;

/**
 * Receive side of a UDP channel: forwards inbound data, setup and RTT frames to a
 * {@link DataPacketDispatcher} and sends status, NAK and RTT measurement frames back
 * to the supplied control addresses.
 *
 * <p>When the channel URI has {@code control-mode=manual} a {@link MultiRcvDestination} is created
 * and the per-destination transports it holds are used instead of this endpoint's own datagram channel.
 *
 * <p>The send buffers and flyweights are obtained from {@link ReceiveChannelEndpointThreadLocals}, so
 * they are not owned exclusively by one endpoint instance; each send therefore resets the buffer
 * before populating it.
 */
public class ReceiveChannelEndpoint extends UdpChannelTransport {
    /** Timeout handed to the {@link MultiRcvDestination}: five seconds expressed in nanoseconds. */
    private static final long DESTINATION_ADDRESS_TIMEOUT = TimeUnit.SECONDS.toNanos(5L);

    // Expected number of bytes per outbound frame. The values are the literals inlined by the compiler.
    // NOTE(review): presumably the HEADER_LENGTH constants of the matching flyweights - confirm.
    private static final int STATUS_MESSAGE_LENGTH = 36;
    private static final int NAK_MESSAGE_LENGTH = 28;
    private static final int RTT_MEASUREMENT_LENGTH = 40;

    /** Flag set on a status message that asks the sender for a setup frame. */
    private static final short SEND_SETUP_FLAG = 128;
    /** Flag set on an RTT measurement frame that is a reply. */
    private static final short RTT_REPLY_FLAG = 128;

    // Values written to the status indicator counter.
    // NOTE(review): presumably mirror the ChannelEndpointStatus constants - confirm.
    private static final long STATUS_INITIALISING = 0L;
    private static final long STATUS_ACTIVE = 1L;
    private static final long STATUS_CLOSING = 2L;

    private final DataPacketDispatcher dispatcher;
    private final ByteBuffer smBuffer;
    private final StatusMessageFlyweight statusMessageFlyweight;
    private final ByteBuffer nakBuffer;
    private final NakFlyweight nakFlyweight;
    private final ByteBuffer rttMeasurementBuffer;
    private final RttMeasurementFlyweight rttMeasurementFlyweight;
    private final AtomicCounter shortSends;
    private final AtomicCounter possibleTtlAsymmetry;
    private final AtomicCounter statusIndicator;
    private final Int2IntCounterMap refCountByStreamIdMap = new Int2IntCounterMap(0);
    private final Long2LongCounterMap refCountByStreamIdAndSessionIdMap = new Long2LongCounterMap(0L);
    /** Non-null only when the channel URI specifies {@code control-mode=manual}. */
    private final MultiRcvDestination multiRcvDestination;
    private final long receiverId;

    /**
     * Create an endpoint for the given channel.
     *
     * @param udpChannel      channel description; its remote data address is used for both bind and endpoint.
     * @param dispatcher      target for inbound frames and subscription bookkeeping.
     * @param statusIndicator counter that carries the endpoint status.
     * @param context         driver context supplying counters, error log, clock and thread-local buffers.
     */
    public ReceiveChannelEndpoint(UdpChannel udpChannel, DataPacketDispatcher dispatcher, AtomicCounter statusIndicator, MediaDriver.Context context) {
        super(udpChannel, udpChannel.remoteData(), udpChannel.remoteData(), null, context.errorLog(), context.systemCounters().get(SystemCounterDescriptor.INVALID_PACKETS));
        this.dispatcher = dispatcher;
        this.statusIndicator = statusIndicator;
        this.shortSends = context.systemCounters().get(SystemCounterDescriptor.SHORT_SENDS);
        this.possibleTtlAsymmetry = context.systemCounters().get(SystemCounterDescriptor.POSSIBLE_TTL_ASYMMETRY);

        ReceiveChannelEndpointThreadLocals threadLocals = context.receiveChannelEndpointThreadLocals();
        this.smBuffer = threadLocals.smBuffer();
        this.statusMessageFlyweight = threadLocals.statusMessageFlyweight();
        this.nakBuffer = threadLocals.nakBuffer();
        this.nakFlyweight = threadLocals.nakFlyweight();
        this.rttMeasurementBuffer = threadLocals.rttMeasurementBuffer();
        this.rttMeasurementFlyweight = threadLocals.rttMeasurementFlyweight();
        this.receiverId = threadLocals.receiverId();

        String mode = udpChannel.channelUri().get("control-mode");
        this.multiRcvDestination = "manual".equals(mode) ? new MultiRcvDestination(context.nanoClock(), DESTINATION_ADDRESS_TIMEOUT) : null;
    }

    /**
     * Send the remaining bytes of {@code buffer} to {@code remoteAddress} over this endpoint's send channel.
     *
     * @param buffer        frame to send, from its current position to its limit.
     * @param remoteAddress destination address.
     * @return bytes sent; 0 when there is no send channel, the channel is not open, or an
     *         {@link IOException} was handed to {@code sendError} and that call returned normally.
     */
    public int sendTo(ByteBuffer buffer, InetSocketAddress remoteAddress) {
        // Captured before the send so the error report reflects the intended length.
        int remaining = buffer.remaining();
        int bytesSent = 0;
        try {
            if (null != this.sendDatagramChannel) {
                this.sendHook(buffer, remoteAddress);
                if (this.sendDatagramChannel.isOpen()) {
                    bytesSent = this.sendDatagramChannel.send(buffer, remoteAddress);
                }
            }
        }
        catch (IOException ex) {
            ReceiveChannelEndpoint.sendError(remaining, ex, remoteAddress);
        }
        return bytesSent;
    }

    /**
     * @return the original URI string of the channel at transport index 0.
     */
    public String originalUriString() {
        return this.udpChannel().originalUriString();
    }

    /**
     * @return the counter id of the status indicator.
     */
    public int statusIndicatorCounterId() {
        return this.statusIndicator.id();
    }

    /**
     * Move the status indicator from its initial value to the active value.
     *
     * @throws AeronException if the indicator does not currently hold the initial value.
     */
    public void indicateActive() {
        long currentStatus = this.statusIndicator.get();
        if (currentStatus != STATUS_INITIALISING) {
            throw new AeronException("channel cannot be registered unless INITIALISING: status=" + ChannelEndpointStatus.status(currentStatus));
        }
        this.statusIndicator.setOrdered(STATUS_ACTIVE);
    }

    /**
     * Write the closing status and close the status indicator, unless it is already closed.
     */
    public void closeStatusIndicator() {
        if (!this.statusIndicator.isClosed()) {
            this.statusIndicator.setOrdered(STATUS_CLOSING);
            this.statusIndicator.close();
        }
    }

    /**
     * Close the {@link MultiRcvDestination} if this endpoint has one.
     */
    public void closeMultiRcvDestination() {
        if (null != this.multiRcvDestination) {
            this.multiRcvDestination.close();
        }
    }

    /**
     * Open the datagram channel for endpoints without manual destination control; endpoints with a
     * {@link MultiRcvDestination} do nothing here.
     *
     * <p>When {@code conductorProxy.notConcurrent()} is false, a failure is first reported through
     * {@link DriverConductorProxy#channelEndpointError} and then rethrown.
     *
     * @param conductorProxy proxy used to report a failed open.
     */
    public void openChannel(DriverConductorProxy conductorProxy) {
        if (null == this.multiRcvDestination) {
            if (conductorProxy.notConcurrent()) {
                this.openDatagramChannel(this.statusIndicator);
            } else {
                try {
                    this.openDatagramChannel(this.statusIndicator);
                }
                catch (Exception ex) {
                    conductorProxy.channelEndpointError(this.statusIndicator.id(), ex);
                    throw ex;
                }
            }
        }
    }

    /**
     * Increment the possible-TTL-asymmetry system counter.
     */
    public void possibleTtlAsymmetryEncountered() {
        this.possibleTtlAsymmetry.incrementOrdered();
    }

    /**
     * Add a reference to a stream.
     *
     * @param streamId stream being referenced.
     * @return the reference count after the increment.
     */
    public int incRefToStream(int streamId) {
        return this.refCountByStreamIdMap.incrementAndGet(streamId);
    }

    /**
     * Drop a reference to a stream.
     *
     * @param streamId stream being released.
     * @return the reference count after the decrement.
     * @throws IllegalStateException if the decrement yields -1, i.e. no reference was held; the
     *                               bogus entry is removed before throwing.
     */
    public int decRefToStream(int streamId) {
        int count = this.refCountByStreamIdMap.decrementAndGet(streamId);
        if (-1 == count) {
            this.refCountByStreamIdMap.remove(streamId);
            throw new IllegalStateException("could not find stream Id to decrement: " + streamId);
        }
        return count;
    }

    /**
     * Add a reference to a stream and session pair.
     *
     * @param streamId  stream being referenced.
     * @param sessionId session being referenced.
     * @return the reference count after the increment.
     */
    public long incRefToStreamAndSession(int streamId, int sessionId) {
        return this.refCountByStreamIdAndSessionIdMap.incrementAndGet(Hashing.compoundKey(streamId, sessionId));
    }

    /**
     * Drop a reference to a stream and session pair.
     *
     * @param streamId  stream being released.
     * @param sessionId session being released.
     * @return the reference count after the decrement.
     * @throws IllegalStateException if the decrement yields -1, i.e. no reference was held; the
     *                               bogus entry is removed before throwing.
     */
    public long decRefToStreamAndSession(int streamId, int sessionId) {
        long key = Hashing.compoundKey(streamId, sessionId);
        long count = this.refCountByStreamIdAndSessionIdMap.decrementAndGet(key);
        if (-1L == count) {
            this.refCountByStreamIdAndSessionIdMap.remove(key);
            throw new IllegalStateException("could not find stream Id + session Id to decrement: " + streamId + " " + sessionId);
        }
        return count;
    }

    /**
     * @return the number of entries across the per-stream and per-stream-and-session reference maps.
     */
    public int streamCount() {
        return this.refCountByStreamIdMap.size() + this.refCountByStreamIdAndSessionIdMap.size();
    }

    /**
     * @return true when no stream references remain and the status indicator has not been closed yet.
     */
    public boolean shouldBeClosed() {
        return this.refCountByStreamIdMap.isEmpty() && this.refCountByStreamIdAndSessionIdMap.isEmpty() && !this.statusIndicator.isClosed();
    }

    /**
     * @return whether the channel was configured with an explicit control address.
     */
    public boolean hasExplicitControl() {
        return this.udpChannel.hasExplicitControl();
    }

    /**
     * @return the channel's local control address when explicit control is configured, otherwise null.
     */
    public InetSocketAddress explicitControlAddress() {
        return this.udpChannel.hasExplicitControl() ? this.udpChannel.localControl() : null;
    }

    /**
     * @return true when this endpoint was created with manual destination control.
     */
    public boolean hasDestinationControl() {
        return null != this.multiRcvDestination;
    }

    /**
     * Guard for operations that need manual destination control.
     *
     * @throws AeronException if this endpoint has no {@link MultiRcvDestination}.
     */
    public void validateAllowsDestinationControl() {
        if (null == this.multiRcvDestination) {
            throw new AeronException("channel does not allow manual control");
        }
    }

    /**
     * @return whether transport index 0 is multicast.
     */
    @Override
    public boolean isMulticast() {
        return this.isMulticast(0);
    }

    /**
     * Is the transport at the given index multicast?
     *
     * <p>NOTE(review): unlike {@link #udpChannel(int)} this does not check {@code hasDestination} before
     * dereferencing the destination transport - confirm that is intended.
     *
     * @param transportIndex index of the transport; only 0 is valid without manual destination control.
     * @return true if the selected transport is multicast.
     * @throws IllegalStateException if there is no manual destination control and the index is not 0.
     */
    public boolean isMulticast(int transportIndex) {
        if (null != this.multiRcvDestination) {
            return this.multiRcvDestination.transport(transportIndex).isMulticast();
        }
        if (0 == transportIndex) {
            return super.isMulticast();
        }
        throw new IllegalStateException("isMulticast for unknown index " + transportIndex);
    }

    /**
     * @return the channel for transport index 0.
     */
    @Override
    public UdpChannel udpChannel() {
        return this.udpChannel(0);
    }

    /**
     * Channel for the transport at the given index.
     *
     * @param transportIndex index of the transport.
     * @return the destination transport's channel when one is registered at that index; otherwise this
     *         endpoint's own channel for index 0.
     * @throws IllegalStateException if no destination is registered at the index and the index is not 0.
     */
    public UdpChannel udpChannel(int transportIndex) {
        if (null != this.multiRcvDestination && this.multiRcvDestination.hasDestination(transportIndex)) {
            return this.multiRcvDestination.transport(transportIndex).udpChannel();
        }
        if (0 == transportIndex) {
            return super.udpChannel();
        }
        throw new IllegalStateException("udpChannel for unknown index " + transportIndex);
    }

    /**
     * @return the multicast TTL of transport index 0.
     */
    @Override
    public int multicastTtl() {
        return this.multicastTtl(0);
    }

    /**
     * Multicast TTL for the transport at the given index.
     *
     * @param transportIndex index of the transport; only 0 is valid without manual destination control.
     * @return the TTL reported by the selected transport.
     * @throws IllegalStateException if there is no manual destination control and the index is not 0.
     */
    public int multicastTtl(int transportIndex) {
        if (null != this.multiRcvDestination) {
            return this.multiRcvDestination.transport(transportIndex).multicastTtl();
        }
        if (0 == transportIndex) {
            return super.multicastTtl();
        }
        throw new IllegalStateException("multicastTtl for unknown index " + transportIndex);
    }

    /**
     * Register a destination transport. Requires manual destination control: the destination holder is
     * null otherwise and this call fails with a {@link NullPointerException}, so callers should check
     * {@link #validateAllowsDestinationControl()} first.
     *
     * @param transport transport to add.
     * @return the value returned by {@link MultiRcvDestination#addDestination}.
     */
    public int addDestination(ReceiveDestinationUdpTransport transport) {
        return this.multiRcvDestination.addDestination(transport);
    }

    /**
     * Remove the destination at the given index. Requires manual destination control
     * (see {@link #validateAllowsDestinationControl()}).
     *
     * @param transportIndex index of the destination to remove.
     */
    public void removeDestination(int transportIndex) {
        this.multiRcvDestination.removeDestination(transportIndex);
    }

    /**
     * Look up a destination by channel. Requires manual destination control
     * (see {@link #validateAllowsDestinationControl()}).
     *
     * @param udpChannel channel to look up.
     * @return the value returned by {@link MultiRcvDestination#transport(UdpChannel)}.
     */
    public int destination(UdpChannel udpChannel) {
        return this.multiRcvDestination.transport(udpChannel);
    }

    /**
     * Look up a destination transport by index. Requires manual destination control
     * (see {@link #validateAllowsDestinationControl()}).
     *
     * @param transportIndex index of the destination.
     * @return the transport held at that index.
     */
    public ReceiveDestinationUdpTransport destination(int transportIndex) {
        return this.multiRcvDestination.transport(transportIndex);
    }

    /**
     * Forward a data frame to the dispatcher.
     *
     * @return the value returned by the dispatcher.
     */
    public int onDataPacket(DataHeaderFlyweight header, UnsafeBuffer buffer, int length, InetSocketAddress srcAddress, int transportIndex) {
        return this.dispatcher.onDataPacket(this, header, buffer, length, srcAddress, transportIndex);
    }

    /**
     * Forward a setup frame to the dispatcher; {@code buffer} and {@code length} are not used.
     */
    public void onSetupMessage(SetupFlyweight header, UnsafeBuffer buffer, int length, InetSocketAddress srcAddress, int transportIndex) {
        this.dispatcher.onSetupMessage(this, header, srcAddress, transportIndex);
    }

    /**
     * Forward an RTT measurement frame to the dispatcher when it is addressed to this receiver id or
     * carries receiver id 0; other frames are ignored. {@code buffer} and {@code length} are not used.
     */
    public void onRttMeasurement(RttMeasurementFlyweight header, UnsafeBuffer buffer, int length, InetSocketAddress srcAddress, int transportIndex) {
        long requestedReceiverId = header.receiverId();
        if (requestedReceiverId == this.receiverId || requestedReceiverId == 0L) {
            this.dispatcher.onRttMeasurement(this, header, srcAddress, transportIndex);
        }
    }

    /**
     * Send a status message with zeroed term id, term offset and window and the send-setup flag set.
     * Does nothing once the endpoint is closed.
     *
     * @param transportIndex transport to send on when manual destination control is in use.
     * @param controlAddress address to send to.
     * @param sessionId      session of the stream.
     * @param streamId       stream id.
     */
    public void sendSetupElicitingStatusMessage(int transportIndex, InetSocketAddress controlAddress, int sessionId, int streamId) {
        if (!this.isClosed) {
            this.encodeStatusMessage(sessionId, streamId, 0, 0, 0, SEND_SETUP_FLAG);
            this.send(this.smBuffer, STATUS_MESSAGE_LENGTH, transportIndex, controlAddress);
        }
    }

    /**
     * Send an RTT measurement frame to a single address. Does nothing once the endpoint is closed.
     *
     * @param transportIndex  transport to send on when manual destination control is in use.
     * @param controlAddress  address to send to.
     * @param sessionId       session of the stream.
     * @param streamId        stream id.
     * @param echoTimestampNs timestamp to echo.
     * @param receptionDelta  reception delta to report.
     * @param isReply         whether the reply flag is set on the frame.
     */
    public void sendRttMeasurement(int transportIndex, InetSocketAddress controlAddress, int sessionId, int streamId, long echoTimestampNs, long receptionDelta, boolean isReply) {
        if (!this.isClosed) {
            this.encodeRttMeasurement(sessionId, streamId, echoTimestampNs, receptionDelta, isReply);
            this.send(this.rttMeasurementBuffer, RTT_MEASUREMENT_LENGTH, transportIndex, controlAddress);
        }
    }

    /**
     * Send a status message to the given control addresses. Does nothing once the endpoint is closed.
     *
     * @param controlAddresses addresses to send to; only the first is used without manual destination control.
     * @param sessionId        session of the stream.
     * @param streamId         stream id.
     * @param termId           consumption term id.
     * @param termOffset       consumption term offset.
     * @param window           receiver window length.
     * @param flags            flags for the frame.
     */
    public void sendStatusMessage(DestinationImageControlAddress[] controlAddresses, int sessionId, int streamId, int termId, int termOffset, int window, short flags) {
        if (!this.isClosed) {
            this.encodeStatusMessage(sessionId, streamId, termId, termOffset, window, flags);
            this.send(this.smBuffer, STATUS_MESSAGE_LENGTH, controlAddresses);
        }
    }

    /**
     * Send a NAK to the given control addresses. Does nothing once the endpoint is closed.
     *
     * @param controlAddresses addresses to send to; only the first is used without manual destination control.
     * @param sessionId        session of the stream.
     * @param streamId         stream id.
     * @param termId           term id of the range being NAKed.
     * @param termOffset       term offset of the range being NAKed.
     * @param length           length of the range being NAKed.
     */
    public void sendNakMessage(DestinationImageControlAddress[] controlAddresses, int sessionId, int streamId, int termId, int termOffset, int length) {
        if (!this.isClosed) {
            this.nakBuffer.clear();
            this.nakFlyweight.streamId(streamId).sessionId(sessionId).termId(termId).termOffset(termOffset).length(length);
            this.send(this.nakBuffer, NAK_MESSAGE_LENGTH, controlAddresses);
        }
    }

    /**
     * Send an RTT measurement frame to the given control addresses. Does nothing once the endpoint is closed.
     *
     * @param controlAddresses addresses to send to; only the first is used without manual destination control.
     * @param sessionId        session of the stream.
     * @param streamId         stream id.
     * @param echoTimestampNs  timestamp to echo.
     * @param receptionDelta   reception delta to report.
     * @param isReply          whether the reply flag is set on the frame.
     */
    public void sendRttMeasurement(DestinationImageControlAddress[] controlAddresses, int sessionId, int streamId, long echoTimestampNs, long receptionDelta, boolean isReply) {
        if (!this.isClosed) {
            this.encodeRttMeasurement(sessionId, streamId, echoTimestampNs, receptionDelta, isReply);
            this.send(this.rttMeasurementBuffer, RTT_MEASUREMENT_LENGTH, controlAddresses);
        }
    }

    /** Delegates to {@link DataPacketDispatcher#removePendingSetup}. */
    public void removePendingSetup(int sessionId, int streamId) {
        this.dispatcher.removePendingSetup(sessionId, streamId);
    }

    /** Delegates to {@link DataPacketDispatcher#removePublicationImage}. */
    public void removePublicationImage(PublicationImage publicationImage) {
        this.dispatcher.removePublicationImage(publicationImage);
    }

    /** Delegates to the dispatcher to add a subscription for a stream. */
    public void addSubscription(int streamId) {
        this.dispatcher.addSubscription(streamId);
    }

    /** Delegates to the dispatcher to add a subscription for a stream and session. */
    public void addSubscription(int streamId, int sessionId) {
        this.dispatcher.addSubscription(streamId, sessionId);
    }

    /** Delegates to the dispatcher to remove a subscription for a stream. */
    public void removeSubscription(int streamId) {
        this.dispatcher.removeSubscription(streamId);
    }

    /** Delegates to the dispatcher to remove a subscription for a stream and session. */
    public void removeSubscription(int streamId, int sessionId) {
        this.dispatcher.removeSubscription(streamId, sessionId);
    }

    /** Delegates to {@link DataPacketDispatcher#addPublicationImage}. */
    public void addPublicationImage(PublicationImage image) {
        this.dispatcher.addPublicationImage(image);
    }

    /** Delegates to {@link DataPacketDispatcher#removeCoolDown}. */
    public void removeCoolDown(int sessionId, int streamId) {
        this.dispatcher.removeCoolDown(sessionId, streamId);
    }

    /**
     * @return the dispatcher's answer to whether a setup message should be elicited.
     */
    public boolean shouldElicitSetupMessage() {
        return this.dispatcher.shouldElicitSetupMessage();
    }

    /**
     * Send a prepared frame to a set of control addresses and count a short send when the number of
     * bytes sent differs from {@code bytesToSend}.
     *
     * @param buffer           frame to send.
     * @param bytesToSend      expected number of bytes.
     * @param controlAddresses addresses to send to; without manual destination control only element 0 is used.
     */
    protected void send(ByteBuffer buffer, int bytesToSend, DestinationImageControlAddress[] controlAddresses) {
        int bytesSent = null == this.multiRcvDestination ? this.sendTo(buffer, controlAddresses[0].address) : this.multiRcvDestination.sendToAll(controlAddresses, buffer, 0, bytesToSend);
        if (bytesToSend != bytesSent) {
            this.shortSends.increment();
        }
    }

    /**
     * Send a prepared frame to a single address and count a short send when the number of bytes sent
     * differs from {@code bytesToSend}.
     *
     * @param buffer         frame to send.
     * @param bytesToSend    expected number of bytes.
     * @param transportIndex destination transport to use when manual destination control is in use.
     * @param remoteAddress  address to send to.
     */
    protected void send(ByteBuffer buffer, int bytesToSend, int transportIndex, InetSocketAddress remoteAddress) {
        int bytesSent = null == this.multiRcvDestination ? this.sendTo(buffer, remoteAddress) : MultiRcvDestination.sendTo(this.multiRcvDestination.transport(transportIndex), buffer, remoteAddress);
        if (bytesToSend != bytesSent) {
            this.shortSends.increment();
        }
    }

    /** Reset the status message buffer and populate its flyweight ready for sending. */
    private void encodeStatusMessage(int sessionId, int streamId, int termId, int termOffset, int window, short flags) {
        this.smBuffer.clear();
        this.statusMessageFlyweight.sessionId(sessionId).streamId(streamId).consumptionTermId(termId).consumptionTermOffset(termOffset).receiverWindowLength(window).flags(flags);
    }

    /** Reset the RTT measurement buffer and populate its flyweight ready for sending. */
    private void encodeRttMeasurement(int sessionId, int streamId, long echoTimestampNs, long receptionDelta, boolean isReply) {
        // A previous send leaves the buffer position advanced; without this reset a later send via
        // sendTo(buffer, address) would have nothing remaining and be counted as a short send.
        this.rttMeasurementBuffer.clear();
        this.rttMeasurementFlyweight.sessionId(sessionId).streamId(streamId).receiverId(this.receiverId).echoTimestampNs(echoTimestampNs).receptionDelta(receptionDelta).flags(isReply ? RTT_REPLY_FLAG : 0);
    }
}

