Using the Flink CDC TiDB connector for incremental data sync: the Flink CDC job fails after startup with java.lang.NullPointerException at org.tikv.cdc.CDCClient.getMinResolvedTs(CDCClient.java:100)

【TiDB environment】Test environment

【Overview】I use the Flink CDC TiDB connector for incremental data synchronization. Shortly after the Flink CDC job starts, it fails with java.lang.NullPointerException at org.tikv.cdc.CDCClient.getMinResolvedTs(CDCClient.java:100)

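【Flink + TiDB upstream/downstream setup】Flink CDC captures the changelog of tables in one TiDB cluster, processes the changed data, and writes the results into tables in another TiDB cluster.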

【Symptom】When using the Flink CDC TiDB connector for incremental data sync, the Flink CDC job fails after startup with java.lang.NullPointerException at org.tikv.cdc.CDCClient.getMinResolvedTs(CDCClient.java:100)

【Business impact】The Flink CDC job is stuck in a loop of failing and restarting and never completes the sync.

【TiDB version】5.4.0

【Flink version】1.13.6
【Flink CDC version】2.2.1

【Logs】

2022-06-06 12:07:42,016 ERROR org.tikv.cdc.CDCClient                                       [] - failed to start:
java.lang.IllegalArgumentException: Invalid range: [+INF..{116,128,0,0,0,0,0,0,72,95,115,0,0,0,0,0,0,0,0})
	at org.tikv.shade.com.google.common.collect.Range.<init>(Range.java:358) ~[blob_p-d9beb6ee66a2dfe71c625689b58326788233f48c-727df68a156f63aff141dbcf13364f79:?]
	at org.tikv.shade.com.google.common.collect.Range.create(Range.java:156) ~[blob_p-d9beb6ee66a2dfe71c625689b58326788233f48c-727df68a156f63aff141dbcf13364f79:?]
	at org.tikv.shade.com.google.common.collect.Range.intersection(Range.java:558) ~[blob_p-d9beb6ee66a2dfe71c625689b58326788233f48c-727df68a156f63aff141dbcf13364f79:?]
	at org.tikv.cdc.CDCClient.overlapWithRegion(CDCClient.java:214) ~[blob_p-d9beb6ee66a2dfe71c625689b58326788233f48c-727df68a156f63aff141dbcf13364f79:?]
	at org.tikv.cdc.CDCClient.addRegions(CDCClient.java:167) ~[blob_p-d9beb6ee66a2dfe71c625689b58326788233f48c-727df68a156f63aff141dbcf13364f79:?]
	at org.tikv.cdc.CDCClient.applyKeyRange(CDCClient.java:160) ~[blob_p-d9beb6ee66a2dfe71c625689b58326788233f48c-727df68a156f63aff141dbcf13364f79:?]
	at org.tikv.cdc.CDCClient.start(CDCClient.java:75) [blob_p-d9beb6ee66a2dfe71c625689b58326788233f48c-727df68a156f63aff141dbcf13364f79:?]
	at com.ververica.cdc.connectors.tidb.TiKVRichParallelSourceFunction.run(TiKVRichParallelSourceFunction.java:135) [blob_p-d9beb6ee66a2dfe71c625689b58326788233f48c-727df68a156f63aff141dbcf13364f79:?]
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110) [flink-dist_2.12-1.14.4.jar:1.14.4]
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67) [flink-dist_2.12-1.14.4.jar:1.14.4]
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:323) [flink-dist_2.12-1.14.4.jar:1.14.4]
2022-06-06 12:07:42,016 INFO  com.ververica.cdc.connectors.tidb.TiKVRichParallelSourceFunction [] - read change event from resolvedTs:433721773119504386
2022-06-06 12:07:42,022 INFO  com.ververica.cdc.connectors.tidb.TiKVRichParallelSourceFunction [] - snapshotState checkpoint: 26 at resolvedTs: 433721773119504386
2022-06-06 12:07:42,117 INFO  org.tikv.cdc.CDCClient                                       [] - remove regions: []
2022-06-06 12:07:42,118 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - Source: Custom Source -> Sink: Print to Std. Out (1/1)#264 (638840b8ce67535c038e795519f64026) switched from RUNNING to FAILED with failure cause: java.lang.NullPointerException
	at org.tikv.cdc.CDCClient.getMinResolvedTs(CDCClient.java:100)
	at com.ververica.cdc.connectors.tidb.TiKVRichParallelSourceFunction.readChangeEvents(TiKVRichParallelSourceFunction.java:205)
	at com.ververica.cdc.connectors.tidb.TiKVRichParallelSourceFunction.run(TiKVRichParallelSourceFunction.java:136)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:323)
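In the log, the ERROR `failed to start` (an `IllegalArgumentException: Invalid range`) is printed before the `NullPointerException` in `getMinResolvedTs`, so the range error is probably the first failure. Its upper bound can be decoded by hand. Assuming TiDB's usual row-key layout (`t` + 8-byte big-endian table ID with the sign bit flipped + `_r` + handle), the bytes read as `t`, table ID 72, `_s`, then eight zero bytes; since `s` is the byte right after `r`, this looks like an exclusive upper bound just past table 72's row keys. The sketch below decodes that key and models a half-open key-range intersection roughly the way the shaded Guava `Range.intersection` in the stack trace behaves (take the larger lower bound and the smaller upper bound, then validate). It is a simplified illustration, not the code of `org.tikv.cdc.CDCClient`. The hypothetical `TABLE_START` key, and the use of `null` for +INF and an empty key for -INF, are assumptions made for the sketch. It does not explain why the lower bound became +INF.

```java
/**
 * Sketch (assumptions: TiDB row-key layout, null = +INF, empty key = -INF).
 * Decodes the key from the "Invalid range" log line and models half-open
 * range intersection; this is NOT the real org.tikv.cdc.CDCClient code.
 */
public class InvalidRangeSketch {

    // Upper bound copied from the log: Invalid range: [+INF..{116,128,...})
    static final int[] LOGGED_KEY = {116, 128, 0, 0, 0, 0, 0, 0, 72, 95, 115, 0, 0, 0, 0, 0, 0, 0, 0};

    // Hypothetical start of table 72's row keys: 't' + table ID 72 + "_r"
    static final int[] TABLE_START = {116, 128, 0, 0, 0, 0, 0, 0, 72, 95, 114};

    /** Decodes the table ID: 8 big-endian bytes whose sign bit was flipped. */
    static long decodeTableId(int[] key) {
        if (key.length < 9 || key[0] != 't') {
            throw new IllegalArgumentException("not a table key");
        }
        long v = 0;
        for (int i = 1; i <= 8; i++) {
            v = (v << 8) | (key[i] & 0xFF);
        }
        return v ^ Long.MIN_VALUE; // undo the flipped sign bit
    }

    /** Returns the two characters after the table ID, e.g. "_r" or "_s". */
    static String suffix(int[] key) {
        return "" + (char) key[9] + (char) key[10];
    }

    /** Byte-wise key order; null stands for +INF (greater than every key). */
    static int compareKeys(int[] a, int[] b) {
        if (a == null) return b == null ? 0 : 1;
        if (b == null) return -1;
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            if (a[i] != b[i]) return Integer.compare(a[i], b[i]);
        }
        return Integer.compare(a.length, b.length); // a prefix sorts first
    }

    /** Prints a key in the same "{b0,b1,...}" style as the log. */
    static String show(int[] key) {
        if (key == null) return "+INF";
        StringBuilder sb = new StringBuilder("{");
        for (int i = 0; i < key.length; i++) {
            if (i > 0) sb.append(',');
            sb.append(key[i]);
        }
        return sb.append('}').toString();
    }

    /** Intersects [lo1, hi1) with [lo2, hi2); throws if the result is invalid. */
    static int[][] intersect(int[] lo1, int[] hi1, int[] lo2, int[] hi2) {
        int[] lo = compareKeys(lo1, lo2) >= 0 ? lo1 : lo2; // larger lower bound
        int[] hi = compareKeys(hi1, hi2) <= 0 ? hi1 : hi2; // smaller upper bound
        if (lo == null || compareKeys(lo, hi) > 0) {
            throw new IllegalArgumentException("Invalid range: [" + show(lo) + ".." + show(hi) + ")");
        }
        return new int[][] {lo, hi};
    }

    public static void main(String[] args) {
        System.out.println("table id = " + decodeTableId(LOGGED_KEY)); // table id = 72
        System.out.println("suffix   = " + suffix(LOGGED_KEY));        // suffix   = _s

        // A region covering everything, [-INF, +INF), clips cleanly to the table range.
        int[][] ok = intersect(new int[0], null, TABLE_START, LOGGED_KEY);
        System.out.println("clipped  = [" + show(ok[0]) + ".." + show(ok[1]) + ")");

        // Any range whose lower bound is already +INF cannot be intersected.
        try {
            intersect(null, null, TABLE_START, LOGGED_KEY);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // same text as the log's "Invalid range"
        }
    }
}
```

If the real client clips each region against the table's key range in a similar way, the message means that one of the two lower bounds was already +INF when `overlapWithRegion` ran, which may be worth mentioning when reporting the issue upstream.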

Flink CDC still seems to have a bug here. Try upgrading to 2.3.0.

Maven dependency

<dependency>
  <groupId>com.ververica</groupId>
  <artifactId>flink-connector-tidb-cdc</artifactId>
  <!-- The dependency is available only for stable releases; SNAPSHOT dependencies must be built yourself. -->
  <version>2.3-SNAPSHOT</version>
</dependency>

https://github.com/ververica/flink-cdc-connectors/issues/1154

I built and used Flink CDC 2.3, but it still reports the error above. The error is also different from the one in issue 1154.

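  1. You could ask about this error in the Flink CDC community; from what I can see, there are quite a few unresolved bugs in its TiDB connector.
  2. Are there any errors in the TiCDC logs? If so, please post them.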