TiCDC同步数据时对于SET和ENUM类型无法获取原始数据

【TiDB版本】v4.0.13
【问题】使用TiCDC增量同步工具时遇到以下问题:将增量数据同步到Kafka,由于SET和ENUM存储的是set中各个元素的二进制位或得到的整数或者enum中元素的编号,无法消费到原始的字符串数据。请问有没有什么方案可以拿到set和enum的真实数据?

例如:set(‘t’, ‘i’, ‘d’, ‘b’),则 ‘db’ -> 12,如何获取’db’而非12?

1 个赞

是说 old value 么 ?

不是old value,我是说对于set/enum类型TiDB并不存储实际的值,而是使用一个整数表示的,我想获得实际的字符串而不是存储的整数。存储方式见https://docs.pingcap.com/zh/tidb/stable/data-type-string中的set和enum部分。

我试了一下,得到的是字符串,不是对应的整数。

CREATE TABLE IF NOT EXISTS store (
id BIGINT NOT NULL PRIMARY KEY auto_increment,
count int default 0,
name SET(‘apple’,‘banana’,‘orange’,‘pitch’) NOT NULL
);

insert into store(count,name) values(10,‘apple’),(20,‘banana’);

select *from store;

id | count | name
----±------±-------
1 | 10 | apple
2 | 20 | banana
(2 rows)

您好,谢谢关注。但是我是说使用TiCDC同步到kafka再消费出来的数据哦。

可以提供验证步骤么 ?我们测试一下

部署集群拓扑的拓扑文件:
#cat topology.yaml
global:
user: “tidb”
ssh_port: 22
deploy_dir: “/tidb-deploy”
data_dir: “/tidb-data”

pd_servers:
- host: 11.159.61.12

tidb_servers:
- host: 100.88.104.115

tikv_servers:
- host: 11.159.61.13
- host: 11.159.61.14
- host: 11.159.61.16

cdc_servers:
- host: 11.159.61.17

monitoring_servers:
- host: 11.159.61.17
grafana_servers:
- host: 11.159.61.17

alertmanager_servers:
- host: 11.159.61.17

开启ticdc同步服务同步增量数据到kafka:
tiup ctl:v5.0.2 cdc changefeed create --pd=http://11.159.61.12:2379 --sink-uri=“kafka://11.159.61.10:9092/ticdc-test?max-message-bytes=6291456” --changefeed-id=“kafka-replication-task” --config=changefeed.toml --sort-engine=“unified”
其中:
#cat changefeed.toml
enable-old-value=true
kafka版本为2.1

消费端使用你们提供的包:
import com.pingcap.ticdc.cdc.TicdcEventData;
import com.pingcap.ticdc.cdc.key.TicdcEventKey;
import com.pingcap.ticdc.cdc.value.TicdcEventColumn;
import com.pingcap.ticdc.cdc.value.TicdcEventRowChange;

消费端主要代码为:
ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));
TicdcEventDecoder ticdcEventDecoder = null;
for(ConsumerRecord<byte[], byte[]> record : records){
KafkaMessage kafkaMessage= new KafkaMessage(record.key(), record.value());
kafkaMessage.setOffset(record.offset());
kafkaMessage.setPartition(record.partition());
receiveRecordTime = record.timestamp()/1000;
ticdcEventDecoder = new TicdcEventDecoder(kafkaMessage);
while(ticdcEventDecoder.hasNext()){
TicdcEventData ticdcEventData = ticdcEventDecoder.next();
if(ticdcEventData.getTicdcEventValue() instanceof TicdcEventRowChange){
boolean ok = filter.check(ticdcEventData.getTicdcEventKey().getTbl(), ticdcEventData.getTicdcEventValue().getKafkaPartition(), ticdcEventData.getTicdcEventKey().getTs());
if (ok) {
// deal with row change event
} else {
// ignore duplicated messages
}
}else if(ticdcEventData.getTicdcEventValue() instanceof TicdcEventDDL){
//System.out.println(“DDL”);
}else if(ticdcEventData.getTicdcEventValue() instanceof TicdcEventResolve){
// System.out.println(“resolve event!\t” + Thread.currentThread().getName() + “\t” + record.partition());
}
}
}

谢谢,有不详尽的地方请告诉我。

目前保存在 tikv 中的数据就是 bits,不是字符串,所以对应的结果是整数,无法抓取原值,workaround 是把 kafka 的消费者自己将 bits 对应到他们想要的字符串中。

好的,谢谢您。

此话题已在最后回复的 1 分钟后被自动关闭。不再允许新回复。