CDC同步数据到KAFKA大量空value数据

CDC同步数据到KAFKA问题

【tidb版本】:v4.0.0-rc.1

【tiup版本】:升级为最新的0.6.2版本

【cdc版本】:wget https://tiup-mirrors.pingcap.com/cdc-v4.0.0-rc-linux-amd64.tar.gz 下载 cdc 包
通过 tiup cluster patch zj_tidb cdc-v4.0.0-rc-linux-amd64.tar.gz -R cdc 的方式升级 cdc

【tidb集群信息】

【问题1】:创建CDC任务至kafka集群,如何在sink-uri配置多个broker地址(192.168.150.38和192.168.150.39),sink-uri="kafka://192.168.150.38:9092/cdc-scs-topic?

【问题2】:创建任务根据配置文件cdc-config.toml
/cdc cli changefeed create --pd=http://192.168.150.173:2379 --start-ts=0 --sink-uri=“kafka://192.168.150.38:9092/cdc-scs-topic” --config=/apps/tidb/cdc/cdc-config.toml成功,
除了正常的表数据变更事件,通过kafka消费发现大量的空value数据,且key有乱码,cdc-config.toml见附件,消费消息见下图,是否是配置有误造成?
cdc-config.toml (435 字节)


稍等,我们查看下

1.按照官方文档t=3:Resolved Event:代表一个特殊的时间点,表示在这个时间点前的收到的 Event 是完整的,目前我测试变更的数据量只有十几条,但是收到这类消息却很大。


2.正常接受到的Row Changed Event 事件 key和value都是前面

  1. 配置多个broker 不支持
  2. 这个文档你可以可以再看下,稍后会补充回答您,多谢。

https://pingcap.com/docs-cn/stable/ticdc/ticdc-open-protocol/

您好:
1、Resolved Event 是周期性向 kafka 输出的,和 DML 数量无关
2、您图中的乱码是Message 格式中的 version 版本和 key len,具体请参见:

文档中关于 Message 格式的定义:

另外,这种通过配置文件创建changefeed的方式,这个功能处于没有公开的状态,后续配置文件格式可能发生变动,请您注意

1.目前的乱码(key和value)怎么处理,我暂时通过消费kafka消息里卖弄截取{,开始解析;
2.目前4.0.0-rc1已经上生产,马上需要上线CDC功能同步数据到kafka,一部分业务从kafka消费数据写到elasticsearch,关于配置文件方式会持续关注,有变动我们这边会及时跟进。

如同文档所述,目前 open protocol 中 msg key 的格式为:

  • 第一个 8 字节:协议版本号(64位大端序无符号整数,固定为 1)
  • 后一个 8 字节:下一个 event key 的长度 (64位大端序无符号整数)
  • 后 n 个字节:event key (JSON 格式字符串)
  • 后一个 8 字节:下一个 event key 的长度 (64位大端序无符号整数)
  • 后 n 个字节:event key (JSON 格式字符串)

目前 open protocol 中msg value 的格式为:

  • 第一个 8 字节:下一个 event value 的长度 (64位大端序无符号整数)
  • 后 n 个字节:event value (JSON 格式字符串)
  • 后一个 8 字节:下一个 event value 的长度 (64位大端序无符号整数)
  • 后 n 个字节:event value (JSON 格式字符串)

您提到的乱码,即是 msg key 中的前两个8字节。

因此,对于 msg key,您需要:

  1. 截取第一个 8 字节,对比是否为常量 1
  2. 然后截取下一个 8 字节,按照大端序解析为 uint64, 记该值为 L
  3. 截取后 L 个字节,这 L 个字节即为 JSON 格式的 event key
  4. 循环步骤 2、3 直到 msg key 末尾。

对于 msg value 是类似的,只是没有步骤 1。

目前 Open Protocol 格式是稳定的,配置文件格式暂不稳定