/*
 * Decompiled with CFR 0.152.
 */
package com.ververica.cdc.connectors.mysql.debezium.task.context;

import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils;
import com.ververica.cdc.connectors.mysql.debezium.EmbeddedFlinkDatabaseHistory;
import com.ververica.cdc.connectors.mysql.debezium.dispatcher.EventDispatcherImpl;
import com.ververica.cdc.connectors.mysql.debezium.task.context.MySqlTaskContextImpl;
import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfig;
import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSplit;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.common.CdcSourceTaskContext;
import io.debezium.connector.mysql.MySqlChangeEventSourceMetricsFactory;
import io.debezium.connector.mysql.MySqlConnection;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.connector.mysql.MySqlDatabaseSchema;
import io.debezium.connector.mysql.MySqlErrorHandler;
import io.debezium.connector.mysql.MySqlOffsetContext;
import io.debezium.connector.mysql.MySqlStreamingChangeEventSourceMetrics;
import io.debezium.connector.mysql.MySqlTaskContext;
import io.debezium.connector.mysql.MySqlTopicSelector;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.metrics.SnapshotChangeEventSourceMetrics;
import io.debezium.pipeline.metrics.StreamingChangeEventSourceMetrics;
import io.debezium.pipeline.source.spi.EventMetadataProvider;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.relational.TableId;
import io.debezium.schema.DataCollectionFilters;
import io.debezium.schema.DataCollectionId;
import io.debezium.schema.DatabaseSchema;
import io.debezium.schema.TopicSelector;
import io.debezium.util.Clock;
import io.debezium.util.Collect;
import io.debezium.util.SchemaNameAdjuster;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import org.apache.kafka.connect.data.Struct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Holds the stateful Debezium objects (schema, offset context, queue, dispatcher, metrics,
 * error handler) that are needed to read a single {@link MySqlSplit}.
 *
 * <p>The constructor only stores the long-lived collaborators; everything that depends on the
 * split is (re)built by {@link #configure(MySqlSplit)}. Getters for split-dependent state return
 * {@code null} until {@code configure} has been called.
 */
public class StatefulTaskContext {
    private static final Logger LOG = LoggerFactory.getLogger(StatefulTaskContext.class);
    private static final Clock clock = Clock.SYSTEM;

    private final MySqlSourceConfig sourceConfig;
    private final MySqlConnectorConfig connectorConfig;
    private final MySqlEventMetadataProvider metadataProvider;
    private final SchemaNameAdjuster schemaNameAdjuster;
    private final MySqlConnection connection;
    private final BinaryLogClient binaryLogClient;

    // Split-dependent state, assigned by configure(MySqlSplit).
    private MySqlDatabaseSchema databaseSchema;
    private MySqlTaskContextImpl taskContext;
    private MySqlOffsetContext offsetContext;
    private TopicSelector<TableId> topicSelector;
    private SnapshotChangeEventSourceMetrics snapshotChangeEventSourceMetrics;
    private StreamingChangeEventSourceMetrics streamingChangeEventSourceMetrics;
    private EventDispatcherImpl<TableId> dispatcher;
    private ChangeEventQueue<DataChangeEvent> queue;
    private ErrorHandler errorHandler;

    /**
     * Creates a context bound to the given configuration, binlog client and JDBC connection.
     *
     * @param sourceConfig source configuration; its MySQL connector config is cached
     * @param binaryLogClient binlog client handed to the task context created in {@code configure}
     * @param connection connection used to query case sensitivity and available binlog files
     */
    public StatefulTaskContext(
            MySqlSourceConfig sourceConfig,
            BinaryLogClient binaryLogClient,
            MySqlConnection connection) {
        this.sourceConfig = sourceConfig;
        this.connectorConfig = sourceConfig.getMySqlConnectorConfig();
        this.schemaNameAdjuster = SchemaNameAdjuster.create();
        this.metadataProvider = new MySqlEventMetadataProvider();
        this.binaryLogClient = binaryLogClient;
        this.connection = connection;
    }

    /**
     * Builds all split-dependent state for the given split.
     *
     * <p>Statement order matters: the table schemas are registered with the embedded database
     * history before the schema is created and recovered, and the task context must exist before
     * the metrics objects that reference it.
     *
     * @param mySqlSplit the split about to be read
     * @throws IllegalStateException if the binlog file named by the starting offset is not
     *     reported as available by the server
     */
    public void configure(MySqlSplit mySqlSplit) {
        final boolean tableIdCaseSensitive = this.connection.isTableIdCaseSensitive();
        this.topicSelector = MySqlTopicSelector.defaultSelector(this.connectorConfig);
        EmbeddedFlinkDatabaseHistory.registerHistory(
                this.sourceConfig.getDbzConfiguration().getString("database.history.instance.name"),
                mySqlSplit.getTableSchemas().values());
        this.databaseSchema =
                DebeziumUtils.createMySqlDatabaseSchema(this.connectorConfig, tableIdCaseSensitive);
        this.offsetContext =
                this.loadStartingOffsetState(
                        new MySqlOffsetContext.Loader(this.connectorConfig), mySqlSplit);
        this.validateAndLoadDatabaseHistory(this.offsetContext, this.databaseSchema);

        this.taskContext =
                new MySqlTaskContextImpl(
                        this.connectorConfig, this.databaseSchema, this.binaryLogClient);

        // Snapshot splits get an effectively unbounded queue; binlog splits use the configured size.
        final int queueSize =
                mySqlSplit.isSnapshotSplit()
                        ? Integer.MAX_VALUE
                        : this.connectorConfig.getMaxQueueSize();
        this.queue =
                new ChangeEventQueue.Builder<DataChangeEvent>()
                        .pollInterval(this.connectorConfig.getPollInterval())
                        .maxBatchSize(this.connectorConfig.getMaxBatchSize())
                        .maxQueueSize(queueSize)
                        .maxQueueSizeInBytes(this.connectorConfig.getMaxQueueSizeInBytes())
                        .loggingContextSupplier(
                                () ->
                                        this.taskContext.configureLoggingContext(
                                                "mysql-cdc-connector-task"))
                        .build();

        this.dispatcher =
                new EventDispatcherImpl<TableId>(
                        this.connectorConfig,
                        this.topicSelector,
                        this.databaseSchema,
                        this.queue,
                        this.connectorConfig.getTableFilters().dataCollectionFilter(),
                        DataChangeEvent::new,
                        this.metadataProvider,
                        this.schemaNameAdjuster);

        final MySqlChangeEventSourceMetricsFactory changeEventSourceMetricsFactory =
                new MySqlChangeEventSourceMetricsFactory(
                        new MySqlStreamingChangeEventSourceMetrics(
                                this.taskContext, this.queue, this.metadataProvider));
        this.snapshotChangeEventSourceMetrics =
                changeEventSourceMetricsFactory.getSnapshotMetrics(
                        this.taskContext, this.queue, this.metadataProvider);
        this.streamingChangeEventSourceMetrics =
                changeEventSourceMetricsFactory.getStreamingMetrics(
                        this.taskContext, this.queue, this.metadataProvider);
        this.errorHandler =
                new MySqlErrorHandler(this.connectorConfig.getLogicalName(), this.queue);
    }

    /** Initializes the schema's storage and recovers the schema from the given offset. */
    private void validateAndLoadDatabaseHistory(
            MySqlOffsetContext offset, MySqlDatabaseSchema schema) {
        schema.initializeStorage();
        schema.recover(offset);
    }

    /**
     * Loads the offset context the split starts from.
     *
     * <p>Snapshot splits start from {@link BinlogOffset#INITIAL_OFFSET}; binlog splits start from
     * their own starting offset.
     *
     * @throws IllegalStateException if {@link #isBinlogAvailable(MySqlOffsetContext)} rejects the
     *     loaded offset
     */
    private MySqlOffsetContext loadStartingOffsetState(
            OffsetContext.Loader loader, MySqlSplit mySqlSplit) {
        final BinlogOffset offset =
                mySqlSplit.isSnapshotSplit()
                        ? BinlogOffset.INITIAL_OFFSET
                        : mySqlSplit.asBinlogSplit().getStartingOffset();
        final MySqlOffsetContext mySqlOffsetContext =
                (MySqlOffsetContext) loader.load(offset.getOffset());
        if (!this.isBinlogAvailable(mySqlOffsetContext)) {
            throw new IllegalStateException("The connector is trying to read binlog starting at " + mySqlOffsetContext.getSourceInfo() + ", but this is no longer available on the server. Reconfigure the connector to use a snapshot when needed.");
        }
        return mySqlOffsetContext;
    }

    /**
     * Checks whether the binlog file named in the offset is among the files the server reports.
     *
     * @return {@code true} if the offset names no file (null or empty) or the named file is
     *     listed by {@link MySqlConnection#availableBinlogFiles()}; {@code false} otherwise
     */
    private boolean isBinlogAvailable(MySqlOffsetContext offset) {
        final String binlogFilename =
                offset.getSourceInfo()
                        .getString(MySqlEventMetadataProvider.BINLOG_FILENAME_OFFSET_KEY);
        // No concrete file recorded in the offset: there is nothing to validate.
        if (binlogFilename == null || binlogFilename.isEmpty()) {
            return true;
        }

        final List<String> logNames = this.connection.availableBinlogFiles();
        final boolean found = logNames.contains(binlogFilename);
        if (found) {
            LOG.info("MySQL has the binlog file '{}' required by the connector", binlogFilename);
        } else {
            LOG.info(
                    "Connector requires binlog file '{}', but MySQL only has {}",
                    binlogFilename,
                    String.join(", ", logNames));
        }
        return found;
    }

    /** Returns the shared system clock. */
    public static Clock getClock() {
        return clock;
    }

    /** Returns the source configuration passed to the constructor. */
    public MySqlSourceConfig getSourceConfig() {
        return this.sourceConfig;
    }

    /** Returns the MySQL connector configuration obtained from the source configuration. */
    public MySqlConnectorConfig getConnectorConfig() {
        return this.connectorConfig;
    }

    /** Returns the connection passed to the constructor. */
    public MySqlConnection getConnection() {
        return this.connection;
    }

    /** Returns the binlog client passed to the constructor. */
    public BinaryLogClient getBinaryLogClient() {
        return this.binaryLogClient;
    }

    /** Returns the database schema built by {@link #configure(MySqlSplit)}, or {@code null}. */
    public MySqlDatabaseSchema getDatabaseSchema() {
        return this.databaseSchema;
    }

    /** Returns the task context built by {@link #configure(MySqlSplit)}, or {@code null}. */
    public MySqlTaskContextImpl getTaskContext() {
        return this.taskContext;
    }

    /** Returns the event dispatcher built by {@link #configure(MySqlSplit)}, or {@code null}. */
    public EventDispatcherImpl<TableId> getDispatcher() {
        return this.dispatcher;
    }

    /** Returns the change event queue built by {@link #configure(MySqlSplit)}, or {@code null}. */
    public ChangeEventQueue<DataChangeEvent> getQueue() {
        return this.queue;
    }

    /** Returns the error handler built by {@link #configure(MySqlSplit)}, or {@code null}. */
    public ErrorHandler getErrorHandler() {
        return this.errorHandler;
    }

    /** Returns the offset context loaded by {@link #configure(MySqlSplit)}, or {@code null}. */
    public MySqlOffsetContext getOffsetContext() {
        return this.offsetContext;
    }

    /** Returns the topic selector built by {@link #configure(MySqlSplit)}, or {@code null}. */
    public TopicSelector<TableId> getTopicSelector() {
        return this.topicSelector;
    }

    /**
     * Returns the snapshot metrics.
     *
     * <p>Note: this getter is not side-effect free; it calls {@code reset()} on the metrics on
     * every invocation before returning them. It must only be called after
     * {@link #configure(MySqlSplit)}.
     */
    public SnapshotChangeEventSourceMetrics getSnapshotChangeEventSourceMetrics() {
        this.snapshotChangeEventSourceMetrics.reset();
        return this.snapshotChangeEventSourceMetrics;
    }

    /**
     * Returns the streaming metrics.
     *
     * <p>Note: this getter is not side-effect free; it calls {@code reset()} on the metrics on
     * every invocation before returning them. It must only be called after
     * {@link #configure(MySqlSplit)}.
     */
    public StreamingChangeEventSourceMetrics getStreamingChangeEventSourceMetrics() {
        this.streamingChangeEventSourceMetrics.reset();
        return this.streamingChangeEventSourceMetrics;
    }

    /** Returns the schema name adjuster created in the constructor. */
    public SchemaNameAdjuster getSchemaNameAdjuster() {
        return this.schemaNameAdjuster;
    }

    /**
     * {@link EventMetadataProvider} that reads event metadata from the {@code "source"} struct of
     * a change event value.
     */
    public static class MySqlEventMetadataProvider
    implements EventMetadataProvider {
        public static final String SERVER_ID_KEY = "server_id";
        public static final String GTID_KEY = "gtid";
        public static final String BINLOG_FILENAME_OFFSET_KEY = "file";
        public static final String BINLOG_POSITION_OFFSET_KEY = "pos";
        public static final String BINLOG_ROW_IN_EVENT_OFFSET_KEY = "row";
        public static final String THREAD_KEY = "thread";
        public static final String QUERY_KEY = "query";

        /**
         * Returns the event timestamp taken from the {@code ts_ms} field of the source struct.
         *
         * @return the timestamp, or {@code null} if {@code value}, {@code source}, the source
         *     struct or its {@code ts_ms} field is null
         */
        public Instant getEventTimestamp(DataCollectionId source, OffsetContext offset, Object key, Struct value) {
            if (value == null) {
                return null;
            }
            final Struct sourceInfo = value.getStruct("source");
            // The original only checked the 'source' parameter, so a null source struct caused
            // an NPE below; both are guarded now.
            if (source == null || sourceInfo == null) {
                return null;
            }
            final Long timestamp = sourceInfo.getInt64("ts_ms");
            return timestamp == null ? null : Instant.ofEpochMilli(timestamp);
        }

        /**
         * Returns the binlog file, position and row-in-event of the event as a string map.
         *
         * @return the position map, or {@code null} if {@code value}, {@code source} or the
         *     source struct is null
         */
        public Map<String, String> getEventSourcePosition(DataCollectionId source, OffsetContext offset, Object key, Struct value) {
            if (value == null) {
                return null;
            }
            final Struct sourceInfo = value.getStruct("source");
            // The original only checked the 'source' parameter, so a null source struct caused
            // an NPE below; both are guarded now.
            if (source == null || sourceInfo == null) {
                return null;
            }
            return Collect.hashMapOf(
                    BINLOG_FILENAME_OFFSET_KEY,
                    sourceInfo.getString(BINLOG_FILENAME_OFFSET_KEY),
                    BINLOG_POSITION_OFFSET_KEY,
                    Long.toString(sourceInfo.getInt64(BINLOG_POSITION_OFFSET_KEY)),
                    BINLOG_ROW_IN_EVENT_OFFSET_KEY,
                    Integer.toString(sourceInfo.getInt32(BINLOG_ROW_IN_EVENT_OFFSET_KEY)));
        }

        /**
         * Returns the transaction id held by the offset context.
         *
         * @param offset must be a {@link MySqlOffsetContext}
         */
        public String getTransactionId(DataCollectionId source, OffsetContext offset, Object key, Struct value) {
            return ((MySqlOffsetContext) offset).getTransactionId();
        }
    }
}

