flink-connector-tidb-cdc reports errors when syncing data

[TiDB Environment]
[TiDB Version]
[Reproduction Steps] What operations were performed before the problem appeared
[Problem Encountered]: When reading TiDB data with flink-connector-tidb-cdc in startup mode 'scan.startup.mode' = 'latest-offset', the job runs fine at first, but after a while it reports "failed to get member from pd server." and "UNAVAILABLE: Keepalive failed. The connection is likely gone". No synced data is visible either. A sketch of the source table definition follows this template.
[Resource Configuration]
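For context, the source definition looks roughly like the following. This is a minimal sketch, not the poster's actual DDL: the PD address, database name, and column types are placeholders, and the connector options are the ones documented for flink-connector-tidb-cdc (flink-cdc-connectors).

-- Hypothetical Flink SQL source for the time_record table (column types are guesses)
CREATE TABLE time_record (
  id BIGINT,
  employee_id BIGINT,
  `date` DATE,
  fence_status INT,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'tidb-cdc',
  'pd-addresses' = 'pd-host:2379',        -- placeholder PD endpoint
  'database-name' = 'mydb',               -- placeholder database name
  'table-name' = 'time_record',
  'scan.startup.mode' = 'latest-offset'   -- read only new changes, no initial snapshot
);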

[Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, time_record]], fields=[id, employee_id, date, fence_status]) (2/8)#0] INFO org.tikv.cdc.CDCClient - handle resolvedTs: 437453863341785118, regionId: 213496
[Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, time_record]], fields=[id, employee_id, date, fence_status]) (7/8)#0] INFO org.tikv.cdc.CDCClient - handle resolvedTs: 437453863079641102, regionId: 213496
[Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, time_record]], fields=[id, employee_id, date, fence_status]) (7/8)#0] INFO org.tikv.cdc.CDCClient - handle resolvedTs: 437453863341785118, regionId: 213496
[Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, time_record]], fields=[id, employee_id, date, fence_status]) (4/8)#0] INFO org.tikv.cdc.CDCClient - handle resolvedTs: 437453863341785118, regionId: 213496
[Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, time_record]], fields=[id, employee_id, date, fence_status]) (4/8)#0] INFO org.tikv.cdc.CDCClient - handle resolvedTs: 437453863603929106, regionId: 213496
[Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, time_record]], fields=[id, employee_id, date, fence_status]) (8/8)#0] INFO org.tikv.cdc.CDCClient - handle resolvedTs: 437453863341785118, regionId: 213496
[Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, time_record]], fields=[id, employee_id, date, fence_status]) (8/8)#0] INFO org.tikv.cdc.CDCClient - handle resolvedTs: 437453863603929106, regionId: 213496
[Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, time_record]], fields=[id, employee_id, date, fence_status]) (2/8)#0] INFO org.tikv.cdc.CDCClient - handle resolvedTs: 437453863603929106, regionId: 213496
[Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, time_record]], fields=[id, employee_id, date, fence_status]) (1/8)#0] INFO org.tikv.cdc.CDCClient - handle resolvedTs: 437453863341785118, regionId: 213496
[Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, time_record]], fields=[id, employee_id, date, fence_status]) (1/8)#0] INFO org.tikv.cdc.CDCClient - handle resolvedTs: 437453863603929106, regionId: 213496
[Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, time_record]], fields=[id, employee_id, date, fence_status]) (1/8)#0] INFO org.tikv.cdc.CDCClient - handle resolvedTs: 437453863879180289, regionId: 213496
[Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, time_record]], fields=[id, employee_id, date, fence_status]) (5/8)#0] INFO org.tikv.cdc.CDCClient - handle resolvedTs: 437453863603929106, regionId: 213496
[Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, time_record]], fields=[id, employee_id, date, fence_status]) (5/8)#0] INFO org.tikv.cdc.CDCClient - handle resolvedTs: 437453863879180289, regionId: 213496
[Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, time_record]], fields=[id, employee_id, date, fence_status]) (6/8)#0] INFO org.tikv.cdc.CDCClient - handle resolvedTs: 437453863603929106, regionId: 213496
[Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, time_record]], fields=[id, employee_id, date, fence_status]) (8/8)#0] INFO org.tikv.cdc.CDCClient - handle resolvedTs: 437453863879180289, regionId: 213496
[Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, time_record]], fields=[id, employee_id, date, fence_status]) (6/8)#0] INFO org.tikv.cdc.CDCClient - handle resolvedTs: 437453863879180289, regionId: 213496
[Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, time_record]], fields=[id, employee_id, date, fence_status]) (4/8)#0] INFO org.tikv.cdc.CDCClient - handle resolvedTs: 437453863879180289, regionId: 213496
[Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, time_record]], fields=[id, employee_id, date, fence_status]) (3/8)#0] INFO org.tikv.cdc.CDCClient - handle resolvedTs: 437453863603929106, regionId: 213496
[Legacy Source Thread - Source: TableSourceScan(table=[[default_catalog, default_database, time_record]], fields=[id, employee_id, date, fence_status]) (3/8)#0] INFO org.tikv.cdc.CDCClient - handle resolvedTs: 437453863879180289, regionId: 213496
[PDClient-update-leader-pool-0] WARN org.tikv.common.PDClient - failed to get member from pd server.
org.tikv.shade.io.grpc.StatusRuntimeException: UNAVAILABLE: Keepalive failed. The connection is likely gone
at org.tikv.shade.io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:287)
at org.tikv.shade.io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:268)
at org.tikv.shade.io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:175)
at org.tikv.kvproto.PDGrpc$PDBlockingStub.getMembers(PDGrpc.java:1868)
at org.tikv.common.PDClient.getMembers(PDClient.java:443)
at org.tikv.common.PDClient.tryUpdateLeader(PDClient.java:565)
at org.tikv.common.PDClient.lambda$initCluster$15(PDClient.java:730)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)

It is recommended to use TiCDC to replicate the data to Kafka and then consume it with Flink: https://docs.pingcap.com/zh/tidb/stable/manage-ticdc#sink-uri-配置-kafka
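With that setup, TiCDC writes row changes to a Kafka topic (for example with protocol=canal-json in the changefeed sink URI, as described in the linked docs), and Flink reads the topic with its Kafka connector. A rough sketch, with placeholder broker address, topic, and consumer group:

-- Hypothetical Flink SQL table reading the TiCDC changelog from Kafka (canal-json assumed)
CREATE TABLE time_record_from_kafka (
  id BIGINT,
  employee_id BIGINT,
  `date` DATE,
  fence_status INT
) WITH (
  'connector' = 'kafka',
  'topic' = 'time_record_changelog',               -- placeholder topic targeted by the changefeed
  'properties.bootstrap.servers' = 'kafka:9092',   -- placeholder broker address
  'properties.group.id' = 'flink-consumer',        -- placeholder consumer group
  'scan.startup.mode' = 'latest-offset',
  'format' = 'canal-json'                          -- must match the changefeed's sink protocol
);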

Is there any problem with consuming data directly via flink-connector-tidb-cdc? Or put differently, what known issues does flink-connector-tidb-cdc have?

flink-connector-tidb-cdc is community-contributed; it has not been tested by the official team.

Understood, thanks.
