/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.leaderelection;

import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.highavailability.zookeeper.CuratorFrameworkWithUnhandledErrorListener;
import org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService;
import org.apache.flink.runtime.leaderelection.LeaderContender;
import org.apache.flink.runtime.leaderelection.LeaderElectionDriverFactory;
import org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriverFactory;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.util.TestingFatalErrorHandlerResource;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.runtime.zookeeper.ZooKeeperResource;
import org.apache.flink.shaded.curator4.org.apache.curator.framework.CuratorFramework;
import org.apache.flink.shaded.curator4.org.apache.curator.framework.state.ConnectionState;
import org.apache.flink.shaded.curator4.org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.BiConsumerWithException;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

/**
 * Tests how a {@link DefaultLeaderElectionService} backed by a
 * {@link ZooKeeperLeaderElectionDriverFactory} reacts when the ZooKeeper connection is
 * suspended or lost while a contender holds leadership.
 *
 * <p>Each test starts a Curator client against the {@link ZooKeeperResource}, waits until a
 * {@link TestingContender} has been granted leadership, then either restarts or stops the
 * ZooKeeper resource and finally runs test-specific validation logic.
 */
public class ZooKeeperLeaderElectionConnectionHandlingTest extends TestLogger {

    /** Path handed to the {@link ZooKeeperLeaderElectionDriverFactory}. */
    private static final String PATH = "/path";

    @Rule
    public final ZooKeeperResource zooKeeperResource = new ZooKeeperResource();

    @Rule
    public final TestingFatalErrorHandlerResource fatalErrorHandlerResource =
            new TestingFatalErrorHandlerResource();

    /** Per-test configuration; individual tests add further options before running. */
    private final Configuration configuration = new Configuration();

    /** Points the configuration at the ZooKeeper instance provided by the rule. */
    @Before
    public void setup() {
        configuration.set(
                HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM,
                zooKeeperResource.getConnectString());
    }

    /**
     * With the default configuration, the contender must be revoked within one second after the
     * connection has been reported as suspended.
     */
    @Test
    public void testLoseLeadershipOnConnectionSuspended() throws Exception {
        runTestWithBrieflySuspendedZooKeeperConnection(
                configuration,
                (connectionStateListener, contender) -> {
                    connectionStateListener.awaitSuspendedConnection();
                    contender.awaitRevokeLeadership(Duration.ofSeconds(1L));
                });
    }

    /**
     * With {@code ZOOKEEPER_TOLERATE_SUSPENDED_CONNECTIONS} enabled, leadership must not have been
     * revoked by the time the connection has been suspended and re-established.
     */
    @Test
    public void testKeepLeadershipOnSuspendedConnectionIfTolerateSuspendedConnectionsIsEnabled()
            throws Exception {
        configuration.set(HighAvailabilityOptions.ZOOKEEPER_TOLERATE_SUSPENDED_CONNECTIONS, true);
        runTestWithBrieflySuspendedZooKeeperConnection(
                configuration,
                (connectionStateListener, contender) -> {
                    connectionStateListener.awaitSuspendedConnection();
                    connectionStateListener.awaitReconnectedConnection();
                    Assert.assertFalse(contender.hasRevokeLeadershipBeenTriggered());
                });
    }

    /**
     * Even with {@code ZOOKEEPER_TOLERATE_SUSPENDED_CONNECTIONS} enabled, the contender must be
     * revoked within one second after the connection has been reported as lost.
     */
    @Test
    public void testLoseLeadershipOnLostConnectionIfTolerateSuspendedConnectionsIsEnabled()
            throws Exception {
        // Both timeouts are set to 1000; presumably milliseconds, chosen small so that the
        // LOST state is reached quickly once ZooKeeper is stopped -- confirm against
        // HighAvailabilityOptions.
        configuration.set(HighAvailabilityOptions.ZOOKEEPER_SESSION_TIMEOUT, 1000);
        configuration.set(HighAvailabilityOptions.ZOOKEEPER_CONNECTION_TIMEOUT, 1000);
        configuration.set(HighAvailabilityOptions.ZOOKEEPER_TOLERATE_SUSPENDED_CONNECTIONS, true);
        runTestWithLostZooKeeperConnection(
                configuration,
                (connectionStateListener, contender) -> {
                    connectionStateListener.awaitLostConnection();
                    contender.awaitRevokeLeadership(Duration.ofSeconds(1L));
                });
    }

    /** Runs the scenario with {@link Problem#LOST_CONNECTION}. */
    private void runTestWithLostZooKeeperConnection(
            Configuration configuration,
            BiConsumerWithException<TestingConnectionStateListener, TestingContender, Exception>
                    validationLogic)
            throws Exception {
        runTestWithZooKeeperConnectionProblem(
                configuration, validationLogic, Problem.LOST_CONNECTION);
    }

    /** Runs the scenario with {@link Problem#SUSPENDED_CONNECTION}. */
    private void runTestWithBrieflySuspendedZooKeeperConnection(
            Configuration configuration,
            BiConsumerWithException<TestingConnectionStateListener, TestingContender, Exception>
                    validationLogic)
            throws Exception {
        runTestWithZooKeeperConnectionProblem(
                configuration, validationLogic, Problem.SUSPENDED_CONNECTION);
    }

