/*
 * Decompiled with CFR 0.152.
 */
package ratpack.stream.internal;

import java.time.Duration;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.reactivestreams.Subscription;
import ratpack.func.Action;
import ratpack.func.Function;
import ratpack.stream.internal.BufferedWriteStream;
import ratpack.stream.internal.BufferingPublisher;

public class PeriodicPublisher<T>
extends BufferingPublisher<T> {
    public PeriodicPublisher(ScheduledExecutorService executorService, final Function<? super Integer, ? extends T> producer, Duration duration) {
        super(Action.noop(), (? super BufferedWriteStream<T> write) -> new Subscription((BufferedWriteStream)write, executorService, duration){
            private volatile int counter;
            private volatile boolean started;
            private volatile boolean cancelled;
            final /* synthetic */ BufferedWriteStream val$write;
            final /* synthetic */ ScheduledExecutorService val$executorService;
            final /* synthetic */ Duration val$duration;
            {
                this.val$write = bufferedWriteStream;
                this.val$executorService = scheduledExecutorService;
                this.val$duration = duration;
            }

            public void request(long n) {
                if (!this.started) {
                    this.started = true;
                    new Task().run();
                }
            }

            public void cancel() {
                this.cancelled = true;
            }

            class Task
            implements Runnable {
                Task() {
                }

                @Override
                public void run() {
                    Object value;
                    try {
                        value = producer.apply(counter++);
                    }
                    catch (Exception e) {
                        cancelled = true;
                        val$write.error(e);
                        return;
                    }
                    if (value == null) {
                        cancelled = true;
                        val$write.complete();
                    } else if (!cancelled) {
                        val$write.item(value);
                        val$executorService.schedule(this, val$duration.toNanos(), TimeUnit.NANOSECONDS);
                    }
                }
            }
        });
    }
}

