/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators.sink;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.api.connector.sink.SinkWriter;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.operators.sink.AbstractSinkWriterOperatorFactory;
import org.apache.flink.streaming.runtime.operators.sink.TestSink;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

public abstract class SinkWriterOperatorTestBase
extends TestLogger {
    /**
     * Creates the sink writer operator factory that the tests in this class exercise.
     *
     * <p>Supplied by concrete subclasses; {@link #createTestHarness(TestSink)} wraps the returned
     * factory in a test harness.
     *
     * @param var1 the sink to create the writer operator factory for
     * @return a factory for an operator that consumes {@code Integer} input and emits {@code String}
     *     output
     */
    protected abstract AbstractSinkWriterOperatorFactory<Integer, String> createWriterOperator(TestSink var1);

    /**
     * Checks that an operator backed by {@link TestSink.DefaultSinkWriter} has already emitted both
     * written elements after a checkpoint (pre-barrier preparation followed by a snapshot), i.e.
     * without the input being ended.
     *
     * <p>Every expected record carries the string form of a {@code Tuple3}. Judging by the writers
     * declared in this file the fields are presumably (element, timestamp, current watermark); the
     * last field is {@code 0} because watermark {@code 0} is processed before any element.
     *
     * @throws Exception if the test harness fails
     */
    @Test
    public void nonBufferingWriterEmitsWithoutFlush() throws Exception {
        OneInputStreamOperatorTestHarness<Integer, String> testHarness =
                this.createTestHarness(
                        TestSink.newBuilder()
                                .setWriter(new TestSink.DefaultSinkWriter())
                                .withWriterState()
                                .build());
        testHarness.open();

        testHarness.processWatermark(0L);
        testHarness.processElement(1, 1L);
        testHarness.processElement(2, 2L);

        // Take a checkpoint rather than ending the input.
        testHarness.prepareSnapshotPreBarrier(1L);
        testHarness.snapshot(1L, 1L);

        Assert.assertThat(
                testHarness.getOutput(),
                Matchers.contains(
                        new Watermark(0L),
                        new StreamRecord<>(Tuple3.of(1, 1L, 0L).toString()),
                        new StreamRecord<>(Tuple3.of(2, 2L, 0L).toString())));
    }

    /**
     * Checks that an operator backed by {@link TestSink.DefaultSinkWriter} has emitted both written
     * elements once the end of input was signalled to the harness.
     *
     * <p>The expected output is the forwarded watermark followed by one record per element, each
     * holding the string form of a {@code Tuple3} (presumably element, timestamp, current
     * watermark).
     *
     * @throws Exception if the test harness fails
     */
    @Test
    public void nonBufferingWriterEmitsOnFlush() throws Exception {
        OneInputStreamOperatorTestHarness<Integer, String> testHarness =
                this.createTestHarness(
                        TestSink.newBuilder()
                                .setWriter(new TestSink.DefaultSinkWriter())
                                .withWriterState()
                                .build());
        testHarness.open();

        testHarness.processWatermark(0L);
        testHarness.processElement(1, 1L);
        testHarness.processElement(2, 2L);

        // End the input instead of taking a checkpoint.
        testHarness.endInput();

        Assert.assertThat(
                testHarness.getOutput(),
                Matchers.contains(
                        new Watermark(0L),
                        new StreamRecord<>(Tuple3.of(1, 1L, 0L).toString()),
                        new StreamRecord<>(Tuple3.of(2, 2L, 0L).toString())));
    }

    /**
     * Checks that an operator backed by {@link BufferingSinkWriter} emits no element records when
     * only a checkpoint is taken.
     *
     * <p>{@link BufferingSinkWriter#prepareCommit(boolean)} returns an empty list unless it is asked
     * to flush, so the only expected output is the forwarded watermark.
     *
     * @throws Exception if the test harness fails
     */
    @Test
    public void bufferingWriterDoesNotEmitWithoutFlush() throws Exception {
        OneInputStreamOperatorTestHarness<Integer, String> testHarness =
                this.createTestHarness(
                        TestSink.newBuilder()
                                .setWriter(new BufferingSinkWriter())
                                .withWriterState()
                                .build());
        testHarness.open();

        testHarness.processWatermark(0L);
        testHarness.processElement(1, 1L);
        testHarness.processElement(2, 2L);

        // Take a checkpoint rather than ending the input.
        testHarness.prepareSnapshotPreBarrier(1L);
        testHarness.snapshot(1L, 1L);

        // Only the watermark may have been emitted; both elements are still buffered.
        Assert.assertThat(testHarness.getOutput(), Matchers.contains(new Watermark(0L)));
    }

    /**
     * Checks that an operator backed by {@link BufferingSinkWriter} emits both buffered elements
     * once the end of input was signalled to the harness.
     *
     * <p>The expected output is the forwarded watermark followed by one record per element, each
     * holding the string form of a {@code Tuple3} (presumably element, timestamp, current
     * watermark).
     *
     * @throws Exception if the test harness fails
     */
    @Test
    public void bufferingWriterEmitsOnFlush() throws Exception {
        OneInputStreamOperatorTestHarness<Integer, String> testHarness =
                this.createTestHarness(
                        TestSink.newBuilder()
                                .setWriter(new BufferingSinkWriter())
                                .withWriterState()
                                .build());
        testHarness.open();

        testHarness.processWatermark(0L);
        testHarness.processElement(1, 1L);
        testHarness.processElement(2, 2L);

        // End the input instead of taking a checkpoint.
        testHarness.endInput();

        Assert.assertThat(
                testHarness.getOutput(),
                Matchers.contains(
                        new Watermark(0L),
                        new StreamRecord<>(Tuple3.of(1, 1L, 0L).toString()),
                        new StreamRecord<>(Tuple3.of(2, 2L, 0L).toString())));
    }

    /**
     * Checks that an operator backed by {@link TimeBasedBufferingSinkWriter} emits nothing while
     * processing time is still {@code 0}, and emits both elements after processing time has been
     * advanced to {@code 2001} and the input was ended.
     *
     * <p>No watermark is processed in this test; the expected records carry {@code Long.MIN_VALUE}
     * in the third tuple field, which the writer fills from {@code context.currentWatermark()}.
     *
     * @throws Exception if the test harness fails
     */
    @Test
    public void timeBasedBufferingSinkWriter() throws Exception {
        OneInputStreamOperatorTestHarness<Integer, String> testHarness =
                this.createTestHarness(
                        TestSink.newBuilder()
                                .setWriter(new TimeBasedBufferingSinkWriter())
                                .withWriterState()
                                .build());
        testHarness.open();

        testHarness.setProcessingTime(0L);

        testHarness.processElement(1, 1L);
        testHarness.processElement(2, 2L);

        testHarness.prepareSnapshotPreBarrier(1L);

        // The writer's timer (registered for 1000) has not fired yet, so nothing was emitted.
        Assert.assertThat(testHarness.getOutput().size(), Matchers.equalTo(0));

        // Move processing time past the registered timer(s).
        testHarness.getProcessingTimeService().setCurrentTime(2001L);

        testHarness.prepareSnapshotPreBarrier(2L);
        testHarness.endInput();

        Assert.assertThat(
                testHarness.getOutput(),
                Matchers.contains(
                        new StreamRecord<>(Tuple3.of(1, 1L, Long.MIN_VALUE).toString()),
                        new StreamRecord<>(Tuple3.of(2, 2L, Long.MIN_VALUE).toString())));
    }

    /**
     * Checks that disposing the test harness closes the sink writer: the writer's {@code closed}
     * flag must be unset after {@code open()} and set after {@code dispose()}.
     *
     * @throws Exception if the test harness fails
     */
    @Test
    public void testSinkWriterIsClosedWhenDisposed() throws Exception {
        final ClosingSinkWriter writer = new ClosingSinkWriter();
        final TestSink sink = TestSink.newBuilder().withWriterState().setWriter(writer).build();
        final OneInputStreamOperatorTestHarness<Integer, String> harness =
                this.createTestHarness(sink);

        harness.open();
        Assert.assertFalse(writer.closed);

        harness.dispose();
        Assert.assertTrue(writer.closed);
    }

    /**
     * Builds a one-input test harness around the writer operator created for the given sink.
     *
     * @param sink the sink handed to {@link #createWriterOperator(TestSink)}
     * @return a harness that takes {@code Integer} input, serialized with {@link IntSerializer}
     * @throws Exception if the harness cannot be constructed
     */
    protected OneInputStreamOperatorTestHarness<Integer, String> createTestHarness(TestSink sink) throws Exception {
        final OneInputStreamOperatorFactory<Integer, String> operatorFactory =
                (OneInputStreamOperatorFactory<Integer, String>) this.createWriterOperator(sink);
        final TypeSerializer<Integer> inputSerializer = IntSerializer.INSTANCE;
        return new OneInputStreamOperatorTestHarness<Integer, String>(operatorFactory, inputSerializer);
    }

    /** A {@link TestSink.DefaultSinkWriter} that records whether {@code close()} has been called. */
    private static class ClosingSinkWriter extends TestSink.DefaultSinkWriter {
        /** Set to {@code true} once the superclass {@code close()} has returned normally. */
        boolean closed;

        /**
         * Delegates to the superclass and then marks this writer as closed.
         *
         * @throws Exception if the superclass {@code close()} throws; the flag then stays unset
         */
        @Override
        public void close() throws Exception {
            super.close();
            closed = true;
        }
    }

    /**
     * A sink writer that keeps written elements in a private cache and only moves them into the
     * inherited {@code elements} list when its processing-time callback fires.
     *
     * <p>A timer for {@code 1000} is registered as soon as the timer service is set, and every
     * callback registers a follow-up timer for {@code time + 1000}.
     */
    static class TimeBasedBufferingSinkWriter extends TestSink.DefaultSinkWriter
            implements Sink.ProcessingTimeService.ProcessingTimeCallback {

        /** String forms of the elements written since the last timer callback. */
        private final List<String> cachedCommittables = new ArrayList<>();

        /**
         * Caches the element instead of exposing it directly.
         *
         * @param element the written element
         * @param context supplies the timestamp and current watermark stored alongside the element
         */
        @Override
        public void write(Integer element, SinkWriter.Context context) {
            this.cachedCommittables.add(
                    Tuple3.of(element, context.timestamp(), context.currentWatermark()).toString());
        }

        /**
         * Stores the timer service via the superclass and registers the first timer for
         * {@code 1000}.
         *
         * @param processingTimerService the service used to register timers
         */
        @Override
        void setProcessingTimerService(Sink.ProcessingTimeService processingTimerService) {
            super.setProcessingTimerService(processingTimerService);
            this.processingTimerService.registerProcessingTimer(1000L, this);
        }

        /**
         * Moves all cached elements into {@code elements}, clears the cache and registers the
         * next timer.
         *
         * @param time the time passed to this callback by the timer service
         * @throws IOException declared by the callback contract; not thrown by this implementation
         *     itself
         */
        @Override
        public void onProcessingTime(long time) throws IOException {
            this.elements.addAll(this.cachedCommittables);
            this.cachedCommittables.clear();
            this.processingTimerService.registerProcessingTimer(time + 1000L, this);
        }
    }

    /**
     * A sink writer that hands out its accumulated elements only when a flush is requested.
     */
    static class BufferingSinkWriter extends TestSink.DefaultSinkWriter {

        /**
         * Returns the buffered elements on flush and nothing otherwise.
         *
         * @param flush whether the buffered elements should be handed out
         * @return an empty list if {@code flush} is {@code false}; otherwise the list held in
         *     {@code elements} so far, which is replaced by a fresh empty list
         */
        @Override
        public List<String> prepareCommit(boolean flush) {
            if (!flush) {
                return Collections.emptyList();
            }
            // Hand over the current buffer and start a new one so later writes do not leak
            // into the list that was already returned.
            List<String> result = this.elements;
            this.elements = new ArrayList<>();
            return result;
        }
    }
}

