/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.changelog;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.SharedStateRegistryKey;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.changelog.StateChange;
import org.apache.flink.runtime.state.changelog.StateChangelogHandle;
import org.apache.flink.runtime.state.filesystem.FileStateHandle;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.ExceptionUtils;

@Internal
public final class StateChangelogHandleStreamImpl
implements StateChangelogHandle<StateChangeStreamReader> {
    private static final long serialVersionUID = -8070326169926626355L;
    private final KeyGroupRange keyGroupRange;
    private final List<Tuple2<StreamStateHandle, Long>> handlesAndOffsets;
    private transient SharedStateRegistry stateRegistry;

    public StateChangelogHandleStreamImpl(List<Tuple2<StreamStateHandle, Long>> handlesAndOffsets, KeyGroupRange keyGroupRange) {
        this.handlesAndOffsets = handlesAndOffsets;
        this.keyGroupRange = keyGroupRange;
    }

    @Override
    public void registerSharedStates(SharedStateRegistry stateRegistry) {
        this.stateRegistry = stateRegistry;
        this.handlesAndOffsets.forEach(handleAndOffset -> stateRegistry.registerReference(StateChangelogHandleStreamImpl.getKey((StreamStateHandle)handleAndOffset.f0), (StreamStateHandle)handleAndOffset.f0));
    }

    @Override
    public KeyGroupRange getKeyGroupRange() {
        return this.keyGroupRange;
    }

    @Override
    @Nullable
    public KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) {
        KeyGroupRange offsets = keyGroupRange.getIntersection(keyGroupRange);
        if (offsets.getNumberOfKeyGroups() == 0) {
            return null;
        }
        return new StateChangelogHandleStreamImpl(this.handlesAndOffsets, offsets);
    }

    @Override
    public CloseableIterator<StateChange> getChanges(final StateChangeStreamReader reader) {
        return new CloseableIterator<StateChange>(){
            private final Iterator<Tuple2<StreamStateHandle, Long>> handleIterator;
            private CloseableIterator<StateChange> current;
            {
                this.handleIterator = StateChangelogHandleStreamImpl.this.handlesAndOffsets.iterator();
                this.current = CloseableIterator.empty();
            }

            public boolean hasNext() {
                this.advance();
                return this.current.hasNext();
            }

            public StateChange next() {
                this.advance();
                return (StateChange)this.current.next();
            }

            private void advance() {
                while (!this.current.hasNext() && this.handleIterator.hasNext()) {
                    Tuple2<StreamStateHandle, Long> tuple2 = this.handleIterator.next();
                    try {
                        this.current = reader.read((StreamStateHandle)tuple2.f0, (Long)tuple2.f1);
                    }
                    catch (IOException e) {
                        ExceptionUtils.rethrow((Throwable)e);
                    }
                }
            }

            public void close() throws Exception {
                this.current.close();
            }
        };
    }

    @Override
    public void discardState() {
        this.handlesAndOffsets.forEach(handleAndOffset -> this.stateRegistry.unregisterReference(StateChangelogHandleStreamImpl.getKey((StreamStateHandle)handleAndOffset.f0)));
    }

    @Override
    public long getStateSize() {
        return 0L;
    }

    private static SharedStateRegistryKey getKey(StreamStateHandle stateHandle) {
        if (stateHandle instanceof FileStateHandle) {
            return new SharedStateRegistryKey(((FileStateHandle)stateHandle).getFilePath().toString());
        }
        if (stateHandle instanceof ByteStreamStateHandle) {
            return new SharedStateRegistryKey(((ByteStreamStateHandle)stateHandle).getHandleName());
        }
        return new SharedStateRegistryKey(Integer.toString(System.identityHashCode(stateHandle)));
    }

    public static interface StateChangeStreamReader {
        public CloseableIterator<StateChange> read(StreamStateHandle var1, long var2) throws IOException;
    }
}

