How to write data to TiDB via JDBC with PyTiSpark

#!/usr/bin/python
# -*- coding: UTF-8 -*-

"""
@author: xht
@file: testWriteToTidbJdbc.py
@time: 2020/10/17
"""

import platform
import sys
import os
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
import logging

# Print the Python version and the related environment information

print(sys.version)
for line in sys.path:
    print(line)

import findspark
findspark.init()

# os.environ['SPARK_HOME'] = "D:\dev\spark-2.3.3-bin-hadoop2.7"
# os.environ["JAVA_HOME"] = "d:\dev\jdk1.8.0_141"
# os.unsetenv("PYSPARK_SUBMIT_ARGS")

from pytispark.pytispark import TiContext

#https://github.com/pingcap/tispark/blob/master/docs/datasource_api_userguide.md
#https://www.cnblogs.com/longsongpong/p/10126986.html
currentFileName = os.path.split(__file__)[-1].split(".")[0]
print(currentFileName)

sparkConf = SparkConf() \
    .setMaster("spark://192.168.10.131:7077") \
    .setAppName("tet") \
    .setIfMissing("spark.sql.extensions", "org.apache.spark.sql.TiExtensions") \
    .setIfMissing("spark.tispark.pd.addresses", "tidb01:2379") \
    .setIfMissing("spark.tispark.tidb.addr", "192.168.10.131") \
    .setIfMissing("spark.tispark.tidb.port", "4000")

spark = SparkSession.builder.appName("test-write-tidb") \
    .master("spark://192.168.10.131:7077") \
    .config("spark.tispark.pd.addresses", "192.168.10.131:2379") \
    .config("spark.executor.memory", "1g") \
    .config("spark.executor.cores", "2") \
    .config("spark.eventLog.enabled", "true") \
    .config("spark.cores.max", "6") \
    .config("tidb.addr", "tidb01") \
    .config("tidb.user", "root") \
    .config("tidb.password", "") \
    .config("tidb.port", "4000") \
    .getOrCreate()
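# Note (reviewer comment): sparkConf above is built but never handed to the
# builder, so spark.sql.extensions / org.apache.spark.sql.TiExtensions is not
# registered on this session. One possible fix, a sketch only, is to pass the
# conf object instead of repeating individual settings:
#
#     spark = SparkSession.builder \
#         .config(conf=sparkConf) \
#         .getOrCreate()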

print(spark.version)

ti = TiContext(spark)

ti.tidbMapDatabase("tpch_001")

customer = spark.sql("select * from customer limit 100000")

df = customer.repartition(32)
df.write.mode("append").format("jdbc") \
    .option("driver", "com.mysql.jdbc.Driver") \
    .option("url", "jdbc:mysql://tidb01:4000/test?rewriteBatchedStatements=true") \
    .option("useSSL", "false") \
    .option("batchsize", 150) \
    .option("dbtable", "cust_test_select") \
    .option("isolationLevel", "NONE") \
    .option("user", "root") \
    .save()
# "batchsize" is the JDBC writer option key behind JDBCOptions.JDBC_BATCH_INSERT_SIZE
# in the Scala examples; passing the constant's name as a string has no effect.

if __name__ == '__main__':
    pass

The script dies right after it starts. How should I troubleshoot this? Also, there seem to be very few PyTiSpark examples in the community. Where can I find learning material for it? I am in the data team and we would like to use a Python-based implementation.
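A first troubleshooting step (just a generic sketch, not specific to TiSpark) is to make the driver-side exception visible: wrap the session creation and the write in a try/except and print the traceback, then check the executor logs in the Spark UI for the failing stage. Possible causes for an immediate exit with this setup include the mysql-connector-java or TiSpark assembly jar missing from the driver/executor classpath, or TiExtensions not being registered (see the note after getOrCreate() above).

import traceback

try:
    spark = SparkSession.builder.appName("test-write-tidb").getOrCreate()
    # ... run the tidbMapDatabase / repartition / write steps from the script above ...
except Exception:
    traceback.print_exc()  # keep the driver stack trace instead of a silent exit
    raise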


Are there any examples of batch-writing to TiDB with pytispark?
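Not an official answer, but based on the TiSpark data source API user guide linked in the script above, a batch write can also go through the "tidb" format instead of plain JDBC. This is a sketch only; the host, credentials, database, and table names are placeholders, and it assumes spark.sql.extensions=org.apache.spark.sql.TiExtensions and spark.tispark.pd.addresses are set on the session:

# Sketch of a TiSpark data source ("tidb" format) batch write; values are placeholders.
customer.write \
    .format("tidb") \
    .option("tidb.addr", "192.168.10.131") \
    .option("tidb.port", "4000") \
    .option("tidb.user", "root") \
    .option("tidb.password", "") \
    .option("database", "test") \
    .option("table", "cust_test_select") \
    .mode("append") \
    .save()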

There is a very detailed explanation at https://asktug.com/t/topic/693609.