/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.table.sink;

import java.util.List;
import java.util.concurrent.ExecutorService;
import javax.annotation.Nullable;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.memory.MemoryPoolFactory;
import org.apache.paimon.memory.MemorySegmentPool;
import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.operation.FileStoreWrite;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.InnerTableWrite;
import org.apache.paimon.table.sink.KeyAndBucketExtractor;
import org.apache.paimon.table.sink.RowKindGenerator;
import org.apache.paimon.table.sink.SinkRecord;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.Restorable;

/**
 * {@link InnerTableWrite} implementation that converts incoming {@link InternalRow}s into
 * {@link SinkRecord}s and forwards them to an underlying {@link FileStoreWrite}.
 *
 * <p>Partition, bucket and key information is obtained from a {@link KeyAndBucketExtractor} that
 * is re-pointed at each row via {@code setRecord} before being queried; the instance therefore
 * carries per-row mutable state.
 *
 * @param <T> type of the record handed to the underlying {@link FileStoreWrite}
 */
public class TableWriteImpl<T>
        implements InnerTableWrite, Restorable<List<FileStoreWrite.State<T>>> {

    private final FileStoreWrite<T> write;
    private final KeyAndBucketExtractor<InternalRow> keyAndBucketExtractor;
    private final RecordExtractor<T> recordExtractor;
    @Nullable
    private final RowKindGenerator rowKindGenerator;
    private final boolean ignoreDelete;

    /** Set by the first call to {@link #prepareCommit()}; guards against a second batch commit. */
    private boolean batchCommitted = false;

    /** Only consulted by {@link #toLogRecord(SinkRecord)}; stays {@code null} until configured. */
    private BucketMode bucketMode;

    /**
     * Creates a table write on top of the given file store write.
     *
     * @param write underlying writer that every operation is delegated to
     * @param keyAndBucketExtractor extractor for partition, bucket and key of a row
     * @param recordExtractor converts a {@link SinkRecord} and its {@link RowKind} to {@code T}
     * @param rowKindGenerator optional generator passed to
     *     {@link RowKindGenerator#getRowKind(RowKindGenerator, InternalRow)}; may be {@code null}
     * @param ignoreDelete when {@code true}, rows whose kind is a retraction are dropped
     */
    public TableWriteImpl(
            FileStoreWrite<T> write,
            KeyAndBucketExtractor<InternalRow> keyAndBucketExtractor,
            RecordExtractor<T> recordExtractor,
            @Nullable RowKindGenerator rowKindGenerator,
            boolean ignoreDelete) {
        this.write = write;
        this.keyAndBucketExtractor = keyAndBucketExtractor;
        this.recordExtractor = recordExtractor;
        this.rowKindGenerator = rowKindGenerator;
        this.ignoreDelete = ignoreDelete;
    }

    /** Forwards the flag to the underlying write and returns this instance for chaining. */
    @Override
    public TableWriteImpl<T> withIgnorePreviousFiles(boolean ignorePreviousFiles) {
        write.withIgnorePreviousFiles(ignorePreviousFiles);
        return this;
    }

    /** Forwards the execution mode to the underlying write and returns this instance. */
    @Override
    public TableWriteImpl<T> withExecutionMode(boolean isStreamingMode) {
        write.withExecutionMode(isStreamingMode);
        return this;
    }

    /** Forwards the {@link IOManager} to the underlying write and returns this instance. */
    @Override
    public TableWriteImpl<T> withIOManager(IOManager ioManager) {
        write.withIOManager(ioManager);
        return this;
    }

    /** Forwards the memory pool to the underlying write and returns this instance. */
    @Override
    public TableWriteImpl<T> withMemoryPool(MemorySegmentPool memoryPool) {
        write.withMemoryPool(memoryPool);
        return this;
    }

    /** Forwards the memory pool factory to the underlying write and returns this instance. */
    public TableWriteImpl<T> withMemoryPoolFactory(MemoryPoolFactory memoryPoolFactory) {
        write.withMemoryPoolFactory(memoryPoolFactory);
        return this;
    }

    /** Forwards the compaction executor to the underlying write and returns this instance. */
    public TableWriteImpl<T> withCompactExecutor(ExecutorService compactExecutor) {
        write.withCompactExecutor(compactExecutor);
        return this;
    }

    /**
     * Records the bucket mode used by {@link #toLogRecord(SinkRecord)}. Unlike the other
     * {@code with*} methods this value is kept locally and not forwarded to the underlying write.
     */
    public TableWriteImpl<T> withBucketMode(BucketMode bucketMode) {
        this.bucketMode = bucketMode;
        return this;
    }

    /** Returns the partition the extractor computes for {@code row}. */
    @Override
    public BinaryRow getPartition(InternalRow row) {
        keyAndBucketExtractor.setRecord(row);
        return keyAndBucketExtractor.partition();
    }

    /** Returns the bucket the extractor computes for {@code row}. */
    @Override
    public int getBucket(InternalRow row) {
        keyAndBucketExtractor.setRecord(row);
        return keyAndBucketExtractor.bucket();
    }

    /** Writes {@code row}, discarding the resulting {@link SinkRecord}. */
    @Override
    public void write(InternalRow row) throws Exception {
        writeAndReturn(row);
    }

    /** Writes {@code row} into the given bucket, discarding the resulting {@link SinkRecord}. */
    @Override
    public void write(InternalRow row, int bucket) throws Exception {
        writeAndReturn(row, bucket);
    }

    /**
     * Writes {@code row} into the bucket computed by the extractor.
     *
     * @return the record that was written, or {@code null} if the row was dropped because
     *     {@code ignoreDelete} is set and its row kind is a retraction
     * @throws Exception if the underlying write fails
     */
    @Nullable
    public SinkRecord writeAndReturn(InternalRow row) throws Exception {
        RowKind rowKind = RowKindGenerator.getRowKind(rowKindGenerator, row);
        if (isIgnored(rowKind)) {
            return null;
        }
        SinkRecord record = toSinkRecord(row);
        return writeRecord(record, record.bucket(), rowKind);
    }

    /**
     * Writes {@code row} into the caller-supplied {@code bucket}.
     *
     * @return the record that was written, or {@code null} if the row was dropped because
     *     {@code ignoreDelete} is set and its row kind is a retraction
     * @throws Exception if the underlying write fails
     */
    @Nullable
    public SinkRecord writeAndReturn(InternalRow row, int bucket) throws Exception {
        RowKind rowKind = RowKindGenerator.getRowKind(rowKindGenerator, row);
        if (isIgnored(rowKind)) {
            return null;
        }
        return writeRecord(toSinkRecord(row, bucket), bucket, rowKind);
    }

    /** Tells whether a row of the given kind must be dropped instead of written. */
    private boolean isIgnored(RowKind rowKind) {
        return ignoreDelete && rowKind.isRetract();
    }

    /** Hands {@code record} to the underlying write for {@code bucket} and returns it. */
    private SinkRecord writeRecord(SinkRecord record, int bucket, RowKind rowKind)
            throws Exception {
        write.write(record.partition(), bucket, recordExtractor.extract(record, rowKind));
        return record;
    }

    /** Builds a {@link SinkRecord} whose partition, bucket and key all come from the extractor. */
    private SinkRecord toSinkRecord(InternalRow row) {
        keyAndBucketExtractor.setRecord(row);
        return new SinkRecord(
                keyAndBucketExtractor.partition(),
                keyAndBucketExtractor.bucket(),
                keyAndBucketExtractor.trimmedPrimaryKey(),
                row);
    }

    /** Builds a {@link SinkRecord} using the extractor for partition and key only. */
    private SinkRecord toSinkRecord(InternalRow row, int bucket) {
        keyAndBucketExtractor.setRecord(row);
        return new SinkRecord(
                keyAndBucketExtractor.partition(),
                bucket,
                keyAndBucketExtractor.trimmedPrimaryKey(),
                row);
    }

    /**
     * Derives a record keyed by the extractor's {@code logPrimaryKey()} from an already written
     * record. The bucket is replaced by {@code -1} when the configured bucket mode is
     * {@link BucketMode#UNAWARE}; an unset ({@code null}) bucket mode keeps the record's bucket.
     */
    public SinkRecord toLogRecord(SinkRecord record) {
        keyAndBucketExtractor.setRecord(record.row());
        int logBucket = bucketMode == BucketMode.UNAWARE ? -1 : record.bucket();
        return new SinkRecord(
                record.partition(),
                logBucket,
                keyAndBucketExtractor.logPrimaryKey(),
                record.row());
    }

    /** Delegates the compaction request to the underlying write. */
    @Override
    public void compact(BinaryRow partition, int bucket, boolean fullCompaction) throws Exception {
        write.compact(partition, bucket, fullCompaction);
    }

    /** Forwards the metric registry to the underlying write and returns this instance. */
    @Override
    public TableWriteImpl<T> withMetricRegistry(MetricRegistry metricRegistry) {
        write.withMetricRegistry(metricRegistry);
        return this;
    }

    /** Delegates the new-files notification to the underlying write. */
    public void notifyNewFiles(
            long snapshotId, BinaryRow partition, int bucket, List<DataFileMeta> files) {
        write.notifyNewFiles(snapshotId, partition, bucket, files);
    }

    /** Delegates to the underlying write's {@code prepareCommit}. */
    @Override
    public List<CommitMessage> prepareCommit(boolean waitCompaction, long commitIdentifier)
            throws Exception {
        return write.prepareCommit(waitCompaction, commitIdentifier);
    }

    /**
     * Batch variant of {@link #prepareCommit(boolean, long)} that waits for compaction and uses
     * {@link Long#MAX_VALUE} as the commit identifier. It may be called only once per instance;
     * a further call fails the {@code checkState} precondition.
     */
    @Override
    public List<CommitMessage> prepareCommit() throws Exception {
        Preconditions.checkState(
                !batchCommitted, "BatchTableWrite only support one-time committing.");
        // The flag is raised before delegating, so an attempt that throws still counts as used.
        batchCommitted = true;
        return prepareCommit(true, Long.MAX_VALUE);
    }

    /** Closes the underlying write. */
    @Override
    public void close() throws Exception {
        write.close();
    }

    /** Returns the state produced by the underlying write's {@code checkpoint()}. */
    @Override
    public List<FileStoreWrite.State<T>> checkpoint() {
        return write.checkpoint();
    }

    /** Passes {@code state} to the underlying write's {@code restore}. */
    @Override
    public void restore(List<FileStoreWrite.State<T>> state) {
        write.restore(state);
    }

    /** Exposes the underlying write; intended for tests. */
    @VisibleForTesting
    public FileStoreWrite<T> getWrite() {
        return write;
    }

    /**
     * Converts a {@link SinkRecord} together with its {@link RowKind} into the record type
     * accepted by the underlying {@link FileStoreWrite}.
     *
     * @param <T> type of the produced record
     */
    @FunctionalInterface
    public interface RecordExtractor<T> {

        /**
         * @param record the sink record to convert
         * @param rowKind the row kind resolved for the record's row
         * @return the value to hand to the underlying write
         */
        T extract(SinkRecord record, RowKind rowKind);
    }
}

