TiSpark reports "Failed to init client for PD cluster" when running a Spark application

To help us respond more efficiently, please provide the following information when asking; clearly described issues get priority.

  • [TiDB version]: 3.0.12
  • [Problem description]:
    Scenario:
    We use MaxCompute, integrated into DataWorks, Alibaba's big-data platform; MaxCompute supports developing with Spark.
    Because Spark is bundled as a managed service, we cannot edit the spark-defaults.conf file, so we passed the settings directly via --conf in the launch parameters:
spark.tispark.pd.addresses $your_pd_servers
spark.sql.extensions org.apache.spark.sql.TiExtensions

The matching tispark.jar is also referenced. A test run fails with:

20/06/09 15:14:48 ERROR ApplicationMaster: User class threw exception: java.lang.NullPointerException: Failed to init client for PD cluster.
java.lang.NullPointerException: Failed to init client for PD cluster.
        at shade.com.google.common.base.Preconditions.checkNotNull(Preconditions.java:228)
        at com.pingcap.tikv.PDClient.initCluster(PDClient.java:368)
        at com.pingcap.tikv.PDClient.createRaw(PDClient.java:395)
        at com.pingcap.tikv.TiSession.getPDClient(TiSession.java:80)
        at com.pingcap.tikv.TiSession.<init>(TiSession.java:49)
        at com.pingcap.tikv.TiSession.create(TiSession.java:147)
        at org.apache.spark.sql.TiContext.<init>(TiContext.scala:43)
        at org.apache.spark.sql.TiExtensions.getOrCreateTiContext(TiExtensions.scala:15)
        at org.apache.spark.sql.TiExtensions$$anonfun$apply$5.apply(TiExtensions.scala:24)
        at org.apache.spark.sql.TiExtensions$$anonfun$apply$5.apply(TiExtensions.scala:24)
        at org.apache.spark.sql.extensions.TiResolutionRule.<init>(rules.scala:31)
        at org.apache.spark.sql.TiExtensions$$anonfun$apply$6.apply(TiExtensions.scala:24)
        at org.apache.spark.sql.TiExtensions$$anonfun$apply$6.apply(TiExtensions.scala:24)
        at org.apache.spark.sql.SparkSessionExtensions$$anonfun$buildResolutionRules$1.apply(SparkSessionExtensions.scala:75)
        at org.apache.spark.sql.SparkSessionExtensions$$anonfun$buildResolutionRules$1.apply(SparkSessionExtensions.scala:75)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at org.apache.spark.sql.SparkSessionExtensions.buildResolutionRules(SparkSessionExtensions.scala:75)
        at org.apache.spark.sql.internal.BaseSessionStateBuilder.customResolutionRules(BaseSessionStateBuilder.scala:183)
        at org.apache.spark.sql.odps.OdpsSessionStateBuilder$$anon$1.<init>(OdpsSessionStateBuilder.scala:67)
        at org.apache.spark.sql.odps.OdpsSessionStateBuilder.analyzer(OdpsSessionStateBuilder.scala:62)
        at org.apache.spark.sql.internal.BaseSessionStateBuilder$$anonfun$build$2.apply(BaseSessionStateBuilder.scala:293)
        at org.apache.spark.sql.internal.BaseSessionStateBuilder$$anonfun$build$2.apply(BaseSessionStateBuilder.scala:293)
        at org.apache.spark.sql.internal.SessionState.analyzer$lzycompute(SessionState.scala:79)
        at org.apache.spark.sql.internal.SessionState.analyzer(SessionState.scala:79)
        at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:58)
        at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:56)
        at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:48)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:74)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:638)
        at com.hydee.dataworks.SparkApplication$.main(SparkApplication.scala:50)
        at com.hydee.dataworks.SparkApplication.main(SparkApplication.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$4.run(ApplicationMaster.scala:708)
, stdout content: 
        at com.aliyun.odps.cupid.CupidUtil.errMsg2SparkException(CupidUtil.java:43)
        at com.aliyun.odps.cupid.CupidUtil.getResult(CupidUtil.java:123)
        at com.aliyun.odps.cupid.requestcupid.YarnClientImplUtil.pollAMStatus(YarnClientImplUtil.java:107)
        at org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.applicationReportTransform(YarnClientImpl.java:340)
        ... 15 more
java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at com.aliyun.odps.SubmitJob.main(SubmitJob.java:74)
Caused by: org.apache.spark.SparkException: Application application_1591686825162_922794149 finished with failed status
        at org.apache.spark.deploy.yarn.Client.run(Client.scala:1185)
        at org.apache.spark.deploy.yarn.YarnClusterApplication.start(Client.scala:1542)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:881)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:197)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:227)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:136)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
        ... 5 more

@zhengyunpeng Hi, please confirm that the PD addresses in $your_pd_servers are formatted correctly: they should be a comma-separated string of pd-address:pd-port entries. Also, please verify that the required ports are reachable between the Spark cluster and the TiDB/PD/TiKV cluster.
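As an illustration only (a hypothetical helper, not part of TiSpark), a quick sanity check for that comma-separated host:port format could look like this:

```java
import java.util.regex.Pattern;

public class PdAddressCheck {
    // One entry: hostname or IP, a colon, then a numeric port.
    private static final Pattern ENTRY =
            Pattern.compile("[A-Za-z0-9.\\-]+:\\d{1,5}");

    // Returns true only if every comma-separated entry matches host:port.
    static boolean isValidPdAddresses(String addresses) {
        if (addresses == null || addresses.isEmpty()) {
            return false;
        }
        for (String entry : addresses.split(",", -1)) {
            if (!ENTRY.matcher(entry.trim()).matches()) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(isValidPdAddresses("172.30.0.115:2379,172.30.0.116:2379"));
        System.out.println(isValidPdAddresses("172.30.0.115")); // missing port
    }
}
```

Running a check like this against the exact string you pass to spark.tispark.pd.addresses catches the most common mistake (a missing port), which is also what the later replies in this thread ran into.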

Only port 4000 is reachable; neither PD's 2379 nor TiKV's ports can be accessed. Does TiSpark use port 20180 when writing data to TiKV? Do I need to open port 2379 on every PD and port 20180 on every TiKV?

Hi, yes. Please open these ports so that Spark can reach the cluster.

TiSpark needs to connect to PD to fetch cluster information, and to TiKV to send the corresponding queries.

If you use TiSpark's write feature, you also need to open TiDB's port 10080 to fetch DDL information.
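To rule out connectivity problems, a plain TCP probe from the Spark side against each of these ports is usually enough. A minimal sketch, with hypothetical hosts and a hypothetical `canConnect` helper (substitute your real addresses):

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortProbe {
    // Attempts a TCP connection; true means something accepted it within
    // the timeout, false means refused, unreachable, or timed out.
    static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(canConnect("127.0.0.1", 2379, 2000));  // PD
        System.out.println(canConnect("127.0.0.1", 20160, 2000)); // TiKV
        System.out.println(canConnect("127.0.0.1", 10080, 2000)); // TiDB status
    }
}
```

Note that this must run from where the Spark driver/executors actually live; a probe from your laptop proves nothing about the managed cluster's network.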

OK, thanks.

The ports are open, but it still fails. It looks like some JAR version conflicts.

20/06/09 18:08:05 ERROR ApplicationMaster: User class threw exception: java.lang.NullPointerException: Failed to init client for PD cluster.
java.lang.NullPointerException: Failed to init client for PD cluster.
        at shade.com.google.common.base.Preconditions.checkNotNull(Preconditions.java:228)
        at com.pingcap.tikv.PDClient.initCluster(PDClient.java:368)
        at com.pingcap.tikv.PDClient.createRaw(PDClient.java:395)
        at com.pingcap.tikv.TiSession.getPDClient(TiSession.java:80)
        at com.pingcap.tikv.TiSession.<init>(TiSession.java:49)
        at com.pingcap.tikv.TiSession.create(TiSession.java:147)
        at org.apache.spark.sql.TiContext.<init>(TiContext.scala:43)
        at org.apache.spark.sql.TiExtensions.getOrCreateTiContext(TiExtensions.scala:15)
        at org.apache.spark.sql.TiExtensions$$anonfun$apply$5.apply(TiExtensions.scala:24)
        at org.apache.spark.sql.TiExtensions$$anonfun$apply$5.apply(TiExtensions.scala:24)
        at org.apache.spark.sql.extensions.TiResolutionRule.<init>(rules.scala:31)
        at org.apache.spark.sql.TiExtensions$$anonfun$apply$6.apply(TiExtensions.scala:24)
        at org.apache.spark.sql.TiExtensions$$anonfun$apply$6.apply(TiExtensions.scala:24)
        at org.apache.spark.sql.SparkSessionExtensions$$anonfun$buildResolutionRules$1.apply(SparkSessionExtensions.scala:75)
        at org.apache.spark.sql.SparkSessionExtensions$$anonfun$buildResolutionRules$1.apply(SparkSessionExtensions.scala:75)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at org.apache.spark.sql.SparkSessionExtensions.buildResolutionRules(SparkSessionExtensions.scala:75)
        at org.apache.spark.sql.internal.BaseSessionStateBuilder.customResolutionRules(BaseSessionStateBuilder.scala:183)
        at org.apache.spark.sql.odps.OdpsSessionStateBuilder$$anon$1.<init>(OdpsSessionStateBuilder.scala:67)
        at org.apache.spark.sql.odps.OdpsSessionStateBuilder.analyzer(OdpsSessionStateBuilder.scala:62)
        at org.apache.spark.sql.internal.BaseSessionStateBuilder$$anonfun$build$2.apply(BaseSessionStateBuilder.scala:293)
        at org.apache.spark.sql.internal.BaseSessionStateBuilder$$anonfun$build$2.apply(BaseSessionStateBuilder.scala:293)
        at org.apache.spark.sql.internal.SessionState.analyzer$lzycompute(SessionState.scala:79)
        at org.apache.spark.sql.internal.SessionState.analyzer(SessionState.scala:79)
        at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:58)
        at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:56)
        at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:48)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:74)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:638)
        at com.hydee.dataworks.SparkApplication$.main(SparkApplication.scala:50)
        at com.hydee.dataworks.SparkApplication.main(SparkApplication.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$4.run(ApplicationMaster.scala:708)
, stdout content: 
        at org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.applicationReportTransform(YarnClientImpl.java:375)
        at org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.getApplicationReport(YarnClientImpl.java:570)
        at org.apache.spark.deploy.yarn.Client.getApplicationReport(Client.scala:301)
        at org.apache.spark.deploy.yarn.Client.monitorApplication(Client.scala:1079)
        at org.apache.spark.deploy.yarn.Client.run(Client.scala:1182)
        at org.apache.spark.deploy.yarn.YarnClusterApplication.start(Client.scala:1542)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:881)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:197)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:227)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:136)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at com.aliyun.odps.SubmitJob.main(SubmitJob.java:74)
Caused by: com.aliyun.odps.cupid.CupidException: subprocess exit: 3840, stderr content: SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/cupid/cupid_jar/cupid-runtime-1.0.0-shaded.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/apsara/odps/sharedcache/archive/5EDE031535E81B7A0E02DCF7_c4de6b91882df23ece57af2f68c14771.zip/files/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]

Check whether the log contains the error "failed to get member from pd server". If not, your PD configuration is probably not taking effect. Note that TiSpark's configuration must be set when Spark starts: because TiSpark uses Spark's Extensions feature, TiExtensions has to be passed in while the SparkContext is being created.

A simple way to verify: use spark-shell in local mode, without connecting to the Spark master, to connect to the TiDB cluster. If that works, the failure is due to the reason above.

The fix is to pass the parameters when the Spark cluster starts, not when the application starts.

OK, got it. At the moment "failed to get member from pd server" is not being printed.

Alibaba's DataWorks cluster doesn't provide spark-shell. It only exposes Spark as a service; we can't start or stop Spark ourselves, so there's no way to set this at Spark startup.

I see. Do you know the version of the Spark cluster?

If it's 2.3/2.4 and hasn't been heavily customized, you can try creating a SparkContext manually. For example:

// Stop the context the platform created for us.
val oldSc = SparkContext.getOrCreate()
oldSc.stop()

// Build a fresh configuration carrying the TiSpark settings,
// then create a new context from it.
val conf = new SparkConf(false)
conf.set("spark.tispark.pd.addresses", "$your_pd_servers")
conf.set("spark.sql.extensions", "org.apache.spark.sql.TiExtensions")
val sc = SparkContext.getOrCreate(conf)
// .....

TiSpark currently supports only official Spark 2.3+, and support for 3.0 will follow the official release. Compatibility with customized Spark builds is not guaranteed.

Thanks. I don't know the exact version; I'll give it a try first.

OK. If the Spark service provides a shell interface, you can confirm the version with sc.version.

There's no shell interface. "failed to get member from pd server" now appears. I didn't create a SparkContext manually; I bundled the TiSpark package into my own code and excluded the conflicting JARs. Now I get this error:

20/06/09 19:25:06 WARN PDClient: failed to get member from pd server.
java.lang.NullPointerException: target
        at shade.com.google.common.base.Preconditions.checkNotNull(Preconditions.java:906)
        at shade.io.grpc.internal.AbstractManagedChannelImplBuilder.<init>(AbstractManagedChannelImplBuilder.java:174)
        at shade.io.grpc.netty.NettyChannelBuilder.<init>(NettyChannelBuilder.java:121)
        at shade.io.grpc.netty.NettyChannelBuilder.<init>(NettyChannelBuilder.java:116)
        at shade.io.grpc.netty.NettyChannelBuilder.forAddress(NettyChannelBuilder.java:102)
        at shade.io.grpc.netty.NettyChannelProvider.builderForAddress(NettyChannelProvider.java:37)
        at shade.io.grpc.netty.NettyChannelProvider.builderForAddress(NettyChannelProvider.java:23)
        at shade.io.grpc.ManagedChannelBuilder.forAddress(ManagedChannelBuilder.java:37)
        at com.pingcap.tikv.util.ChannelFactory.lambda$getChannel$0(ChannelFactory.java:45)
        at java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1660)
        at com.pingcap.tikv.util.ChannelFactory.getChannel(ChannelFactory.java:34)
        at com.pingcap.tikv.PDClient.getMembers(PDClient.java:242)
        at com.pingcap.tikv.PDClient.initCluster(PDClient.java:340)
        at com.pingcap.tikv.PDClient.<init>(PDClient.java:331)
        at com.pingcap.tikv.PDClient.createRaw(PDClient.java:369)
        at com.pingcap.tikv.TiSession.getPDClient(TiSession.java:114)
        at com.pingcap.tikv.TiSession.getTimestamp(TiSession.java:98)
        at com.pingcap.tikv.TiSession.createSnapshot(TiSession.java:102)
        at com.pingcap.tikv.catalog.Catalog.<init>(Catalog.java:140)
        at com.pingcap.tikv.TiSession.getCatalog(TiSession.java:133)
        at org.apache.spark.sql.TiContext.<init>(TiContext.scala:47)
        at org.apache.spark.sql.TiExtensions.getOrCreateTiContext(TiExtensions.scala:10)
        at org.apache.spark.sql.TiExtensions$$anonfun$apply$5.apply(TiExtensions.scala:18)
        at org.apache.spark.sql.TiExtensions$$anonfun$apply$5.apply(TiExtensions.scala:18)
        at org.apache.spark.sql.extensions.TiResolutionRule.<init>(rules.scala:32)
        at org.apache.spark.sql.TiExtensions$$anonfun$apply$6.apply(TiExtensions.scala:18)
        at org.apache.spark.sql.TiExtensions$$anonfun$apply$6.apply(TiExtensions.scala:18)
        at org.apache.spark.sql.SparkSessionExtensions$$anonfun$buildResolutionRules$1.apply(SparkSessionExtensions.scala:75)
        at org.apache.spark.sql.SparkSessionExtensions$$anonfun$buildResolutionRules$1.apply(SparkSessionExtensions.scala:75)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at org.apache.spark.sql.SparkSessionExtensions.buildResolutionRules(SparkSessionExtensions.scala:75)
        at org.apache.spark.sql.internal.BaseSessionStateBuilder.customResolutionRules(BaseSessionStateBuilder.scala:183)
        at org.apache.spark.sql.odps.OdpsSessionStateBuilder$$anon$1.<init>(OdpsSessionStateBuilder.scala:67)
        at org.apache.spark.sql.odps.OdpsSessionStateBuilder.analyzer(OdpsSessionStateBuilder.scala:62)
        at org.apache.spark.sql.internal.BaseSessionStateBuilder$$anonfun$build$2.apply(BaseSessionStateBuilder.scala:293)
        at org.apache.spark.sql.internal.BaseSessionStateBuilder$$anonfun$build$2.apply(BaseSessionStateBuilder.scala:293)
        at org.apache.spark.sql.internal.SessionState.analyzer$lzycompute(SessionState.scala:79)
        at org.apache.spark.sql.internal.SessionState.analyzer(SessionState.scala:79)
        at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:58)
        at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:56)
        at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:48)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:74)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:638)
        at com.hydee.dataworks.SparkApplication$.main(SparkApplication.scala:50)
        at com.hydee.dataworks.SparkApplication.main(SparkApplication.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$4.run(ApplicationMaster.scala:708)
20/06/09 19:25:06 ERROR ApplicationMaster: User class threw exception: java.lang.NullPointerException: Failed to init client for PD cluster.
java.lang.NullPointerException: Failed to init client for PD cluster.
        at shade.com.google.common.base.Preconditions.checkNotNull(Preconditions.java:906)
        at com.pingcap.tikv.PDClient.initCluster(PDClient.java:345)
        at com.pingcap.tikv.PDClient.<init>(PDClient.java:331)
        at com.pingcap.tikv.PDClient.createRaw(PDClient.java:369)
        at com.pingcap.tikv.TiSession.getPDClient(TiSession.java:114)
        at com.pingcap.tikv.TiSession.getTimestamp(TiSession.java:98)
        at com.pingcap.tikv.TiSession.createSnapshot(TiSession.java:102)
        at com.pingcap.tikv.catalog.Catalog.<init>(Catalog.java:140)
        at com.pingcap.tikv.TiSession.getCatalog(TiSession.java:133)
        at org.apache.spark.sql.TiContext.<init>(TiContext.scala:47)
        at org.apache.spark.sql.TiExtensions.getOrCreateTiContext(TiExtensions.scala:10)
        at org.apache.spark.sql.TiExtensions$$anonfun$apply$5.apply(TiExtensions.scala:18)
        at org.apache.spark.sql.TiExtensions$$anonfun$apply$5.apply(TiExtensions.scala:18)
        at org.apache.spark.sql.extensions.TiResolutionRule.<init>(rules.scala:32)
        at org.apache.spark.sql.TiExtensions$$anonfun$apply$6.apply(TiExtensions.scala:18)
        at org.apache.spark.sql.TiExtensions$$anonfun$apply$6.apply(TiExtensions.scala:18)
        at org.apache.spark.sql.SparkSessionExtensions$$anonfun$buildResolutionRules$1.apply(SparkSessionExtensions.scala:75)
        at org.apache.spark.sql.SparkSessionExtensions$$anonfun$buildResolutionRules$1.apply(SparkSessionExtensions.scala:75)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at org.apache.spark.sql.SparkSessionExtensions.buildResolutionRules(SparkSessionExtensions.scala:75)
        at org.apache.spark.sql.internal.BaseSessionStateBuilder.customResolutionRules(BaseSessionStateBuilder.scala:183)
        at org.apache.spark.sql.odps.OdpsSessionStateBuilder$$anon$1.<init>(OdpsSessionStateBuilder.scala:67)
        at org.apache.spark.sql.odps.OdpsSessionStateBuilder.analyzer(OdpsSessionStateBuilder.scala:62)
        at org.apache.spark.sql.internal.BaseSessionStateBuilder$$anonfun$build$2.apply(BaseSessionStateBuilder.scala:293)
        at org.apache.spark.sql.internal.BaseSessionStateBuilder$$anonfun$build$2.apply(BaseSessionStateBuilder.scala:293)
        at org.apache.spark.sql.internal.SessionState.analyzer$lzycompute(SessionState.scala:79)
        at org.apache.spark.sql.internal.SessionState.analyzer(SessionState.scala:79)
        at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:58)
        at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:56)
        at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:48)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:74)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:638)
        at com.hydee.dataworks.SparkApplication$.main(SparkApplication.scala:50)
        at com.hydee.dataworks.SparkApplication.main(SparkApplication.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$4.run(ApplicationMaster.scala:708)
20/06/09 19:25:06 INFO ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: java.lang.NullPointerException: Failed to init client for PD cluster.
        at shade.com.google.common.base.Preconditions.checkNotNull(Preconditions.java:906)
        at com.pingcap.tikv.PDClient.initCluster(PDClient.java:345)
        at com.pingcap.tikv.PDClient.<init>(PDClient.java:331)
        at com.pingcap.tikv.PDClient.createRaw(PDClient.java:369)
        at com.pingcap.tikv.TiSession.getPDClient(TiSession.java:114)
        at com.pingcap.tikv.TiSession.getTimestamp(TiSession.java:98)
        at com.pingcap.tikv.TiSession.createSnapshot(TiSession.java:102)
        at com.pingcap.tikv.catalog.Catalog.<init>(Catalog.java:140)
        at com.pingcap.tikv.TiSession.getCatalog(TiSession.java:133)
        at org.apache.spark.sql.TiContext.<init>(TiContext.scala:47)
        at org.apache.spark.sql.TiExtensions.getOrCreateTiContext(TiExtensions.scala:10)
        at org.apache.spark.sql.TiExtensions$$anonfun$apply$5.apply(TiExtensions.scala:18)
        at org.apache.spark.sql.TiExtensions$$anonfun$apply$5.apply(TiExtensions.scala:18)
        at org.apache.spark.sql.extensions.TiResolutionRule.<init>(rules.scala:32)
        at org.apache.spark.sql.TiExtensions$$anonfun$apply$6.apply(TiExtensions.scala:18)
        at org.apache.spark.sql.TiExtensions$$anonfun$apply$6.apply(TiExtensions.scala:18)
        at org.apache.spark.sql.SparkSessionExtensions$$anonfun$buildResolutionRules$1.apply(SparkSessionExtensions.scala:75)
        at org.apache.spark.sql.SparkSessionExtensions$$anonfun$buildResolutionRules$1.apply(SparkSessionExtensions.scala:75)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at org.apache.spark.sql.SparkSessionExtensions.buildResolutionRules(SparkSessionExtensions.scala:75)
        at org.apache.spark.sql.internal.BaseSessionStateBuilder.customResolutionRules(BaseSessionStateBuilder.scala:183)
        at org.apache.spark.sql.odps.OdpsSessionStateBuilder$$anon$1.<init>(OdpsSessionStateBuilder.scala:67)
        at org.apache.spark.sql.odps.OdpsSessionStateBuilder.analyzer(OdpsSessionStateBuilder.scala:62)
        at org.apache.spark.sql.internal.BaseSessionStateBuilder$$anonfun$build$2.apply(BaseSessionStateBuilder.scala:293)
        at org.apache.spark.sql.internal.BaseSessionStateBuilder$$anonfun$build$2.apply(BaseSessionStateBuilder.scala:293)
        at org.apache.spark.sql.internal.SessionState.analyzer$lzycompute(SessionState.scala:79)
        at org.apache.spark.sql.internal.SessionState.analyzer(SessionState.scala:79)
        at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:58)
        at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:56)
        at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:48)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:74)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:638)
        at com.hydee.dataworks.SparkApplication$.main(SparkApplication.scala:50)
        at com.hydee.dataworks.SparkApplication.main(SparkApplication.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$4.run(ApplicationMaster.scala:708)
)
20/06/09 19:25:06 INFO SparkContext: Invoking stop() from shutdown hook
20/06/09 19:25:06 INFO SparkUI: Stopped Spark web UI at http://master7907ef97-4e55-472e-aaff-45db1a66834bcupid-11-219-228-5:8088
20/06/09 19:25:06 INFO YarnAllocator: Driver requested a total number of 0 executor(s).
20/06/09 19:25:06 INFO YarnClusterSchedulerBackend: Shutting down all executors
20/06/09 19:25:06 INFO YarnSchedulerBackend$YarnDriverEndpoint: Asking each executor to shut down
20/06/09 19:25:06 INFO SchedulerExtensionServices: Stopping SchedulerExtensionServices
(serviceOption=None,
 services=List(),
 started=false)
20/06/09 19:25:06 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
20/06/09 19:25:06 INFO MemoryStore: MemoryStore cleared
20/06/09 19:25:06 INFO BlockManager: BlockManager stopped
20/06/09 19:25:06 INFO BlockManagerMaster: BlockManagerMaster stopped
20/06/09 19:25:06 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
20/06/09 19:25:07 INFO SparkContext: Successfully stopped SparkContext
20/06/09 19:25:07 INFO ShutdownHookManager: Shutdown hook called
20/06/09 19:25:07 INFO ShutdownHookManager: Deleting directory /diskdriver/b21e12433.cloud.et2@ephemeral_503c74c2-e81d-4bf9-89ec-da58bfb1a630#3dd62111-b347-45fe-a7eb-e4a086d36e64/spark-1f3af700-2b05-465a-bda2-3716b639e5e6
, stdout content: 
        at com.aliyun.odps.cupid.CupidUtil.errMsg2SparkException(CupidUtil.java:43)
        at com.aliyun.odps.cupid.CupidUtil.getResult(CupidUtil.java:123)
        at com.aliyun.odps.cupid.requestcupid.YarnClientImplUtil.pollAMStatus(YarnClientImplUtil.java:107)
        at org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.applicationReportTransform(YarnClientImpl.java:340)
        ... 15 more
java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at com.aliyun.odps.SubmitJob.main(SubmitJob.java:74)
Caused by: org.apache.spark.SparkException: Application application_1591701855942_448853916 finished with failed status
        at org.apache.spark.deploy.yarn.Client.run(Client.scala:1185)
        at org.apache.spark.deploy.yarn.YarnClusterApplication.start(Client.scala:1542)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:881)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:197)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:227)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:136)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
        ... 5 more

If all of the above has been ruled out, the only remaining possibility is that the Spark machines cannot reach the PD cluster. Can you try running a pd-ctl command on the Spark cluster's machines and see whether it works?

I can't. Alibaba only provides the service; there's no server access.
It's reachable from my local machine over the public network, so that should work.

Then can you run this locally? pd-ctl -u <pd-address:pd-port> store

I ran it from an Alibaba Cloud server:

[root@iZ2ze278r1bks3kx96sbgmZ bin]# ./pd-ctl -u 47.103.58.154:2379 store
{
  "count": 3,
  "stores": [
    {
      "store": {
        "id": 5,
        "address": "172.30.0.115:20160",
        "version": "3.0.5",
        "state_name": "Up"
      },
      "status": {
        "capacity": "295.2GiB",
        "available": "266.1GiB",
        "leader_count": 3860,
        "leader_weight": 1,
        "leader_score": 4457,
        "leader_size": 4457,
        "region_count": 10504,
        "region_weight": 1,
        "region_score": 13355,
        "region_size": 13355,
        "start_ts": "2020-05-26T11:43:37+08:00",
        "last_heartbeat_ts": "2020-06-09T19:56:06.624246292+08:00",
        "uptime": "344h12m29.624246292s"
      }
    },
    {
      "store": {
        "id": 1,
        "address": "172.30.0.117:20160",
        "version": "3.0.5",
        "state_name": "Up"
      },
      "status": {
        "capacity": "295.2GiB",
        "available": "269.9GiB",
        "leader_count": 2866,
        "leader_weight": 1,
        "leader_score": 4449,
        "leader_size": 4449,
        "region_count": 10504,
        "region_weight": 1,
        "region_score": 13355,
        "region_size": 13355,
        "start_ts": "2020-05-29T18:32:00+08:00",
        "last_heartbeat_ts": "2020-06-09T19:56:06.090978425+08:00",
        "uptime": "265h24m6.090978425s"
      }
    },
    {
      "store": {
        "id": 4,
        "address": "172.30.0.116:20160",
        "version": "3.0.5",
        "state_name": "Up"
      },
      "status": {
        "capacity": "295.2GiB",
        "available": "273GiB",
        "leader_count": 3778,
        "leader_weight": 1,
        "leader_score": 4449,
        "leader_size": 4449,
        "region_count": 10504,
        "region_weight": 1,
        "region_score": 13355,
        "region_size": 13355,
        "start_ts": "2020-05-28T12:56:43+08:00",
        "last_heartbeat_ts": "2020-06-09T19:56:00.501937549+08:00",
        "uptime": "294h59m17.501937549s"
      }
    }
  ]
}

[root@iZ2ze278r1bks3kx96sbgmZ bin]#

Earlier, the connection failed because the port number was missing from the PD address. After adding it, the error has changed to a connection timeout:

20/06/09 20:06:53 WARN PDClient: failed to get member from pd server.
shade.io.grpc.StatusRuntimeException: UNAVAILABLE: io exception
        at shade.io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:233)
        at shade.io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:214)
        at shade.io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:139)
        at org.tikv.kvproto.PDGrpc$PDBlockingStub.getMembers(PDGrpc.java:966)
        at com.pingcap.tikv.PDClient.getMembers(PDClient.java:246)
        at com.pingcap.tikv.PDClient.initCluster(PDClient.java:340)
        at com.pingcap.tikv.PDClient.<init>(PDClient.java:331)
        at com.pingcap.tikv.PDClient.createRaw(PDClient.java:369)
        at com.pingcap.tikv.TiSession.getPDClient(TiSession.java:114)
        at com.pingcap.tikv.TiSession.getTimestamp(TiSession.java:98)
        at com.pingcap.tikv.TiSession.createSnapshot(TiSession.java:102)
        at com.pingcap.tikv.catalog.Catalog.<init>(Catalog.java:140)
        at com.pingcap.tikv.TiSession.getCatalog(TiSession.java:133)
        at org.apache.spark.sql.TiContext.<init>(TiContext.scala:47)
        at org.apache.spark.sql.TiExtensions.getOrCreateTiContext(TiExtensions.scala:10)
        at org.apache.spark.sql.TiExtensions$$anonfun$apply$5.apply(TiExtensions.scala:18)
        at org.apache.spark.sql.TiExtensions$$anonfun$apply$5.apply(TiExtensions.scala:18)
        at org.apache.spark.sql.extensions.TiResolutionRule.<init>(rules.scala:32)
        at org.apache.spark.sql.TiExtensions$$anonfun$apply$6.apply(TiExtensions.scala:18)
        at org.apache.spark.sql.TiExtensions$$anonfun$apply$6.apply(TiExtensions.scala:18)
        at org.apache.spark.sql.SparkSessionExtensions$$anonfun$buildResolutionRules$1.apply(SparkSessionExtensions.scala:75)
        at org.apache.spark.sql.SparkSessionExtensions$$anonfun$buildResolutionRules$1.apply(SparkSessionExtensions.scala:75)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at org.apache.spark.sql.SparkSessionExtensions.buildResolutionRules(SparkSessionExtensions.scala:75)
        at org.apache.spark.sql.internal.BaseSessionStateBuilder.customResolutionRules(BaseSessionStateBuilder.scala:183)
        at org.apache.spark.sql.odps.OdpsSessionStateBuilder$$anon$1.<init>(OdpsSessionStateBuilder.scala:67)
        at org.apache.spark.sql.odps.OdpsSessionStateBuilder.analyzer(OdpsSessionStateBuilder.scala:62)
        at org.apache.spark.sql.internal.BaseSessionStateBuilder$$anonfun$build$2.apply(BaseSessionStateBuilder.scala:293)
        at org.apache.spark.sql.internal.BaseSessionStateBuilder$$anonfun$build$2.apply(BaseSessionStateBuilder.scala:293)
        at org.apache.spark.sql.internal.SessionState.analyzer$lzycompute(SessionState.scala:79)
        at org.apache.spark.sql.internal.SessionState.analyzer(SessionState.scala:79)
        at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:58)
        at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:56)
        at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:48)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:74)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:638)
        at com.hydee.dataworks.SparkApplication$.main(SparkApplication.scala:56)
        at com.hydee.dataworks.SparkApplication.main(SparkApplication.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$4.run(ApplicationMaster.scala:708)
Caused by: shade.io.netty.channel.ConnectTimeoutException: connection timed out: /47.103.58.154:2379
        at shade.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe$1.run(AbstractNioChannel.java:267)
        at shade.io.netty.util.concurrent.PromiseTask$RunnableAdapter.call(PromiseTask.java:38)
        at shade.io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:127)
        at shade.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
        at shade.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
        at shade.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:462)
        at shade.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:897)
        at shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:745)

At a glance, PD itself looks healthy. However, there is too little information right now to determine whether the Spark cluster you are using can actually reach the PD cluster. At a minimum, I would like to know the Spark version. (For example, query the version from the Spark context in code, and have the program try to connect to the PD address to see whether it succeeds.)
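The suggested in-job check can be sketched roughly as follows. This is a minimal illustration, not TiSpark code: the object name `PdConnectivityCheck` is a placeholder, and the host/port come from the stack trace above; replace them with your own PD address.

```scala
import java.net.{InetSocketAddress, Socket}

object PdConnectivityCheck {
  /** Try to open a plain TCP connection to host:port within timeoutMs. */
  def canConnect(host: String, port: Int, timeoutMs: Int = 3000): Boolean = {
    val socket = new Socket()
    try {
      socket.connect(new InetSocketAddress(host, port), timeoutMs)
      true
    } catch {
      case _: java.io.IOException => false
    } finally {
      socket.close()
    }
  }

  def main(args: Array[String]): Unit = {
    // Inside the Spark job, also log the Spark version, e.g.:
    //   println(spark.version)   // where `spark` is your SparkSession
    // 2379 is PD's default client port; replace the host with your PD address.
    println(s"PD reachable: ${canConnect("47.103.58.154", 2379)}")
  }
}
```

Running this from an executor (not just your laptop) matters: the timeout in the trace suggests the Spark nodes themselves cannot open a TCP connection to PD's client port.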

If that still is not conclusive, could you try deploying a Spark cluster yourself? In my view, TiSpark is not well suited to running on a managed Spark service that you cannot configure and whose version is unknown.


Yes, it is probably a connectivity problem. I will also check with the Spark cluster side.

Our current situation is that we no longer build our own big-data environment and use Alibaba's instead. As a result, Spark can only write to TiDB through MySQL JDBC, which is too slow; we were hoping to use TiSpark to write directly and speed up batch writes.

OK, understood. Judging from the error message you just posted, this is most likely a connectivity problem: the region where your Spark service runs may not be able to reach the PD cluster.
