【 TiDB 使用环境】测试环境,Kubernetes 环境,operator 为 1.5.4 版本
【 TiDB 版本】企业版7.1.0
【遇到的问题:问题现象及影响】
创建 TiCDC 同步任务到 Kafka 集群时,SCRAM 安全认证报错,报错信息如下:
Error: [CDC:ErrKafkaNewProducer]new kafka producer: [CDC:ErrReachMaxTry]reach maximum try: 3, error: Cluster authorization failed.: Cluster authorization failed.
创建同步任务命令如下:
sh-5.1# ./cdc cli changefeed create --pd http://basic-pd:2379 --sink-uri="kafka://xxx:9092,xxx:9092,xxx:9092/ticdc-test?kafka-version=2.8.1&partition-num=3&max-message-bytes=102400&replication-factor=3&protocol=canal-json&sasl-user=ticdc_test&sasl-password=xxx&sasl-mechanism=scram-sha-256" --changefeed-id="test"
经测试程序验证,可以确认 ticdc_test 这个账号拥有读写、describe cluster、describe resource 权限。
测试程序:
// scram is a diagnostic routine. It opens a Sarama cluster-admin connection
// authenticated with SASL/SCRAM-SHA-256 as user "ticdc_test", then prints the
// result of DescribeCluster followed by the result of DescribeConfig for the
// topic resource.
//
// It reads host (split on "," into the broker list) and topic, and relies on
// XDGscramClient and SHA256; all four are defined outside this function.
//
// Failure handling:
//   - admin client creation, DescribeConfig, or Close failing prints the error
//     and exits the process with status 1;
//   - DescribeCluster failing panics (the deferred close still runs).
func scram() {
	// Build the Sarama configuration with SASL/SCRAM-SHA-256 enabled.
	config := sarama.NewConfig()
	config.Net.SASL.Enable = true
	config.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA256
	config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
		return &XDGscramClient{
			HashGeneratorFcn: SHA256,
		}
	}
	config.Net.SASL.User = "ticdc_test"
	config.Net.SASL.Password = "xxx"
	config.Version = sarama.V2_8_1_0

	// Create the admin client.
	brokers := strings.Split(host, ",")
	admin, err := sarama.NewClusterAdmin(brokers, config)
	if err != nil {
		fmt.Println("Error creating admin client:", err)
		// Non-zero status so callers can tell the check failed.
		os.Exit(1)
	}
	fmt.Println("create admin client success")

	// closeAdmin closes the admin client and exits with status 1 if that fails.
	closeAdmin := func() {
		if cerr := admin.Close(); cerr != nil {
			fmt.Printf("Error closing admin client: %v\n", cerr)
			os.Exit(1)
		}
	}
	// Covers the normal return and the panic path below.
	defer closeAdmin()

	brokerList, controllerID, err := admin.DescribeCluster()
	if err != nil {
		panic(fmt.Sprintf("describe cluster with err: %+v", err))
	}
	fmt.Println("controllerId =========", controllerID)
	for _, b := range brokerList {
		fmt.Printf("<<<<<<<<< %+v\n", b)
	}

	resource := sarama.ConfigResource{
		Type: sarama.TopicResource,
		Name: topic,
	}
	configEntries, err := admin.DescribeConfig(resource)
	if err != nil {
		fmt.Printf("Error describing config: %v\n", err)
		// os.Exit does not run deferred functions, so close explicitly first.
		closeAdmin()
		os.Exit(1)
	}
	fmt.Println("describe config success")
	for index, en := range configEntries {
		fmt.Printf("%d >>>>> %+v\n", index, en)
	}
}