FlinkCDC连接tidb乱码

【 TiDB 版本】
6.1.0

【遇到的问题】
new TiKVSnapshotEventDeserializationSchema() {
@Override
public void deserialize(
Kvrpcpb.KvPair record, Collector out)
throws Exception {
System.out.println(“==============”);
System.out.println(record.getValue().toStringUtf8());
int serializedSize = record.getSerializedSize();
out.collect(RowKey.decode(record.getValue().toByteArray()).toString());
}

                                @Override
                                public TypeInformation<String> getProducedType() {
                                    return BasicTypeInfo.STRING_TYPE_INFO;
                                }
                            })

打印的语句是乱码: e !"#$&'( " @ A ] c o { � � � � � [{�������1s{��0000004c0f0446aab996f270ef2371ef�7南通工程有限公司A[“河南省交通运输厅”]省级信用评级企业荣誉信用评级公路水运交通类河南省�A 公路建设施工企业信用评价南通工程有限公司获得了2020信用评级公路水运交通类Af48405721ba019df0526adefbfcf94872021-04-08 P�2020�a�a河南省交通运输厅 ���杨 f���

【复现路径】
Flinkcdc连接tidb

【问题现象及影响】
需要获取正确的文本或json格式数据

【附件】

SourceFunction<String> tidbSource =
            TiDBSource.<String>builder()
                .database("mydb") // set captured database
                .tableName("products") // set captured table
                .tiConf(
                    TDBSourceOptions.getTiConfiguration(
                        "localhost:2399", new HashMap<>()))
                .snapshotEventDeserializer(
                    new TiKVSnapshotEventDeserializationSchema<String>() {
                        @Override
                        public void deserialize(
                            Kvrpcpb.KvPair record, Collector<String> out)
                            throws Exception {
                            out.collect(record.toString());
                        }

                        @Override
                        public TypeInformation<String> getProducedType() {
                            return BasicTypeInfo.STRING_TYPE_INFO;
                        }
                    })
                .changeEventDeserializer(
                    new TiKVChangeEventDeserializationSchema<String>() {
                        @Override
                        public void deserialize(
                            Cdcpb.Event.Row record, Collector<String> out)
                            throws Exception {
                            out.collect(record.toString());
                        }

                        @Override
                        public TypeInformation<String> getProducedType() {
                            return BasicTypeInfo.STRING_TYPE_INFO;
                        }
                    })
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // enable checkpoint
        env.enableCheckpointing(3000);
        env.addSource(tidbSource).print().setParallelism(1);

        env.execute("Print TiDB Snapshot + Binlog");

原版的没有转码操作呢,你试试看
另外 数据库集群用的什么编码?

对,我也是教程上直接复制下来的, 如果直接按你这样那么输出是这种

,
数据库的编码:

遇到同样的问题,整了一天没整好,准备试试sql方式:rofl:

sql方式可以,只是读了一段时间数据,报了个 java.io.EOFException异常。就想着换成api方式,调试下原因

可以在里面的几个编码转化里面再试了看下排除一下

通过 flink CDC 的api 读取到数据,好像是经过TiKV处理过,并不是乱码。我目前也遇到了这个问题,暂时还不知道如何将得到的TiKV的数据还原成原始的数据

搞sql了 :smiley:

坑爹的地方是因为record.getValue()和record.getKey()不能直接toString解析!

参考一下RowDataTiKVSnapshotEventDeserializationSchema的源码,做一下改造就好了。
Object[] tikvValues =
decodeObjects(
record.getValue().toByteArray(),
RowKey.decode(record.getKey().toByteArray()).getHandle(),
tableInfo);

希望官方能把早点这个问题修复吧!易用性比较差

  • 列表条目

坑爹的地方是因为record.getValue()和record.getKey()不能直接toString解析!

参考一下RowDataTiKVSnapshotEventDeserializationSchema的源码,做一下改造就好了。
Object[] tikvValues =
decodeObjects(
record.getValue().toByteArray(),
RowKey.decode(record.getKey().toByteArray()).getHandle(),
tableInfo);

希望官方能把早点这个问题修复吧!易用性比较差

请问一下,最后你贴的这种方式是把标题中的乱码问题解决了吗?

解决了,没啥问题。

这个其实就是 Kvrpcpb.KvPair 官方没有说明,需要参考RowDataTiKVSnapshotEventDeserializationSchema去做解析而已

这个该怎么做呀,大佬,有示例参考下吗? :grinning:

这么处理:
Map<String, String> map = new HashMap<>();
map.put(“tikv.grpc.timeout_in_ms”, “30000”);
map.put(“tikv.grpc.keepalive_time”, “30000”);

TiConfiguration tiConfiguration = TDBSourceOptions.getTiConfiguration(“localhost:2379”, map);
TiSession session = TiSession.create(tiConfiguration);
TiTableInfo tableInfo = session.getCatalog().getTable(“库名”, “表名”);

Object[] objArray = TableCodec.decodeObjects(valueByteArray, RowKey.decode(keyByteArray).getHandle(), tableInfo);

学习了

大佬 您好 可以贴一个全一点的样例吗 这边试了下没效果

大佬厉害,学习了。

此话题已在最后回复的 60 天后被自动关闭。不再允许新回复。