悲观事务死锁检测

【是否原创】是
【首发渠道】TiDB 社区

死锁检测 leader

每个 tikv 都会开启死锁检测进程,开启的进程有 leader 和 follow 两种角色可以切换,默认为 follow 角色。

leader 角色:维护锁的 DAG 信息,接受检测请求,计算 DAG 检测是否有锁存在等

follow 角色:通过 GRPC 将检测请求发送给 leader,接受检测结果

死锁检测 的 leader 为:包含 key 为空字符串的 region,也就是集群按 key 排序第一个 region 的 region leader 所在的 store 将成为死锁检测的 leader。

    const LEADER_KEY: &[u8] = b"";
    fn is_leader_region(region: &Region) -> bool {
        // The key range of a new created region is empty which misleads the leader
        // of the deadlock detector stepping down.
        //
        // If the peers of a region is not empty, the region info is complete.
        is_region_initialized(region)
            && region.get_start_key() <= LEADER_KEY
            && (region.get_end_key().is_empty() || LEADER_KEY < region.get_end_key())
    }

通过订阅 region 变化的信息,包括 region 创建、更新、role 角色变更来捕获 leader region 的角色变化,然后同步死锁检测的角色。

    pub(crate) fn register(self, host: &mut CoprocessorHost<RocksEngine>) {
        host.registry
            .register_role_observer(1, BoxRoleObserver::new(self.clone()));  // role 变化的时候调用
        host.registry
            .register_region_change_observer(1, BoxRegionChangeObserver::new(self));  // region 变化的时候改变
    }

检测接口函数

死锁检测接口函数,当自己是 leader 时候,直接调用 local 函数接口,如果是 follower,那么通过 grpc 向 leader 查询。

    /// Handles detect requests of itself.
    /// 处理锁 detect.
    fn handle_detect(&mut self, tp: DetectType, txn_ts: TimeStamp, lock: Lock) {
        if self.is_leader() {
            self.handle_detect_locally(tp, txn_ts, lock);
        } else {
            for _ in 0..2 {
                ...
                if self.send_request_to_leader(tp, txn_ts, lock) {
                    return;
                }
                ...
            }
            ...
        }
    }

从这里接口对应三种查询类型:

  • DetectType::Detect,也就是死锁检测的接口,当悲观事务遇到一个锁的时候,就会通过这个接口来检测是否产生了死锁
  • DetectType::CleanUpWaitFor,删除事务等待的一个锁,事务对这个锁没有等待了
  • DetectType::CleanUp,删除这个事务所有等待的锁,比如事务回滚了,所以就清除这个事务的锁等待信息
    fn handle_detect_locally(&self, tp: DetectType, txn_ts: TimeStamp, lock: Lock) {
        let detect_table = &mut self.inner.borrow_mut().detect_table;    // 原子锁的。
        match tp {
            DetectType::Detect => {    // 检测死锁是否存在。
                if let Some(deadlock_key_hash) = detect_table.detect(txn_ts, lock.ts, lock.hash) {
                    self.waiter_mgr_scheduler
                        .deadlock(txn_ts, lock, deadlock_key_hash);    // 处理死锁吧?
                }
            }
            DetectType::CleanUpWaitFor => {    // 清理一个索等待。
                detect_table.clean_up_wait_for(txn_ts, lock.ts, lock.hash)
            }
            DetectType::CleanUp => detect_table.clean_up(txn_ts),    // 删除这个事务的锁等待。
        }
    }

死锁检测算法

当悲观事务过程中,尝试锁定一个 key,发现 key 已经被上锁,这时候会调用死锁检测接口,假设当前事务是 txn_1,持有锁的事务是 txn_lock,检测当事务 txn_1 等待 txn_lock 的锁的情况下,存不存在死锁。

检测算法是构建一个 DAG 有向无环图,如果目前集群存在的锁中存在一条从 txn_lock 到 txn_1 的边,那么就代表死锁将会存在。

/// `Locks` is a set of locks belonging to one transaction.
struct Locks {
    ts: TimeStamp,    // 事务ts吧。
    hashes: Vec<u64>,
    last_detect_time: Instant,
}

/// Used to detect the deadlock of wait-for-lock in the cluster.
pub struct DetectTable {
    /// Keeps the DAG of wait-for-lock. Every edge from `txn_ts` to `lock_ts` has a survival time -- `ttl`.
    /// When checking the deadlock, if the ttl has elpased, the corresponding edge will be removed.
    /// `last_detect_time` is the start time of the edge. `Detect` requests will refresh it.
    // txn_ts => (lock_ts => Locks)
    wait_for_map: HashMap<TimeStamp, HashMap<TimeStamp, Locks>>,

    /// The ttl of every edge.
    ttl: Duration,

    /// The time of last `active_expire`.
    last_active_expire: Instant,

    now: Instant,
}
其中 wait_for_map 是个两层的 hashMap,第一层 key 是等待锁的事务 txn_ts,第二层 key 是等待的事务 txn_lock,第二层 value 是事务 txn_ts 等待事务 txn_lock 持有的锁列表。

