/*
 * Decompiled with CFR 0.152.
 */
package io.fluxcapacitor.javaclient.eventsourcing;

import io.fluxcapacitor.common.handling.Handler;
import io.fluxcapacitor.javaclient.common.Message;
import io.fluxcapacitor.javaclient.common.caching.Cache;
import io.fluxcapacitor.javaclient.common.caching.NoCache;
import io.fluxcapacitor.javaclient.common.model.Model;
import io.fluxcapacitor.javaclient.common.serialization.DeserializingMessage;
import io.fluxcapacitor.javaclient.eventsourcing.Aggregate;
import io.fluxcapacitor.javaclient.eventsourcing.AnnotatedEventSourcingHandler;
import io.fluxcapacitor.javaclient.eventsourcing.EventSourced;
import io.fluxcapacitor.javaclient.eventsourcing.EventSourcing;
import io.fluxcapacitor.javaclient.eventsourcing.EventSourcingException;
import io.fluxcapacitor.javaclient.eventsourcing.EventSourcingHandler;
import io.fluxcapacitor.javaclient.eventsourcing.EventStore;
import io.fluxcapacitor.javaclient.eventsourcing.NoOpSnapshotRepository;
import io.fluxcapacitor.javaclient.eventsourcing.NoSnapshotTrigger;
import io.fluxcapacitor.javaclient.eventsourcing.PeriodicSnapshotTrigger;
import io.fluxcapacitor.javaclient.eventsourcing.SnapshotRepository;
import io.fluxcapacitor.javaclient.eventsourcing.SnapshotTrigger;
import io.fluxcapacitor.javaclient.tracking.handling.HandlerInterceptor;
import java.beans.ConstructorProperties;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link EventSourcing} implementation that builds models by replaying the events of an
 * {@link EventStore}, optionally starting from a snapshot and optionally going through a
 * {@link Cache}.
 *
 * <p>The class is also a {@link HandlerInterceptor}: while an intercepted handler runs, every model
 * loaded on the handling thread is registered in a thread-local collection, and once the handler
 * returns normally the events applied to those models are committed. Models loaded on a thread that
 * is not inside an intercepted handler are read-only.
 */
public class DefaultEventSourcing implements EventSourcing, HandlerInterceptor {
    private static final Logger log = LoggerFactory.getLogger(DefaultEventSourcing.class);

    /** Per model type: a factory that creates and initializes a model for a given id. */
    private final Map<Class<?>, Function<String, EventSourcedModel<?>>> modelFactories = new ConcurrentHashMap<>();
    private final EventStore eventStore;
    private final SnapshotRepository snapshotRepository;
    private final Cache cache;
    /** Models loaded by the current thread while an intercepted handler runs; unset otherwise. */
    private final ThreadLocal<Collection<EventSourcedModel<?>>> loadedModels = new ThreadLocal<>();

    /**
     * Creates the event sourcing component.
     *
     * @param eventStore         store that domain events are read from and written to
     * @param snapshotRepository repository used for model types whose snapshot period is positive
     * @param cache              cache used for model types that are marked as cached
     */
    @ConstructorProperties(value={"eventStore", "snapshotRepository", "cache"})
    public DefaultEventSourcing(EventStore eventStore, SnapshotRepository snapshotRepository, Cache cache) {
        this.eventStore = eventStore;
        this.snapshotRepository = snapshotRepository;
        this.cache = cache;
    }

    /**
     * Loads the model with the given id.
     *
     * <p>Outside of an intercepted handler a fresh, read-only model is returned. Inside one, a model
     * that was already loaded during the same handler invocation is reused; otherwise a new model is
     * created and registered so that it gets committed when the handler completes.
     *
     * @param modelId   id of the model (aggregate) to load
     * @param modelType type of the model
     * @return the loaded model
     */
    @Override
    public <T> Model<T> load(String modelId, Class<T> modelType) {
        Collection<EventSourcedModel<?>> loaded = loadedModels.get();
        if (loaded == null) {
            return createEsModel(modelType, modelId);
        }
        // NOTE(review): already-loaded models are matched on id only; the model type is not
        // compared. Confirm that ids are unique across model types.
        for (EventSourcedModel<?> candidate : loaded) {
            if (candidate.id.equals(modelId)) {
                @SuppressWarnings("unchecked")
                Model<T> existing = (Model<T>) candidate;
                return existing;
            }
        }
        EventSourcedModel<T> created = createEsModel(modelType, modelId);
        loaded.add(created);
        return created;
    }

    /** Invalidates every entry of the configured cache. */
    @Override
    public void invalidateCache() {
        cache.invalidateAll();
    }

    /**
     * @return the event store this component was constructed with
     */
    @Override
    public EventStore eventStore() {
        return eventStore;
    }

