/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.plan.nodes.exec.stream;

import java.util.Collections;
import java.util.List;
import org.apache.flink.FlinkVersion;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.streaming.runtime.partitioner.GlobalPartitioner;
import org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecExchange;
import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecNode;
import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;

@ExecNodeMetadata(name="stream-exec-exchange", version=1, producedTransformations={"exchange"}, minPlanVersion=FlinkVersion.v1_15, minStateVersion=FlinkVersion.v1_15)
public class StreamExecExchange
extends CommonExecExchange
implements StreamExecNode<RowData> {
    public static final String EXCHANGE_TRANSFORMATION = "exchange";

    public StreamExecExchange(ReadableConfig tableConfig, InputProperty inputProperty, RowType outputType, String description) {
        this(ExecNodeContext.newNodeId(), ExecNodeContext.newContext(StreamExecExchange.class), ExecNodeContext.newPersistedConfig(StreamExecExchange.class, tableConfig), Collections.singletonList(inputProperty), outputType, description);
    }

    @JsonCreator
    public StreamExecExchange(@JsonProperty(value="id") int id, @JsonProperty(value="type") ExecNodeContext context, @JsonProperty(value="configuration") ReadableConfig persistedConfig, @JsonProperty(value="inputProperties") List<InputProperty> inputProperties, @JsonProperty(value="outputType") RowType outputType, @JsonProperty(value="description") String description) {
        super(id, context, persistedConfig, inputProperties, outputType, description);
        Preconditions.checkArgument((inputProperties.size() == 1 ? 1 : 0) != 0);
    }

    @Override
    protected Transformation<RowData> translateToPlanInternal(PlannerBase planner, ExecNodeConfig config) {
        int parallelism;
        GlobalPartitioner partitioner;
        Transformation<?> inputTransform = this.getInputEdges().get(0).translateToPlan(planner);
        InputProperty inputProperty = this.getInputProperties().get(0);
        InputProperty.DistributionType distributionType = inputProperty.getRequiredDistribution().getType();
        switch (distributionType) {
            case SINGLETON: {
                partitioner = new GlobalPartitioner();
                parallelism = 1;
                break;
            }
            case HASH: {
                int[] keys = ((InputProperty.HashDistribution)inputProperty.getRequiredDistribution()).getKeys();
                InternalTypeInfo inputType = (InternalTypeInfo)inputTransform.getOutputType();
                RowDataKeySelector keySelector = KeySelectorUtil.getRowDataSelector(planner.getFlinkContext().getClassLoader(), keys, (InternalTypeInfo<RowData>)inputType);
                partitioner = new KeyGroupStreamPartitioner((KeySelector)keySelector, 128);
                parallelism = -1;
                break;
            }
            default: {
                throw new TableException(String.format("%s is not supported now!", new Object[]{distributionType}));
            }
        }
        PartitionTransformation transformation = new PartitionTransformation(inputTransform, (StreamPartitioner)partitioner);
        this.createTransformationMeta(EXCHANGE_TRANSFORMATION, config).fill(transformation);
        transformation.setParallelism(parallelism);
        transformation.setOutputType((TypeInformation)InternalTypeInfo.of((LogicalType)this.getOutputType()));
        return transformation;
    }
}

