/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.taskexecutor.slot;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlotPayload;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlotState;
import org.apache.flink.util.AutoCloseableAsync;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Container for the tasks (payloads of type {@code T}) that are assigned to one slot.
 *
 * <p>A slot records its index, resource profile, job id and allocation id, owns a
 * {@link MemoryManager} created from the resource profile's managed memory, and tracks a
 * {@link TaskSlotState}. A freshly constructed slot is {@code ALLOCATED}; it can be switched
 * between {@code ALLOCATED} and {@code ACTIVE} via {@link #markActive()} and
 * {@link #markInactive()}, and it moves to {@code RELEASING} once {@link #closeAsync()} has been
 * called. Tasks can only be added while the slot is {@code ACTIVE}.
 *
 * <p>This class performs no synchronization of its own.
 *
 * @param <T> type of the payload stored in this slot
 */
public class TaskSlot<T extends TaskSlotPayload>
implements AutoCloseableAsync {
    private static final Logger LOG = LoggerFactory.getLogger(TaskSlot.class);

    /** Index of this slot, as handed to the constructor. */
    private final int index;

    /** Resource profile of this slot; never {@code null}. */
    private final ResourceProfile resourceProfile;

    /** Tasks currently held by this slot, keyed by their execution attempt id. */
    private final Map<ExecutionAttemptID, T> tasks;

    /** Memory manager created for this slot in the constructor. */
    private final MemoryManager memoryManager;

    /** Current state of the slot. */
    private TaskSlotState state;

    /** Job id for which the slot has been allocated. */
    private final JobID jobId;

    /** Allocation id under which the slot has been allocated. */
    private final AllocationID allocationId;

    /** Future returned by {@link #closeAsync()}; receives the outcome of the shutdown chain. */
    private final CompletableFuture<Void> closingFuture;

    /** Executor on which the post-shutdown memory verification is run. */
    private final Executor asyncExecutor;

    /**
     * Creates a slot in state {@code ALLOCATED} with an empty task map and its own memory manager.
     *
     * @param index index of the slot
     * @param resourceProfile resource profile of the slot; must not be {@code null}
     * @param memoryPageSize page size passed to the memory manager factory
     * @param jobId job id the slot is allocated for (not null-checked here)
     * @param allocationId allocation id of the slot (not null-checked here)
     * @param asyncExecutor executor for asynchronous follow-up work; must not be {@code null}
     */
    public TaskSlot(int index, ResourceProfile resourceProfile, int memoryPageSize, JobID jobId, AllocationID allocationId, Executor asyncExecutor) {
        this.index = index;
        this.resourceProfile = Preconditions.checkNotNull(resourceProfile);
        this.asyncExecutor = Preconditions.checkNotNull(asyncExecutor);
        this.tasks = new HashMap<>(4);
        this.state = TaskSlotState.ALLOCATED;
        this.jobId = jobId;
        this.allocationId = allocationId;
        this.memoryManager = TaskSlot.createMemoryManager(resourceProfile, memoryPageSize);
        this.closingFuture = new CompletableFuture<>();
    }

    /** Returns the index of this slot. */
    public int getIndex() {
        return this.index;
    }

    /** Returns the resource profile of this slot. */
    public ResourceProfile getResourceProfile() {
        return this.resourceProfile;
    }

    /** Returns the job id this slot was created with. */
    public JobID getJobId() {
        return this.jobId;
    }

    /** Returns the allocation id this slot was created with. */
    public AllocationID getAllocationId() {
        return this.allocationId;
    }

    /** Returns the current state of this slot. */
    TaskSlotState getState() {
        return this.state;
    }

    /** Returns {@code true} if the slot currently holds no tasks. */
    public boolean isEmpty() {
        return this.tasks.isEmpty();
    }

    /**
     * Checks whether the slot is {@code ACTIVE} for the given job and allocation.
     *
     * @param activeJobId job id to compare against; must not be {@code null}
     * @param activeAllocationId allocation id to compare against; must not be {@code null}
     * @return {@code true} if the state is {@code ACTIVE} and both ids equal the slot's ids
     */
    public boolean isActive(JobID activeJobId, AllocationID activeAllocationId) {
        Preconditions.checkNotNull(activeJobId);
        Preconditions.checkNotNull(activeAllocationId);
        return TaskSlotState.ACTIVE == this.state
            && activeJobId.equals(this.jobId)
            && activeAllocationId.equals(this.allocationId);
    }

    /**
     * Checks whether the slot is allocated (state {@code ACTIVE} or {@code ALLOCATED}) for the
     * given job and allocation.
     *
     * @param jobIdToCheck job id to compare against; must not be {@code null}
     * @param allocationIDToCheck allocation id to compare against; must not be {@code null}
     * @return {@code true} if both ids equal the slot's ids and the slot is active or allocated
     */
    public boolean isAllocated(JobID jobIdToCheck, AllocationID allocationIDToCheck) {
        Preconditions.checkNotNull(jobIdToCheck);
        Preconditions.checkNotNull(allocationIDToCheck);
        return jobIdToCheck.equals(this.jobId)
            && allocationIDToCheck.equals(this.allocationId)
            && (TaskSlotState.ACTIVE == this.state || TaskSlotState.ALLOCATED == this.state);
    }

    /** Returns {@code true} if the slot is in state {@code RELEASING}. */
    public boolean isReleasing() {
        return TaskSlotState.RELEASING == this.state;
    }

    /**
     * Returns an iterator over the tasks held by this slot.
     *
     * <p>The iterator is obtained directly from the internal map's value view, so it is not a
     * snapshot of the tasks.
     */
    public Iterator<T> getTasks() {
        return this.tasks.values().iterator();
    }

    /** Returns the memory manager owned by this slot. */
    public MemoryManager getMemoryManager() {
        return this.memoryManager;
    }

    /**
     * Adds the given task to this slot unless a task with the same execution id is already
     * registered.
     *
     * @param task task to add
     * @return {@code true} if the task was added, {@code false} if a task with the same execution
     *     id was already present (the existing task is kept)
     * @throws IllegalArgumentException presumably thrown by {@code Preconditions.checkArgument}
     *     if the task's job id or allocation id does not match the slot's
     * @throws IllegalStateException presumably thrown by {@code Preconditions.checkState} if the
     *     slot is not in state {@code ACTIVE}
     */
    public boolean add(T task) {
        Preconditions.checkArgument(task.getJobID().equals(this.jobId), "The task's job id does not match the job id for which the slot has been allocated.");
        Preconditions.checkArgument(task.getAllocationId().equals(this.allocationId), "The task's allocation id does not match the allocation id for which the slot has been allocated.");
        Preconditions.checkState(TaskSlotState.ACTIVE == this.state, "The task slot is not in state active.");
        // putIfAbsent leaves an existing mapping untouched, so there is no need to overwrite the
        // entry first and restore the previous task afterwards.
        return this.tasks.putIfAbsent(task.getExecutionId(), task) == null;
    }

    /**
     * Removes the task registered under the given execution attempt id.
     *
     * @param executionAttemptId id of the task to remove
     * @return the removed task, or {@code null} if no task was registered under that id
     */
    public T remove(ExecutionAttemptID executionAttemptId) {
        return this.tasks.remove(executionAttemptId);
    }

    /** Removes all tasks from this slot. */
    public void clear() {
        this.tasks.clear();
    }

    /**
     * Marks the slot as {@code ACTIVE}.
     *
     * @return {@code true} if the slot was {@code ALLOCATED} or already {@code ACTIVE} and is now
     *     {@code ACTIVE}; {@code false} otherwise (state unchanged)
     */
    public boolean markActive() {
        if (TaskSlotState.ALLOCATED == this.state || TaskSlotState.ACTIVE == this.state) {
            this.state = TaskSlotState.ACTIVE;
            return true;
        }
        return false;
    }

    /**
     * Marks the slot as inactive, i.e. puts it back into state {@code ALLOCATED}.
     *
     * @return {@code true} if the slot was {@code ACTIVE} or already {@code ALLOCATED} and is now
     *     {@code ALLOCATED}; {@code false} otherwise (state unchanged)
     */
    public boolean markInactive() {
        if (TaskSlotState.ACTIVE == this.state || TaskSlotState.ALLOCATED == this.state) {
            this.state = TaskSlotState.ALLOCATED;
            return true;
        }
        return false;
    }

    /**
     * Builds a {@link SlotOffer} from this slot's allocation id, index and resource profile.
     *
     * @return the slot offer for this slot
     * @throws IllegalStateException presumably thrown by {@code Preconditions.checkState} if the
     *     slot is neither {@code ACTIVE} nor {@code ALLOCATED}, or if it has no allocation id
     */
    public SlotOffer generateSlotOffer() {
        Preconditions.checkState(TaskSlotState.ACTIVE == this.state || TaskSlotState.ALLOCATED == this.state, "The task slot is not in state active or allocated.");
        Preconditions.checkState(this.allocationId != null, "The task slot are not allocated");
        return new SlotOffer(this.allocationId, this.index, this.resourceProfile);
    }

    /** Returns a textual description containing index, state, resource profile and ids. */
    @Override
    public String toString() {
        return "TaskSlot(index:" + this.index + ", state:" + this.state + ", resource profile: " + this.resourceProfile + ", allocationId: " + (this.allocationId != null ? this.allocationId.toString() : "none") + ", jobId: " + (this.jobId != null ? this.jobId.toString() : "none") + ')';
    }

    /**
     * Closes this slot, using a generic {@link FlinkException} ("Closing the slot") as the cause
     * with which contained tasks are failed.
     *
     * @return future that receives the outcome of the shutdown chain
     */
    public CompletableFuture<Void> closeAsync() {
        return this.closeAsync(new FlinkException("Closing the slot"));
    }

    /**
     * Closes this slot.
     *
     * <p>On the first call the state is set to {@code RELEASING}, every contained task is failed
     * externally with the given cause, and a shutdown chain is set up: once the future obtained
     * from {@code FutureUtils.waitForAll} over the tasks' termination futures is done, the memory
     * manager is shut down, after which the memory verification is scheduled. The result of that
     * chain is forwarded to the closing future. Subsequent calls (the slot is already
     * {@code RELEASING}) only return the same closing future.
     *
     * @param cause cause passed to {@code failExternally} of each contained task
     * @return future that receives the outcome of the shutdown chain
     */
    CompletableFuture<Void> closeAsync(Throwable cause) {
        if (!this.isReleasing()) {
            this.state = TaskSlotState.RELEASING;
            if (!this.isEmpty()) {
                // The slot still holds tasks: fail them so that they terminate.
                this.tasks.values().forEach(task -> task.failExternally(cause));
            }
            final CompletableFuture<Void> shutdownFuture = FutureUtils
                .waitForAll(this.tasks.values().stream().map(TaskSlotPayload::getTerminationFuture).collect(Collectors.toList()))
                .thenRun(this.memoryManager::shutdown);
            this.verifyAllManagedMemoryIsReleasedAfter(shutdownFuture);
            FutureUtils.forward(shutdownFuture, this.closingFuture);
        }
        return this.closingFuture;
    }

    /**
     * Schedules a check on the async executor, to run after the given future has completed
     * normally, that logs a warning if {@code memoryManager.verifyEmpty()} returns {@code false}.
     *
     * @param after future after which the verification is run
     */
    private void verifyAllManagedMemoryIsReleasedAfter(CompletableFuture<Void> after) {
        after.thenRunAsync(() -> {
            if (!this.memoryManager.verifyEmpty()) {
                LOG.warn("Not all slot memory is freed, potential memory leak at {}", this);
            }
        }, this.asyncExecutor);
    }

    /**
     * Creates the slot's memory manager by passing the byte count of the resource profile's
     * managed memory and the given page size to {@code MemoryManager.create}.
     *
     * @param resourceProfile profile whose managed memory is used
     * @param pageSize page size handed to the memory manager factory
     * @return the created memory manager
     */
    private static MemoryManager createMemoryManager(ResourceProfile resourceProfile, int pageSize) {
        return MemoryManager.create(resourceProfile.getManagedMemory().getBytes(), pageSize);
    }
}

