TiSpark 2.4.1(Spark 2.4.5)到TiSpark 2.5.0(Spark 3.0.X/3.1.X)迁移实践

【是否原创】是
【首发渠道】TiDB 社区
【首发渠道链接】其他平台首发请附上对应链接

背景介绍

image

喜大普奔,TiSpark 2.5.0发布了,其中最重要的特性是支持了Spark 3.0.X和Spark 3.1.X。因为在k8s上跑TiSpark始终使用的Spark 3.0.3的环境(原因见:TiSpark On Kubernetes实践 关键解释中的说明),也因为统一技术栈的需要,想要统一TiSpark的运行环境到3.0.X,对TiSpark 2.5.0进行了初步测试。
本文描述了从TiSpar k2.4.1(Spark 2.4.5)到TiSpark 2.5.0(Spark 3.0.X/3.1.X)所需做出的修改,运行时状态对比,出现的问题及解决方案。

基础环境版本

  • TiDB版本
    TiDB 5.4.0
  • 测试1:
    TiSpark 2.4.1+Spark 2.4.5
    submit环境Spark 3.0.3
  • 测试2:
    TiSpark 2.5.0+Spark 3.0.3
    submit环境Spark 3.0.3
  • 测试3:
    TiSpark 2.5.0+Spark 3.1.2
    submit环境Spark 3.1.2
  • 特别说明
    测试数据量为10万行。
    spark的executor运行参数如下:
--conf spark.executor.instances=5 \
--conf spark.executor.memory=1G \

测试2和测试3的结果基本一直,出现的问题也类同,不做单独分析。

迁移时需要的修改

连接参数修改

TiSpark 2.4.1迁移到TiSpark 2.5.0,并结合Spark 3.0.3/3.1.2使用时,需要在Spark的conf中增加如下配置:
image

.set("spark.sql.catalog.tidb_catalog","org.apache.spark.sql.catalyst.catalog.TiCatalog")
.set("spark.sql.catalog.tidb_catalog.pd.addresses", pd_addr)

此配置可以在Spark代码中SparkConf()构建的时候set,也可以在Spark submit或者Spark的spark-defaults.conf文件中配置。

读取代码的修改

上一小节增加的两行表示在Spark中增加一个名字是tidb_catalog的catalog,此处的catalog类似database,对后续代码的影响就是:

use tidb_catalog.sbtest

或者

select * from tidb_catalog.sbtest.sbtest_o

切换数据库需要按照database+schema的方式,读取表时需要按照database+schema+table的方式。
此处需要注意,写回TiDB的时候,不需要指定database,完整的代码如下:

//通过 TiSpark 将 DataFrame 批量写入 TiDB
Map<String, String> tiOptionMap = new HashMap<String, String>();
tiOptionMap.put("tidb.addr", tidb_addr);
tiOptionMap.put("tidb.port", "4000");
tiOptionMap.put("tidb.user", username);
tiOptionMap.put("tidb.password", password);
tiOptionMap.put("replace", "true");
tiOptionMap.put("spark.tispark.pd.addresses", pd_addr);

String source_db_name = "tidb_catalog.sbtest";
String source_table_name = "sbtest_o";
String target_db_name = "sbtest2";
String target_table_name = "sbtest_t";

spark.sql("use "+source_db_name);
String source_sql = "select * from "+source_table_name;
spark.sql(source_sql)
  .write()
  .format("tidb")
  .options(tiOptionMap)
  .option("database", target_db_name)
  .option("table", target_table_name)
  .mode(SaveMode.Append)
  .save();

运行情况

测试1运行状态

总体运行状态:


shuffle数据量情况:
image
核心运行时间在3.3min:

运行时发现有溢出磁盘的情况:
image

测试2/3运行状态

总体运行情况:


shuffle数据情况:
image
核心运行时间在3.2min:

运行时发现有溢出磁盘的情况:

测试对比

2.5.0版本运行时简化了一次判空动作,整体运行时间缩短了0.1min,磁盘溢出比2.4.1版本增多,偶尔会出现问题,对出现的问题总结如下。

常见错误与处理

ConcurrentModificationException

com.pingcap.tikv.exception.TiKVException: Execution exception met.
......
Caused by: java.util.concurrent.ExecutionException: java.util.ConcurrentModificationException
......
Caused by: java.util.ConcurrentModificationException
......

出现上述错误时,需要修改TiSpark源码,修改com.pingcap.tikv.util.ConcreteBackOffer这个类的backOffFunctionMap的定义和初始化代码:

private final ConcurrentHashMap<BackOffFunction.BackOffFuncType,BackOffFunction> backOffFunctionMap;
this.backOffFunctionMap = new ConcurrentHashMap<BackOffFunction.BackOffFuncType, BackOffFunction>();

重新编译TiSpark

mvn clean install -Dmaven.test.skip=true

重新编译Spark,重新运行即可解决。

Missing an output location for shuffle 3

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 3
......

出现上述错误是因为Spark的运行内存不足,导出task失败重试,有很多种调优方式,列举两种:

  1. 对数据进行repartition操作,例如:
spark.sql(source_sql)
  .repartition(50) //数据重分区,减轻内存负担
  .write()
  .format("tidb")
  .options(tiOptionMap)
  .option("database", target_db_name)
  .option("table", target_table_name)
  .mode(SaveMode.Append)
  .save();
  1. 有条件的增加executor资源,例如:
--conf spark.executor.instances=5 \

--conf spark.executor.memory=1G \

迁移总结

整体迁移较容易,需要修改的也不多。2.5.0减少了一个步骤,对shuffle的使用有所增加,容易引起一些shuffle的问题,也能通过调整规避。下一步准备阅读源码对2.5.0的优化作进一步的理解和解读。

3赞

谢谢分享,正好有这个需求

写的很详细,已经收藏,谢谢了