cdc 同步到 s3 的故障

【是否原创】是
【首发渠道】TiDB
【首发渠道链接】其他平台首发请附上对应链接
【目录】
【正文】

故障描述

测试环境中开启 cdc 备份到 s3 存储,出现下面问题:

  • cdc 同步点停留在 2021-07-26 11:48:20.649 +0800 CST
  • 因为 cdc 的原因集群 gc safe point 卡在 20210726-11:48:20 +0800

cdc 日志片段 1:

[2021/07/29 02:38:29.845 +08:00] [INFO] [utils.go:131] ["start Flush asynchronously to storage"] ["table id"=3310] [size=340386481] ["event count"=201962]
[2021/07/29 02:38:41.904 +08:00] [INFO] [s3.go:155] ["[FlushRowChangedEvents] complete file"] [tableID=3310]
[2021/07/29 02:38:41.976 +08:00] [INFO] [utils.go:131] ["start Flush asynchronously to storage"] ["table id"=3310] [size=2157261890] ["event count"=1280000]

cdc 日志片段 二:

[2021/07/29 02:51:50.410 +08:00] [ERROR] [processor.go:1291] ["error on running processor"] [capture=10.95.249.200:8300] [changefeed=1622788388883] [processor=51a0e857-509d-499f-816f-e5293c765aee] [error="[CDC:ErrS3SinkStorageAPI]RequestCanceled: request context canceled\ncaused by: context canceled"] [errorVerbose="[CDC:ErrS3SinkStorageAPI]RequestCanceled: request context canceled\ncaused by: context canceled\ngithub.com/pingcap/errors.AddStack\n\tgithub.com/pingcap/errors@v0.11.5-0.20201126102027-b0a155152ca3/errors.go:174\ngithub.com/pingcap/errors.(*Error).GenWithStackByCause\n\tgithub.com/pingcap/errors@v0.11.5-0.20201126102027-b0a155152ca3/normalize.go:279\ngithub.com/pingcap/ticdc/pkg/errors.WrapError\n\tgithub.com/pingcap/ticdc/pkg/errors/helper.go:28\ngithub.com/pingcap/ticdc/cdc/sink/cdclog.(*tableBuffer).flush\n\tgithub.com/pingcap/ticdc/cdc/sink/cdclog/s3.go:145\ngithub.com/pingcap/ticdc/cdc/sink/cdclog.(*logSink).startFlush.func2\n\tgithub.com/pingcap/ticdc/cdc/sink/cdclog/utils.go:136\ngolang.org/x/sync/errgroup.(*Group).Go.func1\n\tgolang.org/x/sync@v0.0.0-20200625203802-6e8e738ad208/errgroup/errgroup.go:57\nruntime.goexit\n\truntime/asm_amd64.s:1373"]
[2021/07/29 02:51:50.410 +08:00] [WARN] [processor.go:1310] ["upload processor error failed"] [changefeed=1622788388883] [error="[CDC:ErrPDEtcdAPIError]context canceled"] [errorVerbose="[CDC:ErrPDEtcdAPIError]context canceled\ngithub.com/pingcap/errors.AddStack\n\tgithub.com/pingcap/errors@v0.11.5-0.20201126102027-b0a155152ca3/errors.go:174\ngithub.com/pingcap/errors.(*Error).GenWithStackByCause\n\tgithub.com/pingcap/errors@v0.11.5-0.20201126102027-b0a155152ca3/normalize.go:279\ngithub.com/pingcap/ticdc/pkg/errors.WrapError\n\tgithub.com/pingcap/ticdc/pkg/errors/helper.go:28\ngithub.com/pingcap/ticdc/cdc/kv.CDCEtcdClient.PutTaskPositionOnChange\n\tgithub.com/pingcap/ticdc/cdc/kv/etcd.go:706\ngithub.com/pingcap/ticdc/cdc.runProcessor.func1\n\tgithub.com/pingcap/ticdc/cdc/processor.go:1308\nruntime.goexit\n\truntime/asm_amd64.s:1373"]

集群当时执行的操作

这个集群目前还在做开发测试,规模已经接近十个 tikv 节点,整体吞吐量已经不小。

业务开发在 2021-07-26 11:48:20 对一个有 50G 大小的表进行主键更改:

  1. 导出表数据到文件
  2. drop 旧表
  3. 建新表,导入旧表数据
  4. 新表创建、开始导入数据时间恰好是 2021-07-26 11:48:04

故障分析

由于集群有一定的规模,导入数据的吞吐还是挺大的。

cdc 把 tikv 的变更同步到 s3 是对产生的 event 批量进行的,每次同步 flush 操作会有两条日志对应开始和结束:

[2021/07/29 02:38:29.845 +08:00] [INFO] [utils.go:131] ["start Flush asynchronously to storage"] ["table id"=3310] [size=340386481] ["event count"=201962]
[2021/07/29 02:38:41.904 +08:00] [INFO] [s3.go:155] ["[FlushRowChangedEvents] complete file"] [tableID=3310]

但是在 cdc 日志片段 1 中,最后一个 start Flush 对应的 complete 一直没有找到。

[2021/07/29 02:38:41.976 +08:00] [INFO] [utils.go:131] ["start Flush asynchronously to storage"] ["table id"=3310] [size=2157261890] ["event count"=1280000]

