Table or view not found, but the table actually exists

[TiDB Environment] Production
[TiDB Version] v5.2.1
[Reproduction Path] Spark 2.4, reading TiDB via TiSpark; occasionally it throws Exception in thread "main" org.apache.spark.sql.AnalysisException: Table or view not found:
[Problem: Symptoms and Impact] The read fails and the job throws an exception
[Resource Configuration] Go to TiDB Dashboard -> Cluster Info -> Hosts and screenshot that page
[Attachments: Screenshots/Logs/Monitoring]

If it only happens occasionally, could it be a timeout or an interruption?

What happens if you read it directly with a SQL client?

It happens occasionally.

The hive-site.xml under the conf directory of the deployed Spark installation was not updated, so the database cannot be found.

Try copying hive-site.xml from Hive's conf directory into Spark's conf directory.

This is TiSpark connecting directly to TiDB, not Spark accessing Hive.

The TiSpark code related to this error is written in Scala, which I can't quite follow; you can dig into it yourself:
https://github.com/pingcap/tispark/blob/e17587c7c74b93878f67be70ea7523fe635ddbd6/core/src/test/scala/com/pingcap/tispark/insert/InsertSuite.scala#L19
https://github.com/pingcap/tispark/blob/e17587c7c74b93878f67be70ea7523fe635ddbd6/core/src/test/scala/com/pingcap/tispark/datasource/ExceptionTestSuite.scala#L25

Thanks. It is still happening: a node reading data through TiSpark reports Exception in thread "main" org.apache.spark.sql.AnalysisException: Table or view not found

Does the program use the spark.sql.runSQLOnFiles parameter?

Is there any more detailed error information?

It would be best to post the code as well...

Exception in thread "main" org.apache.spark.sql.AnalysisException: Table or view not found: tidb_rpt.no_built_package_details; line 2 pos 67;
'Project [unresolvedalias('date_format('max('rpt_date), yyyy-MM-dd HH:mm:ss), None)]
+- 'Filter ('rpt_date >= 2023-10-30)
   +- 'UnresolvedRelation tidb_rpt.no_built_package_details

at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:90)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:85)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:85)
at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:95)
at org.apache.spark.sql.catalyst.analysis.Analyzer$$anonfun$executeAndCheck$1.apply(Analyzer.scala:108)
at org.apache.spark.sql.catalyst.analysis.Analyzer$$anonfun$executeAndCheck$1.apply(Analyzer.scala:105)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:201)
at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:105)
at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57)
at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:78)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:642)
at com.yto56.daichaibao.testcase.tpch.DaichaobaoTestLogic$.achive(DaichaobaoTestLogic.scala:37)
at com.yto56.daichaibao.DaichaibaoTest$.main(DaichaibaoTest.scala:50)
at com.yto56.daichaibao.DaichaibaoTest.main(DaichaibaoTest.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:849)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:167)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:195)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:924)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:933)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

We don't use spark.sql.runSQLOnFiles.

How is the code written?... Do you switch to the TiDB catalog before running spark-sql?

Before this error occurred, had any TiKV node restarted?

We didn't receive any TiKV restart alerts.

package cn.net.dfo.dw.driver

import cn.net.dfo.dw.utils.DateHelper
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
import org.apache.spark.storage.StorageLevel
import org.slf4j.{Logger, LoggerFactory}

