/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.client.proxy;

import com.hazelcast.client.config.ClientReliableTopicConfig;
import com.hazelcast.client.impl.HazelcastClientInstanceImpl;
import com.hazelcast.client.impl.MemberImpl;
import com.hazelcast.client.spi.ClientProxy;
import com.hazelcast.core.ExecutionCallback;
import com.hazelcast.core.HazelcastException;
import com.hazelcast.core.HazelcastInstanceNotActiveException;
import com.hazelcast.core.ICompletableFuture;
import com.hazelcast.core.ITopic;
import com.hazelcast.core.Member;
import com.hazelcast.core.Message;
import com.hazelcast.core.MessageListener;
import com.hazelcast.logging.ILogger;
import com.hazelcast.monitor.LocalTopicStats;
import com.hazelcast.nio.serialization.Data;
import com.hazelcast.ringbuffer.OverflowPolicy;
import com.hazelcast.ringbuffer.ReadResultSet;
import com.hazelcast.ringbuffer.Ringbuffer;
import com.hazelcast.ringbuffer.StaleSequenceException;
import com.hazelcast.spi.exception.DistributedObjectDestroyedException;
import com.hazelcast.spi.serialization.SerializationService;
import com.hazelcast.topic.ReliableMessageListener;
import com.hazelcast.topic.TopicOverloadException;
import com.hazelcast.topic.TopicOverloadPolicy;
import com.hazelcast.topic.impl.reliable.ReliableMessageListenerAdapter;
import com.hazelcast.topic.impl.reliable.ReliableTopicMessage;
import com.hazelcast.util.Preconditions;
import com.hazelcast.util.UuidUtil;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

/**
 * Client-side {@link ITopic} implementation that publishes messages into, and
 * reads messages back from, a {@link Ringbuffer} named {@code "_hz_rb_" + objectId}.
 *
 * <p>Publishing honours the {@link TopicOverloadPolicy} taken from the
 * {@link ClientReliableTopicConfig} of the topic. Every registered listener is
 * driven by its own {@link MessageRunner}, which repeatedly issues asynchronous
 * batch reads against the ringbuffer and dispatches the results on the
 * configured {@link Executor}.
 *
 * @param <E> type of the published payload
 */
public class ClientReliableTopicProxy<E> extends ClientProxy implements ITopic<E> {

    /** Upper bound, in milliseconds, of the pause between two retries in {@link #addWithBackoff}. */
    private static final int MAX_BACKOFF = 2000;
    /** Pause, in milliseconds, before the first retry in {@link #addWithBackoff}. */
    private static final int INITIAL_BACKOFF_MS = 100;

    private final ILogger logger;
    /** Active listener runners keyed by the registration id handed out by {@link #addMessageListener}. */
    private final ConcurrentMap<String, MessageRunner> runnersMap = new ConcurrentHashMap<String, MessageRunner>();
    private final Ringbuffer<ReliableTopicMessage> ringbuffer;
    private final SerializationService serializationService;
    private final ClientReliableTopicConfig config;
    private final Executor executor;
    private final TopicOverloadPolicy overloadPolicy;

    /**
     * Creates the proxy and resolves its collaborators from the client instance.
     *
     * @param objectId name of the topic; also used to derive the backing ringbuffer name
     * @param client   client instance supplying the ringbuffer, serialization service,
     *                 configuration, executor and logger
     */
    public ClientReliableTopicProxy(String objectId, HazelcastClientInstanceImpl client) {
        super("hz:impl:reliableTopicService", objectId);
        this.ringbuffer = client.getRingbuffer("_hz_rb_" + objectId);
        this.serializationService = client.getSerializationService();
        this.config = client.getClientConfig().getReliableTopicConfig(objectId);
        this.executor = getExecutor(config, client);
        this.overloadPolicy = config.getTopicOverloadPolicy();
        this.logger = client.getLoggingService().getLogger(getClass());
    }

    /**
     * Picks the executor used to run listener callbacks.
     *
     * @return the executor set on {@code config}, or the client execution service when none is set
     */
    private Executor getExecutor(ClientReliableTopicConfig config, HazelcastClientInstanceImpl client) {
        Executor configured = config.getExecutor();
        return configured != null ? configured : client.getClientExecutionService();
    }

    /**
     * Serializes {@code payload} and appends it to the ringbuffer according to the
     * configured overload policy.
     *
     * @param payload the message to publish
     * @throws TopicOverloadException   under the {@code ERROR} policy when the add is rejected
     * @throws IllegalArgumentException when the configured policy is not one handled here
     * @throws HazelcastException       when a checked exception occurs while adding; if that
     *                                  exception is an {@link InterruptedException} the calling
     *                                  thread's interrupt status is restored before throwing
     */
    public void publish(E payload) {
        try {
            Data data = serializationService.toData(payload);
            ReliableTopicMessage message = new ReliableTopicMessage(data, null);
            switch (overloadPolicy) {
                case ERROR:
                    addOrFail(message);
                    break;
                case DISCARD_OLDEST:
                    addOrOverwrite(message);
                    break;
                case DISCARD_NEWEST:
                    // The returned sequence is intentionally ignored: a rejected add
                    // simply means the new message is dropped.
                    ringbuffer.addAsync(message, OverflowPolicy.FAIL).get();
                    break;
                case BLOCK:
                    addWithBackoff(message);
                    break;
                default:
                    throw new IllegalArgumentException("Unknown overloadPolicy:" + overloadPolicy);
            }
        } catch (RuntimeException e) {
            throw e;
        } catch (InterruptedException e) {
            // Wrapping would otherwise hide the interruption from the caller's thread.
            Thread.currentThread().interrupt();
            throw publishFailure(payload, e);
        } catch (Exception e) {
            throw publishFailure(payload, e);
        }
    }

