TiCDC reads no data content, only basic metadata

[TiDB Environment]
TiDB 6.0.0, Kafka 3.2.0

[Overview] Scenario and problem summary
TiCDC sends change events to the downstream Kafka, but there is no data content; only metadata is received, for example:
{"timestamp":1656502486040,"name":"records_consumed","count":2,"partitions":[{"topic":"tidb-to-kafka-test","partition":0,"count":2,"minOffset":10,"maxOffset":11}]}

[Background] Operations performed
Added a config file that contains only the sync-ddl=true property; tried it, but it still doesn't work.
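
For reference, a minimal sketch of how a changefeed configuration file is usually attached (the file name changefeed.toml is an assumption, and the sync-ddl key is copied verbatim from the description above, so it may not be a recognized TiCDC option):

# write the config file mentioned above (key copied as posted; it may not be a valid option)
cat > changefeed.toml <<'EOF'
sync-ddl = true
EOF

# pass it when creating the changefeed via --config
tiup cdc cli changefeed create --pd=http://192.168.60.4:2379 \
  --sink-uri="kafka://192.168.60.207:9092/tidb-to-kafka-test5?protocol=canal-json&kafka-version=3.2.0&partition-num=1&max-message-bytes=67108864&replication-factor=1&enable-tidb-extension=true" \
  --changefeed-id="tidb-to-kafka-test1" --config=changefeed.toml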

[Symptoms] Business and database symptoms
The table has a primary key and contains data.

[TiDB Version]
6.0.0

Could you please paste the command you used to create the changefeed?

tiup cdc cli changefeed create --pd=http://192.168.60.4:2379 --sink-uri="kafka://192.168.60.207:9092/tidb-to-kafka-test5?protocol=canal-json&kafka-version=3.2.0&partition-num=1&max-message-bytes=67108864&replication-factor=1&enable-tidb-extension=true" --changefeed-id="tidb-to-kafka-test1" --sort-engine="unified"
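
For reference, the changefeed's state and checkpoint progress can be checked with something along these lines (a sketch reusing the same PD address; checkpoint-ts should keep advancing if replication is healthy):

# list all changefeeds and their state
tiup cdc cli changefeed list --pd=http://192.168.60.4:2379

# show details for this changefeed, including checkpoint-ts
tiup cdc cli changefeed query --pd=http://192.168.60.4:2379 --changefeed-id=tidb-to-kafka-test1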

Can you produce and consume messages normally with Kafka's own ./kafka-console-producer.sh and ./kafka-console-consumer.sh?
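
For example, a quick round-trip test against a scratch topic could look like this (a sketch; the topic name kafka-sanity-test is hypothetical, and --bootstrap-server is the flag used by the Kafka 3.x tools):

# type a few lines; each line becomes one message
bin/kafka-console-producer.sh --bootstrap-server 192.168.60.207:9092 --topic kafka-sanity-test

# read them back, including the message bodies
bin/kafka-console-consumer.sh --bootstrap-server 192.168.60.207:9092 --topic kafka-sanity-test --from-beginning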

You could create a simple table on the upstream TiDB, insert a few rows into it, and then paste the TiCDC log.
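
For example, something like this (a sketch; the TiDB host, port, and mysql client invocation are assumptions, and the table matches the one that shows up in the log below):

mysql -h 192.168.60.4 -P 4000 -u root -e "
CREATE DATABASE IF NOT EXISTS testcdc;
CREATE TABLE IF NOT EXISTS testcdc.test333 (id INT NOT NULL, name VARCHAR(255) NULL, PRIMARY KEY (id));
INSERT INTO testcdc.test333 VALUES (1, 'aa'), (2, 'bb');"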

CDC log below; there are no error entries.

[2022/06/30 11:11:21.046 +08:00] [INFO] [ddl_puller.go:148] ["receive new ddl job"] [changefeed=tidb-to-kafka-test11_ddl_puller] [job="ID:97, Type:create table, State:done, SchemaState:public, SchemaID:90, TableID:96, RowCount:0, ArgLen:0, start time: 2022-06-30 11:11:19.131 +0800 CST, Err:, ErrCount:0, SnapshotVersion:0"]
[2022/06/30 11:11:21.046 +08:00] [INFO] [schema_storage.go:847] ["handle DDL"] [DDL="CREATE TABLE testcdc.test333 (\r\ id int NOT NULL,\r\ name varchar(255) NULL,\r\ PRIMARY KEY (id)\r\ )"] [job="ID:97, Type:create table, State:done, SchemaState:public, SchemaID:90, TableID:96, RowCount:0, ArgLen:0, start time: 2022-06-30 11:11:19.131 +0800 CST, Err:, ErrCount:0, SnapshotVersion:0"] [changefeed=tidb-to-kafka-test11] [finishTs=434256918394961922]
[2022/06/30 11:11:21.046 +08:00] [WARN] [ddl_puller.go:144] ["ignore duplicated DDL job"] [changefeed=tidb-to-kafka-test11_ddl_puller] [job="ID:97, Type:create table, State:done, SchemaState:public, SchemaID:90, TableID:96, RowCount:0, ArgLen:0, start time: 2022-06-30 11:11:19.131 +0800 CST, Err:, ErrCount:0, SnapshotVersion:0"]
[2022/06/30 11:11:21.046 +08:00] [INFO] [ddl_puller.go:135] ["ddl job is nil after unmarshal"] [changefeed=tidb-to-kafka-test11_ddl_puller]
[2022/06/30 11:11:21.046 +08:00] [INFO] [ddl_puller.go:135] ["ddl job is nil after unmarshal"] [changefeed=tidb-to-kafka-test11_ddl_puller]
[2022/06/30 11:11:21.046 +08:00] [INFO] [schema_storage.go:832] ["ignore foregone DDL"] [jobID=97] [DDL="CREATE TABLE testcdc.test333 (\r\ id int NOT NULL,\r\ name varchar(255) NULL,\r\ PRIMARY KEY (id)\r\ )"] [changefeed=tidb-to-kafka-test11] [finishTs=434256918394961922]
[2022/06/30 11:11:22.901 +08:00] [INFO] [schema.go:124] ["handle DDL"] [changefeed=tidb-to-kafka-test11] [DDL="CREATE TABLE testcdc.test333 (\r\ id int NOT NULL,\r\ name varchar(255) NULL,\r\ PRIMARY KEY (id)\r\ )"] [job="ID:97, Type:create table, State:done, SchemaState:public, SchemaID:90, TableID:96, RowCount:0, ArgLen:0, start time: 2022-06-30 11:11:19.131 +0800 CST, Err:, ErrCount:0, SnapshotVersion:0"] [role=owner]
[2022/06/30 11:11:22.902 +08:00] [INFO] [ddl_sink.go:235] ["ddl is sent"] [changefeed=tidb-to-kafka-test11] [ddlSentTs=434256918394961922]
[2022/06/30 11:11:22.902 +08:00] [INFO] [scheduler.go:144] ["schedulerV2: DispatchTable"] [message="{"owner-rev":811337,"epoch":"031790eb-949a-4502-a602-18f64801c4b8","id":96,"is-delete":false}"] [successful=true] [changefeedID=tidb-to-kafka-test11] [captureID=b290c64b-489c-4bfa-8a4f-677fb879e33d]
[2022/06/30 11:11:22.902 +08:00] [INFO] [ddl_sink.go:179] ["begin emit ddl event"] [changefeed=tidb-to-kafka-test11] [DDL="{"StartTs":434256918382116873,"CommitTs":434256918394961922,"TableInfo":{"Schema":"testcdc","Table":"test333","TableID":96,"ColumnInfo":[{"Name":"id","Type":3},{"Name":"name","Type":15}]},"PreTableInfo":null,"Query":"CREATE TABLE testcdc.test333 (id INT NOT NULL,name VARCHAR(255) NULL,PRIMARY KEY(id))","Type":3}"]
[2022/06/30 11:11:22.902 +08:00] [INFO] [agent.go:389] [OnOwnerDispatchedTask] [changefeed=tidb-to-kafka-test11] [ownerCaptureID=b290c64b-489c-4bfa-8a4f-677fb879e33d] [ownerRev=811337] [op="{"TableID":96,"IsDelete":false,"Epoch":"031790eb-949a-4502-a602-18f64801c4b8","FromOwnerID":"b290c64b-489c-4bfa-8a4f-677fb879e33d"}"]
[2022/06/30 11:11:22.907 +08:00] [INFO] [ddl_sink.go:187] ["Execute DDL succeeded"] [changefeed=tidb-to-kafka-test11] [ignored=false] [ddl="{"StartTs":434256918382116873,"CommitTs":434256918394961922,"TableInfo":{"Schema":"testcdc","Table":"test333","TableID":96,"ColumnInfo":[{"Name":"id","Type":3},{"Name":"name","Type":15}]},"PreTableInfo":null,"Query":"CREATE TABLE testcdc.test333 (id INT NOT NULL,name VARCHAR(255) NULL,PRIMARY KEY(id))","Type":3}"]
[2022/06/30 11:11:22.997 +08:00] [INFO] [agent.go:295] ["Agent start processing operation"] [changefeed=tidb-to-kafka-test11] [op="{"TableID":96,"IsDelete":false,"Epoch":"031790eb-949a-4502-a602-18f64801c4b8","FromOwnerID":"b290c64b-489c-4bfa-8a4f-677fb879e33d"}"]
[2022/06/30 11:11:22.997 +08:00] [INFO] [processor.go:109] ["adding table"] [tableID=96] [changefeed=tidb-to-kafka-test11]
[2022/06/30 11:11:22.998 +08:00] [INFO] [processor.go:1015] ["Add table pipeline"] [tableID=96] [changefeed=tidb-to-kafka-test11] [name=testcdc.test333] [replicaInfo="{"start-ts":434256918394961922,"mark-table-id":0}"] [globalResolvedTs=434256918394961922]
[2022/06/30 11:11:22.998 +08:00] [INFO] [client.go:512] ["event feed started"] [span="[7480000000000000ff605f720000000000fa, 7480000000000000ff605f730000000000fa)"] [startTs=434256918394961922] [changefeed=tidb-to-kafka-test11]
[2022/06/30 11:11:22.999 +08:00] [INFO] [region_range_lock.go:222] ["range locked"] [changefeed=tidb-to-kafka-test11] [lockID=105] [regionID=11005] [version=82] [startKey=7480000000000000ff605f720000000000fa] [endKey=7480000000000000ff605f730000000000fa] [checkpointTs=434256918394961922]
[2022/06/30 11:11:22.999 +08:00] [INFO] [client.go:755] ["creating new stream to store to send request"] [changefeed=tidb-to-kafka-test11] [regionID=11005] [requestID=230] [storeID=5] [addr=192.168.60.4:20161]
[2022/06/30 11:11:22.999 +08:00] [INFO] [puller.go:217] ["puller is initialized"] [changefeed=tidb-to-kafka-test11] [duration=1.31316ms] [tableID=96] [spans="["[7480000000000000ff605f720000000000fa, 7480000000000000ff605f730000000000fa)"]"] [resolvedTs=434256918394961922]
[2022/06/30 11:11:23.000 +08:00] [INFO] [client.go:801] ["start new request"] [changefeed=tidb-to-kafka-test11] [request="{"header":{"cluster_id":7103783547263542403,"ticdc_version":"6.0.0"},"region_id":11005,"region_epoch":{"conf_ver":5,"version":82},"checkpoint_ts":434256918394961922,"start_key":"dIAAAAAAAAD/YF9yAAAAAAD6","end_key":"dIAAAAAAAAD/YF9zAAAAAAD6","request_id":230,"extra_op":1,"Request":null}"] [addr=192.168.60.4:20161]
[2022/06/30 11:11:23.196 +08:00] [INFO] [processor.go:175] ["Add Table finished"] [changefeed=tidb-to-kafka-test11] [tableID=96]
[2022/06/30 11:11:23.197 +08:00] [INFO] [agent.go:330] ["Agent finish processing operation"] [changefeed=tidb-to-kafka-test11] [op="{"TableID":96,"IsDelete":false,"Epoch":"031790eb-949a-4502-a602-18f64801c4b8","FromOwnerID":"b290c64b-489c-4bfa-8a4f-677fb879e33d"}"]
[2022/06/30 11:11:23.197 +08:00] [INFO] [agent.go:212] ["SchedulerAgent: FinishTableOperation"] [message="{"id":96,"epoch":"031790eb-949a-4502-a602-18f64801c4b8"}"] [successful=true] [changefeedID=tidb-to-kafka-test11] [ownerID=b290c64b-489c-4bfa-8a4f-677fb879e33d]
[2022/06/30 11:11:23.197 +08:00] [INFO] [schedule_dispatcher.go:567] ["owner received dispatch finished"] [changefeed=tidb-to-kafka-test11] [captureID=b290c64b-489c-4bfa-8a4f-677fb879e33d] [tableID=96] [epoch=031790eb-949a-4502-a602-18f64801c4b8]
[2022/06/30 11:11:23.299 +08:00] [INFO] [ddl_sink.go:218] ["ddl already executed"] [changefeed=tidb-to-kafka-test11] [ddlFinishedTs=434256918394961922] [DDL="{"StartTs":434256918382116873,"CommitTs":434256918394961922,"TableInfo":{"Schema":"testcdc","Table":"test333","TableID":96,"ColumnInfo":[{"Name":"id","Type":3},{"Name":"name","Type":15}]},"PreTableInfo":null,"Query":"CREATE TABLE testcdc.test333 (id INT NOT NULL,name VARCHAR(255) NULL,PRIMARY KEY(id))","Type":3}"]

There is now another strange phenomenon: the Kafka side keeps printing output, and the offset keeps changing.
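
The offset movement can also be confirmed from the Kafka side, for example (a sketch; --describe lists each consumer group's current offset, the log-end offset of the topic, and the lag):

bin/kafka-consumer-groups.sh --bootstrap-server 192.168.60.207:9092 --describe --all-groups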

What is the full Kafka command used in the screenshot above?

Producing and consuming work normally. Maybe the Kafka consumption side has an issue; there is no message body there either.

Looking at the log, it contains "Execute DDL succeeded", which means the DDL has already been written to the downstream Kafka correctly.

tiup cdc cli changefeed create --pd=http://192.168.60.4:2379 --sink-uri="kafka://192.168.60.207:9092/tidb-to-kafka-test11?protocol=canal-json&kafka-version=3.2.0&partition-num=1&max-message-bytes=67108864&replication-factor=1&enable-tidb-extension=true" --changefeed-id="tidb-to-kafka-test11" --sort-engine="unified"

I use Flink to monitor this Kafka topic, and it also keeps printing output, but what Flink sees does include the newly inserted data.

The Flink screenshot looks normal; that is the expected canal-json output.

For the screenshot above that only shows metadata, I suspect the Kafka consume command being used is not the right one.

My consume command:
bin/kafka-verifiable-consumer.sh --broker-list 192.168.60.207:9092 --topic tidb-to-kafka-test11 --group-id test11

The Kafka topic monitored by Flink also keeps printing metadata in the same way, and the offset keeps changing.

Try kafka-console-consumer.sh and see whether it still only outputs metadata.
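
For example (a sketch; --from-beginning replays the topic from the earliest offset so the earlier messages are shown too):

bin/kafka-console-consumer.sh --bootstrap-server 192.168.60.207:9092 --topic tidb-to-kafka-test11 --from-beginning

Unlike kafka-verifiable-consumer.sh, which only reports consumption statistics such as the records_consumed events above, this prints the actual message bodies.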

Yes, it keeps refreshing constantly.

Do you mean that on the Flink side the metadata and the data are output mixed together?

Yes.


{"id":0,"database":"testcdc","table":"test333","pkNames":["id"],"isDdl":false,"type":"INSERT","es":1656559518981,"ts":1656559520925,"sql":"","sqlType":{"id":4,"name":12},"mysqlType":{"id":"int","name":"varchar"},"data":[{"id":"22","name":"bb"}],"old":null,"_tidb":{"commitTs":434257138543755266}}
{"id":0,"database":"","table":"","pkNames":null,"isDdl":false,"type":"TIDB_WATERMARK","es":1656559520880,"ts":1656559523120,"sql":"","sqlType":null,"mysqlType":null,"data":null,"old":null,"_tidb":{"watermarkTs":434257139041566722}}
{"id":0,"database":"","table":"","pkNames":null,"isDdl":false,"type":"TIDB_WATERMARK","es":1656559521880,"ts":1656559524120,"sql":"","sqlType":null,"mysqlType":null,"data":null,"old":null,"_tidb":{"watermarkTs":434257139303710722}}