/*
 * Decompiled with CFR 0.152.
 */
package org.cloudfoundry.util;

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import reactor.core.Disposable;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
import reactor.util.function.Tuple2;

/**
 * Utilities for re-ordering the elements of a {@link Flux}.
 */
public final class SortingUtils {

    private SortingUtils() {
    }

    /**
     * Returns a transformer that buffers the elements of a {@link Flux} and re-emits them ordered by
     * {@code comparator} once they have been buffered for longer than {@code timespan}.
     *
     * <p>Every source element is timestamped and placed into a priority queue ordered by {@code comparator}.
     * On each tick of an interval of length {@code timespan}, the queue is drained from its head for as long as
     * the head element's timestamp is older than {@code now - timespan}. When the source completes, whatever is
     * still queued is flushed in comparator order.
     *
     * <p>Note that the source is subscribed to as soon as the returned function is applied, not when the
     * resulting {@link Flux} is subscribed to. Cancelling the resulting {@link Flux} disposes of that
     * subscription.
     *
     * @param comparator the ordering applied to buffered elements
     * @param timespan   both the polling period and the minimum age an element must reach before it is emitted
     * @param <T>        the element type
     * @return a function suitable for use with {@code Flux.transform}
     */
    public static <T> Function<Flux<T>, Flux<T>> timespan(Comparator<T> comparator, Duration timespan) {
        return source -> {
            Queue<Tuple2<Long, T>> accumulator = new PriorityQueue<>((o1, o2) -> comparator.compare(o1.getT2(), o2.getT2()));
            // PriorityQueue is not thread-safe; the producer callback and the interval-driven drain
            // must lock on this same object.
            Object monitor = new Object();
            // Relays the source's terminal signal (error or completion) so the interval can be stopped.
            DirectProcessor<Void> d = DirectProcessor.create();

            Disposable disposable = source
                .timestamp()
                .subscribe(item -> {
                    synchronized (monitor) {
                        accumulator.add(item);
                    }
                }, d::onError, d::onComplete);

            return Flux
                .interval(timespan)
                .takeUntilOther(d)
                // Duration.ZERO on completion means "flush everything that is still buffered".
                .flatMap(n -> getItems(accumulator, monitor, timespan), null, () -> getItems(accumulator, monitor, Duration.ZERO))
                .doOnCancel(disposable::dispose);
        };
    }

    /**
     * Removes, in queue order, every leading element of {@code accumulator} that {@link #isBefore} accepts
     * and returns them as a {@link Flux}.
     *
     * @param accumulator the shared queue of timestamped elements
     * @param monitor     the lock guarding {@code accumulator}; must be the object the producer locks on
     * @param timespan    the minimum age of an element to be drained, or a zero duration to drain everything
     * @param <T>         the element type
     * @return the drained elements, possibly empty
     */
    private static <T> Flux<T> getItems(Queue<Tuple2<Long, T>> accumulator, Object monitor, Duration timespan) {
        ArrayList<T> items = new ArrayList<>();

        synchronized (monitor) {
            // Only the head is inspected: draining stops at the first element that is still too recent.
            while (isBefore(accumulator.peek(), timespan)) {
                items.add(accumulator.remove().getT2());
            }
        }

        return Flux.fromIterable(items);
    }

    /**
     * Indicates whether {@code candidate} should be drained.
     *
     * @param candidate the queue head, or {@code null} if the queue is empty
     * @param timespan  the minimum age; a zero duration accepts any non-null candidate
     * @param <T>       the element type
     * @return {@code true} if {@code candidate} is non-null and either {@code timespan} is zero or the
     *         candidate's timestamp (epoch milliseconds) is earlier than {@code now - timespan}
     */
    private static <T> boolean isBefore(Tuple2<Long, T> candidate, Duration timespan) {
        if (candidate == null) {
            return false;
        }
        if (timespan.isZero()) {
            return true;
        }
        return Instant.ofEpochMilli(candidate.getT1()).isBefore(Instant.now().minus(timespan));
    }
}

