/*
 * Decompiled with CFR 0.152.
 */
package io.fluxcapacitor.javaclient.tracking;

import io.fluxcapacitor.common.Interceptor;
import io.fluxcapacitor.common.Registration;
import io.fluxcapacitor.common.TimingUtils;
import io.fluxcapacitor.common.api.Message;
import io.fluxcapacitor.common.api.tracking.MessageBatch;
import io.fluxcapacitor.javaclient.tracking.ConsumerService;
import io.fluxcapacitor.javaclient.tracking.TrackingConfiguration;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Tracker
implements Runnable,
Registration {
    private static final Logger log = LoggerFactory.getLogger(Tracker.class);
    private final String name;
    private final int channel;
    private final TrackingConfiguration configuration;
    private final Consumer<List<Message>> consumer;
    private final ConsumerService consumerService;
    private final AtomicBoolean running = new AtomicBoolean();

    public Tracker(String name, int channel, TrackingConfiguration configuration, Consumer<List<Message>> consumer, ConsumerService consumerService) {
        this.name = name;
        this.channel = channel;
        this.configuration = configuration;
        this.consumer = Interceptor.join(configuration.getBatchInterceptors()).intercept(consumer);
        this.consumerService = consumerService;
    }

    @Override
    public void run() {
        if (this.running.compareAndSet(false, true)) {
            while (this.running.get()) {
                MessageBatch batch = this.fetch();
                this.process(batch.getMessages(), batch.getSegment());
            }
        }
    }

    public boolean cancel() {
        return this.running.compareAndSet(true, false);
    }

    protected MessageBatch fetch() {
        return (MessageBatch)TimingUtils.retryOnFailure(() -> this.consumerService.read(this.name, this.channel, this.configuration.getMaxFetchBatchSize(), this.configuration.getMaxWaitDuration()), (Duration)this.configuration.getRetryDelay(), e -> this.running.get());
    }

    protected void process(List<Message> messages, int[] segment) {
        if (messages.isEmpty() || !this.running.get()) {
            return;
        }
        if (messages.size() > this.configuration.getMaxConsumerBatchSize()) {
            for (int i = 0; i < messages.size(); i += this.configuration.getMaxConsumerBatchSize()) {
                List<Message> batch = messages.subList(i, Math.min(i + this.configuration.getMaxConsumerBatchSize(), messages.size()));
                this.processBatch(batch, segment);
            }
        } else {
            this.processBatch(messages, segment);
        }
    }

    protected void processBatch(List<Message> batch, int[] segment) {
        try {
            this.consumer.accept(batch);
        }
        catch (Exception e2) {
            log.error("Consumer {} failed to handle batch of {} messages and did not handle exception. Tracker will be stopped.", new Object[]{this.name, batch.size(), e2});
            this.cancel();
            throw e2;
        }
        TimingUtils.retryOnFailure(() -> {
            this.consumerService.storePosition(this.name, segment, ((Message)batch.get(batch.size() - 1)).getIndex());
            return null;
        }, (Duration)this.configuration.getRetryDelay(), e -> this.running.get());
    }
}

