TiDB 7.1.0: creating a TiCDC changefeed that replicates to an existing downstream Kafka topic fails with Error: [CDC:ErrKafkaTopicExprInvalid]invalid topic expression

[TiDB environment] Production
[TiDB version] 7.1.0
[Steps to reproduce]
[Problem: symptoms and impact]

The Kafka version is 2.2.0, and the following topics already exist:

canal-product-item
canal-product-itemSnapshot

The changefeed uses the following configuration file:

case-sensitive = true
enable-old-value = true
[filter]
rules = ['product_c1.*']
[mounter]
worker-num = 16
[sink]
dispatchers = [
    { matcher = ["product_c1.item_*"], topic = "canal-product-item", partition = "table"},
    { matcher = ["product_c1.itemSnapshot_*"], topic = "canal-product-itemSnapshot", partition = "table"}
]
protocol = "canal-json"
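
To make the intent of the two dispatcher rules explicit, here is a rough Python sketch of how table names would be routed to topics. It is only an approximation: `fnmatchcase` stands in for TiCDC's table-filter matching, the first-match-wins order and the fallback to the default topic from the sink URI are assumptions, and the table names `item_01` and `itemSnapshot_01` are hypothetical.

```python
from fnmatch import fnmatchcase

# Dispatcher rules copied from the config above: (matcher pattern, topic).
RULES = [
    ("product_c1.item_*", "canal-product-item"),
    ("product_c1.itemSnapshot_*", "canal-product-itemSnapshot"),
]

# Assumption: tables that match no rule go to the default topic in the sink URI.
DEFAULT_TOPIC = "ticdc-product_c1"


def route(schema: str, table: str) -> str:
    """Return the topic of the first rule whose pattern matches schema.table."""
    name = f"{schema}.{table}"
    for pattern, topic in RULES:
        # Case-sensitive glob match, mirroring case-sensitive = true.
        if fnmatchcase(name, pattern):
            return topic
    return DEFAULT_TOPIC


if __name__ == "__main__":
    for table in ("item_01", "itemSnapshot_01", "orders"):
        print(table, "->", route("product_c1", table))
```

Because the first pattern requires an underscore directly after `item`, a table such as `itemSnapshot_01` is not captured by it and falls through to the second rule.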

The changefeed is created with the following command:

tiup cdc:v7.1.0 cli changefeed create --pd=http://192.168.3.94:2379 --sink-uri="kafka://biz-kafka-01:9092,biz-kafka-02:9092,biz-kafka-03:9092,biz-kafka-04:9092,biz-kafka-05:9092/ticdc-product_c1?kafka-version=2.2.0&partition-num=12&max-message-bytes=8388608&replication-factor=3" --changefeed-id="product-c1" --config ./cdc-product_c1-v2.yml

The command line reports: Error: [CDC:ErrKafkaTopicExprInvalid]invalid topic expression

The TiCDC server log shows the following error:

[2024/09/04 14:56:39.728 +08:00] [INFO] [middleware.go:48] [/api/v2/changefeeds] [status=500] [method=POST] [path=/api/v2/changefeeds] [query=] [ip=192.168.3.5] [user-agent=Go-http-client/1.1] [client-version=v7.1.0] [error="[CDC:ErrKafkaTopicExprInvalid]invalid topic expression"] [errorVerbose="[CDC:ErrKafkaTopicExprInvalid]invalid topic expression\ngithub.com/pingcap/errors.AddStack\n\tgithub.com/pingcap/errors@v0.11.5-0.20221009092201-b66cddb77c32/errors.go:174\ngithub.com/pingcap/errors.(*Error).GenWithStackByArgs\n\tgithub.com/pingcap/errors@v0.11.5-0.20221009092201-b66cddb77c32/normalize.go:164\ngithub.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dispatcher/topic.Expression.Validate\n\tgithub.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dispatcher/topic/expression.go:54\ngithub.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dispatcher.getTopicDispatcher\n\tgithub.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dispatcher/event_router.go:275\ngithub.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dispatcher.NewEventRouter\n\tgithub.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dispatcher/event_router.go:106\ngithub.com/pingcap/tiflow/cdc/sink/dmlsink/mq.NewKafkaDMLSink\n\tgithub.com/pingcap/tiflow/cdc/sink/dmlsink/mq/kafka_dml_sink.go:104\ngithub.com/pingcap/tiflow/cdc/sink/dmlsink/factory.New\n\tgithub.com/pingcap/tiflow/cdc/sink/dmlsink/factory/factory.go:75\ngithub.com/pingcap/tiflow/cdc/sink/validator.Validate\n\tgithub.com/pingcap/tiflow/cdc/sink/validator/validator.go:51\ngithub.com/pingcap/tiflow/cdc/api/v2.APIV2HelpersImpl.verifyCreateChangefeedConfig\n\tgithub.com/pingcap/tiflow/cdc/api/v2/api_helpers.go:256\ngithub.com/pingcap/tiflow/cdc/api/v2.(*OpenAPIV2).createChangefeed\n\tgithub.com/pingcap/tiflow/cdc/api/v2/changefeed.go:95\ngithub.com/gin-gonic/gin.(*Context).Next\n\tgithub.com/gin-gonic/gin@v1.8.1/context.go:173\ngithub.com/pingcap/tiflow/cdc/api/middleware.ForwardToOwnerMiddleware.func1\n\tgithub.com/pingcap/tiflow/cdc/api/middleware/middleware.go:95\ngithub.com/gin-gonic/gin.(*Context)
.Next\n\tgithub.com/gin-gonic/gin@v1.8.1/context.go:173\ngithub.com/pingcap/tiflow/cdc/api/middleware.ErrorHandleMiddleware.func1\n\tgithub.com/pingcap/tiflow/cdc/api/middleware/middleware.go:64\ngithub.com/gin-gonic/gin.(*Context).Next\n\tgithub.com/gin-gonic/gin@v1.8.1/context.go:173\ngithub.com/pingcap/tiflow/cdc/api/middleware.LogMiddleware.func1\n\tgithub.com/pingcap/tiflow/cdc/api/middleware/middleware.go:38\ngithub.com/gin-gonic/gin.(*Context).Next\n\tgithub.com/gin-gonic/gin@v1.8.1/context.go:173\ngithub.com/pingcap/tiflow/cdc/api/middleware.CheckServerReadyMiddleware.func1\n\tgithub.com/pingcap/tiflow/cdc/api/middleware/middleware.go:103\ngithub.com/gin-gonic/gin.(*Context).Next\n\tgithub.com/gin-gonic/gin@v1.8.1/context.go:173\ngithub.com/gin-gonic/gin.CustomRecoveryWithWriter.func1\n\tgithub.com/gin-gonic/gin@v1.8.1/recovery.go:101\ngithub.com/gin-gonic/gin.(*Context).Next\n\tgithub.com/gin-gonic/gin@v1.8.1/context.go:173\ngithub.com/gin-gonic/gin.(*Engine).handleHTTPRequest\n\tgithub.com/gin-gonic/gin@v1.8.1/gin.go:616\ngithub.com/gin-gonic/gin.(*Engine).ServeHTTP\n\tgithub.com/gin-gonic/gin@v1.8.1/gin.go:572\nnet/http.serverHandler.ServeHTTP\n\tnet/http/server.go:2936\nnet/http.(*conn).serve\n\tnet/http/server.go:1995\nruntime.goexit\n\truntime/asm_amd64.s:1598"] [duration=968.261861ms]
[2024/09/04 14:56:39.728 +08:00] [INFO] [factory.go:216] ["Close kafka async producer success"] [namespace=] [changefeed=] [duration=1.053782ms]

This problem does not occur on 8.1.0. Using { matcher = ["product_c1.itemSnapshot_*"], topic = "canal-{schema}-itemSnapshot", partition = "table"} also works on 7.1.0, but a dynamic {schema} placeholder does not meet our current requirements.

Has anyone else run into this problem? How can it be solved?

For reference, this is how the issue was resolved:

Analysis showed that 7.1.0 does not support hard-coded topic names in the dispatcher rules.
According to the 7.1.4 release notes, this is a bug.

After upgrading to 7.1.4, I verified that hard-coded topic names work.
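
The behaviour reported in this thread can be modelled with a small Python sketch. It is not copied from the TiCDC source: the two regular expressions are assumptions that reproduce what was observed, namely that 7.1.0 rejects a topic without a `{schema}` placeholder while a fixed version also accepts a plain topic name. The function name `is_valid_topic` and the `allow_hard_coded` flag are hypothetical.

```python
import re

# Assumed strict rule: [prefix]{schema}[middle][{table}][suffix],
# so the {schema} placeholder is mandatory.
STRICT_EXPR = re.compile(
    r"^[A-Za-z0-9._\-]*\{schema\}([A-Za-z0-9._\-]*\{table\})?[A-Za-z0-9._\-]*$"
)

# Assumed relaxed rule: a plain topic name without any placeholder.
HARD_CODED = re.compile(r"^[A-Za-z0-9._\-]+$")


def is_valid_topic(expr: str, allow_hard_coded: bool) -> bool:
    """Check a topic expression under the strict or the relaxed rule set."""
    if STRICT_EXPR.match(expr):
        return True
    return allow_hard_coded and bool(HARD_CODED.match(expr))


if __name__ == "__main__":
    for expr in ("canal-product-item", "canal-{schema}-itemSnapshot"):
        print(expr,
              "| strict:", is_valid_topic(expr, allow_hard_coded=False),
              "| relaxed:", is_valid_topic(expr, allow_hard_coded=True))
```

Under the strict rule only `canal-{schema}-itemSnapshot` passes, which matches the error seen on 7.1.0. With hard-coded names allowed, `canal-product-item` passes as well, which matches the result after the upgrade.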