TiSpark job fails with: com.pingcap.tikv.exception.KeyException: unexpected key error meets and it is commit_ts_expired

To get help faster, please provide the following information; a clear problem description leads to a quicker resolution:

【TiDB Version】
TiDB v5.1.0 + TiSpark v2.4.0

【Problem】
We use TiSpark to write data into TiDB. The data volume is small (only about 20,000 rows) and the job runs on a daily schedule. Occasionally the error below occurs and the Spark job never finishes; retrying the job succeeds.

[main] WARN KVErrorHandler: Unable to handle KeyExceptions other than LockException
com.pingcap.tikv.exception.KeyException: unexpected key error meets and it is commit_ts_expired {
start_ts: 429160315151712272
attempted_commit_ts: 429160320905773116
key: "t\200\000\000\000\000\000\000M_r\200\000\000\000\000\000=\362"
min_commit_ts: 429160320905773118
}

at com.pingcap.tikv.txn.AbstractLockResolverClient.extractLockFromKeyErr(AbstractLockResolverClient.java:68)
at com.pingcap.tikv.operation.KVErrorHandler.handleResponseError(KVErrorHandler.java:287)
at com.pingcap.tikv.policy.RetryPolicy.callWithRetry(RetryPolicy.java:65)
at com.pingcap.tikv.AbstractGRPCClient.callWithRetry(AbstractGRPCClient.java:77)
at com.pingcap.tikv.region.RegionStoreClient.commit(RegionStoreClient.java:594)
at com.pingcap.tikv.txn.TxnKVClient.commit(TxnKVClient.java:158)
at com.pingcap.tikv.TwoPhaseCommitter.doCommitPrimaryKeyWithRetry(TwoPhaseCommitter.java:211)
at com.pingcap.tikv.TwoPhaseCommitter.commitPrimaryKey(TwoPhaseCommitter.java:199)
at com.pingcap.tispark.write.TiBatchWrite.commitPrimaryKey(TiBatchWrite.scala:424)
at com.pingcap.tispark.write.TiBatchWrite.commitPrimaryKeyWithRetry(TiBatchWrite.scala:378)
at com.pingcap.tispark.write.TiBatchWrite.doWrite(TiBatchWrite.scala:311)
at com.pingcap.tispark.write.TiBatchWrite.com$pingcap$tispark$write$TiBatchWrite$$write(TiBatchWrite.scala:88)
at com.pingcap.tispark.write.TiBatchWrite$.write(TiBatchWrite.scala:45)
at com.pingcap.tispark.write.TiDBWriter$.write(TiDBWriter.scala:40)
at com.pingcap.tispark.TiDBDataSource.createRelation(TiDBDataSource.scala:57)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:137)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:133)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:169)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:166)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:133)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:109)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:107)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:696)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:696)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:81)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:128)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:76)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:696)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:305)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:291)
at com.jd.ads_report.tispark.UnionCategoryTiSpark$.main(UnionCategoryTiSpark.scala:44)
at com.jd.ads_report.tispark.UnionCategoryTiSpark.main(UnionCategoryTiSpark.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:935)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:166)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:189)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1010)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1020)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Exception in thread "main" com.pingcap.tikv.exception.TiBatchWriteException: commit primary key error
at com.pingcap.tikv.TwoPhaseCommitter.doCommitPrimaryKeyWithRetry(TwoPhaseCommitter.java:215)
at com.pingcap.tikv.TwoPhaseCommitter.commitPrimaryKey(TwoPhaseCommitter.java:199)
at com.pingcap.tispark.write.TiBatchWrite.commitPrimaryKey(TiBatchWrite.scala:424)
at com.pingcap.tispark.write.TiBatchWrite.commitPrimaryKeyWithRetry(TiBatchWrite.scala:378)
at com.pingcap.tispark.write.TiBatchWrite.doWrite(TiBatchWrite.scala:311)
at com.pingcap.tispark.write.TiBatchWrite.com$pingcap$tispark$write$TiBatchWrite$$write(TiBatchWrite.scala:88)
at com.pingcap.tispark.write.TiBatchWrite$.write(TiBatchWrite.scala:45)
at com.pingcap.tispark.write.TiDBWriter$.write(TiDBWriter.scala:40)
at com.pingcap.tispark.TiDBDataSource.createRelation(TiDBDataSource.scala:57)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:137)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:133)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:169)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:166)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:133)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:109)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:107)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:696)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:696)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:81)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:128)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:76)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:696)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:305)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:291)
at com.jd.ads_report.tispark.UnionCategoryTiSpark$.main(UnionCategoryTiSpark.scala:44)
at com.jd.ads_report.tispark.UnionCategoryTiSpark.main(UnionCategoryTiSpark.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:935)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:166)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:189)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1010)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1020)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: com.pingcap.tikv.exception.KeyException: Key exception occurred and the reason is commit_ts_expired {
start_ts: 429160315151712272
attempted_commit_ts: 429160320905773116
key: "t\200\000\000\000\000\000\000M_r\200\000\000\000\000\000=\362"
min_commit_ts: 429160320905773118
}

at com.pingcap.tikv.region.RegionStoreClient.handleCommitResponse(RegionStoreClient.java:618)
at com.pingcap.tikv.region.RegionStoreClient.commit(RegionStoreClient.java:595)
at com.pingcap.tikv.txn.TxnKVClient.commit(TxnKVClient.java:158)
at com.pingcap.tikv.TwoPhaseCommitter.doCommitPrimaryKeyWithRetry(TwoPhaseCommitter.java:211)
... 42 more

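The timestamps in the error are TiDB TSOs: the upper bits encode a physical wall-clock time in milliseconds and the low 18 bits are a logical counter. A minimal sketch (assuming the standard TiDB TSO layout) decodes the values from the error above and shows how long the transaction had been running when the commit was rejected:

```python
from datetime import datetime, timezone

def tso_physical_ms(tso: int) -> int:
    """Extract the physical millisecond timestamp from a TiDB TSO.

    A TSO is laid out as (physical_ms << 18) | logical_counter.
    """
    return tso >> 18

# Values taken from the commit_ts_expired error message above
start_ts = 429160315151712272
min_commit_ts = 429160320905773118

start_ms = tso_physical_ms(start_ts)
commit_ms = tso_physical_ms(min_commit_ts)

# Wall-clock start time of the transaction (UTC)
print(datetime.fromtimestamp(start_ms / 1000, tz=timezone.utc))
# Elapsed time between start_ts and min_commit_ts, in seconds
print((commit_ms - start_ms) / 1000)  # → 21.95
```

Here the gap between `start_ts` and `min_commit_ts` is only about 22 seconds, so whatever invalidated the commit happened well within a normal batch-write window.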
Hi, please provide the PD log and TiKV log so we can confirm. If possible, also provide the complete Spark log, especially the surrounding context, so we can investigate.

Do you need the logs from every machine?

It would be best to provide the TiSpark logs.

561364662.log (89.8 KB)

It looks like the lock written by TiSpark was cleaned up by TiDB GC. You can increase TiDB's GC interval:

https://docs.pingcap.com/tidb/stable/garbage-collection-configuration
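The GC settings can be inspected and adjusted from any MySQL client connected to TiDB. A sketch, assuming TiDB v5.0+ where the GC knobs are exposed as system variables; the `'1h'` values are illustrative, not a recommendation:

```sql
-- Inspect the current GC configuration
SELECT VARIABLE_NAME, VARIABLE_VALUE
FROM mysql.tidb
WHERE VARIABLE_NAME LIKE 'tikv_gc_%';

-- A longer gc_life_time keeps old MVCC versions (and the state of a
-- long-running TiSpark two-phase-commit transaction) alive longer;
-- gc_run_interval controls how often GC runs. Both default to 10m0s.
SET GLOBAL tidb_gc_life_time = '1h';
SET GLOBAL tidb_gc_run_interval = '1h';
```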

OK, I've increased it somewhat; I'll observe for a few days and see.