/*
 * Decompiled with CFR 0.152.
 */
package ratpack.stream;

import java.time.Duration;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import ratpack.exec.ExecController;
import ratpack.exec.ExecSpec;
import ratpack.exec.Promise;
import ratpack.exec.internal.DefaultExecution;
import ratpack.func.Action;
import ratpack.func.BiFunction;
import ratpack.func.Function;
import ratpack.func.Predicate;
import ratpack.registry.Registry;
import ratpack.stream.StreamEvent;
import ratpack.stream.StreamMapper;
import ratpack.stream.TransformablePublisher;
import ratpack.stream.WriteStream;
import ratpack.stream.YieldRequest;
import ratpack.stream.internal.BatchingPublisher;
import ratpack.stream.internal.BufferingPublisher;
import ratpack.stream.internal.CollectingSubscriber;
import ratpack.stream.internal.ConcatPublisher;
import ratpack.stream.internal.DefaultTransformablePublisher;
import ratpack.stream.internal.EmptyPublisher;
import ratpack.stream.internal.FanOutPublisher;
import ratpack.stream.internal.FlatMapPublisher;
import ratpack.stream.internal.FlatYieldingPublisher;
import ratpack.stream.internal.FlattenPublisher;
import ratpack.stream.internal.ForkingSubscription;
import ratpack.stream.internal.GatedPublisher;
import ratpack.stream.internal.IterablePromisePublisher;
import ratpack.stream.internal.IterablePublisher;
import ratpack.stream.internal.MapPublisher;
import ratpack.stream.internal.MergingPublisher;
import ratpack.stream.internal.MulticastPublisher;
import ratpack.stream.internal.PeriodicPublisher;
import ratpack.stream.internal.SingleElementSubscriber;
import ratpack.stream.internal.StreamMapPublisher;
import ratpack.stream.internal.TakePublisher;
import ratpack.stream.internal.WiretapPublisher;
import ratpack.stream.internal.YieldingPublisher;
import ratpack.util.Types;

/**
 * Static factory and transformation helpers for Reactive Streams {@link Publisher}s.
 *
 * <p>Most methods are thin factories around a publisher implementation from
 * {@code ratpack.stream.internal}; the exact runtime semantics of each stream are
 * defined by the implementation class named in the method's documentation.
 */
public class Streams {
    /**
     * Adapts a publisher to {@link TransformablePublisher}.
     *
     * @param publisher the publisher to adapt
     * @param <T> the item type
     * @return {@code publisher} itself when it already is a {@link TransformablePublisher},
     *         otherwise a {@link DefaultTransformablePublisher} wrapping it
     */
    public static <T> TransformablePublisher<T> transformable(Publisher<T> publisher) {
        if (publisher instanceof TransformablePublisher) {
            return Types.cast(publisher);
        }
        return new DefaultTransformablePublisher<T>(publisher);
    }

    /**
     * Returns the shared {@link EmptyPublisher#INSTANCE}, cast to the requested item type.
     *
     * @param <T> the item type
     * @return the shared empty publisher
     */
    public static <T> TransformablePublisher<T> empty() {
        return Types.cast(EmptyPublisher.INSTANCE);
    }

    /**
     * Creates an {@link IterablePublisher} over the given iterable.
     *
     * @param iterable the source of items
     * @param <T> the item type
     * @return a publisher backed by {@code iterable}
     */
    public static <T> TransformablePublisher<T> publish(Iterable<T> iterable) {
        return new IterablePublisher<T>(iterable);
    }

    /**
     * Creates an {@link IterablePromisePublisher} over the iterable provided by the given promise.
     *
     * @param promise a promise for the source of items
     * @param <T> the item type
     * @return a publisher backed by the promised iterable
     */
    public static <T> TransformablePublisher<T> publish(Promise<? extends Iterable<T>> promise) {
        return new IterablePromisePublisher<>(promise);
    }

