/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hive.ql.lockmgr;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.LockComponentBuilder;
import org.apache.hadoop.hive.metastore.LockRequestBuilder;
import org.apache.hadoop.hive.metastore.api.DataOperationType;
import org.apache.hadoop.hive.metastore.api.LockComponent;
import org.apache.hadoop.hive.metastore.api.LockResponse;
import org.apache.hadoop.hive.metastore.api.LockState;
import org.apache.hadoop.hive.metastore.api.LockType;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchLockException;
import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.hooks.Entity;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.lockmgr.DbLockManager;
import org.apache.hadoop.hive.ql.lockmgr.HiveLock;
import org.apache.hadoop.hive.ql.lockmgr.HiveLockManager;
import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManagerImpl;
import org.apache.hadoop.hive.ql.lockmgr.LockException;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.plan.HiveOperation;
import org.apache.hadoop.hive.ql.plan.LockDatabaseDesc;
import org.apache.hadoop.hive.ql.plan.LockTableDesc;
import org.apache.hadoop.hive.ql.plan.UnlockDatabaseDesc;
import org.apache.hadoop.hive.ql.plan.UnlockTableDesc;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hive.common.util.ShutdownHookManager;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class DbTxnManager
extends HiveTxnManagerImpl {
    /** Fully qualified class name; used as the logger name and returned by getTxnManagerName(). */
    private static final String CLASS_NAME = DbTxnManager.class.getName();
    private static final Logger LOG = LoggerFactory.getLogger((String)CLASS_NAME);
    /** Lock manager, created on the first call to getLockManager(). */
    private volatile DbLockManager lockMgr = null;
    /** Id of the open transaction; isTxnOpen() reports true only while this is greater than 0. */
    private volatile long txnId = 0L;
    /** Write ids already allocated in the current transaction, keyed by the name built by AcidUtils.getFullTableName. */
    private Map<String, Long> tableWriteIds = new HashMap<String, Long>();
    /** Statement counter handed out by getStmtIdAndIncrement(); set to 0 by openTxn and to -1 when the transaction ends. */
    private int stmtId = -1;
    /** Number of statements seen by verifyState() since the transaction was opened. */
    private int numStatements = 0;
    /** Set by markExplicitTransaction() (START_TRANSACTION); cleared whenever a new transaction is opened. */
    private boolean isExplicitTransaction = false;
    /** How many START_TRANSACTION statements were seen in the current transaction; a second one is rejected. */
    private int startTransactionCount = 0;
    /** Query id of the plan most recently passed to acquireLocks; cleared by stopHeartbeat(). */
    private String queryId;
    /** Heartbeat thread pool; static, hence shared by all instances. Built lazily by initHeartbeatExecutorService(). */
    private static ScheduledExecutorService heartbeatExecutorService = null;
    /** Handle of the heartbeat scheduled for this manager, or null when none is scheduled. */
    private ScheduledFuture<?> heartbeatTask = null;
    /** Shutdown hook body: shuts the shared heartbeat pool down if it exists and has not been shut down yet. */
    private Runnable shutdownRunner = new Runnable(){

        @Override
        public void run() {
            if (heartbeatExecutorService != null && !heartbeatExecutorService.isShutdown() && !heartbeatExecutorService.isTerminated()) {
                LOG.info("Shutting down Heartbeater thread pool.");
                heartbeatExecutorService.shutdown();
            }
        }
    };
    /** Shutdown hook priority (0); equals the value the constructor registers shutdownRunner with. */
    private static final int SHUTDOWN_HOOK_PRIORITY = 0;

    /**
     * Looks up the metastore client through the {@link Hive} object bound to this manager's configuration.
     *
     * @return the client returned by {@code Hive.get(conf).getMSC()}
     * @throws LockException wrapping the {@link HiveException} or {@link MetaException} raised by the lookup
     */
    IMetaStoreClient getMS() throws LockException {
        try {
            Hive hiveDb = Hive.get(this.conf);
            return hiveDb.getMSC();
        }
        catch (MetaException | HiveException failure) {
            // Log with the stack trace, then surface the failure as a LockException.
            LOG.error("Unable to reach Hive Metastore: " + failure.getMessage(), (Throwable)failure);
            throw new LockException(failure);
        }
    }

    /**
     * Registers {@code shutdownRunner} as a shutdown hook with priority {@code SHUTDOWN_HOOK_PRIORITY} (0).
     */
    DbTxnManager() {
        ShutdownHookManager.addShutdownHook(this.shutdownRunner, SHUTDOWN_HOOK_PRIORITY);
    }

    /**
     * Stores the configuration via the superclass and rejects configurations without concurrency support.
     *
     * @param conf configuration to use
     * @throws RuntimeException if {@code HIVE_SUPPORT_CONCURRENCY} is not enabled in {@code conf}
     */
    @Override
    void setHiveConf(HiveConf conf) {
        super.setHiveConf(conf);
        boolean concurrencyEnabled = conf.getBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY);
        if (concurrencyEnabled) {
            return;
        }
        throw new RuntimeException(ErrorMsg.DBTXNMGR_REQUIRES_CONCURRENCY.getMsg());
    }

    /**
     * Forwards a replication open-transaction request to the metastore client.
     *
     * @param replPolicy replication policy passed through to the metastore
     * @param srcTxnIds  source transaction ids passed through to the metastore
     * @param user       user passed through to the metastore
     * @return whatever the metastore client's {@code replOpenTxn} returns
     * @throws LockException if the metastore call raises a {@link TException}
     */
    @Override
    public List<Long> replOpenTxn(String replPolicy, List<Long> srcTxnIds, String user) throws LockException {
        try {
            IMetaStoreClient client = this.getMS();
            List<Long> openedTxnIds = client.replOpenTxn(replPolicy, srcTxnIds, user);
            return openedTxnIds;
        }
        catch (TException thriftFailure) {
            throw new LockException(thriftFailure, ErrorMsg.METASTORE_COMMUNICATION_FAILED);
        }
    }

    /**
     * Opens a transaction, passing a heartbeat delay of 0 to the three-argument overload.
     *
     * @param ctx  context that receives the heartbeater
     * @param user user the transaction is opened for
     * @return the id of the newly opened transaction
     * @throws LockException propagated from the three-argument overload
     */
    @Override
    public long openTxn(Context ctx, String user) throws LockException {
        final long noExplicitDelay = 0L;
        return this.openTxn(ctx, user, noExplicitDelay);
    }

    /**
     * Opens a new transaction in the metastore, resets the per-transaction bookkeeping and starts the heartbeat.
     *
     * @param ctx   context that receives the started heartbeater
     * @param user  user the transaction is opened for
     * @param delay initial heartbeat delay handed to {@code startHeartbeat}
     * @return the id of the newly opened transaction
     * @throws LockException if a transaction is already open, or the metastore call raises a {@link TException}
     */
    @VisibleForTesting
    long openTxn(Context ctx, String user, long delay) throws LockException {
        this.init();
        // Ensures the lock manager exists before the transaction is used.
        this.getLockManager();
        if (this.isTxnOpen()) {
            String alreadyOpen = "Transaction already opened. " + JavaUtils.txnIdToString(this.txnId);
            throw new LockException(alreadyOpen);
        }
        try {
            long openedTxnId = this.getMS().openTxn(user);
            this.txnId = openedTxnId;
            // Fresh transaction: reset all per-transaction state.
            this.stmtId = 0;
            this.numStatements = 0;
            this.tableWriteIds.clear();
            this.isExplicitTransaction = false;
            this.startTransactionCount = 0;
            LOG.debug("Opened " + JavaUtils.txnIdToString(this.txnId));
            Heartbeater heartbeater = this.startHeartbeat(delay);
            ctx.setHeartbeater(heartbeater);
            return this.txnId;
        }
        catch (TException thriftFailure) {
            throw new LockException(thriftFailure, ErrorMsg.METASTORE_COMMUNICATION_FAILED);
        }
    }

    /**
     * Returns the lock manager, creating it on first use.
     *
     * @return the {@link DbLockManager} held in {@code lockMgr}
     * @throws LockException propagated from {@code init()}
     */
    @Override
    public HiveLockManager getLockManager() throws LockException {
        this.init();
        if (this.lockMgr != null) {
            return this.lockMgr;
        }
        this.lockMgr = new DbLockManager(this.conf, this);
        return this.lockMgr;
    }

    /**
     * Acquires the locks for {@code plan} with a heartbeat delay of 0.
     * If the failure was caused by a {@link TxnAbortedException}, the local transaction state
     * ({@code txnId}, {@code stmtId}, {@code tableWriteIds}) is reset before the exception is rethrown.
     *
     * @param plan     query plan whose inputs and outputs are locked
     * @param ctx      context that receives the acquired locks
     * @param username user recorded on the lock request
     * @throws LockException rethrown unchanged from the lock acquisition
     */
    @Override
    public void acquireLocks(QueryPlan plan, Context ctx, String username) throws LockException {
        try {
            this.acquireLocksWithHeartbeatDelay(plan, ctx, username, 0L);
        }
        catch (LockException lockFailure) {
            boolean txnWasAborted = lockFailure.getCause() instanceof TxnAbortedException;
            if (txnWasAborted) {
                this.txnId = 0L;
                this.stmtId = -1;
                this.tableWriteIds.clear();
            }
            throw lockFailure;
        }
    }

    /**
     * Builds the {@code queryId=<id>} marker used in messages.
     *
     * @param queryPlan plan whose query id is embedded
     * @return the text {@code "queryId="} followed by the plan's query id
     */
    private static String getQueryIdWaterMark(QueryPlan queryPlan) {
        StringBuilder marker = new StringBuilder("queryId=");
        marker.append(queryPlan.getQueryId());
        return marker.toString();
    }

    /**
     * Flags the current transaction as explicit and counts the START_TRANSACTION statement.
     *
     * @param queryPlan plan of the statement being processed; used only for the error message
     * @throws LockException with {@code OP_NOT_ALLOWED_IN_TXN} once the count exceeds 1
     */
    private void markExplicitTransaction(QueryPlan queryPlan) throws LockException {
        this.isExplicitTransaction = true;
        this.startTransactionCount += 1;
        if (this.startTransactionCount <= 1) {
            return;
        }
        String currentTxn = JavaUtils.txnIdToString(this.getCurrentTxnId());
        throw new LockException(null, ErrorMsg.OP_NOT_ALLOWED_IN_TXN, queryPlan.getOperationName(), currentTxn, queryPlan.getQueryId());
    }

    /**
     * Checks that the statement described by {@code queryPlan} is legal in the current transaction state
     * and counts it in {@code numStatements}.
     *
     * @param queryPlan plan of the statement about to acquire locks
     * @throws LockException         if no transaction is open, or the operation is not permitted in the
     *                               current (explicit or implicit) transaction
     * @throws IllegalStateException if the plan carries no operation
     */
    private void verifyState(QueryPlan queryPlan) throws LockException {
        if (!this.isTxnOpen()) {
            throw new LockException("No transaction context for operation: " + queryPlan.getOperationName() + " for " + DbTxnManager.getQueryIdWaterMark(queryPlan));
        }
        HiveOperation operation = queryPlan.getOperation();
        if (operation == null) {
            throw new IllegalStateException("Unkown HiverOperation for " + DbTxnManager.getQueryIdWaterMark(queryPlan));
        }
        this.numStatements += 1;

        if (operation == HiveOperation.START_TRANSACTION) {
            this.markExplicitTransaction(queryPlan);
            return;
        }

        if (operation == HiveOperation.COMMIT || operation == HiveOperation.ROLLBACK) {
            if (!this.isTxnOpen()) {
                throw new LockException(null, ErrorMsg.OP_NOT_ALLOWED_WITHOUT_TXN, queryPlan.getOperationName());
            }
            // COMMIT/ROLLBACK only make sense inside an explicit transaction.
            if (!this.isExplicitTransaction) {
                throw new LockException(null, ErrorMsg.OP_NOT_ALLOWED_IN_IMPLICIT_TXN, queryPlan.getOperationName());
            }
            return;
        }

        // Any other operation: fine in an implicit transaction, or when the operation itself
        // (or the special-case check) permits it inside an explicit one.
        boolean permitted = queryPlan.getOperation().isAllowedInTransaction()
                || !this.isExplicitTransaction
                || this.allowOperationInATransaction(queryPlan);
        if (!permitted) {
            throw new LockException(null, ErrorMsg.OP_NOT_ALLOWED_IN_TXN, queryPlan.getOperationName(), JavaUtils.txnIdToString(this.getCurrentTxnId()), queryPlan.getQueryId());
        }
    }

    /**
     * Special-case check: a LOAD with exactly one output that is a transactional table is allowed
     * when its write type is INSERT.
     *
     * @param queryPlan plan to inspect
     * @return {@code true} only for the LOAD/INSERT case described above
     */
    private boolean allowOperationInATransaction(QueryPlan queryPlan) {
        if (queryPlan.getOperation() != HiveOperation.LOAD) {
            return false;
        }
        if (queryPlan.getOutputs() == null || queryPlan.getOutputs().size() != 1) {
            return false;
        }
        WriteEntity soleOutput = queryPlan.getOutputs().iterator().next();
        if (!AcidUtils.isTransactionalTable(soleOutput.getTable())) {
            return false;
        }
        switch (soleOutput.getWriteType()) {
            case INSERT: {
                return true;
            }
            default: {
                // INSERT_OVERWRITE and every other write type are rejected.
                return false;
            }
        }
    }

    /**
     * Decides whether an entity takes part in locking.
     *
     * @param entity entity to inspect
     * @return for TABLE and PARTITION entities, whether the (owning) table is lockable; {@code true} for all other types
     */
    private boolean needsLock(Entity entity) {
        final Table owner;
        switch (entity.getType()) {
            case TABLE: {
                owner = entity.getTable();
                break;
            }
            case PARTITION: {
                owner = entity.getPartition().getTable();
                break;
            }
            default: {
                return true;
            }
        }
        return this.isLockableTable(owner);
    }

    /**
     * Tells whether locks are taken on the given table.
     *
     * @param t table to inspect
     * @return {@code true} for non-temporary tables of type MANAGED_TABLE or MATERIALIZED_VIEW, otherwise {@code false}
     */
    private boolean isLockableTable(Table t) {
        if (t.isTemporary()) {
            return false;
        }
        boolean lockable;
        switch (t.getTableType()) {
            case MANAGED_TABLE:
            case MATERIALIZED_VIEW: {
                lockable = true;
                break;
            }
            default: {
                lockable = false;
            }
        }
        return lockable;
    }

    /**
     * Builds one lock request covering the plan's inputs and outputs and submits it to the lock manager.
     *
     * <p>Inputs that need a lock get a shared component with operation type SELECT. Outputs get a
     * component whose lock mode and operation type depend on the output's write type. The acquired
     * locks are stored on {@code ctx}.
     *
     * @param plan       query plan whose read and write entities are examined
     * @param ctx        context that receives the acquired locks via {@code setHiveLocks}
     * @param username   user recorded on the lock request
     * @param isBlocking passed through to {@code lockMgr.lock}
     * @return the state returned by {@code lockMgr.lock}, or {@code null} when the operation is
     *         SET_AUTOCOMMIT or no entity required a lock
     * @throws LockException propagated from state verification or from the lock manager
     */
    @VisibleForTesting
    LockState acquireLocks(QueryPlan plan, Context ctx, String username, boolean isBlocking) throws LockException {
        LockComponent comp;
        Table t;
        LockComponentBuilder compBuilder;
        this.init();
        // Ensures lockMgr is created before it is used at the end of this method.
        this.getLockManager();
        this.verifyState(plan);
        boolean atLeastOneLock = false;
        this.queryId = plan.getQueryId();
        switch (plan.getOperation()) {
            case SET_AUTOCOMMIT: {
                // No locks are requested for SET_AUTOCOMMIT.
                return null;
            }
        }
        LockRequestBuilder rqstBuilder = new LockRequestBuilder(this.queryId);
        LOG.info("Setting lock request transaction to " + JavaUtils.txnIdToString(this.txnId) + " for queryId=" + this.queryId);
        rqstBuilder.setTransactionId(this.txnId).setUser(username);
        // Read entities: one shared/SELECT component per input that needs a lock.
        // (The blockNN labels are decompiler-generated; "continue blockNN" just skips to the next entity.)
        block27: for (ReadEntity input : plan.getInputs()) {
            // Skip inputs that opt out of locking, inputs that are update/delete targets,
            // and entities whose table is not lockable.
            if (!input.needsLock() || input.isUpdateOrDelete() || !this.needsLock(input)) continue;
            compBuilder = new LockComponentBuilder();
            compBuilder.setShared();
            compBuilder.setOperationType(DataOperationType.SELECT);
            t = null;
            switch (input.getType()) {
                case DATABASE: {
                    compBuilder.setDbName(input.getDatabase().getName());
                    break;
                }
                case TABLE: {
                    t = input.getTable();
                    compBuilder.setDbName(t.getDbName());
                    compBuilder.setTableName(t.getTableName());
                    break;
                }
                case PARTITION: 
                case DUMMYPARTITION: {
                    compBuilder.setPartitionName(input.getPartition().getName());
                    t = input.getPartition().getTable();
                    compBuilder.setDbName(t.getDbName());
                    compBuilder.setTableName(t.getTableName());
                    break;
                }
                default: {
                    // Any other entity type is not locked.
                    continue block27;
                }
            }
            // t stays null for DATABASE entities, so the transactional flag is only set for tables/partitions.
            if (t != null) {
                compBuilder.setIsTransactional(AcidUtils.isTransactionalTable(t));
            }
            comp = compBuilder.build();
            LOG.debug("Adding lock component to lock request " + comp.toString());
            rqstBuilder.addLockComponent(comp);
            atLeastOneLock = true;
        }
        // Write entities: lock mode and operation type are chosen from the write type.
        block28: for (WriteEntity output : plan.getOutputs()) {
            // NOTE(review): this logs whether output is null, but the next line dereferences it unconditionally.
            LOG.debug("output is null " + (output == null));
            // Directory outputs and non-lockable tables are skipped.
            if (output.getType() == Entity.Type.DFS_DIR || output.getType() == Entity.Type.LOCAL_DIR || !this.needsLock(output)) continue;
            compBuilder = new LockComponentBuilder();
            t = null;
            switch (output.getType()) {
                case DATABASE: {
                    compBuilder.setDbName(output.getDatabase().getName());
                    break;
                }
                case TABLE: 
                case DUMMYPARTITION: {
                    t = output.getTable();
                    compBuilder.setDbName(t.getDbName());
                    compBuilder.setTableName(t.getTableName());
                    break;
                }
                case PARTITION: {
                    compBuilder.setPartitionName(output.getPartition().getName());
                    t = output.getPartition().getTable();
                    compBuilder.setDbName(t.getDbName());
                    compBuilder.setTableName(t.getTableName());
                    break;
                }
                default: {
                    // Any other entity type is not locked.
                    continue block28;
                }
            }
            switch (output.getWriteType()) {
                case DDL_EXCLUSIVE: {
                    compBuilder.setExclusive();
                    compBuilder.setOperationType(DataOperationType.NO_TXN);
                    break;
                }
                case INSERT_OVERWRITE: {
                    // getTable throws IllegalStateException when the entity carries no table.
                    t = DbTxnManager.getTable(output);
                    if (AcidUtils.isTransactionalTable(t)) {
                        // Transactional table: exclusive or semi-shared depending on TXN_OVERWRITE_X_LOCK.
                        if (this.conf.getBoolVar(HiveConf.ConfVars.TXN_OVERWRITE_X_LOCK)) {
                            compBuilder.setExclusive();
                        } else {
                            compBuilder.setSemiShared();
                        }
                        compBuilder.setOperationType(DataOperationType.UPDATE);
                        break;
                    }
                    // Non-transactional table: always exclusive.
                    compBuilder.setExclusive();
                    compBuilder.setOperationType(DataOperationType.NO_TXN);
                    break;
                }
                case INSERT: {
                    assert (t != null);
                    if (AcidUtils.isTransactionalTable(t)) {
                        compBuilder.setShared();
                    } else if (MetaStoreUtils.isNonNativeTable(t.getTTable())) {
                        // Non-native table: the storage handler decides the lock type.
                        HiveStorageHandler storageHandler = Preconditions.checkNotNull(t.getStorageHandler(), "Thought all the non native tables have an instance of storage handler");
                        LockType lockType = storageHandler.getLockType(output);
                        switch (lockType) {
                            case EXCLUSIVE: {
                                compBuilder.setExclusive();
                                break;
                            }
                            case SHARED_READ: {
                                compBuilder.setShared();
                                break;
                            }
                            case SHARED_WRITE: {
                                compBuilder.setSemiShared();
                                break;
                            }
                            default: {
                                throw new IllegalArgumentException(String.format("Lock type [%s] for Database.Table [%s.%s] is unknown", lockType, t.getDbName(), t.getTableName()));
                            }
                        }
                    } else if (this.conf.getBoolVar(HiveConf.ConfVars.HIVE_TXN_STRICT_LOCKING_MODE)) {
                        // Native non-transactional table in strict locking mode.
                        compBuilder.setExclusive();
                    } else {
                        compBuilder.setShared();
                    }
                    compBuilder.setOperationType(DataOperationType.INSERT);
                    break;
                }
                case DDL_SHARED: {
                    compBuilder.setShared();
                    compBuilder.setOperationType(DataOperationType.NO_TXN);
                    break;
                }
                case UPDATE: {
                    compBuilder.setSemiShared();
                    compBuilder.setOperationType(DataOperationType.UPDATE);
                    break;
                }
                case DELETE: {
                    compBuilder.setSemiShared();
                    compBuilder.setOperationType(DataOperationType.DELETE);
                    break;
                }
                case DDL_NO_LOCK: {
                    // No lock component is produced for this output.
                    continue block28;
                }
                default: {
                    throw new RuntimeException("Unknown write type " + output.getWriteType().toString());
                }
            }
            if (t != null) {
                compBuilder.setIsTransactional(AcidUtils.isTransactionalTable(t));
            }
            compBuilder.setIsDynamicPartitionWrite(output.isDynamicPartitionWrite());
            comp = compBuilder.build();
            LOG.debug("Adding lock component to lock request " + comp.toString());
            rqstBuilder.addLockComponent(comp);
            atLeastOneLock = true;
        }
        // Nothing to lock: do not contact the lock manager at all.
        if (!atLeastOneLock) {
            LOG.debug("No locks needed for queryId" + this.queryId);
            return null;
        }
        // lockMgr.lock receives this list as an out-parameter; it is then handed to the context.
        ArrayList<HiveLock> locks = new ArrayList<HiveLock>(1);
        LockState lockState = this.lockMgr.lock(rqstBuilder.build(), this.queryId, isBlocking, locks);
        ctx.setHiveLocks(locks);
        return lockState;
    }

    /**
     * Returns the table of a write entity, insisting that one is present.
     *
     * @param we write entity to read the table from
     * @return the entity's table, never {@code null}
     * @throws IllegalStateException if the entity has no table
     */
    private static Table getTable(WriteEntity we) {
        Table table = we.getTable();
        if (table != null) {
            return table;
        }
        throw new IllegalStateException("No table info for " + we);
    }

    /**
     * Acquires locks in blocking mode and, when locks were taken while no transaction is open,
     * starts a heartbeat and stores it on the context.
     *
     * @param plan     query plan whose entities are locked
     * @param ctx      context that receives the locks and, if started, the heartbeater
     * @param username user recorded on the lock request
     * @param delay    initial heartbeat delay handed to {@code startHeartbeat}
     * @throws LockException propagated from lock acquisition or heartbeat start
     */
    @VisibleForTesting
    void acquireLocksWithHeartbeatDelay(QueryPlan plan, Context ctx, String username, long delay) throws LockException {
        LockState lockState = this.acquireLocks(plan, ctx, username, true);
        // No lock state means nothing was locked; an open transaction is handled by openTxn's heartbeat start.
        if (lockState == null || this.isTxnOpen()) {
            return;
        }
        Heartbeater heartbeater = this.startHeartbeat(delay);
        ctx.setHeartbeater(heartbeater);
    }

    /**
     * Stops the heartbeat and releases the given locks through the lock manager.
     * Does nothing when no lock manager has been created.
     *
     * @param hiveLocks locks to release
     * @throws LockException propagated from the heartbeat stop or the lock manager
     */
    @Override
    public void releaseLocks(List<HiveLock> hiveLocks) throws LockException {
        if (this.lockMgr == null) {
            return;
        }
        this.stopHeartbeat();
        this.lockMgr.releaseLocks(hiveLocks);
    }

    /**
     * Forwards a replication commit for a source transaction to the metastore client.
     *
     * @param replPolicy replication policy passed through to the metastore
     * @param srcTxnId   source transaction id to commit
     * @throws LockException with {@code TXN_NO_SUCH_TRANSACTION}, {@code TXN_ABORTED} or the
     *                       metastore-communication message, depending on the underlying failure
     */
    @Override
    public void replCommitTxn(String replPolicy, long srcTxnId) throws LockException {
        try {
            IMetaStoreClient client = this.getMS();
            client.replCommitTxn(srcTxnId, replPolicy);
        }
        catch (NoSuchTxnException missing) {
            String txnLabel = JavaUtils.txnIdToString(srcTxnId);
            LOG.error("Metastore could not find " + txnLabel);
            throw new LockException((Throwable)missing, ErrorMsg.TXN_NO_SUCH_TRANSACTION, txnLabel);
        }
        catch (TxnAbortedException aborted) {
            LockException wrapped = new LockException((Throwable)aborted, ErrorMsg.TXN_ABORTED, JavaUtils.txnIdToString(srcTxnId), aborted.getMessage());
            LOG.error(wrapped.getMessage());
            throw wrapped;
        }
        catch (TException thriftFailure) {
            throw new LockException(ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(), thriftFailure);
        }
    }

    /**
     * Commits the open transaction in the metastore.
     * Local lock records are cleared and the heartbeat is stopped first; the local transaction state
     * is reset afterwards whether or not the commit succeeded.
     *
     * @throws RuntimeException if no transaction is open
     * @throws LockException    with {@code TXN_NO_SUCH_TRANSACTION}, {@code TXN_ABORTED} or the
     *                          metastore-communication message, depending on the underlying failure
     */
    @Override
    public void commitTxn() throws LockException {
        if (!this.isTxnOpen()) {
            throw new RuntimeException("Attempt to commit before opening a transaction");
        }
        try {
            this.lockMgr.clearLocalLockRecords();
            this.stopHeartbeat();
            LOG.debug("Committing txn " + JavaUtils.txnIdToString(this.txnId));
            IMetaStoreClient client = this.getMS();
            client.commitTxn(this.txnId);
        }
        catch (NoSuchTxnException missing) {
            String txnLabel = JavaUtils.txnIdToString(this.txnId);
            LOG.error("Metastore could not find " + txnLabel);
            throw new LockException((Throwable)missing, ErrorMsg.TXN_NO_SUCH_TRANSACTION, JavaUtils.txnIdToString(this.txnId));
        }
        catch (TxnAbortedException aborted) {
            LockException wrapped = new LockException((Throwable)aborted, ErrorMsg.TXN_ABORTED, JavaUtils.txnIdToString(this.txnId), aborted.getMessage());
            LOG.error(wrapped.getMessage());
            throw wrapped;
        }
        catch (TException thriftFailure) {
            throw new LockException(ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(), thriftFailure);
        }
        finally {
            // The transaction is over from this manager's point of view, success or not.
            this.txnId = 0L;
            this.stmtId = -1;
            this.numStatements = 0;
            this.tableWriteIds.clear();
        }
    }

    /**
     * Forwards a replication rollback for a source transaction to the metastore client.
     *
     * @param replPolicy replication policy passed through to the metastore
     * @param srcTxnId   source transaction id to roll back
     * @throws LockException with {@code TXN_NO_SUCH_TRANSACTION}, {@code TXN_ABORTED} or the
     *                       metastore-communication message, depending on the underlying failure
     */
    @Override
    public void replRollbackTxn(String replPolicy, long srcTxnId) throws LockException {
        try {
            IMetaStoreClient client = this.getMS();
            client.replRollbackTxn(srcTxnId, replPolicy);
        }
        catch (NoSuchTxnException missing) {
            String txnLabel = JavaUtils.txnIdToString(srcTxnId);
            LOG.error("Metastore could not find " + txnLabel);
            throw new LockException((Throwable)missing, ErrorMsg.TXN_NO_SUCH_TRANSACTION, txnLabel);
        }
        catch (TxnAbortedException aborted) {
            LockException wrapped = new LockException((Throwable)aborted, ErrorMsg.TXN_ABORTED, JavaUtils.txnIdToString(srcTxnId), aborted.getMessage());
            LOG.error(wrapped.getMessage());
            throw wrapped;
        }
        catch (TException thriftFailure) {
            throw new LockException(ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(), thriftFailure);
        }
    }

    /**
     * Rolls back the open transaction in the metastore.
     * Local lock records are cleared and the heartbeat is stopped first; the local transaction state
     * is reset afterwards whether or not the rollback succeeded.
     *
     * @throws RuntimeException if no transaction is open
     * @throws LockException    with {@code TXN_NO_SUCH_TRANSACTION}, {@code TXN_ABORTED} or the
     *                          metastore-communication message, depending on the underlying failure
     */
    @Override
    public void rollbackTxn() throws LockException {
        if (!this.isTxnOpen()) {
            throw new RuntimeException("Attempt to rollback before opening a transaction");
        }
        try {
            this.lockMgr.clearLocalLockRecords();
            this.stopHeartbeat();
            LOG.debug("Rolling back " + JavaUtils.txnIdToString(this.txnId));
            this.getMS().rollbackTxn(this.txnId);
        }
        catch (NoSuchTxnException e) {
            LOG.error("Metastore could not find " + JavaUtils.txnIdToString(this.txnId));
            throw new LockException((Throwable)e, ErrorMsg.TXN_NO_SUCH_TRANSACTION, JavaUtils.txnIdToString(this.txnId));
        }
        catch (TxnAbortedException e) {
            // Pass the abort reason and log the failure, exactly as commitTxn and heartbeat do;
            // previously the reason argument was missing from the TXN_ABORTED message.
            LockException le = new LockException((Throwable)e, ErrorMsg.TXN_ABORTED, JavaUtils.txnIdToString(this.txnId), e.getMessage());
            LOG.error(le.getMessage());
            throw le;
        }
        catch (TException e) {
            throw new LockException(ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(), e);
        }
        finally {
            // The transaction is over from this manager's point of view, success or not.
            this.txnId = 0L;
            this.stmtId = -1;
            this.numStatements = 0;
            this.tableWriteIds.clear();
        }
    }

    /**
     * Forwards a replicated table write-id state to the metastore client.
     *
     * @param validWriteIdList write-id list passed through to the metastore
     * @param dbName           database name passed through to the metastore
     * @param tableName        table name passed through to the metastore
     * @param partNames        partition names passed through to the metastore
     * @throws LockException if the metastore call raises a {@link TException}
     */
    @Override
    public void replTableWriteIdState(String validWriteIdList, String dbName, String tableName, List<String> partNames) throws LockException {
        try {
            IMetaStoreClient client = this.getMS();
            client.replTableWriteIdState(validWriteIdList, dbName, tableName, partNames);
        }
        catch (TException thriftFailure) {
            String message = ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg();
            throw new LockException(message, thriftFailure);
        }
    }

    /**
     * Sends a heartbeat to the metastore for the open transaction, or for each lock held when no
     * transaction is open. Returns without contacting the metastore when there is neither.
     *
     * @throws LockException with {@code LOCK_NO_SUCH_LOCK}, {@code TXN_NO_SUCH_TRANSACTION},
     *                       {@code TXN_ABORTED} or the metastore-communication message,
     *                       depending on the underlying failure
     */
    @Override
    public void heartbeat() throws LockException {
        final List<HiveLock> heldLocks;
        if (this.isTxnOpen()) {
            // With an open transaction a single placeholder lock (id 0) drives one pass of the loop below.
            heldLocks = new ArrayList<HiveLock>(1);
            heldLocks.add(new DbLockManager.DbHiveLock(0L));
        } else {
            heldLocks = this.lockMgr.getLocks(false, false);
        }

        if (LOG.isInfoEnabled()) {
            StringBuilder message = new StringBuilder("Sending heartbeat for ");
            message.append(JavaUtils.txnIdToString(this.txnId));
            message.append(" and");
            for (HiveLock held : heldLocks) {
                message.append(" ").append(held.toString());
            }
            LOG.info(message.toString());
        }

        if (!this.isTxnOpen() && heldLocks.isEmpty()) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("No need to send heartbeat as there is no transaction and no locks.");
            }
            return;
        }

        for (HiveLock held : heldLocks) {
            long lockId = ((DbLockManager.DbHiveLock)held).lockId;
            try {
                this.getMS().heartbeat(this.txnId, lockId);
            }
            catch (NoSuchLockException missingLock) {
                LOG.error("Unable to find lock " + JavaUtils.lockIdToString(lockId));
                throw new LockException((Throwable)missingLock, ErrorMsg.LOCK_NO_SUCH_LOCK, JavaUtils.lockIdToString(lockId));
            }
            catch (NoSuchTxnException missingTxn) {
                LOG.error("Unable to find transaction " + JavaUtils.txnIdToString(this.txnId));
                throw new LockException((Throwable)missingTxn, ErrorMsg.TXN_NO_SUCH_TRANSACTION, JavaUtils.txnIdToString(this.txnId));
            }
            catch (TxnAbortedException aborted) {
                LockException wrapped = new LockException((Throwable)aborted, ErrorMsg.TXN_ABORTED, JavaUtils.txnIdToString(this.txnId), aborted.getMessage());
                LOG.error(wrapped.getMessage());
                throw wrapped;
            }
            catch (TException thriftFailure) {
                throw new LockException(ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg() + "(" + JavaUtils.txnIdToString(this.txnId) + "," + held.toString() + ")", thriftFailure);
            }
        }
    }

    /**
     * Creates a {@code Heartbeater} for the current user and query id, schedules it and remembers
     * the scheduled task in {@code heartbeatTask}.
     *
     * @param initialDelay delay before the first heartbeat, in milliseconds
     * @return the heartbeater that was scheduled
     * @throws LockException if the current user cannot be determined
     */
    private Heartbeater startHeartbeat(long initialDelay) throws LockException {
        long intervalMs = DbTxnManager.getHeartbeatInterval(this.conf);
        assert (intervalMs > 0L);

        UserGroupInformation ugi;
        try {
            ugi = UserGroupInformation.getCurrentUser();
        }
        catch (IOException ioFailure) {
            throw new LockException("error while getting current user,", ioFailure);
        }

        Heartbeater worker = new Heartbeater(this, this.conf, this.queryId, ugi);
        this.heartbeatTask = this.startHeartbeat(initialDelay, intervalMs, worker);
        LOG.debug("Started heartbeat with delay/interval = " + initialDelay + "/" + intervalMs + " " + TimeUnit.MILLISECONDS + " for query: " + this.queryId);
        return worker;
    }

    /**
     * Schedules {@code heartbeater} at a fixed rate on the shared heartbeat pool.
     *
     * <p>When both {@code HIVE_IN_TEST} and {@code HIVETESTMODEFAILHEARTBEATER} are set the initial
     * delay is forced to 0. Otherwise an initial delay of 0 is replaced by a random value below
     * 75% of the heartbeat interval.
     *
     * @param initialDelay      requested delay before the first run, in milliseconds
     * @param heartbeatInterval period between runs, in milliseconds
     * @param heartbeater       task to run
     * @return the future representing the scheduled task
     */
    private ScheduledFuture<?> startHeartbeat(long initialDelay, long heartbeatInterval, Runnable heartbeater) {
        boolean failHeartbeaterInTest = this.conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST)
                && this.conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILHEARTBEATER);
        long effectiveDelay = initialDelay;
        if (failHeartbeaterInTest) {
            effectiveDelay = 0L;
        } else if (initialDelay == 0L) {
            double jitter = (double)heartbeatInterval * 0.75 * Math.random();
            effectiveDelay = (long)Math.floor(jitter);
        }
        return heartbeatExecutorService.scheduleAtFixedRate(heartbeater, effectiveDelay, heartbeatInterval, TimeUnit.MILLISECONDS);
    }

    /**
     * Cancels the scheduled heartbeat, if any, and clears {@code heartbeatTask} and {@code queryId}.
     *
     * <p>After requesting cancellation the method polls, with a doubling sleep starting at 100 ms,
     * until the task reports cancelled/done or 30 seconds have elapsed. If the calling thread is
     * interrupted while waiting, the interrupt status is restored and the wait ends immediately.
     *
     * @throws LockException declared for callers; not thrown by the code in this method
     */
    private void stopHeartbeat() throws LockException {
        if (this.heartbeatTask == null) {
            return;
        }
        this.heartbeatTask.cancel(true);
        long startTime = System.currentTimeMillis();
        long sleepInterval = 100L;
        while (!this.heartbeatTask.isCancelled() && !this.heartbeatTask.isDone()) {
            long now = System.currentTimeMillis();
            if (now - startTime > 30000L) {
                LOG.warn("Heartbeat task cannot be cancelled for unknown reason. QueryId: " + this.queryId);
                break;
            }
            try {
                Thread.sleep(sleepInterval);
            }
            catch (InterruptedException e) {
                // Do not swallow the interrupt: restore the flag for callers and stop waiting,
                // otherwise every further sleep would throw immediately and the loop would spin.
                Thread.currentThread().interrupt();
                LOG.warn("Interrupted while waiting for heartbeat task to be cancelled. QueryId: " + this.queryId);
                break;
            }
            sleepInterval *= 2L;
        }
        if (this.heartbeatTask.isCancelled() || this.heartbeatTask.isDone()) {
            LOG.info("Stopped heartbeat for query: " + this.queryId);
        }
        this.heartbeatTask = null;
        this.queryId = null;
    }

    /**
     * Fetches the valid-transaction list for the open transaction from the metastore.
     *
     * @return the list returned by the metastore client for the current {@code txnId}
     * @throws LockException if the metastore call raises a {@link TException}
     */
    @Override
    public ValidTxnList getValidTxns() throws LockException {
        assert (this.isTxnOpen());
        this.init();
        try {
            ValidTxnList snapshot = this.getMS().getValidTxns(this.txnId);
            return snapshot;
        }
        catch (TException thriftFailure) {
            String message = ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg();
            throw new LockException(message, thriftFailure);
        }
    }

    /**
     * Fetches the valid write ids for the given tables and wraps them, together with the current
     * transaction id, via {@code TxnUtils.createValidTxnWriteIdList}.
     *
     * @param tableList    tables to query
     * @param validTxnList serialized valid-transaction list; asserted to be non-null and non-empty
     * @return the combined write-id list
     * @throws LockException if the metastore call raises a {@link TException}
     */
    @Override
    public ValidTxnWriteIdList getValidWriteIds(List<String> tableList, String validTxnList) throws LockException {
        assert (this.isTxnOpen());
        assert (validTxnList != null && !validTxnList.isEmpty());
        try {
            long currentTxnId = this.txnId;
            return TxnUtils.createValidTxnWriteIdList(currentTxnId, this.getMS().getValidWriteIds(tableList, validTxnList));
        }
        catch (TException thriftFailure) {
            String message = ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg();
            throw new LockException(message, thriftFailure);
        }
    }

    /**
     * @return the fully qualified name of this class ({@code CLASS_NAME})
     */
    @Override
    public String getTxnManagerName() {
        return CLASS_NAME;
    }

    /**
     * @return always {@code false}; the explicit lock/unlock methods of this class throw
     *         {@link UnsupportedOperationException}
     */
    @Override
    public boolean supportsExplicitLock() {
        return false;
    }

    /**
     * Not supported. Calls the superclass implementation and then always throws.
     *
     * @throws UnsupportedOperationException whenever the superclass call returns normally
     * @throws HiveException if the superclass call throws it
     */
    @Override
    public int lockTable(Hive db, LockTableDesc lockTbl) throws HiveException {
        super.lockTable(db, lockTbl);
        throw new UnsupportedOperationException();
    }

    /**
     * Not supported. Calls the superclass implementation and then always throws.
     *
     * @throws UnsupportedOperationException whenever the superclass call returns normally
     * @throws HiveException if the superclass call throws it
     */
    @Override
    public int unlockTable(Hive hiveDB, UnlockTableDesc unlockTbl) throws HiveException {
        super.unlockTable(hiveDB, unlockTbl);
        throw new UnsupportedOperationException();
    }

    /**
     * Not supported. Calls the superclass implementation and then always throws.
     *
     * @throws UnsupportedOperationException whenever the superclass call returns normally
     * @throws HiveException if the superclass call throws it
     */
    @Override
    public int lockDatabase(Hive hiveDB, LockDatabaseDesc lockDb) throws HiveException {
        super.lockDatabase(hiveDB, lockDb);
        throw new UnsupportedOperationException();
    }

    /**
     * Not supported. Calls the superclass implementation and then always throws.
     *
     * @throws UnsupportedOperationException whenever the superclass call returns normally
     * @throws HiveException if the superclass call throws it
     */
    @Override
    public int unlockDatabase(Hive hiveDB, UnlockDatabaseDesc unlockDb) throws HiveException {
        super.unlockDatabase(hiveDB, unlockDb);
        throw new UnsupportedOperationException();
    }

    /**
     * @return always {@code true}
     */
    @Override
    public boolean useNewShowLocksFormat() {
        return true;
    }

    /**
     * @return always {@code true}
     */
    @Override
    public boolean supportsAcid() {
        return true;
    }

    /**
     * Tells the caller whether a snapshot should be recorded for this statement.
     *
     * @param queryPlan plan of the statement being executed
     * @return {@code true} for START_TRANSACTION, and for a statement in an implicit transaction
     *         whose plan has ACID resources; {@code false} otherwise
     */
    @Override
    public boolean recordSnapshot(QueryPlan queryPlan) {
        assert (this.isTxnOpen());
        assert (this.numStatements > 0) : "was acquireLocks() called already?";

        if (queryPlan.getOperation() == HiveOperation.START_TRANSACTION) {
            assert (this.isExplicitTransaction);
            assert (this.numStatements == 1);
            return true;
        }
        if (this.isExplicitTransaction) {
            // Later statements of an explicit transaction do not record a snapshot.
            return false;
        }
        assert (this.numStatements == 1) : "numStatements=" + this.numStatements + " in implicit txn";
        return queryPlan.hasAcidResourcesInQuery();
    }

    /**
     * @return {@code true} when a transaction is open and it was not marked explicit
     */
    @Override
    public boolean isImplicitTransactionOpen() {
        if (!this.isTxnOpen() || this.isExplicitTransaction) {
            return false;
        }
        assert (this.numStatements == 1) : "numStatements=" + this.numStatements;
        return true;
    }

    /**
     * Best-effort teardown: stops the heartbeat, unregisters the shutdown hook, rolls back an open
     * transaction and closes the lock manager.
     *
     * <p>No exception escapes this method. The lock manager is closed in a {@code finally} block so
     * that a failure in an earlier step (for example the rollback) no longer leaves it open, and
     * failures are logged together with their stack trace.
     */
    @Override
    protected void destruct() {
        try {
            this.stopHeartbeat();
            if (this.shutdownRunner != null) {
                ShutdownHookManager.removeShutdownHook(this.shutdownRunner);
            }
            if (this.isTxnOpen()) {
                this.rollbackTxn();
            }
        }
        catch (Exception e) {
            LOG.error("Caught exception " + e.getClass().getName() + " with message <" + e.getMessage() + ">, swallowing as there is nothing we can do with it.", (Throwable)e);
        }
        finally {
            // Always attempt to close the lock manager, even when an earlier step failed.
            try {
                if (this.lockMgr != null) {
                    this.lockMgr.close();
                }
            }
            catch (Exception e) {
                LOG.error("Caught exception " + e.getClass().getName() + " with message <" + e.getMessage() + ">, swallowing as there is nothing we can do with it.", (Throwable)e);
            }
        }
    }

    /**
     * Verifies that a configuration has been supplied and makes sure the
     * heartbeat executor service is available.
     *
     * @throws RuntimeException if no configuration has been set yet
     * @throws LockException declared by the signature
     */
    private void init() throws LockException {
        if (this.conf != null) {
            this.initHeartbeatExecutorService();
            return;
        }
        throw new RuntimeException("Must call setHiveConf before any other methods.");
    }

    /**
     * Creates the heartbeat executor service if there is no usable one.
     *
     * <p>An existing executor is kept when it is neither shut down nor
     * terminated. Otherwise a new scheduled thread pool is created, sized by
     * {@code HIVE_TXN_HEARTBEAT_THREADPOOL_SIZE}, whose threads are
     * {@link HeartbeaterThread}s named {@code Heartbeater-N}. Cancelled tasks
     * are removed from the pool's queue immediately.
     *
     * <p>The check-and-create sequence runs under the {@code DbTxnManager}
     * class lock. The method-level {@code synchronized} only locks this
     * instance, which does not stop two different managers from racing on the
     * shared executor field and creating (and leaking) two pools.
     * NOTE(review): this assumes {@code heartbeatExecutorService} is a static
     * field, as its unqualified use suggests - confirm against the declaration.
     */
    private synchronized void initHeartbeatExecutorService() {
        synchronized (DbTxnManager.class) {
            if (heartbeatExecutorService != null && !heartbeatExecutorService.isShutdown() && !heartbeatExecutorService.isTerminated()) {
                return;
            }
            heartbeatExecutorService = Executors.newScheduledThreadPool(this.conf.getIntVar(HiveConf.ConfVars.HIVE_TXN_HEARTBEAT_THREADPOOL_SIZE), new ThreadFactory(){
                private final AtomicInteger threadCounter = new AtomicInteger();

                @Override
                public Thread newThread(Runnable r) {
                    return new HeartbeaterThread(r, "Heartbeater-" + this.threadCounter.getAndIncrement());
                }
            });
            // Drop cancelled heartbeat tasks from the work queue right away
            // instead of keeping them until their next scheduled time.
            ((ScheduledThreadPoolExecutor)heartbeatExecutorService).setRemoveOnCancelPolicy(true);
        }
    }

    /**
     * Tells whether a transaction is currently open.
     *
     * @return {@code true} if the current transaction id is strictly positive
     */
    @Override
    public boolean isTxnOpen() {
        return (this.txnId > 0L);
    }

    /**
     * Returns the id of the current transaction.
     *
     * @return the stored transaction id; per {@link #isTxnOpen()}, a value
     *         that is not strictly positive means no transaction is open
     */
    @Override
    public long getCurrentTxnId() {
        return this.txnId;
    }

    /**
     * Hands out the current statement id and advances the counter by one.
     *
     * <p>With assertions enabled, a transaction must be open.
     *
     * @return the statement id as it was before the increment
     */
    @Override
    public int getStmtIdAndIncrement() {
        assert (this.isTxnOpen());
        // Post-increment: the caller receives the value prior to the bump.
        return this.stmtId++;
    }

    /**
     * Returns the write id of the given table for the current transaction,
     * allocating one through the metastore on first use.
     *
     * <p>Allocated ids are remembered per fully qualified table name, so a
     * repeated call for the same table is answered from the local map without
     * contacting the metastore. With assertions enabled, a transaction must be
     * open.
     *
     * @param dbName    database of the table
     * @param tableName name of the table
     * @return the write id for the table
     * @throws LockException if the metastore call fails with a {@code TException}
     */
    @Override
    public long getTableWriteId(String dbName, String tableName) throws LockException {
        assert (this.isTxnOpen());
        final String tableKey = AcidUtils.getFullTableName(dbName, tableName);

        if (!this.tableWriteIds.containsKey(tableKey)) {
            // First request for this table: ask the metastore and cache the answer.
            final long allocated;
            try {
                allocated = this.getMS().allocateTableWriteId(this.txnId, dbName, tableName);
            }
            catch (TException e) {
                throw new LockException(ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(), e);
            }
            this.tableWriteIds.put(tableKey, allocated);
            return allocated;
        }

        return this.tableWriteIds.get(tableKey);
    }

    /**
     * Requests the materialization rebuild lock for a table from the metastore
     * and, when it is granted, schedules a periodic heartbeat for it.
     *
     * <p>The heartbeat is only started if the response state is
     * {@code ACQUIRED}. It starts with no initial delay and repeats at the
     * interval given by {@link #getHeartbeatInterval(Configuration)}. The
     * scheduled future is handed to the heartbeater so that it can cancel
     * itself later.
     *
     * @param dbName    database of the table
     * @param tableName name of the table
     * @param txnId     transaction id passed to the metastore and to the heartbeater
     * @return the metastore's lock response, whatever its state
     * @throws LockException if the metastore call fails with a {@code TException},
     *                       or if the heartbeat interval cannot be determined
     */
    @Override
    public LockResponse acquireMaterializationRebuildLock(String dbName, String tableName, long txnId) throws LockException {
        final LockResponse response;
        try {
            response = this.getMS().lockMaterializationRebuild(dbName, tableName, txnId);
        }
        catch (TException e) {
            throw new LockException(ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(), e);
        }

        // Without the lock there is nothing to keep alive.
        if (response.getState() != LockState.ACQUIRED) {
            return response;
        }

        final long startDelay = 0L;
        final long period = DbTxnManager.getHeartbeatInterval(this.conf);
        assert (period > 0L);

        final MaterializationRebuildLockHeartbeater rebuildHeartbeater = new MaterializationRebuildLockHeartbeater(this, dbName, tableName, this.queryId, txnId);
        final ScheduledFuture<?> scheduled = this.startHeartbeat(startDelay, period, rebuildHeartbeater);
        // Give the heartbeater its own future so it can cancel itself.
        rebuildHeartbeater.task.set(scheduled);

        final Object[] logArgs = new Object[]{AcidUtils.getFullTableName(dbName, tableName), startDelay, period, TimeUnit.MILLISECONDS, this.queryId};
        LOG.debug("Started heartbeat for materialization rebuild lock for {} with delay/interval = {}/{} {} for query: {}", logArgs);
        return response;
    }

    /**
     * Sends one heartbeat for a materialization rebuild lock to the metastore.
     *
     * @param dbName    database of the table
     * @param tableName name of the table
     * @param txnId     transaction id passed to the metastore
     * @return the metastore's answer; the rebuild-lock heartbeater stops
     *         itself when this is {@code false}
     * @throws LockException if the metastore call fails with a {@code TException}
     */
    private boolean heartbeatMaterializationRebuildLock(String dbName, String tableName, long txnId) throws LockException {
        final boolean refreshed;
        try {
            refreshed = this.getMS().heartbeatLockMaterializationRebuild(dbName, tableName, txnId);
        }
        catch (TException e) {
            throw new LockException(ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(), e);
        }
        return refreshed;
    }

    /**
     * Forwards a replication write-id batch allocation to the metastore.
     *
     * <p>All arguments are passed through unchanged; the only added behaviour
     * is the translation of a {@code TException} into a {@link LockException}.
     *
     * @param dbName               database of the table
     * @param tableName            name of the table
     * @param replPolicy           replication policy passed to the metastore
     * @param srcTxnToWriteIdList  transaction-to-write-id pairs passed to the metastore
     * @throws LockException if the metastore call fails with a {@code TException}
     */
    @Override
    public void replAllocateTableWriteIdsBatch(String dbName, String tableName, String replPolicy, List<TxnToWriteId> srcTxnToWriteIdList) throws LockException {
        try {
            this.getMS().replAllocateTableWriteIdsBatch(dbName, tableName, replPolicy, srcTxnToWriteIdList);
        }
        catch (TException metastoreFailure) {
            final String message = ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg();
            throw new LockException(message, metastoreFailure);
        }
    }

    /**
     * Derives the heartbeat interval from the configured transaction timeout.
     *
     * <p>The interval is half of {@code HIVE_TXN_TIMEOUT}, read in
     * milliseconds and halved with integer division.
     *
     * @param conf configuration to read the timeout from
     * @return the heartbeat interval in milliseconds; never zero
     * @throws LockException if the computed interval is zero
     */
    public static long getHeartbeatInterval(Configuration conf) throws LockException {
        final long halfTimeoutMs = HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_TIMEOUT, TimeUnit.MILLISECONDS) / 2L;
        if (halfTimeoutMs != 0L) {
            return halfTimeoutMs;
        }
        throw new LockException(HiveConf.ConfVars.HIVE_TXN_TIMEOUT.toString() + " not set, heartbeats won't be sent");
    }

    private static class MaterializationRebuildLockHeartbeater
    implements Runnable {
        private final DbTxnManager txnMgr;
        private final String dbName;
        private final String tableName;
        private final String queryId;
        private final long txnId;
        private final AtomicReference<ScheduledFuture<?>> task;

        MaterializationRebuildLockHeartbeater(DbTxnManager txnMgr, String dbName, String tableName, String queryId, long txnId) {
            this.txnMgr = txnMgr;
            this.queryId = queryId;
            this.dbName = dbName;
            this.tableName = tableName;
            this.txnId = txnId;
            this.task = new AtomicReference();
        }

        @Override
        public void run() {
            ScheduledFuture<?> t;
            boolean refreshed;
            LOG.trace("Heartbeating materialization rebuild lock for {} for query: {}", (Object)AcidUtils.getFullTableName(this.dbName, this.tableName), (Object)this.queryId);
            try {
                refreshed = this.txnMgr.heartbeatMaterializationRebuildLock(this.dbName, this.tableName, this.txnId);
            }
            catch (LockException e) {
                LOG.error("Failed trying to acquire lock", (Throwable)e);
                throw new RuntimeException(e);
            }
            if (!refreshed && (t = this.task.get()) != null) {
                t.cancel(false);
                LOG.debug("Stopped heartbeat for materialization rebuild lock for {} for query: {}", (Object)AcidUtils.getFullTableName(this.dbName, this.tableName), (Object)this.queryId);
            }
        }
    }

    /**
     * Runnable that sends one transaction heartbeat per execution on behalf of
     * a given user.
     *
     * <p>A failed heartbeat does not escape {@link #run()}: the failure is
     * logged and kept in {@link #lockException}, where it can be picked up
     * through {@link #getLockException()}.
     */
    public static class Heartbeater
    implements Runnable {
        private HiveTxnManager txnMgr;
        private HiveConf conf;
        private UserGroupInformation currentUser;
        /**
         * Last heartbeat failure, or {@code null} if none has been recorded.
         * Volatile because it is written inside {@link #run()} and exposed
         * through a public getter; without it a reader on another thread has
         * no guarantee of ever seeing the recorded failure.
         */
        volatile LockException lockException;
        private final String queryId;

        /**
         * @return the failure recorded by the most recent failing heartbeat,
         *         or {@code null} if no heartbeat has failed
         */
        public LockException getLockException() {
            return this.lockException;
        }

        /**
         * @param txnMgr      manager whose {@code heartbeat()} is invoked
         * @param conf        configuration consulted for the test-mode failure switch
         * @param queryId     query id, used in log and error messages only
         * @param currentUser user the heartbeat is executed as
         */
        Heartbeater(HiveTxnManager txnMgr, HiveConf conf, String queryId, UserGroupInformation currentUser) {
            this.txnMgr = txnMgr;
            this.conf = conf;
            this.currentUser = currentUser;
            this.lockException = null;
            this.queryId = queryId;
        }

        /**
         * Sends one heartbeat as {@code currentUser}.
         *
         * <p>When both {@code HIVE_IN_TEST} and
         * {@code HIVETESTMODEFAILHEARTBEATER} are enabled, a
         * {@link LockException} is raised instead of heartbeating. Every
         * failure is logged and stored; nothing is rethrown.
         */
        @Override
        public void run() {
            try {
                if (this.conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST) && this.conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILHEARTBEATER)) {
                    throw new LockException(HiveConf.ConfVars.HIVETESTMODEFAILHEARTBEATER.name() + "=true");
                }
                // Parameterized so the user is only rendered when debug is enabled.
                LOG.debug("Heartbeating...for currentUser: {}", this.currentUser);
                // The explicit target type selects the exception-capable doAs
                // overload; an untyped lambda matches both overloads.
                this.currentUser.doAs((java.security.PrivilegedExceptionAction<Object>)() -> {
                    this.txnMgr.heartbeat();
                    return null;
                });
            }
            catch (LockException e) {
                LOG.error("Failed trying to heartbeat queryId=" + this.queryId + ", currentUser: " + this.currentUser + ": " + e.getMessage());
                this.lockException = e;
            }
            catch (Throwable t) {
                // Anything else is wrapped so that callers only ever see a LockException.
                String errorMsg = "Failed trying to heartbeat queryId=" + this.queryId + ", currentUser: " + this.currentUser + ": " + t.getMessage();
                LOG.error(errorMsg, t);
                this.lockException = new LockException(errorMsg, t);
            }
        }
    }

    /**
     * Thread type used for heartbeat workers. Every instance is a daemon
     * thread, so it does not keep the JVM alive on its own.
     */
    public static class HeartbeaterThread
    extends Thread {
        /**
         * @param target work to run on this thread
         * @param name   name given to the thread
         */
        HeartbeaterThread(Runnable target, String name) {
            super(target, name);
            setDaemon(true);
        }
    }
}

