TiCDC to Kafka reports an error

tiup ctl:v6.1.0 cdc changefeed create --pd=http://slave007:2379 \
--sink-uri="kafka://slave002:9092,slave003:9092,slave004:9092,slave005:9092,slave006:9092/tidb-example-t1?kafka-version=2.4.1&replication-factor=3" \
--changefeed-id="simple-kafka-task" \
--sort-engine="unified" \
--config …/config/cdc/tidb_example_kafka.toml
After running this, I get the following error:
[2022/08/18 10:40:47.537 +08:00] [WARN] [event_router.go:236] ["This index-value distribution mode does not guarantee row-level orderliness when switching on the old value, so please use caution!"]
Error: [CDC:ErrKafkaTopicExprInvalid]invalid topic expression
Also, I still get the same error after reducing the Kafka address list to a single broker.

CDC version: v6.1.0
The tidb_example_kafka.toml configuration is as follows:
case-sensitive = true

enable-old-value = true

[filter]
ignore-txn-start-ts = [1, 2]

rules = ['tidb_example.*']

[mounter]
worker-num = 16

[sink]
dispatchers = [
{matcher = ['tidb_example.*'], topic = "tidb_example_{table}", partition = "index-value" }
]

protocol = "canal-json"

What is your Kafka version?

2.4.1

Try creating the topic manually first.

Still the same error.

Here is everything printed after I ran the create command:
Starting component ctl: /root/.tiup/components/ctl/v6.1.0/ctl cdc changefeed create --pd=http://slave007:2379 --sink-uri=kafka://slave002:9092/tidb-example-t?kafka-version=2.4.1&replication-factor=5 --changefeed-id=simple-kafka-task --sort-engine=unified --config …/config/cdc/tidb_example_kafka.toml
[WARN] This index-value distribution mode does not guarantee row-level orderliness when switching on the old value, so please use caution! dispatch-rules: &config.DispatchRule{Matcher:[]string{"tidb_example.*"}, DispatcherRule:"", PartitionRule:"index-value", TopicRule:"tidb_example_{table}"}
[2022/08/18 11:29:45.311 +08:00] [WARN] [kafka.go:442] ["topic already exist, TiCDC will not create the topic"] [topic=tidb-example-t] [detail="{\"NumPartitions\":3,\"ReplicationFactor\":5,\"ReplicaAssignment\":{\"0\":[3,5,1,4,2],\"1\":[5,1,4,2,3],\"2\":[1,4,2,3,5]},\"ConfigEntries\":{\"compression.type\":\"producer\",\"flush.messages\":\"10000\",\"flush.ms\":\"1000\",\"index.interval.bytes\":\"4096\",\"max.message.bytes\":\"10485760\",\"min.insync.replicas\":\"2\",\"segment.bytes\":\"1073741824\",\"segment.index.bytes\":\"10485760\"}}"]
[2022/08/18 11:29:45.388 +08:00] [WARN] [event_router.go:236] ["This index-value distribution mode does not guarantee row-level orderliness when switching on the old value, so please use caution!"]
Error: [CDC:ErrKafkaTopicExprInvalid]invalid topic expression
Usage:
  cdc cli changefeed create [flags]

Flags:
  -c, --changefeed-id string             Replication task (changefeed) ID
      --config string                    Path of the configuration file
      --cyclic-filter-replica-ids uints  (Experimental) Cyclic replication filter replica ID of changefeed (default [])
      --cyclic-replica-id uint           (Experimental) Cyclic replication replica ID of changefeed
      --cyclic-sync-ddl                  (Experimental) Cyclic replication sync DDL of changefeed (default true)
      --disable-gc-check                 Disable GC safe point check
  -h, --help                             help for create
      --no-confirm                       Don't ask user whether to ignore ineligible table
      --opts key=value                   Extra options, in the key=value format
      --schema-registry string           Avro Schema Registry URI
      --sink-uri string                  sink uri
      --sort-engine string               sort engine used for data sort (default "unified")
      --start-ts uint                    Start ts of changefeed
      --sync-interval duration           (Experimental) Set the interval for syncpoint in replication(default 10min) (default 10m0s)
      --sync-point                       (Experimental) Set and Record syncpoint in replication(default off)
      --target-ts uint                   Target ts of changefeed
      --tz string                        timezone used when checking sink uri (changefeed timezone is determined by cdc server) (default "SYSTEM")

Global Flags:
      --ca string         CA certificate path for TLS connection
      --cert string       Certificate path for TLS connection
  -i, --interact          Run cdc cli with readline
      --key string        Private key path for TLS connection
      --log-level string  log level (etc: debug|info|warn|error) (default "warn")
      --pd string         PD address, use ',' to separate multiple PDs (default "http://127.0.0.1:2379")

[CDC:ErrKafkaTopicExprInvalid]invalid topic expression
Error: exit status 1

The problem is probably here: your table name contains characters that a Kafka topic name does not support.

use tidb_example;
show tables;
Persons

There is only one table, named Persons.

I just changed the config to:
dispatchers = [
{matcher = ['tidb_example.*'], topic = "cnsbdnc", partition = "index-value" }
]
and I still get the same error.

Pick a different topic name and create the changefeed again. Nothing else needs to change.

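When a topic is created, Kafka also checks whether the name contains "." or "_". Why these two characters? Kafka's internal instrumentation derives metric names from topic names and replaces each period "." with an underscore "_". Given one topic named "topic.1_2" and another named "topic_1.2", both metric names end up as "topic_1_2", which is a name collision. For example, creating a topic named "topic.1_2" first prints a WARNING, and creating "topic_1.2" afterwards fails with an InvalidTopicException.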
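To make the collision concrete, here is a minimal Python sketch of the renaming described above. It only imitates the period-to-underscore replacement; the function names `metric_name` and `collides` are made up for illustration and are not Kafka APIs.

```python
from typing import Iterable


def metric_name(topic: str) -> str:
    """Imitate the renaming described above: every '.' becomes '_'."""
    return topic.replace(".", "_")


def collides(new_topic: str, existing_topics: Iterable[str]) -> bool:
    """True if new_topic would share a metric name with a different existing topic."""
    return any(
        new_topic != existing and metric_name(new_topic) == metric_name(existing)
        for existing in existing_topics
    )


existing = ["topic.1_2"]
print(metric_name("topic.1_2"))          # topic_1_2
print(metric_name("topic_1.2"))          # topic_1_2
print(collides("topic_1.2", existing))   # True
print(collides("abcd", existing))        # False
```

By this check a plain name such as abcd cannot collide with anything, which fits the fact that changing the topic name alone did not make the TiCDC error go away.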


Still the same.
[root@slave005 bin]# tiup ctl:v6.1.0 cdc changefeed create --pd=http://slave007:2379 --sink-uri="kafka://slave002:9092/abcd?kafka-version=2.4.1&replication-factor=5" --changefeed-id="simple-kafka-task" --sort-engine="unified" --config …/config/cdc/tidb_example_kafka.toml
Starting component ctl: /root/.tiup/components/ctl/v6.1.0/ctl cdc changefeed create --pd=http://slave007:2379 --sink-uri=kafka://slave002:9092/abcd?kafka-version=2.4.1&replication-factor=5 --changefeed-id=simple-kafka-task --sort-engine=unified --config …/config/cdc/tidb_example_kafka.toml
[WARN] This index-value distribution mode does not guarantee row-level orderliness when switching on the old value, so please use caution! dispatch-rules: &config.DispatchRule{Matcher:[]string{"tidb_example.*"}, DispatcherRule:"", PartitionRule:"index-value", TopicRule:"abcd"}
[2022/08/18 14:05:18.375 +08:00] [WARN] [kafka.go:442] ["topic already exist, TiCDC will not create the topic"] [topic=abcd] [detail="{\"NumPartitions\":3,\"ReplicationFactor\":5,\"ReplicaAssignment\":{\"0\":[1,3,4,5,2],\"1\":[2,4,5,1,3],\"2\":[3,5,1,2,4]},\"ConfigEntries\":{\"compression.type\":\"producer\",\"flush.messages\":\"10000\",\"flush.ms\":\"1000\",\"index.interval.bytes\":\"4096\",\"max.message.bytes\":\"10485760\",\"min.insync.replicas\":\"2\",\"segment.bytes\":\"1073741824\",\"segment.index.bytes\":\"10485760\"}}"]
[2022/08/18 14:05:18.409 +08:00] [WARN] [event_router.go:236] ["This index-value distribution mode does not guarantee row-level orderliness when switching on the old value, so please use caution!"]
Error: [CDC:ErrKafkaTopicExprInvalid]invalid topic expression

[CDC:ErrKafkaTopicExprInvalid]invalid topic expression
Error: exit status 1


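Create the topic with this name directly in Kafka, without going through CDC, and see what error Kafka reports and what the Kafka logs say.

The test above was run after I had already created that topic in Kafka, and Kafka reported no error.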

[root@slave002 kafka]# bin/kafka-topics.sh --create --topic abcd --zookeeper slave002:2181,slave003:2181,slave004:2181 --partitions 3 --replication-factor 5
Created topic abcd.
[root@slave002 kafka]#

I think I see it now: the dispatch rule looks wrong.
[sink]
dispatchers = [
{matcher = ['tidb_example.*'], topic = "{schema}_{table}", partition = "index-value" }
]
Try writing it like this.

Take a look at this.


The thing is, in my last test I had already changed the topic to the constant abcd instead of using a variable.

It works now. I changed
[screenshot 1660805365(1)]
to:


and then creating the changefeed actually succeeded.
So why is that?

[root@slave005 bin]# tiup ctl:v6.1.0 cdc changefeed create --pd=http://slave007:2379 --sink-uri="kafka://slave002:9092/abcd?kafka-version=2.4.1&replication-factor=5" --changefeed-id="simple-kafka-task" --sort-engine="unified" --config …/config/cdc/tidb_example_kafka.toml
Create changefeed successfully!
ID: simple-kafka-task

https://docs.pingcap.com/zh/tidb/v6.5/ticdc-sink-to-kafka

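See the official documentation.
{schema} is required in the topic expression, and you left it out. The fixed database name you wrote is treated as a prefix, not as the schema.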
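As a rough way to check an expression before creating the changefeed, here is a Python sketch. It assumes the rule in the linked documentation, [prefix]{schema}[middle][{table}][suffix], where prefix, middle and suffix may only contain letters, digits, ".", "_" and "-". The regex is my own reading of that rule and is not copied from the TiCDC source; `is_valid_topic_expr` is a hypothetical helper name.

```python
import re

# Assumption: literal parts may only contain a-z, A-Z, 0-9, '.', '_' and '-'.
_LITERAL = r"[A-Za-z0-9._-]*"

# Assumption: [prefix]{schema}[middle][{table}][suffix], with {schema} required.
TOPIC_EXPR = re.compile(
    "^" + _LITERAL + r"\{schema\}" + "(" + _LITERAL + r"\{table\})?" + _LITERAL + "$"
)


def is_valid_topic_expr(expr: str) -> bool:
    """Return True if expr fits the documented topic expression shape."""
    return TOPIC_EXPR.fullmatch(expr) is not None


# The three values tried in this thread, plus the one that worked.
for expr in ["tidb_example_{table}", "cnsbdnc", "abcd", "{schema}_{table}"]:
    print(expr, is_valid_topic_expr(expr))
```

Under these assumptions only {schema}_{table} passes. The constants cnsbdnc and abcd fail for the same reason as tidb_example_{table}: none of them contains {schema}.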
PS: if this solved your problem, please mark the answer as helpful so that others can find it.


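Liked, and marked as the best answer.
One more thing: a warning is printed after the changefeed is created. Do I need to do anything about it?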
[WARN] This index-value distribution mode does not guarantee row-level orderliness when switching on the old value, so please use caution!

My settings are partition = "index-value" and enable-old-value = true.

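That depends on whether it affects your business. TiCDC has an enable-old-value parameter, and the warning only tells you that it is switched on and that ordering may not be guaranteed in index-value mode. If that does not matter for your workload you can ignore it. Otherwise you can turn off old value or switch to table-level dispatching.
In other words: with old value enabled, the index-value distribution mode does not guarantee row-level ordering, so use it with care.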
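A small Python sketch of why hashing on the index value can break per-row ordering. This is only my understanding of the mechanism, not TiCDC's code: the CRC32 hash, the three partitions and the key values are all made up for illustration. The point is that Kafka only orders messages within one partition, so if an update changes the indexed column, the row's later events may land in a different partition from its earlier ones.

```python
import zlib

NUM_PARTITIONS = 3  # assumption: matches the 3-partition topic in the logs above


def partition_for(index_value: str, num_partitions: int = NUM_PARTITIONS) -> int:
    """Pick a partition from a hash of the row's index value (illustrative hash only)."""
    return zlib.crc32(index_value.encode("utf-8")) % num_partitions


old_key = "1"
# Look for a new key value that hashes to a different partition than the old one.
new_key = next(
    str(k) for k in range(2, 100)
    if partition_for(str(k)) != partition_for(old_key)
)

print(f"events for id={old_key} go to partition {partition_for(old_key)}")
print(f"after the id changes to {new_key}, events go to partition {partition_for(new_key)}")
```

With table-level dispatching every event of a table goes to the same partition, so this particular reordering cannot happen, at the cost of less parallelism.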


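Understood. As for the ordering problem that index-value causes in distributed mode, please see the issue I opened yesterday, where I made a suggestion. Please take a look and see whether TiDB can improve this in a later release.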