【 TiDB 使用环境】测试
【 TiDB 版本】 v7.5.1
【复现路径】多条数据合并到一起
【遇到的问题:问题现象及影响】
建表语句:
CREATE TABLE ods.`ods_test` (
`id` int(11) NOT NULL,
`user_id` int(11) DEFAULT NULL COMMENT '用户id',
`user_name` varchar(48) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '',
`mysql_delete_type` int(11) NOT NULL DEFAULT '0' COMMENT 'mysql 数据类型',
PRIMARY KEY (`id`) /*T![clustered_index] CLUSTERED */
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci COMMENT='';
创建cdc:
case-sensitive = true
[filter]
rules = ['!*.*', 'ods.*']
[mounter]
worker-num = 16
[sink]
dispatchers = [
{matcher = ['*.*'], dispatcher = "table"}
]
protocol = "maxwell"
执行SQL:
INSERT INTO ods.ods_test
(id, user_id, user_name, mysql_delete_type)
VALUES(1, 10749, '111', 0);
kafka 显示数据:
{
"database": "ods",
"table": "ods_test",
"type": "insert",
"ts": 1719833732,
"data": {
"id": 1,
"mysql_delete_type": 0,
"user_id": 10749,
"user_name": "111"
}
}
再执行SQL:
INSERT INTO ods.ods_test
(id, user_id, user_name, mysql_delete_type)
VALUES(4, 10596, '555', 0);
INSERT INTO ods.ods_test
(id, user_id, user_name, mysql_delete_type)
VALUES(5, 10749, '666', 0);
INSERT INTO ods.ods_test
(id, user_id, user_name, mysql_delete_type)
VALUES(6, 10596, '777', 0);
INSERT INTO ods.ods_test
(id, user_id, user_name, mysql_delete_type)
VALUES(7, 10749, '888', 0);
INSERT INTO ods.ods_test
(id, user_id, user_name, mysql_delete_type)
VALUES(8, 10596, '999', 0);
INSERT INTO ods.ods_test
(id, user_id, user_name, mysql_delete_type)
VALUES(9, 10749, '000', 0);
INSERT INTO ods.ods_test
(id, user_id, user_name, mysql_delete_type)
VALUES(10, 10596, '111', 0);
kafka 展示数据:
第一条:
{
"database": "ods",
"table": "ods_test",
"type": "insert",
"ts": 1719833912,
"data": {
"id": 4,
"mysql_delete_type": 0,
"user_id": 10596,
"user_name": "555"
}
}
第二条:
{"database":"ods","table":"ods_test","type":"insert","ts":1719833912,"data":{"id":5,"mysql_delete_type":0,"user_id":10749,"user_name":"666"}}
{"database":"ods","table":"ods_test","type":"insert","ts":1719833912,"data":{"id":6,"mysql_delete_type":0,"user_id":10596,"user_name":"777"}}
{"database":"ods","table":"ods_test","type":"insert","ts":1719833912,"data":{"id":7,"mysql_delete_type":0,"user_id":10749,"user_name":"888"}}
{"database":"ods","table":"ods_test","type":"insert","ts":1719833912,"data":{"id":8,"mysql_delete_type":0,"user_id":10596,"user_name":"999"}}
{"database":"ods","table":"ods_test","type":"insert","ts":1719833912,"data":{"id":9,"mysql_delete_type":0,"user_id":10749,"user_name":"000"}}
{"database":"ods","table":"ods_test","type":"insert","ts":1719833912,"data":{"id":10,"mysql_delete_type":0,"user_id":10596,"user_name":"111"}}
更改:dispatch =“ts” ,清空表数据,
执行SQL:
INSERT INTO ods.ods_test
(id, user_id, user_name, mysql_delete_type)
VALUES(2, 10596, '222', 0),
(3, 10749, '333', 0),
(4, 10596, '555', 0);
kafka 数据 (这里还是合并到一起了):
{"database":"ods","table":"ods_test","type":"insert","ts":1719884437,"data":{"id":2,"mysql_delete_type":0,"user_id":10596,"user_name":"222"}}
{"database":"ods","table":"ods_test","type":"insert","ts":1719884437,"data":{"id":3,"mysql_delete_type":0,"user_id":10749,"user_name":"333"}}
{"database":"ods","table":"ods_test","type":"insert","ts":1719884437,"data":{"id":4,"mysql_delete_type":0,"user_id":10596,"user_name":"555"}}