/*
 * Decompiled with CFR 0.152.
 */
package org.spf4j.concurrent;

import com.google.common.annotations.Beta;
import java.io.Closeable;
import java.io.Flushable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.spf4j.concurrent.DefaultScheduler;

@Beta
public final class ThreadLocalBufferedConsumer<T>
implements Flushable,
Closeable {
    private final ThreadLocal<List<T>> localBuffer;
    private final Map<Thread, List<T>> buffers;
    private final Consumer<List<T>> consumer;
    private final int localSize;
    private final ScheduledFuture<?> schedule;

    public ThreadLocalBufferedConsumer(final int localSize, Consumer<List<T>> consumer, int delayMillis) {
        this.localSize = localSize;
        this.buffers = new ConcurrentHashMap<Thread, List<T>>();
        this.consumer = consumer;
        this.localBuffer = new ThreadLocal<List<T>>(){

            @Override
            protected List<T> initialValue() {
                ArrayList result = new ArrayList(localSize);
                ThreadLocalBufferedConsumer.this.buffers.put(Thread.currentThread(), result);
                return result;
            }
        };
        this.schedule = DefaultScheduler.INSTANCE.scheduleWithFixedDelay(this::flush, delayMillis, delayMillis, TimeUnit.MILLISECONDS);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void write(T value) {
        List<T> lb;
        List<T> list = lb = this.localBuffer.get();
        synchronized (list) {
            if (lb.size() >= this.localSize) {
                this.consumer.accept(lb);
                lb.clear();
            }
            lb.add(value);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void flush() {
        Iterator<Map.Entry<Thread, List<T>>> iterator = this.buffers.entrySet().iterator();
        ArrayList<T> toWrite = new ArrayList<T>(this.localSize);
        while (iterator.hasNext()) {
            List<T> lb;
            Map.Entry<Thread, List<T>> entry = iterator.next();
            Thread thread = entry.getKey();
            if (!thread.isAlive()) {
                iterator.remove();
            }
            List<T> list = lb = entry.getValue();
            synchronized (list) {
                if (!lb.isEmpty()) {
                    toWrite.addAll(lb);
                }
                lb.clear();
            }
        }
        this.consumer.accept(toWrite);
    }

    @Override
    public synchronized void close() {
        if (!this.schedule.isCancelled()) {
            this.schedule.cancel(false);
            this.flush();
        }
    }

    public String toString() {
        return "ThreadLocalBufferedConsumer{ consumer=" + this.consumer + ", localSize=" + this.localSize + ", schedule=" + this.schedule + '}';
    }
}