    /**
     * Creates a {@link YieldingPublisher} driven by the given producer function.
     *
     * @param producer invoked with a {@link YieldRequest} to produce items
     * @param <T> the item type
     * @return a publisher backed by {@code producer}
     */
    public static <T> TransformablePublisher<T> yield(Function<? super YieldRequest, ? extends T> producer) {
        return new YieldingPublisher<T>(producer);
    }

    /**
     * Creates a {@link FlatYieldingPublisher} driven by the given promise-returning producer function.
     *
     * @param producer invoked with a {@link YieldRequest} to produce a promise for an item
     * @param <T> the item type
     * @return a publisher backed by {@code producer}
     */
    public static <T> TransformablePublisher<T> flatYield(Function<? super YieldRequest, ? extends Promise<T>> producer) {
        return new FlatYieldingPublisher<>(producer);
    }

    /**
     * Creates a yielding publisher whose producer ignores the request and always returns {@code item}.
     *
     * @param item the value returned for every yield request
     * @param <T> the item type
     * @return a publisher that yields {@code item}
     */
    public static <T> TransformablePublisher<T> constant(T item) {
        // Keep the call qualified: an unqualified call to a method named "yield" is rejected by newer Java compilers.
        return Streams.yield(yieldRequest -> item);
    }

    /**
     * Creates a {@link MapPublisher} that applies {@code function} to the items of {@code input}.
     *
     * @param input the upstream publisher
     * @param function the item transformation
     * @param <I> the upstream item type
     * @param <O> the downstream item type
     * @return the mapping publisher
     */
    public static <I, O> TransformablePublisher<O> map(Publisher<I> input, Function<? super I, ? extends O> function) {
        return new MapPublisher<O, I>(input, function);
    }

    /**
     * Creates a publisher that only emits the upstream items accepted by {@code filter}.
     *
     * <p>An accepted item is written downstream. For a rejected item, one more item is
     * requested from the upstream subscription in its place.
     *
     * @param input the upstream publisher
     * @param filter returns {@code true} for items that should be emitted
     * @param <T> the item type
     * @return the filtering publisher
     */
    public static <T> TransformablePublisher<T> filter(Publisher<T> input, Predicate<? super T> filter) {
        return streamMap(input, (s, out) -> out.itemMap(s, item -> {
            if (filter.apply(item)) {
                out.item(item);
            } else {
                // The rejected item consumed one unit of demand; ask upstream for a replacement.
                s.request(1L);
            }
        }));
    }

    /**
     * Creates a {@link StreamMapPublisher} for the given input and mapper, and returns its
     * {@code buffer()}ed form.
     *
     * @param input the upstream publisher
     * @param mapper maps the upstream subscription and downstream write stream to the write stream fed by upstream
     * @param <U> the upstream item type
     * @param <D> the downstream item type
     * @return the buffered, stream-mapped publisher
     */
    public static <U, D> TransformablePublisher<D> streamMap(Publisher<? extends U> input, StreamMapper<? super U, D> mapper) {
        return new StreamMapPublisher<U, D>(input, mapper).buffer();
    }

    /**
     * Adapts a function of the downstream write stream to a {@link StreamMapper} (ignoring the
     * subscription) and delegates to {@link #streamMap(Publisher, StreamMapper)}.
     *
     * @param input the upstream publisher
     * @param mapper maps the downstream write stream to the write stream fed by upstream
     * @param <U> the upstream item type
     * @param <D> the downstream item type
     * @return the buffered, stream-mapped publisher
     * @deprecated use {@link #streamMap(Publisher, StreamMapper)}
     */
    @Deprecated
    public static <U, D> TransformablePublisher<D> streamMap(Publisher<U> input, Function<? super WriteStream<D>, ? extends WriteStream<? super U>> mapper) {
        return streamMap(input, (subscription, downstream) -> {
            // Typed local gives the mapper's wildcard result a concrete declared type before returning it.
            WriteStream<? super U> upstreamWriter = mapper.apply(downstream);
            return upstreamWriter;
        });
    }