    /** Builds the exception thrown by {@link #publish} for a checked failure, keeping the cause. */
    private HazelcastException publishFailure(E payload, Exception cause) {
        return new HazelcastException("Failed to publish message: " + payload + " to topic:" + name, cause);
    }

    /** Adds the message with {@link OverflowPolicy#OVERWRITE} and waits for the add to complete. */
    private void addOrOverwrite(ReliableTopicMessage message) throws Exception {
        ringbuffer.addAsync(message, OverflowPolicy.OVERWRITE).get();
    }

    /**
     * Adds the message with {@link OverflowPolicy#FAIL}.
     *
     * @throws TopicOverloadException when the ringbuffer answers with sequence {@code -1}
     */
    private void addOrFail(ReliableTopicMessage message) throws Exception {
        long sequenceId = ringbuffer.addAsync(message, OverflowPolicy.FAIL).get();
        if (sequenceId == -1L) {
            throw new TopicOverloadException("Failed to publish message: " + message + " on topic:" + name);
        }
    }

    /**
     * Adds the message with {@link OverflowPolicy#FAIL}, retrying for as long as the
     * ringbuffer answers with sequence {@code -1}. The pause between attempts starts at
     * {@link #INITIAL_BACKOFF_MS} and doubles after every attempt, capped at
     * {@link #MAX_BACKOFF}.
     */
    private void addWithBackoff(ReliableTopicMessage message) throws Exception {
        long timeoutMs = INITIAL_BACKOFF_MS;
        while (ringbuffer.addAsync(message, OverflowPolicy.FAIL).get() == -1L) {
            TimeUnit.MILLISECONDS.sleep(timeoutMs);
            timeoutMs = Math.min(timeoutMs * 2L, MAX_BACKOFF);
        }
    }

    /**
     * Registers a listener and starts reading messages for it.
     *
     * @param listener the listener to register; must not be {@code null}
     * @return the registration id to pass to {@link #removeMessageListener(String)}
     */
    public String addMessageListener(MessageListener<E> listener) {
        Preconditions.checkNotNull(listener, "listener can't be null");
        String id = UuidUtil.newUnsecureUuidString();
        ReliableMessageListener<E> reliableMessageListener = toReliableMessageListener(listener);
        MessageRunner runner = new MessageRunner(id, reliableMessageListener);
        runnersMap.put(id, runner);
        // Kick off the first asynchronous read; subsequent reads are chained by the runner itself.
        runner.next();
        return id;
    }

    /** Returns the listener itself when it already is reliable, otherwise wraps it in an adapter. */
    private ReliableMessageListener<E> toReliableMessageListener(MessageListener<E> listener) {
        if (listener instanceof ReliableMessageListener) {
            return (ReliableMessageListener<E>) listener;
        }
        return new ReliableMessageListenerAdapter<E>(listener);
    }

    /**
     * Cancels the runner registered under the given id.
     *
     * @param registrationId id returned by {@link #addMessageListener}; must not be {@code null}
     * @return {@code true} if a runner was found and cancelled, {@code false} otherwise
     */
    public boolean removeMessageListener(String registrationId) {
        Preconditions.checkNotNull(registrationId, "registrationId can't be null");
        MessageRunner runner = runnersMap.get(registrationId);
        if (runner == null) {
            return false;
        }
        // cancel() also removes the runner from runnersMap.
        runner.cancel();
        return true;
    }

    /**
     * Not supported on the client.
     *
     * @throws UnsupportedOperationException always
     */
    public LocalTopicStats getLocalTopicStats() {
        throw new UnsupportedOperationException("Locality is ambiguous for client!!!");
    }

    /** Returns the ringbuffer backing this topic. */
    public Ringbuffer getRingbuffer() {
        return ringbuffer;
    }

    public String toString() {
        return "ITopic{name='" + name + '\'' + '}';
    }

    /**
     * Drives a single listener: reads a batch from the ringbuffer, hands every message
     * to the listener, then schedules the next read. Acts as the callback of its own
     * asynchronous reads.
     */
    class MessageRunner implements ExecutionCallback<ReadResultSet<ReliableTopicMessage>> {
        final ReliableMessageListener<E> listener;
        private final String id;
        /** Ringbuffer sequence of the next message to read/deliver. */
        private long sequence;
        private volatile boolean cancelled;

        /**
         * @param id       registration id under which this runner is stored in the runners map
         * @param listener listener to deliver messages to; also asked for the initial sequence
         */
        public MessageRunner(String id, ReliableMessageListener<E> listener) {
            this.id = id;
            this.listener = listener;
            long initialSequence = listener.retrieveInitialSequence();
            if (initialSequence == -1L) {
                // The listener supplied no starting point: begin right after the current tail.
                initialSequence = ringbuffer.tailSequence() + 1L;
            }
            this.sequence = initialSequence;
        }

        /** Issues the next asynchronous batch read unless this runner has been cancelled. */
        void next() {
            if (cancelled) {
                return;
            }
            ICompletableFuture<ReadResultSet<ReliableTopicMessage>> f =
                    ringbuffer.readManyAsync(sequence, 1, config.getReadBatchSize(), null);
            f.andThen(this, executor);
        }

        /**
         * Delivers every message of the batch in order and then requests the next batch.
         * Stops without requesting more when the runner is cancelled, or when a listener
         * failure is classified as terminal by {@link #terminate(Throwable)}.
         */
        public void onResponse(ReadResultSet<ReliableTopicMessage> result) {
            for (ReliableTopicMessage message : result) {
                if (cancelled) {
                    return;
                }
                try {
                    // The sequence is handed to the listener before the message is delivered.
                    listener.storeSequence(sequence);
                    process(message);
                } catch (Throwable t) {
                    if (terminate(t)) {
                        cancel();
                        return;
                    }
                    // Non-terminal failure: fall through and move on to the next message.
                }
                sequence++;
            }
            next();
        }

        private void process(ReliableTopicMessage message) throws Throwable {
            listener.onMessage(toMessage(message));
        }

        /** Converts the stored ringbuffer item into the {@link Message} passed to the listener. */
        private Message<E> toMessage(ReliableTopicMessage m) {
            Member member = null;
            if (m.getPublisherAddress() != null) {
                member = new MemberImpl(m.getPublisherAddress());
            }
            E payload = serializationService.toObject(m.getPayload());
            return new Message<E>(name, payload, m.getPublishTime(), member);
        }

        /**
         * Handles a failed read. A stale sequence is recovered from by jumping to the
         * ringbuffer head when the listener is loss tolerant; every other failure is
         * logged and cancels this runner.
         */
        public void onFailure(Throwable t) {
            if (cancelled) {
                return;
            }

            if (t instanceof ExecutionException && t.getCause() instanceof StaleSequenceException) {
                // The head sequence is fetched from the ringbuffer rather than read off the exception.
                long remoteHeadSeq = ringbuffer.headSequence();
                if (listener.isLossTolerant()) {
                    if (logger.isFinestEnabled()) {
                        logger.finest("MessageListener " + listener + " on topic: " + name
                                + " ran into a stale sequence. "
                                + "Jumping from oldSequence: " + sequence + " to sequence: " + remoteHeadSeq);
                    }
                    sequence = remoteHeadSeq;
                    next();
                    return;
                }
                logger.warning("Terminating MessageListener:" + listener + " on topic: " + name + ". "
                        + "Reason: The listener was too slow or the retention period of the message has been violated. "
                        + "head: " + remoteHeadSeq + " sequence:" + sequence);
            } else if (t instanceof HazelcastInstanceNotActiveException) {
                if (logger.isFinestEnabled()) {
                    logger.finest("Terminating MessageListener " + listener + " on topic: " + name + ". "
                            + " Reason: HazelcastInstance is shutting down");
                }
            } else if (t instanceof DistributedObjectDestroyedException) {
                if (logger.isFinestEnabled()) {
                    logger.finest("Terminating MessageListener " + listener + " on topic: " + name + ". "
                            + "Reason: Topic is destroyed");
                }
            } else {
                logger.warning("Terminating MessageListener " + listener + " on topic: " + name + ". "
                        + "Reason: Unhandled exception, message: " + t.getMessage(), t);
            }

            cancel();
        }

        /** Marks this runner as cancelled and removes it from the runners map. */
        void cancel() {
            cancelled = true;
            runnersMap.remove(id);
        }

        /**
         * Decides whether a failure raised while delivering a message should stop this runner.
         *
         * @return {@code true} when already cancelled, when the listener reports the failure
         *         as terminal, or when asking the listener itself throws
         */
        private boolean terminate(Throwable failure) {
            if (cancelled) {
                return true;
            }

            try {
                boolean terminal = listener.isTerminal(failure);
                if (terminal) {
                    logger.warning("Terminating MessageListener " + listener + " on topic: " + name + ". "
                            + "Reason: Unhandled exception, message: " + failure.getMessage(), failure);
                } else if (logger.isFinestEnabled()) {
                    logger.finest("MessageListener " + listener + " on topic: " + name
                            + " ran into an exception:" + " message:" + failure.getMessage(), failure);
                }
                return terminal;
            } catch (Throwable t) {
                logger.warning("Terminating messageListener:" + listener + " on topic: " + name + ". "
                        + "Reason: Unhandled exception while calling ReliableMessageListener.isTerminal() method", t);
                return true;
            }
        }
    }
}

