ticdc通过scram认证连接kafka集群报错

【 TiDB 使用环境】测试环境 kubernetes环境 opertator为1.5.4版本
【 TiDB 版本】企业版7.1.0

【遇到的问题:问题现象及影响】
创建ticdc同步任务到kafka集群,scram安全认证报错,报错任务如下:
Error: [CDC:ErrKafkaNewProducer]new kafka producer: [CDC:ErrReachMaxTry]reach maximum try: 3, error: Cluster authorization failed.: Cluster authorization failed.

创建同步任务命令如下:
sh-5.1# ./cdc cli changefeed create --pd http://basic-pd:2379 --sink-uri=“kafka://xxx:9092,xxx:9092,xxx:9092/ticdc-test?kafka-version=2.8.1&partition-num=3&max-message-bytes=102400&replication-factor=3&protocol=canal-json&sasl-user=ticdc_test&sasl-password=xxx&sasl-mechanism=scram-sha-256” --changefeed-id=“test”

经测试程序验证,可以确认ticdc_test这个账号用于读写、describe cluster、describe resource权限。

测试程序:

func scram() {
// 创建 Sarama 配置
config := sarama.NewConfig()
config.Net.SASL.Enable = true
config.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA256
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
return &XDGscramClient{
HashGeneratorFcn: SHA256,
}
}

config.Net.SASL.User = "ticdc_test"
config.Net.SASL.Password = "xxx"
config.Version = sarama.V2_8_1_0

// 创建管理员客户端
brokers := strings.Split(host, ",")
admin, err := sarama.NewClusterAdmin(brokers, config)
if err != nil {
	fmt.Println("Error creating admin client:", err)
	os.Exit(0)
}
fmt.Println("create admin client success")
defer func() {
	if err = admin.Close(); err != nil {
		fmt.Println(fmt.Sprintf("Error closing admin client: %v", err))
		os.Exit(0)
	}
}()

brokerList, controllerId, err := admin.DescribeCluster()
if err != nil {
	panic(fmt.Sprintf("describe cluster with err: %+v", err))
}

fmt.Println("controllerId =========", controllerId)
for _, b := range brokerList {
	fmt.Println(fmt.Sprintf("<<<<<<<<< %+v", b))
}

resource := sarama.ConfigResource{
	Type: sarama.TopicResource,
	Name: topic,
}
configEntries, err := admin.DescribeConfig(resource)
if err != nil {
	fmt.Println(fmt.Sprintf("Error describing config: %v", err))
	os.Exit(0)
}
fmt.Println("describe config success")

for index, en := range configEntries {

	fmt.Println(fmt.Sprintf("%d >>>>> %+v", index, en))

}

}

密码里面有没有什么特殊符号没有转义啊。

没有特殊字符,都是大小写字母和数字

kafka 日志里面信息看看呢。

kafka用的云产品,看日志不是怎么太方便。我感觉应该跟kafka没啥关系,目前我看了下ticdc的源码,新版本的ticdc不存在这个问题,

我测试了两个版本一个是最新的master分支,一个是最旧的0220分支,目前都没有这个问题,那么问题来了

1、企业版7.1.0版本的ticdc源码在哪看用的哪个分支发的包

主要是看看日志内容里面应该有验证失败原因。

看起来你现在发现 cdc 版本高一点就好了?那可能是版本功能支持的一些问题?:thinking:
看源码你点击 tags,里面有对应版本。

tag里的版本号对不上

低版本看这个,换地方了。

ok,谢谢

不是 这么神奇的么?

你要不试试集群升级下,你 7.1.0 第三位版本有点低了。

我真对多个版本进行了ticdc测试,测试结果如下

plain模式测试命令:
pd=“http://basic-pd:2379

./cdc cli changefeed create --pd $pd --sink-uri=“kafka://xxx:9092,xxx:9092,xxx:9092/ticdc-test?kafka-version=2.8.2&partition-num=3&max-message-bytes=102400&replication-factor=3&protocol=canal-json&sasl-user=admin&sasl-password=xxx&sasl-mechanism=plain” --changefeed-id=“test”

scram-sha-256模式测试命令:

pd=“http://basic-pd:2379

./cdc cli changefeed create --pd $pd --sink-uri=“kafka://xxx:9092,xxx:9092,xxx:9092/ticdc-test?kafka-version=2.8.1&partition-num=3&max-message-bytes=102400&replication-factor=3&protocol=canal-json&sasl-user=ticdc_test&sasl-password=xxx&sasl-mechanism=SCRAM-SHA-256” --changefeed-id=“test”

7.5 支持么?官网怎么写的这部分。

是不是官网写的有问题,支持范围没写啊。