【 TiDB 版本】
6.1.0
【遇到的问题】
new TiKVSnapshotEventDeserializationSchema() {
@Override
public void deserialize(
Kvrpcpb.KvPair record, Collector out)
throws Exception {
System.out.println(“==============”);
System.out.println(record.getValue().toStringUtf8());
int serializedSize = record.getSerializedSize();
out.collect(RowKey.decode(record.getValue().toByteArray()).toString());
}
@Override
public TypeInformation<String> getProducedType() {
return BasicTypeInfo.STRING_TYPE_INFO;
}
})
打印的语句是乱码: e !"#$&'( " @ A ] c o { � � � � � [{�������1s{��0000004c0f0446aab996f270ef2371ef�7南通工程有限公司A[“河南省交通运输厅”]省级信用评级企业荣誉信用评级公路水运交通类河南省�A 公路建设施工企业信用评价南通工程有限公司获得了2020信用评级公路水运交通类Af48405721ba019df0526adefbfcf94872021-04-08 P�2020�a�a河南省交通运输厅 ���杨 f���
【复现路径】
Flinkcdc连接tidb
【问题现象及影响】
需要获取正确的文本或json格式数据
【附件】
xfworld
(魔幻之翼)
2022 年8 月 24 日 00:13
2
SourceFunction<String> tidbSource =
TiDBSource.<String>builder()
.database("mydb") // set captured database
.tableName("products") // set captured table
.tiConf(
TDBSourceOptions.getTiConfiguration(
"localhost:2399", new HashMap<>()))
.snapshotEventDeserializer(
new TiKVSnapshotEventDeserializationSchema<String>() {
@Override
public void deserialize(
Kvrpcpb.KvPair record, Collector<String> out)
throws Exception {
out.collect(record.toString());
}
@Override
public TypeInformation<String> getProducedType() {
return BasicTypeInfo.STRING_TYPE_INFO;
}
})
.changeEventDeserializer(
new TiKVChangeEventDeserializationSchema<String>() {
@Override
public void deserialize(
Cdcpb.Event.Row record, Collector<String> out)
throws Exception {
out.collect(record.toString());
}
@Override
public TypeInformation<String> getProducedType() {
return BasicTypeInfo.STRING_TYPE_INFO;
}
})
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// enable checkpoint
env.enableCheckpointing(3000);
env.addSource(tidbSource).print().setParallelism(1);
env.execute("Print TiDB Snapshot + Binlog");
原版的没有转码操作呢,你试试看
另外 数据库集群用的什么编码?
对,我也是教程上直接复制下来的, 如果直接按你这样那么输出是这种
,
数据库的编码:
消息终结者
2022 年8 月 24 日 02:31
4
遇到同样的问题,整了一天没整好,准备试试sql方式
sql方式可以,只是读了一段时间数据,报了个 java.io.EOFException异常。就想着换成api方式,调试下原因
Peiqi
(Ti D Ber Zn Dlw08x)
2023 年2 月 6 日 03:13
7
通过 flink CDC 的api 读取到数据,好像是经过TiKV处理过,并不是乱码。我目前也遇到了这个问题,暂时还不知道如何将得到的TiKV的数据还原成原始的数据
天下第一帅
2023 年4 月 1 日 18:08
9
坑爹的地方是因为record.getValue()和record.getKey()不能直接toString解析!
参考一下RowDataTiKVSnapshotEventDeserializationSchema的源码,做一下改造就好了。
Object[] tikvValues =
decodeObjects(
record.getValue().toByteArray(),
RowKey.decode(record.getKey().toByteArray()).getHandle(),
tableInfo);
希望官方能把早点这个问题修复吧!易用性比较差
天下第一帅
2023 年4 月 1 日 18:09
11
坑爹的地方是因为record.getValue()和record.getKey()不能直接toString解析!
参考一下RowDataTiKVSnapshotEventDeserializationSchema的源码,做一下改造就好了。
Object[] tikvValues =
decodeObjects(
record.getValue().toByteArray(),
RowKey.decode(record.getKey().toByteArray()).getHandle(),
tableInfo);
希望官方能把早点这个问题修复吧!易用性比较差
请问一下,最后你贴的这种方式是把标题中的乱码问题解决了吗?
天下第一帅
2023 年4 月 2 日 13:16
14
这个其实就是 Kvrpcpb.KvPair 官方没有说明,需要参考RowDataTiKVSnapshotEventDeserializationSchema去做解析而已
消息终结者
2023 年11 月 22 日 07:29
18
这么处理:
Map<String, String> map = new HashMap<>();
map.put(“tikv.grpc.timeout_in_ms”, “30000”);
map.put(“tikv.grpc.keepalive_time”, “30000”);
TiConfiguration tiConfiguration = TDBSourceOptions.getTiConfiguration(“localhost:2379”, map);
TiSession session = TiSession.create(tiConfiguration);
TiTableInfo tableInfo = session.getCatalog().getTable(“库名”, “表名”);
Object[] objArray = TableCodec.decodeObjects(valueByteArray, RowKey.decode(keyByteArray).getHandle(), tableInfo);
TiDb_01
(Ti D Ber D1bwh5 Bi)
2024 年1 月 5 日 02:58
22
大佬 您好 可以贴一个全一点的样例吗 这边试了下没效果
system
(system)
关闭
2024 年3 月 7 日 04:09
24
此话题已在最后回复的 60 天后被自动关闭。不再允许新回复。