【 TiDB 使用环境】测试
【 TiDB 版本】
TIDB v4.0.8
flink-sql-connector-tidb-cdc 2.2.1
flink-clients_2.12 1.13.5
【遇到的问题】
使用flink cdc组件读取tikv的数据,读取结果为其他编码,想知道具体编码格式,以及如何将读取数据转化为json格式,以方便对数据进行解析?
使用record.getKey().toStringUtf8() 转为utf8 只有中文不乱码。
【复现路径】
flink 官网提供的例子,直接运行就是这个结果;
链接:https://ververica.github.io/flink-cdc-connectors/master/content/connectors/tidb-cdc.html
【问题现象及影响】
代码如下
Map<String, String> map = new HashMap<>();
map.put(“tikv.grpc.timeout_in_ms”,“20000”);
SourceFunction tidbSource =
TiDBSource.builder()
.database(“db_bill_shop”) // set captured database
.tableName(“test”) // set captured table
.tiConf(
TDBSourceOptions.getTiConfiguration(pdAddrs, map)
)
.snapshotEventDeserializer(
new TiKVSnapshotEventDeserializationSchema() {
@Override
public void deserialize(Kvrpcpb.KvPair record, Collector out)throws Exception {
// System.out.println(“-------key---------”+record.getKey().toStringUtf8());
// System.out.println(“--------value------”+record.getValue().toStringUtf8());
out.collect(record.toString());
}
@Override
public TypeInformation getProducedType() {
return BasicTypeInfo.STRING_TYPE_INFO;
}
})
.changeEventDeserializer(
new TiKVChangeEventDeserializationSchema() {
@Override
public void deserialize(Cdcpb.Event.Row record, Collector out)throws Exception {
out.collect(record.toString());
}
@Override
public TypeInformation<String> getProducedType() {
return BasicTypeInfo.STRING_TYPE_INFO;
}
})
.startupOptions(StartupOptions.initial())
.build();
env.enableCheckpointing(3000);
env.addSource(tiDBSource).print().setParallelism(1);
try {
env.execute("Tidb-CDC-JOB");
} catch (Exception e) {
log.error("Tidb-CDC-JOB 执行失败!");
e.printStackTrace();
}
打印结果:
有遇到相同问题的小伙伴吗?
【附件】
请提供各个组件的 version 信息,如 cdc/tikv,可通过执行 cdc version/tikv-server --version 获取。