为提高效率,请提供以下信息,问题描述清晰能够更快得到解决:
【TiDB 版本】V5.0.1
【问题描述】
TiCDC写入Kafka(canal-json),然后使用Flink 基于SQL 创建Source 表消费写入MySQL,消费时报数据格式错误,因为CDC捕获数据到一个Topic,Flink消费时会取所有变换表的数据,请问如何通过TiCDC配置让CDC可以一个Table一个Topic?或者一下错误,如何处理比较和合适?
at org.apache.flink.formats.json.canal.CanalJsonDeserializationSchema.deserialize(CanalJsonDeserializationSchema.java:285)
at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113)
at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
Caused by: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException: Illegal character ((CTRL-CHAR, code 16)): only regular white space (\r, \
, \t) is allowed between tokens
at [Source: UNKNOWN; line: 1, column: 2]
at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2337)
at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:710)
at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._throwInvalidSpace(ParserMinimalBase.java:688)
at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._skipWSOrEnd(UTF8StreamJsonParser.java:3012)
at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:724)
at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._readTreeAndClose(ObjectMapper.java:4622)
at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:3056)
at org.apache.flink.formats.json.JsonRowDataDeserializationSchema.deserializeToJsonNode(JsonRowDataDeserializationSchema.java:117)
at org.apache.flink.formats.json.canal.CanalJsonDeserializationSchema.deserialize(CanalJsonDeserializationSchema.java:213)
… 7 more
若提问为性能优化、故障排查类问题,请下载脚本运行。终端输出的打印结果,请务必全选并复制粘贴上传。