TiCDC replicates data abnormally to Kafka

【TiDB Environment】Test
【TiDB Version】v7.5.1
【Reproduction Path】Multiple rows get merged into a single Kafka message
【Problem: Symptoms and Impact】

Table DDL:

CREATE TABLE ods.`ods_test` (
  `id` int(11) NOT NULL,
  `user_id` int(11) DEFAULT NULL COMMENT 'user id',
  `user_name` varchar(48) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '',
  `mysql_delete_type` int(11) NOT NULL DEFAULT '0' COMMENT 'mysql data type',
  PRIMARY KEY (`id`) /*T![clustered_index] CLUSTERED */
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci COMMENT='';

Changefeed config file:

case-sensitive = true

[filter]
rules = ['!*.*', 'ods.*']

[mounter]
worker-num = 16
[sink]
dispatchers = [
    {matcher = ['*.*'], dispatcher = "table"}
]
protocol = "maxwell"
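
For reference, a minimal sketch of the command that creates a changefeed from this config file; the server address, Kafka broker, topic name, and changefeed ID below are placeholders, not the ones from the original setup:

cdc cli changefeed create \
  --server=http://127.0.0.1:8300 \
  --sink-uri="kafka://127.0.0.1:9092/ods-topic" \
  --changefeed-id="ods-to-kafka" \
  --config=changefeed_ods.toml

The message format itself comes from protocol = "maxwell" in the [sink] section above.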

Execute SQL:

INSERT INTO ods.ods_test
(id, user_id, user_name,  mysql_delete_type)
VALUES(1, 10749, '111', 0);

Data shown in Kafka:

{
  "database": "ods",
  "table": "ods_test",
  "type": "insert",
  "ts": 1719833732,
  "data": {
    "id": 1,
    "mysql_delete_type": 0,
    "user_id": 10749,
    "user_name": "111"
  }
}

Then execute more SQL:

INSERT INTO ods.ods_test
(id, user_id, user_name,  mysql_delete_type)
VALUES(4, 10596, '555', 0);
INSERT INTO ods.ods_test
(id, user_id, user_name,  mysql_delete_type)
VALUES(5, 10749, '666', 0);
INSERT INTO ods.ods_test
(id, user_id, user_name,  mysql_delete_type)
VALUES(6, 10596, '777', 0);
INSERT INTO ods.ods_test
(id, user_id, user_name,  mysql_delete_type)
VALUES(7, 10749, '888', 0);
INSERT INTO ods.ods_test
(id, user_id, user_name,  mysql_delete_type)
VALUES(8, 10596, '999', 0);
INSERT INTO ods.ods_test
(id, user_id, user_name,  mysql_delete_type)
VALUES(9, 10749, '000', 0);
INSERT INTO ods.ods_test
(id, user_id, user_name,  mysql_delete_type)
VALUES(10, 10596, '111', 0);

Data shown in Kafka:

First message:

{
  "database": "ods",
  "table": "ods_test",
  "type": "insert",
  "ts": 1719833912,
  "data": {
    "id": 4,
    "mysql_delete_type": 0,
    "user_id": 10596,
    "user_name": "555"
  }
}

Second message:

{"database":"ods","table":"ods_test","type":"insert","ts":1719833912,"data":{"id":5,"mysql_delete_type":0,"user_id":10749,"user_name":"666"}}
{"database":"ods","table":"ods_test","type":"insert","ts":1719833912,"data":{"id":6,"mysql_delete_type":0,"user_id":10596,"user_name":"777"}}
{"database":"ods","table":"ods_test","type":"insert","ts":1719833912,"data":{"id":7,"mysql_delete_type":0,"user_id":10749,"user_name":"888"}}
{"database":"ods","table":"ods_test","type":"insert","ts":1719833912,"data":{"id":8,"mysql_delete_type":0,"user_id":10596,"user_name":"999"}}
{"database":"ods","table":"ods_test","type":"insert","ts":1719833912,"data":{"id":9,"mysql_delete_type":0,"user_id":10749,"user_name":"000"}}
{"database":"ods","table":"ods_test","type":"insert","ts":1719833912,"data":{"id":10,"mysql_delete_type":0,"user_id":10596,"user_name":"111"}}

Changed to dispatcher = "ts" and truncated the table.
Execute SQL:

INSERT INTO ods.ods_test
(id, user_id, user_name,  mysql_delete_type)
VALUES(2, 10596, '222', 0),
(3, 10749, '333', 0),
(4, 10596, '555', 0);

Kafka data (still merged into one message here):

{"database":"ods","table":"ods_test","type":"insert","ts":1719884437,"data":{"id":2,"mysql_delete_type":0,"user_id":10596,"user_name":"222"}}
{"database":"ods","table":"ods_test","type":"insert","ts":1719884437,"data":{"id":3,"mysql_delete_type":0,"user_id":10749,"user_name":"333"}}
{"database":"ods","table":"ods_test","type":"insert","ts":1719884437,"data":{"id":4,"mysql_delete_type":0,"user_id":10596,"user_name":"555"}}

Changed the config file: max-batch-size = 1 has no effect.
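
(Note: per the TiCDC Kafka sink documentation, max-batch-size is a sink-URI query parameter rather than a config-file option, and it is documented to take effect only with open-protocol, which may be why setting it in the TOML file did nothing. A hypothetical URI form, with placeholder broker and topic:

kafka://127.0.0.1:9092/ods-topic?protocol=open-protocol&max-batch-size=1
)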

Our current requirement: the downstream parses the JSON, so multiple rows must not be merged into one message.

So far, with dispatcher = "table", multiple rows are merged.
dispatcher = "rowid" reports an error: Error: [CDC:ErrDispatcherFailed]index not found when verify the table, table: ods.ods_consult_qy_wx_user, index:

dispatcher = "ts": the merging seemed to disappear.

Then, executing a multi-row insert:
INSERT INTO ods.ods_test
(id, user_id, user_name, mysql_delete_type)
VALUES(2, 10596, '222', 0),
(3, 10749, '333', 0),
(4, 10596, '555', 0);

The rows are still merged into a single message:

{"database":"ods","table":"ods_test","type":"insert","ts":1719884437,"data":{"id":2,"mysql_delete_type":0,"user_id":10596,"user_name":"222"}}
{"database":"ods","table":"ods_test","type":"insert","ts":1719884437,"data":{"id":3,"mysql_delete_type":0,"user_id":10749,"user_name":"333"}}
{"database":"ods","table":"ods_test","type":"insert","ts":1719884437,"data":{"id":4,"mysql_delete_type":0,"user_id":10596,"user_name":"555"}}

Aren't these three separate rows? Do you mean all three rows arrive in the same Kafka message?

Yes, it is a single message in Kafka.

# 1. Distribute by Region count: each TiCDC node handles roughly the same number of Regions. When a table's Region count exceeds `region-threshold`, the table is split across multiple nodes. The default value of `region-threshold` is 10000.
# region-threshold = 10000
# 2. Distribute by write traffic: each TiCDC node handles roughly the same total number of modified rows. This only takes effect for tables whose modified rows per minute exceed `write-key-threshold`.
# write-key-threshold = 30000
# Upper limit on the number of messages in a single batch sent by the Pulsar producer. Default: 1000.
batching-max-messages=1000

Try these parameters and see whether they help. In theory, batching improves incremental replication throughput; processing rows one by one is too slow.

That parameter does not take effect with a Kafka downstream.
It only takes effect when the downstream is Pulsar.
With a Kafka downstream it reports an error:
Error: component TiCDC changefeed's config file /Cluster-data/ticdc/changefeed_ods.toml contained unknown configuration options: batching-max-message

I see that when your JSON message contains multiple events, they are separated by \n.
Could your code try splitting on \n and then parsing each piece?

I added those \n myself for readability; the actual message contains no newline separators.
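
If switching protocols is not an option in the short term, one consumer-side workaround is to parse the concatenated objects with a streaming JSON decoder instead of splitting on a separator. A minimal Python sketch, assuming (as stated above) that the events in one Kafka message value are raw JSON objects placed back-to-back; the function name is mine:

import json

def split_concatenated_events(payload: bytes):
    """Yield each JSON event from a Kafka message value that holds
    several maxwell-style JSON objects with no separator between them."""
    text = payload.decode("utf-8")
    decoder = json.JSONDecoder()
    pos = 0
    while pos < len(text):
        # Skip whitespace between objects (defensive; per this thread
        # there is actually no separator at all).
        while pos < len(text) and text[pos].isspace():
            pos += 1
        if pos >= len(text):
            break
        event, pos = decoder.raw_decode(text, pos)
        yield event

# Example with a shortened version of the merged message from this thread:
merged = (b'{"database":"ods","table":"ods_test","type":"insert","ts":1719884437,"data":{"id":2}}'
          b'{"database":"ods","table":"ods_test","type":"insert","ts":1719884437,"data":{"id":3}}')
for event in split_concatenated_events(merged):
    print(event["data"]["id"], event["type"])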

Also, since this is CDC, it should behave consistently with mainstream CDC tools; otherwise it is incompatible with the ecosystem.


The current list of TiCDC output protocols no longer shows maxwell.
Why don't you try the simple protocol?
The format looks similar, just with a few extra fields.

https://docs.pingcap.com/zh/tidb/stable/ticdc-simple-protocol

First of all, this used to be supported, and the downstream we developed needs maxwell format support. Besides, the documentation states that it is supported:


https://docs.pingcap.com/zh/tidb/v7.5/ticdc-sink-to-kafka#sink-uri-配置-kafka


Understood. In that case I don't have a better suggestion.

We are very sorry: TiCDC's maxwell protocol support is currently deprecated, and we will remove it from the documentation as soon as possible. We recommend using another explicitly supported format instead; see https://docs.pingcap.com/zh/tidb/stable/ticdc-avro-protocol
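
For readers landing here: the Avro protocol encodes one row change per Kafka message, which sidesteps the merging problem, but it requires a Confluent Schema Registry. A minimal consumer sketch with the confluent-kafka Python client; the registry URL, broker address, topic, and group ID below are placeholders:

from confluent_kafka import Consumer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka.serialization import SerializationContext, MessageField

registry = SchemaRegistryClient({"url": "http://127.0.0.1:8081"})
deserialize = AvroDeserializer(registry)  # resolves schemas from the registry

consumer = Consumer({
    "bootstrap.servers": "127.0.0.1:9092",
    "group.id": "ods-consumer",
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["ods_test"])

while True:
    msg = consumer.poll(1.0)
    if msg is None or msg.error():
        continue
    # One decoded row change per message with the Avro protocol.
    row = deserialize(msg.value(), SerializationContext(msg.topic(), MessageField.VALUE))
    print(row)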