/*
 * Decompiled with CFR 0.152.
 */
package org.spf4j.tsdb2;

import com.google.common.io.ByteStreams;
import com.google.common.primitives.Longs;
import com.sun.nio.file.SensitivityWatchEventModifier;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
import java.nio.channels.Channels;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.Arrays;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.apache.avro.Schema;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.spf4j.base.Either;
import org.spf4j.base.ExecutionContexts;
import org.spf4j.base.Handler;
import org.spf4j.base.TimeSource;
import org.spf4j.concurrent.DefaultExecutor;
import org.spf4j.io.CountingInputStream;
import org.spf4j.io.MemorizingBufferedInputStream;
import org.spf4j.tsdb2.TSDBWriter;
import org.spf4j.tsdb2.avro.DataBlock;
import org.spf4j.tsdb2.avro.Header;
import org.spf4j.tsdb2.avro.TableDef;

/**
 * Reader for a TSDB2 file: a type marker, an 8-byte size field, an Avro encoded {@link Header},
 * followed by a sequence of Avro records, each either a {@link TableDef} or a {@link DataBlock}.
 *
 * <p>Besides sequential reading ({@link #read()}, {@code readAll}), the reader can follow a file
 * that is still being written ({@code watch} / {@code bgWatch}): it re-reads the size field stored
 * in the file and delivers the records that became available to a handler.</p>
 *
 * <p>Methods that touch the stream position or the size are {@code synchronized} on this instance.</p>
 */