这条 flush 对应的 event 个数是 128 万个,数据大小 2157261890,为 2G+,所以会不会是这里同步因为数据量太大导致的问题。

在 cdc 日志片段 二 报错 “context canceled”,看报错位置为 s3.go:145,打开源码文件:

            if len(rowDatas) > 0 { 
                    ...

                    err := hashPart.uploader.UploadPart(ctx, rowDatas)
                    if err != nil {
                            return cerror.WrapError(cerror.ErrS3SinkStorageAPI, err)
                    }   

                    ...
            }   

代码对应的地方就是往 s3 upload 数据的位置,所以基本可以判断是批量数据太大导致。

cdc 同步原理

  1. 从 tikv 中获取的 rowChangeEvent 经过排序后会插入 processor.output 这个 channel 中, chanel 大小 1280000;
  2. 另外 processor 每隔1秒,会发送一个 OpTypeResolved 类型的 event 到 processor.output
  3. processor 实时地从 processor.output 获取 event 插入到 dataCh 这个 channel 中,大小也为 1280000,若获取到的 event 为 **OpTypeResolved **类型,processor 将会把 dataCh 中的 event 刷新到 s3 上
  4. processor 同时定时 500 * time.Millisecond 去检查 dataCh 大小,如果满足 5M 也会刷新

上面 3 和 4 是 cdc 到 s3 的刷新操作,第 3 步是为了把 1s 内的数据变更进行 batch,第 4 步是为了应对变更太多时候,加快刷新;每次刷新的数据放进 dataCh,默认值 1280000 个 event

下面代码分别对应 3 和 4 中触发的 flush 操作。

	case needFlushedUnits := <-l.notifyChan:
		...
		for _, u := range needFlushedUnits {
			...
			eg.Go(func() error {
				log.Info("start Flush asynchronously to storage by caller",
					zap.Int64("table id", uReplica.TableID()),
					zap.Int64("size", uReplica.Size().Load()),
					zap.Int64("event count", uReplica.Events().Load()),
				)
				return uReplica.flush(ectx, l)
			})
		}
		...

	case <-ticker.C:
		...
		for _, u := range l.units {
			...
			if u.shouldFlush() {
				eg.Go(func() error {
					log.Info("start Flush asynchronously to storage",
						zap.Int64("table id", uReplica.TableID()),
						zap.Int64("size", uReplica.Size().Load()),
						zap.Int64("event count", uReplica.Events().Load()),
					)
					return uReplica.flush(ectx, l)
				})
			}
		}

对于第 4 步中,触发 flush 的条件,可以看出 dataCh 大于 5M 才会刷新。并没有考虑 dataCh 满,按道理 dataCh 满了那也要刷新,清空 dataCh 才对。

const	maxPartFlushSize    = 5 << 20   // The minimal multipart upload size is 5Mb.

func (tb *tableBuffer) shouldFlush() bool {
	return tb.sendSize.Load() > maxPartFlushSize
}

问题修复

尝试把 dataCh 改小,改为 20480,编译运行,发现问题修复了,修改后看同步情况:

[2021/07/29 21:32:01.133 +08:00] [INFO] [utils.go:131] ["start Flush asynchronously to storage"] ["table id"=3310] [size=34518876] ["event count"=20480]
[2021/07/29 21:32:02.325 +08:00] [INFO] [s3.go:155] ["[FlushRowChangedEvents] complete file"] [tableID=3310]

可以看到每次 flush 20480 个 event,每次 flush 总大小 34518876 byte,大约 33MB,还是比较合适的。

这次故障的原因是源码中 dataCh 过大,而当时集群吞吐大、或者因为历史 tikv 变更的堆积,导致把 dataCh 塞满,一次 batch flush 时候,数据太大导致了 “context canceled” 问题。

完善修复

上面 cdc 原理中,processor.output 默认 128000,而上面修复 dataCh 为 20480,那么容易造成 processor.output 中 event 个数远远多于 dataCh。

而间隔 1s 时间 processor 触发刷新,写 OpTypeResolved 类型 event 到 processor.output, 然后从 processor.output 流入 dataCh 进行 bath 触发刷新操作。

如果 tikv 变更多在 1s 内产生大量的 event,就容易把 dataCh 塞满,而 processor 用于唤醒刷新的 OpTypeResolved 类型 event 还停留在 processor.output 中,阻塞在 dataCh 外面。

这时候只能靠第 4 步,定时的 shouldFlush 刷新了,但是 shouldFlush 函数是有漏洞的。并没有考虑 dataCh 满,如果 dataCh 又不满 5M,那就永远触发不了刷新了。

达到满 5M 条件,那么单条 event 要满足 5M / 20480 = 256byte,对于字段少、记录小的表,很容易达不到。

所以这里完善一下 shouldFlush 函数,如果 dataCh 已经填满,那么直接触发刷新到 s3。

func (tb *tableBuffer) shouldFlush() bool {
	return tb.size() >= default_chan_size_20480 || tb.sendSize.Load() > maxPartFlushSize
}
4赞

前排学习~

这个问题修复了么?cdc是否支持同步数据到s3?

已经修复,绝对支持

1赞