/*
 * Decompiled with CFR 0.152.
 */
package ratpack.stream.internal;

import java.util.concurrent.atomic.AtomicBoolean;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import ratpack.func.Action;
import ratpack.stream.internal.BufferedWriteStream;
import ratpack.stream.internal.BufferingPublisher;

public class FanOutPublisher<T>
extends BufferingPublisher<T> {
    public FanOutPublisher(final Publisher<? extends Iterable<? extends T>> publisher) {
        super(Action.noop(), (? super BufferedWriteStream<T> write) -> new Subscription((BufferedWriteStream)write){
            Subscription upstream;
            final AtomicBoolean emitting = new AtomicBoolean();
            final /* synthetic */ BufferedWriteStream val$write;
            {
                this.val$write = bufferedWriteStream;
            }

            public void request(long n) {
                if (this.emitting.get()) {
                    return;
                }
                if (this.upstream == null) {
                    publisher.subscribe(new Subscriber<Iterable<? extends T>>(){

                        public void onSubscribe(Subscription s) {
                            if (val$write.isCancelled()) {
                                s.cancel();
                                return;
                            }
                            upstream = s;
                            if (val$write.getRequested() > 0L) {
                                s.request(val$write.getRequested());
                            }
                        }

                        public void onNext(Iterable<? extends T> items) {
                            emitting.set(true);
                            for (Object item : items) {
                                val$write.item(item);
                            }
                            emitting.set(false);
                            long requested = val$write.getRequested();
                            if (requested > 0L) {
                                upstream.request(1L);
                            }
                        }

                        public void onError(Throwable t) {
                            val$write.error(t);
                        }

                        public void onComplete() {
                            val$write.complete();
                        }
                    });
                } else {
                    this.upstream.request(n);
                }
            }

            public void cancel() {
                if (this.upstream != null) {
                    this.upstream.cancel();
                }
            }
        });
    }
}