    /**
     * Creates a {@link FlatMapPublisher} for the given promise-returning function, wrapped in a
     * {@link BatchingPublisher} with a batch size of 1 and a no-op disposer.
     *
     * @param input the upstream publisher
     * @param function maps each upstream item to a promise for the downstream item
     * @param <I> the upstream item type
     * @param <O> the downstream item type
     * @return the flat-mapping publisher
     */
    public static <I, O> TransformablePublisher<O> flatMap(Publisher<I> input, Function<? super I, ? extends Promise<? extends O>> function) {
        return new BatchingPublisher<>(new FlatMapPublisher<>(input, function), 1, Action.noop());
    }

    /**
     * Wraps the publisher in a {@link BufferingPublisher} with a no-op disposer.
     *
     * @param publisher the publisher to buffer
     * @param <T> the item type
     * @return the buffering publisher
     */
    public static <T> TransformablePublisher<T> buffer(Publisher<T> publisher) {
        return new BufferingPublisher<>(Action.noop(), publisher);
    }

    /**
     * Wraps the publisher in a {@link GatedPublisher}.
     *
     * @param publisher the publisher to gate
     * @param valveReceiver receives the {@link Runnable} valve from the gated publisher
     * @param <T> the item type
     * @return the gated publisher
     */
    public static <T> TransformablePublisher<T> gate(Publisher<T> publisher, Action<? super Runnable> valveReceiver) {
        return new GatedPublisher<T>(publisher, valveReceiver);
    }

    /**
     * Creates a {@link PeriodicPublisher} on the given executor and returns its {@code buffer()}ed form.
     *
     * @param executorService the executor used by the periodic publisher
     * @param duration the period passed to the periodic publisher
     * @param producer invoked with an integer to produce each item
     * @param <T> the item type
     * @return the buffered periodic publisher
     */
    public static <T> TransformablePublisher<T> periodically(ScheduledExecutorService executorService, Duration duration, Function<? super Integer, ? extends T> producer) {
        return new PeriodicPublisher<T>(executorService, producer, duration).buffer();
    }

    /**
     * Same as {@link #periodically(ScheduledExecutorService, Duration, Function)}, using the executor of
     * the {@link ExecController} obtained from the given registry.
     *
     * @param registry the registry to obtain the {@link ExecController} from
     * @param duration the period passed to the periodic publisher
     * @param producer invoked with an integer to produce each item
     * @param <T> the item type
     * @return the buffered periodic publisher
     */
    public static <T> TransformablePublisher<T> periodically(Registry registry, Duration duration, Function<? super Integer, ? extends T> producer) {
        return periodically(registry.get(ExecController.class).getExecutor(), duration, producer);
    }

    /**
     * Wraps the publisher in a {@link WiretapPublisher} that reports {@link StreamEvent}s to {@code listener}.
     *
     * @param publisher the publisher to observe
     * @param listener receives the stream events
     * @param <T> the item type
     * @return the wiretapped publisher
     */
    public static <T> TransformablePublisher<T> wiretap(Publisher<T> publisher, Action<? super StreamEvent<T>> listener) {
        return new WiretapPublisher<T>(publisher, listener);
    }

    /**
     * Wraps the publisher in a {@link MulticastPublisher}.
     *
     * @param publisher the upstream publisher
     * @param <T> the item type
     * @return the multicast publisher
     */
    public static <T> TransformablePublisher<T> multicast(Publisher<T> publisher) {
        return new MulticastPublisher<T>(publisher);
    }

    /**
     * Same as {@link #fanOut(Publisher, Action)} with a no-op disposer.
     *
     * @param publisher a publisher of iterables
     * @param <T> the element type of the iterables
     * @return the fan-out publisher
     */
    public static <T> TransformablePublisher<T> fanOut(Publisher<? extends Iterable<? extends T>> publisher) {
        return fanOut(publisher, Action.noop());
    }

