tidb + structured streaming不支持checkPoint写入

为提高效率,提问时请提供以下信息,问题描述清晰可优先响应。

  • 【TiDB 版本】:3.0.5
    【问题描述】:

使用tispark+structured streaming执行aggregations操作时需保存state,无法为checkpoint指定“This checkpoint location has to be a path in an HDFS compatible file system”导致在程序运行时报错。
目前我们分析环境tispark+tidb,没有hdfs或类似的分布式文件系统,有没有其他解决方案。
ps:网上有提到使用OSS存储,单目前内网开发环境,无法联网,而且oss付费系统不太合适

0/03/26 18:57:49 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, 192.168.10.203, executor 1): java.lang.IllegalStateException: Error reading delta file file:/data/checkpoint/test/state/0/0/1.delta of HDFSStateStoreProvider[id = (op=0,part=0),dir = file:/data/checkpoint/test/state/0/0]: file:/data/checkpoint/test/state/0/0/1.delta does not exist

首先这需要看你是否一定要 Spark Streaming 的 Checkpoint 机制。因为事实上也可以通过定期把数据存到 tidb 中来实现 checkpoint 的部分功能。

当然如果你使用了 mapWithState,就需要使用 checkpoint 保存 state 信息。一种思路是使用 local 模式可以通过提供 local dir 的方式提供 checkpoint 路径。

我使用的时structured-streaming,跟spark-streaming类似又不太一样。
1.checkpoint存储到tidb
程序运行过程中未使用 checkpoint取存储数据,只有最终将数据存储到tidb
structured-streaming可以设置checkpointLocal,用来自动存储offset等信息
2.设置checkpointLocal普通的简单流处理存储offset没有问题,当使用groupby语句时会导致sparksql自动使用State store。目前State store好像只有一个hdfs 类似的存储方式。在使用groupby后再保存到local dir会报异常。


参见:http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing

不好意思看错了,不过这两者的 checkpoint 机制是一致的,最好是使用 s3,hdfs 等方法储存 state 信息。当然如果使用 local 模式跑的话是可以用 local dir 的。你可以确认一下模式是否正确

使用的时tidb官网推荐的standalone模式

TiDB官网推荐standalone模式是因为standalone模式是集群模式,如果需要大规模计算的话是推荐使用这种分布式的集群模式的。但是针对你们这个case,集群模式要求checkpoint location必须是类似HDFS的存储,这个是spark原生的要求,我们也没什么办法绕过,但是理论上local模式的spark是可以使用local dir的,所以建议你试一下local 模式的spark能否在功能上满足你们现在的需求。当然就算能在功能上满足你们的需求,因为local模式在计算能力上肯定是不如standalone模式的,具体能不能真正用上local模式还得看你们的具体场景和具体需求了。

1 个赞

好的,谢谢。暂时我们还是不希望使用local模式,考虑使用oss或者其他方案

:handshake:

此话题已在最后回复的 1 分钟后被自动关闭。不再允许新回复。