/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.operators.coordination;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.operators.coordination.EventSender;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.OperatorEventValve;
import org.apache.flink.runtime.operators.coordination.SubtaskAccess;
import org.apache.flink.runtime.operators.coordination.SubtaskGatewayImpl;
import org.apache.flink.runtime.operators.coordination.util.IncompleteFuturesTracker;
import org.apache.flink.util.SerializedValue;

/**
 * A {@link SubtaskAccess.SubtaskAccessFactory} that records operator events in memory instead of
 * delivering them anywhere.
 *
 * <p>Every send action handed out by the subtask accesses of this class appends an
 * {@link EventWithSubtask} to an internal list when it is invoked, and then returns the single
 * pre-configured "RPC result" future that was chosen through one of the static factory methods.
 *
 * <p>The internal collections are a plain {@link ArrayList} and {@link HashMap}; this class does
 * no synchronization of its own.
 */
public class EventReceivingTasks
implements SubtaskAccess.SubtaskAccessFactory {
    /** All events recorded so far, in the order in which their send actions were invoked. */
    final ArrayList<EventWithSubtask> events = new ArrayList<>();

    /** The future returned by every send action, regardless of event or subtask. */
    private final CompletableFuture<Acknowledge> eventSendingResult;

    /** Lazily created subtask accesses, keyed by subtask index. */
    private final Map<Integer, TestSubtaskAccess> subtasks = new HashMap<>();

    /** Whether newly created subtask accesses are immediately marked as running. */
    private final boolean createdTasksAreRunning;

    /**
     * Creates an instance whose subtasks are not yet running when created and whose send actions
     * return an already successfully completed future.
     */
    public static EventReceivingTasks createForNotYetRunningTasks() {
        return new EventReceivingTasks(false, CompletableFuture.completedFuture(Acknowledge.get()));
    }

    /**
     * Creates an instance whose subtasks are running as soon as they are created and whose send
     * actions return an already successfully completed future.
     */
    public static EventReceivingTasks createForRunningTasks() {
        return new EventReceivingTasks(true, CompletableFuture.completedFuture(Acknowledge.get()));
    }

    /**
     * Creates an instance whose subtasks are running as soon as they are created and whose send
     * actions return a future that is already completed exceptionally.
     *
     * @param rpcException the exception the returned send futures are failed with
     */
    public static EventReceivingTasks createForRunningTasksFailingRpcs(Throwable rpcException) {
        return new EventReceivingTasks(true, FutureUtils.completedExceptionally(rpcException));
    }

    /**
     * Creates an instance whose subtasks are running as soon as they are created and whose send
     * actions all return the given future.
     *
     * @param result the future handed back by every send action; the same instance is shared by
     *     all events and subtasks
     */
    public static EventReceivingTasks createForRunningTasksWithRpcResult(CompletableFuture<Acknowledge> result) {
        return new EventReceivingTasks(true, result);
    }

    private EventReceivingTasks(boolean createdTasksAreRunning, CompletableFuture<Acknowledge> eventSendingResult) {
        this.createdTasksAreRunning = createdTasksAreRunning;
        this.eventSendingResult = eventSendingResult;
    }

    /** Returns the number of events recorded so far, across all subtasks. */
    public int getNumberOfSentEvents() {
        return events.size();
    }

    /**
     * Returns all recorded events in recording order.
     *
     * <p>The returned list is the internal list itself, not a copy: it reflects events recorded
     * later, and modifications made through it affect this object.
     */
    public List<EventWithSubtask> getAllSentEvents() {
        return events;
    }

    /**
     * Returns the events recorded for one subtask, in recording order.
     *
     * @param subtaskIndex the index of the subtask to filter by
     * @return a newly collected list; empty if nothing was recorded for that subtask
     */
    public List<OperatorEvent> getSentEventsForSubtask(int subtaskIndex) {
        return events.stream()
                .filter(evt -> evt.subtask == subtaskIndex)
                .map(evt -> evt.event)
                .collect(Collectors.toList());
    }

    /**
     * Returns the access object for the given subtask, creating it on first request. Repeated
     * calls with the same index return the same instance.
     *
     * @param subtask the subtask index
     */
    public SubtaskAccess getAccessForSubtask(int subtask) {
        return subtasks.computeIfAbsent(
                subtask, subtaskIdx -> new TestSubtaskAccess(subtaskIdx, createdTasksAreRunning));
    }

    /**
     * Creates a new gateway on top of the (possibly newly created) access for the given subtask.
     * Each call builds a fresh gateway with its own {@link OperatorEventValve} and
     * {@link IncompleteFuturesTracker}, using a direct executor.
     *
     * @param subtask the subtask index
     */
    public OperatorCoordinator.SubtaskGateway createGatewayForSubtask(int subtask) {
        SubtaskAccess sta = getAccessForSubtask(subtask);
        return new SubtaskGatewayImpl(sta, (EventSender)new OperatorEventValve(), Executors.directExecutor(), new IncompleteFuturesTracker());
    }

    /**
     * Marks a previously created subtask as running.
     *
     * @param subtask the subtask index
     * @throws IllegalArgumentException if no access was created for that subtask yet
     */
    public void switchTaskToRunning(int subtask) {
        TestSubtaskAccess task = subtasks.get(subtask);
        if (task == null) {
            throw new IllegalArgumentException("No subtask created for " + subtask);
        }
        task.switchToRunning();
    }

    /** Marks every subtask access created so far as running. */
    public void switchAllTasksToRunning() {
        for (TestSubtaskAccess tsa : subtasks.values()) {
            tsa.switchToRunning();
        }
    }

    /**
     * Builds the action that "sends" an event to a subtask.
     *
     * <p>The event is recorded when the returned callable is invoked, not when it is created, so
     * each invocation of the same callable records the event again.
     *
     * @param event the event to record
     * @param subtask the index of the target subtask
     * @return a callable that records the event and returns the configured send-result future
     */
    Callable<CompletableFuture<Acknowledge>> createSendAction(OperatorEvent event, int subtask) {
        return () -> {
            events.add(new EventWithSubtask(event, subtask));
            return eventSendingResult;
        };
    }

    /**
     * The {@link SubtaskAccess} handed out by the enclosing {@link EventReceivingTasks}. It routes
     * send actions back to the enclosing instance so that events get recorded there.
     */
    private final class TestSubtaskAccess
    implements SubtaskAccess {
        /** A fresh attempt ID, fixed for the lifetime of this access. */
        private final ExecutionAttemptID executionAttemptId = new ExecutionAttemptID();

        /** Completed (with {@code null}) once the subtask has been switched to running. */
        private final CompletableFuture<?> running;

        private final int subtaskIndex;

        private TestSubtaskAccess(int subtaskIndex, boolean isRunning) {
            this.subtaskIndex = subtaskIndex;
            this.running = new CompletableFuture<>();
            if (isRunning) {
                switchToRunning();
            }
        }

        /**
         * Deserializes the event eagerly, using this class's class loader, and returns a send
         * action that records the deserialized event for this subtask.
         *
         * @param event the serialized event
         * @throws AssertionError if the event cannot be deserialized; the original exception is
         *     attached as the cause
         */
        public Callable<CompletableFuture<Acknowledge>> createEventSendAction(SerializedValue<OperatorEvent> event) {
            OperatorEvent deserializedEvent;
            try {
                deserializedEvent = event.deserializeValue(getClass().getClassLoader());
            }
            catch (IOException | ClassNotFoundException e) {
                // AssertionError(Object) installs a Throwable argument as the cause.
                throw new AssertionError(e);
            }
            return EventReceivingTasks.this.createSendAction(deserializedEvent, subtaskIndex);
        }

        /** Returns the index this access was created for. */
        public int getSubtaskIndex() {
            return subtaskIndex;
        }

        /** Returns the attempt ID generated when this access was created. */
        public ExecutionAttemptID currentAttempt() {
            return executionAttemptId;
        }

        /** Returns a display name built from the subtask index and the attempt ID. */
        public String subtaskName() {
            return "test_task-" + subtaskIndex + " #: " + executionAttemptId;
        }

        /** Returns the future that completes when {@link #switchToRunning()} is called. */
        public CompletableFuture<?> hasSwitchedToRunning() {
            return running;
        }

        /** Always reports {@code true}; this access never models a finished or failed task. */
        public boolean isStillRunning() {
            return true;
        }

        /** Completes the running future; calling it again has no further effect. */
        void switchToRunning() {
            running.complete(null);
        }

        /** Intentionally does nothing: failover requests are ignored by this access. */
        public void triggerTaskFailover(Throwable cause) {
        }
    }

    /** An immutable pair of a recorded event and the index of the subtask it was sent to. */
    public static final class EventWithSubtask {
        public final OperatorEvent event;
        public final int subtask;

        public EventWithSubtask(OperatorEvent event, int subtask) {
            this.event = event;
            this.subtask = subtask;
        }

        /**
         * Two instances are equal when they target the same subtask and carry equal events. The
         * comparison is null-safe, matching {@link #hashCode()}, which also tolerates a
         * {@code null} event.
         */
        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            EventWithSubtask that = (EventWithSubtask)o;
            return subtask == that.subtask && Objects.equals(event, that.event);
        }

        @Override
        public int hashCode() {
            return Objects.hash(event, subtask);
        }

        @Override
        public String toString() {
            return event + " => subtask " + subtask;
        }
    }
}

