/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Metric;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.taskmanager.TestCheckpointResponder;
import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarness;
import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarnessBuilder;
import org.apache.flink.streaming.runtime.tasks.StreamTaskTestHarness;
import org.apache.flink.util.function.FunctionWithException;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;

/**
 * Base class with a reusable metrics check for source stream tasks.
 *
 * <p>The check runs a task inside a {@link StreamTaskMailboxTestHarness}, triggers a single
 * checkpoint, and verifies the checkpoint start delay and busy-time gauges that the task
 * registers in its metric group.
 */
public class SourceStreamTaskTestBase {
    /** Name under which the checkpoint start delay gauge is looked up. */
    private static final String CHECKPOINT_START_DELAY_METRIC = "checkpointStartDelayNanos";

    /** Name under which the busy-time gauge is looked up. */
    private static final String BUSY_TIME_METRIC = "busyTimeMsPerSecond";

    /** Conversion factor from milliseconds to nanoseconds. */
    private static final long NANOS_PER_MILLI = 1000000L;

    /**
     * Triggers checkpoint {@code 1} on a freshly built task, delays its execution, and verifies
     * the resulting metrics.
     *
     * <p>The checkpoint is triggered asynchronously and the calling thread then sleeps before
     * running any mailbox step, so the measured start delay must be at least as long as the
     * sleep. The same lower bound is checked against the start delay carried by the single
     * acknowledge report, and the busy-time gauge is checked against {@code busyTimeMatcher}.
     *
     * @param taskFactory creates the task under test from the harness environment
     * @param operatorFactory operator factory used to set up the singleton operator chain
     * @param busyTimeMatcher expectation applied to the value of the busy-time gauge
     * @throws Exception if building, running, or closing the harness fails, or if the calling
     *     thread is interrupted while sleeping or waiting for the acknowledgement
     */
    public void testMetrics(FunctionWithException<Environment, ? extends StreamTask<Integer, ?>, Exception> taskFactory, StreamOperatorFactory<?> operatorFactory, Matcher<Double> busyTimeMatcher) throws Exception {
        long sleepTime = 42L;
        // Lower bound, in nanoseconds, for every start delay checked below.
        long minStartDelayNanos = sleepTime * NANOS_PER_MILLI;

        StreamTaskMailboxTestHarnessBuilder<Integer> builder = new StreamTaskMailboxTestHarnessBuilder<>(taskFactory, BasicTypeInfo.INT_TYPE_INFO);
        // The metric group records every registered metric into this map so it can be inspected.
        ConcurrentHashMap<String, Metric> metrics = new ConcurrentHashMap<>();
        StreamTaskTestHarness.TestTaskMetricGroup taskMetricGroup = new StreamTaskTestHarness.TestTaskMetricGroup(metrics);

        try (StreamTaskMailboxTestHarness<Integer> harness = builder.setupOutputForSingletonOperatorChain(operatorFactory).setTaskMetricGroup(taskMetricGroup).build()) {
            Future<?> triggerFuture = harness.streamTask.triggerCheckpointAsync(new CheckpointMetaData(1L, System.currentTimeMillis()), CheckpointOptions.forCheckpointWithDefaultLocation());
            OneShotLatch checkpointAcknowledgeLatch = new OneShotLatch();
            harness.getCheckpointResponder().setAcknowledgeLatch(checkpointAcknowledgeLatch);

            // Nothing has processed the mailbox yet, so the trigger must still be pending.
            Assert.assertFalse("checkpoint trigger completed before any mailbox step was run", triggerFuture.isDone());
            // Delay processing so that a measurable start delay accumulates.
            Thread.sleep(sleepTime);
            while (!triggerFuture.isDone()) {
                harness.streamTask.runMailboxStep();
            }

            Metric startDelayMetric = metrics.get(CHECKPOINT_START_DELAY_METRIC);
            Assert.assertNotNull("metric '" + CHECKPOINT_START_DELAY_METRIC + "' was not registered", startDelayMetric);
            // Unchecked: the gauge value is compared against a Long bound, as in the original check.
            @SuppressWarnings("unchecked")
            Gauge<Long> checkpointStartDelayGauge = (Gauge<Long>) startDelayMetric;
            Assert.assertThat(checkpointStartDelayGauge.getValue(), Matchers.greaterThanOrEqualTo(minStartDelayNanos));

            Metric busyTimeMetric = metrics.get(BUSY_TIME_METRIC);
            Assert.assertNotNull("metric '" + BUSY_TIME_METRIC + "' was not registered", busyTimeMetric);
            // Unchecked: the caller-supplied matcher is a Matcher<Double>, so a Double value is expected.
            @SuppressWarnings("unchecked")
            Gauge<Double> busyTimeGauge = (Gauge<Double>) busyTimeMetric;
            Assert.assertThat(busyTimeGauge.getValue(), busyTimeMatcher);

            // Wait for the acknowledgement before inspecting the reports; exactly one is expected.
            checkpointAcknowledgeLatch.await();
            TestCheckpointResponder.AcknowledgeReport acknowledgeReport = Iterables.getOnlyElement(harness.getCheckpointResponder().getAcknowledgeReports());
            long reportedStartDelayNanos = acknowledgeReport.getCheckpointMetrics().getCheckpointStartDelayNanos();
            Assert.assertThat(reportedStartDelayNanos, Matchers.greaterThanOrEqualTo(minStartDelayNanos));
        }
    }
}

