/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint.metadata;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.checkpoint.MasterState;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
import org.apache.flink.runtime.checkpoint.metadata.CheckpointTestUtils;
import org.apache.flink.runtime.checkpoint.metadata.MetadataV2V3SerializerBase;
import org.apache.flink.runtime.checkpoint.metadata.MetadataV3Serializer;
import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/**
 * Round-trip tests for {@link MetadataV3Serializer}.
 *
 * <p>Each test builds a {@link CheckpointMetadata} from randomly generated operator and/or master
 * states, serializes it to a byte array, deserializes it again and checks that the result matches
 * the input. The "ForSavepoint" variants pass a base path (a fresh temporary folder) to both the
 * state generator and the deserializer; the "ForCheckpoint" variants pass {@code null}.
 *
 * <p>NOTE(review): the {@link Random} instances are unseeded, so a failing run is not reproducible
 * from the test output alone.
 */
public class MetadataV3SerializerTest {
    /** Number of random round-trips performed by the looping tests. */
    private static final int NUM_ROUNDS = 100;

    /** Upper bound (inclusive) for the number of random master states. */
    private static final int MAX_NUM_MASTER_STATES = 5;

    /** Upper bound (inclusive) for the number of operator states with all subtasks running. */
    private static final int MAX_NUM_TASK_STATES = 20;

    /** Upper bound (inclusive) for the number of subtasks passed to the state generator. */
    private static final int MAX_NUM_SUBTASKS = 20;

    /** Upper bound (inclusive) for the number of partly finished operator states. */
    private static final int MAX_NUM_PARTLY_FINISHED_STATES = 10;

    /** Upper bound (inclusive) for the number of fully finished operator states. */
    private static final int MAX_NUM_FULLY_FINISHED_STATES = 10;

    @Rule
    public final TemporaryFolder temporaryFolder = new TemporaryFolder();

    /** Round-trips metadata that carries neither operator nor master state. */
    @Test
    public void testCheckpointWithNoState() throws Exception {
        final Random rnd = new Random();
        for (int round = 0; round < NUM_ROUNDS; ++round) {
            final long checkpointId = randomCheckpointId(rnd);
            final List<OperatorState> taskStates = Collections.emptyList();
            final List<MasterState> masterStates = Collections.emptyList();
            testCheckpointSerialization(checkpointId, taskStates, masterStates, null);
        }
    }

    /** Round-trips metadata that carries only master state. */
    @Test
    public void testCheckpointWithOnlyMasterState() throws Exception {
        final Random rnd = new Random();
        for (int round = 0; round < NUM_ROUNDS; ++round) {
            final long checkpointId = randomCheckpointId(rnd);
            final List<OperatorState> operatorStates = Collections.emptyList();
            final int numMasterStates = randomCount(rnd, MAX_NUM_MASTER_STATES);
            final Collection<MasterState> masterStates =
                    CheckpointTestUtils.createRandomMasterStates(rnd, numMasterStates);
            testCheckpointSerialization(checkpointId, operatorStates, masterStates, null);
        }
    }

    @Test
    public void testCheckpointWithOnlyTaskStateForCheckpoint() throws Exception {
        testCheckpointWithOnlyTaskState(null);
    }

    @Test
    public void testCheckpointWithOnlyTaskStateForSavepoint() throws Exception {
        testCheckpointWithOnlyTaskState(newBasePath());
    }

    /**
     * Round-trips metadata that carries only operator state (no partly or fully finished
     * operators, no master state).
     *
     * @param basePath base path handed to the state generator and the deserializer, or {@code
     *     null}
     */
    private void testCheckpointWithOnlyTaskState(@Nullable String basePath) throws Exception {
        final Random rnd = new Random();
        for (int round = 0; round < NUM_ROUNDS; ++round) {
            final long checkpointId = randomCheckpointId(rnd);
            final int numTasks = randomCount(rnd, MAX_NUM_TASK_STATES);
            final int numSubtasks = randomCount(rnd, MAX_NUM_SUBTASKS);
            final Collection<OperatorState> taskStates =
                    CheckpointTestUtils.createOperatorStates(
                            rnd, basePath, numTasks, 0, 0, numSubtasks);
            final List<MasterState> masterStates = Collections.emptyList();
            testCheckpointSerialization(checkpointId, taskStates, masterStates, basePath);
        }
    }

