/*
 * Decompiled with CFR 0.152.
 */
package com.ververica.cdc.connectors.mongodb.source.reader.fetch;

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.model.changestream.OperationType;
import com.ververica.cdc.connectors.base.source.meta.offset.Offset;
import com.ververica.cdc.connectors.base.source.meta.split.SnapshotSplit;
import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase;
import com.ververica.cdc.connectors.base.source.meta.split.StreamSplit;
import com.ververica.cdc.connectors.base.source.meta.wartermark.WatermarkEvent;
import com.ververica.cdc.connectors.base.source.meta.wartermark.WatermarkKind;
import com.ververica.cdc.connectors.base.source.reader.external.FetchTask;
import com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceConfig;
import com.ververica.cdc.connectors.mongodb.source.dialect.MongoDBDialect;
import com.ververica.cdc.connectors.mongodb.source.offset.ChangeStreamOffset;
import com.ververica.cdc.connectors.mongodb.source.reader.fetch.MongoDBFetchTaskContext;
import com.ververica.cdc.connectors.mongodb.source.reader.fetch.MongoDBStreamFetchTask;
import com.ververica.cdc.connectors.mongodb.source.utils.MongoRecordUtils;
import com.ververica.cdc.connectors.mongodb.source.utils.MongoUtils;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.relational.TableId;
import java.util.ArrayList;
import org.apache.kafka.connect.source.SourceRecord;
import org.bson.BsonDocument;
import org.bson.BsonInt64;
import org.bson.BsonString;
import org.bson.BsonValue;
import org.bson.RawBsonDocument;
import org.bson.conversions.Bson;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Fetch task that reads a single {@link SnapshotSplit} of a MongoDB collection.
 *
 * <p>{@link #execute(FetchTask.Context)} performs four steps, all of which push events onto the
 * {@link ChangeEventQueue} obtained from the task context:
 *
 * <ol>
 *   <li>record the current change stream offset and emit it as the LOW watermark;
 *   <li>scan the documents bounded by the split and emit one snapshot record per document;
 *   <li>record the current change stream offset again and emit it as the HIGH watermark;
 *   <li>if the HIGH watermark is after the LOW watermark, run a {@link MongoDBStreamFetchTask}
 *       over a stream split spanning the two watermarks; otherwise emit an END watermark
 *       directly.
 * </ol>
 */
public class MongoDBScanFetchTask
implements FetchTask<SourceSplitBase> {
    private static final Logger LOG = LoggerFactory.getLogger(MongoDBScanFetchTask.class);

    /**
     * Name passed to {@code WatermarkEvent.create} for every watermark event emitted by this task
     * (NOTE(review): assumed to be the watermark topic name - confirm against WatermarkEvent).
     */
    private static final String WATERMARK_TOPIC_NAME = "__mongodb_watermarks";

    private final SnapshotSplit snapshotSplit;

    /** True only while {@link #execute(FetchTask.Context)} is in progress. */
    private volatile boolean taskRunning = false;

    /**
     * Creates a task for the given snapshot split.
     *
     * @param snapshotSplit the split whose bounds, table id and split id drive the scan
     */
    public MongoDBScanFetchTask(SnapshotSplit snapshotSplit) {
        this.snapshotSplit = snapshotSplit;
    }

    /**
     * Runs the snapshot scan for the split and, when required, the stream backfill.
     *
     * <p>The running flag is always cleared when this method exits, whether it completes normally
     * or by throwing, so {@link #isRunning()} never reports a failed task as still running.
     *
     * @param context the fetch context; must be a {@link MongoDBFetchTaskContext}
     * @throws InterruptedException if the running flag is found cleared while documents are still
     *     being scanned
     * @throws Exception any failure raised by the scan, the queue or the backfill task
     */
    public void execute(FetchTask.Context context) throws Exception {
        MongoDBFetchTaskContext taskContext = (MongoDBFetchTaskContext) context;
        MongoDBSourceConfig sourceConfig = taskContext.getSourceConfig();
        MongoDBDialect dialect = taskContext.getDialect();
        ChangeEventQueue<DataChangeEvent> changeEventQueue = taskContext.getQueue();
        this.taskRunning = true;
        try {
            TableId collectionId = this.snapshotSplit.getTableId();

            ChangeStreamOffset lowWatermark = dialect.displayCurrentOffset(sourceConfig);
            LOG.info("Snapshot step 1 - Determining low watermark {} for split {}", lowWatermark, this.snapshotSplit);
            this.enqueueWatermark(changeEventQueue, collectionId, this.snapshotSplit.splitId(), WatermarkKind.LOW, lowWatermark);

            LOG.info("Snapshot step 2 - Snapshotting data");
            this.scanSplit(sourceConfig, collectionId, changeEventQueue);

            ChangeStreamOffset highWatermark = dialect.displayCurrentOffset(sourceConfig);
            LOG.info("Snapshot step 3 - Determining high watermark {} for split {}", highWatermark, this.snapshotSplit);
            this.enqueueWatermark(changeEventQueue, collectionId, this.snapshotSplit.splitId(), WatermarkKind.HIGH, highWatermark);

            LOG.info("Snapshot step 4 - Back fill stream split for snapshot split {}", this.snapshotSplit);
            StreamSplit backfillStreamSplit = this.createBackfillStreamSplit(lowWatermark, highWatermark);
            boolean streamBackfillRequired = backfillStreamSplit.getEndingOffset().isAfter(backfillStreamSplit.getStartingOffset());
            if (!streamBackfillRequired) {
                // The offset did not advance during the scan, so there is nothing to replay:
                // emit the END watermark ourselves instead of starting a stream task.
                this.enqueueWatermark(changeEventQueue, collectionId, backfillStreamSplit.splitId(), WatermarkKind.END, backfillStreamSplit.getEndingOffset());
            } else {
                MongoDBStreamFetchTask backfillStreamTask = new MongoDBStreamFetchTask(backfillStreamSplit);
                backfillStreamTask.execute(taskContext);
            }
        } finally {
            // Reset on every exit path, including failures and interruption.
            this.taskRunning = false;
        }
    }

    /**
     * Reports whether {@link #execute(FetchTask.Context)} is currently in progress.
     *
     * @return the current value of the running flag
     */
    public boolean isRunning() {
        return this.taskRunning;
    }

    /**
     * Returns the snapshot split this task was created for.
     *
     * @return the split passed to the constructor
     */
    public SnapshotSplit getSplit() {
        return this.snapshotSplit;
    }

    /**
     * Scans the documents of the split and enqueues one snapshot record per document.
     *
     * <p>The query uses element 1 of the split start as {@code min}, element 1 of the split end as
     * {@code max} and element 0 of the split start as the index hint. The cursor is closed when
     * the scan ends, whether normally or exceptionally.
     *
     * @param sourceConfig source configuration providing the client settings, batch size and hosts
     * @param collectionId identifier of the collection to scan
     * @param changeEventQueue queue receiving the snapshot records
     * @throws InterruptedException if the running flag is cleared while documents remain, or if
     *     enqueueing is interrupted
     */
    private void scanSplit(MongoDBSourceConfig sourceConfig, TableId collectionId, ChangeEventQueue<DataChangeEvent> changeEventQueue) throws InterruptedException {
        MongoClient mongoClient = MongoUtils.clientFor(sourceConfig);
        MongoCollection<RawBsonDocument> collection = MongoUtils.collectionFor(mongoClient, collectionId, RawBsonDocument.class);
        try (MongoCursor<RawBsonDocument> cursor = collection.find()
                .min((BsonDocument) this.snapshotSplit.getSplitStart()[1])
                .max((BsonDocument) this.snapshotSplit.getSplitEnd()[1])
                .hint((BsonDocument) this.snapshotSplit.getSplitStart()[0])
                .batchSize(sourceConfig.getBatchSize())
                .noCursorTimeout(true)
                .cursor()) {
            while (cursor.hasNext()) {
                if (!this.taskRunning) {
                    throw new InterruptedException("Interrupted while snapshotting collection " + collectionId.identifier());
                }
                BsonDocument valueDocument = this.normalizeSnapshotDocument(collectionId, cursor.next());
                BsonDocument keyDocument = new BsonDocument("_id", valueDocument.get("_id"));
                SourceRecord snapshotRecord = MongoRecordUtils.createSourceRecord(
                        MongoRecordUtils.createPartitionMap(sourceConfig.getHosts(), collectionId.catalog(), collectionId.table()),
                        MongoRecordUtils.createSourceOffsetMap(keyDocument.getDocument("_id"), true),
                        collectionId.identifier(),
                        keyDocument,
                        valueDocument);
                changeEventQueue.enqueue(new DataChangeEvent(snapshotRecord));
            }
        }
    }

    /**
     * Builds a watermark event for the given collection and split and puts it on the queue.
     *
     * @param changeEventQueue queue receiving the watermark event
     * @param collectionId collection whose identifier keys the watermark partition map
     * @param splitId id of the split the watermark belongs to
     * @param watermarkKind kind of watermark to emit
     * @param watermark offset carried by the watermark
     * @throws InterruptedException if enqueueing is interrupted
     */
    private void enqueueWatermark(ChangeEventQueue<DataChangeEvent> changeEventQueue, TableId collectionId, String splitId, WatermarkKind watermarkKind, Offset watermark) throws InterruptedException {
        changeEventQueue.enqueue(new DataChangeEvent(WatermarkEvent.create(
                MongoRecordUtils.createWatermarkPartitionMap(collectionId.identifier()),
                WATERMARK_TOPIC_NAME,
                splitId,
                watermarkKind,
                watermark)));
    }

    /**
     * Creates the stream split used to replay changes between the two watermarks.
     *
     * <p>The split reuses the snapshot split's id and table schemas, starts at the low watermark,
     * ends at the high watermark, and carries an empty list and a zero as its remaining
     * constructor arguments.
     *
     * @param lowWatermark offset recorded before the scan
     * @param highWatermark offset recorded after the scan
     * @return the backfill stream split
     */
    private StreamSplit createBackfillStreamSplit(ChangeStreamOffset lowWatermark, ChangeStreamOffset highWatermark) {
        return new StreamSplit(this.snapshotSplit.splitId(), lowWatermark, highWatermark, new ArrayList<>(), this.snapshotSplit.getTableSchemas(), 0);
    }

    /**
     * Wraps a scanned document in an insert-shaped envelope.
     *
     * <p>The result contains {@code _id} (a document holding the original {@code _id}),
     * {@code operationType} (insert), {@code ns} (db and coll), {@code documentKey},
     * {@code fullDocument} (the original document), {@code ts_ms} (current wall-clock millis) and
     * {@code source} ({@code snapshot="true"}, {@code ts_ms=0}).
     *
     * @param collectionId collection the document was read from
     * @param originalDocument the document returned by the scan
     * @return the envelope document
     */
    private BsonDocument normalizeSnapshotDocument(TableId collectionId, BsonDocument originalDocument) {
        BsonDocument valueDocument = new BsonDocument();

        BsonDocument id = new BsonDocument();
        id.put("_id", originalDocument.get("_id"));
        valueDocument.put("_id", id);

        valueDocument.put("operationType", new BsonString(OperationType.INSERT.getValue()));

        BsonDocument ns = new BsonDocument();
        ns.put("db", new BsonString(collectionId.catalog()));
        ns.put("coll", new BsonString(collectionId.table()));
        valueDocument.put("ns", ns);

        valueDocument.put("documentKey", new BsonDocument("_id", originalDocument.get("_id")));
        valueDocument.put("fullDocument", originalDocument);
        valueDocument.put("ts_ms", new BsonInt64(System.currentTimeMillis()));

        BsonDocument source = new BsonDocument();
        source.put("snapshot", new BsonString("true"));
        source.put("ts_ms", new BsonInt64(0L));
        valueDocument.put("source", source);

        return valueDocument;
    }
}