object BatchCollectTotal {

val log: Logger = LoggerFactory.getLogger(this.getClass.getName)

def main(args: Array[String]): Unit = {

val sparkConf = new SparkConf().
  //setIfMissing("spark.master", "local[*]").
  setIfMissing("spark.app.name", getClass.getName).
  setIfMissing("spark.driver.allowMultipleContexts", "true").
  setIfMissing("spark.sql.extensions", "org.apache.spark.sql.TiExtensions").
  // Production
  setIfMissing("spark.tispark.pd.addresses", "ip:2379").
  // Test
  // setIfMissing("spark.tispark.pd.addresses", "ip:2379").
  setIfMissing("spark.driver.maxResultSize", "16g").
  setIfMissing("spark.debug.maxToStringFields", "150").

  // Production
  setIfMissing("spark.tispark.tidb.addr", "tidb1.prd.db").
  setIfMissing("spark.tispark.tidb.port", "3390").

  // Test
  // setIfMissing("spark.tispark.tidb.addr", "ip.122").
  // setIfMissing("spark.tispark.tidb.port", "4000").

  // To work with both Hive and TiDB at the same time, add the enableHiveSupport method.
  // If a database name exists in both, use the prefix below to tell TiDB tables apart from Hive tables.
  setIfMissing("spark.tispark.write.allow_spark_sql", "true").
  setIfMissing("spark.tispark.db_prefix", "tidb_").
  setIfMissing("spark.sql.crossJoin.enabled", "true").
  setIfMissing("spark.tispark.use.tiflash", "true")

val spark = SparkSession.builder.config(sparkConf).getOrCreate()

var batchTime = ""
if (args.size == 2) {
  batchTime = args(0) + " " + args(1) //yyyy-MM-dd HH:mm:ss
  println("传入参数: " + batchTime)
} else {
  batchTime = DateHelper.getCurrTimeSegment(10) //yyyy-MM-dd HH:mm:ss
  println("默认参数:" + batchTime)
}

val historyTime = DateHelper.beforeMinute(batchTime, "yyyy-MM-dd HH:mm:ss", 10)

// Day T
val dateStr = batchTime.substring(0, 10)
// Day T-5
val date_t_5 = DateHelper.defineDayBefore(dateStr, -5, "yyyy-MM-dd")



println("batchTime => " + batchTime)
println("historyTime => " + historyTime)
try {
  // 
  //      deleteData(historyTime)

  // (mdm_org_temp)
  println(">>>>>>>>>>>>>>>>>>>>>>>>>>> ")
  basicData(spark, batchTime)

 
  getCarPeriod(spark, batchTime)

  
} catch {
  case e: Exception =>
    e.printStackTrace()
    log.error(e.getMessage)
} finally {
  spark.close()
  System.exit(0)
}

}

def basicData(spark: SparkSession, batchTime: String) = {
val firstThreeDays = DateHelper.defineDayBefore(batchTime.substring(0, 10).replaceAll("-", ""), -3, "yyyyMMdd")
// Dimension table
var sql =
  s"""
     |select org_code,
     |       org_name,
     |       region_code,
     |       region_name,
     |       transfer_code,
     |       transfer_name,
     |       transfer_prov_code,
     |       transfer_prov_name
     |  from (select org_code,
     |
     |        else
     |          region_name
     |        end region_name,
     |        transfer_code,
     |        transfer_name,
     |        transfer_prov_code,
     |        transfer_prov_name,
     |        row_number() over(partition by org_code order by dt desc) rk
     |          from dim.t03_user_org_mdm_org
     |         where dt >= '${firstThreeDays}') m
     | where rk = '1'
     |""".stripMargin
/*var sql =
  """
    |select sub_department_code org_code,
    |       sub_department_name org_name,
    |       region_code region_code,
    |       region_name region_name,
    |       transfer_code transfer_code,
    |       transfer_name transfer_name
    | from default.dw_dim_dfo_org_trans
    |""".stripMargin
*/
println(sql)
spark.sql(sql).persist(StorageLevel.MEMORY_ONLY_SER).createOrReplaceTempView("temp_upcar_monitor_orgdim")
// spark.sql("select * from temp_upcar_monitor_orgdim limit 10").show()

sql =
  s"""
     |select /*+ BROADCAST(t) */ * from tidb_rpt.upcar_monitor_carno t
     |""".stripMargin
spark.sql(sql).persist(StorageLevel.MEMORY_ONLY_SER).createOrReplaceTempView("temp_upcar_monitor_carno")
//    spark.sql("select * from temp_upcar_monitor_carno limit 10").show()

}

def firstBatchFlag(historyTime: String) = {
val beginBatchTime = historyTime.split(" ")(1)
"09:00:00".equals(beginBatchTime)
}

/*def deleteData(historyTime: String) = {
if ("00:00:00".equals(historyTime.split(" ")(1))) {
val date = DateHelper.defineDayBefore(historyTime, -30, "yyyy-MM-dd")
@transient val conn = TidbClusterPool.getConnection
val stmt = conn.createStatement()
val sql = s"delete from tidb_ytrpt.upcar_monitor_carno where create_time<'${date}'"
println(sql)
stmt.execute(sql)
}
}
*/

def getTotalDetail(spark: SparkSession, batchTime: String) = {

val dateStr = batchTime.substring(0, 10)

var ctStartDate = ""
//    var ctEndDate = ""
var initDate = ""

if (DateHelper.theNextDay(batchTime) > 0) {
  initDate = DateHelper.defineDayBefore(dateStr, -2, "yyyy-MM-dd") + " 09:00:00"
  ctStartDate = DateHelper.defineDayBefore(dateStr, -1, "yyyy-MM-dd") + " 09:00:00"
  //      ctEndDate = dateStr + " 09:00:00"
} else {
  initDate = DateHelper.defineDayBefore(dateStr, -1, "yyyy-MM-dd") + " 09:00:00"
  ctStartDate = dateStr + " 09:00:00"
  //      ctEndDate = DateHelper.defineDayBefore(dateStr, 1, "yyyy-MM-dd") + " 09:00:00"
}

/*val sql =
  s"""
     |select t.waybillNo              waybill_No,
     |       t.bagNo                  bag_No,
     |       t.containerNo            container_No,
     |       t.expType                exp_Type,
     |       t.orgCode                org_Code,
     |       t.createTime             create_Time,
     |       t.weight                 weight,
     |       t.createUserCode         create_User_Code,
     |       t.createUserName         create_User_Name,
     |       case when c.line_Name is null or c.line_name ='' then first_value(t.lineName) over(order by t.lineName) else c.line_name end line_Name,
     |       t.truckNo                truck_No,
     |       t.truckTypeNew           truck_Type_New,
     |       t.transCodeLink          trans_Code_Link,
     |       t.predictDepartTime      predict_Depart_Time,
     |       t.returnFlag             return_Flag,
     |       t.datoubi                datoubi,
     |       t.desRoute               des_Route,
     |       t.desOrgCode             des_Org_Code,
     |       t.masterNodeCode         master_Node_Code,
     |       t.targetWeight           target_Weight,
     |       t.startRegionCode        start_Region_Code,
     |       t.desRegionCode          des_Region_Code,
     |       t.peizaiseoncdOrgCode    peizaiseoncd_Org_Code,
     |       t.peizaisecondRegionCode peizaisecond_Region_Code,
     |       t.peizaistartOrgCode     peizaistart_Org_Code,
     |       t.peizaistartRegionCode  peizaistart_Region_Code,
     |       t.startProvCode          start_Prov_Code,
     |       t.waybillNob             waybill_Nob,
     |       t.firstNodeCode          first_Node_Code,
     |       t.centerCarType          center_Car_Type,
     |       t.routeLineName          route_Line_Name,
     |       c.car_Type               car_Type,
     |       c.fache_Time             fache_Time,
     |       t.batchTime              batch_Time,
     |       '${batchTime}'           rpt_date
     | from (
     |	select *,
     |		       row_number() over(partition by waybillNo,orgCode,containerNo order by createtime desc) rn
     |		  from tidb_ytrpt.upcar_monitor_detail
     |	     where createTime>= '${ctStartDate}' and createTime < '${batchTime}'
     |) t left join temp_upcar_monitor_carno c on c.car_no = t.containerNo
     |where t.rn = 1
     |""".stripMargin*/

/*val sql =
  s"""
     |select t.waybillNo              waybill_No,
     |       t.bagNo                  bag_No,
     |       t.containerNo            container_No,
     |       t.expType                exp_Type,
     |       t.orgCode                org_Code,
     |       t.createTime             create_Time,
     |       t.weight                 weight,
     |       t.createUserCode         create_User_Code,
     |       t.createUserName         create_User_Name,
     |       t.line_name              line_Name,
     |       t.truckNo                truck_No,
     |       t.truckTypeNew           truck_Type_New,
     |       t.transCodeLink          trans_Code_Link,
     |       t.predictDepartTime      predict_Depart_Time,
     |       t.returnFlag             return_Flag,
     |       t.datoubi                datoubi,
     |       t.desRoute               des_Route,
     |       t.desOrgCode             des_Org_Code,
     |       t.masterNodeCode         master_Node_Code,
     |       t.targetWeight           target_Weight,
     |       t.startRegionCode        start_Region_Code,
     |       t.desRegionCode          des_Region_Code,
     |       t.peizaiseoncdOrgCode    peizaiseoncd_Org_Code,
     |       t.peizaisecondRegionCode peizaisecond_Region_Code,
     |       t.peizaistartOrgCode     peizaistart_Org_Code,
     |       t.peizaistartRegionCode  peizaistart_Region_Code,
     |       t.startProvCode          start_Prov_Code,
     |       t.waybillNob             waybill_Nob,
     |       t.firstNodeCode          first_Node_Code,
     |       t.centerCarType          center_Car_Type,
     |       t.routeLineName          route_Line_Name,
     |       t.car_Type               car_Type,
     |       t.fache_Time             fache_Time,
     |       t.batchTime              batch_Time,
     |       '${batchTime}'           rpt_date
     | from (
     |	select * from (
     |		select *,
     |		       row_number() over(partition by waybillNo,orgCode,containerNo order by createtime desc) rn
     |		  from (
     |           select t1.*, c.*
     |             from tidb_ytrpt.upcar_monitor_detail t1
     |             left join (select * from temp_upcar_monitor_carno where create_time<='${batchTime}') c on c.car_no = t1.containerno
     |            where t1.createTime>= '${ctStartDate}' and t1.createTime < '${batchTime}'
     |           union all
     |           select t2.*, c.*
     |             from tidb_ytrpt.upcar_monitor_detail t2
     |             join (select * from temp_upcar_monitor_carno where status = 'INVALID' and car_type='待发车') c on c.car_no = t2.containerno
     |            where t2.createTime>= '${initDate}' and t2.createTime < '${ctStartDate}'
     |    ) tab
     |	) temp where temp.rn = 1 and not exists (select 1 from temp_upcar_monitor_carno c where status = 'INVALID' and c.car_no=temp.containerNo)
     |) t
     |""".stripMargin
println(sql)
spark.sql(sql).repartition(100).persist(StorageLevel.MEMORY_ONLY_SER).createOrReplaceTempView("upcar_monitor_detail_temp")*/

// The 09:00 batch freezes the departure status
if ("09:00:00".equals(batchTime.split(" ", -1)(1))) {
  val sql =
    s"""
       |select '${DateHelper.convertDate(batchTime, "yyyy-MM-dd")}' rpt_date
       |	,t.car_no
       |	,t.create_time
       |	,t.car_type
       |	,t.status
       |	,t.fache_time
       |	,t.line_name
       |  from temp_upcar_monitor_carno t
       |""".stripMargin
  spark.sql(sql).write.mode(saveMode = "append").format("jdbc")
    .option("driver", "com.mysql.jdbc.Driver")
    .option("url", "jdbc:mysql://tidb1.prd.db:3390/ytrpt?rewriteBatchedStatements=true&useUnicode=true&characterEncoding=UTF-8")
    .option("useSSL", false)
    .option(JDBCOptions.JDBC_BATCH_INSERT_SIZE, 10000)
    .option("dbtable", "upcar_monitor_carno_day") // database name and table name here
    .option("isolationLevel", "NONE") // set isolationLevel to NONE
    .option("user", "root") // TiDB user here
    .save()
}


val sql0 =
  s"""
     |select * from tidb_ytrpt.upcar_monitor_detail
     |where createTime>= '${initDate}' and createTime < '${batchTime}'
     |""".stripMargin
println(sql0)
//    spark.sql(sql).createOrReplaceTempView("upcar_monitor_detail_incomplete")
spark.sql(sql0)
  .repartition(256).persist(StorageLevel.MEMORY_ONLY_SER)
  .createOrReplaceTempView("upcar_monitor_detail_base_temp")


val sql =
  s"""
     |select t.waybillNo              waybill_No,
     |       t.bagNo                  bag_No,
     |       t.containerNo            container_No,
     |       t.expType                exp_Type,
     |       t.orgCode                org_Code,
     |       t.createTime             create_Time,
     |       t.weight                 weight,
     |       t.createUserCode         create_User_Code,
     |       t.createUserName         create_User_Name,
     |       t.line_name              line_Name,
     |       t.truckNo                truck_No,
     |       t.truckTypeNew           truck_Type_New,
     |       t.transCodeLink          trans_Code_Link,
     |       t.predictDepartTime      predict_Depart_Time,
     |       t.returnFlag             return_Flag,
     |       t.datoubi                datoubi,
     |       t.desRoute               des_Route,
     |       t.desOrgCode             des_Org_Code,
     |       t.masterNodeCode         master_Node_Code,
     |       t.startRegionCode        start_Region_Code,
     |       t.desRegionCode          des_Region_Code,
     |       t.peizaiseoncdOrgCode    peizaiseoncd_Org_Code,
     |       t.peizaisecondRegionCode peizaisecond_Region_Code,
     |       t.peizaistartOrgCode     peizaistart_Org_Code,
     |       t.peizaistartRegionCode  peizaistart_Region_Code,
     |       t.startProvCode          start_Prov_Code,
     |       t.waybillNob             waybill_Nob,
     |       t.firstNodeCode          first_Node_Code,
     |       t.centerCarType          center_Car_Type,
     |       t.routeLineName          route_Line_Name,
     |       case when t.car_Type is null or t.car_Type='' then t.carType else t.car_Type end as car_Type,
     |       t.run_mode               run_mode,
     |       t.fache_Time             fache_Time,
     |       t.batchTime              batch_Time,
     |       '${batchTime}'           rpt_date
     | from (
     |		select tab.*, c.*,
     |		       row_number() over(partition by tab.waybillNo,tab.orgCode,tab.containerNo order by tab.createtime desc) rn
     |		  from (
     |           select t1.*
     |             from upcar_monitor_detail_base_temp t1
     |            where t1.createTime>= '${ctStartDate}' and t1.createTime < '${batchTime}'
     |           union all
     |           select t2.*
     |             from upcar_monitor_detail_base_temp t2
     |             join (select car_no from tidb_ytrpt.upcar_monitor_carno_day where rpt_date = '${DateHelper.convertDate(batchTime, "yyyy-MM-dd")}' and car_type='待发车') c on c.car_no = t2.containerno
     |            where t2.createTime>= '${initDate}' and t2.createTime < '${ctStartDate}'
     |    ) tab left join (select * from temp_upcar_monitor_carno where create_time<='${batchTime}') c on c.car_no = tab.containerno
     |) t where rn = 1 and not exists (select 1 from temp_upcar_monitor_carno c where status = 'INVALID' and c.car_no=t.containerno)
     |""".stripMargin
println(sql)
//    spark.sql(sql).createOrReplaceTempView("upcar_monitor_detail_incomplete")
spark.sql(sql)
  .repartition(256).persist(StorageLevel.MEMORY_ONLY_SER)
  .createOrReplaceTempView("upcar_monitor_detail_temp")

}

def getBillDetail(spark: SparkSession, batchTime: String) = {
val sql =
  s"""
     |select '${batchTime}' rpt_date,
     |       t.center_code,
     |       m.org_name center_name,
     |       t.car_mark,
     |       t.waybill_num,
     |       t.route,
     |       t.car_time,
     |       t.kg_num,
     |       t.operate_name,
     |       t.operate_code,
     |       n.late_status
     |  from (
     |       select t1.master_node_code center_code,
     |              t1.container_no car_mark,
     |              t1.waybill_no waybill_num,
     |              t1.route_line_name route,
     |              t1.create_time car_time,
     |              t1.weight kg_num,
     |              t1.create_user_name operate_name,
     |              t1.create_user_code operate_code,
     |              t1.car_Type car_Type
     |         from upcar_monitor_detail_temp t1
     |        where t1.center_Car_Type <> '一致'
     |          and t1.center_Car_Type <> '特殊路由'
     |       ) t left join temp_upcar_monitor_orgdim m on t.center_code = m.org_code
     |  left join
     |       (
     |       select car_mark,max(late_status) as late_status from temp_now_car_details group by car_mark
     |       ) n on t.car_mark=n.car_mark
     |""".stripMargin
println(sql)
// val df = spark.sql(sql).repartition(256).persist(StorageLevel.MEMORY_ONLY_SER)
// df.createOrReplaceTempView("upcar_monitor_waybill_details_temp")
spark.sql(sql).repartition(256).write.mode(saveMode = "append").format("jdbc")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("url", "jdbc:mysql://tidb1.prd.db:3390/ytrpt?rewriteBatchedStatements=true&useUnicode=true&characterEncoding=UTF-8")
  .option("useSSL", false)
  .option(JDBCOptions.JDBC_BATCH_INSERT_SIZE, 10000)
  .option("dbtable", "upcar_monitor_waybill_details") // database name and table name here
  .option("isolationLevel", "NONE") // set isolationLevel to NONE
  .option("user", "root") // TiDB user here
  .save()

//    // The 09:00 batch writes into the daily dimension table
//    DateCollectTotal.saveBillDay(spark, batchTime)
//
//    df.unpersist()

}

def getCarDetail(spark: SparkSession, batchTime: String, date_t:String,date_t_5:String) = {
val simpleBatchTime = batchTime.replaceAll("-|:", "").replaceAll(" ", "")
// Summary at the car-tag level, not expanded
/*var sql =
  s"""
     |select t.org_code start_center_code,
     |       t.des_route end_center_code,
     |       t.container_no car_mark,
     |       t.target_weight should_num,
     |       sum(t.weight) aboard_num,
     |       count(t.waybill_no) ticket_num,
     |       sum(case when t.center_car_type = '未按规划中转' then 1 else 0 end) fault_num,
     |       t.car_type car_type,
     |       max(t.truck_no) car_plate,
     |       t.line_name route,
     |       t.fache_time,
     |       t.predict_depart_time
     |  from upcar_monitor_detail_temp t
     | group by t.org_code,
     |          t.des_route,
     |          t.container_no,
     |          t.target_weight,
     |          t.car_type,
     |          t.line_name,
     |          t.fache_time
     |""".stripMargin
*/

// Average load over the last 5 days for the same route and truck type
var sql =
  s"""
    |select
    |    route
    |	,truck_type
    |	,max(should_num) as should_num
    |	,avg(aboard_num) as aboard_num
    |	,count(1) as car_count
    |from (
    |	select route,truck_type,car_mark,should_num ,aboard_num
    |		,row_number() over(partition by route,truck_type,car_mark order by rpt_date desc) as sn
    |	from tidb_ytrpt.upcar_monitor_car_details
    |	where rpt_date>='${date_t_5}'
    |	and rpt_date< '${date_t}'
    |	and car_type='已发车'
    |) tab where sn=1
    |group by route,truck_type
    |order by route,truck_type
    |""".stripMargin
println(sql)
spark.sql(sql).createOrReplaceTempView("temp_t_upcar_monitor_sameline_sametype_should_numb")

// Get the T-2 date
val date_t_2 = DateHelper.defineDayBefore(date_t, -2, "yyyy-MM-dd")

// Fetch the "rated load weight" configuration table
sql =
  s"""
     |select source_center_code
     |      ,des_center_code
     |      ,car_type
     |      ,target_num
     |      ,target_weight
     |from (
     |    select
     |        source_center_code
     |        ,des_center_code
     |        ,car_type
     |        ,target_num
     |        ,target_weight
     |        ,row_number() over(partition by source_center_code,des_center_code,car_type order by dt desc,c_id desc) rn
     |    from ods.rpt_db_input_cubed_out_new
     |    where dt >= regexp_replace('${date_t_2}','-','')
     |) t0
     |where t0.rn = 1
     |""".stripMargin
println(sql)
spark.sql(sql).createOrReplaceTempView("temp_line_target")


sql =
"""
  |select t.org_code start_center_code,
  |       t.des_route end_center_code,
  |       t.container_no car_mark,
  |       sum(t.weight) aboard_num,
  |       count(t.waybill_no) ticket_num,
  |       sum(case when t.center_car_type = '未按规划中转' then 1 else 0 end) fault_num,
  |       t.car_type car_type,
  |       t.car_plate,
  |       t.run_mode,
  |       t.line_name route,
  |       t.fache_time,
  |       t.truck_Type_New,
  |       max(t.predict_depart_time) predict_depart_time
  |  from (select *,
  |               first_value(truck_no) over(partition by container_no order by predict_depart_time desc) car_plate
  |          from upcar_monitor_detail_temp) t
  | group by t.org_code,
  |          t.des_route,
  |          t.container_no,
  |          t.car_type,
  |          t.car_plate,
  |          t.run_mode,
  |          t.line_name,
  |          t.fache_time,
  |          t.truck_Type_New
  |""".stripMargin
println(sql)
spark.sql(sql).createOrReplaceTempView("temp_t_lkn_upcar_monitor_base_car")

// Summary expanded at the car-tag level
sql =
  """
    |select t.org_code start_center_code,
    |       t.des_route end_center_code,
    |       t.container_no car_mark,
    |       t.master_node_code center_code,
    |       count(t.waybill_no) center_ticket_num,
    |       case when t.center_car_type='未按规划中转' then '错装'
    |            when t.center_car_type in ('一致','特殊路由') then '一致'
    |            when t.center_car_type='临时路由' then '临时路由'
    |            when t.center_car_type='其他' then '其他'
    |        end center_car_type
    |  from upcar_monitor_detail_temp t
    | group by t.org_code,
    |          t.des_route,
    |          t.container_no,
    |          t.master_node_code,
    |          case when t.center_car_type='未按规划中转' then '错装'
    |               when t.center_car_type in ('一致','特殊路由') then '一致'
    |               when t.center_car_type='临时路由' then '临时路由'
    |               when t.center_car_type='其他' then '其他'
    |           end
    |""".stripMargin
println(sql)
spark.sql(sql).createOrReplaceTempView("temp_t_lkn_upcar_monitor_base_car_one")

sql =
  s"""
     |select m1.region_code,
     |       m1.region_name,
     |       tab.start_center_code,
     |       m1.org_name start_center_name,
     |       tab.end_center_code,
     |       m2.org_name end_center_name,
     |       tab.car_mark,
     |       tab.aboard_num,
     |       tab.ticket_num,
     |       tab.fault_num,
     |       tab.car_type,
     |       tab.run_mode,
     |       tab.car_plate,
     |       tab.route,
     |       tab.center_code,
     |       m3.org_name center_name,
     |       tab.center_ticket_num,
     |       tab.center_car_type,
     |       tab.fache_time start_car_time,
     |       tab.predict_depart_time should_start_time,
     |       tab.truck_Type_New,
     |       case when tab.fache_time is not null and unix_timestamp(tab.fache_time, 'yyyy-MM-dd HH:mm:ss') - unix_timestamp(tab.predict_depart_time, 'yyyy-MM-dd HH:mm:ss') > 0 then 1 when tab.fache_time is null and unix_timestamp('${simpleBatchTime}', 'yyyyMMddHHmmss') - coalesce(unix_timestamp(tab.predict_depart_time, 'yyyy-MM-dd HH:mm:ss'), unix_timestamp('2099-12-01 23:59:59', 'yyyy-MM-dd HH:mm:ss')) > 0 then 1 else 0 end as late_status
     |  from (select a.start_center_code,
     |               a.end_center_code,
     |               a.car_mark,
     |               a.aboard_num,
     |               a.ticket_num,
     |               a.fault_num,
     |               a.car_type          car_type,
     |               a.run_mode,
     |               a.car_plate,
     |               a.route,
     |               a.fache_time         fache_time,
     |               a.predict_depart_time predict_depart_time,
     |               a.truck_Type_New,
     |               b.center_code,
     |               b.center_ticket_num,
     |               b.center_car_type
     |          from temp_t_lkn_upcar_monitor_base_car a
     |          join temp_t_lkn_upcar_monitor_base_car_one b
     |            on a.start_center_code = b.start_center_code
     |           and a.end_center_code = b.end_center_code
     |           and a.car_mark = b.car_mark) tab
     |  left join temp_upcar_monitor_orgdim m1
     |    on tab.start_center_code = m1.org_code
     |  left join temp_upcar_monitor_orgdim m2
     |    on tab.end_center_code = m2.org_code
     |  left join temp_upcar_monitor_orgdim m3
     |    on tab.center_code = m3.org_code
     |""".stripMargin
println(sql)
spark.sql(sql).createOrReplaceTempView("temp_now_car_details")

sql =
  s"""
     |select '${batchTime}' rpt_date,
     |       region_code,
     |       region_name,
     |       start_center_code,
     |       start_center_name,
     |       end_center_code,
     |       end_center_name,
     |       car_mark,
     |       cast(sum(aboard_num) as decimal(12,4)) aboard_num,
     |       cast(sum(ticket_num) as decimal(12)) ticket_num,
     |       cast(sum(fault_num) as decimal(12)) fault_num,
     |       car_type,
     |       run_mode,
     |       car_plate,
     |       route,
     |       center_code,
     |       center_name,
     |       cast(sum(center_ticket_num) as decimal(12)) center_ticket_num,
     |       center_car_type,
     |       start_car_time,
     |       should_start_time,
     |       late_status,
     |       truck_Type_New as truck_type
     |  from temp_now_car_details
     | group by region_code,
     |       region_name,
     |       start_center_code,
     |       start_center_name,
     |       end_center_code,
     |       end_center_name,
     |       car_mark,
     |       car_type,
     |       run_mode,
     |       car_plate,
     |       route,
     |       center_code,
     |       center_name,
     |       center_car_type,
     |       start_car_time,
     |       should_start_time,
     |       late_status,
     |       truck_Type_New
     |""".stripMargin
println(sql)
spark.sql(sql).createOrReplaceTempView("temp_t_lkn_upcar_monitor_base_car_three")

val sql_dfc =
  s"""
     |select *
     | from (select waybillno,
     |              orgcode,
     |              date_format(createtime,'yyyy-MM-dd HH:mm:ss') createtime,
     |              status,
     |              auxopcode,
     |              ROW_NUMBER() OVER(PARTITION BY waybillno,orgcode order by createtime desc,uploadtime desc) rk
     |         from rtorc.t_exp_op_record_truck
     |        where dt >= regexp_replace('${date_t_2}','-','')
     |          and opcode = '140'
     |       ) a
     | where rk=1
     |   and status>'0'
     |   and auxopcode <> 'DELETE'
     |""".stripMargin
println(sql_dfc)
spark.sql(sql_dfc).createOrReplaceTempView("temp_exp_op_record_truck")

sql =
  """
    |select
    |     ta.rpt_date
    |    ,ta.region_code
    |    ,ta.region_name
    |    ,ta.start_center_code
    |    ,ta.start_center_name
    |    ,ta.end_center_code
    |    ,ta.end_center_name
    |    ,ta.car_mark
    |    ,coalesce(tb.aboard_num,tc.target_weight,0) as should_num
    |    ,ta.aboard_num
    |    ,ta.ticket_num
    |    ,ta.fault_num
    |    ,ta.car_type
    |    ,ta.car_plate
    |    ,ta.route
    |    ,ta.center_code
    |    ,ta.center_name
    |    ,ta.center_ticket_num
    |    ,ta.center_car_type
    |    ,case when td.createtime is not null and td.createtime <> '' and td.createtime < ta.start_car_time and size(split(ta.route,'-')) > 2
    |          then td.createtime
    |          else ta.start_car_time
    |      end start_car_time
    |    ,ta.should_start_time
    |    ,ta.late_status
    |    ,ta.truck_type
    |    ,case when coalesce(tb.aboard_num,tc.target_weight,0)=0 then 0 else ta.aboard_num*1.0/coalesce(tb.aboard_num,tc.target_weight,0) end as loading_rate -- 装载率
    |    ,ta.run_mode
    |from temp_t_lkn_upcar_monitor_base_car_three ta
    |left join temp_t_upcar_monitor_sameline_sametype_should_numb tb
    |       on ta.route=tb.route and ta.truck_type=tb.truck_type
    |left join temp_line_target tc
    |       on ta.start_center_code=tc.source_center_code and ta.end_center_code=tc.des_center_code and ta.truck_type=tc.car_type
    |left join temp_exp_op_record_truck td
    |       on ta.car_mark = td.waybillno
    |      and ta.start_center_code = td.orgcode
    |""".stripMargin
println(sql)
// spark.sql(sql).createOrReplaceTempView("temp_upcar_monitor_car_details")


val df = spark.sql(sql).repartition(100).persist(StorageLevel.MEMORY_ONLY_SER)
df.write.mode(saveMode = "append").format("jdbc")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("url", "jdbc:mysql://tidb1.prd.db:3390/ytrpt?rewriteBatchedStatements=true&useUnicode=true&characterEncoding=UTF-8")
  .option("useSSL", false)
  .option(JDBCOptions.JDBC_BATCH_INSERT_SIZE, 10000)
  .option("dbtable", "upcar_monitor_car_details") // database name and table name here
  .option("isolationLevel", "NONE") // set isolationLevel to NONE
  .option("user", "root") // TiDB user here
  .save()

//    // The 09:00 batch writes into the daily dimension table
//    DateCollectTotal.saveCarDay(spark, batchTime)

}

def getCenterTotal(spark: SparkSession, batchTime: String) = {
// Destination-center level summary: all / departed / pending departure
var sql =
  """
    |select region_code,
    |       region_name,
    |       start_center_code,
    |       start_center_name,
    |       end_center_code,
    |       end_center_name,
    |       sum(center_ticket_num) car_num,
    |       sum(case when center_car_type = '一致' then center_ticket_num else 0 end) unanimous_num,
    |       sum(case when center_car_type = '错装' then center_ticket_num else 0 end) fault_num,
    |       sum(case when center_car_type = '临时路由' then center_ticket_num else 0 end) temporary_num,
    |       sum(case when center_car_type = '其他' then center_ticket_num else 0 end) other_num
    |  from temp_t_lkn_upcar_monitor_base_car_three
    | group by region_code,
    |          region_name,
    |          start_center_code,
    |          start_center_name,
    |          end_center_code,
    |          end_center_name
    |""".stripMargin
println(sql)
spark.sql(sql).createOrReplaceTempView("temp_t_lkn_upcar_monitor_base_center")

// Count the car tags
sql =
  """
    |select start_center_code,
    |       end_center_code,
    |       count(car_mark) use_car_num
    |  from temp_t_lkn_upcar_monitor_base_car
    | group by start_center_code,
    |          end_center_code
    |""".stripMargin
println(sql)
spark.sql(sql).createOrReplaceTempView("temp_t_lkn_upcar_monitor_base_center_carnum")

// Attach the car-tag counts
sql =
  """
    |select a.*,
    |       b.use_car_num
    |  from temp_t_lkn_upcar_monitor_base_center a
    |  join temp_t_lkn_upcar_monitor_base_center_carnum b
    |    on a.start_center_code=b.start_center_code
    |   and a.end_center_code=b.end_center_code
    |""".stripMargin
println(sql)
spark.sql(sql).createOrReplaceTempView("temp_t_lkn_upcar_monitor_base_center_withnum")

// Origin-center level summary
sql =
  """
    |select region_code,
    |       region_name,
    |       start_center_code,
    |       start_center_name,
    |       sum(car_num) car_num,
    |       sum(unanimous_num) unanimous_num,
    |       sum(fault_num) fault_num,
    |       sum(temporary_num) temporary_num,
    |       sum(other_num) other_num,
    |       sum(use_car_num) use_car_num
    |  from temp_t_lkn_upcar_monitor_base_center_withnum
    |group by region_code,
    |         region_name,
    |         start_center_code,
    |         start_center_name
    |""".stripMargin
println(sql)
spark.sql(sql).createOrReplaceTempView("temp_t_lkn_upcar_monitor_base_center_one")

// Combine the destination and origin summaries
sql =
  """
    |select region_code,
    |       region_name,
    |       start_center_code,
    |       start_center_name,
    |       end_center_code,
    |       end_center_name,
    |       car_num,
    |       unanimous_num,
    |       fault_num,
    |       temporary_num,
    |       other_num,
    |       '目的中心' center_type,
    |       use_car_num,
    |       '全部' car_type
    |  from temp_t_lkn_upcar_monitor_base_center_withnum
    |union all
    |select region_code,
    |       region_name,
    |       start_center_code,
    |       start_center_name,
    |       null end_center_code,
    |       null end_center_name,
    |       car_num,
    |       unanimous_num,
    |       fault_num,
    |       temporary_num,
    |       other_num,
    |       '始发中心' center_type,
    |       use_car_num,
    |       '全部' car_type
    |  from temp_t_lkn_upcar_monitor_base_center_one
    |""".stripMargin
println(sql)
spark.sql(sql).createOrReplaceTempView("temp_t_lkn_upcar_monitor_base_center_two")

// Destination-center level summary: departed
sql =
  """
    |select region_code,
    |       region_name,
    |       start_center_code,
    |       start_center_name,
    |       end_center_code,
    |       end_center_name,
    |       sum(center_ticket_num) car_num,
    |       sum(case when center_car_type='一致' then center_ticket_num else 0 end) unanimous_num,
    |       sum(case when center_car_type='错装' then center_ticket_num else 0 end) fault_num,
    |       sum(case when center_car_type='临时路由' then center_ticket_num else 0 end) temporary_num,
    |       sum(case when center_car_type='其他' then center_ticket_num else 0 end) other_num
    |  from temp_t_lkn_upcar_monitor_base_car_three
    | where car_type='已发车'
    |group by region_code,
    |         region_name,
    |         start_center_code,
    |         start_center_name,
    |         end_center_code,
    |         end_center_name
    |""".stripMargin
println(sql)
spark.sql(sql).createOrReplaceTempView("temp_t_lkn_upcar_monitor_base_center_yifa")

// Count the car tags
sql =
  """
    |select start_center_code,
    |       end_center_code,
    |       count(car_mark) use_car_num
    |  from temp_t_lkn_upcar_monitor_base_car
    | where car_type='已发车'
    | group by start_center_code,
    |          end_center_code
    |""".stripMargin
println(sql)
spark.sql(sql).createOrReplaceTempView("temp_t_lkn_upcar_monitor_base_center_carnum_yifa")

// Attach the car-tag counts
sql =
  """
    |select a.*,
    |       b.use_car_num
    |  from temp_t_lkn_upcar_monitor_base_center_yifa a
    |  join temp_t_lkn_upcar_monitor_base_center_carnum_yifa b
    |    on a.start_center_code=b.start_center_code
    |   and a.end_center_code=b.end_center_code
    |""".stripMargin
println(sql)
spark.sql(sql).createOrReplaceTempView("temp_t_lkn_upcar_monitor_base_center_withnum_yifa")

// Origin-center level summary
sql =
  """
    |select region_code,
    |       region_name,
    |       start_center_code,
    |       start_center_name,
    |       sum(car_num) car_num,
    |       sum(unanimous_num) unanimous_num,
    |       sum(fault_num) fault_num,
    |       sum(temporary_num) temporary_num,
    |       sum(other_num) other_num,
    |       sum(use_car_num) use_car_num
    |  from temp_t_lkn_upcar_monitor_base_center_withnum_yifa
    |group by region_code,
    |         region_name,
    |         start_center_code,
    |         start_center_name
    |""".stripMargin
println(sql)
spark.sql(sql).createOrReplaceTempView("temp_t_lkn_upcar_monitor_base_center_one_yifa")

// Combine the destination and origin summaries
sql =
  """
    |select region_code,
    |       region_name,
    |       start_center_code,
    |       start_center_name,
    |       end_center_code,
    |       end_center_name,
    |       car_num,
    |       unanimous_num,
    |       fault_num,
    |       temporary_num,
    |       other_num,
    |       '目的中心' center_type,
    |       use_car_num,
    |       '已发车' car_type
    |  from temp_t_lkn_upcar_monitor_base_center_withnum_yifa
    |union all
    |select region_code,
    |       region_name,
    |       start_center_code,
    |       start_center_name,
    |       null end_center_code,
    |       null end_center_name,
    |       car_num,
    |       unanimous_num,
    |       fault_num,
    |       temporary_num,
    |       other_num,
    |       '始发中心' center_type,
    |       use_car_num,
    |       '已发车' car_type
    |  from temp_t_lkn_upcar_monitor_base_center_one_yifa
    |""".stripMargin
println(sql)
spark.sql(sql).createOrReplaceTempView("temp_t_lkn_upcar_monitor_base_center_two_yifa")

// Destination-center level summary: pending departure
sql =
  """
    |select region_code,
    |       region_name,
    |       start_center_code,
    |       start_center_name,
    |       end_center_code,
    |       end_center_name,
    |       sum(center_ticket_num) car_num,
    |       sum(case when center_car_type='一致' then center_ticket_num else 0 end) unanimous_num,
    |       sum(case when center_car_type='错装' then center_ticket_num else 0 end) fault_num,
    |       sum(case when center_car_type='临时路由' then center_ticket_num else 0 end) temporary_num,
    |       sum(case when center_car_type='其他' then center_ticket_num else 0 end) other_num
    |  from temp_t_lkn_upcar_monitor_base_car_three
    | where car_type='待发车'
    |group by region_code,
    |         region_name,
    |         start_center_code,
    |         start_center_name,
    |         end_center_code,
    |         end_center_name
    |""".stripMargin
println(sql)
spark.sql(sql).createOrReplaceTempView("temp_t_lkn_upcar_monitor_base_center_daifa")

// Count the car tags
sql =
  """
    |select start_center_code,
    |       end_center_code,
    |       count(car_mark) use_car_num
    |  from temp_t_lkn_upcar_monitor_base_car
    | where car_type='待发车'
    | group by start_center_code,
    |          end_center_code
    |""".stripMargin
println(sql)
spark.sql(sql).createOrReplaceTempView("temp_t_lkn_upcar_monitor_base_center_carnum_daifa")

// Attach the car-tag counts
sql =
  """
    |select a.*,
    |       b.use_car_num
    |  from temp_t_lkn_upcar_monitor_base_center_daifa a
    |  join temp_t_lkn_upcar_monitor_base_center_carnum_daifa b
    |    on a.start_center_code=b.start_center_code
    |   and a.end_center_code=b.end_center_code
    |""".stripMargin
println(sql)
spark.sql(sql).createOrReplaceTempView("temp_t_lkn_upcar_monitor_base_center_withnum_daifa")

// Origin-center level summary
sql =
  """
    |select region_code,
    |       region_name,
    |       start_center_code,
    |       start_center_name,
    |       sum(car_num) car_num,
    |       sum(unanimous_num) unanimous_num,
    |       sum(fault_num) fault_num,
    |       sum(temporary_num) temporary_num,
    |       sum(other_num) other_num,
    |       sum(use_car_num) use_car_num
    |  from temp_t_lkn_upcar_monitor_base_center_withnum_daifa
    |group by region_code,
    |         region_name,
    |         start_center_code,
    |         start_center_name
    |""".stripMargin
println(sql)
spark.sql(sql).createOrReplaceTempView("temp_t_lkn_upcar_monitor_base_center_one_daifa")

// Combine the destination and origin summaries
sql =
  """
    |select region_code,
    |       region_name,
    |       start_center_code,
    |       start_center_name,
    |       end_center_code,
    |       end_center_name,
    |       car_num,
    |       unanimous_num,
    |       fault_num,
    |       temporary_num,
    |       other_num,
    |       '目的中心' center_type,
    |       use_car_num,
    |       '待发车' car_type
    |  from temp_t_lkn_upcar_monitor_base_center_withnum_daifa
    |union all
    |select region_code,
    |       region_name,
    |       start_center_code,
    |       start_center_name,
    |       null end_center_code,
    |       null end_center_name,
    |       car_num,
    |       unanimous_num,
    |       fault_num,
    |       temporary_num,
    |       other_num,
    |       '始发中心' center_type,
    |       use_car_num,
    |       '待发车' car_type
    |  from temp_t_lkn_upcar_monitor_base_center_one_daifa
    |""".stripMargin
println(sql)
spark.sql(sql).createOrReplaceTempView("temp_t_lkn_upcar_monitor_base_center_two_daifa")

// Combine the data for the three statuses
sql =
  """
    |select *
    |  from temp_t_lkn_upcar_monitor_base_center_two
    |union all
    |select *
    |  from temp_t_lkn_upcar_monitor_base_center_two_yifa
    |union all
    |select *
    |  from temp_t_lkn_upcar_monitor_base_center_two_daifa
    |""".stripMargin
println(sql)
spark.sql(sql).createOrReplaceTempView("temp_t_lkn_upcar_monitor_base_center_two_all")

// Persist the center-level data
sql =
  """
    |select region_code,         --province region code
    |       region_name,         --province region name
    |       start_center_code,   --origin center code
    |       start_center_name,   --origin center name
    |       end_center_code,     --destination center code
    |       end_center_name,     --destination center name
    |       car_num,             --loaded ticket count
    |       unanimous_num,       --consistent ticket count
    |       fault_num,           --misloaded ticket count
    |       temporary_num,       --temporary route
    |       other_num,           --other
    |       center_type,         --origin center or destination center
    |       use_car_num,        --vehicles used
    |       car_type            --type: all / departed / pending departure
    |  from temp_t_lkn_upcar_monitor_base_center_two_all
    |""".stripMargin
println(sql)
spark.sql(sql).createOrReplaceTempView("temp_now_center_details")

sql =
  s"""
     |select '${batchTime}' rpt_date,
     |       region_code,
     |       region_name,
     |       start_center_code,
     |       start_center_name,
     |       end_center_code,
     |       end_center_name,
     |       cast(sum(car_num) as decimal(12)) car_num,
     |       cast(sum(unanimous_num) as decimal(12)) unanimous_num,
     |       cast(sum(fault_num) as decimal(12)) fault_num,
     |       cast(sum(temporary_num) as decimal(12)) temporary_num,
     |       cast(sum(other_num) as decimal(12)) other_num,
     |       center_type,
     |       cast(sum(use_car_num) as decimal(12)) use_car_num,
     |       car_type
     |  from temp_now_center_details t
     | group by region_code,
     |       region_name,
     |       start_center_code,
     |       start_center_name,
     |       end_center_code,
     |       end_center_name,
     |       center_type,
     |       car_type
     |""".stripMargin

println(sql)
//    val df = spark.sql(sql).persist(StorageLevel.MEMORY_ONLY_SER)
//    df.createOrReplaceTempView("upcar_monitor_center_details_temp")
spark.sql(sql).write.mode(saveMode = "append").format("jdbc")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("url", "jdbc:mysql://tidb1.prd.db:3390/ytrpt?rewriteBatchedStatements=true&useUnicode=true&characterEncoding=UTF-8")
  .option("useSSL", false)
  .option(JDBCOptions.JDBC_BATCH_INSERT_SIZE, 10000)
  .option("dbtable", "upcar_monitor_center_details") // database name and table name here
  .option("isolationLevel", "NONE") // set isolationLevel to NONE
  .option("user", "root") // TiDB user here
  .save()

//    // The 09:00 batch writes into the daily dimension table
//    DateCollectTotal.saveCenterDay(spark, batchTime)

// Origin-province level summary, persisted per origin province; nationwide totals and rate fields are computed on the backend
sql =
  """
    |select region_code,
    |       region_name,
    |       sum(car_num) car_num,
    |       sum(unanimous_num) unanimous_num,
    |       sum(fault_num) fault_num,
    |       sum(temporary_num) temporary_num,
    |       sum(other_num) other_num,
    |       car_type
    |  from temp_t_lkn_upcar_monitor_base_center_two_all
    | where center_type='始发中心'
    | group by region_code,
    |          region_name,
    |          car_type
    |""".stripMargin
spark.sql(sql).createOrReplaceTempView("temp_now_region_details")


sql =
  s"""
     |select '${batchTime}' rpt_date,
     |       region_code,
     |       region_name,
     |       cast(sum(car_num) as decimal(12)) car_num,
     |       cast(sum(unanimous_num) as decimal(12)) unanimous_num,
     |       cast(sum(fault_num) as decimal(12)) fault_num,
     |       cast(sum(temporary_num) as decimal(12)) temporary_num,
     |       cast(sum(other_num) as decimal(12)) other_num,
     |       car_type
     |  from temp_now_region_details tab
     |group by region_code,
     |         region_name,
     |         car_type
     |""".stripMargin

println(sql)
//    val df2 = spark.sql(sql).persist(StorageLevel.MEMORY_ONLY_SER)
//    df2.createOrReplaceTempView("upcar_monitor_region_details_temp")
spark.sql(sql).write.mode(saveMode = "append").format("jdbc")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("url", "jdbc:mysql://tidb1.prd.db:3390/ytrpt?rewriteBatchedStatements=true&useUnicode=true&characterEncoding=UTF-8")
  .option("useSSL", false)
  .option(JDBCOptions.JDBC_BATCH_INSERT_SIZE, 10000)
  .option("dbtable", "upcar_monitor_region_details") // database name and table name here
  .option("isolationLevel", "NONE") // set isolationLevel to NONE
  .option("user", "root") // TiDB user here
  .save()

//    // The 09:00 batch writes into the daily dimension table
//    DateCollectTotal.saveRegionDay(spark, batchTime)


//------  "pending but late departure" logic  by sgp at 2022-09-22

sql =
  """
    |select region_code,
    |       region_name,
    |       start_center_code,
    |       start_center_name,
    |       end_center_code,
    |       end_center_name,
    |       sum(center_ticket_num) car_num,
    |       sum(case when center_car_type='一致' then center_ticket_num else 0 end) unanimous_num,
    |       sum(case when center_car_type='错装' then center_ticket_num else 0 end) fault_num,
    |       sum(case when center_car_type='临时路由' then center_ticket_num else 0 end) temporary_num,
    |       sum(case when center_car_type='其他' then center_ticket_num else 0 end) other_num
    |  from temp_t_lkn_upcar_monitor_base_car_three
    | where car_type='待发车'
    |    and late_status='1'
    |group by region_code,
    |         region_name,
    |         start_center_code,
    |         start_center_name,
    |         end_center_code,
    |         end_center_name
    |""".stripMargin
spark.sql(sql).createOrReplaceTempView("temp_t_lkn_upcar_monitor_base_center_daifa_wanfa")

sql =
  """
    |select ta.start_center_code,
    |       ta.end_center_code,
    |       count(ta.car_mark) use_car_num
    |from
    |(
    |    select * from temp_t_lkn_upcar_monitor_base_car
    |    where car_type='待发车'
    |) ta inner join (
    |    select car_mark,max(late_status) as late_status from temp_now_car_details
    |    where late_status='1'
    |    group by car_mark
    |) tb on ta.car_mark=tb.car_mark
    |group by start_center_code
    |    ,end_center_code
    |""".stripMargin
spark.sql(sql).createOrReplaceTempView("temp_t_lkn_upcar_monitor_base_center_carnum_daifa_wanfa")


sql =
  """
    |select a.*,
    |       b.use_car_num
    |  from temp_t_lkn_upcar_monitor_base_center_daifa_wanfa a
    |  join temp_t_lkn_upcar_monitor_base_center_carnum_daifa_wanfa b
    |    on a.start_center_code=b.start_center_code
    |   and a.end_center_code=b.end_center_code
    |""".stripMargin
spark.sql(sql).createOrReplaceTempView("temp_t_lkn_upcar_monitor_base_center_withnum_daifa_wanfa")

sql =
  """
    |select region_code,
    |       region_name,
    |       start_center_code,
    |       start_center_name,
    |       sum(car_num) car_num,
    |       sum(unanimous_num) unanimous_num,
    |       sum(fault_num) fault_num,
    |       sum(temporary_num) temporary_num,
    |       sum(other_num) other_num,
    |       sum(use_car_num) use_car_num
    |  from temp_t_lkn_upcar_monitor_base_center_withnum_daifa_wanfa
    |group by region_code,
    |         region_name,
    |         start_center_code,
    |         start_center_name
    |""".stripMargin
spark.sql(sql).createOrReplaceTempView("temp_t_lkn_upcar_monitor_base_center_one_daifa_wanfa")

sql =
  """
    |select region_code,
    |       region_name,
    |       start_center_code,
    |       start_center_name,
    |       end_center_code,
    |       end_center_name,
    |       car_num,
    |       unanimous_num,
    |       fault_num,
    |       temporary_num,
    |       other_num,
    |       '目的中心' center_type,
    |       use_car_num,
    |       '待发车' car_type
    |  from temp_t_lkn_upcar_monitor_base_center_withnum_daifa_wanfa
    |union all
    |select region_code,
    |       region_name,
    |       start_center_code,
    |       start_center_name,
    |       null end_center_code,
    |       null end_center_name,
    |       car_num,
    |       unanimous_num,
    |       fault_num,
    |       temporary_num,
    |       other_num,
    |       '始发中心' center_type,
    |       use_car_num,
    |       '待发车' car_type
    |  from temp_t_lkn_upcar_monitor_base_center_one_daifa_wanfa
    |""".stripMargin
spark.sql(sql).createOrReplaceTempView("temp_t_lkn_upcar_monitor_base_center_two_daifa_wanfa")

sql =
  """
    |select *
    |  from temp_t_lkn_upcar_monitor_base_center_two_daifa_wanfa
    |""".stripMargin
spark.sql(sql).persist(StorageLevel.MEMORY_ONLY_SER).createOrReplaceTempView("temp_now_center_details_wanfa")

sql =
  s"""
     |select '${batchTime}' rpt_date,
     |       region_code,
     |       region_name,
     |       start_center_code,
     |       start_center_name,
     |       end_center_code,
     |       end_center_name,
     |       cast(sum(car_num) as decimal(12)) car_num,
     |       cast(sum(unanimous_num) as decimal(12)) unanimous_num,
     |       cast(sum(fault_num) as decimal(12)) fault_num,
     |       cast(sum(temporary_num) as decimal(12)) temporary_num,
     |       cast(sum(other_num) as decimal(12)) other_num,
     |       center_type,
     |       cast(sum(use_car_num) as decimal(12)) use_car_num,
     |       car_type
     |  from temp_now_center_details_wanfa t
     | group by region_code,
     |       region_name,
     |       start_center_code,
     |       start_center_name,
     |       end_center_code,
     |       end_center_name,
     |       center_type,
     |       car_type
     |""".stripMargin
println(sql)
spark.sql(sql).write.mode(saveMode = "append").format("jdbc")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("url", "jdbc:mysql://ip:3390/ytrpt?rewriteBatchedStatements=true&useUnicode=true&characterEncoding=UTF-8")
  .option("useSSL", false)
  .option(JDBCOptions.JDBC_BATCH_INSERT_SIZE, 10000)
  .option("dbtable", "upcar_monitor_center_details_late") // database name and table name here
  .option("isolationLevel", "NONE") // set isolationLevel to NONE
  .option("user", "root") // TiDB user here
  .save()


sql =
  s"""
     |select
     |       '${batchTime}' rpt_date,
     |       region_code,
     |       region_name,
     |       sum(car_num) car_num,
     |       sum(unanimous_num) unanimous_num,
     |       sum(fault_num) fault_num,
     |       sum(temporary_num) temporary_num,
     |       sum(other_num) other_num,
     |       car_type
     |  from temp_now_center_details_wanfa
     | where center_type='始发中心'
     | group by region_code,
     |          region_name,
     |          car_type
     |""".stripMargin
println(sql)
spark.sql(sql).write.mode(saveMode = "append").format("jdbc")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("url", "jdbc:mysql://ip:3390/ytrpt?rewriteBatchedStatements=true&useUnicode=true&characterEncoding=UTF-8")
  .option("useSSL", false)
  .option(JDBCOptions.JDBC_BATCH_INSERT_SIZE, 10000)
  .option("dbtable", "upcar_monitor_region_details_late") // database name and table name here
  .option("isolationLevel", "NONE") // set isolationLevel to NONE
  .option("user", "root") // TiDB user here
  .save()

}

def getCarPeriod(spark: SparkSession, batchTime: String) = {
val sql1 =
  """
    |select
    |     containerNo
    |    ,orgCode
    |    ,hour_
    |    ,tickects
    |    ,weight
    |    ,first_value(hour_) over(partition by containerNo,orgCode order by hour_) as first_hour
    |    ,predictDepartTime
    |from (
    |    select
    |         containerNo
    |        ,orgCode
    |        ,date_format(createTime,'yyyy-MM-dd HH') as hour_
    |        ,count(waybillNo) as tickects
    |        ,sum(weight) as weight
    |        ,max(predictDepartTime) as predictDepartTime
    |    from (
    |        select container_No containerNo,org_Code orgCode,create_Time createTime,waybill_No waybillNo,predict_Depart_Time predictDepartTime
    |              ,weight
    |              ,row_number() over(partition by container_No,org_Code,waybill_No order by create_Time desc) as sn
    |        from upcar_monitor_detail_temp
    |    ) tab where sn = 1
    |    group by containerNo,orgCode,date_format(createTime,'yyyy-MM-dd HH')
    |) tab2
    |""".stripMargin

println("sql is :" + sql1)

spark.sql(sql1).createOrReplaceTempView("temp_middle_result_20220815")

// Generate the 8+1 time-slot rows for each car tag and center
val sql2 =
  s"""
     |select
     |	containerNo
     |	,orgCode
     |	,from_unixtime(unix_timestamp(cast(concat(first_hour,':00:00') as timestamp))+3600*hour_num,'yyyy-MM-dd HH') as hour_per
     | ,SUBSTRING(from_unixtime(unix_timestamp(cast(concat(first_hour,':00:00') as timestamp))+3600*(hour_num+1),'yyyy-MM-dd HH'),12,2) as end_hour
     | ,hour_num
     |from (
     |	select
     |		containerNo
     |		,orgCode
     |		,first_hour
     |		,cast(hour_num as bigint) as hour_num
     |	from (SELECT explode(split('0,1,2,3,4,5,6,7,8,9,10,11',',')) AS hour_num) tab1
     |	cross join (select containerNo,orgCode,first_hour from temp_middle_result_20220815 group by containerNo,orgCode,first_hour)
     |) tab
     |""".stripMargin
spark.sql(sql2).createOrReplaceTempView("temp_middle_result2_20220815")
println("sql2 is : " + sql2)

/*val dataframe2 = spark.sql("select * from temp_middle_result2_20220815").limit(10)
dataframe2.collect().map(ele=>{
  logger.info("row is:"+ele.getString(0)+"|"+ele.getString(1)+"|"+ele.getString(2)+"|"+ele.getString(3)+"|"+ele.getLong(4))
})*/

val sql3 =
  """
    |--- time slots 0-7
    |select
    |	ta.containerNo
    |	,ta.orgCode
    |	,concat(ta.hour_per,':00-',ta.end_hour,':00') as hour_per
    |	,tb.tickects
    | ,tb.weight
    | ,ta.hour_num+1 as order_num
    | ,tb.predictDepartTime
    |from temp_middle_result2_20220815 ta  left join temp_middle_result_20220815 tb
    |on ta.containerNo=tb.containerNo
    |and ta.orgCode=tb.orgCode
    |and ta.hour_per=tb.hour_
    |union all
    |--- time slots beyond 13
    |select ta.containerNo
    |	,ta.orgCode
    |	,'其他' as hour_per
    |	,sum(ta.tickects) as tickects
    | ,sum(ta.weight) as weight
    | ,'13' as order_num
    | ,max(ta.predictDepartTime) as predictDepartTime
    |from temp_middle_result_20220815 ta  left join temp_middle_result2_20220815 tb
    |on ta.containerNo=tb.containerNo
    |and ta.orgCode=tb.orgCode
    |and ta.hour_=tb.hour_per
    |where tb.hour_per is null
    |group by ta.containerNo
    |	,ta.orgCode
    |""".stripMargin

spark.sql(sql3).createOrReplaceTempView("temp_middle_result3_20220815")
println("sql3 is : " + sql3)

// spark.sql("select * from temp_middle_result3_20220815").show(1)

// val dataFrame = spark.sql("select count(1) from temp_middle_result3_20220815")
// logger.error("sgp say datacount is : " + d )

val sql4 =
  s"""
     |select
     | '${batchTime}' as rpt_date
     | ,ta.orgCode     as  center_code
     |	,tc.org_name    as  center_name
     |	,ta.containerNo as  car_mark
     |	,ta.hour_per    as  period
     |	,ta.tickects    as ticket_num
     | ,ta.order_num   as order_numb
     | ,tb.car_type      as car_type
     | ,ta.predictDepartTime               as plan_depart_time
     | ,from_unixtime(unix_timestamp())    as create_time
     | ,td.late_status
     | ,ta.weight      as weight
     |from temp_middle_result3_20220815 ta left join
     |(
     |select car_no,status,car_type,ROW_NUMBER() over(partition by car_no order by create_time desc) as sn from tidb_ytrpt.upcar_monitor_carno
     |) tb
     |on ta.containerNo=tb.car_no and tb.sn=1
     |left join
     |(
     |   select org_code,max(org_name) as org_name from (
     |     select org_code,org_name,row_number() over(partition by org_code order by dt desc) as sn
     |     from dim.t03_user_org_mdm_org
     |     where dt >= date_format(date_add(current_date,-2),'yyyyMMdd')
     |   ) tab
     |   group by org_code
     |) tc
     |on ta.orgCode=tc.org_code
     |left join (
     |    select car_mark,max(late_status) as late_status from temp_now_car_details group by car_mark
     |) td on ta.containerNo=td.car_mark
     |""".stripMargin

println("sql4 is :" + sql4)



// Write the realtime table
spark.sql(sql4).write.mode(saveMode = "append").format("jdbc")
  .option("driver", "com.mysql.jdbc.Driver")
  // Production
  .option("url", "jdbc:mysql://ip:3390/ytrpt?rewriteBatchedStatements=true&useUnicode=true&characterEncoding=UTF-8")
  // Test
  //.option("url", "jdbc:mysql://tidb:4000/ytrpt?rewriteBatchedStatements=true&useUnicode=true&characterEncoding=UTF-8")
  //      .option("url", "jdbc:mysql://ip:4000/test?rewriteBatchedStatements=true&useUnicode=true&characterEncoding=UTF-8")
  .option("useSSL", false)
  .option(JDBCOptions.JDBC_BATCH_INSERT_SIZE, 10000)
  .option("dbtable", "upcar_monitor_car_details_period") // database name and table name here
  .option("isolationLevel", "NONE") // set isolationLevel to NONE
  .option("user", "root") // TiDB user here
  .save()

}

}

The code contains sensitive information; remember to scrub it. Your IPs and the like are all in there...

Judging from the stack trace, only this frame looks like your code; everything else is from dependency packages.

OK, already removed. Thanks for the reminder.