/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators.sink;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.operators.sink.AbstractSinkWriterOperatorFactory;
import org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorTestBase;
import org.apache.flink.streaming.runtime.operators.sink.StatefulSinkWriterOperatorFactory;
import org.apache.flink.streaming.runtime.operators.sink.TestSink;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

/**
 * Tests for the writer operator built by {@link StatefulSinkWriterOperatorFactory}.
 *
 * <p>Covers two scenarios: restoring the writer from its own snapshot, and starting from operator
 * state that was written under the state name of a previous sink implementation.
 */
public class StatefulSinkWriterOperatorTest extends SinkWriterOperatorTestBase {

    /**
     * Builds the operator factory under test for the given sink.
     *
     * <p>NOTE(review): this presumably implements a hook declared by
     * {@link SinkWriterOperatorTestBase}; the base class is not visible from this file, so
     * {@code @Override} has not been added here. Confirm and add it.
     *
     * @param sink the sink the factory should wrap
     * @return a {@link StatefulSinkWriterOperatorFactory} wrapping {@code sink}
     */
    protected AbstractSinkWriterOperatorFactory createWriterOperator(TestSink sink) {
        return new StatefulSinkWriterOperatorFactory<>(sink);
    }

    /**
     * Processes a watermark and two elements, snapshots, and checks that a second harness
     * initialized from that snapshot emits both elements once its input ends.
     *
     * @throws Exception if the test harness fails
     */
    @Test
    public void stateIsRestored() throws Exception {
        final long initialTime = 0L;

        final OneInputStreamOperatorTestHarness<Integer, String> testHarness =
                this.createTestHarness(newSnapshottingSink());
        testHarness.open();

        testHarness.processWatermark(initialTime);
        testHarness.processElement(1, initialTime + 1);
        testHarness.processElement(2, initialTime + 2);

        testHarness.prepareSnapshotPreBarrier(1L);
        final OperatorSubtaskState snapshot = testHarness.snapshot(1L, 1L);

        // Only the watermark may be visible downstream at this point; the two elements must
        // therefore be carried by the snapshot for the second half of the test to pass.
        Assert.assertThat(testHarness.getOutput(), Matchers.contains(new Watermark(initialTime)));
        testHarness.close();

        final OneInputStreamOperatorTestHarness<Integer, String> restoredTestHarness =
                this.createTestHarness(newSnapshottingSink());
        restoredTestHarness.initializeState(snapshot);
        restoredTestHarness.open();

        // No element is fed to the restored harness, so everything emitted by endInput() has
        // to originate from the snapshot.
        restoredTestHarness.endInput();

        // Expected strings are Tuple3 renderings; presumably (element, timestamp, watermark)
        // as produced by the buffering writer of the base class - TODO confirm.
        Assert.assertThat(
                restoredTestHarness.getOutput(),
                Matchers.contains(
                        new StreamRecord<>(
                                Tuple3.of(1, initialTime + 1, initialTime).toString()),
                        new StreamRecord<>(
                                Tuple3.of(2, initialTime + 2, initialTime).toString())));

        // Release the restored harness as well, just like the first one.
        restoredTestHarness.close();
    }

    /**
     * Writes state with {@link DummySinkOperator}, loads it into a writer operator configured
     * with the same state name, and then restores a further operator from the writer's own
     * snapshot to check that the previous sink's entries are not emitted a second time.
     *
     * @throws Exception if a test harness fails
     */
    @Test
    public void loadPreviousSinkState() throws Exception {
        // 1. Build the state of the previous sink.
        final List<String> previousSinkInputs =
                Arrays.asList(
                        "bit", "mention", "thick", "stick", "stir", "easy", "sleep", "forth",
                        "cost", "prompt");
        final OneInputStreamOperatorTestHarness<String, String> previousSink =
                new OneInputStreamOperatorTestHarness<>(
                        new DummySinkOperator(), StringSerializer.INSTANCE);
        final OperatorSubtaskState previousSinkState =
                TestHarnessUtil.buildSubtaskState(previousSink, previousSinkInputs);

        // 2. Load the previous sink's state and verify the output.
        final OneInputStreamOperatorTestHarness<Integer, String> compatibleWriterOperator =
                this.createCompatibleSinkOperator();

        // Collectors.toList() gives no mutability guarantee, and an element is appended below,
        // so collect into an ArrayList explicitly.
        final List<StreamRecord<String>> expectedOutput1 =
                previousSinkInputs.stream()
                        .map(StreamRecord::new)
                        .collect(Collectors.toCollection(ArrayList::new));
        expectedOutput1.add(new StreamRecord<>(Tuple3.of(1, 1, Long.MIN_VALUE).toString()));

        compatibleWriterOperator.initializeState(previousSinkState);
        compatibleWriterOperator.open();
        compatibleWriterOperator.processElement(1, 1L);
        compatibleWriterOperator.endInput();

        // Snapshot taken after endInput(); it is the starting state for step 3.
        final OperatorSubtaskState operatorStateWithoutPreviousState =
                compatibleWriterOperator.snapshot(1L, 1L);
        compatibleWriterOperator.close();

        Assert.assertThat(
                compatibleWriterOperator.getOutput(),
                Matchers.containsInAnyOrder(expectedOutput1.toArray()));

        // 3. Restore from the writer's own snapshot: only the newly processed elements are
        //    expected, none of the previous sink's inputs.
        final OneInputStreamOperatorTestHarness<Integer, String> restoredSinkOperator =
                this.createCompatibleSinkOperator();
        final List<StreamRecord<String>> expectedOutput2 =
                Arrays.asList(
                        new StreamRecord<>(Tuple3.of(2, 2, Long.MIN_VALUE).toString()),
                        new StreamRecord<>(Tuple3.of(3, 3, Long.MIN_VALUE).toString()));

        restoredSinkOperator.initializeState(operatorStateWithoutPreviousState);
        restoredSinkOperator.open();
        restoredSinkOperator.processElement(2, 2L);
        restoredSinkOperator.processElement(3, 3L);
        restoredSinkOperator.endInput();

        Assert.assertThat(
                restoredSinkOperator.getOutput(),
                Matchers.containsInAnyOrder(expectedOutput2.toArray()));

        // Release the restored harness as well, just like the first one.
        restoredSinkOperator.close();
    }