    /**
     * Starts a leader election service, waits for leadership, provokes the given connection
     * problem and then invokes the validation logic.
     *
     * @param configuration configuration used to start the Curator framework
     * @param validationLogic checks to run after the problem has been triggered; receives the
     *     connection state listener registered on the client and the contender
     * @param problem which connection problem to provoke
     * @throws Exception if setup, validation or cleanup fails
     */
    private void runTestWithZooKeeperConnectionProblem(
            Configuration configuration,
            BiConsumerWithException<TestingConnectionStateListener, TestingContender, Exception>
                    validationLogic,
            Problem problem)
            throws Exception {
        CuratorFrameworkWithUnhandledErrorListener curatorFrameworkWrapper =
                ZooKeeperUtils.startCuratorFramework(
                        configuration, fatalErrorHandlerResource.getFatalErrorHandler());
        CuratorFramework client = curatorFrameworkWrapper.asCuratorFramework();
        LeaderElectionDriverFactory leaderElectionDriverFactory =
                new ZooKeeperLeaderElectionDriverFactory(client, PATH);
        DefaultLeaderElectionService leaderElectionService =
                new DefaultLeaderElectionService(leaderElectionDriverFactory);

        try {
            TestingConnectionStateListener connectionStateListener =
                    new TestingConnectionStateListener();
            client.getConnectionStateListenable().addListener(connectionStateListener);

            TestingContender contender = new TestingContender();
            leaderElectionService.start(contender);

            // The problem is only provoked once the contender actually holds leadership.
            contender.awaitGrantLeadership();

            switch (problem) {
                case SUSPENDED_CONNECTION:
                    zooKeeperResource.restart();
                    break;
                case LOST_CONNECTION:
                    zooKeeperResource.stop();
                    break;
                default:
                    throw new IllegalArgumentException(
                            String.format("Unknown problem type %s.", problem));
            }

            validationLogic.accept(connectionStateListener, contender);
        } finally {
            try {
                leaderElectionService.stop();
            } finally {
                // Always release the Curator client, even if stopping the service failed.
                curatorFrameworkWrapper.close();
            }

            if (problem == Problem.LOST_CONNECTION) {
                // Errors recorded by the fatal error handler are discarded for the
                // lost-connection scenario -- presumably because they are an accepted
                // side effect of stopping ZooKeeper.
                fatalErrorHandlerResource.getFatalErrorHandler().clearError();
            }
        }
    }

    /** Connection state listener exposing one latch per observed state. */
    private static final class TestingConnectionStateListener implements ConnectionStateListener {
        private final OneShotLatch connectionSuspendedLatch = new OneShotLatch();
        private final OneShotLatch reconnectedLatch = new OneShotLatch();
        private final OneShotLatch connectionLostLatch = new OneShotLatch();

        /** Triggers the latch matching the reported state; other states are ignored. */
        @Override
        public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
            if (connectionState == ConnectionState.SUSPENDED) {
                connectionSuspendedLatch.trigger();
            }
            if (connectionState == ConnectionState.RECONNECTED) {
                reconnectedLatch.trigger();
            }
            if (connectionState == ConnectionState.LOST) {
                connectionLostLatch.trigger();
            }
        }

        /** Waits on the latch triggered by {@code SUSPENDED}. */
        public void awaitSuspendedConnection() throws InterruptedException {
            connectionSuspendedLatch.await();
        }

        /** Waits on the latch triggered by {@code RECONNECTED}. */
        public void awaitReconnectedConnection() throws InterruptedException {
            reconnectedLatch.await();
        }

        /** Waits on the latch triggered by {@code LOST}. */
        public void awaitLostConnection() throws InterruptedException {
            connectionLostLatch.await();
        }
    }

    /**
     * Contender that records grant/revoke callbacks in latches and forwards errors to the test's
     * fatal error handler. Non-static because it reads the enclosing test's
     * {@code fatalErrorHandlerResource}.
     */
    private final class TestingContender implements LeaderContender {
        private final OneShotLatch grantLeadershipLatch = new OneShotLatch();
        private final OneShotLatch revokeLeadershipLatch = new OneShotLatch();

        @Override
        public void grantLeadership(UUID leaderSessionID) {
            grantLeadershipLatch.trigger();
        }

        @Override
        public void revokeLeadership() {
            revokeLeadershipLatch.trigger();
        }

        @Override
        public void handleError(Exception exception) {
            fatalErrorHandlerResource.getFatalErrorHandler().onFatalError(exception);
        }

        /** Waits on the latch triggered by {@link #grantLeadership(UUID)}. */
        public void awaitGrantLeadership() throws InterruptedException {
            grantLeadershipLatch.await();
        }

        /** Returns whether {@link #revokeLeadership()} has triggered its latch. */
        public boolean hasRevokeLeadershipBeenTriggered() {
            return revokeLeadershipLatch.isTriggered();
        }

        /**
         * Waits on the latch triggered by {@link #revokeLeadership()}, passing the given timeout
         * in milliseconds to the latch.
         *
         * @param timeout maximum time to wait
         */
        public void awaitRevokeLeadership(Duration timeout)
                throws InterruptedException, TimeoutException {
            revokeLeadershipLatch.await(timeout.toMillis(), TimeUnit.MILLISECONDS);
        }
    }

    /** Kind of ZooKeeper connection problem provoked by a test run. */
    private enum Problem {
        /** Provoked by stopping the ZooKeeper resource. */
        LOST_CONNECTION,
        /** Provoked by restarting the ZooKeeper resource. */
        SUSPENDED_CONNECTION
    }
}