    @Test
    public void testCheckpointWithMasterAndTaskStateForCheckpoint() throws Exception {
        testCheckpointWithMasterAndTaskState(null);
    }

    @Test
    public void testCheckpointWithMasterAndTaskStateForSavepoint() throws Exception {
        testCheckpointWithMasterAndTaskState(newBasePath());
    }

    /**
     * Round-trips metadata that carries both operator state and master state.
     *
     * @param basePath base path handed to the state generator and the deserializer, or {@code
     *     null}
     */
    private void testCheckpointWithMasterAndTaskState(@Nullable String basePath) throws Exception {
        final Random rnd = new Random();
        for (int round = 0; round < NUM_ROUNDS; ++round) {
            final long checkpointId = randomCheckpointId(rnd);
            final int numTasks = randomCount(rnd, MAX_NUM_TASK_STATES);
            final int numSubtasks = randomCount(rnd, MAX_NUM_SUBTASKS);
            final Collection<OperatorState> taskStates =
                    CheckpointTestUtils.createOperatorStates(
                            rnd, basePath, numTasks, 0, 0, numSubtasks);
            final int numMasterStates = randomCount(rnd, MAX_NUM_MASTER_STATES);
            final Collection<MasterState> masterStates =
                    CheckpointTestUtils.createRandomMasterStates(rnd, numMasterStates);
            testCheckpointSerialization(checkpointId, taskStates, masterStates, basePath);
        }
    }

    @Test
    public void testCheckpointWithFinishedTasksForCheckpoint() throws Exception {
        testCheckpointWithFinishedTasks(null);
    }

    @Test
    public void testCheckpointWithFinishedTasksForSavepoint() throws Exception {
        testCheckpointWithFinishedTasks(newBasePath());
    }

    /**
     * Round-trips (once) metadata whose operator states are a mix of all-running, partly finished
     * and fully finished operators, together with master state.
     *
     * @param basePath base path handed to the state generator and the deserializer, or {@code
     *     null}
     */
    private void testCheckpointWithFinishedTasks(@Nullable String basePath) throws Exception {
        final Random rnd = new Random();
        final long checkpointId = randomCheckpointId(rnd);
        final int numSubtasks = randomCount(rnd, MAX_NUM_SUBTASKS);
        final int numAllRunningTasks = randomCount(rnd, MAX_NUM_TASK_STATES);
        final int numPartlyFinishedTasks = randomCount(rnd, MAX_NUM_PARTLY_FINISHED_STATES);
        final int numFullyFinishedTasks = randomCount(rnd, MAX_NUM_FULLY_FINISHED_STATES);
        final Collection<OperatorState> taskStates =
                CheckpointTestUtils.createOperatorStates(
                        rnd,
                        basePath,
                        numAllRunningTasks,
                        numPartlyFinishedTasks,
                        numFullyFinishedTasks,
                        numSubtasks);
        final int numMasterStates = randomCount(rnd, MAX_NUM_MASTER_STATES);
        final Collection<MasterState> masterStates =
                CheckpointTestUtils.createRandomMasterStates(rnd, numMasterStates);
        testCheckpointSerialization(checkpointId, taskStates, masterStates, basePath);
    }

    /**
     * Serializes a {@link CheckpointMetadata} built from the arguments, deserializes it again and
     * asserts that checkpoint id, operator states (including their fully-finished flags) and
     * master states survived the round trip.
     *
     * @param checkpointId checkpoint id to store in the metadata
     * @param operatorStates operator states to store in the metadata
     * @param masterStates master states to store in the metadata
     * @param basePath if non-null, an empty {@code _metadata} file is created under this path and
     *     the path is passed to the deserializer
     * @throws IOException if serialization, deserialization or file creation fails
     */
    private void testCheckpointSerialization(
            long checkpointId,
            Collection<OperatorState> operatorStates,
            Collection<MasterState> masterStates,
            @Nullable String basePath)
            throws IOException {
        final MetadataV3Serializer serializer = MetadataV3Serializer.INSTANCE;
        final ByteArrayOutputStreamWithPos baos = new ByteArrayOutputStreamWithPos();
        final CheckpointMetadata metadata =
                new CheckpointMetadata(checkpointId, operatorStates, masterStates);

        // try-with-resources closes the wrapper even when serialization throws
        try (DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos)) {
            MetadataV3Serializer.serialize(metadata, out);
        }

