/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition;

import java.util.Arrays;
import java.util.List;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.reader.RecordReader;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.api.writer.RecordWriterBuilder;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.runtime.net.SSLUtilsTest;
import org.apache.flink.testutils.serialization.types.ByteArrayType;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
 * Integration test that runs a two-vertex batch job (source -> sink) over a
 * {@link ResultPartitionType#BLOCKING} exchange on a {@link MiniCluster} configured with the
 * {@code "file"} blocking shuffle type.
 *
 * <p>The test is parameterized and executed once with internal SSL enabled and once without.
 */
@RunWith(Parameterized.class)
public class FileBufferReaderITCase extends TestLogger {

    /** Parallelism of both vertices; also used as the number of single-slot task managers. */
    private static final int parallelism = 8;

    /** Number of records emitted by every source subtask. */
    private static final int numRecords = 100000;

    /** Memory segment size (in bytes) configured for the task managers. */
    private static final int bufferSize = 4096;

    // NOTE(review): presumably the per-record serialization overhead, so that one record
    // occupies exactly one buffer - confirm against the record serializer.
    private static final int headerSize = 8;

    /** Payload size (in bytes) of every emitted record. */
    private static final int recordSize = bufferSize - headerSize;

    /** Shared payload wrapped by every emitted record. */
    private static final byte[] dataSource = new byte[recordSize];

    /** Whether internal SSL is enabled for the current parameterized run. */
    @Parameterized.Parameter
    public boolean sslEnabled;

    /**
     * Supplies the values for {@link #sslEnabled}.
     *
     * @return the list {@code [true, false]}
     */
    @Parameterized.Parameters(name = "SSL Enabled = {0}")
    public static List<Boolean> paras() {
        return Arrays.asList(true, false);
    }

    /** Explicitly zeroes the shared payload before any test runs. */
    @BeforeClass
    public static void setup() {
        // A freshly allocated byte[] is already zero-filled; this only makes the content explicit.
        Arrays.fill(dataSource, (byte) 0);
    }

    /**
     * Starts a mini cluster with {@link #parallelism} single-slot task managers and runs the
     * job built by {@link #createJobGraph()} to completion.
     *
     * @throws Exception if the cluster cannot be started or the job execution fails
     */
    @Test
    public void testSequentialReading() throws Exception {
        final Configuration configuration;
        if (sslEnabled) {
            configuration = SSLUtilsTest.createInternalSslConfigWithKeyAndTrustStores("JDK");
        } else {
            configuration = new Configuration();
        }

        // NOTE(review): the handshake timeout is presumably raised to keep the SSL variant
        // stable while the network threads are busy - confirm the intended rationale.
        configuration.setInteger(SecurityOptions.SSL_INTERNAL_HANDSHAKE_TIMEOUT, 100000);
        configuration.setString(RestOptions.BIND_PORT, "0");
        configuration.setString(
                NettyShuffleEnvironmentOptions.NETWORK_BLOCKING_SHUFFLE_TYPE, "file");
        configuration.set(TaskManagerOptions.TOTAL_FLINK_MEMORY, MemorySize.parse("1g"));
        configuration.set(
                TaskManagerOptions.MEMORY_SEGMENT_SIZE, MemorySize.parse(bufferSize + "b"));

        final MiniClusterConfiguration miniClusterConfiguration =
                new MiniClusterConfiguration.Builder()
                        .setConfiguration(configuration)
                        .setNumTaskManagers(parallelism)
                        .setNumSlotsPerTaskManager(1)
                        .build();

        try (MiniCluster miniCluster = new MiniCluster(miniClusterConfiguration)) {
            miniCluster.start();

            final JobGraph jobGraph = createJobGraph();
            // Blocks until the job finishes; a failing job surfaces as an exception here.
            miniCluster.executeJobBlocking(jobGraph);
        }
    }

    /**
     * Builds a batch job graph with a source and a sink vertex, each with {@link #parallelism}
     * subtasks, placed in separate slot sharing groups and connected all-to-all through a
     * blocking result partition.
     *
     * @return the job graph to execute
     */
    private static JobGraph createJobGraph() {
        final SlotSharingGroup group1 = new SlotSharingGroup();
        final SlotSharingGroup group2 = new SlotSharingGroup();

        final JobVertex source = new JobVertex("source");
        source.setInvokableClass(TestSourceInvokable.class);
        source.setParallelism(parallelism);
        source.setSlotSharingGroup(group1);

        final JobVertex sink = new JobVertex("sink");
        sink.setInvokableClass(TestSinkInvokable.class);
        sink.setParallelism(parallelism);
        sink.setSlotSharingGroup(group2);

        sink.connectNewDataSetAsInput(
                source, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);

        return JobGraphTestUtils.batchJobGraph(source, sink);
    }

    /**
     * Sink task that drains its first input gate, counts the received records and asserts that
     * exactly {@link #numRecords} records arrived.
     */
    public static final class TestSinkInvokable extends AbstractInvokable {

        /** Number of records read so far by this subtask. */
        private int numReceived = 0;

        public TestSinkInvokable(Environment environment) {
            super(environment);
        }

        /**
         * Reads records until the input is exhausted and verifies the record count.
         *
         * @throws Exception if reading fails
         */
        @Override
        public void invoke() throws Exception {
            final RecordReader<ByteArrayType> reader =
                    new RecordReader<>(
                            getEnvironment().getInputGate(0),
                            ByteArrayType.class,
                            getEnvironment().getTaskManagerInfo().getTmpDirectories());

            while (reader.hasNext()) {
                reader.next();
                numReceived++;
            }

            Assert.assertThat(numReceived, Matchers.is(numRecords));
        }
    }

    /**
     * Source task that emits {@link #numRecords} records, each wrapping the shared
     * {@link #dataSource} payload, and flushes after every record.
     */
    public static final class TestSourceInvokable extends AbstractInvokable {

        public TestSourceInvokable(Environment environment) {
            super(environment);
        }

        /**
         * Emits the records through the first result partition writer.
         *
         * @throws Exception if emitting or flushing fails
         */
        @Override
        public void invoke() throws Exception {
            final RecordWriter<ByteArrayType> writer =
                    new RecordWriterBuilder<ByteArrayType>().build(getEnvironment().getWriter(0));

            final ByteArrayType bytes = new ByteArrayType(dataSource);
            for (int counter = 0; counter < numRecords; counter++) {
                writer.emit(bytes);
                writer.flushAll();
            }
        }
    }
}