wait_for_map 描述了集群事务锁等待的关系,通过 txn_lock,可以查询出当前事务在等待哪些事务的锁、等待哪些锁。

    /// Returns the key hash which causes deadlock.
    /// // 检查是否存在死锁。
    pub fn detect(&mut self, txn_ts: TimeStamp, lock_ts: TimeStamp, lock_hash: u64) -> Option<u64> {
        let _timer = DETECT_DURATION_HISTOGRAM.start_coarse_timer();
        TASK_COUNTER_METRICS.detect.inc();

        self.now = Instant::now_coarse();
        self.active_expire();    // 清理过期的。

        // If `txn_ts` is waiting for `lock_ts`, it won't cause deadlock.
        // 已经有 txn_tx 等待 lock_ts,那么就不会存在 lock_ts 等待 txn_ts,也就是不会存在死锁。
        if self.register_if_existed(txn_ts, lock_ts, lock_hash) {
            return None;
        }

        if let Some(deadlock_key_hash) = self.do_detect(txn_ts, lock_ts) {
            ERROR_COUNTER_METRICS.deadlock.inc();
            return Some(deadlock_key_hash);    // 存在这个死锁。
        }
        self.register(txn_ts, lock_ts, lock_hash);
        None
    }

算法流程:

  1. 清理过期锁(一般很少走这里,只有等待的事务数量达到100000,且距离上次清理达到1个小时才会执行)

  2. 检查是否存在 txn_ts 在等待事务 txn_lock 的锁,如果已经存在,那么必然不存在 txn_lock 到 txn_ts 的边,必然不会有死锁,那么加入新的锁,返回

  3. 调用 do_detect 函数,遍历构建所有 DAG 检查是否有存在 txn_lock 到 txn_ts 的边,如果存在那么死锁就存在

  4. 如果没有死锁存在,那么说明 txn_ts 等待 txn_lock 不会产生死锁,把 txn_ts 等待 txn_lock 的锁信息添加进去

    /// Checks if there is an edge from `wait_for_ts` to `txn_ts`.
    /// 检查有没有从 wait_for_ts 到 txn_tx 的锁。
    fn do_detect(&mut self, txn_ts: TimeStamp, wait_for_ts: TimeStamp) -> Option<u64> {
        let now = self.now;
        let ttl = self.ttl;

        let mut stack = vec![wait_for_ts];
        // Memorize the pushed vertexes to avoid duplicate search.
        let mut pushed: HashSet<TimeStamp> = HashSet::default();
        pushed.insert(wait_for_ts);
        while let Some(wait_for_ts) = stack.pop() {
            if let Some(wait_for) = self.wait_for_map.get_mut(&wait_for_ts) {
                // Remove expired edges.
                wait_for.retain(|_, locks| !locks.is_expired(now, ttl));    // 清理过期的。
                if wait_for.is_empty() {
                    self.wait_for_map.remove(&wait_for_ts);    // 清理掉。
                } else {
                    for (lock_ts, locks) in wait_for {
                        if *lock_ts == txn_ts {
                            return Some(locks.hashes[0]);
                        }
                        if !pushed.contains(lock_ts) {
                            stack.push(*lock_ts);
                            pushed.insert(*lock_ts);
                        }
                    }
                }
            }
        }
        None
    }

do_detect 函数构建 DAG 遍历所有从 wait_for_ts(txn_lock)出发的可能,检查有没有到 txn_ts 的边,如果有,那么返回一个存在的锁的 hash,告诉死锁的存在。

唤醒锁等待

死锁检测中维护了从事务出发可以找到所有等待的锁的信息,当锁被释放、超时、死锁存在情况下,需要唤醒等待锁的事务,这里就需要根据锁 id 找到等待的事务,进行唤醒操作。

锁等待信息

/// If a pessimistic transaction meets a lock, it will wait for the lock
/// released in `WaiterManager`.
///
/// `Waiter` contains the context of the pessimistic transaction. Each `Waiter`
/// has a timeout. Transaction will be notified when the lock is released
/// or the corresponding waiter times out.
pub(crate) struct Waiter {
    pub(crate) start_ts: TimeStamp,
    pub(crate) cb: StorageCallback,
    /// The result of `Command::AcquirePessimisticLock`.
    ///
    /// It contains a `KeyIsLocked` error at the beginning. It will be changed
    /// to `WriteConflict` error if the lock is released or `Deadlock` error if
    /// it causes deadlock.
    pub(crate) pr: ProcessResult,
    pub(crate) lock: Lock,
    delay: Delay,
    _lifetime_timer: HistogramTimer,
}
  • start_ts: 代表等待锁的事务 ts

  • lock: 代表等待的锁

  • pr: 代表等待的锁的结果,例如锁冲突、死锁等

  • delay: 等待超时时间

  • cb: 回调函数,唤醒函数,把锁等待结果 pr 返回给等待锁的事务的钩子函数

// NOTE: Now we assume `Waiters` is not very long.
// Maybe needs to use `BinaryHeap` or sorted `VecDeque` instead.
type Waiters = Vec<Waiter>;