    /**
     * Wraps a publisher of iterables in a {@link FanOutPublisher}.
     *
     * @param publisher a publisher of iterables
     * @param disposer passed to the fan-out publisher as its disposer
     * @param <T> the element type of the iterables
     * @return the fan-out publisher
     */
    public static <T> TransformablePublisher<T> fanOut(Publisher<? extends Iterable<? extends T>> publisher, Action<? super T> disposer) {
        return new FanOutPublisher<T>(publisher, disposer);
    }

    /**
     * Creates a {@link MergingPublisher} over the given publishers and returns its {@code buffer()}ed form.
     *
     * @param publishers the publishers to merge
     * @param <T> the item type
     * @return the buffered merging publisher
     */
    @SafeVarargs
    public static <T> TransformablePublisher<T> merge(Publisher<? extends T> ... publishers) {
        return new MergingPublisher<T>(publishers).buffer();
    }

    /**
     * Creates a promise that, when activated, subscribes a {@link SingleElementSubscriber} to the
     * publisher and forwards its result to the promise.
     *
     * @param publisher the publisher to consume
     * @param <T> the item type
     * @return a promise for the publisher's single element
     */
    public static <T> Promise<T> toPromise(Publisher<T> publisher) {
        return Promise.async(f -> publisher.subscribe(SingleElementSubscriber.to(f::accept)));
    }

    /**
     * Creates a promise that, when activated, subscribes a {@link CollectingSubscriber} to the
     * publisher, requesting {@link Long#MAX_VALUE} items, and forwards its result to the promise.
     *
     * @param publisher the publisher to consume
     * @param <T> the item type
     * @return a promise for the collected items
     */
    public static <T> Promise<List<T>> toList(Publisher<T> publisher) {
        return Promise.async(f -> publisher.subscribe(new CollectingSubscriber<>(f::accept, s -> s.request(Long.MAX_VALUE))));
    }

