Structured Streaming program fails at runtime


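  • [TiDB version]: 3.0.5
  • [Problem description]: A Structured Streaming program reports the error below after running for a while. The job was interrupted at one point along the way, then stopped and recreated.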
remain#11561,batch_stall_remain_money#11562,batch_busi_remain_money#11563,batch_remain_money#11564,... 24 more fields] TiDBRelation(com.pingcap.tikv.TiSession@5a8c1d27,TiTableReference(h3-bigdata,r_stock_log_pre,841432),com.pingcap.tispark.MetaManager@207abd66,null)

	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:297)
	at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)
Caused by: com.pingcap.tikv.exception.GrpcException: shade.io.grpc.StatusRuntimeException: CANCELLED
	at com.pingcap.tikv.policy.RetryPolicy.rethrowNotRecoverableException(RetryPolicy.java:46)
	at com.pingcap.tikv.policy.RetryPolicy.callWithRetry(RetryPolicy.java:56)
	at com.pingcap.tikv.AbstractGRPCClient.callWithRetry(AbstractGRPCClient.java:63)
	at com.pingcap.tikv.PDClient.getTimestamp(PDClient.java:95)
	at com.pingcap.tikv.TiSession.getTimestamp(TiSession.java:64)
	at org.apache.spark.sql.TiStrategy.apply(TiStrategy.scala:131)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:63)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:63)
	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
	at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:72)
	at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:68)
	at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:77)
	at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:77)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$4.apply(MicroBatchExecution.scala:528)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$4.apply(MicroBatchExecution.scala:519)
	at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:519)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:198)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
	at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
	at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281)
	... 1 more
Caused by: shade.io.grpc.StatusRuntimeException: CANCELLED
	at shade.io.grpc.Status.asRuntimeException(Status.java:517)
	at shade.io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:121)
	at com.pingcap.tikv.AbstractGRPCClient.lambda$callWithRetry$0(AbstractGRPCClient.java:66)
	at com.pingcap.tikv.policy.RetryPolicy.callWithRetry(RetryPolicy.java:54)
	... 29 more
Caused by: java.lang.InterruptedException
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1220)
	at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:335)
	at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:439)
	at shade.io.grpc.stub.ClientCalls$ThreadlessExecutor.waitAndDrain(ClientCalls.java:589)
	at shade.io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:118)
	... 31 more
org.apache.spark.sql.streaming.StreamingQueryException: shade.io.grpc.StatusRuntimeException: CANCELLED
=== Streaming Query ===
Identifier: [id = ae70687b-47e4-4b65-b87b-66ade6660e30, runId = ef4cfa5e-36b9-435d-9f97-278bfe0dcdd7]
Current Committed Offsets: {}
Current Available Offsets: {KafkaV2[Subscribe[kafka_binlog]]: {"kafka_binlog":{"0":1535894}}}

Current State: TERMINATED
Thread State: RUNNABLE


Hello, could you provide the complete error message? Also, if convenient, it would help to share the streaming code.

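The streaming code depends on our environment; the stream is created from the data. It was launched directly, so there are no logs. When I have time over the weekend I will reproduce the problem, record the logs and parameters, and put together a demo.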

OK, thanks in advance for the follow-up.
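
A note on reading the trace while waiting for the reproduction: the innermost cause is a java.lang.InterruptedException thrown from LinkedBlockingQueue.take, and the gRPC layer above it reports that as CANCELLED. This may be consistent with the stream thread being interrupted (for example when the query was stopped and recreated) while it was waiting on PDClient.getTimestamp, but that is only a guess until logs are available. The following is a minimal plain-JDK sketch of that mechanism, not TiSpark or gRPC code; the class name InterruptDemo and the method blockAndInterrupt are made up for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class: shows that interrupting a thread that waits on
// LinkedBlockingQueue.take() surfaces as java.lang.InterruptedException,
// the same innermost cause seen in the stack trace above.
public class InterruptDemo {

    // Starts a worker that blocks on an empty queue, interrupts it,
    // and returns the class name of the exception the worker observed.
    static String blockAndInterrupt() {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
        AtomicReference<String> outcome = new AtomicReference<>("no exception");
        CountDownLatch running = new CountDownLatch(1);

        Thread worker = new Thread(() -> {
            running.countDown();           // signal that the worker is running
            try {
                queue.take();              // nothing is ever offered, so this waits
            } catch (InterruptedException e) {
                outcome.set(e.getClass().getName());
            }
        }, "demo-stream-thread");

        worker.start();
        try {
            running.await();               // wait until the worker has started
            worker.interrupt();            // stand-in for the query being stopped
            worker.join();                 // wait for the worker to finish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("demo itself was interrupted", e);
        }
        return outcome.get();
    }

    public static void main(String[] args) {
        System.out.println(blockAndInterrupt()); // prints java.lang.InterruptedException
    }
}
```

If the reproduction shows the exception only around the time the query is stopped or restarted, that would point at the interruption rather than at PD or TiKV itself.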