    /**
     * Creates a harness around a writer operator whose factory is given
     * {@link DummySinkOperator#DUMMY_SINK_STATE_NAME} as its second constructor argument.
     *
     * @return a new, not yet opened test harness
     * @throws Exception if the harness cannot be created
     */
    private OneInputStreamOperatorTestHarness<Integer, String> createCompatibleSinkOperator()
            throws Exception {
        return new OneInputStreamOperatorTestHarness<>(
                new StatefulSinkWriterOperatorFactory<>(
                        newSnapshottingSink(), DummySinkOperator.DUMMY_SINK_STATE_NAME),
                IntSerializer.INSTANCE);
    }

    /**
     * Builds a {@link TestSink} configured through {@code withWriterState()} and backed by a
     * fresh {@link SnapshottingBufferingSinkWriter}.
     */
    private static TestSink newSnapshottingSink() {
        return TestSink.newBuilder()
                .setWriter(new SnapshottingBufferingSinkWriter())
                .withWriterState()
                .build();
    }

    /**
     * Stand-in for a previous sink implementation: adds every incoming string to an operator
     * list state registered under {@link #DUMMY_SINK_STATE_NAME}.
     */
    static class DummySinkOperator extends AbstractStreamOperator<String>
            implements OneInputStreamOperator<String, String> {

        /** Name under which this operator registers its list state. */
        static final String DUMMY_SINK_STATE_NAME = "dummy_sink_state";

        /** Descriptor of the raw {@code byte[]} list state that backs {@link #sinkState}. */
        static final ListStateDescriptor<byte[]> SINK_STATE_DESC =
                new ListStateDescriptor<>(
                        DUMMY_SINK_STATE_NAME, BytePrimitiveArraySerializer.INSTANCE);

        /** String view over the raw state; assigned in {@link #initializeState}. */
        ListState<String> sinkState;

        /**
         * Wraps the raw list state in a {@link SimpleVersionedListState} that uses
         * {@code TestSink.StringCommittableSerializer}.
         *
         * @param context provides access to the operator state store
         * @throws Exception if the state cannot be obtained
         */
        @Override
        public void initializeState(StateInitializationContext context) throws Exception {
            super.initializeState(context);
            this.sinkState =
                    new SimpleVersionedListState<>(
                            context.getOperatorStateStore().getListState(SINK_STATE_DESC),
                            TestSink.StringCommittableSerializer.INSTANCE);
        }

        /**
         * Adds the record's value to the list state; nothing is emitted.
         *
         * @param element the incoming record
         * @throws Exception if the state update fails
         */
        @Override
        public void processElement(StreamRecord<String> element) throws Exception {
            this.sinkState.add(element.getValue());
        }
    }

    /**
     * Buffering writer that uses its inherited {@code elements} list as its state: the snapshot
     * is that list, and restored state replaces it.
     */
    static class SnapshottingBufferingSinkWriter
            extends SinkWriterOperatorTestBase.BufferingSinkWriter {

        /**
         * Returns the {@code elements} list itself, not a copy.
         *
         * @return the current contents of the buffer
         */
        @Override
        public List<String> snapshotState() {
            return this.elements;
        }

        /**
         * Makes the given list the new {@code elements} buffer. The list is adopted by
         * reference, not copied.
         *
         * @param states the restored writer state
         */
        @Override
        void restoredFrom(List<String> states) {
            this.elements = states;
        }
    }
}

