TiDB 事务源码阅读

资料

乐观事务

TiDB 的乐观事务在提交前,所有的写操作都是 buffer 在内存的,最后提交时才会写到底层 TiKV 。

TiDB 有 2 层 事务抽象,一层是 session.TxnState,另一层是 kv.Transaction :

// TxnState wraps kv.Transaction to provide a new kv.Transaction.
// 1. It holds all statement related modification in the buffer before flush to the txn,
// so if execute statement meets error, the txn won't be made dirty.
// 2. It's a lazy transaction, that means it's a txnFuture before StartTS() is really need.
type TxnState struct {
	// States of a TxnState should be one of the followings:
	// Invalid: kv.Transaction == nil && txnFuture == nil
	// Pending: kv.Transaction == nil && txnFuture != nil
	// Valid:	kv.Transaction != nil && txnFuture == nil
	
  kv.Transaction
  txnFuture *txnFuture
	buf          kv.MemBuffer
	dirtyTableOP []dirtyTableOperation
	...
}

Write

TiDB 在执行 insert/update/delete 等 DML 时,会先把写的数据存在 TxnState.buf 里面,当 DML 语句执行成功后,会调用 StmtCommit 方法把写的数据刷到 kv.Transaction里面,如果执行失败,就调 StmtRollbackTxnState里面的buf 清空。其实在写到 TxnState.buf 之前,可能还会先写到一个 buffer 里面,可以看 tableCommon.AddRecord 函数,这个函数的作用是写一行新的数据到某个表里面:

func (t *tableCommon) AddRecord(ctx sessionctx.Context, r []types.Datum, opts ...table.AddRecordOption) (recordID int64, err error) {
	...
  txn, err := ctx.Txn(true)
	if err != nil {
		return 0, err
	}
  
  rm, err := t.getRollbackableMemStore(ctx)
	// Insert new entries into indices.
  // 索引是先写到一个临时的 buffer 里面
	h, err := t.addIndices(ctx, recordID, r, rm, createIdxOpts)
	if err != nil {
		return h, err
	}
  ...
  // 写行数据到 TxnState.buf 里面
  if err = txn.Set(key, value); err != nil {
		return 0, err
	}
	txn.SetAssertion(key, kv.None)
	
  // 把索引数据写到 TxnState.Buf 里面
	if err = rm.(*kv.BufferStore).SaveTo(txn); err != nil {
		return 0, err
	}
  ...
}

为什么要将索引先写到一个临时的 buffer 里面?举个例子:

create table t (a int, b int, c int, index (a),unique index (b));
begin;
insert into t (a,b,c) values (1,1,1);
insert into t (a,b,c) values (2,1,2); -- ERROR 1062 (23000): Duplicate entry '1' for key 'b'
commit;
select * from t;
+------+------+------+
| a    | b    | c    |
+------+------+------+
|    1 |    1 |    1 |
+------+------+------+

表 t 有2个索引,a 列上有一个普通索引,b 列上有一个唯一索引。在执行第二个 insert 写入时,会因为 b 列上的唯一索引冲突而失败,下面详细分析第二次insert 的写入过程 :

1. 写索引到临时的 buffer 里面
  a. 写索引 a 的数据
  b. 写索引 b 的数据,发生冲突,报错返回

如果索引不是写在临时 buffer 里面,而是直接写在 TxnState.Buf 里面,在报错返回后,会导致 TxnState.Buf 里面多了一个索引 a 的数据,但是对应的行数据又不存在,这就导致了索引数据不一致。

Read

RawKV Read

在事务里面读数据,需要先读 buffer 里面的数据,buffer 里面没有,再去 TiKV 读数据。可以看 TxnState.Get 函数,它会先尝试从 TxnState.buf读,没有再去 kv.Transaction的 buffer 读,再没有就去 TiKV 读数据,见tikvSnapshot.Get 函数。

Cop Read

对于 TableReaderExecutorIndexReaderExecutorIndexLookUpExecutor 这3种 Cop 读请求类型的算子,在事务读里面,会包一层 UnionScanExec算子。

假设原来是 TableReaderExecutor算子 ,被UnionScanExec包了一层后, UnionScanExec 内部还会生成另外一个memTableReader 算子,用来读取事务 Buffer 里面的数据,原来的 TableReaderExecutor 用来从 TiKV 读数据,UnionScanExec 算子会将这两个内部算子读出来的数据做合并后输出,具体见 UnionScanExec.getOneRow 函数,唯一注意的是,如果从 TiKV 读回来的数据如果发现在事务 buffer 里面也存在的话,就会丢弃从 TiKV 读上来的数据,而用事务 buffer 里面的数据。

