/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import org.apache.flink.runtime.scheduler.InputsLocationsRetriever;
import org.apache.flink.runtime.scheduler.PreferredLocationsRetriever;
import org.apache.flink.runtime.scheduler.StateLocationRetriever;
import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;

public class DefaultPreferredLocationsRetriever
implements PreferredLocationsRetriever {
    static final int MAX_DISTINCT_LOCATIONS_TO_CONSIDER = 8;
    static final int MAX_DISTINCT_CONSUMERS_TO_CONSIDER = 8;
    private final StateLocationRetriever stateLocationRetriever;
    private final InputsLocationsRetriever inputsLocationsRetriever;

    DefaultPreferredLocationsRetriever(StateLocationRetriever stateLocationRetriever, InputsLocationsRetriever inputsLocationsRetriever) {
        this.stateLocationRetriever = (StateLocationRetriever)Preconditions.checkNotNull((Object)stateLocationRetriever);
        this.inputsLocationsRetriever = (InputsLocationsRetriever)Preconditions.checkNotNull((Object)inputsLocationsRetriever);
    }

    @Override
    public CompletableFuture<Collection<TaskManagerLocation>> getPreferredLocations(ExecutionVertexID executionVertexId, Set<ExecutionVertexID> producersToIgnore) {
        Preconditions.checkNotNull((Object)executionVertexId);
        Preconditions.checkNotNull(producersToIgnore);
        Collection<TaskManagerLocation> preferredLocationsBasedOnState = this.getPreferredLocationsBasedOnState(executionVertexId);
        if (!preferredLocationsBasedOnState.isEmpty()) {
            return CompletableFuture.completedFuture(preferredLocationsBasedOnState);
        }
        return this.getPreferredLocationsBasedOnInputs(executionVertexId, producersToIgnore);
    }

    private Collection<TaskManagerLocation> getPreferredLocationsBasedOnState(ExecutionVertexID executionVertexId) {
        return this.stateLocationRetriever.getStateLocation(executionVertexId).map(Collections::singleton).orElse(Collections.emptySet());
    }

    private CompletableFuture<Collection<TaskManagerLocation>> getPreferredLocationsBasedOnInputs(ExecutionVertexID executionVertexId, Set<ExecutionVertexID> producersToIgnore) {
        CompletableFuture<Collection<TaskManagerLocation>> preferredLocations = CompletableFuture.completedFuture(Collections.emptyList());
        Collection<ConsumedPartitionGroup> consumedPartitionGroups = this.inputsLocationsRetriever.getConsumedPartitionGroups(executionVertexId);
        for (ConsumedPartitionGroup consumedPartitionGroup : consumedPartitionGroups) {
            if (consumedPartitionGroup.getConsumerVertexGroup().size() > 8) continue;
            Collection<CompletableFuture<TaskManagerLocation>> locationsFutures = this.getInputLocationFutures(producersToIgnore, this.inputsLocationsRetriever.getProducersOfConsumedPartitionGroup(consumedPartitionGroup));
            preferredLocations = this.combineLocations(preferredLocations, locationsFutures);
        }
        return preferredLocations;
    }

    private Collection<CompletableFuture<TaskManagerLocation>> getInputLocationFutures(Set<ExecutionVertexID> producersToIgnore, Collection<ExecutionVertexID> producers) {
        ArrayList<CompletableFuture<TaskManagerLocation>> locationsFutures = new ArrayList<CompletableFuture<TaskManagerLocation>>();
        for (ExecutionVertexID producer : producers) {
            Optional<CompletableFuture<Object>> optionalLocationFuture = !producersToIgnore.contains(producer) ? this.inputsLocationsRetriever.getTaskManagerLocation(producer) : Optional.empty();
            optionalLocationFuture.ifPresent(locationsFutures::add);
            if (locationsFutures.size() <= 8) continue;
            return Collections.emptyList();
        }
        return locationsFutures;
    }

    private CompletableFuture<Collection<TaskManagerLocation>> combineLocations(CompletableFuture<Collection<TaskManagerLocation>> locationsCombinedAlready, Collection<CompletableFuture<TaskManagerLocation>> locationsToCombine) {
        CompletableFuture uniqueLocationsFuture = FutureUtils.combineAll(locationsToCombine).thenApply(HashSet::new);
        return locationsCombinedAlready.thenCombine((CompletionStage)uniqueLocationsFuture, (locationsOnOneEdge, locationsOnAnotherEdge) -> {
            if (!locationsOnOneEdge.isEmpty() && locationsOnAnotherEdge.size() > locationsOnOneEdge.size() || locationsOnAnotherEdge.isEmpty()) {
                return locationsOnOneEdge;
            }
            return locationsOnAnotherEdge;
        });
    }
}

