如何在TiCDC同步数据到Kafka时,根据表中租户ID字段的值将数据路由到Topic的不同分区?

如何在TiCDC同步数据到Kafka时,根据表中租户ID字段的值将数据路由到Topic的不同分区,并确保同一租户的数据严格有序?

【TiDB 使用环境】测试环境
【TiDB 版本】v8.5.3
【部署方式】云上部署
【操作系统/CPU 架构/芯片详情】
【机器部署详情】CPU大小/内存大小/磁盘大小
【集群数据量】
【集群节点数】
【问题复现路径】
数据库中每个表 均有 tenant_id 字段,且该字段创建了索引,使用 canal-json 协议, Partition 分发器 5中分发器 测试均不满足需求,同一个事务多个表数据会 分发到 kafka的不同分区,以下是 测试时创建 changefeeds的核心配置:

   
            "dispatchers": [
                {
                    "matcher": [
                        "*.*"
                    ],
                    "partition": "index-value",
                    "index":"idx_tenant_id"
                    
                }
            ]
        

          "dispatchers": [
                {
                    "matcher": [
                        "*.*"
                    ],
                    "partition": "columns",
                    "columns":["tenant_id"]
                    
                }
            ]



  • Kafka 有序性基础:Kafka 仅保证单个分区内的数据严格有序,跨分区无法保证全局有序,因此实现同一租户数据有序的核心是:将同一租户 ID 的所有数据路由到同一个 Kafka 分区
  • TiCDC 分区路由核心:通过配置 TiCDC Changefeed,将「租户 ID 字段」作为 Kafka 消息的Key,并指定按Key哈希分区,实现同一租户 ID 映射到同一 Kafka 分区。

使用 columns 分发器 + tenant_id 字段路由,配合 max_in_flight_requests_per_connection: 1 确保顺序

{
“changefeed_id”: “tidb-to-kafka-tenant-routing”,
“sink_uri”: “kafka://kafka-cluster:9092/tidb_changestream?partition-num=5&version=2.0&max-message-bytes=1048576”,
“protocol”: “canal-json”,
“table_filter”: [
.” // 同步所有表(可按需求过滤,如 “db1.", "db2.t_”)
],
“dispatchers”: [
{
“matcher”: [“.”], // 匹配所有表(支持精细化匹配,如 “db1.t_user”, “db2.*”)
“partition”: “columns”, // 基于字段路由分区
“columns”: [“tenant_id”], // 路由字段:tenant_id(所有表必须含该字段)
“hash”: “crc32” // 哈希算法(默认 crc32,可选 murmur3,效果一致)
}
],
“sync_point”: {
“enable”: true,
“interval”: 300 // 定期保存同步 checkpoint,避免重启后重放全量数据
},
“sink”: {
“kafka”: {
“compression”: “snappy”, // 压缩算法,提升传输效率
“acks”: “1”, // 消息确认机制(根据 Kafka 集群稳定性调整,1 为默认)
“max_in_flight_requests_per_connection”: 1 // 限制单连接并发请求数,避免乱序(关键!)
}
}
}

kafka的同步问题最好还得看兼容性方面的指导

最新的测试及结果, 看现象 columns 类型 partition 分发器 并没有生效,以下是测试配置:

{
    "sink_uri": "kafka://xxx/tidb_binlog_topic?protocol=canal-json&kafka-version=2.8.2&max-message-bytes=67108864&replication-factor=3",
    "replica_config": {
        "filter": {
            "rules": [
                "*.*"
            ]
           
        },
         "ignore_ineligible_table": true,
        "sink": {
            "dispatchers": [
                {
                    "matcher": [
                        "*.*",
                        "!*.global_sequence"
                    ],
                    "partition": "columns",
                    "columns":["global_seq"]
                    
                }
            ]
        }
    }
}
对应的kafka topic  tidb_binlog_topic 有 12个分区  。
TiDB 中测试脚本:
begin;
update  orders set global_seq =8888 where id < 2;
update products set global_seq=8888 where id < 2;
commit ;


kafka 消息 被路由到了两个分区: