/*
 * Decompiled with CFR 0.152.
 */
package io.fluxcapacitor.javaclient.tracking;

import io.fluxcapacitor.common.Awaitable;
import io.fluxcapacitor.common.Registration;
import io.fluxcapacitor.common.api.Message;
import io.fluxcapacitor.common.api.tracking.MessageBatch;
import io.fluxcapacitor.javaclient.tracking.ConsumerService;
import io.fluxcapacitor.javaclient.tracking.ProducerService;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

/**
 * In-memory message log that implements both {@link ProducerService} and
 * {@link ConsumerService}.
 *
 * <p>Messages passed to {@link #send(Message...)} are assigned a monotonically
 * increasing index and kept in a sorted map. Readers obtain the messages whose index
 * is greater than the last position stored for their processor name.
 *
 * <p>State is held in concurrent collections; the monitor of this instance is used
 * only to let readers block until a producer signals that new messages were appended.
 */
public class InMemoryMessageStore implements ProducerService, ConsumerService {
    /** Source of the index assigned to the next appended message (starts at 0). */
    private final AtomicLong nextIndex = new AtomicLong();
    /** All appended messages, keyed and ordered by their index. */
    private final ConcurrentSkipListMap<Long, Message> messageLog = new ConcurrentSkipListMap<>();
    /** Last stored read position per processor name. */
    private final Map<String, Long> consumerTokens = new ConcurrentHashMap<>();
    /** Monitors registered via {@link #registerMonitor(Consumer)}. */
    private final List<Consumer<Message>> monitors = new CopyOnWriteArrayList<>();

    /**
     * Appends the given messages to the log, assigning each the next free index, and
     * wakes up all readers currently blocked in {@link #read}.
     *
     * @param messages the messages to append, in order
     * @return an {@link Awaitable} that does nothing, as the messages are already stored
     *         when this method returns
     */
    @Override
    public Awaitable send(Message... messages) {
        Arrays.stream(messages).forEach(m -> {
            long index = nextIndex.getAndIncrement();
            m.setIndex(index);
            messageLog.put(index, m);
        });
        // Messages are stored before notifying, so a woken reader always finds them.
        synchronized (this) {
            notifyAll();
        }
        return () -> {};
    }

    /**
     * Returns up to {@code maxSize} messages with an index greater than the position
     * last stored for {@code processor}. If no such message is available yet, blocks
     * until one is appended or {@code maxTimeout} has elapsed, whichever comes first.
     *
     * <p>Available messages are returned even when {@code maxTimeout} is zero or
     * negative; in that case the method never blocks. If the calling thread is
     * interrupted while waiting, its interrupt status is restored and an empty batch
     * is returned.
     *
     * @param processor  name of the processor whose stored position determines where
     *                   reading starts
     * @param channel    not used by this implementation
     * @param maxSize    maximum number of messages in the returned batch; a value of
     *                   zero or less yields an empty batch
     * @param maxTimeout maximum time to wait for messages to become available
     * @return a batch for segment {@code [0, 1]} holding the messages read and the index
     *         of the last one, or {@code null} as last index if the batch is empty
     */
    @Override
    public MessageBatch read(String processor, int channel, int maxSize, Duration maxTimeout) {
        long deadline = System.currentTimeMillis() + maxTimeout.toMillis();
        synchronized (this) {
            // Look before waiting so that a zero timeout still returns available messages.
            NavigableMap<Long, Message> unread = unreadMessages(processor);
            while (unread.isEmpty()) {
                // Compute the remaining time once: wait(0) would block indefinitely and a
                // negative argument would throw IllegalArgumentException.
                long remaining = deadline - System.currentTimeMillis();
                if (remaining <= 0) {
                    break;
                }
                try {
                    wait(remaining);
                } catch (InterruptedException e) {
                    // Restore the flag so callers can observe the interruption.
                    Thread.currentThread().interrupt();
                    return new MessageBatch(new int[]{0, 1}, Collections.emptyList(), null);
                }
                unread = unreadMessages(processor);
            }
            List<Message> messages = new ArrayList<>();
            for (Message message : unread.values()) {
                if (messages.size() >= maxSize) {
                    break;
                }
                messages.add(message);
            }
            Long lastIndex = messages.isEmpty() ? null : messages.get(messages.size() - 1).getIndex();
            return new MessageBatch(new int[]{0, 1}, messages, lastIndex);
        }
    }

    /** Returns a view of all messages with an index greater than the processor's last token. */
    private NavigableMap<Long, Message> unreadMessages(String processor) {
        return messageLog.tailMap(getLastToken(processor), false);
    }

    /**
     * Returns the last stored position of the given consumer, registering {@code -1}
     * (i.e. before the first message) if none has been stored yet.
     */
    private long getLastToken(String consumer) {
        return consumerTokens.computeIfAbsent(consumer, k -> -1L);
    }

    /**
     * Records {@code lastIndex} as the read position of {@code processor}; subsequent
     * reads for that processor only return messages with a greater index.
     *
     * @param processor name of the processor
     * @param segment   not used by this implementation
     * @param lastIndex index of the last message handled by the processor
     */
    @Override
    public void storePosition(String processor, int[] segment, long lastIndex) {
        consumerTokens.put(processor, lastIndex);
    }

    /**
     * Adds the given monitor to the list of registered monitors.
     *
     * <p>NOTE(review): nothing visible in this class invokes the registered monitors;
     * confirm whether {@link #send(Message...)} is expected to call them.
     *
     * @param monitor the monitor to register
     * @return a {@link Registration} that removes the monitor again when invoked
     */
    public Registration registerMonitor(Consumer<Message> monitor) {
        monitors.add(monitor);
        return () -> monitors.remove(monitor);
    }
}

