/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.graph;

import java.util.Arrays;
import java.util.Collection;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.io.SimpleVersionedSerializerTypeSerializerProxy;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.StreamNode;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.runtime.operators.sink.BatchCommitterOperatorFactory;
import org.apache.flink.streaming.runtime.operators.sink.BatchGlobalCommitterOperatorFactory;
import org.apache.flink.streaming.runtime.operators.sink.StatelessSinkWriterOperatorFactory;
import org.apache.flink.streaming.runtime.operators.sink.StreamingCommitterOperatorFactory;
import org.apache.flink.streaming.runtime.operators.sink.StreamingGlobalCommitterOperatorFactory;
import org.apache.flink.streaming.runtime.operators.sink.TestSink;
import org.apache.flink.util.TestLogger;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class SinkTransformationTranslatorTest
extends TestLogger {
    @Parameterized.Parameter
    public RuntimeExecutionMode runtimeExecutionMode;
    @Parameterized.Parameter(value=1)
    public Class<?> committerClass;
    @Parameterized.Parameter(value=2)
    public Class<?> globalCommitterClass;
    static final String NAME = "FileSink";
    static final String SLOT_SHARE_GROUP = "FileGroup";
    static final String UID = "FileUid";
    static final int PARALLELISM = 2;

    @Parameterized.Parameters(name="Execution Mode: {0}, Expected Committer Operator: {1}, Expected Global Committer Operator: {2}")
    public static Collection<Object[]> data() {
        return Arrays.asList({RuntimeExecutionMode.STREAMING, StreamingCommitterOperatorFactory.class, StreamingGlobalCommitterOperatorFactory.class}, {RuntimeExecutionMode.BATCH, BatchCommitterOperatorFactory.class, BatchGlobalCommitterOperatorFactory.class});
    }

    @Test
    public void generateWriterTopology() {
        StreamGraph streamGraph = this.buildGraph(TestSink.newBuilder().build(), this.runtimeExecutionMode);
        StreamNode sourceNode = this.findNodeNameContains(streamGraph, "Source");
        StreamNode writerNode = this.findNodeNameContains(streamGraph, "Writer");
        MatcherAssert.assertThat((Object)streamGraph.getStreamNodes().size(), (Matcher)CoreMatchers.equalTo((Object)2));
        this.validateTopology(sourceNode, IntSerializer.class, writerNode, String.format("Sink Writer: %s", NAME), UID, StatelessSinkWriterOperatorFactory.class, 2, -1);
    }

    @Test
    public void generateWriterCommitterTopology() {
        StreamGraph streamGraph = this.buildGraph(TestSink.newBuilder().setDefaultCommitter().build(), this.runtimeExecutionMode);
        StreamNode writerNode = this.findNodeNameContains(streamGraph, "Writer");
        StreamNode committerNode = this.findNodeNameContains(streamGraph, "Committer");
        MatcherAssert.assertThat((Object)streamGraph.getStreamNodes().size(), (Matcher)CoreMatchers.equalTo((Object)3));
        this.validateTopology(writerNode, SimpleVersionedSerializerTypeSerializerProxy.class, committerNode, String.format("Sink Committer: %s", NAME), String.format("Sink Committer: %s", UID), this.committerClass, this.runtimeExecutionMode == RuntimeExecutionMode.STREAMING ? 2 : 1, this.runtimeExecutionMode == RuntimeExecutionMode.STREAMING ? -1 : 1);
    }

    @Test
    public void generateWriterCommitterGlobalCommitterTopology() {
        StreamGraph streamGraph = this.buildGraph(TestSink.newBuilder().setDefaultCommitter().setDefaultGlobalCommitter().build(), this.runtimeExecutionMode);
        StreamNode committerNode = this.findNodeNameContains(streamGraph, "Committer");
        StreamNode globalCommitterNode = this.findNodeNameContains(streamGraph, "Global Committer");
        this.validateTopology(committerNode, SimpleVersionedSerializerTypeSerializerProxy.class, globalCommitterNode, String.format("Sink Global Committer: %s", NAME), String.format("Sink Global Committer: %s", UID), this.globalCommitterClass, 1, 1);
    }

    @Test
    public void generateWriterGlobalCommitterTopology() {
        StreamGraph streamGraph = this.buildGraph(TestSink.newBuilder().setCommittableSerializer(TestSink.StringCommittableSerializer.INSTANCE).setDefaultGlobalCommitter().build(), this.runtimeExecutionMode);
        StreamNode writerNode = this.findNodeNameContains(streamGraph, "Writer");
        StreamNode globalCommitterNode = this.findNodeNameContains(streamGraph, "Global Committer");
        this.validateTopology(writerNode, SimpleVersionedSerializerTypeSerializerProxy.class, globalCommitterNode, String.format("Sink Global Committer: %s", NAME), String.format("Sink Global Committer: %s", UID), this.globalCommitterClass, 1, 1);
    }

    @Test(expected=IllegalStateException.class)
    public void throwExceptionWithoutSettingUid() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Configuration config = new Configuration();
        config.set(ExecutionOptions.RUNTIME_MODE, (Object)this.runtimeExecutionMode);
        env.configure((ReadableConfig)config, ((Object)((Object)this)).getClass().getClassLoader());
        env.getConfig().disableAutoGeneratedUIDs();
        env.fromElements((Object[])new Integer[]{1, 2}).sinkTo((Sink)TestSink.newBuilder().build());
        env.getStreamGraph();
    }

    @Test
    public void disableOperatorChain() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource src = env.fromElements((Object[])new Integer[]{1, 2});
        DataStreamSink dataStreamSink = src.sinkTo((Sink)TestSink.newBuilder().setDefaultCommitter().setDefaultGlobalCommitter().build());
        dataStreamSink.disableChaining();
        StreamGraph streamGraph = env.getStreamGraph();
        StreamNode writer = this.findNodeNameContains(streamGraph, "Writer");
        StreamNode committer = this.findNodeNameContains(streamGraph, "Committer");
        StreamNode globalCommitter = this.findNodeNameContains(streamGraph, "Global Committer");
        MatcherAssert.assertThat((Object)writer.getOperatorFactory().getChainingStrategy(), (Matcher)CoreMatchers.is((Object)ChainingStrategy.NEVER));
        MatcherAssert.assertThat((Object)committer.getOperatorFactory().getChainingStrategy(), (Matcher)CoreMatchers.is((Object)ChainingStrategy.ALWAYS));
        MatcherAssert.assertThat((Object)globalCommitter.getOperatorFactory().getChainingStrategy(), (Matcher)CoreMatchers.is((Object)ChainingStrategy.ALWAYS));
    }

    private void validateTopology(StreamNode src, Class<?> srcOutTypeInfo, StreamNode dest, String name, String uid, Class<?> expectedOperatorFactory, int expectedParallelism, int expectedMaxParallelism) {
        StreamEdge srcOutEdge = (StreamEdge)src.getOutEdges().get(0);
        MatcherAssert.assertThat((Object)srcOutEdge.getTargetId(), (Matcher)CoreMatchers.equalTo((Object)dest.getId()));
        MatcherAssert.assertThat((Object)src.getTypeSerializerOut(), (Matcher)CoreMatchers.instanceOf(srcOutTypeInfo));
        StreamEdge destInputEdge = (StreamEdge)dest.getInEdges().get(0);
        MatcherAssert.assertThat((Object)destInputEdge.getSourceId(), (Matcher)CoreMatchers.equalTo((Object)src.getId()));
        MatcherAssert.assertThat((Object)dest.getTypeSerializersIn()[0], (Matcher)CoreMatchers.instanceOf(srcOutTypeInfo));
        MatcherAssert.assertThat((Object)dest.getOperatorName(), (Matcher)CoreMatchers.equalTo((Object)name));
        MatcherAssert.assertThat((Object)dest.getTransformationUID(), (Matcher)CoreMatchers.equalTo((Object)uid));
        MatcherAssert.assertThat((Object)dest.getOperatorFactory(), (Matcher)CoreMatchers.instanceOf(expectedOperatorFactory));
        MatcherAssert.assertThat((Object)dest.getParallelism(), (Matcher)CoreMatchers.equalTo((Object)expectedParallelism));
        MatcherAssert.assertThat((Object)dest.getMaxParallelism(), (Matcher)CoreMatchers.equalTo((Object)expectedMaxParallelism));
        MatcherAssert.assertThat((Object)dest.getOperatorFactory().getChainingStrategy(), (Matcher)CoreMatchers.is((Object)ChainingStrategy.ALWAYS));
        MatcherAssert.assertThat((Object)dest.getSlotSharingGroup(), (Matcher)CoreMatchers.equalTo((Object)SLOT_SHARE_GROUP));
        MatcherAssert.assertThat((Object)dest.getOutEdges().size(), (Matcher)CoreMatchers.equalTo((Object)0));
    }

    private StreamGraph buildGraph(TestSink sink, RuntimeExecutionMode runtimeExecutionMode) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Configuration config = new Configuration();
        config.set(ExecutionOptions.RUNTIME_MODE, (Object)runtimeExecutionMode);
        env.configure((ReadableConfig)config, ((Object)((Object)this)).getClass().getClassLoader());
        DataStreamSource src = env.fromElements((Object[])new Integer[]{1, 2});
        DataStreamSink dataStreamSink = src.rebalance().sinkTo((Sink)sink);
        this.setSinkProperty((DataStreamSink<Integer>)dataStreamSink);
        return env.getStreamGraph("test");
    }

    private void setSinkProperty(DataStreamSink<Integer> dataStreamSink) {
        dataStreamSink.name(NAME);
        dataStreamSink.uid(UID);
        dataStreamSink.setParallelism(2);
        dataStreamSink.slotSharingGroup(SLOT_SHARE_GROUP);
    }

    private StreamNode findNodeNameContains(StreamGraph streamGraph, String nodeName) {
        return streamGraph.getStreamNodes().stream().filter(x -> x.getOperatorName().contains(nodeName)).findFirst().orElseThrow(() -> new IllegalStateException("Can not find the node contains " + nodeName));
    }
}

