几种使用方式
最近有好多小伙伴对使用TiSpark有一些疑问,现在对使用TiSpark写TiKV的使用demo举例如下,按照成熟程度(并不是按照写入效率)依次推荐,版本为TiSpark 3.0.0+Spark 3.0.X:
基础配置:
SparkConf conf = new SparkConf()
.set("spark.sql.extensions", "org.apache.spark.sql.TiExtensions")
.set("spark.sql.catalog.tidb_catalog", "org.apache.spark.sql.catalyst.catalog.TiCatalog")
.set("spark.sql.catalog.tidb_catalog.pd.addresses", pd_addr)
.set("spark.tispark.pd.addresses", pd_addr);
推荐1:
spark.sql("use "+source_db_name);
String source_sql = "select * from "+source_table_name;
spark.sql(source_sql)
.repartition(20)
.write()
.mode(SaveMode.Append)
.format("jdbc")
.option("driver", "com.mysql.jdbc.Driver")
// replace the host and port with yours and be sure to use rewrite batch
.option("url", "jdbc:mysql://"+t_tidb_addr+":4000/"+target_db_name+"?rewriteBatchedStatements=true")
.option("useSSL", "false")
// as tested, setting to `150` is a good practice
.option(JDBCOptions.JDBC_BATCH_INSERT_SIZE(), 150)
// database name and table name here
.option("dbtable", target_table_name)
// set isolationLevel to NONE
.option("isolationLevel", "NONE")
// TiDB user here
.option("user", t_username)
.option("password", t_password)
.save();
推荐2
Map<String, String> tiOptionMap = new HashMap<String, String>();
tiOptionMap.put("tidb.addr", tidb_addr);
tiOptionMap.put("tidb.port", "4000");
tiOptionMap.put("tidb.user", username);
tiOptionMap.put("tidb.password", password);
tiOptionMap.put("replace", "true");
tiOptionMap.put("spark.tispark.pd.addresses", pd_addr);
spark.sql(source_sql)
.write()
.format("tidb")
.options(tiOptionMap)
.option("database", target_db_name)
.option("table", target_table_name)
.mode(SaveMode.Append)
.save();
推荐3
String source_sql = "select * from "+source_db_name+"."+source_table_name;
String create_table = "create table target_"+target_table_name+" using tidb options (\
" +
"tidb.user '"+username+"',\
" +
"tidb.password '"+password+"',\
" +
"tidb.addr '"+tidb_addr+"',\
" +
"tidb.port '4000',\
" +
"database '"+target_db_name+"',\
" +
"table '"+target_table_name+"'\
" +
")";
String insert_sql = "INSERT INTO target_"+target_table_name+" "+source_sql;
其中,推荐1和推荐2实测,效率接近,理论上推荐2速度更快一点,但是会有一些不兼容的问题,可以遇到问题解决问题,推荐3按照目前收集的信息来看,会有类型转换错误的风险,需要谨慎使用。
常见错误
最近论坛里也发现了一些常见的问题,大多数是误报,并不影响TiSpark运行,现列举如下:
1.ANTLR Tool version 4.8 used for code generation does not match the current runtime version 4.7.1ANTLR Runtime version 4.8 used for parser compilation does not match the current runtime version 4.7.122/07/27
原因:spark 3.1.x编译时引用了org.antlr:antlr4-runtime:4.8-1,tispark-assembly-3.1_2.12-3.0.1.jar用了tikv-client,tikv-client中用了org.antlr:antlr4-runtime:4.7,所以我猜测tispark-assembly-3.1_2.12-3.0.1.jar大概率引用的是org.antlr:antlr4-runtime:4.7。
影响:只是一个警告,暂无对运行有影响的明确现象
避免方式:采用Spark 3.0.X运行时无此错误
2.Failed to qet PD version For input string: “2379,XXX.XXX.XXX.XXX:23379,XXX.XXX.XXX.XXX:2379”
原因:此为一个BUG,只要是在维护PD地址的时候,配置了多个地址,就会有这个警告,此BUG已经修复:https://github.com/pingcap/tispark/pull/2473
影响:这个错误只和 telemetry 有关系,和主流程没关系,所以不影响主流程
避免方式: 可通过参数 spark.tispark.telemetry.enable = flase
关闭遥测功能。
https://tidb.net/book/tidb-monthly/2022-06/update/tispark-3-0-0#新特性
3.Database ‘test’ not Found
原因:大部分出现在由Spark 2.X升级到Spark 3.X之后出现,因为在Spark 3.X中使用了Catalog,读取表的时候,模式名的组成有了变化,如下图:
影响:无法正确读取数据库
避免方式:修改sql语句,采用正确的方式