Version: v4.0.8
Project background:
Part of our MySQL workload needs to be migrated to a TiDB cluster. The MySQL binlog that was previously consumed by Canal now needs to be written into Kafka by the TiCDC component. We would like to get the same data format that Canal produces, so that it can then be written into the downstream big-data systems.
The relevant part of the topology configuration file is as follows:
global:
  user: tidb
  ssh_port: 22
  ssh_type: builtin
  deploy_dir: /data/tidb-deploy
  data_dir: /data/tidb-data
  os: linux
  arch: amd64
server_configs:
  tidb:
    binlog.enable: true
    binlog.ignore-error: true
    log.slow-threshold: 700
  tikv:
    readpool.coprocessor.use-unified-pool: true
    readpool.storage.use-unified-pool: false
  pd:
    replication.enable-placement-rules: true
    replication.location-labels:
    - host
  tiflash:
    logger.level: info
  tiflash-learner: {}
  pump: {}
  drainer: {}
  cdc: {}
cdc_servers:
- host: 172.30.135.19
  ssh_port: 22
  port: 8300
  deploy_dir: /data/tidb-deploy/cdc-8300
  log_dir: /data/tidb-deploy/cdc-8300/log
  arch: amd64
  os: linux
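For reference, a minimal sketch of how a topology file like this is applied with TiUP; the cluster name tidb-test and the file name topology.yaml are assumptions, not the actual names used in this environment:
tiup cluster deploy tidb-test v4.0.8 topology.yaml --user tidb
tiup cluster start tidb-test
tiup cluster display tidb-test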
+++++++++++++++++++++++++++++++++++++++++++++++++++++++
Configuration related to the kafka-task changefeed:
tiup ctl cdc changefeed create --pd=http://172.30.135.140:2379 --sink-uri="kafka://10.33.131.216:9092/dba_test?partition-num=1&max-message-bytes=67108864&replication-factor=1" --changefeed-id="kafka-task" --config=feedkafka.toml
[root@db-mysql-135-140 .tiup]# more feedkafka.toml |grep -v "#" |grep -v ^$
case-sensitive = true
enable-old-value = true
[filter]
ignore-txn-start-ts = [1, 2]
rules = ['ysc_test.*', '!test.*']
[mounter]
worker-num = 16
[sink]
protocol = "canal"
[cyclic-replication]
enable = false
replica-id = 1
filter-replica-ids = [2,3]
sync-ddl = true
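For reference, a changefeed only reads its TOML config at creation time, so recreating it is one way to apply config changes. A sketch of that workflow with the same PD endpoint and changefeed ID as above (these pause/remove/create steps were not actually run in this session):
tiup ctl cdc changefeed pause --pd=http://172.30.135.140:2379 --changefeed-id=kafka-task
tiup ctl cdc changefeed remove --pd=http://172.30.135.140:2379 --changefeed-id=kafka-task
tiup ctl cdc changefeed create --pd=http://172.30.135.140:2379 --sink-uri="kafka://10.33.131.216:9092/dba_test?partition-num=1&max-message-bytes=67108864&replication-factor=1" --changefeed-id="kafka-task" --config=feedkafka.toml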
++++++++++++++++++++++++++++
The CDC changefeed status is:
[root@db-mysql-135-140 .tiup]# tiup ctl cdc changefeed list --pd=http://172.30.135.140:2379
Starting component ctl: /root/.tiup/components/ctl/v4.0.8/ctl cdc changefeed list --pd=http://172.30.135.140:2379
[
  {
    "id": "kafka-task",
    "summary": {
      "state": "normal",
      "tso": 421554747986935810,
      "checkpoint": "2020-12-16 15:29:04.457",
      "error": null
    }
  },
  {
    "id": "mysql-task",
    "summary": {
      "state": "normal",
      "tso": 421554747986935810,
      "checkpoint": "2020-12-16 15:29:04.457",
      "error": null
    }
  }
]
++++++++
[root@db-mysql-135-140 .tiup]# tiup ctl cdc changefeed query --pd=http://172.30.135.140:2379 --changefeed-id=kafka-task
Starting component ctl: /root/.tiup/components/ctl/v4.0.8/ctl cdc changefeed query --pd=http://172.30.135.140:2379 --changefeed-id=kafka-task
{
  "info": {
    "sink-uri": "kafka://10.33.131.216:9092/dba_test1?partition-num=1\u0026max-message-bytes=67108864\u0026replication-factor=1",
    "opts": {},
    "create-time": "2020-12-14T21:34:26.893820819+08:00",
    "start-ts": 421515196330934274,
    "target-ts": 0,
    "admin-job-type": 2,
    "sort-engine": "memory",
    "sort-dir": ".",
    "config": {
      "case-sensitive": true,
      "enable-old-value": true,
      "force-replicate": false,
      "filter": {
        "rules": [
          "ysc_test.*",
          "!test.*"
        ],
        "ignore-txn-start-ts": [
          1,
          2
        ],
        "ddl-allow-list": null
      },
      "mounter": {
        "worker-num": 16
      },
      "sink": {
        "dispatchers": null,
        "protocol": "canal"
      },
      "cyclic-replication": {
        "enable": false,
        "replica-id": 1,
        "filter-replica-ids": [
          2,
          3
        ],
        "id-buckets": 0,
        "sync-ddl": true
      },
      "scheduler": {
        "type": "table-number",
        "polling-time": -1
      }
    },
    "state": "normal",
    "history": [
      1608023250906
    ],
    "error": null,
    "sync-point-enabled": false,
    "sync-point-interval": 600000000000
  },
  "status": {
    "resolved-ts": 421554755851255811,
    "checkpoint-ts": 421554755851255809,
    "admin-job-type": 0
  },
  "count": 0,
  "task-status": [
    {
      "capture-id": "a003bf4a-146c-4ddb-8237-ebd6835d5094",
      "status": {
        "tables": {
          "47": {
            "start-ts": 421554374575915012,
            "mark-table-id": 0
          },
          "51": {
            "start-ts": 421554374575915012,
            "mark-table-id": 0
          },
          "55": {
            "start-ts": 421554374575915012,
            "mark-table-id": 0
          }
        },
        "operation": null,
        "admin-job-type": 0
      }
    }
  ]
}
++++++
CDC output as consumed from the Kafka topic:
./kafka-console-consumer.sh --bootstrap-server 10.33.131.216:9092 --topic dba_test1
The data consumed from Kafka is garbled and is not in the format we need, unlike what is shown in the post below:
https://asktug.com/t/topic/63715
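For reference, one way to confirm whether the payload is binary data rather than readable text is to hex-dump a few messages from the same topic; the --from-beginning and --max-messages flags and the hexdump pipe are illustrative additions and were not part of the original session:
./kafka-console-consumer.sh --bootstrap-server 10.33.131.216:9092 --topic dba_test1 --from-beginning --max-messages 5 | hexdump -C | head -n 40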