1、创建kafka topic:
/data/kafka/kafka_2.12-2.4.1/bin/kafka-topics.sh --create --zookeeper XXX:2181 --replication-factor 3 --partitions 3 --config min.insync.replicas=2 --topic test3
2、查看topic属性:
/data/kafka/kafka_2.12-2.4.1/bin/kafka-topics.sh --describe --bootstrap-server XXX:9092 --topic test3
Topic: test3 PartitionCount: 3 ReplicationFactor: 3 Configs: min.insync.replicas=2,segment.bytes=1073741824
Topic: test3 Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
Topic: test3 Partition: 1 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
Topic: test3 Partition: 2 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
3、创建cdc任务:
/usr/bin/cdc cli changefeed create --pd=XXX --start-ts=436935319064936449 --sink-uri=“kafka://XXX:9092/test3?message.max.bytes=2147483648?partition-num=3?replication-factor=3” --changefeed-id=“testcdc0” --config=/home/tidb/testcdc_yaml/testcdc0_testcdc_t0.yaml
4、配置文件:
case-sensitive = true
enable-old-value = true
[filter]
rules = [
“testcdc0.testcdc_t0”
]
[mounter]
worker-num = 8
[sink]
dispatchers = [
{matcher = [
“testcdc0.testcdc_t0”
], dispatcher = “rowid”},
]
protocol = “canal-json”
[cyclic-replication]
enable = false
replica-id = 1
5、创建任务返回:
Create changefeed successfully!
ID: testcdc0
Info: {“sink-uri”:“kafka://XXX:9092/test3?message.max.bytes=2147483648?partition-num=3?replication-factor=3”,“opts”:{},“create-time”:“2022-10-26T17:19:45.759358278+08:00”,“start-ts”:436935319064936449,“target-ts”:0,“admin-job-type”:0,“sort-engine”:“unified”,“sort-dir”:“”,“config”:{“case-sensitive”:true,“enable-old-value”:true,“force-replicate”:false,“check-gc-safe-point”:true,“filter”:{“rules”:[“testcdc0.testcdc_t0”],“ignore-txn-start-ts”:null},“mounter”:{“worker-num”:8},“sink”:{“dispatchers”:[{“matcher”:[“testcdc0.testcdc_t0”],“dispatcher”:“rowid”}],“protocol”:“canal-json”},“cyclic-replication”:{“enable”:false,“replica-id”:1,“filter-replica-ids”:null,“id-buckets”:0,“sync-ddl”:false},“scheduler”:{“type”:“table-number”,“polling-time”:-1}},“state”:“normal”,“history”:null,“error”:null,“sync-point-enabled”:false,“sync-point-interval”:600000000000,“creator-version”:“v4.0.16”}
6、任务错误提示:
cdc cli changefeed query -s --pd=http://XXX:2379 --changefeed-id=testcdc0
{
“state”: “error”,
“tso”: 436935319064936449,
“checkpoint”: “2022-10-26 17:19:26.892”,
“error”: {
“addr”: “172.16.72.22:8300”,
“code”: “CDC:ErrKafkaNewSaramaProducer”,
“message”: “[CDC:ErrKafkaNewSaramaProducer]new sarama producer: [CDC:ErrKafkaInvalidConfig]because TiCDC Kafka producer’s request.required.acks
defaults to -1, TiCDC cannot deliver messages when the replication-factor
is less than min.insync.replicas
: replication-factor
cannot be smaller than the min.insync.replicas
of topic”
}
}
7、日志报错:
[2022/10/26 17:20:16.193 +08:00] [ERROR] [kafka.go:571] [“replication-factor
cannot be smaller than the min.insync.replicas
of topic”] [replicationFactor=1] [minInsyncReplicas=2]
[2022/10/26 17:20:16.581 +08:00] [ERROR] [changefeed.go:119] [“an error occurred in Owner”] [changefeed=testcdc0] [error=“[CDC:ErrKafkaNewSaramaProducer]new sarama producer: [CDC:ErrKafkaInvalidConfig]because TiCDC Kafka producer’s request.required.acks
defaults to -1, TiCDC cannot deliver messages when the replication-factor
is less than min.insync.replicas
: replication-factor
cannot be smaller than the min.insync.replicas
of topic”]
我发现日志报错里有这个提示: [replicationFactor=1] [minInsyncReplicas=2],但明明topic的replicationFactor设置为了3。
应该出错的原因就在这里,不知道为何创建的replicationFactor为3,但cdc却认为replicationFactor=1。
这也就解释通了当min.insync.replicas为2,replication-factor设为2的时候,cdc任务也会报错,而当min.insync.replicas为1,replication-factor设为1的时候,cdc任务却正常了。
估计是因为cdc始终认为replicationFactor的值为1。