TiCDC 同步cananl-json格式数据到kafka并使用Flink消费时的问题

为提高效率,请提供以下信息,问题描述清晰能够更快得到解决:

【TiDB 版本】V5.0.1

【问题描述】
TiCDC写入Kafka(canal-json),然后使用Flink 基于SQL 创建Source 表消费写入MySQL,消费时报数据格式错误,因为CDC捕获数据到一个Topic,Flink消费时会取所有变换表的数据,请问如何通过TiCDC配置让CDC可以一个Table一个Topic?或者一下错误,如何处理比较和合适?
at org.apache.flink.formats.json.canal.CanalJsonDeserializationSchema.deserialize(CanalJsonDeserializationSchema.java:285)
at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113)
at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
Caused by: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException: Illegal character ((CTRL-CHAR, code 16)): only regular white space (\r, \ , \t) is allowed between tokens
at [Source: UNKNOWN; line: 1, column: 2]
at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2337)
at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:710)
at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._throwInvalidSpace(ParserMinimalBase.java:688)
at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._skipWSOrEnd(UTF8StreamJsonParser.java:3012)
at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:724)
at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._readTreeAndClose(ObjectMapper.java:4622)
at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:3056)
at org.apache.flink.formats.json.JsonRowDataDeserializationSchema.deserializeToJsonNode(JsonRowDataDeserializationSchema.java:117)
at org.apache.flink.formats.json.canal.CanalJsonDeserializationSchema.deserialize(CanalJsonDeserializationSchema.java:213)
… 7 more


若提问为性能优化、故障排查类问题,请下载脚本运行。终端输出的打印结果,请务必全选并复制粘贴上传。

1 个赞

目前 ticdc 不支持 一个 schema 对应一个 topic。如果需要这样处理
建议一个 changefeed 对应一个 schema 再进行 相关的处理

另外 changefeed 过多非常容易引起 TiCDC 的内存资源消耗

如果只是数据同步 建议直接使用 ticdc 的 sink-mysql 模式 直接写入到 下游 mysql 数据库

目前 ticdc 在 5.0.1 上存在一个验证的已知问题。建议等到 6月初使用 5.0.2 版本的 tidb 集群+ticdc

1 个赞

我是想通过TiCDC直接写入其它存储,譬如数据湖,而不是mysql。如果这样,是否必须自己去单独解析再入库?

1 个赞

ticdc 的 sink 是支持 很多种 message format 的

包括 canal-json maxwell 等等
如果是自定义的 消息类型 需要 按照 标准的开放数据协议来进行 入库程序的开发

1 个赞

此话题已在最后回复的 1 分钟后被自动关闭。不再允许新回复。