    /**
     * Reduces the items of a publisher to a single value.
     *
     * <p>When the returned promise is activated, the publisher is subscribed to with unbounded
     * demand. Starting from {@code seed}, each item is combined with the current value via
     * {@code reducer}. On completion the promise succeeds with the final value. If the publisher
     * signals an error, or the reducer throws (in which case the subscription is cancelled), the
     * promise fails with that error. Only the first terminal outcome is reported; signals that
     * arrive afterwards are ignored.
     *
     * @param publisher the publisher to consume
     * @param seed the initial value
     * @param reducer combines the current value and the next item into the new value
     * @param <T> the item type
     * @param <R> the reduced value type
     * @return a promise for the reduced value
     */
    public static <T, R> Promise<R> reduce(Publisher<T> publisher, R seed, BiFunction<? super R, ? super T, ? extends R> reducer) {
        return Promise.async(d -> publisher.subscribe(new Subscriber<T>() {
            private Subscription subscription;
            private volatile R value = seed;
            // Set once the promise has been fulfilled; publishers may still deliver signals after cancel().
            private volatile boolean done;

            @Override
            public void onSubscribe(Subscription s) {
                subscription = s;
                s.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(T t) {
                if (done) {
                    return;
                }
                try {
                    value = reducer.apply(value, t);
                } catch (Throwable e) {
                    done = true;
                    subscription.cancel();
                    d.error(e);
                }
            }

            @Override
            public void onError(Throwable t) {
                if (done) {
                    return;
                }
                done = true;
                d.error(t);
            }

            @Override
            public void onComplete() {
                if (done) {
                    return;
                }
                done = true;
                d.success(value);
            }
        }));
    }

    /**
     * Same as {@link #bindExec(Publisher, Action)} with a no-op disposer.
     *
     * @param publisher the publisher to bind
     * @param <T> the item type
     * @return the bound publisher
     */
    public static <T> TransformablePublisher<T> bindExec(Publisher<T> publisher) {
        return bindExec(publisher, Action.noop());
    }

    /**
     * Delegates to {@link DefaultExecution#stream} with the given publisher and disposer.
     *
     * @param publisher the publisher to bind
     * @param disposer passed to the execution stream as its disposer
     * @param <T> the item type
     * @return the bound publisher
     */
    public static <T> TransformablePublisher<T> bindExec(Publisher<T> publisher, Action<? super T> disposer) {
        return DefaultExecution.stream(publisher, disposer);
    }

    /**
     * Creates a {@link BufferingPublisher} whose subscription is a {@link ForkingSubscription} built
     * from {@code execConfig}, {@code publisher} and the buffer's write stream, and returns the result
     * of its {@code bindExec(disposer)}.
     *
     * @param publisher the publisher to consume via the forking subscription
     * @param execConfig the execution configuration passed to the forking subscription
     * @param disposer used as the disposer of both the buffer and the exec binding
     * @param <T> the item type
     * @return the forked publisher
     */
    public static <T> TransformablePublisher<T> fork(Publisher<T> publisher, Action<? super ExecSpec> execConfig, Action<? super T> disposer) {
        return new BufferingPublisher<T>(disposer, write -> {
            // Block body with an explicit return keeps this lambda value-returning only, so it
            // cannot be mistaken for the Publisher-accepting BufferingPublisher constructor.
            return new ForkingSubscription(execConfig, publisher, write);
        }).bindExec(disposer);
    }

    /**
     * Wraps the publisher in a {@link TakePublisher} configured with {@code count}.
     *
     * @param count the count passed to the take publisher
     * @param upstreamPublisher the upstream publisher
     * @param <T> the item type
     * @return the take publisher
     */
    public static <T> TransformablePublisher<T> take(long count, Publisher<T> upstreamPublisher) {
        return new TakePublisher<T>(count, upstreamPublisher);
    }

    /**
     * Creates a {@link ConcatPublisher} over the given publishers.
     *
     * @param publishers the publishers to concatenate
     * @param disposer passed to the concat publisher as its disposer
     * @param <T> the item type
     * @return the concatenating publisher
     */
    public static <T> TransformablePublisher<T> concat(Iterable<? extends Publisher<? extends T>> publishers, Action<? super T> disposer) {
        return new ConcatPublisher<T>(disposer, publishers);
    }

    /**
     * Same as {@link #concat(Iterable, Action)} with a no-op disposer.
     *
     * @param publishers the publishers to concatenate
     * @param <T> the item type
     * @return the concatenating publisher
     */
    public static <T> TransformablePublisher<T> concat(Iterable<? extends Publisher<? extends T>> publishers) {
        return concat(publishers, Action.noop());
    }

    /**
     * Wraps the publisher in a {@link BatchingPublisher}.
     *
     * @param batchSize the batch size passed to the batching publisher
     * @param publisher the upstream publisher
     * @param disposer passed to the batching publisher as its disposer
     * @param <T> the item type
     * @return the batching publisher
     */
    public static <T> TransformablePublisher<T> batch(int batchSize, Publisher<T> publisher, Action<? super T> disposer) {
        return new BatchingPublisher<T>(publisher, batchSize, disposer);
    }

    /**
     * Same as {@link #flatten(Publisher, Action)} with a no-op disposer.
     *
     * @param publisher a publisher of publishers
     * @param <T> the item type of the inner publishers
     * @return the flattening publisher
     */
    public static <T> TransformablePublisher<T> flatten(Publisher<? extends Publisher<T>> publisher) {
        return flatten(publisher, Action.noop());
    }

    /**
     * Wraps a publisher of publishers in a {@link FlattenPublisher}.
     *
     * @param publisher a publisher of publishers
     * @param disposer passed to the flatten publisher as its disposer
     * @param <T> the item type of the inner publishers
     * @return the flattening publisher
     */
    public static <T> TransformablePublisher<T> flatten(Publisher<? extends Publisher<T>> publisher, Action<? super T> disposer) {
        return new FlattenPublisher<T>(publisher, disposer);
    }
}

