/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.heap;

import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.FloatSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
import org.apache.flink.runtime.state.StateSnapshot;
import org.apache.flink.runtime.state.heap.CopyOnWriteStateMap;
import org.apache.flink.runtime.state.heap.CopyOnWriteStateTable;
import org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot;
import org.apache.flink.runtime.state.heap.MockInternalKeyContext;
import org.apache.flink.runtime.state.heap.StateMap;
import org.apache.flink.runtime.state.heap.TestDuplicateSerializer;
import org.junit.Assert;
import org.junit.Test;

public class CopyOnWriteStateTableTest {
    @Test
    public void testSerializerDuplicationInSnapshot() throws IOException {
        TestDuplicateSerializer namespaceSerializer = new TestDuplicateSerializer();
        TestDuplicateSerializer stateSerializer = new TestDuplicateSerializer();
        TestDuplicateSerializer keySerializer = new TestDuplicateSerializer();
        RegisteredKeyValueStateBackendMetaInfo metaInfo = new RegisteredKeyValueStateBackendMetaInfo(StateDescriptor.Type.VALUE, "test", (TypeSerializer)namespaceSerializer, (TypeSerializer)stateSerializer);
        MockInternalKeyContext mockKeyContext = new MockInternalKeyContext();
        CopyOnWriteStateTable table = new CopyOnWriteStateTable(mockKeyContext, metaInfo, (TypeSerializer)keySerializer);
        table.put((Object)0, 0, (Object)0, (Object)0);
        table.put((Object)1, 0, (Object)0, (Object)1);
        table.put((Object)2, 0, (Object)1, (Object)2);
        CopyOnWriteStateTableSnapshot snapshot = table.stateSnapshot();
        StateSnapshot.StateKeyGroupWriter partitionedSnapshot = snapshot.getKeyGroupWriter();
        namespaceSerializer.disable();
        keySerializer.disable();
        stateSerializer.disable();
        partitionedSnapshot.writeStateInKeyGroup((DataOutputView)new DataOutputViewStreamWrapper((OutputStream)new ByteArrayOutputStreamWithPos(1024)), 0);
    }

    @Test
    public void testReleaseForSuccessfulSnapshot() throws IOException {
        int numberOfKeyGroups = 10;
        CopyOnWriteStateTable<Integer, Integer, Float> table = this.createStateTableForSnapshotRelease(numberOfKeyGroups);
        ByteArrayOutputStreamWithPos byteArrayOutputStreamWithPos = new ByteArrayOutputStreamWithPos();
        DataOutputViewStreamWrapper dataOutputView = new DataOutputViewStreamWrapper((OutputStream)byteArrayOutputStreamWithPos);
        CopyOnWriteStateTableSnapshot snapshot = table.stateSnapshot();
        for (int group = 0; group < numberOfKeyGroups; ++group) {
            snapshot.writeStateInKeyGroup((DataOutputView)dataOutputView, group);
            Assert.assertTrue((boolean)this.isResourceReleasedForKeyGroup(table, group));
        }
        snapshot.release();
        this.verifyResourceIsReleasedForAllKeyGroup(table, 1);
    }

    @Test
    public void testReleaseForFailedSnapshot() throws IOException {
        int group;
        int numberOfKeyGroups = 10;
        CopyOnWriteStateTable<Integer, Integer, Float> table = this.createStateTableForSnapshotRelease(numberOfKeyGroups);
        ByteArrayOutputStreamWithPos byteArrayOutputStreamWithPos = new ByteArrayOutputStreamWithPos();
        DataOutputViewStreamWrapper dataOutputView = new DataOutputViewStreamWrapper((OutputStream)byteArrayOutputStreamWithPos);
        CopyOnWriteStateTableSnapshot snapshot = table.stateSnapshot();
        for (group = 0; group < numberOfKeyGroups / 2; ++group) {
            snapshot.writeStateInKeyGroup((DataOutputView)dataOutputView, group);
            Assert.assertTrue((boolean)this.isResourceReleasedForKeyGroup(table, group));
        }
        for (group = numberOfKeyGroups / 2; group < numberOfKeyGroups; ++group) {
            Assert.assertFalse((boolean)this.isResourceReleasedForKeyGroup(table, group));
        }
        snapshot.release();
        this.verifyResourceIsReleasedForAllKeyGroup(table, 2);
    }

    private CopyOnWriteStateTable<Integer, Integer, Float> createStateTableForSnapshotRelease(int numberOfKeyGroups) {
        RegisteredKeyValueStateBackendMetaInfo metaInfo = new RegisteredKeyValueStateBackendMetaInfo(StateDescriptor.Type.VALUE, "test", (TypeSerializer)IntSerializer.INSTANCE, (TypeSerializer)FloatSerializer.INSTANCE);
        MockInternalKeyContext<Integer> mockKeyContext = new MockInternalKeyContext<Integer>(0, numberOfKeyGroups - 1, numberOfKeyGroups);
        CopyOnWriteStateTable table = new CopyOnWriteStateTable(mockKeyContext, metaInfo, (TypeSerializer)IntSerializer.INSTANCE);
        ThreadLocalRandom random = ThreadLocalRandom.current();
        for (int i = 0; i < 1000; ++i) {
            mockKeyContext.setCurrentKeyAndKeyGroup(i);
            table.put((Object)random.nextInt(), (Object)Float.valueOf(random.nextFloat()));
        }
        return table;
    }

    private void verifyResourceIsReleasedForAllKeyGroup(CopyOnWriteStateTable table, int snapshotVersion) {
        StateMap[] stateMaps;
        for (StateMap map : stateMaps = table.getState()) {
            Assert.assertFalse((boolean)((CopyOnWriteStateMap)map).getSnapshotVersions().contains(snapshotVersion));
        }
    }

    private boolean isResourceReleasedForKeyGroup(CopyOnWriteStateTable table, int keyGroup) {
        CopyOnWriteStateMap stateMap = (CopyOnWriteStateMap)table.getMapForKeyGroup(keyGroup);
        return !stateMap.getSnapshotVersions().contains(1);
    }
}