    /**
     * Creates and initializes a model of the given type for the given id.
     *
     * <p>The per-type settings (event sourcing handler, cache, snapshot repository, snapshot trigger
     * and domain) are resolved once per model type and reused by a cached factory. The model is
     * read-only when the calling thread is not inside an intercepted handler.
     *
     * @param modelType type of the model
     * @param modelId   id of the model
     * @return a new, initialized model
     */
    // The generic signature of AnnotatedEventSourcingHandler is not visible from this file, so it
    // is instantiated as in the original code; the remaining casts narrow the wildcard factory result.
    @SuppressWarnings({"unchecked", "rawtypes"})
    protected <T> EventSourcedModel<T> createEsModel(Class<T> modelType, String modelId) {
        Function<String, EventSourcedModel<?>> factory = modelFactories.computeIfAbsent(modelType, type -> {
            EventSourcingHandler eventSourcingHandler = new AnnotatedEventSourcingHandler(modelType);
            Cache cache = cache(modelType);
            SnapshotRepository snapshotRepository = snapshotRepository(modelType);
            SnapshotTrigger snapshotTrigger = snapshotTrigger(modelType);
            String domain = domain(modelType);
            return id -> {
                // Read-only is decided per invocation: it depends on the thread that loads the model.
                boolean readOnly = loadedModels.get() == null;
                EventSourcedModel<T> model = new EventSourcedModel<T>(eventSourcingHandler, cache, eventStore,
                        snapshotRepository, snapshotTrigger, domain, readOnly, id);
                model.initialize();
                return model;
            };
        });
        return (EventSourcedModel<T>) factory.apply(modelId);
    }

    /**
     * Wraps the given handler function so that models loaded while it runs are tracked and, when
     * it returns normally, committed. Models are committed in reverse order of registration. If the
     * function itself throws, nothing is committed. The thread-local registration is always removed
     * afterwards.
     *
     * @param function the handler invocation to wrap
     * @param handler  the handler being intercepted (not used here)
     * @param consumer name of the consumer (not used here)
     * @return the wrapping function
     * @throws EventSourcingException (from the returned function) if committing a model fails
     */
    @Override
    public Function<DeserializingMessage, Object> interceptHandling(Function<DeserializingMessage, Object> function, Handler<DeserializingMessage> handler, String consumer) {
        return command -> {
            List<EventSourcedModel<?>> models = new ArrayList<>();
            loadedModels.set(models);
            try {
                Object result = function.apply(command);
                try {
                    // Re-check emptiness every round: the list may grow while models are committed.
                    while (!models.isEmpty()) {
                        models.remove(models.size() - 1).commit();
                    }
                } catch (Exception e) {
                    throw new EventSourcingException(String.format("Failed to commit applied events after handling %s", command), e);
                }
                return result;
            } finally {
                loadedModels.remove();
            }
        };
    }

    /**
     * Selects the snapshot repository for a model type: the configured repository when the type's
     * snapshot period is positive, the no-op repository otherwise.
     */
    protected SnapshotRepository snapshotRepository(Class<?> modelType) {
        return snapshotPeriod(modelType) > 0 ? snapshotRepository : NoOpSnapshotRepository.INSTANCE;
    }

    /**
     * Selects the snapshot trigger for a model type: a periodic trigger using the type's snapshot
     * period when that period is positive, the never-firing trigger otherwise.
     */
    protected SnapshotTrigger snapshotTrigger(Class<?> modelType) {
        int period = snapshotPeriod(modelType);
        return period > 0 ? new PeriodicSnapshotTrigger(period) : NoSnapshotTrigger.INSTANCE;
    }

    /**
     * Selects the cache for a model type: the configured cache when the type is marked as cached
     * (or, without annotation, when the annotation default says so), the no-op cache otherwise.
     */
    protected Cache cache(Class<?> modelType) {
        boolean cached = Optional.ofNullable(modelType.getAnnotation(EventSourced.class))
                .map(EventSourced::cached)
                .orElseGet(() -> annotationDefault("cached", Boolean.class));
        return cached ? cache : NoCache.INSTANCE;
    }

    /**
     * Determines the domain of a model type: the non-empty {@code domain} of its
     * {@link EventSourced} annotation, or else the simple class name.
     */
    protected String domain(Class<?> modelType) {
        return Optional.ofNullable(modelType.getAnnotation(EventSourced.class))
                .map(EventSourced::domain)
                .filter(name -> !name.isEmpty())
                .orElseGet(modelType::getSimpleName);
    }

    /** Reads the snapshot period of a model type, falling back to the annotation's declared default. */
    private static int snapshotPeriod(Class<?> modelType) {
        return Optional.ofNullable(modelType.getAnnotation(EventSourced.class))
                .map(EventSourced::snapshotPeriod)
                .orElseGet(() -> annotationDefault("snapshotPeriod", Integer.class));
    }

    /**
     * Looks up the default value that {@link EventSourced} declares for the given attribute.
     * The checked reflection failure is translated because a missing attribute is a programming error.
     */
    private static <V> V annotationDefault(String attribute, Class<V> valueType) {
        try {
            return valueType.cast(EventSourced.class.getMethod(attribute).getDefaultValue());
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(
                    String.format("%s does not declare attribute %s", EventSourced.class.getName(), attribute), e);
        }
    }

