TiSpark write of 100k records succeeds, errors start at 500k

【TiDB Environment】Test
【TiDB Version】6.5
【Reproduction Path】We are reworking an existing workload with Spark. One scenario has Spark write computed results into TiDB: about 8 million records, 10 fields per record. Writing directly from Spark into TiDB takes about 5 minutes. Figuring a TiSpark write should be faster, I set up a separate, simple Spark standalone cluster (one master, two workers, Spark 3.3.2) with tispark-assembly-3.3_2.12-3.1.3.jar, writing into the same TiDB test database.
Writing 50k records took 48 seconds; 100k records, 1.2 minutes; but when the volume reached 500k, the write failed.
I have only just started using TiSpark and don't know what is causing this; pointers from anyone with experience would be much appreciated.

【Problem: Symptoms and Impact】With 500k rows, the log shows the job split into 37 tasks; task 20 reported "Error reading region", and several subsequent tasks failed with the same error.
23/08/25 14:53:36 WARN TaskSetManager: Lost task 20.0 in stage 0.0 (TID 20) (10.120.3.63 executor 1): com.pingcap.tikv.exception.TiClientInternalException: Error reading region:
at com.pingcap.tikv.operation.iterator.DAGIterator.doReadNextRegionChunks(DAGIterator.java:191)
at com.pingcap.tikv.operation.iterator.DAGIterator.readNextRegionChunks(DAGIterator.java:168)
at com.pingcap.tikv.operation.iterator.DAGIterator.hasNext(DAGIterator.java:114)
at org.apache.spark.sql.tispark.TiRowRDD$$anon$1.hasNext(TiRowRDD.scala:70)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
at scala.collection.Iterator$SliceIterator.hasNext(Iterator.scala:268)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:136)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.util.concurrent.ExecutionException: com.pingcap.tikv.exception.RegionTaskException: Handle region task failed:
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at com.pingcap.tikv.operation.iterator.DAGIterator.doReadNextRegionChunks(DAGIterator.java:186)
… 20 more
Caused by: com.pingcap.tikv.exception.RegionTaskException: Handle region task failed:
at com.pingcap.tikv.operation.iterator.DAGIterator.process(DAGIterator.java:243)
at com.pingcap.tikv.operation.iterator.DAGIterator.lambda$submitTasks$1(DAGIterator.java:92)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
… 3 more
Caused by: com.pingcap.tikv.exception.GrpcException: retry is exhausted.
at com.pingcap.tikv.util.ConcreteBackOffer.doBackOffWithMaxSleep(ConcreteBackOffer.java:153)
at com.pingcap.tikv.util.ConcreteBackOffer.doBackOff(ConcreteBackOffer.java:124)
at com.pingcap.tikv.region.RegionStoreClient.handleCopResponse(RegionStoreClient.java:708)
at com.pingcap.tikv.region.RegionStoreClient.coprocess(RegionStoreClient.java:689)
at com.pingcap.tikv.operation.iterator.DAGIterator.process(DAGIterator.java:229)
… 7 more
Caused by: com.pingcap.tikv.exception.GrpcException: TiKV down or Network partition
… 10 more

23/08/25 14:53:36 INFO TaskSetManager: Lost task 18.0 in stage 0.0 (TID 18) on 10.120.3.67, executor 0: com.pingcap.tikv.exception.TiClientInternalException (Error reading region:) [duplicate 1]
23/08/25 14:53:36 INFO TaskSetManager: Starting task 18.1 in stage 0.0 (TID 26) (10.120.3.67, executor 0, partition 18, ANY, 9207 bytes) taskResourceAssignments Map()

【Attachments: Screenshots/Logs/Monitoring】
tispark写入50万条记录日志.txt (146.7 KB) — log of the 500k-record TiSpark write

Is the TiKV status normal?

Also, is the 500k write done as a single write, or split into batches? If batched, how large is each batch?

Dataset<Row> dataSet = tispark.sqlContext().sql(
    "SELECT ta_account_no, ta_transaction_account_no, pos_date, cust_manager_id, product_code, " +
    "cust_vol_belong_to_mg, percent, saler_code, update_time, essential_saler_code, ta_system_code " +
    "FROM dcdb.cr_cust_pos_custmg_mapping LIMIT 500000");
dataSet.write()
    .format("tidb")
    .option("database", "dcdb")
    .option("table", "cr_cust_pos_custmg_map20230819")
    .options(tidbOptions)
    .mode("append")
    .save();

It is written directly, with no other controls.

Suggest writing in batches of 5,000 to 10,000 rows at a time.
Also, you can try enabling compression on TiKV (https://docs.pingcap.com/zh/tidb/v7.3/tikv-configuration-file#compression-per-level), or increasing the TiKV region size to 256 MiB and trying again.

Another option is to pre-split dcdb.cr_cust_pos_custmg_map20230819: run ALTER TABLE t ATTRIBUTES 'merge_option=deny'; and combine it with the SPLIT REGION syntax to split the table (https://docs.pingcap.com/zh/tidb/stable/sql-statement-split-region).
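A minimal SQL sketch of that pre-split (assuming an integer row-ID range; the bounds and region count below are placeholders to tune against the table's actual data distribution):

ALTER TABLE dcdb.cr_cust_pos_custmg_map20230819 ATTRIBUTES 'merge_option=deny';
-- pre-split the row-ID range into 16 regions; adjust bounds and count to your data
SPLIT TABLE dcdb.cr_cust_pos_custmg_map20230819 BETWEEN (0) AND (8000000) REGIONS 16;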

Thanks, I'll try that.
One more question: I've read how TiSpark writes work; each write goes through data pre-processing, region pre-splitting, two-phase commit, and so on. Would writing in batches make the total time too long? Also, under the current conditions TiSpark writes haven't shown much of an advantage; under what conditions does TiSpark write performance improve noticeably?

TiSpark writing directly to TiKV is indeed faster. The thing to watch with TiDB is that large batch writes tend to trigger region splits, which then cause errors like "region is unavailable". Pay attention to the following points:
1. Use NVMe SSDs for the TiKV disks;
2. Pre-split the target table ahead of time where appropriate;
3. Increase the region size (see the config sketch below);
4. Enable TiKV compression (zstd):
compression-per-level

  • The default compression algorithm for each level.

  • Default for defaultcf: ["no", "no", "lz4", "lz4", "lz4", "zstd", "zstd"]

  • Default for writecf: ["no", "no", "lz4", "lz4", "lz4", "zstd", "zstd"]

  • Default for lockcf: ["no", "no", "no", "no", "no", "no", "no"]

Reference: https://docs.pingcap.com/zh/tidb/stable/tikv-configuration-file#compression-per-level

Enable compression in the config file:

[rocksdb.defaultcf]
compression-per-level = ["no", "no", "lz4", "lz4", "lz4", "zstd", "zstd"]

Change it to compression-per-level = ["no", "no", "zstd", "zstd", "zstd", "zstd", "zstd"].
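Putting the region-size and compression suggestions together, a sketch of the relevant tikv.toml sections (the 256MiB/384MiB values follow the "larger region" suggestion above and are starting points, not tuned values):

[coprocessor]
# raise the split threshold from the 96MiB default so large batch writes trigger fewer region splits
region-split-size = "256MiB"
region-max-size = "384MiB"

[rocksdb.defaultcf]
# zstd on every compacted level, per the suggestion above
compression-per-level = ["no", "no", "zstd", "zstd", "zstd", "zstd", "zstd"]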

Thanks for the answers, I'll go try them.

Great. Once you've tried it, please come back and share your experience here as well.

I asked around; the usage recommendations are as follows:
1. For reads, have TiSpark read directly from TiKV or TiFlash;
2. For writes, if the batch is fairly large, write via JDBC and split it into batches:
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions

val customer = spark.sql("select * from customer limit 100000")
// To balance load across nodes and increase concurrency, repartition the source
val df = customer.repartition(32)
df.write
  .mode(saveMode = "append")
  .format("jdbc")
  .option("driver", "com.mysql.jdbc.Driver")
  // Replace with your host and port, and make sure batched statement rewriting is enabled
  .option("url", "jdbc:mysql://127.0.0.1:4000/test?rewriteBatchedStatements=true")
  .option("useSSL", "false")
  // For testing, 150 is the recommended setting
  .option(JDBCOptions.JDBC_BATCH_INSERT_SIZE, 150)
  .option("dbtable", "cust_test_select") // database and table name
  .option("isolationLevel", "NONE") // recommended NONE when writing a large DataFrame
  .option("user", "root") // TiDB user name
  .save()

This looks like a resource problem; suggest splitting the write into batches.

I haven't found a good way to split an RDD. I tried Spark SQL, but it doesn't support LIMIT ... OFFSET, and a bare LIMIT can't carve a full dataset into non-overlapping, complete subsets. Any suggestions? So far, using TiSpark to store millions of rows into TiDB falls quite short of what I had originally imagined.
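One possible workaround (a sketch, not an authoritative answer, written against the Scala API and reusing the dataSet and tidbOptions names from the write snippet earlier in the thread): Dataset.randomSplit assigns every row to exactly one split, so it yields disjoint, exhaustive chunks without needing LIMIT ... OFFSET. The 10-way split is arbitrary.

// Split into ~10 roughly equal, disjoint pieces that together cover every row exactly once,
// then write each piece through TiSpark as a smaller batch.
val chunks = dataSet.randomSplit(Array.fill(10)(1.0))
chunks.foreach { chunk =>
  chunk.write
    .format("tidb")
    .option("database", "dcdb")
    .option("table", "cr_cust_pos_custmg_map20230819")
    .options(tidbOptions)
    .mode("append")
    .save()
}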