struct WaitTable {
    // Map lock hash to waiters.
    wait_table: HashMap<u64, Waiters>,
    waiter_count: Arc<AtomicUsize>,
}

wait_table 维护了等待某个锁的所有事务列表,key 为锁的 hashId,value 是等待这个锁的所有 Waiter。

至此,当某个 key 上的锁被释放时候,根据锁的 hash ID 查找到所有的 Waiter,选择等待时间最早的事务进行直接唤醒。

锁唤醒

当事务提交或者回滚以后,事务持有的锁将会被释放,事务持有的每一个锁,都会对其 Waiter 进行唤醒操作(只唤醒等待锁最久的 Waiter)。

    fn handle_wake_up(&mut self, lock_ts: TimeStamp, hashes: Vec<u64>, commit_ts: TimeStamp) {
        ...
        for hash in hashes {    // 对于事务的每一个锁都进行唤醒操作
            let lock = Lock { ts: lock_ts, hash };
            // 找到最老的 waiter 进行唤醒
            if let Some((mut oldest, others)) = wait_table.remove_oldest_waiter(lock) {
                // Notify the oldest one immediately.
                self.detector_scheduler
                    .clean_up_wait_for(oldest.start_ts, oldest.lock);
                oldest.conflict_with(lock_ts, commit_ts);
                oldest.notify();
                // Others will be waked up after `wake_up_delay_duration`.
                //
                // NOTE: Actually these waiters are waiting for an unknown transaction.
                // If there is a deadlock between them, it will be detected after timeout.
                if others.is_empty() {
                    // Remove the empty entry here.
                    wait_table.remove(lock);
                } else {
                    others.iter_mut().for_each(|waiter| {
                        waiter.conflict_with(lock_ts, commit_ts);
                        waiter.reset_timeout(new_timeout);
                    });
                }
            }
        }
    }

参数 lock_ts 代表是有锁的事务,hashes 代表持有的锁信息。

  1. 找到等待时间最久的 Waiter,从 Waiter 列表中删除
  2. 删除死锁检测维护的 txn_ts 到 txn_lock 的锁等待信息
  3. 构建唤醒的 pr 结果,调用唤醒函数
  4. 对于其他的 waiter(除了等待最久剩余的),构建唤醒的 pr,通过等待超时方式唤醒

锁管理接口

/// `LockManager` has two components working in two threads:
///   * One is the `WaiterManager` which manages transactions waiting for locks.
///   * The other one is the `Detector` which detects deadlocks between transactions.
pub struct LockManager {
    waiter_mgr_worker: Option<FutureWorker<waiter_manager::Task>>,
    detector_worker: Option<FutureWorker<deadlock::Task>>,

    waiter_mgr_scheduler: WaiterMgrScheduler,
    detector_scheduler: DetectorScheduler,

    waiter_count: Arc<AtomicUsize>,

    /// Record transactions which have sent requests to detect deadlock.
    detected: Arc<[CachePadded<Mutex<HashSet<TimeStamp>>>]>,

    pipelined: Arc<AtomicBool>,
}


impl LockManagerTrait for LockManager {
    fn wait_for(
        &self,
        start_ts: TimeStamp,
        cb: StorageCallback,
        pr: ProcessResult,
        lock: Lock,
        is_first_lock: bool,
        timeout: Option<WaitTimeout>,
    ) {
        let timeout = match timeout {
            Some(t) => t,
            None => {
                cb.execute(pr);
                return;
            }
        };

        // Increase `waiter_count` here to prevent there is an on-the-fly WaitFor msg
        // but the waiter_mgr haven't processed it, subsequent WakeUp msgs may be lost.
        self.waiter_count.fetch_add(1, Ordering::SeqCst);
        self.waiter_mgr_scheduler
            .wait_for(start_ts, cb, pr, lock, timeout);

        // If it is the first lock the transaction tries to lock, it won't cause deadlock.
        if !is_first_lock {    // 不是第一个锁的时候,不检测?问题是不加入这个锁信息。那这个锁等待不会被以后的锁检查
            self.add_to_detected(start_ts);
            self.detector_scheduler.detect(start_ts, lock);    // 这里检测一下。
        }
    }

    fn wake_up(
        &self,
        lock_ts: TimeStamp,
        hashes: Vec<u64>,
        commit_ts: TimeStamp,
        is_pessimistic_txn: bool,
    ) {
        // If `hashes` is some, there may be some waiters waiting for these locks.
        // Try to wake up them.
        if !hashes.is_empty() && self.has_waiter() {
            self.waiter_mgr_scheduler
                .wake_up(lock_ts, hashes, commit_ts);
        }
        // If a pessimistic transaction is committed or rolled back and it once sent requests to
        // detect deadlock, clean up its wait-for entries in the deadlock detector.
        if is_pessimistic_txn && self.remove_from_detected(lock_ts) {
            self.detector_scheduler.clean_up(lock_ts);
        }
    }

    fn has_waiter(&self) -> bool {
        self.waiter_count.load(Ordering::SeqCst) > 0
    }
}