/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.consumer;

import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.TestInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.TestSingleInputGate;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;

/**
 * Test input gate whose channels are fed from per-channel in-memory queues.
 *
 * <p>Values pushed through {@link #sendElement(Object, int)}, {@link #sendEvent(AbstractEvent, int)}
 * and {@link #endInput()} are queued and only turned into buffers when the corresponding
 * {@link TestInputChannel} is polled.
 *
 * @param <T> type handled by the serializer passed to the constructor
 */
public class StreamTestSingleInputGate<T> extends TestSingleInputGate {
    private final int numInputChannels;
    private final TestInputChannel[] inputChannels;
    private final int bufferSize;
    private final TypeSerializer<T> serializer;
    private final ConcurrentLinkedQueue<InputValue>[] inputQueues;

    /**
     * Creates the gate and wires one queue-backed test channel per input channel.
     *
     * @param numInputChannels number of input channels (and queues) to create
     * @param gateIndex index forwarded to the parent gate
     * @param serializer serializer wrapped in a {@link StreamElementSerializer} for queued elements
     * @param bufferSize size passed to {@link BufferBuilderTestUtils#createBufferBuilder(int)} for each record
     */
    @SuppressWarnings("unchecked") // Java cannot create generic arrays; the array only ever holds typed queues.
    public StreamTestSingleInputGate(int numInputChannels, int gateIndex, TypeSerializer<T> serializer, int bufferSize) {
        super(numInputChannels, gateIndex, false);
        this.bufferSize = bufferSize;
        this.serializer = serializer;
        this.numInputChannels = numInputChannels;
        this.inputChannels = new TestInputChannel[numInputChannels];
        this.inputQueues = new ConcurrentLinkedQueue[numInputChannels];
        setupInputChannels();
    }

    /**
     * Creates the queue and test channel for every index and registers a provider that converts
     * the next queued value into a buffer each time the channel is polled.
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private void setupInputChannels() {
        for (int i = 0; i < numInputChannels; ++i) {
            final int channelIndex = i;
            final DataOutputSerializer dataOutputSerializer = new DataOutputSerializer(128);
            // Raw delegate: queued elements are stored as Object, so setInstance is called
            // without a static element type.
            final SerializationDelegate delegate =
                    new SerializationDelegate(new StreamElementSerializer<T>(serializer));
            inputQueues[channelIndex] = new ConcurrentLinkedQueue<>();
            inputChannels[channelIndex] = new TestInputChannel(inputGate, i);

            TestInputChannel.BufferAndAvailabilityProvider answer = () -> {
                final ConcurrentLinkedQueue<InputValue> inputQueue = inputQueues[channelIndex];
                final InputValue input;
                final Buffer.DataType nextType;
                // Poll and peek under the same lock used by producers so that nextType
                // reflects the queue state right after this poll.
                synchronized (inputQueue) {
                    input = inputQueue.poll();
                    nextType = inputQueue.isEmpty() ? Buffer.DataType.NONE : Buffer.DataType.DATA_BUFFER;
                }
                if (input == null) {
                    return Optional.empty();
                }
                if (input.isStreamEnd()) {
                    inputChannels[channelIndex].setReleased();
                    return Optional.of(new InputChannel.BufferAndAvailability(
                            EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE, false), nextType, 0, 0));
                }
                if (input.isStreamRecord()) {
                    delegate.setInstance(input.getStreamRecord());
                    ByteBuffer serializedRecord = RecordWriter.serializeRecord(dataOutputSerializer, delegate);
                    BufferBuilder bufferBuilder = BufferBuilderTestUtils.createBufferBuilder(bufferSize);
                    // The consumer must be created before the builder is finished and closed.
                    BufferConsumer bufferConsumer = bufferBuilder.createBufferConsumer();
                    bufferBuilder.appendAndCommit(serializedRecord);
                    bufferBuilder.finish();
                    bufferBuilder.close();
                    return Optional.of(new InputChannel.BufferAndAvailability(
                            bufferConsumer.build(), nextType, 0, 0));
                }
                if (input.isEvent()) {
                    AbstractEvent event = input.getEvent();
                    if (event instanceof EndOfPartitionEvent) {
                        inputChannels[channelIndex].setReleased();
                    }
                    return Optional.of(new InputChannel.BufferAndAvailability(
                            EventSerializer.toBuffer(event, false), nextType, 0, 0));
                }
                return Optional.empty();
            };
            inputChannels[channelIndex].addBufferAndAvailability(answer);
        }
        inputGate.setInputChannels(inputChannels);
    }

    /**
     * Queues an element on the given channel and notifies the gate that the channel is non-empty.
     *
     * @param element element to queue; handed to the serialization delegate when polled
     * @param channel index of the target channel
     */
    public void sendElement(Object element, int channel) {
        enqueue(InputValue.element(element), channel);
    }

    /**
     * Queues an event on the given channel and notifies the gate that the channel is non-empty.
     *
     * @param event event to queue
     * @param channel index of the target channel
     */
    public void sendEvent(AbstractEvent event, int channel) {
        enqueue(InputValue.event(event), channel);
    }

    /** Queues an end-of-stream marker on every channel and notifies the gate for each of them. */
    public void endInput() {
        for (int i = 0; i < numInputChannels; ++i) {
            enqueue(InputValue.streamEnd(), i);
        }
    }

    /**
     * Reports whether every channel queue is currently empty.
     *
     * @return {@code true} if no queue holds a pending value
     */
    public boolean allQueuesEmpty() {
        for (ConcurrentLinkedQueue<InputValue> queue : inputQueues) {
            // isEmpty() rather than size(): size() on ConcurrentLinkedQueue walks the whole queue.
            if (!queue.isEmpty()) {
                return false;
            }
        }
        return true;
    }

    /** Adds the value to the channel's queue under the queue's monitor, then notifies the gate. */
    private void enqueue(InputValue value, int channel) {
        final ConcurrentLinkedQueue<InputValue> queue = inputQueues[channel];
        synchronized (queue) {
            queue.add(value);
            queue.notifyAll();
        }
        inputGate.notifyChannelNonEmpty(inputChannels[channel]);
    }

    /** Immutable queue entry: a stream record, an event, or an end-of-stream marker. */
    private static final class InputValue {
        /** The three mutually exclusive kinds of queue entry. */
        private enum Kind {
            STREAM_END,
            STREAM_RECORD,
            EVENT
        }

        private final Object elementOrEvent;
        private final Kind kind;

        private InputValue(Object elementOrEvent, Kind kind) {
            this.elementOrEvent = elementOrEvent;
            this.kind = kind;
        }

        /** Wraps a stream record. */
        static InputValue element(Object element) {
            return new InputValue(element, Kind.STREAM_RECORD);
        }

        /** Creates the end-of-stream marker; it carries no payload. */
        static InputValue streamEnd() {
            return new InputValue(null, Kind.STREAM_END);
        }

        /** Wraps an event. */
        static InputValue event(AbstractEvent event) {
            return new InputValue(event, Kind.EVENT);
        }

        /** Returns the payload as-is; meaningful when {@link #isStreamRecord()} is true. */
        Object getStreamRecord() {
            return elementOrEvent;
        }

        /** Returns the payload cast to an event; meaningful when {@link #isEvent()} is true. */
        AbstractEvent getEvent() {
            return (AbstractEvent) elementOrEvent;
        }

        boolean isStreamEnd() {
            return kind == Kind.STREAM_END;
        }

        boolean isStreamRecord() {
            return kind == Kind.STREAM_RECORD;
        }

        boolean isEvent() {
            return kind == Kind.EVENT;
        }
    }
}

