pytispark有适配spark3.1.x的版本吗

pypi上的pytispark2.0依赖的spark2.3.3提交时间2019.3.26请教一下有适配spark3的pytispark版本吗

2 个赞

目前还没有支持 spark3 的版本

2 个赞

TiSpark 支持 spark3 的版本是有的, pytispark 应该是好久没有发版了,我去问问之前怎么发版的,目前应该是没有支持。

2 个赞

tispark在spark3.1.3的环境下已经跑过测试了,就是pytispark跑不通

1 个赞

是不是以前的业务有使用pyspark的?无论从业界还是社区的实践,java或者scala的多一些,这方面能提供的帮助比较多。

1 个赞

有一些team是用pyspak的只是了解一下tidb在这方面的能力

1 个赞

我们团队用pyspark的少,就是想交流一下,采用的pyspark的team是以前的开发语言是python所以转到了pyspark,还是pyspark有spark在当前业务场景下不具有的能力,所以采用的pyspark,这一块您了解么?因为最近社区,最近问pyspark的也比较多,想了解一些这方面的情况。

1 个赞

据我了解有一部分是因为python下轮子比较多所以有一部分非关键性的业务和poc工作是在pyspark下做的

2 个赞

我们也有上述场景,不过都是单机环境,一个notebook也是解决了大部分场景,非常感谢。

1 个赞

有一些poc场景需要在在集群上跑批所以计算引擎最好是一致的

1 个赞

其实我有个观念不知道对不对:
pyspark其实还是用的spark的rdd机制在做计算,pyspark使用python的那些库,其实还是单机利用,能想到的场景是类似通过rdd的foreachpartition算子,让数据片(partition)在分布式环境中的多台机器利用python库计算,python并没有做分布式计算。
可能有点绕口,不知道这个认知跟你们的场景对不对的上。

2 个赞

因为每个executor都会启动一个python进程,所以在您提到的foreach partition这个场景中每个executor的python进程中的函数会收到一个这个partition的迭代器作为入参进行业务处理,只需在每个executor节点上安装所需要的python库就可以分布式计算。

4 个赞

我也是同样的需求,用 pyspark,python 的库比较多,数据处理时比较方便
我都是根据需要开一个 emr 集群,启动的时候在各个节点上安装好所有的依赖包,在上面跑 pyspark 任务

2 个赞

https://github.com/pingcap/tispark/wiki/PySpark
参考一下官方的说明,其中:

TiSpark 版本 Spark 版本 是否需要 pytispark
< 2.0 < 2.3 :white_check_mark:
2.4.x 2.3.x 2.4.x pyspark :negative_squared_cross_mark: spark-sumbit :white_check_mark:
2.5.x > 3.0 :negative_squared_cross_mark:

简单来说,
tispark < 2.0和spark < 2.3,必须使用pytispark
tipsark 2.4.x 和 spark 2.3.x 2.4.x 在spark-sumbit情况下使用pytispark,其他情况直接使用
tipsark 2.5.x 和 spark > 3.0 直接使用

tipsark 2.5.x 和 spark > 3.0,参考代码如下:

  1. 配置 spark-defaults.conf
spark.sql.extensions  org.apache.spark.sql.TiExtensions
spark.tispark.pd.addresses  ${your_pd_adress}
spark.sql.catalog.tidb_catalog  org.apache.spark.sql.catalyst.catalog.TiCatalog
spark.sql.catalog.tidb_catalog.pd.addresses  ${your_pd_adress}
  1. 启动 pyspark
pyspark --jars tispark-assembly-{version}.jar
  1. Read
spark.sql("use tidb_catalog")
spark.sql("select count(*) from ${database}.${table}").show()
  1. Write
df1 = spark.sql("select * from ${src_database}.${src_table}").show()

df1.write
.format("tidb")
.option("database", ${target_database})
.option("table", ${target_table})
.option("tidb.addr","127.0.0.1")
.option("tidb.password","")
.option("tidb.port","4000")
.option("tidb.user","root")
.mode("append")
.save()

此话题已在最后回复的 1 分钟后被自动关闭。不再允许新回复。