/*
 * Decompiled with CFR 0.152.
 */
package ratpack.exec.internal;

import java.util.concurrent.atomic.AtomicBoolean;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import ratpack.exec.internal.ContinuationStream;
import ratpack.exec.internal.DefaultExecution;
import ratpack.func.Action;
import ratpack.func.Block;
import ratpack.stream.TransformablePublisher;
import ratpack.util.Exceptions;

public class ExecutionBoundPublisher<T>
implements TransformablePublisher<T> {
    private final Publisher<T> publisher;
    private final Action<? super T> disposer;

    public ExecutionBoundPublisher(Publisher<T> publisher, Action<? super T> disposer) {
        this.publisher = publisher;
        this.disposer = disposer;
    }

    public void subscribe(Subscriber<? super T> subscriber) {
        final DefaultExecution execution = DefaultExecution.require();
        execution.delimitStream(arg_0 -> subscriber.onError(arg_0), continuation -> this.publisher.subscribe(new Subscriber<T>((ContinuationStream)continuation, (Subscriber)subscriber){
            private Subscription subscription;
            private final AtomicBoolean cancelled = new AtomicBoolean();
            private final AtomicBoolean pendingCancelSignal = new AtomicBoolean(true);
            final /* synthetic */ ContinuationStream val$continuation;
            final /* synthetic */ Subscriber val$subscriber;
            {
                this.val$continuation = continuationStream;
                this.val$subscriber = subscriber;
            }

            private boolean dispatch(Block block) {
                if (this.cancelled.get()) {
                    return false;
                }
                if (execution.isBound()) {
                    try {
                        block.execute();
                        return true;
                    }
                    catch (Exception e) {
                        throw Exceptions.uncheck(e);
                    }
                }
                return this.val$continuation.event(block);
            }

            public void onSubscribe(final Subscription subscription) {
                this.subscription = subscription;
                this.dispatch(() -> this.val$subscriber.onSubscribe(new Subscription(){

                    public void request(long n) {
                        this.dispatch(() -> subscription.request(n));
                    }

                    public void cancel() {
                        if (cancelled.compareAndSet(false, true)) {
                            if (execution.isBound()) {
                                subscription.cancel();
                                val$continuation.complete(Block.noop());
                            } else {
                                pendingCancelSignal.set(true);
                                val$continuation.complete(() -> {
                                    if (pendingCancelSignal.compareAndSet(true, false)) {
                                        subscription.cancel();
                                    }
                                });
                            }
                        }
                    }
                }));
            }

            public void onNext(T element) {
                boolean added = this.dispatch(() -> {
                    if (this.cancelled.get()) {
                        this.dispose(element);
                    } else {
                        this.val$subscriber.onNext(element);
                    }
                });
                if (!added) {
                    this.dispose(element);
                    if (this.cancelled.get() && execution.isBound() && this.pendingCancelSignal.compareAndSet(true, false)) {
                        this.subscription.cancel();
                        this.val$continuation.complete(Block.noop());
                    }
                }
            }

            private void dispose(T element) {
                try {
                    ExecutionBoundPublisher.this.disposer.execute(element);
                }
                catch (Exception e) {
                    DefaultExecution.LOGGER.warn("Exception raised disposing stream item will be ignored - ", (Throwable)e);
                }
            }

            public void onComplete() {
                this.val$continuation.complete(() -> {
                    if (!this.cancelled.get()) {
                        this.val$subscriber.onComplete();
                    }
                });
            }

            public void onError(Throwable cause) {
                if (!this.cancelled.get()) {
                    this.val$continuation.complete(() -> this.val$subscriber.onError(cause));
                }
            }
        }));
    }
}

