/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rest.handler.job;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.metrics.dump.MetricDump;
import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.job.JobVertexBackPressureHandler;
import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobVertexBackPressureHeaders;
import org.apache.flink.runtime.rest.messages.JobVertexBackPressureInfo;
import org.apache.flink.runtime.rest.messages.JobVertexMessageParameters;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.MessageParameters;
import org.apache.flink.runtime.rest.messages.RequestBody;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class JobVertexBackPressureHandlerTest {
    private static final JobID TEST_JOB_ID_BACK_PRESSURE_STATS_AVAILABLE = new JobID();
    private static final JobVertexID TEST_JOB_VERTEX_ID = new JobVertexID();
    private static final JobID TEST_JOB_ID_BACK_PRESSURE_STATS_ABSENT = new JobID();
    private TestingRestfulGateway restfulGateway;
    private JobVertexBackPressureHandler jobVertexBackPressureHandler;
    private MetricStore metricStore;

    private static Collection<MetricDump> getMetricDumps() {
        ArrayList<MetricDump> dumps = new ArrayList<MetricDump>();
        QueryScopeInfo.TaskQueryScopeInfo task0 = new QueryScopeInfo.TaskQueryScopeInfo(TEST_JOB_ID_BACK_PRESSURE_STATS_AVAILABLE.toString(), TEST_JOB_VERTEX_ID.toString(), 0);
        dumps.add((MetricDump)new MetricDump.GaugeDump((QueryScopeInfo)task0, "backPressuredTimeMsPerSecond", "1000"));
        dumps.add((MetricDump)new MetricDump.GaugeDump((QueryScopeInfo)task0, "idleTimeMsPerSecond", "0"));
        dumps.add((MetricDump)new MetricDump.GaugeDump((QueryScopeInfo)task0, "busyTimeMsPerSecond", "0"));
        QueryScopeInfo.TaskQueryScopeInfo task1 = new QueryScopeInfo.TaskQueryScopeInfo(TEST_JOB_ID_BACK_PRESSURE_STATS_AVAILABLE.toString(), TEST_JOB_VERTEX_ID.toString(), 1);
        dumps.add((MetricDump)new MetricDump.GaugeDump((QueryScopeInfo)task1, "backPressuredTimeMsPerSecond", "500"));
        dumps.add((MetricDump)new MetricDump.GaugeDump((QueryScopeInfo)task1, "idleTimeMsPerSecond", "100"));
        dumps.add((MetricDump)new MetricDump.GaugeDump((QueryScopeInfo)task1, "busyTimeMsPerSecond", "900"));
        QueryScopeInfo.TaskQueryScopeInfo task3 = new QueryScopeInfo.TaskQueryScopeInfo(TEST_JOB_ID_BACK_PRESSURE_STATS_AVAILABLE.toString(), TEST_JOB_VERTEX_ID.toString(), 3);
        dumps.add((MetricDump)new MetricDump.GaugeDump((QueryScopeInfo)task3, "backPressuredTimeMsPerSecond", "100"));
        dumps.add((MetricDump)new MetricDump.GaugeDump((QueryScopeInfo)task3, "idleTimeMsPerSecond", "200"));
        dumps.add((MetricDump)new MetricDump.GaugeDump((QueryScopeInfo)task3, "busyTimeMsPerSecond", "700"));
        return dumps;
    }

    @Before
    public void setUp() {
        this.metricStore = new MetricStore();
        for (MetricDump metricDump : JobVertexBackPressureHandlerTest.getMetricDumps()) {
            this.metricStore.add(metricDump);
        }
        this.jobVertexBackPressureHandler = new JobVertexBackPressureHandler(() -> CompletableFuture.completedFuture(this.restfulGateway), Time.seconds((long)10L), Collections.emptyMap(), (MessageHeaders)JobVertexBackPressureHeaders.getInstance(), new MetricFetcher(){
            private long updateCount = 0L;

            public MetricStore getMetricStore() {
                return JobVertexBackPressureHandlerTest.this.metricStore;
            }

            public void update() {
                ++this.updateCount;
            }

            public long getLastUpdateTime() {
                return this.updateCount;
            }
        });
    }

    @Test
    public void testGetBackPressure() throws Exception {
        HashMap<String, String> pathParameters = new HashMap<String, String>();
        pathParameters.put("jobid", TEST_JOB_ID_BACK_PRESSURE_STATS_AVAILABLE.toString());
        pathParameters.put("vertexid", TEST_JOB_VERTEX_ID.toString());
        HandlerRequest request = new HandlerRequest((RequestBody)EmptyRequestBody.getInstance(), (MessageParameters)new JobVertexMessageParameters(), pathParameters, Collections.emptyMap());
        CompletableFuture jobVertexBackPressureInfoCompletableFuture = this.jobVertexBackPressureHandler.handleRequest(request, (RestfulGateway)this.restfulGateway);
        JobVertexBackPressureInfo jobVertexBackPressureInfo = (JobVertexBackPressureInfo)jobVertexBackPressureInfoCompletableFuture.get();
        Assert.assertThat((Object)jobVertexBackPressureInfo.getStatus(), (Matcher)Matchers.equalTo((Object)JobVertexBackPressureInfo.VertexBackPressureStatus.OK));
        Assert.assertThat((Object)jobVertexBackPressureInfo.getBackpressureLevel(), (Matcher)Matchers.equalTo((Object)JobVertexBackPressureInfo.VertexBackPressureLevel.HIGH));
        Assert.assertThat(jobVertexBackPressureInfo.getSubtasks().stream().map(JobVertexBackPressureInfo.SubtaskBackPressureInfo::getBackPressuredRatio).collect(Collectors.toList()), (Matcher)Matchers.contains((Object[])new Double[]{1.0, 0.5, 0.1}));
        Assert.assertThat(jobVertexBackPressureInfo.getSubtasks().stream().map(JobVertexBackPressureInfo.SubtaskBackPressureInfo::getIdleRatio).collect(Collectors.toList()), (Matcher)Matchers.contains((Object[])new Double[]{0.0, 0.1, 0.2}));
        Assert.assertThat(jobVertexBackPressureInfo.getSubtasks().stream().map(JobVertexBackPressureInfo.SubtaskBackPressureInfo::getBusyRatio).collect(Collectors.toList()), (Matcher)Matchers.contains((Object[])new Double[]{0.0, 0.9, 0.7}));
        Assert.assertThat(jobVertexBackPressureInfo.getSubtasks().stream().map(JobVertexBackPressureInfo.SubtaskBackPressureInfo::getBackpressureLevel).collect(Collectors.toList()), (Matcher)Matchers.contains((Object[])new JobVertexBackPressureInfo.VertexBackPressureLevel[]{JobVertexBackPressureInfo.VertexBackPressureLevel.HIGH, JobVertexBackPressureInfo.VertexBackPressureLevel.LOW, JobVertexBackPressureInfo.VertexBackPressureLevel.OK}));
        Assert.assertThat(jobVertexBackPressureInfo.getSubtasks().stream().map(JobVertexBackPressureInfo.SubtaskBackPressureInfo::getSubtask).collect(Collectors.toList()), (Matcher)Matchers.contains((Object[])new Integer[]{0, 1, 3}));
    }

    @Test
    public void testAbsentBackPressure() throws Exception {
        HashMap<String, String> pathParameters = new HashMap<String, String>();
        pathParameters.put("jobid", TEST_JOB_ID_BACK_PRESSURE_STATS_ABSENT.toString());
        pathParameters.put("vertexid", new JobVertexID().toString());
        HandlerRequest request = new HandlerRequest((RequestBody)EmptyRequestBody.getInstance(), (MessageParameters)new JobVertexMessageParameters(), pathParameters, Collections.emptyMap());
        CompletableFuture jobVertexBackPressureInfoCompletableFuture = this.jobVertexBackPressureHandler.handleRequest(request, (RestfulGateway)this.restfulGateway);
        JobVertexBackPressureInfo jobVertexBackPressureInfo = (JobVertexBackPressureInfo)jobVertexBackPressureInfoCompletableFuture.get();
        Assert.assertThat((Object)jobVertexBackPressureInfo.getStatus(), (Matcher)Matchers.equalTo((Object)JobVertexBackPressureInfo.VertexBackPressureStatus.DEPRECATED));
    }
}

