TiSpark 2.5.0 can access TiDB, but TiSpark 3.1 and 3.0 fail

Hi TiDB engineers,
While using TiDB I ran into an issue where only a specific TiSpark version can successfully access TiDB. Details below.
Environment
scala 2.12.10
spark 3.1.1
hadoop 2.7.3
tidb 4.0.16
tispark-assembly-2.5.0.jar or tispark-assembly-3.1_2.12-3.1.1.jar

Configuration
--conf spark.tispark.pd.addresses=${TIDB_PD_ADDRESSES}
--conf spark.sql.extensions=org.apache.spark.sql.TiExtensions
--conf spark.tispark.isolation_read_engines=tikv
--conf spark.sql.catalog.tidb_catalog=org.apache.spark.sql.catalyst.catalog.TiCatalog
--conf spark.sql.catalog.tidb_catalog.pd.addresses=${TIDB_PD_ADDRESSES}
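For reference, the same options can also be set programmatically when the SparkSession is built; a minimal Scala sketch, assuming a placeholder PD address in place of ${TIDB_PD_ADDRESSES}:

```scala
import org.apache.spark.sql.SparkSession

// Placeholder PD address; substitute the real ${TIDB_PD_ADDRESSES} value.
val pdAddresses = "pd0:2379"

val spark = SparkSession.builder()
  .appName("tispark-query")
  // Same options as the spark-submit flags above, set programmatically.
  .config("spark.tispark.pd.addresses", pdAddresses)
  .config("spark.sql.extensions", "org.apache.spark.sql.TiExtensions")
  .config("spark.tispark.isolation_read_engines", "tikv")
  .config("spark.sql.catalog.tidb_catalog", "org.apache.spark.sql.catalyst.catalog.TiCatalog")
  .config("spark.sql.catalog.tidb_catalog.pd.addresses", pdAddresses)
  .getOrCreate()
```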

I launch a Spark job that connects to TiDB and runs queries through Spark SQL.
The Spark SQL query selects certain fields from an entire table, something like:
spark.sql("select fieldList from tidb_catalog.database.table");
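For illustration only, a minimal Scala sketch of this flow; the column names, table name, and output path are hypothetical, and the Parquet write mirrors the DataFrameWriter.parquet / cacheOnHdfs frames visible in the stack trace below:

```scala
// `spark` is the SparkSession configured with the TiSpark options above.
// Column/table/path names here are hypothetical placeholders.
val df = spark.sql("select col1, col2 from tidb_catalog.mydb.mytable")

// Dump the query result to HDFS as Parquet (the step where the failing
// job aborts, per the stack trace below).
df.write.mode("overwrite").parquet("hdfs:///tmp/tidb_dump/mytable")
```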

With tispark-assembly-2.5.0.jar, the Spark job runs successfully with no errors.
With tispark-assembly-3.1_2.12-3.1.1.jar, the Spark job fails with the following error:

22/11/16 05:57:33 ERROR Client: Application diagnostics message: User class threw exception: org.apache.spark.SparkException: Job aborted.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:231)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:188)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:108)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:106)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:131)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:132)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:131)
at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:293)
at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:874)
at projectName.common.BaseDataFrameUtility.cacheOnHdfs(BaseDataFrameUtility.scala:12)
at projectName.job.tidbcorrection.DumpTidbJob.execute(DumpTidbJob.scala:51)
at projectName.common.BaseSparkJob.$anonfun$run$1(BaseSparkJob.scala:115)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at scala.util.Try$.apply(Try.scala:213)
at projectName.common.BaseSparkJob.run(BaseSparkJob.scala:115)
at projectName.common.BaseEntryPoint.main(BaseEntryPoint.scala:55)
at projectName.job.tidbcorrection.DumpTidbJob.main(DumpTidbJob.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:732)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 115 in stage 0.0 failed 16 times, most recent failure: Lost task 115.15 in stage 0.0 (TID 925) (): org.apache.spark.SparkException: Task failed while writing rows.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:296)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:210)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.pingcap.tikv.exception.TiClientInternalException: Error reading region:
at com.pingcap.tikv.operation.iterator.DAGIterator.doReadNextRegionChunks(DAGIterator.java:190)
at com.pingcap.tikv.operation.iterator.DAGIterator.readNextRegionChunks(DAGIterator.java:167)
at com.pingcap.tikv.operation.iterator.DAGIterator.hasNext(DAGIterator.java:113)
at org.apache.spark.sql.execution.ColumnarRegionTaskExec$$anon$2.proceedNextBatchTask$1(CoprocessorRDD.scala:359)
at org.apache.spark.sql.execution.ColumnarRegionTaskExec$$anon$2.hasNext(CoprocessorRDD.scala:369)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:488)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:277)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1473)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:286)
… 9 more
Caused by: java.util.concurrent.ExecutionException: com.pingcap.tikv.exception.RegionTaskException: Handle region task failed:
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at com.pingcap.tikv.operation.iterator.DAGIterator.doReadNextRegionChunks(DAGIterator.java:185)
… 21 more
Caused by: com.pingcap.tikv.exception.RegionTaskException: Handle region task failed:
at com.pingcap.tikv.operation.iterator.DAGIterator.process(DAGIterator.java:233)
at com.pingcap.tikv.operation.iterator.DAGIterator.lambda$submitTasks$1(DAGIterator.java:91)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
… 3 more
Caused by: com.pingcap.tikv.exception.GrpcException: retry is exhausted.
at com.pingcap.tikv.util.ConcreteBackOffer.doBackOffWithMaxSleep(ConcreteBackOffer.java:153)
at com.pingcap.tikv.util.ConcreteBackOffer.doBackOff(ConcreteBackOffer.java:124)
at com.pingcap.tikv.region.RegionStoreClient.handleCopResponse(RegionStoreClient.java:709)
at com.pingcap.tikv.region.RegionStoreClient.coprocess(RegionStoreClient.java:681)
at com.pingcap.tikv.operation.iterator.DAGIterator.process(DAGIterator.java:220)
… 7 more
Caused by: com.pingcap.tikv.exception.GrpcException: message: "region 12189882 is missing"
region_not_found {
region_id: 12189882
}

    at com.pingcap.tikv.region.RegionStoreClient.handleCopResponse(RegionStoreClient.java:710)
    ... 9 more

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2253)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2202)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2201)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2201)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1078)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1078)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1078)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2440)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2382)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2371)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2202)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:200)
… 35 more
Caused by: org.apache.spark.SparkException: Task failed while writing rows.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:296)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:210)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.pingcap.tikv.exception.TiClientInternalException: Error reading region:
at com.pingcap.tikv.operation.iterator.DAGIterator.doReadNextRegionChunks(DAGIterator.java:190)
at com.pingcap.tikv.operation.iterator.DAGIterator.readNextRegionChunks(DAGIterator.java:167)
at com.pingcap.tikv.operation.iterator.DAGIterator.hasNext(DAGIterator.java:113)
at org.apache.spark.sql.execution.ColumnarRegionTaskExec$$anon$2.proceedNextBatchTask$1(CoprocessorRDD.scala:359)
at org.apache.spark.sql.execution.ColumnarRegionTaskExec$$anon$2.hasNext(CoprocessorRDD.scala:369)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:488)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:277)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1473)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:286)
… 9 more
Caused by: java.util.concurrent.ExecutionException: com.pingcap.tikv.exception.RegionTaskException: Handle region task failed:
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at com.pingcap.tikv.operation.iterator.DAGIterator.doReadNextRegionChunks(DAGIterator.java:185)
… 21 more
Caused by: com.pingcap.tikv.exception.RegionTaskException: Handle region task failed:
at com.pingcap.tikv.operation.iterator.DAGIterator.process(DAGIterator.java:233)
at com.pingcap.tikv.operation.iterator.DAGIterator.lambda$submitTasks$1(DAGIterator.java:91)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
… 3 more
Caused by: com.pingcap.tikv.exception.GrpcException: retry is exhausted.
at com.pingcap.tikv.util.ConcreteBackOffer.doBackOffWithMaxSleep(ConcreteBackOffer.java:153)
at com.pingcap.tikv.util.ConcreteBackOffer.doBackOff(ConcreteBackOffer.java:124)
at com.pingcap.tikv.region.RegionStoreClient.handleCopResponse(RegionStoreClient.java:709)
at com.pingcap.tikv.region.RegionStoreClient.coprocess(RegionStoreClient.java:681)
at com.pingcap.tikv.operation.iterator.DAGIterator.process(DAGIterator.java:220)
… 7 more
Caused by: com.pingcap.tikv.exception.GrpcException: message: "region 12189882 is missing"
region_not_found {
region_id: 12189882
}

    at com.pingcap.tikv.region.RegionStoreClient.handleCopResponse(RegionStoreClient.java:710)
    ... 9 more

Exception in thread "main" org.apache.spark.SparkException: Application application_1663728370843_4161693 finished with failed status
at org.apache.spark.deploy.yarn.Client.run(Client.scala:1242)
at org.apache.spark.deploy.yarn.YarnClusterApplication.start(Client.scala:1634)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:951)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1030)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1039)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
22/11/16 05:57:33 INFO ShutdownHookManager: Shutdown hook called
22/11/16 05:57:33 INFO ShutdownHookManager: Deleting directory /tmp/spark-tmp/spark-0a8e76c2-ab0c-42aa-b5d2-5fef1837b563
22/11/16 05:57:33 INFO ShutdownHookManager: Deleting directory /tmp/spark-faa69926-0329-4964-b5dc-8b828f3c09a

Is this error intermittent, or does it occur every time?

This looks like the driver-side log. Do you also have the Spark executor logs?

What the driver logs after the retries are exhausted is only the reason for the last failure:

Caused by: com.pingcap.tikv.exception.GrpcException: retry is exhausted.
at com.pingcap.tikv.util.ConcreteBackOffer.doBackOffWithMaxSleep(ConcreteBackOffer.java:153)
at com.pingcap.tikv.util.ConcreteBackOffer.doBackOff(ConcreteBackOffer.java:124)
at com.pingcap.tikv.region.RegionStoreClient.handleCopResponse(RegionStoreClient.java:709)
at com.pingcap.tikv.region.RegionStoreClient.coprocess(RegionStoreClient.java:681)
at com.pingcap.tikv.operation.iterator.DAGIterator.process(DAGIterator.java:220)
… 7 more
Caused by: com.pingcap.tikv.exception.GrpcException: message: "region 12189882 is missing"
region_not_found {
region_id: 12189882
}

The details of the intermediate retry failures should be in the Spark executor logs.

It occurs every time.

Hi, here are the logs I found on the executor:

22/11/21 20:21:31 WARN RegionStoreClient: Re-splitting region task due to region error:EpochNotMatch current epoch of region 9438746 is conf_ver: 17 version: 3556, but you sent conf_ver: 17 version: 3397
22/11/21 20:21:31 ERROR Utils: Aborting task
com.pingcap.tikv.exception.TiClientInternalException: Error reading region:
	at com.pingcap.tikv.operation.iterator.DAGIterator.doReadNextRegionChunks(DAGIterator.java:190)
	at com.pingcap.tikv.operation.iterator.DAGIterator.readNextRegionChunks(DAGIterator.java:167)
	at com.pingcap.tikv.operation.iterator.DAGIterator.hasNext(DAGIterator.java:113)
	at org.apache.spark.sql.execution.ColumnarRegionTaskExec$$anon$2.proceedNextBatchTask$1(CoprocessorRDD.scala:359)
	at org.apache.spark.sql.execution.ColumnarRegionTaskExec$$anon$2.hasNext(CoprocessorRDD.scala:374)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:488)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:277)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1473)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:286)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:210)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.ExecutionException: com.pingcap.tikv.exception.RegionTaskException: Handle region task failed:
	at java.util.concurrent.FutureTask.report(FutureTask.java:122)
	at java.util.concurrent.FutureTask.get(FutureTask.java:192)
	at com.pingcap.tikv.operation.iterator.DAGIterator.doReadNextRegionChunks(DAGIterator.java:185)
	... 21 more
Caused by: com.pingcap.tikv.exception.RegionTaskException: Handle region task failed:
	at com.pingcap.tikv.operation.iterator.DAGIterator.process(DAGIterator.java:233)
	at com.pingcap.tikv.operation.iterator.DAGIterator.lambda$submitTasks$1(DAGIterator.java:91)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	... 3 more
Caused by: com.pingcap.tikv.exception.GrpcException: retry is exhausted.
	at com.pingcap.tikv.util.ConcreteBackOffer.doBackOffWithMaxSleep(ConcreteBackOffer.java:153)
	at com.pingcap.tikv.util.ConcreteBackOffer.doBackOff(ConcreteBackOffer.java:124)
	at com.pingcap.tikv.region.RegionStoreClient.handleCopResponse(RegionStoreClient.java:709)
	at com.pingcap.tikv.region.RegionStoreClient.coprocess(RegionStoreClient.java:681)
	at com.pingcap.tikv.operation.iterator.DAGIterator.process(DAGIterator.java:220)
	... 7 more
Caused by: com.pingcap.tikv.exception.GrpcException: message: "EpochNotMatch current epoch of region 9438746 is conf_ver: 17 version: 3556, but you sent conf_ver: 17 version: 3397"
epoch_not_match {
  current_regions {
    id: 9438746
    start_key: "t\200\000\000\000\000\000\000\377/_r\200\000\000\000\002\377\256\377\235\000\000\000\000\000\372"
    end_key: "t\200\000\000\000\000\000\000\377/_r\200\000\000\000\002\377\2755\a\000\000\000\000\000\372"
    region_epoch {
      conf_ver: 17
      version: 3556
    }
    peers {
      id: 9438748
      store_id: 2003
    }
    peers {
      id: 9438750
      store_id: 6947716
    }
    peers {
      id: 10583292
      store_id: 10530118
    }
    peers {
      id: 10614114
      store_id: 10613406
      role: Learner
    }
  }