spark-sql 执行 delete 报错

spark v3.3.4 + tispark v3.2.3 + tidb v6.5.8 + java version “1.8.0_391”

spark-sql 执行 delete emp 表报错,看起来是 datetime 列导致的

spark-sql> delete from emp where empno=111;
24/04/01 12:02:37 WARN SimpleFunctionRegistry: The function ti_version replaced a previously registered function.
24/04/01 12:02:37 WARN SimpleFunctionRegistry: The function time_to_str replaced a previously registered function.
24/04/01 12:02:37 WARN SimpleFunctionRegistry: The function str_to_time replaced a previously registered function.
24/04/01 12:02:37 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 2) (10.2.103.129 executor 0): org.tikv.common.exception.TiDBConvertException: convert to tidb data error for column 'hiredate'
	at com.pingcap.tispark.utils.WriteUtil$.$anonfun$sparkRow2TiKVRow$2(WriteUtil.scala:71)
	at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:158)
	at com.pingcap.tispark.utils.WriteUtil$.sparkRow2TiKVRow(WriteUtil.scala:58)
	at com.pingcap.tispark.write.TiDBDelete.$anonfun$delete$4(TiDBDelete.scala:92)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:169)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.tikv.common.exception.ConvertNotSupportException: do not support converting from java.time.Instant to  com.pingcap.tikv.types.DateTimeType
	at com.pingcap.tikv.types.AbstractDateTimeType.convertToMysqlDateTime(AbstractDateTimeType.java:134)
	at com.pingcap.tikv.types.DateTimeType.doConvertToTiDBType(DateTimeType.java:53)
	at com.pingcap.tikv.types.DataType.convertToTiDBType(DataType.java:399)
	at com.pingcap.tispark.utils.WriteUtil$.$anonfun$sparkRow2TiKVRow$2(WriteUtil.scala:64)
	... 15 more

24/04/01 12:02:38 ERROR TaskSetManager: Task 0 in stage 2.0 failed 4 times; aborting job
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2.0 (TID 5) (10.2.103.129 executor 0): org.tikv.common.exception.TiDBConvertException: convert to tidb data error for column 'hiredate'
	at com.pingcap.tispark.utils.WriteUtil$.$anonfun$sparkRow2TiKVRow$2(WriteUtil.scala:71)
	at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:158)
	at com.pingcap.tispark.utils.WriteUtil$.sparkRow2TiKVRow(WriteUtil.scala:58)
	at com.pingcap.tispark.write.TiDBDelete.$anonfun$delete$4(TiDBDelete.scala:92)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:169)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.tikv.common.exception.ConvertNotSupportException: do not support converting from java.time.Instant to  com.pingcap.tikv.types.DateTimeType
	at com.pingcap.tikv.types.AbstractDateTimeType.convertToMysqlDateTime(AbstractDateTimeType.java:134)
	at com.pingcap.tikv.types.DateTimeType.doConvertToTiDBType(DateTimeType.java:53)
	at com.pingcap.tikv.types.DataType.convertToTiDBType(DataType.java:399)
	at com.pingcap.tispark.utils.WriteUtil$.$anonfun$sparkRow2TiKVRow$2(WriteUtil.scala:64)
	... 15 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2668)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2604)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2603)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2603)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1178)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1178)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1178)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2798)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2787)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2238)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2259)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2278)
	at org.apache.spark.rdd.RDD.$anonfun$take$1(RDD.scala:1470)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
	at org.apache.spark.rdd.RDD.take(RDD.scala:1443)
	at com.pingcap.tispark.write.TiDBDelete.delete(TiDBDelete.scala:138)
	at com.pingcap.tispark.v2.TiDBTable.deleteWhere(TiDBTable.scala:179)
	at org.apache.spark.sql.execution.datasources.v2.DeleteFromTableExec.run(DeleteFromTableExec.scala:31)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
	at org.apache.spark.sql.Dataset.<init>(Dataset.scala:219)
	at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:99)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:96)
	at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:622)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:617)
	at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:651)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLDriver.run(SparkSQLDriver.scala:67)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:384)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1(SparkSQLCLIDriver.scala:504)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1$adapted(SparkSQLCLIDriver.scala:498)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processLine(SparkSQLCLIDriver.scala:498)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:286)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:984)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:191)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:214)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1072)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1081)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.tikv.common.exception.TiDBConvertException: convert to tidb data error for column 'hiredate'
	at com.pingcap.tispark.utils.WriteUtil$.$anonfun$sparkRow2TiKVRow$2(WriteUtil.scala:71)
	at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:158)
	at com.pingcap.tispark.utils.WriteUtil$.sparkRow2TiKVRow(WriteUtil.scala:58)
	at com.pingcap.tispark.write.TiDBDelete.$anonfun$delete$4(TiDBDelete.scala:92)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:169)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.tikv.common.exception.ConvertNotSupportException: do not support converting from java.time.Instant to  com.pingcap.tikv.types.DateTimeType
	at com.pingcap.tikv.types.AbstractDateTimeType.convertToMysqlDateTime(AbstractDateTimeType.java:134)
	at com.pingcap.tikv.types.DateTimeType.doConvertToTiDBType(DateTimeType.java:53)
	at com.pingcap.tikv.types.DataType.convertToTiDBType(DataType.java:399)
	at com.pingcap.tispark.utils.WriteUtil$.$anonfun$sparkRow2TiKVRow$2(WriteUtil.scala:64)
	... 15 more

简单的表可以 delete

spark-sql> delete from t11 where id=1;
24/04/01 12:05:06 WARN SimpleFunctionRegistry: The function ti_version replaced a previously registered function.
24/04/01 12:05:06 WARN SimpleFunctionRegistry: The function time_to_str replaced a previously registered function.
24/04/01 12:05:06 WARN SimpleFunctionRegistry: The function str_to_time replaced a previously registered function.
Time taken: 4.251 seconds
spark-sql> 
1 个赞

确实没有对java.time.Instant 类型做处理。
不过感觉还是和运行环境的版本可能有点关系。
可以考虑把jdk版本信息也加上。可能更容易复现。

1 个赞

已加上

1 个赞

如何破局

It should be fixed in this pr Support localdate convert to date in datetype by shiyuhang0 · Pull Request #2780 · pingcap/tispark · GitHub

1 个赞