/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.api.writer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.writer.MultipleRecordWriters;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.api.writer.RecordWriterBuilder;
import org.apache.flink.runtime.io.network.api.writer.RecordWriterDelegate;
import org.apache.flink.runtime.io.network.api.writer.RecordWriterTest;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.api.writer.SingleRecordWriter;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.NoOpBufferAvailablityListener;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionBuilder;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.types.IntValue;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.SupplierWithException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Tests for the {@link RecordWriterDelegate} implementations {@link SingleRecordWriter} and
 * {@link MultipleRecordWriters}.
 *
 * <p>Two aspects are covered for each implementation: the availability reported by the delegate
 * while buffers are consumed and recycled, and the broadcasting of an event to every subpartition
 * of every wrapped writer's partition.
 */
public class RecordWriterDelegateTest
extends TestLogger {
    /** NOTE(review): presumably the serialized size in bytes of one emitted {@link IntValue} record - confirm. */
    private static final int RECORD_SIZE = 8;

    /** Number of memory segments in the global {@link NetworkBufferPool}. */
    private static final int NUMBER_OF_BUFFERS = 10;

    /** Size in bytes of each memory segment in the global {@link NetworkBufferPool}. */
    private static final int MEMORY_SEGMENT_SIZE = 128;

    /** Number of record writers wrapped by the {@link MultipleRecordWriters} tests. */
    private static final int NUM_RECORD_WRITERS = 2;

    /** Number of subpartitions requested for the partitions used by the broadcast tests. */
    private static final int NUM_SUBPARTITIONS = 2;

    private NetworkBufferPool globalPool;

    /** Creates a fresh global buffer pool for every test. */
    @Before
    public void setup() {
        // A segment must hold a whole number of records, otherwise the record count used in
        // verifyAvailability() would not fill a segment exactly.
        Assert.assertEquals("Illegal memory segment size,", 0L, MEMORY_SEGMENT_SIZE % RECORD_SIZE);
        this.globalPool = new NetworkBufferPool(NUMBER_OF_BUFFERS, MEMORY_SEGMENT_SIZE);
    }

    /** Destroys all local buffer pools and then the global pool itself. */
    @After
    public void teardown() {
        this.globalPool.destroyAllBufferPools();
        this.globalPool.destroy();
    }

    /** Checks writer lookup and availability reporting of a {@link SingleRecordWriter}. */
    @Test
    @SuppressWarnings({"unchecked", "rawtypes"}) // raw writer types are used on purpose in this test
    public void testSingleRecordWriterAvailability() throws Exception {
        RecordWriter recordWriter = this.createRecordWriter(this.globalPool);
        RecordWriterDelegate writerDelegate = new SingleRecordWriter(recordWriter);

        Assert.assertEquals(recordWriter, writerDelegate.getRecordWriter(0));
        this.verifyAvailability(writerDelegate);
    }

    /** Checks writer lookup and availability reporting of a {@link MultipleRecordWriters}. */
    @Test
    @SuppressWarnings({"unchecked", "rawtypes"}) // raw writer types are used on purpose in this test
    public void testMultipleRecordWritersAvailability() throws Exception {
        List<RecordWriter> recordWriters = new ArrayList<RecordWriter>(NUM_RECORD_WRITERS);
        for (int i = 0; i < NUM_RECORD_WRITERS; ++i) {
            recordWriters.add(this.createRecordWriter(this.globalPool));
        }
        RecordWriterDelegate writerDelegate = new MultipleRecordWriters(recordWriters);

        // The delegate must hand back the writers in the order in which they were supplied.
        for (int i = 0; i < NUM_RECORD_WRITERS; ++i) {
            Assert.assertEquals(recordWriters.get(i), writerDelegate.getRecordWriter(i));
        }
        this.verifyAvailability(writerDelegate);
    }

    /** Checks that a {@link SingleRecordWriter} broadcasts an event to all subpartitions. */
    @Test
    @SuppressWarnings({"unchecked", "rawtypes"}) // raw writer types are used on purpose in this test
    public void testSingleRecordWriterBroadcastEvent() throws Exception {
        // NOTE(review): the first argument is assumed to be the memory segment size - confirm
        // against RecordWriterTest.createResultPartition.
        ResultPartition partition = RecordWriterTest.createResultPartition(MEMORY_SEGMENT_SIZE, NUM_SUBPARTITIONS);
        RecordWriter recordWriter = new RecordWriterBuilder().build(partition);
        RecordWriterDelegate writerDelegate = new SingleRecordWriter(recordWriter);

        this.verifyBroadcastEvent(writerDelegate, Collections.singletonList(partition));
    }

    /** Checks that a {@link MultipleRecordWriters} broadcasts an event through every writer. */
    @Test
    @SuppressWarnings({"unchecked", "rawtypes"}) // raw writer types are used on purpose in this test
    public void testMultipleRecordWritersBroadcastEvent() throws Exception {
        List<RecordWriter> recordWriters = new ArrayList<RecordWriter>(NUM_RECORD_WRITERS);
        List<ResultPartition> partitions = new ArrayList<ResultPartition>(NUM_RECORD_WRITERS);
        for (int i = 0; i < NUM_RECORD_WRITERS; ++i) {
            // NOTE(review): the first argument is assumed to be the memory segment size - confirm
            // against RecordWriterTest.createResultPartition.
            ResultPartition partition = RecordWriterTest.createResultPartition(MEMORY_SEGMENT_SIZE, NUM_SUBPARTITIONS);
            partitions.add(partition);
            recordWriters.add(new RecordWriterBuilder().build(partition));
        }
        RecordWriterDelegate writerDelegate = new MultipleRecordWriters(recordWriters);

        this.verifyBroadcastEvent(writerDelegate, partitions);
    }

    /**
     * Builds a record writer on top of a new result partition whose buffers come from a dedicated
     * local pool created from the given global pool.
     *
     * @param globalPool the pool from which the local buffer pool is created
     * @return a record writer targeting the newly set up partition
     * @throws Exception if creating the local pool or setting up the partition fails
     */
    @SuppressWarnings("rawtypes") // the raw writer type is part of this helper's existing signature
    private RecordWriter createRecordWriter(NetworkBufferPool globalPool) throws Exception {
        BufferPool localPool = globalPool.createBufferPool(1, 1, 1, Integer.MAX_VALUE);
        SupplierWithException<BufferPool, IOException> poolFactory = () -> localPool;

        ResultPartition partition = new ResultPartitionBuilder().setBufferPoolFactory(poolFactory).build();
        partition.setup();
        return new RecordWriterBuilder().build(partition);
    }

    /**
     * Asserts that the delegate is available initially, becomes unavailable after records have
     * been emitted through its first writer, and becomes available again once a consumed buffer
     * has been recycled.
     *
     * @param writerDelegate the delegate under test
     * @throws Exception if emitting records or reading the subpartition fails
     */
    @SuppressWarnings({"unchecked", "rawtypes"}) // emit() is invoked on a raw RecordWriter
    private void verifyAvailability(RecordWriterDelegate writerDelegate) throws Exception {
        // The delegate starts out available.
        Assert.assertTrue(writerDelegate.isAvailable());
        Assert.assertTrue(writerDelegate.getAvailableFuture().isDone());

        // Emit as many records as fit into one memory segment; the delegate is then expected to
        // report itself as unavailable.
        RecordWriter recordWriter = writerDelegate.getRecordWriter(0);
        for (int i = 0; i < MEMORY_SEGMENT_SIZE / RECORD_SIZE; ++i) {
            recordWriter.emit(new IntValue(i));
        }
        Assert.assertFalse(writerDelegate.isAvailable());
        CompletableFuture<?> future = writerDelegate.getAvailableFuture();
        Assert.assertFalse(future.isDone());

        // Consume one buffer from the first subpartition and recycle it; the future obtained
        // while unavailable must complete and the delegate must be available again.
        ResultSubpartitionView readView = recordWriter.getTargetPartition().createSubpartitionView(0, new NoOpBufferAvailablityListener());
        Buffer buffer = readView.getNextBuffer().buffer();
        buffer.recycleBuffer();

        Assert.assertTrue(future.isDone());
        Assert.assertTrue(writerDelegate.isAvailable());
        Assert.assertTrue(writerDelegate.getAvailableFuture().isDone());
    }

    /**
     * Broadcasts a {@link CancelCheckpointMarker} through the delegate and asserts that every
     * subpartition of every given partition holds exactly one queued buffer that parses back to
     * that event.
     *
     * @param writerDelegate the delegate under test
     * @param partitions the partitions targeted by the writers wrapped in the delegate
     * @throws Exception if broadcasting or reading back the event fails
     */
    @SuppressWarnings("rawtypes") // the raw delegate type is part of this helper's existing signature
    private void verifyBroadcastEvent(RecordWriterDelegate writerDelegate, List<ResultPartition> partitions) throws Exception {
        CancelCheckpointMarker message = new CancelCheckpointMarker(1L);
        writerDelegate.broadcastEvent(message);

        for (ResultPartition partition : partitions) {
            for (int i = 0; i < partition.getNumberOfSubpartitions(); ++i) {
                Assert.assertEquals(1L, partition.getNumberOfQueuedBuffers(i));

                ResultSubpartitionView view = partition.createSubpartitionView(i, new NoOpBufferAvailablityListener());
                BufferOrEvent boe = RecordWriterTest.parseBuffer(view.getNextBuffer().buffer(), i);
                Assert.assertTrue(boe.isEvent());
                Assert.assertEquals(message, boe.getEvent());
            }
        }
    }
}

