/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators.sink;

import javax.annotation.Nullable;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.runtime.operators.sink.AbstractSinkWriterOperator;
import org.apache.flink.streaming.runtime.operators.sink.AbstractSinkWriterOperatorFactory;
import org.apache.flink.streaming.runtime.operators.sink.StatefulSinkWriterOperator;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;

/**
 * Factory that creates {@link StatefulSinkWriterOperator} instances for a given {@link Sink}.
 *
 * <p>The sink must supply a writer-state serializer; see
 * {@link #createWriterOperator(ProcessingTimeService)}.
 *
 * @param <InputT> type of the records consumed by the sink writer
 * @param <CommT> type of the committables produced by the sink writer
 * @param <WriterStateT> type of the sink writer's state
 */
public final class StatefulSinkWriterOperatorFactory<InputT, CommT, WriterStateT>
extends AbstractSinkWriterOperatorFactory<InputT, CommT> {
    /** Sink handed to every operator created by this factory. */
    private final Sink<InputT, CommT, WriterStateT, ?> sink;

    /**
     * Optional state name forwarded unchanged to the created operator.
     * NOTE(review): presumably identifies state written by a previous sink implementation;
     * confirm against {@code StatefulSinkWriterOperator}.
     */
    @Nullable
    private final String previousSinkStateName;

    /**
     * Creates a factory without a previous sink state name.
     *
     * @param sink the sink whose writer the created operators will run
     */
    public StatefulSinkWriterOperatorFactory(Sink<InputT, CommT, WriterStateT, ?> sink) {
        this(sink, null);
    }

    /**
     * Creates a factory.
     *
     * @param sink the sink whose writer the created operators will run
     * @param previousSinkStateName state name passed through to the created operator; may be
     *     {@code null}
     */
    public StatefulSinkWriterOperatorFactory(Sink<InputT, CommT, WriterStateT, ?> sink, @Nullable String previousSinkStateName) {
        this.sink = sink;
        this.previousSinkStateName = previousSinkStateName;
    }

    /**
     * Builds a new {@link StatefulSinkWriterOperator} for the configured sink.
     *
     * @param processingTimeService processing time service handed to the operator
     * @return the newly created writer operator
     * @throws java.util.NoSuchElementException if the sink does not provide a writer state
     *     serializer
     */
    @Override
    AbstractSinkWriterOperator<InputT, CommT> createWriterOperator(ProcessingTimeService processingTimeService) {
        // Resolve the serializer up front so a misconfigured sink fails with an actionable
        // message instead of Optional.get()'s bare "No value present". The exception type is
        // kept identical to what Optional.get() would have thrown.
        SimpleVersionedSerializer<WriterStateT> writerStateSerializer =
                this.sink
                        .getWriterStateSerializer()
                        .orElseThrow(
                                () ->
                                        new java.util.NoSuchElementException(
                                                "The sink does not provide a writer state serializer, which is required to create a StatefulSinkWriterOperator."));
        return new StatefulSinkWriterOperator<>(
                this.previousSinkStateName, processingTimeService, this.sink, writerStateSerializer);
    }

    /**
     * Returns the class of the operator created by this factory.
     *
     * @param classLoader unused by this implementation
     * @return {@code StatefulSinkWriterOperator.class}
     */
    @SuppressWarnings("rawtypes") // the overridden signature uses the raw StreamOperator type
    @Override
    public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
        return StatefulSinkWriterOperator.class;
    }
}

