coder
1
【概述】 欲将TiDB的增量数据同步到kafka,然后使用TicdcEventDecoder解析从kafka消费的数据时抛出EOFException。
【背景】
- 开启同步服务:#tiup ctl:v5.0.2 cdc changefeed create --pd=http://11.159.61.12:2379 --sink-uri=“kafka://xxx.xxx.xxx.xxx:9092/ticdc-test-new” --changefeed-id=“kafka-replication-task” --sort-engine=“unified” --config=changefeed.toml
其中changefeed.toml全部内容为:enable-old-value=true
- 解析的代码如下:
3.报错信息
【现象】 之前是可以正常读取的,应该是更新配置开启了old value特性之后出现上述问题。已尝试过关闭old value/更换sink-url的topic,均未解决问题。
【TiDB 版本】 v4.0.13
1 个赞
xfworld
(魔幻之翼)
2
能够看到kafka 消费的数据么?
比如:读到哪个offset后,出现这个问题,然后这个offset的数据,是什么样的描述?
coder
3
您好,这是出错时的数据。如果没有完全提供您想要的信息请告诉我。感谢回复。
coder
4
补充一下,这是value的值,第七个字节应该是长度的值,看起来不太对的样子。
xfworld
(魔幻之翼)
5
那能跳过这个offset 么~ 就是跳过这个错误的数据,然后在试试
coder
6
已解决!构造consumer时模板参数使用byte[]替换了String问题解决了,应该是消费的Kafka数据先存到String,然后getBytes造成编码上的问题造成的。谢谢~
1 个赞
system
(system)
关闭
7
此话题已在最后回复的 1 分钟后被自动关闭。不再允许新回复。