    /**
     * {@link Model} whose state is an {@link Aggregate} rebuilt from a snapshot and/or stored events.
     * Applied events are buffered until {@link #commit()} is called.
     */
    protected static class EventSourcedModel<T> implements Model<T> {
        private final EventSourcingHandler<T> eventSourcingHandler;
        private final Cache cache;
        private final EventStore eventStore;
        private final SnapshotRepository snapshotRepository;
        private final SnapshotTrigger snapshotTrigger;
        private final String domain;
        /** Events applied to this model that have not been committed yet. */
        private final List<Message> unpublishedEvents = new ArrayList<Message>();
        private final boolean readOnly;
        private final String id;
        private Aggregate<T> aggregate;

        /**
         * Creates an uninitialized model; {@link #initialize()} must be called before use.
         *
         * @param eventSourcingHandler applies events to the model state
         * @param cache                cache consulted on initialization and updated on commit
         * @param eventStore           store that events are read from and committed to
         * @param snapshotRepository   repository that snapshots are read from and written to
         * @param snapshotTrigger      decides on commit whether a snapshot is stored
         * @param domain               domain passed along when storing events
         * @param readOnly             whether {@link #apply(Message)} is rejected
         * @param id                   id of the aggregate
         */
        @ConstructorProperties(value={"eventSourcingHandler", "cache", "eventStore", "snapshotRepository", "snapshotTrigger", "domain", "readOnly", "id"})
        public EventSourcedModel(EventSourcingHandler<T> eventSourcingHandler, Cache cache, EventStore eventStore, SnapshotRepository snapshotRepository, SnapshotTrigger snapshotTrigger, String domain, boolean readOnly, String id) {
            this.eventSourcingHandler = eventSourcingHandler;
            this.cache = cache;
            this.eventStore = eventStore;
            this.snapshotRepository = snapshotRepository;
            this.snapshotTrigger = snapshotTrigger;
            this.domain = domain;
            this.readOnly = readOnly;
            this.id = id;
        }

        /**
         * Obtains the aggregate through the cache. The loader passed to the cache starts from the
         * stored snapshot, or from an empty aggregate with sequence number -1 when there is none,
         * and then applies the domain events fetched from the event store for that sequence number.
         */
        protected void initialize() {
            this.aggregate = this.cache.get(this.id, i -> {
                // The empty aggregate is only constructed when no snapshot is available.
                Aggregate<Object> restored = this.snapshotRepository.getSnapshot(this.id)
                        .orElseGet(() -> new Aggregate<Object>(this.id, -1L, null));
                for (DeserializingMessage event : this.eventStore.getDomainEvents(this.id, restored.getSequenceNumber()).collect(Collectors.toList())) {
                    restored = restored.update(m -> {
                        // Expose the replayed event as the current message while it is applied,
                        // then restore whatever was current before.
                        DeserializingMessage previous = DeserializingMessage.getCurrent();
                        try {
                            DeserializingMessage.setCurrent(event);
                            return this.eventSourcingHandler.apply(event.toMessage(), m);
                        } finally {
                            DeserializingMessage.setCurrent(previous);
                        }
                    });
                }
                return restored;
            });
        }

        /**
         * Buffers the message as an unpublished event and applies it to the aggregate.
         *
         * @param message the event to apply
         * @return this model
         * @throws EventSourcingException if the model is read-only
         */
        @Override
        public Model<T> apply(Message message) {
            if (readOnly) {
                throw new EventSourcingException(String.format("Not allowed to apply a %s. The model is readonly.", message));
            }
            unpublishedEvents.add(message);
            aggregate = aggregate.update(m -> eventSourcingHandler.apply(message, m));
            return this;
        }

        /**
         * @return the current state held by the aggregate
         */
        @Override
        public T get() {
            return aggregate.getModel();
        }

        /**
         * @return the sequence number of the aggregate
         */
        @Override
        public long getSequenceNumber() {
            return aggregate.getSequenceNumber();
        }

        /**
         * Publishes the buffered events, if any: updates the cache, stores the events, stores a
         * snapshot when the trigger asks for one, and finally clears the buffer.
         */
        protected void commit() {
            if (unpublishedEvents.isEmpty()) {
                return;
            }
            // NOTE(review): the cache is updated before the events are stored, so if storing throws
            // the cache keeps an aggregate whose events were not stored. Confirm this is intended.
            cache.put(aggregate.getId(), aggregate);
            // The store receives a copy because the buffer is cleared below.
            eventStore.storeDomainEvents(aggregate.getId(), domain, aggregate.getSequenceNumber(), new ArrayList<Message>(unpublishedEvents));
            if (snapshotTrigger.shouldCreateSnapshot(aggregate, unpublishedEvents)) {
                snapshotRepository.storeSnapshot(aggregate);
            }
            unpublishedEvents.clear();
        }
    }
}

