/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.leaderretrieval;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.Objects;
import java.util.UUID;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.shaded.curator.org.apache.curator.framework.CuratorFramework;
import org.apache.flink.shaded.curator.org.apache.curator.framework.api.UnhandledErrorListener;
import org.apache.flink.shaded.curator.org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.flink.shaded.curator.org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.flink.shaded.curator.org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionState;
import org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ZooKeeperLeaderRetrievalService
implements LeaderRetrievalService,
NodeCacheListener,
UnhandledErrorListener {
    private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperLeaderRetrievalService.class);
    private final Object lock = new Object();
    private final CuratorFramework client;
    private final NodeCache cache;
    private final String retrievalPath;
    private volatile LeaderRetrievalListener leaderListener;
    private String lastLeaderAddress;
    private UUID lastLeaderSessionID;
    private volatile boolean running;
    private final ConnectionStateListener connectionStateListener = new ConnectionStateListener(){

        @Override
        public void stateChanged(CuratorFramework client, ConnectionState newState) {
            ZooKeeperLeaderRetrievalService.this.handleStateChange(newState);
        }
    };

    public ZooKeeperLeaderRetrievalService(CuratorFramework client, String retrievalPath) {
        this.client = (CuratorFramework)Preconditions.checkNotNull((Object)client, (String)"CuratorFramework client");
        this.cache = new NodeCache(client, retrievalPath);
        this.retrievalPath = (String)Preconditions.checkNotNull((Object)retrievalPath);
        this.leaderListener = null;
        this.lastLeaderAddress = null;
        this.lastLeaderSessionID = null;
        this.running = false;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void start(LeaderRetrievalListener listener) throws Exception {
        Preconditions.checkNotNull((Object)listener, (String)"Listener must not be null.");
        Preconditions.checkState((this.leaderListener == null ? 1 : 0) != 0, (Object)"ZooKeeperLeaderRetrievalService can only be started once.");
        LOG.info("Starting ZooKeeperLeaderRetrievalService {}.", (Object)this.retrievalPath);
        Object object = this.lock;
        synchronized (object) {
            this.leaderListener = listener;
            this.client.getUnhandledErrorListenable().addListener(this);
            this.cache.getListenable().addListener(this);
            this.cache.start();
            this.client.getConnectionStateListenable().addListener(this.connectionStateListener);
            this.running = true;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void stop() throws Exception {
        LOG.info("Stopping ZooKeeperLeaderRetrievalService {}.", (Object)this.retrievalPath);
        Object object = this.lock;
        synchronized (object) {
            if (!this.running) {
                return;
            }
            this.running = false;
        }
        this.client.getUnhandledErrorListenable().removeListener(this);
        this.client.getConnectionStateListenable().removeListener(this.connectionStateListener);
        try {
            this.cache.close();
        }
        catch (IOException e) {
            throw new Exception("Could not properly stop the ZooKeeperLeaderRetrievalService.", e);
        }
    }

    @Override
    public void nodeChanged() {
        this.retrieveLeaderInformationFromZooKeeper();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void retrieveLeaderInformationFromZooKeeper() {
        Object object = this.lock;
        synchronized (object) {
            if (this.running) {
                try {
                    UUID leaderSessionID;
                    String leaderAddress;
                    LOG.debug("Leader node has changed.");
                    ChildData childData = this.cache.getCurrentData();
                    if (childData == null) {
                        leaderAddress = null;
                        leaderSessionID = null;
                    } else {
                        byte[] data = childData.getData();
                        if (data == null || data.length == 0) {
                            leaderAddress = null;
                            leaderSessionID = null;
                        } else {
                            ByteArrayInputStream bais = new ByteArrayInputStream(data);
                            ObjectInputStream ois = new ObjectInputStream(bais);
                            leaderAddress = ois.readUTF();
                            leaderSessionID = (UUID)ois.readObject();
                        }
                    }
                    this.notifyIfNewLeaderAddress(leaderAddress, leaderSessionID);
                }
                catch (Exception e) {
                    this.leaderListener.handleError(new Exception("Could not handle node changed event.", e));
                    ExceptionUtils.checkInterrupted((Throwable)e);
                }
            } else {
                LOG.debug("Ignoring node change notification since the service has already been stopped.");
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void handleStateChange(ConnectionState newState) {
        switch (newState) {
            case CONNECTED: {
                LOG.debug("Connected to ZooKeeper quorum. Leader retrieval can start.");
                break;
            }
            case SUSPENDED: {
                LOG.warn("Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.");
                Object object = this.lock;
                synchronized (object) {
                    this.notifyLeaderLoss();
                    break;
                }
            }
            case RECONNECTED: {
                LOG.info("Connection to ZooKeeper was reconnected. Leader retrieval can be restarted.");
                this.onReconnectedConnectionState();
                break;
            }
            case LOST: {
                LOG.warn("Connection to ZooKeeper lost. Can no longer retrieve the leader from ZooKeeper.");
                Object object = this.lock;
                synchronized (object) {
                    this.notifyLeaderLoss();
                    break;
                }
            }
        }
    }

    private void onReconnectedConnectionState() {
        this.retrieveLeaderInformationFromZooKeeper();
    }

    @Override
    public void unhandledError(String s, Throwable throwable) {
        this.leaderListener.handleError((Exception)((Object)new FlinkException("Unhandled error in ZooKeeperLeaderRetrievalService:" + s, throwable)));
    }

    @GuardedBy(value="lock")
    private void notifyIfNewLeaderAddress(String newLeaderAddress, UUID newLeaderSessionID) {
        if (!Objects.equals(newLeaderAddress, this.lastLeaderAddress) || !Objects.equals(newLeaderSessionID, this.lastLeaderSessionID)) {
            if (newLeaderAddress == null && newLeaderSessionID == null) {
                LOG.debug("Leader information was lost: The listener will be notified accordingly.");
            } else {
                LOG.debug("New leader information: Leader={}, session ID={}.", (Object)newLeaderAddress, (Object)newLeaderSessionID);
            }
            this.lastLeaderAddress = newLeaderAddress;
            this.lastLeaderSessionID = newLeaderSessionID;
            this.leaderListener.notifyLeaderAddress(newLeaderAddress, newLeaderSessionID);
        }
    }

    @GuardedBy(value="lock")
    private void notifyLeaderLoss() {
        this.notifyIfNewLeaderAddress(null, null);
    }
}

