Running the code locally throws an error. PD connects fine and metadata can be read.
Reads work without errors; only writes fail.
10.1.5.222 is the local machine's IP.
When execution reaches the write step, the error below is thrown (why does it report the localhost IP?):
Exception in thread "main" java.sql.SQLException: Access denied for user 'root'@'10.1.5.222' (the localhost IP) (using password: YES)
I traced through the source code down into MysqlIO and got stuck there.
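To separate a TiSpark issue from a plain MySQL-protocol authentication issue, a bare JDBC connection from this same machine can be tested first. This is a minimal sketch, assuming the TiDB address, port, and credentials taken from tiOptionMap in the code below and mysql-connector-java on the classpath; if it also fails with Access denied, the account simply cannot connect from 10.1.5.222 with that password, independent of TiSpark:

import java.sql.DriverManager

object JdbcAuthCheck {
  def main(args: Array[String]): Unit = {
    // Assumed from the post: TiDB at 172.18.1.12:4000, user root, same password as tiOptionMap
    val url = "jdbc:mysql://172.18.1.12:4000/test"
    val conn = DriverManager.getConnection(url, "root", "-t+156@8T2NQFw$m7q")
    try {
      val rs = conn.createStatement().executeQuery("SELECT CURRENT_USER()")
      while (rs.next()) println(rs.getString(1)) // the user@host the server resolved for this session
    } finally conn.close()
  }
}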
pom.xml dependency:
<dependency>
    <groupId>com.pingcap.tispark</groupId>
    <artifactId>tispark-assembly</artifactId>
    <version>2.4.4-scala_2.11</version>
</dependency>
Error stack trace:
23/02/28 22:32:33 INFO PDClient: Switched to new leader: [leaderInfo: 172.18.1.13:2379]
[df.show() output omitted; the data is correct]
Exception in thread "main" java.sql.SQLException: Access denied for user 'root'@'10.1.5.222' (using password: YES)
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:964)
at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3970)
at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3906)
at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:873)
at com.mysql.jdbc.MysqlIO.proceedHandshakeWithPluggableAuthentication(MysqlIO.java:1710)
at com.mysql.jdbc.MysqlIO.doHandshake(MysqlIO.java:1226)
at com.mysql.jdbc.ConnectionImpl.coreConnect(ConnectionImpl.java:2253)
at com.mysql.jdbc.ConnectionImpl.connectOneTryOnly(ConnectionImpl.java:2284)
at com.mysql.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:2083)
at com.mysql.jdbc.ConnectionImpl.<init>(ConnectionImpl.java:806)
at com.mysql.jdbc.JDBC4Connection.<init>(JDBC4Connection.java:47)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at com.mysql.jdbc.Util.handleNewInstance(Util.java:425)
at com.mysql.jdbc.ConnectionImpl.getInstance(ConnectionImpl.java:410)
at com.mysql.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:328)
at com.pingcap.tispark.TiDBUtils$$anonfun$createConnectionFactory$1.apply(TiDBUtils.scala:63)
at com.pingcap.tispark.TiDBUtils$$anonfun$createConnectionFactory$1.apply(TiDBUtils.scala:52)
at com.pingcap.tispark.write.TiDBWriter$.write(TiDBWriter.scala:33)
at com.pingcap.tispark.TiDBDataSource.createRelation(TiDBDataSource.scala:57)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:668)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:276)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:270)
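Judging from the trace, the write path opens a plain JDBC (MySQL protocol) connection to the TiDB server (com.pingcap.tispark.TiDBUtils.createConnectionFactory into com.mysql.jdbc), while the read above went through PD/TiKV (the PDClient leader log line), which would explain why only the write hits MySQL authentication. The 10.1.5.222 in the message is the client address TiDB sees for that JDBC session; the server reports the connecting host, which is why the local IP appears even though it is configured nowhere. A useful next step is to check which user@host pairs actually exist. A minimal sketch, run from a host that can still authenticate, with host, port, and credentials assumed from the code below:

import java.sql.DriverManager

object ListUserHosts {
  def main(args: Array[String]): Unit = {
    val conn = DriverManager.getConnection(
      "jdbc:mysql://172.18.1.12:4000/mysql", "root", "-t+156@8T2NQFw$m7q")
    try {
      // Which client hosts is each account allowed to connect from?
      val rs = conn.createStatement().executeQuery("SELECT user, host FROM mysql.user")
      while (rs.next()) println(rs.getString("user") + "@" + rs.getString("host"))
    } finally conn.close()
  }
}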
The code:
import java.util

import org.apache.spark.SparkConf
import org.apache.spark.sql.{SQLContext, SparkSession}

def main(args: Array[String]): Unit = {
  val sparkConf = new SparkConf()
    .setIfMissing("spark.tispark.write.allow_spark_sql", "true")
    .setIfMissing("spark.master", "local[*]")
    .setIfMissing("spark.app.name", getClass.getName)
    .setIfMissing("spark.sql.extensions", "org.apache.spark.sql.TiExtensions")
    .setIfMissing("spark.tispark.tidb.addr", "172.18.1.13")
    .setIfMissing("spark.tispark.tidb.port", "4000")
    .setIfMissing("spark.tispark.pd.addresses", "172.18.1.14:2379")
    .setIfMissing("spark.sql.catalog.tidb_catalog.pd.addresses", "172.18.1.14:2379")
    .setIfMissing("spark.tispark.write.without_lock_table", "true")
  val spark = SparkSession.builder.config(sparkConf).getOrCreate()
  val sqlContext = spark.sqlContext
  writeUsingScala(sqlContext)
}
def writeUsingScala(sqlContext: SQLContext): Unit = {
  // Fall back to the tidb config in the Spark conf if it is not provided in the data source options
  val tiOptionMap = new util.HashMap[String, String]()
  tiOptionMap.put("tidb.addr", "172.18.1.12")
  tiOptionMap.put("tidb.port", "4000")
  tiOptionMap.put("tidb.user", "root")
  tiOptionMap.put("tidb.password", "-t+156@8T2NQFw$m7q")
  tiOptionMap.put("replace", "true")
  tiOptionMap.put("spark.tispark.pd.addresses", "172.18.1.14:2379")

  val df = sqlContext.read
    .format("tidb")
    .options(tiOptionMap)
    .option("database", "test")
    .option("table", "person")
    .load()
  // df.show() output is correct
  df.show()

  // Append
  df.write
    .format("tidb")
    .options(tiOptionMap)
    .option("database", "test")
    .option("table", "person_copy1")
    .mode("append")
    .save()
  // save() is where the error is thrown
}
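As a cross-check that bypasses TiSpark's writer entirely, the same append could be attempted with Spark's generic JDBC sink against TiDB's MySQL port. This is a hypothetical sketch (placed inside writeUsingScala after df is loaded), reusing the host, port, user, and password assumed from tiOptionMap above. If it fails with the same Access denied, the problem is the account's host grants or password rather than anything TiSpark-specific:

// Hypothetical: Spark's built-in JDBC writer instead of format("tidb"),
// with the same host/port/user/password assumed from tiOptionMap above
df.write
  .format("jdbc")
  .option("url", "jdbc:mysql://172.18.1.12:4000/test")
  .option("dbtable", "person_copy1")
  .option("user", "root")
  .option("password", "-t+156@8T2NQFw$m7q")
  .option("driver", "com.mysql.jdbc.Driver")
  .mode("append")
  .save()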