cdc rowchange消息解析eof exception

  • 【TiDB 版本】:4.07
  • 【问题描述】:
    你好问一下我这边运用了官方的demo,但是解析value时报eof exception,key能正常解析
    �{“u”:{“adunit_show_id”:{“t”:15,“f”:48,“v”:“A9586D2727D47996E235C7F0FB03DC18”},“cacheTime”:{“t”:246,“f”:193,“v”:“4.00”},“create_time”:{“t”:7,“f”:1,“v”:“2020-09-02 16:33:06”},“enable”:{“t”:1,“f”:65,“v”:0},“enable_end_time”:{“t”:7,“f”:1,“v”:“2020-11-20 16:58:31”},“enable_start_time”:{“t”:7,“f”:1,“v”:“2020-09-26 16:58:28”},“id”:{“t”:3,“h”:true,“f”:11,“v”:5},“other_plat_id”:{“t”:3,“f”:49,“v”:99782},“update_time”:{“t”:7,“f”:33,“v”:“2020-11-17 15:09:42”}}} 此条是value信息

    为什么读出来的长度比真实value的长度还大?
    跑了一下测试用例里的demo,可以通过,请问一下这是哪里有问题

感谢,demo 地址辛苦提供下

方便发一下您从kafka 拉消息的相关代码吗?

之前有一个客户案例是,它使用的java kafka client 拉数据的时候直接按照 String 类型去拉的。由于cdc 的输出不是一个标准的 utf8,因此有一些不合法的bytes 可能被处理成其他的。后面换成 按照 Bytes 类型去拉数据就好了。我估计您也是一样的情况

地址是官方提供的:https://github.com/pingcap/ticdc/tree/master/demo/java

你说的对,我刚开始用的String去序列化的,现在改过来可以了。谢谢

大致的正确写法
props.put(“key.deserializer”, “org.apache.kafka.common.serialization.ByteArrayDeserializer”);
props.put(“value.deserializer”, “org.apache.kafka.common.serialization.ByteArrayDeserializer”);

KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(getProperties());

1 个赞

@leoppro-PingCAP感谢帮助

@shuyingzhong感谢你的分享:love_you_gesture:

此话题已在最后回复的 1 分钟后被自动关闭。不再允许新回复。