TiSpark batch writes to TiDB: can pyspark batch write to TiDB?

A working official example:
The following shows how to do a TiSpark batch write via the Scala API:

// Select the data to write
val df = spark.sql("select * from tpch.ORDERS")

// Write the data to TiDB
df.write.
  format("tidb").
  option("tidb.addr", "127.0.0.1").
  option("tidb.port", "4000").
  option("tidb.user", "root").
  option("tidb.password", "").
  option("database", "tpch").
  option("table", "target_orders").
  mode("append").
  save()

**If the Scala API is swapped for pyspark, does it still work?**

Testing the example above with pyspark reports an error.
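The exact pyspark code is not shown here; a literal translation of the Scala snippet above would look roughly like this (an assumed reconstruction, reusing the same connection options):

# Assumed pyspark equivalent of the Scala example above; the options are
# copied from that snippet, not from the code that actually produced the error.
df = spark.sql("select * from tpch.ORDERS")

df.write \
    .format("tidb") \
    .option("tidb.addr", "127.0.0.1") \
    .option("tidb.port", "4000") \
    .option("tidb.user", "root") \
    .option("tidb.password", "") \
    .option("database", "tpch") \
    .option("table", "target_orders") \
    .mode("append") \
    .save()

The save() call fails with the following traceback: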

Traceback (most recent call last):
  File "<stdin>", line 9, in <module>
  File "/opt/spark/spark-2.3.0/python/pyspark/sql/readwriter.py", line 701, in save
    self._jwrite.save()
  File "/opt/spark/spark-2.3.0/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
  File "/opt/spark/spark-2.3.0/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/opt/spark/spark-2.3.0/python/lib/py4j-0.10.6-src.zip/py4j/protocol.py", line 320, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o48.save.
: com.pingcap.tikv.exception.TiBatchWriteException: TiExtensions is disable!
	at com.pingcap.tispark.write.TiDBWriter$.write(TiDBWriter.scala:55)
	at com.pingcap.tispark.TiDBDataSource.createRelation(TiDBDataSource.scala:57)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:654)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:654)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:654)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:273)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:267)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)


Is there a solution?

As far as I know, pyspark support existed in a much earlier version; it no longer appears to be supported, and there has been no official statement.

Is there a way to configure TiExtensions in pyspark? The error suggests the relevant configuration is missing.

See the official notes at https://github.com/pingcap/tispark/wiki/PySpark, which include the following compatibility matrix:

| TiSpark version | Spark version | pytispark required |
| --- | --- | --- |
| < 2.0 | < 2.3 | :white_check_mark: |
| 2.4.x | 2.3.x / 2.4.x | pyspark :negative_squared_cross_mark: / spark-submit :white_check_mark: |
| 2.5.x | > 3.0 | :negative_squared_cross_mark: |

In short:
TiSpark < 2.0 with Spark < 2.3: pytispark is required (usage sketched below).
TiSpark 2.4.x with Spark 2.3.x/2.4.x: pytispark is needed under spark-submit; in pyspark it is used directly.
TiSpark 2.5.x with Spark > 3.0: used directly, no pytispark.
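For the older combinations that need pytispark, the usage documented on that wiki page looked roughly like this (quoted from memory, so verify against the linked page; it assumes the TiSpark jar is already on the classpath):

import pytispark.pti as pti

# Bind a TiContext to the current SparkSession and map a TiDB database
# into Spark's catalog so its tables can be queried by name.
ti = pti.TiContext(spark)
ti.tidbMapDatabase("tpch")

spark.sql("select count(*) from ORDERS").show()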

For TiSpark 2.5.x with Spark > 3.0, reference code is as follows:

  1. Configure spark-defaults.conf (the same settings can also be passed as --conf flags; see the sketch after these steps):
spark.sql.extensions  org.apache.spark.sql.TiExtensions
spark.tispark.pd.addresses  ${your_pd_address}
spark.sql.catalog.tidb_catalog  org.apache.spark.sql.catalyst.catalog.TiCatalog
spark.sql.catalog.tidb_catalog.pd.addresses  ${your_pd_address}
  2. Start pyspark:
pyspark --jars tispark-assembly-{version}.jar
  3. Read:
spark.sql("use tidb_catalog")
spark.sql("select count(*) from ${database}.${table}").show()
  4. Write:
df1 = spark.sql("select * from ${src_database}.${src_table}")

df1.write \
    .format("tidb") \
    .option("database", "${target_database}") \
    .option("table", "${target_table}") \
    .option("tidb.addr", "127.0.0.1") \
    .option("tidb.password", "") \
    .option("tidb.port", "4000") \
    .option("tidb.user", "root") \
    .mode("append") \
    .save()
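If editing spark-defaults.conf is inconvenient, the same four settings can be passed as --conf flags when launching pyspark (standard Spark CLI behavior; the jar name and PD address are placeholders as above):

pyspark --jars tispark-assembly-{version}.jar \
  --conf spark.sql.extensions=org.apache.spark.sql.TiExtensions \
  --conf spark.tispark.pd.addresses=${your_pd_address} \
  --conf spark.sql.catalog.tidb_catalog=org.apache.spark.sql.catalyst.catalog.TiCatalog \
  --conf spark.sql.catalog.tidb_catalog.pd.addresses=${your_pd_address}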

So the error should indeed be caused by a missing entry in the configuration file: org.apache.spark.sql.TiExtensions was not set.
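After restarting the session with those settings, a quick sanity check from the pyspark shell shows whether the extension config is visible (spark.conf.get with a default value is plain pyspark, nothing TiSpark-specific):

# Prints the configured extension class, or "not set" if the key is absent;
# an unset value here matches the "TiExtensions is disable!" error above.
print(spark.conf.get("spark.sql.extensions", "not set"))
print(spark.conf.get("spark.tispark.pd.addresses", "not set"))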