/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.postgresql.connection.wal2json;

import io.debezium.connector.postgresql.TypeRegistry;
import io.debezium.connector.postgresql.connection.AbstractMessageDecoder;
import io.debezium.connector.postgresql.connection.ReplicationMessage;
import io.debezium.connector.postgresql.connection.ReplicationStream;
import io.debezium.connector.postgresql.connection.TransactionMessage;
import io.debezium.connector.postgresql.connection.wal2json.DateTimeFormat;
import io.debezium.connector.postgresql.connection.wal2json.Wal2JsonReplicationMessage;
import io.debezium.document.Array;
import io.debezium.document.Document;
import io.debezium.document.DocumentReader;
import io.debezium.document.Value;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.sql.SQLException;
import java.time.Instant;
import java.util.Arrays;
import java.util.Iterator;
import org.apache.kafka.connect.errors.ConnectException;
import org.postgresql.replication.fluent.logical.ChainedLogicalStreamBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NonStreamingWal2JsonMessageDecoder
extends AbstractMessageDecoder {
    private static final Logger LOGGER = LoggerFactory.getLogger(NonStreamingWal2JsonMessageDecoder.class);
    private final DateTimeFormat dateTime = DateTimeFormat.get();
    private boolean containsMetadata = false;

    @Override
    public void processNotEmptyMessage(ByteBuffer buffer, ReplicationStream.ReplicationMessageProcessor processor, TypeRegistry typeRegistry) throws SQLException, InterruptedException {
        try {
            if (!buffer.hasArray()) {
                throw new IllegalStateException("Invalid buffer received from PG server during streaming replication");
            }
            byte[] source = buffer.array();
            byte[] content = Arrays.copyOfRange(source, buffer.arrayOffset(), source.length);
            LOGGER.trace("Message arrived for decoding {}", (Object)new String(content));
            Document message = DocumentReader.floatNumbersAsTextReader().read(content);
            long txId = message.getLong((CharSequence)"xid");
            String timestamp = message.getString((CharSequence)"timestamp");
            Instant commitTime = this.dateTime.systemTimestampToInstant(timestamp);
            Array changes = message.getArray((CharSequence)"change");
            if (changes.isEmpty()) {
                processor.process(null);
            } else {
                Iterator it = changes.iterator();
                processor.process(new TransactionMessage(ReplicationMessage.Operation.BEGIN, txId, commitTime));
                while (it.hasNext()) {
                    Value value = ((Array.Entry)it.next()).getValue();
                    processor.process(new Wal2JsonReplicationMessage(txId, commitTime, value.asDocument(), this.containsMetadata, !it.hasNext(), typeRegistry));
                }
                processor.process(new TransactionMessage(ReplicationMessage.Operation.COMMIT, txId, commitTime));
            }
        }
        catch (IOException e) {
            throw new ConnectException((Throwable)e);
        }
    }

    @Override
    public ChainedLogicalStreamBuilder optionsWithMetadata(ChainedLogicalStreamBuilder builder) {
        return this.optionsWithoutMetadata(builder).withSlotOption("include-not-null", "true");
    }

    @Override
    public ChainedLogicalStreamBuilder optionsWithoutMetadata(ChainedLogicalStreamBuilder builder) {
        return builder.withSlotOption("pretty-print", 1).withSlotOption("write-in-chunks", 0).withSlotOption("include-xids", 1).withSlotOption("include-timestamp", 1);
    }

    @Override
    public void setContainsMetadata(boolean containsMetadata) {
        this.containsMetadata = containsMetadata;
    }
}

