TiCDC replication to Kafka interrupted with an error

【TiDB Environment】Production
【TiDB Version】v4.0.14
【Reproduction Steps】Command used to create the TiCDC changefeed to Kafka:
tiup ctl:v4.0.14 cdc changefeed create --pd=http://10.20.10.10:2379 --sink-uri="kafka://kafka1.com:9092,kafka2.com:9092,kafka3.com:9092/topic1?kafka-version=2.7.0&sasl-user=user&sasl-password=password&sasl-mechanism=SCRAM-SHA-256&partition-num=5&max-message-bytes=1048576&replication-factor=1&protocol=canal-json" --changefeed-id="changefeed-djg" --config=/home/tidb/changefeed-djg.toml
【Problem: Symptoms and Impact】
Output of `tiup ctl:v4.0.14 cdc changefeed list --pd=http://10.20.10.10:2379`:
{
  "id": "changefeed-djg",
  "summary": {
    "state": "stopped",
    "tso": 442620606907154444,
    "checkpoint": "2023-07-04 17:40:18.878",
    "error": {
      "addr": "10.20.24.243:8300",
      "code": "CDC:ErrProcessorUnknown",
      "message": "[CDC:ErrOperateOnClosedNotifier]operate on a closed notifier"
    }
  }
}
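As a side note, the `tso` and `checkpoint` fields above encode the same instant: a TiDB TSO packs a physical Unix timestamp in milliseconds into the high bits, with an 18-bit logical counter in the low bits. A quick decoding sketch (the +08:00 offset matches the log timezone):

```python
from datetime import datetime, timedelta, timezone

def tso_to_datetime(tso: int, utc_offset_hours: int = 8) -> datetime:
    # High bits: physical Unix time in milliseconds; low 18 bits: logical counter.
    physical_ms = tso >> 18
    epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
    return (epoch + timedelta(milliseconds=physical_ms)).astimezone(
        timezone(timedelta(hours=utc_offset_hours))
    )

print(tso_to_datetime(442620606907154444))
# 2023-07-04 17:40:18.878000+08:00 -- matches the checkpoint above
```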
【Resource Configuration】Error log from cdc.log:
[2023/07/04 17:40:19.537 +08:00] [ERROR] [processor.go:1382] ["error on running processor"] [capture=10.20.26.249:8300] [changefeed=changefeed-djg] [processor=d4b0285b-c533-4bbb-9dab-beb551144f6a] [error="[CDC:ErrOperateOnClosedNotifier]operate on a closed notifier"] [errorVerbose="[CDC:ErrOperateOnClosedNotifier]operate on a closed notifier\ngithub.com/pingcap/errors.AddStack\n\tgithub.com/pingcap/errors@v0.11.5-0.20201126102027-b0a155152ca3/errors.go:174\ngithub.com/pingcap/errors.(*Error).GenWithStackByArgs\n\tgithub.com/pingcap/errors@v0.11.5-0.20201126102027-b0a155152ca3/normalize.go:156\ngithub.com/pingcap/ticdc/pkg/notify.(*Notifier).NewReceiver\n\tgithub.com/pingcap/ticdc@/pkg/notify/notify.go:87\ngithub.com/pingcap/ticdc/cdc.(*oldProcessor).sorterConsume\n\tgithub.com/pingcap/ticdc@/cdc/processor.go:1111\ngithub.com/pingcap/ticdc/cdc.(*oldProcessor).addTable.func2.5\n\tgithub.com/pingcap/ticdc@/cdc/processor.go:868\nruntime.goexit\n\truntime/asm_amd64.s:1357"]
[2023/07/04 17:40:19.537 +08:00] [ERROR] [processor.go:1420] ["processor receives redundant error"] [error="[CDC:ErrPDEtcdAPIError]context canceled"] [errorVerbose="[CDC:ErrPDEtcdAPIError]context canceled\ngithub.com/pingcap/errors.AddStack\n\tgithub.com/pingcap/errors@v0.11.5-0.20201126102027-b0a155152ca3/errors.go:174\ngithub.com/pingcap/errors.(*Error).GenWithStackByCause\n\tgithub.com/pingcap/errors@v0.11.5-0.20201126102027-b0a155152ca3/normalize.go:279\ngithub.com/pingcap/ticdc/pkg/errors.WrapError\n\tgithub.com/pingcap/ticdc@/pkg/errors/helper.go:28\ngithub.com/pingcap/ticdc/cdc/kv.CDCEtcdClient.PutTaskWorkload\n\tgithub.com/pingcap/ticdc@/cdc/kv/etcd.go:563\ngithub.com/pingcap/ticdc/cdc.(*oldProcessor).workloadWorker\n\tgithub.com/pingcap/ticdc@/cdc/processor.go:464\ngithub.com/pingcap/ticdc/cdc.(*oldProcessor).Run.func6\n\tgithub.com/pingcap/ticdc@/cdc/processor.go:279\ngolang.org/x/sync/errgroup.(*Group).Go.func1\n\tgolang.org/x/sync@v0.0.0-20201020160332-67f06af15bc9/errgroup/errgroup.go:57\nruntime.goexit\n\truntime/asm_amd64.s:1357"]

What is causing this, and how can it be resolved?

See this issue; it is probably a known bug. Upgrading TiCDC is recommended.
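With a TiUP-managed cluster, the CDC component is upgraded together with the rest of the cluster. A minimal sketch, assuming the target is v6.1.3; the cluster name is a placeholder, and upgrading across major versions (v4 to v6) should first be checked against the official upgrade path:

```shell
# List deployed clusters to find the cluster name (placeholder below).
tiup cluster list
tiup cluster upgrade <cluster-name> v6.1.3
```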

tiup ctl:v6.1.3 cdc changefeed create --pd=http://10.20.10.10:2379 --sink-uri="kafka://kafka1.com:9092,kafka2.com:9092,kafka3.com:9092/topic1?kafka-version=2.7.0&sasl-user=user&sasl-password=password&sasl-mechanism=SCRAM-SHA-256&partition-num=5&max-message-bytes=1048576&replication-factor=1&protocol=canal-json" --changefeed-id="changefeed-djg" --config=/home/tidb/changefeed-djg.toml
After switching to v6.1.3, the error is as follows:
[2023/07/06 16:02:55.652 +08:00] [WARN] [sink.go:167] ["protocol is specified in both sink URI and config file, the value in sink URI will be used; protocol in sink URI: canal-json, protocol in config file: canal-json"]
Error: [CDC:ErrKafkaNewSaramaProducer]new sarama producer: Cluster authorization failed.
Usage:
cdc cli changefeed create [flags]

Flags:
-c, --changefeed-id string Replication task (changefeed) ID
--config string Path of the configuration file
--cyclic-filter-replica-ids uints (Experimental) Cyclic replication filter replica ID of changefeed (default [])
--cyclic-replica-id uint (Experimental) Cyclic replication replica ID of changefeed
--cyclic-sync-ddl (Experimental) Cyclic replication sync DDL of changefeed (default true)
--disable-gc-check Disable GC safe point check
-h, --help help for create
--no-confirm Don't ask user whether to ignore ineligible table
--opts key=value Extra options, in the key=value format
--schema-registry string Avro Schema Registry URI
--sink-uri string sink uri
--sort-engine string sort engine used for data sort (default "unified")
--start-ts uint Start ts of changefeed
--sync-interval duration (Experimental) Set the interval for syncpoint in replication(default 10min) (default 10m0s)
--sync-point (Experimental) Set and Record syncpoint in replication(default off)
--target-ts uint Target ts of changefeed
--tz string timezone used when checking sink uri (changefeed timezone is determined by cdc server) (default "SYSTEM")

Global Flags:
--ca string CA certificate path for TLS connection
--cert string Certificate path for TLS connection
-i, --interact Run cdc cli with readline
--key string Private key path for TLS connection
--log-level string log level (etc: debug|info|warn|error) (default "warn")
--pd string PD address, use ',' to separate multiple PDs (default "http://127.0.0.1:2379")

[CDC:ErrKafkaNewSaramaProducer]new sarama producer: Cluster authorization failed.
Error: exit status 1

This is an authentication problem; check the SASL credential configuration.
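Kafka returns "Cluster authorization failed" when the authenticated SASL user lacks cluster-level ACLs (for example Create, needed for automatic topic creation, or IdempotentWrite). A possible fix is to grant those ACLs to the SCRAM user; in this sketch the principal `User:user`, the admin client config file, and the bootstrap address are assumptions taken from this thread's command:

```shell
# Grant topic-level produce rights on topic1 (principal and file names are placeholders).
kafka-acls.sh --bootstrap-server kafka1.com:9092 \
  --command-config admin.properties \
  --add --allow-principal User:user \
  --operation Write --operation Describe --topic topic1

# Grant the cluster-level operations that can trigger CLUSTER_AUTHORIZATION_FAILED.
kafka-acls.sh --bootstrap-server kafka1.com:9092 \
  --command-config admin.properties \
  --add --allow-principal User:user \
  --operation Create --operation IdempotentWrite --cluster
```

Alternatively, create `topic1` up front with the required partition count so TiCDC does not need topic-creation rights.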

How can the following Kafka client settings be added to the TiCDC command line?
properties.put("security.protocol", "SASL_PLAINTEXT");

properties.put("sasl.mechanism", "SCRAM-SHA-256");

properties.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username='user' password='password';");
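The sink URI in the original command already carries these client properties: `sasl-user` and `sasl-password` map to the JAAS username/password, `sasl-mechanism` maps to `sasl.mechanism`, and TiCDC speaks SASL over plaintext (the equivalent of `security.protocol=SASL_PLAINTEXT`) when SASL parameters are given without TLS options. A sketch, with brokers, topic, and credentials as placeholders:

```shell
# sasl-user / sasl-password / sasl-mechanism in the sink URI replace the
# Java client's JAAS and sasl.mechanism settings. If the password contains
# special characters, URL-encode them in the URI.
tiup ctl:v6.1.3 cdc changefeed create \
  --pd=http://10.20.10.10:2379 \
  --sink-uri="kafka://kafka1.com:9092,kafka2.com:9092,kafka3.com:9092/topic1?protocol=canal-json&sasl-mechanism=SCRAM-SHA-256&sasl-user=user&sasl-password=password" \
  --changefeed-id=changefeed-djg \
  --config=/home/tidb/changefeed-djg.toml
```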

This topic was automatically closed 60 days after the last reply. New replies are no longer allowed.