TiCDC 4.0.7 canal格式数据 如何解析

现在我kafka 解析 的消息格式如下,剩下我怎么去解析啊,在我java客户端的话。消息如下
value = a��
UTF-80�����.8BdspdevJsales_order_headerXb
target_sys_id(0 BQKK020Ravarchararemark1(0 Ravarchararemark2(0 Ravarchar)order_no(0 BTOB20102900026Ravarchardoc_type(0 BZFORRavarchar sales_org(0 BDF12Ravarchar
distr_chan(0 B10Ravarchardivision(0 B10Ravarchar sales_grp(0 Ravarchar sales_off(0Ravarchar+
purch_no_c(0 BTOB20102900026Ravarchar$[
purch_date(0 B
2020-10-29Rdate"aversion(0 T111ZGKMRavarchar$asold_to(0 B
F1YJW10122Ravarchar$aship_to(0 B
F1YJW10122Ravarchar’
created_by(0 B
F1YJW10122Ravarcharr3_sales_order(0Ravarchar/created_by_employee(0 B
2000035759Rbigint5created_by_employee_name(0 B
测试1012Ravarchar+created_by_dept(0 B
2000037984Rbigint)created_by_dept_code(0 BHNRavarchar-created_by_dept_name(0 B华南Ravarchar3]created_date(0 B2020-10-29 10:54:19datetime3last_update_by_employee(0 B
2000035759Rbigint9last_update_by_employee_name(0 B
测试1012Ravarchar/last_update_by_dept(0 B
2000037984Rbigint-last_update_by_dept_code(0 BHNRavarchar1last_update_by_dept_name(0 B华南Ravarchar7]last_update_date(0 B2020-10-29 10:54:19datetime is_enable(0 B1Ratinyint is_delete(0 B0Ratinyint(sale_order_status(0 BSAVERavarchar5created_by_parent_dept_id(0 B
customer_name(0 B%DSP测试客户1(小程序下单)Ravarchar-customer_sapcode(0 B
F1YJW10122RavarcharDsold_to_name(0 B%DSP测试客户1(小程序下单)RavarcharDship_to_name(0 B%DSP测试客户1(小程序下单)Ravarchar/total_discount_amount(0 Ba1888.70Radecimal’other_discount(0 B377.74Radecimal#other_amount(0 B0.00Radecimal$
pay_amount(0 Ba1888.70Radecimal$dsp_org_code(0 BHNFGSRavarchar.dsp_org_name(0 B华南分公司Ravarchar!delivery_address(0Ravarchar delivery_person(0Ravarchardelivery_phone(0Ravarcharpay_type(0Ravarcharcash_pay_image(0Ravarchar%cash_pay_bank_number(0Ravarchar over_sell(0 B0Ratinyintbreakup_flag(0Ravarchar)

2 个赞

请问消息协议使用的是 open protocol 还是 canal

1 个赞

哦,抱歉,标题写了是 canal 协议,canal 协议是按照 protobuf 格式编码的,解析可以参考

1 个赞

已经搞定了。大家在使用canal协议的时候要用canal client去接收kafka的信息

:+1:

问题来了,现在是canal输出格式的数据怎么都是UPDATE没有 Insert

这条数据是新增的

这个和 ticdc 的一个特性有关,简单说如果希望区分 update/insert,并且包含 update 的更新前值,可以开启 old-value 特性,开启方法参考文档:输出行变更的历史值

没有开启该特性是,只能获取到 update/delete 的事件类型。并且在 4.0.8 (2020/10/30发布)版本中,使用 canal 协议会强制开启 old-value 特性

Ok,tks,那我基本上要做完了,。。写了建议一下,能不能增加一个参数区分发送的是json害死protobuf这样我们下游就不用解析转换了。。直接flink就可以用了

canal-json 这个我们在开发测试中,预计在 4.0.9 会提供;4.0.9 之前的版本只包括 canal protobuf

现在试了下还是UPDATE

只是OLD有值,没有值的话是不是就可以认为是INSERT

4.0.9 大概什么时候发布?

11 月中旬应该会发版,可以关注一下 TiCDC 的 Github 动态。

挖坟贴,刚好最近在接入 TiCDCC => kafka,发现 4.0.13 是已经可以使用canal-json协议了,但是看到官方文档上还没更新,可用的 Protocol 还是 4 个:

目前支持 default、canal、avro 和 maxwell 四种协议。default 为 TiCDC Open Protocol

想问下是有什么问题背景吗?

同踩坑,且疑惑不解