/*
 * Decompiled with CFR 0.152.
 */
package com.ververica.cdc.connectors.mongodb.source.reader;

import com.ververica.cdc.connectors.base.source.meta.offset.Offset;
import com.ververica.cdc.connectors.base.source.meta.offset.OffsetFactory;
import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitState;
import com.ververica.cdc.connectors.base.source.meta.split.StreamSplitState;
import com.ververica.cdc.connectors.base.source.meta.wartermark.WatermarkEvent;
import com.ververica.cdc.connectors.base.source.metrics.SourceReaderMetrics;
import com.ververica.cdc.connectors.base.source.reader.IncrementalSourceRecordEmitter;
import com.ververica.cdc.connectors.mongodb.source.offset.ChangeStreamOffset;
import com.ververica.cdc.connectors.mongodb.source.utils.MongoRecordUtils;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.kafka.connect.source.SourceRecord;
import org.bson.BsonDocument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class MongoDBRecordEmitter<T>
extends IncrementalSourceRecordEmitter<T> {
    private static final Logger LOG = LoggerFactory.getLogger(MongoDBRecordEmitter.class);

    public MongoDBRecordEmitter(DebeziumDeserializationSchema<T> deserializationSchema, SourceReaderMetrics sourceReaderMetrics, OffsetFactory offsetFactory) {
        super(deserializationSchema, sourceReaderMetrics, false, offsetFactory);
    }

    protected void processElement(SourceRecord element, SourceOutput<T> output, SourceSplitState splitState) throws Exception {
        if (WatermarkEvent.isWatermarkEvent((SourceRecord)element)) {
            Offset watermark = this.getOffsetPosition(element);
            if (WatermarkEvent.isHighWatermarkEvent((SourceRecord)element) && splitState.isSnapshotSplitState()) {
                splitState.asSnapshotSplitState().setHighWatermark(watermark);
            }
        } else if (MongoRecordUtils.isHeartbeatEvent(element)) {
            if (splitState.isStreamSplitState()) {
                this.updatePositionForStreamSplit(element, splitState);
            }
        } else if (MongoRecordUtils.isDataChangeRecord(element)) {
            if (splitState.isStreamSplitState()) {
                this.updatePositionForStreamSplit(element, splitState);
            }
            this.reportMetrics(element);
            this.emitElement(element, output);
        } else {
            LOG.info("Meet unknown element {}, just skip.", (Object)element);
        }
    }

    private void updatePositionForStreamSplit(SourceRecord element, SourceSplitState splitState) {
        BsonDocument resumeToken = MongoRecordUtils.getResumeToken(element);
        StreamSplitState streamSplitState = splitState.asStreamSplitState();
        ChangeStreamOffset offset = (ChangeStreamOffset)streamSplitState.getStartingOffset();
        if (offset != null) {
            offset.updatePosition(resumeToken);
        }
        splitState.asStreamSplitState().setStartingOffset((Offset)offset);
    }

    protected void reportMetrics(SourceRecord element) {
        long now = System.currentTimeMillis();
        this.sourceReaderMetrics.recordProcessTime(now);
        Long messageTimestamp = MongoRecordUtils.getMessageTimestamp(element);
        if (messageTimestamp != null && messageTimestamp > 0L) {
            Long fetchTimestamp = MongoRecordUtils.getFetchTimestamp(element);
            if (fetchTimestamp != null && fetchTimestamp >= messageTimestamp) {
                this.sourceReaderMetrics.recordFetchDelay(fetchTimestamp - messageTimestamp);
            }
            this.sourceReaderMetrics.recordEmitDelay(now - messageTimestamp);
        }
    }
}

