/*
 * Decompiled with CFR 0.152.
 */
package com.ververica.cdc.connectors.mongodb.source.dialect;

import com.mongodb.client.MongoClient;
import com.ververica.cdc.connectors.base.dialect.DataSourceDialect;
import com.ververica.cdc.connectors.base.source.assigner.splitter.ChunkSplitter;
import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase;
import com.ververica.cdc.connectors.base.source.reader.external.FetchTask;
import com.ververica.cdc.connectors.mongodb.source.assigners.splitters.MongoDBChunkSplitter;
import com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceConfig;
import com.ververica.cdc.connectors.mongodb.source.offset.ChangeStreamDescriptor;
import com.ververica.cdc.connectors.mongodb.source.offset.ChangeStreamOffset;
import com.ververica.cdc.connectors.mongodb.source.reader.fetch.MongoDBFetchTaskContext;
import com.ververica.cdc.connectors.mongodb.source.reader.fetch.MongoDBScanFetchTask;
import com.ververica.cdc.connectors.mongodb.source.reader.fetch.MongoDBStreamFetchTask;
import com.ververica.cdc.connectors.mongodb.source.utils.CollectionDiscoveryUtils;
import com.ververica.cdc.connectors.mongodb.source.utils.MongoUtils;
import io.debezium.relational.Column;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.apache.flink.annotation.Experimental;
import org.bson.BsonDocument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Experimental
public class MongoDBDialect
implements DataSourceDialect<MongoDBSourceConfig> {
    private static final Logger LOG = LoggerFactory.getLogger(MongoDBDialect.class);
    private final Map<MongoDBSourceConfig, CollectionDiscoveryUtils.CollectionDiscoveryInfo> cache = new ConcurrentHashMap<MongoDBSourceConfig, CollectionDiscoveryUtils.CollectionDiscoveryInfo>();

    public String getName() {
        return "MongoDB";
    }

    public List<TableId> discoverDataCollections(MongoDBSourceConfig sourceConfig) {
        CollectionDiscoveryUtils.CollectionDiscoveryInfo discoveryInfo = this.discoverAndCacheDataCollections(sourceConfig);
        return discoveryInfo.getDiscoveredCollections().stream().map(TableId::parse).collect(Collectors.toList());
    }

    public Map<TableId, TableChanges.TableChange> discoverDataCollectionSchemas(MongoDBSourceConfig sourceConfig) {
        List<TableId> discoveredCollections = this.discoverDataCollections(sourceConfig);
        HashMap<TableId, TableChanges.TableChange> schemas = new HashMap<TableId, TableChanges.TableChange>(discoveredCollections.size());
        for (TableId collectionId : discoveredCollections) {
            schemas.put(collectionId, MongoDBDialect.collectionSchema(collectionId));
        }
        return schemas;
    }

    private CollectionDiscoveryUtils.CollectionDiscoveryInfo discoverAndCacheDataCollections(MongoDBSourceConfig sourceConfig) {
        return this.cache.computeIfAbsent(sourceConfig, config -> {
            MongoClient mongoClient = MongoUtils.clientFor(sourceConfig);
            List<String> discoveredDatabases = CollectionDiscoveryUtils.databaseNames(mongoClient, CollectionDiscoveryUtils.databaseFilter(sourceConfig.getDatabaseList()));
            List<String> discoveredCollections = CollectionDiscoveryUtils.collectionNames(mongoClient, discoveredDatabases, CollectionDiscoveryUtils.collectionsFilter(sourceConfig.getCollectionList()));
            return new CollectionDiscoveryUtils.CollectionDiscoveryInfo(discoveredDatabases, discoveredCollections);
        });
    }

    public static TableChanges.TableChange collectionSchema(TableId tableId) {
        Table table = Table.editor().tableId(tableId).addColumn(Column.editor().name("_id").optional(false).create()).setPrimaryKeyNames(new String[]{"_id"}).create();
        return new TableChanges.TableChange(TableChanges.TableChangeType.CREATE, table);
    }

    public ChangeStreamOffset displayCurrentOffset(MongoDBSourceConfig sourceConfig) {
        MongoClient mongoClient = MongoUtils.clientFor(sourceConfig);
        BsonDocument startupResumeToken = MongoUtils.getLatestResumeToken(mongoClient, ChangeStreamDescriptor.deployment());
        ChangeStreamOffset changeStreamOffset = startupResumeToken != null ? new ChangeStreamOffset(startupResumeToken) : new ChangeStreamOffset(MongoUtils.getCurrentClusterTime(mongoClient));
        LOG.info("Current change stream offset : {}", (Object)changeStreamOffset);
        return changeStreamOffset;
    }

    public boolean isDataCollectionIdCaseSensitive(MongoDBSourceConfig sourceConfig) {
        return true;
    }

    public ChunkSplitter createChunkSplitter(MongoDBSourceConfig sourceConfig) {
        return new MongoDBChunkSplitter(sourceConfig);
    }

    public FetchTask<SourceSplitBase> createFetchTask(SourceSplitBase sourceSplitBase) {
        if (sourceSplitBase.isSnapshotSplit()) {
            return new MongoDBScanFetchTask(sourceSplitBase.asSnapshotSplit());
        }
        return new MongoDBStreamFetchTask(sourceSplitBase.asStreamSplit());
    }

    public MongoDBFetchTaskContext createFetchTaskContext(SourceSplitBase sourceSplitBase, MongoDBSourceConfig sourceConfig) {
        CollectionDiscoveryUtils.CollectionDiscoveryInfo discoveryInfo = this.discoverAndCacheDataCollections(sourceConfig);
        ChangeStreamDescriptor changeStreamDescriptor = MongoUtils.getChangeStreamDescriptor(sourceConfig, discoveryInfo.getDiscoveredDatabases(), discoveryInfo.getDiscoveredCollections());
        return new MongoDBFetchTaskContext(this, sourceConfig, changeStreamDescriptor);
    }
}

