/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators.sink;

import java.util.ArrayList;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.api.connector.sink.SinkWriter;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
import org.apache.flink.streaming.runtime.operators.sink.AbstractSinkWriterOperator;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.Preconditions;

@Internal
final class StatefulSinkWriterOperator<InputT, CommT, WriterStateT>
extends AbstractSinkWriterOperator<InputT, CommT> {
    private static final ListStateDescriptor<byte[]> WRITER_RAW_STATES_DESC = new ListStateDescriptor("writer_raw_states", (TypeSerializer)BytePrimitiveArraySerializer.INSTANCE);
    private final Sink<InputT, CommT, WriterStateT, ?> sink;
    private final SimpleVersionedSerializer<WriterStateT> writerStateSimpleVersionedSerializer;
    @Nullable
    private final String previousSinkStateName;
    @Nullable
    private ListState<WriterStateT> previousSinkState;
    private ListState<WriterStateT> writerState;

    StatefulSinkWriterOperator(@Nullable String previousSinkStateName, ProcessingTimeService processingTimeService, Sink<InputT, CommT, WriterStateT, ?> sink, SimpleVersionedSerializer<WriterStateT> writerStateSimpleVersionedSerializer) {
        super(processingTimeService);
        this.sink = sink;
        this.writerStateSimpleVersionedSerializer = writerStateSimpleVersionedSerializer;
        this.previousSinkStateName = previousSinkStateName;
    }

    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        ListState rawState = context.getOperatorStateStore().getListState(WRITER_RAW_STATES_DESC);
        this.writerState = new SimpleVersionedListState<WriterStateT>((ListState<byte[]>)rawState, this.writerStateSimpleVersionedSerializer);
        if (this.previousSinkStateName != null) {
            ListStateDescriptor preSinkStateDesc = new ListStateDescriptor(this.previousSinkStateName, (TypeSerializer)BytePrimitiveArraySerializer.INSTANCE);
            ListState preRawState = context.getOperatorStateStore().getListState(preSinkStateDesc);
            this.previousSinkState = new SimpleVersionedListState<WriterStateT>((ListState<byte[]>)preRawState, this.writerStateSimpleVersionedSerializer);
        }
    }

    @Override
    public void snapshotState(StateSnapshotContext context) throws Exception {
        this.writerState.update(this.sinkWriter.snapshotState());
        if (this.previousSinkState != null) {
            this.previousSinkState.clear();
        }
    }

    @Override
    SinkWriter<InputT, CommT, WriterStateT> createWriter() throws Exception {
        List writerStates = CollectionUtil.iterableToList((Iterable)((Iterable)this.writerState.get()));
        ArrayList states = new ArrayList(writerStates);
        if (this.previousSinkStateName != null) {
            Preconditions.checkNotNull(this.previousSinkState);
            List previousSinkStates = CollectionUtil.iterableToList((Iterable)((Iterable)this.previousSinkState.get()));
            states.addAll(previousSinkStates);
        }
        return this.sink.createWriter(this.createInitContext(), states);
    }
}

