/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.plan.nodes.exec.common;

import java.util.List;
import java.util.Optional;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.transformations.LegacySourceTransformation;
import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.connector.source.DataStreamScanProvider;
import org.apache.flink.table.connector.source.InputFormatProvider;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.table.connector.source.SourceProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.connectors.TransformationScanProvider;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.MultipleTransformationTranslator;
import org.apache.flink.table.planner.plan.nodes.exec.spec.DynamicTableSourceSpec;
import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.utils.TransformationMetadata;
import org.apache.flink.table.planner.utils.ShortcutUtils;
import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;

public abstract class CommonExecTableSourceScan
extends ExecNodeBase<RowData>
implements MultipleTransformationTranslator<RowData> {
    public static final String SOURCE_TRANSFORMATION = "source";
    public static final String FIELD_NAME_SCAN_TABLE_SOURCE = "scanTableSource";
    @JsonProperty(value="scanTableSource")
    private final DynamicTableSourceSpec tableSourceSpec;

    protected CommonExecTableSourceScan(int id, ExecNodeContext context, ReadableConfig persistedConfig, DynamicTableSourceSpec tableSourceSpec, List<InputProperty> inputProperties, LogicalType outputType, String description) {
        super(id, context, persistedConfig, inputProperties, outputType, description);
        this.tableSourceSpec = tableSourceSpec;
    }

    @Override
    public String getSimplifiedName() {
        return this.tableSourceSpec.getContextResolvedTable().getIdentifier().getObjectName();
    }

    public DynamicTableSourceSpec getTableSourceSpec() {
        return this.tableSourceSpec;
    }

    @Override
    protected Transformation<RowData> translateToPlanInternal(PlannerBase planner, ExecNodeConfig config) {
        StreamExecutionEnvironment env = planner.getExecEnv();
        TransformationMetadata meta = this.createTransformationMeta(SOURCE_TRANSFORMATION, config);
        InternalTypeInfo outputTypeInfo = InternalTypeInfo.of((RowType)((RowType)this.getOutputType()));
        ScanTableSource tableSource = this.tableSourceSpec.getScanTableSource(planner.getFlinkContext(), ShortcutUtils.unwrapTypeFactory(planner));
        ScanTableSource.ScanRuntimeProvider provider = tableSource.getScanRuntimeProvider((ScanTableSource.ScanContext)ScanRuntimeProviderContext.INSTANCE);
        if (provider instanceof SourceFunctionProvider) {
            SourceFunctionProvider sourceFunctionProvider = (SourceFunctionProvider)provider;
            SourceFunction function = sourceFunctionProvider.createSourceFunction();
            Transformation<RowData> transformation = this.createSourceFunctionTransformation(env, (SourceFunction<RowData>)function, sourceFunctionProvider.isBounded(), meta.getName(), (TypeInformation<RowData>)outputTypeInfo);
            return meta.fill(transformation);
        }
        if (provider instanceof InputFormatProvider) {
            InputFormat inputFormat = ((InputFormatProvider)provider).createInputFormat();
            Transformation<RowData> transformation = this.createInputFormatTransformation(env, inputFormat, (InternalTypeInfo<RowData>)outputTypeInfo, meta.getName());
            return meta.fill(transformation);
        }
        if (provider instanceof SourceProvider) {
            Source source = ((SourceProvider)provider).createSource();
            Transformation transformation = env.fromSource(source, WatermarkStrategy.noWatermarks(), meta.getName(), (TypeInformation)outputTypeInfo).getTransformation();
            return meta.fill(transformation);
        }
        if (provider instanceof DataStreamScanProvider) {
            Transformation transformation = ((DataStreamScanProvider)provider).produceDataStream(this.createProviderContext(config), env).getTransformation();
            meta.fill(transformation);
            transformation.setOutputType((TypeInformation)outputTypeInfo);
            return transformation;
        }
        if (provider instanceof TransformationScanProvider) {
            Transformation<RowData> transformation = ((TransformationScanProvider)provider).createTransformation(this.createProviderContext(config));
            meta.fill(transformation);
            transformation.setOutputType((TypeInformation)outputTypeInfo);
            return transformation;
        }
        throw new UnsupportedOperationException(provider.getClass().getSimpleName() + " is unsupported now.");
    }

    private ProviderContext createProviderContext(ExecNodeConfig config) {
        return name -> {
            if (this instanceof StreamExecNode && config.shouldSetUid()) {
                return Optional.of(this.createTransformationUid(name, config));
            }
            return Optional.empty();
        };
    }

    protected Transformation<RowData> createSourceFunctionTransformation(StreamExecutionEnvironment env, SourceFunction<RowData> function, boolean isBounded, String operatorName, TypeInformation<RowData> outputTypeInfo) {
        int parallelism;
        env.clean(function);
        boolean parallelismConfigured = false;
        if (function instanceof ParallelSourceFunction) {
            parallelism = env.getParallelism();
        } else {
            parallelism = 1;
            parallelismConfigured = true;
        }
        Boundedness boundedness = isBounded ? Boundedness.BOUNDED : Boundedness.CONTINUOUS_UNBOUNDED;
        StreamSource sourceOperator = new StreamSource(function, !isBounded);
        return new LegacySourceTransformation(operatorName, sourceOperator, outputTypeInfo, parallelism, boundedness, parallelismConfigured);
    }

    protected abstract Transformation<RowData> createInputFormatTransformation(StreamExecutionEnvironment var1, InputFormat<RowData, ?> var2, InternalTypeInfo<RowData> var3, String var4);
}

