TiSpark query on a table errors out due to a hotspot

[TiDB Environment] Production
[TiDB Version] 5.1.4
[Symptoms and Impact]
On a 4-node TiKV cluster, when TiSpark scans one table, the CPU of a single TiKV node spikes while the CPU of the other TiKV nodes stays flat, and then the query fails.
Data distribution:
[RECORD - bio_dwd.dwd_bio_sequence_patent] - Leaders Distribution:
total leader count: 2486
store: 3, num_leaders: 515, percentage: 20.72%
store: 10, num_leaders: 655, percentage: 26.35%
store: 1, num_leaders: 682, percentage: 27.43%
store: 2, num_leaders: 634, percentage: 25.50%
[RECORD - bio_dwd.dwd_bio_sequence_patent] - Peers Distribution:
total peers count: 7458
store: 2, num_peers: 1919, percentage: 25.73%
store: 3, num_peers: 1837, percentage: 24.63%
store: 1, num_peers: 1673, percentage: 22.43%
store: 10, num_peers: 2029, percentage: 27.21%
[INDEX - PRIMARY] - Leaders Distribution:
total leader count: 1154
store: 3, num_leaders: 288, percentage: 24.96%
store: 2, num_leaders: 281, percentage: 24.35%
store: 10, num_leaders: 293, percentage: 25.39%
store: 1, num_leaders: 292, percentage: 25.30%
[INDEX - PRIMARY] - Peers Distribution:
total peers count: 3462
store: 2, num_peers: 882, percentage: 25.48%
store: 3, num_peers: 775, percentage: 22.39%
store: 10, num_peers: 920, percentage: 26.57%
store: 1, num_peers: 885, percentage: 25.56%
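
Note: even with leaders spread this evenly across the four stores, a full scan can still concentrate reads on a single region. One way to confirm a read hotspot while the scan is running is to query the hot-region view in TiDB 5.x's information schema; a minimal sketch, with the db/table filter matching the table above:

-- Sketch: list read-hot regions for this table while the TiSpark scan runs.
-- High MAX_HOT_DEGREE / FLOW_BYTES values point at the hot region(s).
SELECT region_id, type, max_hot_degree, flow_bytes
FROM INFORMATION_SCHEMA.TIDB_HOT_REGIONS
WHERE type = 'read'
  AND db_name = 'bio_dwd'
  AND table_name = 'dwd_bio_sequence_patent';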

Error message:
dw Notify->Spark Application Failed,error is org.apache.spark.SparkException: Job aborted.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:231)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:195)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:108)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:106)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:131)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:185)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:223)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:220)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:181)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:134)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:133)
at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:293)
at com.patsnap.dw.spark.job.InitTidbSparkJob.main(InitTidbSparkJob.java:194)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:735)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 4 times, most recent failure: Lost task 1.3 in stage 0.0 (TID 55) (ip-10-11-22-32.ec2.internal executor 1): com.pingcap.tikv.exception.TiClientInternalException: Error reading region:
at com.pingcap.tikv.operation.iterator.DAGIterator.doReadNextRegionChunks(DAGIterator.java:189)
at com.pingcap.tikv.operation.iterator.DAGIterator.readNextRegionChunks(DAGIterator.java:166)
at com.pingcap.tikv.operation.iterator.DAGIterator.hasNext(DAGIterator.java:112)
at org.apache.spark.sql.tispark.TiRowRDD$$anon$1.hasNext(TiRowRDD.scala:69)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:265)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:210)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.util.concurrent.ExecutionException: com.pingcap.tikv.exception.RegionTaskException: Handle region task failed:
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at com.pingcap.tikv.operation.iterator.DAGIterator.doReadNextRegionChunks(DAGIterator.java:184)
... 17 more
Caused by: com.pingcap.tikv.exception.RegionTaskException: Handle region task failed:
at com.pingcap.tikv.operation.iterator.DAGIterator.process(DAGIterator.java:232)
at com.pingcap.tikv.operation.iterator.DAGIterator.lambda$submitTasks$1(DAGIterator.java:90)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
... 3 more
Caused by: com.pingcap.tikv.exception.GrpcException: Coprocessor task terminated due to exceeding the deadline
at com.pingcap.tikv.region.RegionStoreClient.handleCopResponse(RegionStoreClient.java:728)
at com.pingcap.tikv.region.RegionStoreClient.coprocess(RegionStoreClient.java:675)
at com.pingcap.tikv.operation.iterator.DAGIterator.process(DAGIterator.java:219)
... 7 more

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2465)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2414)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2413)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2413)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1124)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1124)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1124)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2679)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2621)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2610)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:914)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2238)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:200)
... 33 more
Caused by: com.pingcap.tikv.exception.TiClientInternalException: Error reading region:
at com.pingcap.tikv.operation.iterator.DAGIterator.doReadNextRegionChunks(DAGIterator.java:189)
at com.pingcap.tikv.operation.iterator.DAGIterator.readNextRegionChunks(DAGIterator.java:166)
at com.pingcap.tikv.operation.iterator.DAGIterator.hasNext(DAGIterator.java:112)
at org.apache.spark.sql.tispark.TiRowRDD$$anon$1.hasNext(TiRowRDD.scala:69)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:265)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:210)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.util.concurrent.ExecutionException: com.pingcap.tikv.exception.RegionTaskException: Handle region task failed:
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at com.pingcap.tikv.operation.iterator.DAGIterator.doReadNextRegionChunks(DAGIterator.java:184)
... 17 more
Caused by: com.pingcap.tikv.exception.RegionTaskException: Handle region task failed:
at com.pingcap.tikv.operation.iterator.DAGIterator.process(DAGIterator.java:232)
at com.pingcap.tikv.operation.iterator.DAGIterator.lambda$submitTasks$1(DAGIterator.java:90)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
... 3 more
Caused by: com.pingcap.tikv.exception.GrpcException: Coprocessor task terminated due to exceeding the deadline
at com.pingcap.tikv.region.RegionStoreClient.handleCopResponse(RegionStoreClient.java:728)
at com.pingcap.tikv.region.RegionStoreClient.coprocess(RegionStoreClient.java:675)
at com.pingcap.tikv.operation.iterator.DAGIterator.process(DAGIterator.java:219)
... 7 more

[Attachments]

Please provide the version information of each component, such as cdc/tikv; you can get it by running cdc version / tikv-server --version.

Is it consistently reproducible?
If it reproduces consistently, we suggest scattering the data properly to avoid read hotspots…
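
For reference, one way to scatter the data at the region level is to pre-split the table; a minimal sketch, assuming an integer row-id key space (the bounds and region count below are placeholders, not taken from this table):

-- Sketch: split the table's key range into more regions so the full scan
-- fans out across stores; adjust the bounds to the actual key space.
SPLIT TABLE bio_dwd.dwd_bio_sequence_patent BETWEEN (0) AND (1000000000) REGIONS 64;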

Yes, it reproduces consistently. The data is already well scattered; the distribution is shown above.
The statement is very simple, just select * from table_name;

What versions of Spark and TiSpark are you on?

For select * from table_name;, connect with the mysql client and run EXPLAIN on it to take a look at the execution plan.

Execution plan:
+-----------------------+--------------+-----------+---------------+----------------------+
| id                    | estRows      | task      | access object | operator info        |
+-----------------------+--------------+-----------+---------------+----------------------+
| TableReader_5         | 814637450.00 | root      |               | data:TableFullScan_4 |
| └─TableFullScan_4     | 814637450.00 | cop[tikv] | table:d       | keep order:false     |
+-----------------------+--------------+-----------+---------------+----------------------+

Spark is 2.4.6 and TiSpark is 2.4.1.

I'm not sure what the remaining question is: a hotspot can indeed make a TiSpark job abort. It can be resolved by scattering the data, enabling the relevant scheduling, and so on.
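
If most reads land on one region, TiDB's Load Base Split can break it up automatically, and PD's balance-hot-region-scheduler (enabled by default) then balances the hot leaders. A minimal sketch, assuming TiDB 5.x's online config change via SQL (the threshold value below is a placeholder; the documented default is 3000):

-- Sketch: lower the QPS threshold at which TiKV auto-splits a hot region.
SET CONFIG tikv `split.qps-threshold` = 1500;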
