Abnormal characters when replicating from TiCDC to Kafka

[TiDB environment]

TiDB 5.3.0 and TiCDC 5.3.0, deployed with tidb-operator:v1.2.4.

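[Overview]

I use TiCDC to replicate change logs to Kafka and read them with Flink, which fails. I then wrote my own consumer to read the same topic from Kafka and found abnormal characters in the messages.

[Problem]

Reading with Flink produces the following error: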

java.io.IOException: Failed to deserialize JSON ''.
	at org.apache.flink.formats.json.JsonRowDataDeserializationSchema.deserialize(JsonRowDataDeserializationSchema.java:112)
	at org.apache.flink.formats.json.JsonRowDataDeserializationSchema.deserialize(JsonRowDataDeserializationSchema.java:50)
	at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
	at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:130)
	at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
	at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
Caused by: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException: Illegal character ((CTRL-CHAR, code 0)): only regular white space (\r, \n, \t) is allowed between tokens
 at [Source: UNKNOWN; line: 1, column: 2]
	at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2337)
	at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:710)
	at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._throwInvalidSpace(ParserMinimalBase.java:688)
	at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.ReaderBasedJsonParser._skipWSOrEnd(ReaderBasedJsonParser.java:2408)
	at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:677)
	at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._readTreeAndClose(ObjectMapper.java:4622)
	at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:3056)
	at org.apache.flink.formats.json.JsonRowDataDeserializationSchema.deserializeToJsonNode(JsonRowDataDeserializationSchema.java:117)
	at org.apache.flink.formats.json.JsonRowDataDeserializationSchema.deserialize(JsonRowDataDeserializationSchema.java:106)
	... 9 more

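With my own consumer code, the output contains abnormal characters, as shown below (each record prints a counter, the key, and the value):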

46
G{"ts":431068352530612227,"scm":"mydb","tbl":"products","rid":102,"t":1}
%{"u":{"db_name":{"t":15,"f":64,"v":"mydb3"},"description":{"t":15,"f":64,"v":"12V car battery1"},"id":{"t":3,"h":true,"f":11,"v":102},"name":{"t":15,"f":0,"v":"car battery"},"operation_ts":{"t":7,"f":65,"v":"2022-01-07 07:28:16"},"table_name":{"t":15,"f":64,"v":"products"}},"p":{"db_name":{"t":15,"f":64,"v":"mydb2"},"description":{"t":15,"f":64,"v":"12V car battery1"},"id":{"t":3,"h":true,"f":11,"v":102},"name":{"t":15,"f":0,"v":"car battery"},"operation_ts":{"t":7,"f":65,"v":"2022-01-07 07:28:16"},"table_name":{"t":15,"f":64,"v":"products"}}}
47
{"ts":431068352359956481,"t":3}

48
G{"ts":431068352805601283,"scm":"mydb","tbl":"products","rid":103,"t":1}
�{"u":{"db_name":{"t":15,"f":64,"v":"mydb3"},"description":{"t":15,"f":64,"v":"12-pack of drill bits with sizes ranging from #40 to #31"},"id":{"t":3,"h":true,"f":11,"v":103},"name":{"t":15,"f":0,"v":"12-pack drill bits"},"operation_ts":{"t":7,"f":65,"v":"2022-01-07 07:28:16"},"table_name":{"t":15,"f":64,"v":"products"}},"p":{"db_name":{"t":15,"f":64,"v":"mydb2"},"description":{"t":15,"f":64,"v":"12-pack of drill bits with sizes ranging from #40 to #31"},"id":{"t":3,"h":true,"f":11,"v":103},"name":{"t":15,"f":0,"v":"12-pack drill bits"},"operation_ts":{"t":7,"f":65,"v":"2022-01-07 07:28:16"},"table_name":{"t":15,"f":64,"v":"products"}}}
49
{"ts":431068352438599682,"t":3}

50
{"ts":431068352884506625,"t":3}

51
G{"ts":431068353159495684,"scm":"mydb","tbl":"products","rid":104,"t":1}
+{"u":{"db_name":{"t":15,"f":64,"v":"mydb3"},"description":{"t":15,"f":64,"v":"12oz carpenter's hammer1"},"id":{"t":3,"h":true,"f":11,"v":104},"name":{"t":15,"f":0,"v":"hammer"},"operation_ts":{"t":7,"f":65,"v":"2022-01-07 07:28:16"},"table_name":{"t":15,"f":64,"v":"products"}},"p":{"db_name":{"t":15,"f":64,"v":"mydb2"},"description":{"t":15,"f":64,"v":"12oz carpenter's hammer1"},"id":{"t":3,"h":true,"f":11,"v":104},"name":{"t":15,"f":0,"v":"hammer"},"operation_ts":{"t":7,"f":65,"v":"2022-01-07 07:28:16"},"table_name":{"t":15,"f":64,"v":"products"}}}
52
G{"ts":431068353395425284,"scm":"mydb","tbl":"products","rid":105,"t":1}
+{"u":{"db_name":{"t":15,"f":64,"v":"mydb3"},"description":{"t":15,"f":64,"v":"14oz carpenter's hammer1"},"id":{"t":3,"h":true,"f":11,"v":105},"name":{"t":15,"f":0,"v":"hammer"},"operation_ts":{"t":7,"f":65,"v":"2022-01-07 07:28:16"},"table_name":{"t":15,"f":64,"v":"products"}},"p":{"db_name":{"t":15,"f":64,"v":"mydb2"},"description":{"t":15,"f":64,"v":"14oz carpenter's hammer1"},"id":{"t":3,"h":true,"f":11,"v":105},"name":{"t":15,"f":0,"v":"hammer"},"operation_ts":{"t":7,"f":65,"v":"2022-01-07 07:28:16"},"table_name":{"t":15,"f":64,"v":"products"}}}
53
{"ts":431068353146388481,"t":3}

54
G{"ts":431068353631617027,"scm":"mydb","tbl":"products","rid":106,"t":1}
+{"u":{"db_name":{"t":15,"f":64,"v":"mydb3"},"description":{"t":15,"f":64,"v":"16oz carpenter's hammer1"},"id":{"t":3,"h":true,"f":11,"v":106},"name":{"t":15,"f":0,"v":"hammer"},"operation_ts":{"t":7,"f":65,"v":"2022-01-07 07:28:16"},"table_name":{"t":15,"f":64,"v":"products"}},"p":{"db_name":{"t":15,"f":64,"v":"mydb2"},"description":{"t":15,"f":64,"v":"16oz carpenter's hammer1"},"id":{"t":3,"h":true,"f":11,"v":106},"name":{"t":15,"f":0,"v":"hammer"},"operation_ts":{"t":7,"f":65,"v":"2022-01-07 07:28:16"},"table_name":{"t":15,"f":64,"v":"products"}}}
55
{"ts":431068353408532484,"t":3}

56
G{"ts":431068353867284483,"scm":"mydb","tbl":"products","rid":107,"t":1}
%{"u":{"db_name":{"t":15,"f":64,"v":"mydb3"},"description":{"t":15,"f":64,"v":"box of assorted rocks1"},"id":{"t":3,"h":true,"f":11,"v":107},"name":{"t":15,"f":0,"v":"rocks"},"operation_ts":{"t":7,"f":65,"v":"2022-01-07 07:28:16"},"table_name":{"t":15,"f":64,"v":"products"}},"p":{"db_name":{"t":15,"f":64,"v":"mydb2"},"description":{"t":15,"f":64,"v":"box of assorted rocks1"},"id":{"t":3,"h":true,"f":11,"v":107},"name":{"t":15,"f":0,"v":"rocks"},"operation_ts":{"t":7,"f":65,"v":"2022-01-07 07:28:16"},"table_name":{"t":15,"f":64,"v":"products"}}}

The consumer code is as follows:

		while (true) {
			ConsumerRecords<String, String> records = consumer.poll(1000);
			for (ConsumerRecord<String, String> record : records) {
				System.out.println(i);
				System.out.println(record.key());
				System.out.println(record.value());
				i++;
			}
			try {
				Thread.sleep(100);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
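
Printing records as `String` hides non-printable bytes, so it can help to look at the raw bytes in front of each JSON document. The following is a minimal sketch, not the poster's code: `HexPeek` and `hexPrefix` are hypothetical names, and the sample payload in `main` is made up for illustration (seven zero bytes, then 0x47, then a short JSON string).

```java
import java.nio.charset.StandardCharsets;

public class HexPeek {
    // Render the first `limit` bytes of a payload as space-separated two-digit hex values.
    public static String hexPrefix(byte[] payload, int limit) {
        StringBuilder sb = new StringBuilder();
        int n = Math.min(limit, payload.length);
        for (int i = 0; i < n; i++) {
            if (i > 0) {
                sb.append(' ');
            }
            // Mask with 0xff so negative bytes print as 80..ff instead of sign-extended values.
            sb.append(String.format("%02x", payload[i] & 0xff));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Hypothetical sample: an 8-byte prefix whose last byte is 0x47 ('G'), followed by JSON text.
        byte[] json = "{\"t\":3}".getBytes(StandardCharsets.UTF_8);
        byte[] sample = new byte[8 + json.length];
        sample[7] = 0x47;
        System.arraycopy(json, 0, sample, 8, json.length);
        System.out.println(hexPrefix(sample, 10));
    }
}
```

To apply this to the loop above, one would presumably configure the consumer with `org.apache.kafka.common.serialization.ByteArrayDeserializer` for both key and value, iterate over `ConsumerRecord<byte[], byte[]>`, and pass `record.key()` and `record.value()` to `hexPrefix`.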

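The CDC changefeed status looks normal, and the logs show no obvious errors.
What causes the abnormal characters in front of each JSON document, like those in the output above? How can I configure TiCDC to get rid of them?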




https://asktug.com/t/topic/68884
Flink Best Practices: Streaming TiDB Data into Flink via TiCDC

Take a look at this.

The author of that article suspects that the CDC output format is the problem.


Can it be changed to a protobuf format definition?

The problem is solved by adding the protocol=maxwell parameter to the sink URI.
The commands used to create the changefeed:
With garbled characters: cdc cli changefeed create --pd=http://basic-pd:2379 --sink-uri="kafka://kafka:9092/tidb-cdc-event?kafka-version=2.4.0&partition-num=1&max-message-bytes=67108864&replication-factor=1" --changefeed-id="kafka-replication-task" --sort-engine="unified"
Without garbled characters: cdc cli changefeed create --pd=http://basic-pd:2379 --sink-uri="kafka://kafka:9092/tidb-cdc-event?kafka-version=2.4.0&partition-num=1&max-message-bytes=67108864&replication-factor=1&protocol=maxwell" --changefeed-id="kafka-replication-task" --sort-engine="unified"

When I explicitly specify protocol=default, the garbled characters also appear.
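
One likely explanation, offered as an assumption rather than something confirmed in this thread: the default protocol is TiCDC Open Protocol, which, as I understand its documentation, batches events so that each Kafka key is an 8-byte version followed by entries of the form [8-byte big-endian length][JSON], and each value is a sequence of [8-byte big-endian length][JSON] entries. That would fit the output above: the `G` before each key is byte 0x47 = 71, and the first key JSON is exactly 71 characters long. The sketch below splits such a buffer under that assumed layout; `OpenProtocolFrames`, `split`, and `frame` are hypothetical names, and `frame` exists only to build demo input.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class OpenProtocolFrames {
    // Split a buffer of [8-byte big-endian length][payload] entries into strings.
    // Pass skipVersion = true for keys, which are assumed to start with an 8-byte version field.
    public static List<String> split(byte[] data, boolean skipVersion) {
        ByteBuffer buf = ByteBuffer.wrap(data); // ByteBuffer is big-endian by default
        List<String> out = new ArrayList<>();
        if (skipVersion && buf.remaining() >= 8) {
            buf.getLong(); // discard the version field
        }
        while (buf.remaining() >= 8) {
            long len = buf.getLong();
            if (len < 0 || len > buf.remaining()) {
                throw new IllegalArgumentException("bad length prefix: " + len);
            }
            byte[] payload = new byte[(int) len];
            buf.get(payload);
            out.add(new String(payload, StandardCharsets.UTF_8));
        }
        return out;
    }

    // Demo helper: build a buffer in the assumed layout from plain strings.
    public static byte[] frame(boolean withVersion, String... payloads) {
        int size = withVersion ? 8 : 0;
        byte[][] raw = new byte[payloads.length][];
        for (int i = 0; i < payloads.length; i++) {
            raw[i] = payloads[i].getBytes(StandardCharsets.UTF_8);
            size += 8 + raw[i].length;
        }
        ByteBuffer buf = ByteBuffer.allocate(size);
        if (withVersion) {
            buf.putLong(1L); // assumed version value, used here only for the demo
        }
        for (byte[] r : raw) {
            buf.putLong(r.length);
            buf.put(r);
        }
        return buf.array();
    }

    public static void main(String[] args) {
        // Key JSON copied from the consumer output above.
        String key = "{\"ts\":431068352530612227,\"scm\":\"mydb\",\"tbl\":\"products\",\"rid\":102,\"t\":1}";
        System.out.println(key.length()); // the length that ends up in the prefix
        System.out.println(split(frame(true, key), true));
    }
}
```

If the assumption holds, the prefix bytes are part of the protocol rather than corruption, so a consumer either has to strip them as above or the changefeed has to use a protocol that emits plain JSON per message.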


I recommend using the canal-json format. Starting with v5.4.0, canal-json will be officially GA and ready for production use.


Our data platform already uses Maxwell in many places, and after I switched to the maxwell format the problem went away.


A deserialization failure?