@SuppressFBWarnings(value={"IICU_INCORRECT_INTERNAL_CLASS_USE"})
public final class TSDBReader
implements Closeable {
    /** When the system property is {@code true}, a record decoding failure ends reading instead of throwing. */
    private static final boolean CORRUPTION_LENIENT = Boolean.getBoolean("spf4j.tsdb2.lenientRead");
    /** Reader schema for the records that follow the header. */
    private static final Schema R_SCHEMA = Schema.createUnion(Arrays.asList(TableDef.SCHEMA$, DataBlock.SCHEMA$));
    /** Upper bound for a single watch service poll while watching the file. */
    private static final long MAX_POLL_NANOS = TimeUnit.SECONDS.toNanos(1L);

    private CountingInputStream bis;
    private final Header header;
    private long size;
    private BinaryDecoder decoder;
    private final SpecificDatumReader<Object> recordReader;
    /** Lazily opened by {@link #reReadSize()}; used only to re-read the size field. */
    private RandomAccessFile raf;
    private final File file;
    private final Path filePath;
    private volatile boolean watch;
    private final int bufferSize;
    private final SeekableByteChannel byteChannel;

    /**
     * Opens the file and positions the reader right after the header.
     *
     * @param file the TSDB file to read.
     * @param bufferSize size of the read buffer.
     * @throws IOException if the file cannot be opened or its preamble cannot be read.
     */
    public TSDBReader(File file, int bufferSize) throws IOException {
        this(file, bufferSize, 0L);
    }

    /**
     * Opens the file, reads the type marker, size and header, then optionally repositions the
     * reader at {@code from}.
     *
     * @param file the TSDB file to read.
     * @param bufferSize size of the read buffer.
     * @param from file position to continue reading from; values {@code <= 0} mean
     *        "right after the header".
     * @throws IOException if the file cannot be opened or its preamble cannot be read.
     */
    public TSDBReader(File file, int bufferSize, long from) throws IOException {
        this.file = file;
        this.filePath = file.toPath();
        this.bufferSize = bufferSize;
        this.byteChannel = Files.newByteChannel(this.filePath);
        try {
            this.resetStream(0L);
            SpecificDatumReader<Header> headerReader = new SpecificDatumReader<>(Header.getClassSchema());
            TSDBWriter.validateType(this.bis);
            byte[] sizeBytes = new byte[8];
            ByteStreams.readFully(this.bis, sizeBytes);
            this.size = Longs.fromByteArray(sizeBytes);
            this.header = headerReader.read(null, this.decoder);
            this.recordReader = new SpecificDatumReader<>(
                    new Schema.Parser().parse(this.header.getContentSchema()), R_SCHEMA);
            if (from > 0L) {
                this.resetStream(from);
            }
        } catch (IOException | RuntimeException ex) {
            // The instance never escapes on failure, so nobody else could close the channel.
            try {
                this.byteChannel.close();
            } catch (IOException closeEx) {
                ex.addSuppressed(closeEx);
            }
            throw ex;
        }
    }

    /**
     * Repositions the channel and rebuilds the buffered stream and decoder on top of it.
     *
     * @param position absolute file position to continue reading from.
     */
    private void resetStream(long position) throws IOException {
        this.byteChannel.position(position);
        this.bis = new CountingInputStream(
                new MemorizingBufferedInputStream(Channels.newInputStream(this.byteChannel), this.bufferSize),
                position);
        this.decoder = DecoderFactory.get().directBinaryDecoder(this.bis, this.decoder);
    }

    /**
     * Re-reads the size field stored in the file. When it differs from the known size, the stream
     * is rebuilt at the current read position so buffered state is discarded.
     *
     * @return {@code true} if the size changed, {@code false} otherwise.
     * @throws IOException if the size cannot be read.
     */
    public synchronized boolean reReadSize() throws IOException {
        if (this.raf == null) {
            this.raf = new RandomAccessFile(this.file, "r");
        }
        this.raf.seek(TSDBWriter.MAGIC.length);
        long previousSize = this.size;
        this.size = this.raf.readLong();
        if (this.size == previousSize) {
            return false;
        }
        this.resetStream(this.bis.getCount());
        return true;
    }

    /**
     * Reads the next record.
     *
     * @return the next table definition or data block, or {@code null} when the current read
     *         position reached the known size (or, in lenient mode, when a record cannot be decoded).
     * @throws IOException if the record cannot be decoded (non lenient mode), or if a table
     *         definition id does not match the file position it was read from.
     */
    @Nullable
    public synchronized Either<TableDef, DataBlock> read() throws IOException {
        final long position = this.bis.getCount();
        if (position >= this.size) {
            return null;
        }
        final Object result;
        try {
            result = this.recordReader.read(null, this.decoder);
        } catch (IOException | RuntimeException ex) {
            if (CORRUPTION_LENIENT) {
                return null;
            }
            throw new IOException("Error reading tsdb file at " + position + ", this= " + this, ex);
        }
        if (result instanceof TableDef) {
            TableDef td = (TableDef) result;
            long tdId = td.getId();
            // A table definition id is expected to be the position it is stored at.
            if (position != tdId) {
                throw new IOException("Table Id should be equal with file position " + position + ", " + tdId);
            }
            return Either.left(td);
        }
        return Either.right((DataBlock) result);
    }

    /**
     * Closes the size-reading file handle (if it was opened) and the record stream.
     * The stream is closed even when closing the file handle fails.
     */
    @Override
    public synchronized void close() throws IOException {
        try (InputStream stream = this.bis) {
            if (this.raf != null) {
                this.raf.close();
            }
        }
    }

    /** @return the last known value of the size field. */
    public synchronized long getSize() {
        return this.size;
    }

    /** @return the header read when this reader was created. */
    public Header getHeader() {
        return this.header;
    }

    /** Asks a running {@code watch} loop to finish after its current iteration. */
    public void stopWatching() {
        this.watch = false;
    }

    /**
     * Watches the file in the background, using the deadline of the current execution context.
     *
     * @see #bgWatch(Handler, EventSensitivity, long)
     */
    public synchronized <E extends Exception> Future<Void> bgWatch(Handler<Either<TableDef, DataBlock>, E> handler, EventSensitivity es) {
        return this.bgWatch(handler, es, ExecutionContexts.getContextDeadlineNanos());
    }

    /**
     * Watches the file in the background for at most the given time.
     *
     * @see #bgWatch(Handler, EventSensitivity, long)
     */
    public synchronized <E extends Exception> Future<Void> bgWatch(Handler<Either<TableDef, DataBlock>, E> handler, EventSensitivity es, long timeout, TimeUnit unit) {
        return this.bgWatch(handler, es, TimeSource.nanoTime() + unit.toNanos(timeout));
    }

    /**
     * Submits {@link #watch(Handler, EventSensitivity, long)} to the default executor.
     *
     * @param handler receives every record read.
     * @param es watch service sensitivity.
     * @param deadlineNanos deadline, on the {@link TimeSource#nanoTime()} time line.
     * @return a future that completes when watching ends; failures surface through it.
     */
    public synchronized <E extends Exception> Future<Void> bgWatch(final Handler<Either<TableDef, DataBlock>, E> handler, final EventSensitivity es, final long deadlineNanos) {
        return DefaultExecutor.INSTANCE.submit(new Callable<Void>(){

            @Override
            public Void call() throws Exception {
                // Pass the deadline explicitly: the executor thread has its own execution context.
                TSDBReader.this.watch(handler, es, deadlineNanos);
                return null;
            }
        });
    }

    /**
     * Watches the file on the calling thread, using the deadline of the current execution context.
     *
     * @see #watch(Handler, EventSensitivity, long)
     */
    public <E extends Exception> void watch(Handler<Either<TableDef, DataBlock>, E> handler, EventSensitivity es) throws IOException, InterruptedException, E {
        this.watch(handler, es, ExecutionContexts.getContextDeadlineNanos());
    }

    /**
     * Watches the file on the calling thread for at most the given time.
     *
     * @see #watch(Handler, EventSensitivity, long)
     */
    public <E extends Exception> void watch(Handler<Either<TableDef, DataBlock>, E> handler, EventSensitivity es, long timeout, TimeUnit unit) throws IOException, InterruptedException, E {
        this.watch(handler, es, TimeSource.nanoTime() + unit.toNanos(timeout));
    }

    /**
     * Reads everything currently available, then keeps delivering newly written records to the
     * handler until the deadline passes, {@link #stopWatching()} is called, or the watch key of
     * the parent directory becomes invalid.
     *
     * @param handler receives every record read.
     * @param es watch service sensitivity.
     * @param deadlineNanos deadline, on the {@link TimeSource#nanoTime()} time line.
     * @throws IllegalStateException if this reader is already being watched.
     * @throws UnsupportedOperationException if the sensitivity is not supported.
     * @throws IOException if the directory cannot be watched or the file cannot be read.
     * @throws InterruptedException if interrupted while waiting for file changes.
     * @throws E if the handler fails.
     */
    @SuppressFBWarnings(value={"NOS_NON_OWNED_SYNCHRONIZATION"})
    public <E extends Exception> void watch(Handler<Either<TableDef, DataBlock>, E> handler, EventSensitivity es, long deadlineNanos) throws IOException, InterruptedException, E {
        synchronized (this) {
            if (this.watch) {
                throw new IllegalStateException("File is already watched " + this.file);
            }
            this.watch = true;
        }
        // Everything after claiming the flag runs inside try/finally, so a failure during setup
        // cannot leave the reader permanently marked as watched.
        try {
            SensitivityWatchEventModifier sensitivity = toModifier(es);
            // A file given as a bare relative name has no parent; resolve it to an absolute file first.
            Path directory = this.file.getAbsoluteFile().getParentFile().toPath();
            try (WatchService watchService = directory.getFileSystem().newWatchService()) {
                directory.register(watchService,
                        new WatchEvent.Kind[]{StandardWatchEventKinds.ENTRY_MODIFY, StandardWatchEventKinds.OVERFLOW},
                        sensitivity);
                this.readAll(handler, deadlineNanos);
                do {
                    long remainingNanos = deadlineNanos - TimeSource.nanoTime();
                    if (remainingNanos <= 0L) {
                        break;
                    }
                    // Never wait past the deadline; the cap also bounds how long a missed event goes unnoticed.
                    WatchKey key = watchService.poll(Math.min(remainingNanos, MAX_POLL_NANOS), TimeUnit.NANOSECONDS);
                    if (key == null) {
                        // No event within the poll interval: check the size field anyway.
                        if (this.reReadSize()) {
                            this.readAll(handler, deadlineNanos);
                        }
                        continue;
                    }
                    if (!key.isValid()) {
                        key.cancel();
                        break;
                    }
                    if (!key.pollEvents().isEmpty() && this.reReadSize()) {
                        this.readAll(handler, deadlineNanos);
                    }
                    if (!key.reset()) {
                        key.cancel();
                        break;
                    }
                } while (this.watch);
            }
        } finally {
            this.watch = false;
        }
    }

    /** Maps the public sensitivity enum to the JDK watch event modifier. */
    private static SensitivityWatchEventModifier toModifier(EventSensitivity es) {
        switch (es) {
            case LOW:
                return SensitivityWatchEventModifier.LOW;
            case MEDIUM:
                return SensitivityWatchEventModifier.MEDIUM;
            case HIGH:
                return SensitivityWatchEventModifier.HIGH;
            default:
                throw new UnsupportedOperationException("Unsupported sensitivity " + es);
        }
    }

    /**
     * Passes every record available up to the known size to the handler.
     *
     * @param handler receives every record read.
     * @param deadlineNanos deadline handed to the handler with each record.
     * @throws IOException if a record cannot be read.
     * @throws E if the handler fails.
     */
    public synchronized <E extends Exception> void readAll(Handler<Either<TableDef, DataBlock>, E> handler, long deadlineNanos) throws IOException, E {
        for (Either<TableDef, DataBlock> data = this.read(); data != null; data = this.read()) {
            handler.handle(data, deadlineNanos);
        }
    }

    /**
     * Passes every record available up to the known size to the consumer.
     *
     * @param consumer receives every record read.
     * @throws IOException if a record cannot be read.
     */
    public synchronized void readAll(Consumer<Either<TableDef, DataBlock>> consumer) throws IOException {
        for (Either<TableDef, DataBlock> data = this.read(); data != null; data = this.read()) {
            consumer.accept(data);
        }
    }

    @Override
    public String toString() {
        return "TSDBReader{size=" + this.size + ", raf=" + this.raf + ", file=" + this.file + '}';
    }

    /** How quickly the watch service should detect changes to the watched directory. */
    public enum EventSensitivity {
        HIGH,
        MEDIUM,
        LOW;

    }
}

