spark向TiDB写入数据时报 Communications link failure during rollback(). Transaction resolution unknown.

【 TiDB 使用环境】生产环境
【 TiDB 版本】v6.1.0
【复现路径】增加 spark 并行执行的 task 数时报错
【遇到的问题:问题现象及影响】我尝试用 spark 解析大小为 30GB ,约 2.7 亿条数据的 csv 文件到 TiDB,目标表有约一亿条数据,多个普通索引及一个唯一索引,写入代码如下

csvDataFrameTime.write.format("org.apache.spark.sql.execution.datasources.jdbc2").options(
                Map(
                    "savemode" -> JDBCSaveMode.Update.toString,
                    "driver" -> "com.mysql.cj.jdbc.Driver",
                    "url" -> (DBConf.TIDB_TEST.get("url").get + dbConfigMap.get("dbName").get + "?rewriteBatchedStatements=true&autoReconnect=true&allowMultiQueries=true&serverTimezone=UTC"),
                    "user" -> DBConf.TIDB_TEST.get("user").get,
                    "password" -> DBConf.TIDB_TEST.get("password").get,
                    "dbtable" -> dbConfigMap.get("tableName").get,
                    "batchsize" -> "10000", # 此处是为了适配这个csv文件,将1000调整为10000后每个task的执行速度快了一倍
                    "useSSL" -> "false",
                    "showSql" -> "true"
                )
            ).save()

当我使用 spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0 --master yarn --deploy-mode cluster --executor-cores 1 --num-executors 10 --executor-memory 2GB --conf spark.default.parallelism=3000 --class com.process.Task /dispose/spark/csvProcess 命令执行代码时,程序正常运行,同时执行十个task,速度约为 21万条元数据每小时每task
注:spark.default.parallelism=3000 是为了切分数据以不触发 TiDB 的事务限制和悲观锁报错
为了提高整体执行速度,我尝试分配更多的资源以并行执行更多的 task,例如使用 spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0 --master yarn --deploy-mode cluster --executor-cores 2 --num-executors 10 --executor-memory 4GB --conf spark.default.parallelism=3000 --class com.process.Task /dispose/spark/csvProcess 执行,此时并行执行20个task,就会在执行成功十几或几十个task后开始集体报 java.sql.SQLNonTransientConnectionException: Communications link failure during rollback(). Transaction resolution unknown. 导致spark整体失败,我应该怎样才能并行执行更多的 task 而不触发此报错?
【资源配置】进入到 TiDB Dashboard -集群信息 (Cluster Info) -主机(Hosts) 截图此页面
注:下方为20个 16C 32G 的 TiKV

【附件:截图/日志/监控】
spark 报错信息及执行信息


可以尝试把spark 并行执行的任务数调低再跑

题外话,资源好充足,配了5个TiFlash

这个url指向某个tidb?

嗯…可是我的目的就是为了能并行执行更多任务以更快的把这些数据都写进去…调低任务数我测试过是可以正常运行的

类似?实际上是有一个负载均衡,将指向这个url的连接分配到各个tidb

考虑直接写tikv么?或者直接import到tidb,而不是用spark写

倒是希望能尽量避免这样的特殊操作,因为和其他文件的解析流程不一样,然后和解析中需要做的其他操作也不太匹配。不过请问直接写入tikv是怎样实现的?

tikv client java 可以了解一下
https://tikv.github.io/client-java/

不过,你这里没用到 tispark 么?

TiSpark 的写入是直接去写 TiKV,不经过 TiDB 的,它绕过了 TiDB。

1 个赞

是啊,你得找到这样一个平衡点才行,不然并发太高不同task事务之间有冲突就会报错导致程序失败

学到了,我还真没有用到tispark,我去尝试一下,十分感谢

关于 tispark 可以参考这个帖子

哦哦,感谢,我正找哪有资料呢