Java解析drainer发送到kafka中的binlog异常

protobuf版本:3.9.1

tidb: v3.0.0

kafka消费者接收到的binlog数据OK,如下图:


binlog.proto文件定义:




image

java解析代码: // value是从kafka得到的binlog日志,字符串形式。 BinLogInfo.Binlog binlog = BinLogInfo.Binlog.parseFrom(value.getBytes());

出现异常:

com.google.protobuf.InvalidProtocolBufferException: CodedInputStream encountered a malformed varint. at com.google.protobuf.InvalidProtocolBufferException.malformedVarint(InvalidProtocolBufferException.java:98) at com.google.protobuf.CodedInputStream$ArrayDecoder.readRawVarint64SlowPath(CodedInputStream.java:1130) at com.google.protobuf.CodedInputStream$ArrayDecoder.readRawVarint64(CodedInputStream.java:1117) at com.google.protobuf.CodedInputStream$ArrayDecoder.readInt64(CodedInputStream.java:760) at com.tnp.search.proto.BinLogInfo$Binlog.(BinLogInfo.java:7524) at com.tnp.search.proto.BinLogInfo$Binlog.(BinLogInfo.java:7466) at com.tnp.search.proto.BinLogInfo$Binlog$1.parsePartialFrom(BinLogInfo.java:8467) at com.tnp.search.proto.BinLogInfo$Binlog$1.parsePartialFrom(BinLogInfo.java:8461) at com.google.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:158) at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:191) at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:203) at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:208) at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:48) at com.tnp.search.proto.BinLogInfo$Binlog.parseFrom(BinLogInfo.java:7820) at com.tnp.search.provider.synchronize.process.KafkaMqConsumerListener.dealMessage(KafkaMqConsumerListener.java:92) at com.tnp.search.provider.synchronize.process.KafkaMqConsumerListener.lambda$init$0(KafkaMqConsumerListener.java:62) at java.lang.Thread.run(Thread.java:748)

异常位置:

疑问: proto文件是按照官方提供的 https://pingcap.com/docs-cn/v3.0/reference/tools/tidb-binlog/binlog-slave-client/#%E9%85%8D%E7%BD%AE-kafka-drainer 编写的,和go语言解析不同的是java的proto文件没有如下标红的参数:

请问为啥java解析的时候出现异常? 或者提供一份java版本的proto文件。

官方只有 GO 版本,其他版本的暂时还没有哈

但是 按照官方给的proto文件格式解析不出来,这类问题怎么处理呢?

binlog里边的commit_ts是0,这个不对吧:

使用 pdreader 的proto也读不出来,同样的异常

proto 只是看数据定义的。如果需要 Drainer 输出其他的,比如输出到 Elasticsearch。 需要了解数据格式,然后自定义开发就好。

怎么自定义开发,需要修改drainer的源码吗,自定义格式在哪里配置?

我们的应用是基于java的,用protobuf的java的api解析的kafka的binlog,然后proto是用的官方提供的proto文件,但是解析失败,提示解析到commit_ts出现问题(上面错误信息已给出),tidb官方是否收集过其他用户关于java解析方面的demo,供开发者参考一下?

  1. 请问下 drainer 使用的是什么版本?
  2. 能否使用 golang 版本解析一下试试,我们项目里有相关的集成测试,可以看下 https://github.com/pingcap/tidb-binlog/tree/master/tests/kafka ,里面有现成的代码,可以编译一下 kafka.go 读取试试。如果 golang 版本解析没问题,可能就是 proto 的问题,我们再一起排查。
2赞

drainer版本:


我们把这个kafka.go的go工程拉下来了,但是下载相关依赖 下了快一个周了,没有搞定,不是缺这个包就是缺那个包,又没有熟悉go开发的,我们的应用还在跑,不想耗费太多的时间去搞了,官方有没有之前的客户使用java解析kafka binlog的实例我们参考一下,proto就是上文提到的按照官方提供的proto文件,使用protobuf 的java api解析,关键就一行代码【 BinLogInfo.Binlog binlog = BinLogInfo.Binlog.parseFrom(value.getBytes());】异常信息上面也列出了,实在没有办法了。。。

有一个用户之前用 java 解析成功过,我联系了他,看看能否给你提供些帮助。

好的,十分感谢,:+1:

问题得到解决,主要原因是kafka的序列化方式需要改成如下: Properties props = new Properties(); props.put(“bootstrap.servers”, searchTopicBO.getMqHost()); props.put(“group.id”, searchTopicBO.getConsumerGroup()); //自动提交位移 props.put(“enable.auto.commit”, “false”); props.put(“auto.commit.interval.ms”, “1000”); props.put(“session.timeout.ms”, “30000”); props.put(“max.poll.records”, “10”); props.put(“key.deserializer”, “org.apache.kafka.common.serialization.ByteArrayDeserializer”); props.put(“value.deserializer”, “org.apache.kafka.common.serialization.ByteArrayDeserializer”);

不要使用StringDeserializer或者其他的序列化方式。

有中文说明吗

感谢 @ceaserwang 提供的 java 示例