Commit

事务提交是直接调的 kv.TransactionCommit 方法,其具体实现是 tikvTxn.Commit。TiDB 的事务总体来说就是一个经过优化的二阶段提交的实现。所以会先初始化一个 twoPhaseCommitter ,然后用 initKeysAndMutations 将 buffer 里面的数据转成 pb.Mutation map,并同时做一些事务限制的检查,然后根据 key/value 的size 大小计算 lock TTL 时间(txnLockTTL)。一共有下面几种类型的 mutation:

	Op_Put      Op = 0
	Op_Del      Op = 1
	Op_Lock     Op = 2
	Op_Rollback Op = 3
	// insert operation has a constraint that key should not exist before.
  // 用于将 key should not exists 的 check 下推给 TiKV, 不存在就写入,存在就返回 error
	Op_Insert          Op = 4
	Op_PessimisticLock Op = 5

twoPhaseCommitter.execute 是 2PC 的实现,首先会 prewriteKeys,用 doActionOnKeys 这个函数来对 keys 做 prewritecommitrollbackcleanup 等操作。其具体实现步骤如下:

1. 用 regionCache.GroupKeysByRegion 函数将要操作的 keys 根据 Region 分组。
2. 用 appendBatchBySize 根据 batch limit 将同一个 group(region)的 key 分成多个 batch 
3. 如果是 prewrite 操作,调用 doActionOnBatches 函数,对所有的 key 按 batch 进行 prewrite.
  a. 用 batchExecutor 来启动多个 worker 并行操作一个 batch 的 key

如果 prewrite 的具体实现函数是 prewriteSingleBatch,如果 prewrite 失败,就会终止 commit 然后异步清理(cleanupKeys)已经 prewrite 了的 key.

prewrite 成功后,会用 getTimestampWithRetry 去PD拿一个 commitTS 的时间戳。

然后会检查这个事务操作的相关 table 是否在事务期间有相关的 schema 变更,以及事务时间是否超过了 max-txn-time-use 的限制。通过检查后,会进行 commitKeyscommitprewrite 有一个不同的是,commit 只要将 primary key 提交成功后,其他的 second keys 可以走异步commitcommit的具体实现函数是 commitSingleBatch

ResolveLocks

当在事务中读数据或者 prewrite keys 时,如果 key 上已经有 Lock 了,这时就需要进行 ResolveLock 。为什么会出现这种情况?有以下几种情况:

  1. 事务 txn_1 在完成 prewrite key_1,key_2 后就异常退出了,那么此时事务 txn_2 再去读 key_1, key_2 时,就会发现有 txn_1 在 prewrite 时写的 Lock。
  2. 事务 txn_1 在 prewrite key_1完成,但在 key_2 因为冲突而失败时,txn_1 会终止并异步清理 key_1 上的锁,如果异步清理锁还没完成,此时 txn_2 去读 key_1 ,也会遇到 Lock
  3. 事务 txn_1 在 commit primary key 成功后,是用异步 commit second keys,在异步 commit 还没完成时,txn_2 去读 second keys 时也会遇到 Lock。

那么如何 ResolveLocks 呢?prewrite keys 时会同时带上一个 LockTTLResolveLock的流程首先是检查所有 Lock 的 TTL,记下最久的 expire Time ,并发现如果有 keys 上的 locks 的 ttl 已经过期后,就会发起对这些过期 keys 的 Locks 进行 resolveLock,如果还有 keys 的 locks 没有 resolve,就根据最久的 expire time 进行 back off 后重试。

resolve 已经过期的 Locks 的流程如下 (resolveLock 函数):

  1. getTxnStatusFromLock查询 Lock key 的状态
  2. 如果 lock 以及过期,并根据 Lock 的状态发送 ResolveLockRequest 类型的请到 locked key 所在的region, 根据 lock 的状态执行 commit or rollback.

悲观事务

悲观事务会将需要修改的 key 在执行完 DML 后就上锁,具体见 handlePessimisticDML 函数。 对于 select for update, 会单独用

SelectLockExec 算子在获取相应数据后,就会对相应行的 handle key 上锁,见 doLockKeys 函数。

3赞

感谢分享:+1: