SourceFunction<String> tidbSource =
TiDBSource.<String>builder()
.database("mydb") // set captured database
.tableName("products") // set captured table
.tiConf(
TDBSourceOptions.getTiConfiguration(
"localhost:2399", new HashMap<>()))
.snapshotEventDeserializer(
new TiKVSnapshotEventDeserializationSchema<String>() {
@Override
public void deserialize(
Kvrpcpb.KvPair record, Collector<String> out)
throws Exception {
out.collect(record.toString());
}
@Override
public TypeInformation<String> getProducedType() {
return BasicTypeInfo.STRING_TYPE_INFO;
}
})
.changeEventDeserializer(
new TiKVChangeEventDeserializationSchema<String>() {
@Override
public void deserialize(
Cdcpb.Event.Row record, Collector<String> out)
throws Exception {
out.collect(record.toString());
}
@Override
public TypeInformation<String> getProducedType() {
return BasicTypeInfo.STRING_TYPE_INFO;
}
})
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// enable checkpoint
env.enableCheckpointing(3000);
env.addSource(tidbSource).print().setParallelism(1);
env.execute("Print TiDB Snapshot + Binlog");
原版的没有转码操作呢,你试试看
另外 数据库集群用的什么编码?