        if (basePath != null) {
            // NOTE(review): presumably the deserializer expects a metadata file to exist under
            // the base path; only an empty file is created here - confirm against the serializer.
            final Path metaPath = new Path(basePath, "_metadata");
            FileSystem.getLocalFileSystem().create(metaPath, FileSystem.WriteMode.OVERWRITE).close();
        }

        final byte[] bytes = baos.toByteArray();
        final DataInputViewStreamWrapper in =
                new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(bytes));
        final CheckpointMetadata deserialized =
                serializer.deserialize(in, getClass().getClassLoader(), basePath);

        Assert.assertEquals(checkpointId, deserialized.getCheckpointId());
        Assert.assertEquals(operatorStates, deserialized.getOperatorStates());
        // The fully-finished flags are compared separately from the equals() check above.
        Assert.assertEquals(
                operatorStates.stream()
                        .map(OperatorState::isFullyFinished)
                        .collect(Collectors.toList()),
                deserialized.getOperatorStates().stream()
                        .map(OperatorState::isFullyFinished)
                        .collect(Collectors.toList()));

        Assert.assertEquals(masterStates.size(), deserialized.getMasterStates().size());
        // Sizes are equal at this point, so walking both iterators in lock-step is safe.
        final Iterator<MasterState> expected = masterStates.iterator();
        final Iterator<MasterState> actual = deserialized.getMasterStates().iterator();
        while (expected.hasNext()) {
            CheckpointTestUtils.assertMasterStateEquality(expected.next(), actual.next());
        }
    }

    /**
     * Serializes a {@link KeyGroupsStateHandle} wrapping a {@link ByteStreamStateHandle} and
     * checks that the deserialized handle has the same type, offsets and payload bytes.
     */
    @Test
    public void testSerializeKeyGroupsStateHandle() throws IOException {
        final KeyGroupRangeOffsets offsets = new KeyGroupRangeOffsets(0, 123);
        final byte[] data = new byte[] {1, 2, 3, 4};

        final byte[] serialized;
        try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos();
                DataOutputStream dataOut = new DataOutputStream(out)) {
            MetadataV2V3SerializerBase.serializeStreamStateHandle(
                    new KeyGroupsStateHandle(offsets, new ByteStreamStateHandle("test", data)),
                    dataOut);
            dataOut.flush();
            serialized = out.toByteArray();
        }

        try (ByteArrayInputStream in = new ByteArrayInputStream(serialized)) {
            final StreamStateHandle handle =
                    MetadataV2V3SerializerBase.deserializeStreamStateHandle(
                            new DataInputStream(in), null);
            Assert.assertTrue(handle instanceof KeyGroupsStateHandle);
            Assert.assertEquals(offsets, ((KeyGroupsStateHandle) handle).getGroupRangeOffsets());

            final byte[] deserialized = new byte[data.length];
            // readFully loops until the buffer is filled (or fails with EOFException); a plain
            // read(byte[]) may legally return fewer bytes and would hide a truncated payload
            // behind a misleading array mismatch.
            try (DataInputStream dataStream = new DataInputStream(handle.openInputStream())) {
                dataStream.readFully(deserialized);
            }
            Assert.assertArrayEquals(data, deserialized);
        }
    }

    /** Draws a random non-negative checkpoint id (sign bit masked off). */
    private static long randomCheckpointId(Random rnd) {
        return rnd.nextLong() & Long.MAX_VALUE;
    }

    /** Draws a random count in the range {@code [1, max]}. */
    private static int randomCount(Random rnd, int max) {
        return rnd.nextInt(max) + 1;
    }

    /** Creates a fresh temporary folder and returns its URI as a string. */
    private String newBasePath() throws IOException {
        return temporaryFolder.newFolder().toURI().toString();
    }
}

