关于解析kafka中的binlog消息

【 TiDB 使用环境】测试
【 TiDB 版本】4.0.4
【复现路径】kafka重启后阿里云的DTS出现问题
【遇到的问题:如何解析kafka中的消息】
【资源配置】
【附件:截图/日志/监控】

目前是用的阿里云DTS读取kafka的binlog送往阿里云adb
但是前些天kafka一个实例重启,虽未停止服务但阿里云的DTS挂了,并且消费消息失败,给了我一个offset,我想看看这个offset里的消息是否可以跳过,但消费后发现数据格式不会解析…
本人并不熟悉go语言,不会用官方文档提到的driver工具,希望能解答下用python或者shell如何解析这种数据:b’\x08\x00\x10\x89\x80\x80\xeb\x94\xe3\xc2\x89\x06\x1a\xca\x02\n\xc7\x02\n\nuat_xxljob\x12\rxxl_job_group\x1a\x0b\n\x02id\x12\x03int\x18\x01\x1a\x15\n\x08app_name\x12\x07varchar\x18\x00\x1a\x12\n\x05title\x12\x07varchar\x18\x00\x1a\x19\n\x0caddress_type\x12\x07tinyint\x18\x00\x1a\x16\n\x0caddress_list\x12\x04text\x18\x00\x1a\x19\n\x0bupdate_time\x12\x08datetime\x18\x00"\x94\x01\x08\x01\x12G\n\x04\x10\xb2\xea\x01\n\x0c2\ngoods-task\n\x122\x10\xe7\x94\x9f\xe4\xba\xa7-\xe5\x95\x86\xe5\x93\x81JOB\n\x02\x10\x00\n\x02\x08\x01\n\x152\x132022-11-29 17:44:48\x1aG\n\x04\x10\xb2\xea\x01\n\x0c2\ngoods-task\n\x122\x10\xe7\x94\x9f\xe4\xba\xa7-\xe5\x95\x86\xe5\x93\x81JOB\n\x02\x10\x00\n\x02\x08\x01\n\x152\x132022-11-29 17:44:18*\r\n\x07PRIMARY\x12\x02id’

官方提供了 protobuf 文件
https://github.com/pingcap/tidb-tools/blob/release-4.0/tidb-binlog/proto/proto/binlog.proto

你可以通过这个文件来生成解析格式,如果你会 python,生成 python 能够操纵的 api 和 数据格式

然后你贴的这串码,就可以解开了


如果你身边有懂 Java 的小伙伴,可以求助一下,
https://github.com/pingcap/tidb-tools/tree/release-4.0/tidb-binlog/driver/example/kafkaReader

这里一份 kafkaReader example ,对着改改就可以用了

https://github.com/pingcap/tidb-tools/blob/release-4.0/tidb-binlog/driver/example/print/print.go

我改一下里面的KafkaAddr、offset、topic,能否直接运行这个go脚本来解决呢?

解析出来的也是 binlog 的protobuf 的定义,是否能满足你的要求,也不可知
不过可以试试

谢谢,已解决,让开发的同事改动了下这个 kafkareader

好的 :+1:

此话题已在最后回复的 60 天后被自动关闭。不再允许新回复。