/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.spi.cluster.jgroups.impl.services;

import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.VertxException;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.spi.cluster.jgroups.impl.services.RpcExecutorService;
import io.vertx.spi.cluster.jgroups.impl.support.DataHolder;
import io.vertx.spi.cluster.jgroups.impl.support.LambdaLogger;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.function.Supplier;
import org.jgroups.Message;
import org.jgroups.blocks.MethodCall;
import org.jgroups.blocks.RequestOptions;
import org.jgroups.blocks.ResponseMode;
import org.jgroups.blocks.RpcDispatcher;
import org.jgroups.util.NotifyingFuture;
import org.jgroups.util.Rsp;
import org.jgroups.util.RspList;

public class DefaultRpcExecutorService
implements RpcExecutorService,
LambdaLogger {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultRpcExecutorService.class);
    private static final Message.Flag[] JGROUPS_FLAGS = new Message.Flag[]{Message.Flag.NO_TOTAL_ORDER};
    private final Vertx vertx;
    private final RpcDispatcher dispatcher;
    private volatile boolean active = true;

    public DefaultRpcExecutorService(Vertx vertx, RpcDispatcher dispatcher) {
        this.vertx = vertx;
        this.dispatcher = dispatcher;
    }

    @Override
    public <T> void runAsync(Supplier<T> supplier, Handler<AsyncResult<T>> handler) {
        this.vertx.executeBlocking(future -> {
            try {
                future.complete(supplier.get());
            }
            catch (Exception e) {
                future.fail((Throwable)e);
            }
        }, handler);
    }

    @Override
    public <T> T remoteExecute(MethodCall action, long timeout) {
        this.logTrace(() -> String.format("RemoteExecute sync action %s with timeout %s", action, timeout));
        RequestOptions options = new RequestOptions().setFlags(JGROUPS_FLAGS).setMode(ResponseMode.GET_ALL).setTimeout(timeout);
        try {
            NotifyingFuture<RspList<T>> notifyingFuture = this.execute(action, options);
            RspList rspList = (RspList)notifyingFuture.get(timeout, TimeUnit.MILLISECONDS);
            return this.futureDone(rspList);
        }
        catch (Exception e) {
            throw new VertxException((Throwable)e);
        }
    }

    @Override
    public <T> void remoteExecute(MethodCall action, Handler<AsyncResult<T>> handler) {
        this.remoteExecute(action, 0L, handler);
    }

    @Override
    public <T> void remoteExecute(MethodCall action, long timeout, Handler<AsyncResult<T>> handler) {
        this.logTrace(() -> String.format("RemoteExecute action %s, handler %s", action, handler));
        RequestOptions options = new RequestOptions().setFlags(JGROUPS_FLAGS).setMode(ResponseMode.GET_ALL).setTimeout(timeout);
        try {
            NotifyingFuture<RspList<T>> notifyingFuture = this.execute(action, options);
            notifyingFuture.setListener(future -> this.vertx.executeBlocking(f -> {
                try {
                    RspList rspList = (RspList)future.get();
                    f.complete(this.futureDone(rspList));
                }
                catch (Exception e) {
                    f.fail((Throwable)e);
                }
            }, handler));
        }
        catch (Exception e) {
            handler.handle((Object)Future.failedFuture((Throwable)e));
        }
    }

    @Override
    public void stop() {
        this.active = false;
    }

    private <T> NotifyingFuture<RspList<T>> execute(MethodCall action, RequestOptions options) throws Exception {
        return this.internalExecute(action, options);
    }

    private <T> NotifyingFuture<RspList<T>> internalExecute(MethodCall action, RequestOptions options) throws Exception {
        if (this.active) {
            return this.dispatcher.callRemoteMethodsWithFuture(null, action, options);
        }
        throw new VertxException("Executor service is closed");
    }

    private <T> T futureDone(RspList<T> rspList) {
        Collection values = rspList.values();
        values.parallelStream().filter(Rsp::hasException).forEach(rsp -> this.logWarn(() -> String.format("Execute method failed. Sender [%s], with exception [%s]", rsp.getSender(), rsp.getException())));
        T value = values.stream().filter(Rsp::wasReceived).filter(((Predicate<Rsp>)Rsp::hasException).negate()).filter(((Predicate<Rsp>)Rsp::wasUnreachable).negate()).map(Rsp::getValue).filter(t -> t != null).reduce((a, b) -> a).orElse(null);
        if (value instanceof DataHolder) {
            return ((DataHolder)value).unwrap();
        }
        return value;
    }

    @Override
    public Logger log() {
        return LOG;
    }
}

