【TiDB environment】Production
【TiDB version】v5.2.1
【Reproduction path】Spark version is 2.4. When reading TiDB with TiSpark, the job occasionally fails with: Exception in thread "main" org.apache.spark.sql.AnalysisException: Table or view not found:
【Problem encountered: symptom and impact】The read fails and the job throws an exception.
【Resource configuration】Go to TiDB Dashboard - Cluster Info - Hosts and attach a screenshot of that page
【Attachments: screenshots/logs/monitoring】
If it only happens occasionally, could it be a timeout or an interruption?
What happens if you read directly with a SQL client?
It only happens occasionally.
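If the failure really is transient, one workaround on the application side is to retry the query. This is a minimal sketch (the helper name, attempt count, and delay are illustrative, not from the original job): wrapping the `spark.sql(...)` call like this would let the batch survive an occasional metadata/timeout hiccup instead of dying on the first `AnalysisException`.

```scala
import scala.util.{Failure, Success, Try}

// Retry an arbitrary operation up to `attempts` times, sleeping between tries.
def withRetry[A](attempts: Int, delayMs: Long = 5000)(op: => A): A =
  Try(op) match {
    case Success(v) => v
    case Failure(_) if attempts > 1 =>
      Thread.sleep(delayMs)              // back off before retrying
      withRetry(attempts - 1, delayMs)(op)
    case Failure(e) => throw e           // out of attempts: rethrow
  }
```

Usage would be along the lines of `val df = withRetry(3) { spark.sql(sql) }`. This only papers over the symptom, though; it is still worth finding out why the table is intermittently invisible.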
The hive-site.xml in the conf directory under the deployed Spark home may not have been updated, which would make the database unresolvable.
Try copying hive-site.xml from Hive's conf directory into Spark's conf directory.
This job uses TiSpark to connect to TiDB directly; it does not go through Hive via Spark.
The TiSpark code related to this error is written in Scala and is hard for me to follow; you could dig into it yourself:
https://github.com/pingcap/tispark/blob/e17587c7c74b93878f67be70ea7523fe635ddbd6/core/src/test/scala/com/pingcap/tispark/insert/InsertSuite.scala#L19
https://github.com/pingcap/tispark/blob/e17587c7c74b93878f67be70ea7523fe635ddbd6/core/src/test/scala/com/pingcap/tispark/datasource/ExceptionTestSuite.scala#L25
Thanks. The node is still reporting Exception in thread "main" org.apache.spark.sql.AnalysisException: Table or view not found when TiSpark reads data.
Does the program use the spark.sql.runSQLOnFiles parameter anywhere?
Is there any more detailed error information?
It would be best to post the code as well...
Exception in thread "main" org.apache.spark.sql.AnalysisException: Table or view not found: tidb_rpt.no_built_package_details; line 2 pos 67;
'Project [unresolvedalias('date_format('max('rpt_date), yyyy-MM-dd HH:mm:ss), None)]
+- 'Filter ('rpt_date >= 2023-10-30)
+- 'UnresolvedRelation tidb_rpt.no_built_package_details
at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:90)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:85)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:85)
at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:95)
at org.apache.spark.sql.catalyst.analysis.Analyzer$$anonfun$executeAndCheck$1.apply(Analyzer.scala:108)
at org.apache.spark.sql.catalyst.analysis.Analyzer$$anonfun$executeAndCheck$1.apply(Analyzer.scala:105)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:201)
at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:105)
at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57)
at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:78)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:642)
at com.yto56.daichaibao.testcase.tpch.DaichaobaoTestLogic$.achive(DaichaobaoTestLogic.scala:37)
at com.yto56.daichaibao.DaichaibaoTest$.main(DaichaibaoTest.scala:50)
at com.yto56.daichaibao.DaichaibaoTest.main(DaichaibaoTest.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:849)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:167)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:195)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:924)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:933)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
No, spark.sql.runSQLOnFiles is not used.
How is the code written? Do you switch to the TiDB catalog before running spark-sql?
Was any TiKV instance restarted just before this error occurred?
No TiKV restart alerts were received.
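One thing to double-check is the database prefix: with spark.tispark.db_prefix set to "tidb_" (as in the config below), a TiDB database "rpt" must be addressed in Spark SQL as "tidb_rpt". A typo or a missing prefix in one code path would produce exactly this "Table or view not found" error. A tiny helper (the name is illustrative, not part of the original job) to build the qualified name consistently:

```scala
// Build the Spark-SQL-visible name of a TiDB table when
// spark.tispark.db_prefix is configured (assumed to be "tidb_" here).
def tidbTable(db: String, table: String, prefix: String = "tidb_"): String =
  s"$prefix$db.$table"
```

Using such a helper everywhere (instead of hand-writing `tidb_rpt.xxx` in each SQL string) makes a prefix mismatch between code paths much less likely.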
package cn.net.dfo.dw.driver
import cn.net.dfo.dw.utils.DateHelper
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
import org.apache.spark.storage.StorageLevel
import org.slf4j.{Logger, LoggerFactory}
object BatchCollectTotal {
val log: Logger = LoggerFactory.getLogger(this.getClass.getName)
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().
//setIfMissing("spark.master", "local[*]").
setIfMissing("spark.app.name", getClass.getName).
setIfMissing("spark.driver.allowMultipleContexts", "true").
setIfMissing("spark.sql.extensions", "org.apache.spark.sql.TiExtensions").
// Production
setIfMissing("spark.tispark.pd.addresses", "ip:2379").
// Test
// setIfMissing("spark.tispark.pd.addresses", "ip:2379").
setIfMissing("spark.driver.maxResultSize", "16g").
setIfMissing("spark.debug.maxToStringFields", "150").
// Production
setIfMissing("spark.tispark.tidb.addr", "tidb1.prd.db").
setIfMissing("spark.tispark.tidb.port", "3390").
// Test
// setIfMissing("spark.tispark.tidb.addr", "ip.122").
// setIfMissing("spark.tispark.tidb.port", "4000").
// To operate on both Hive and TiDB at the same time, add the enableHiveSupport method.
// If database names collide, use the prefix to tell TiDB tables from Hive tables.
setIfMissing("spark.tispark.write.allow_spark_sql", "true").
setIfMissing("spark.tispark.db_prefix", "tidb_").
setIfMissing("spark.sql.crossJoin.enabled", "true").
setIfMissing("spark.tispark.use.tiflash", "true")
val spark = SparkSession.builder.config(sparkConf).getOrCreate()
var batchTime = ""
if (args.size == 2) {
batchTime = args(0) + " " + args(1) //yyyy-MM-dd HH:mm:ss
println("Arguments passed: " + batchTime)
} else {
batchTime = DateHelper.getCurrTimeSegment(10) //yyyy-MM-dd HH:mm:ss
println("Default argument: " + batchTime)
}
val historyTime = DateHelper.beforeMinute(batchTime, "yyyy-MM-dd HH:mm:ss", 10)
// Day T
val dateStr = batchTime.substring(0, 10)
// Day T-5
val date_t_5 = DateHelper.defineDayBefore(dateStr, -5, "yyyy-MM-dd")
println("batchTime => " + batchTime)
println("historyTime => " + historyTime)
try {
//
// deleteData(historyTime)
// (mdm_org_temp)
println(">>>>>>>>>>>>>>>>>>>>>>>>>>> ")
basicData(spark, batchTime)
getCarPeriod(spark, batchTime)
} catch {
case e: Exception =>
e.printStackTrace()
log.error(e.getMessage)
} finally {
spark.close()
System.exit(0)
}
}
def basicData(spark: SparkSession, batchTime: String) = {
val firstThreeDays = DateHelper.defineDayBefore(batchTime.substring(0, 10).replaceAll("-", ""), -3, "yyyyMMdd")
// Dimension table
var sql =
s"""
|select org_code,
| org_name,
| region_code,
| region_name,
| transfer_code,
| transfer_name,
| transfer_prov_code,
| transfer_prov_name
| from (select org_code,
|
| else
| region_name
| end region_name,
| transfer_code,
| transfer_name,
| transfer_prov_code,
| transfer_prov_name,
| row_number() over(partition by org_code order by dt desc) rk
| from dim.t03_user_org_mdm_org
| where dt >= ‘${firstThreeDays}’) m
| where rk = ‘1’
|“”".stripMargin
/*var sql =
"""
|select sub_department_code org_code,
| sub_department_name org_name,
| region_code region_code,
| region_name region_name,
| transfer_code transfer_code,
| transfer_name transfer_name
| from default.dw_dim_dfo_org_trans
|""".stripMargin*/
println(sql)
spark.sql(sql).persist(StorageLevel.MEMORY_ONLY_SER).createOrReplaceTempView("temp_upcar_monitor_orgdim")
// spark.sql("select * from temp_upcar_monitor_orgdim limit 10").show()
sql =
s"""
|select /*+ BROADCAST(t) */ * from tidb_rpt.upcar_monitor_carno t
|""".stripMargin
spark.sql(sql).persist(StorageLevel.MEMORY_ONLY_SER).createOrReplaceTempView("temp_upcar_monitor_carno")
// spark.sql("select * from temp_upcar_monitor_carno limit 10").show()
}
def firstBatchFlag(historyTime: String) = {
val beginBatchTime = historyTime.split(" ")(1)
"09:00:00".equals(beginBatchTime)
}
/*def deleteData(historyTime: String) = {
if ("00:00:00".equals(historyTime.split(" ")(1))) {
val date = DateHelper.defineDayBefore(historyTime, -30, "yyyy-MM-dd")
@transient val conn = TidbClusterPool.getConnection
val stmt = conn.createStatement()
val sql = s"delete from tidb_ytrpt.upcar_monitor_carno where create_time<'${date}'"
println(sql)
stmt.execute(sql)
}
}*/
def getTotalDetail(spark: SparkSession, batchTime: String) = {
val dateStr = batchTime.substring(0, 10)
var ctStartDate = ""
// var ctEndDate = ""
var initDate = ""
if (DateHelper.theNextDay(batchTime) > 0) {
initDate = DateHelper.defineDayBefore(dateStr, -2, "yyyy-MM-dd") + " 09:00:00"
ctStartDate = DateHelper.defineDayBefore(dateStr, -1, "yyyy-MM-dd") + " 09:00:00"
// ctEndDate = dateStr + " 09:00:00"
} else {
initDate = DateHelper.defineDayBefore(dateStr, -1, "yyyy-MM-dd") + " 09:00:00"
ctStartDate = dateStr + " 09:00:00"
// ctEndDate = DateHelper.defineDayBefore(dateStr, 1, "yyyy-MM-dd") + " 09:00:00"
}
/*val sql =
s"""
|select t.waybillNo waybill_No,
| t.bagNo bag_No,
| t.containerNo container_No,
| t.expType exp_Type,
| t.orgCode org_Code,
| t.createTime create_Time,
| t.weight weight,
| t.createUserCode create_User_Code,
| t.createUserName create_User_Name,
| case when c.line_Name is null or c.line_name ='' then first_value(t.lineName) over(order by t.lineName) else c.line_name end line_Name,
| t.truckNo truck_No,
| t.truckTypeNew truck_Type_New,
| t.transCodeLink trans_Code_Link,
| t.predictDepartTime predict_Depart_Time,
| t.returnFlag return_Flag,
| t.datoubi datoubi,
| t.desRoute des_Route,
| t.desOrgCode des_Org_Code,
| t.masterNodeCode master_Node_Code,
| t.targetWeight target_Weight,
| t.startRegionCode start_Region_Code,
| t.desRegionCode des_Region_Code,
| t.peizaiseoncdOrgCode peizaiseoncd_Org_Code,
| t.peizaisecondRegionCode peizaisecond_Region_Code,
| t.peizaistartOrgCode peizaistart_Org_Code,
| t.peizaistartRegionCode peizaistart_Region_Code,
| t.startProvCode start_Prov_Code,
| t.waybillNob waybill_Nob,
| t.firstNodeCode first_Node_Code,
| t.centerCarType center_Car_Type,
| t.routeLineName route_Line_Name,
| c.car_Type car_Type,
| c.fache_Time fache_Time,
| t.batchTime batch_Time,
| '${batchTime}' rpt_date
| from (
| select *,
| row_number() over(partition by waybillNo,orgCode,containerNo order by createtime desc) rn
| from tidb_ytrpt.upcar_monitor_detail
| where createTime>= '${ctStartDate}' and createTime < '${batchTime}'
|) t left join temp_upcar_monitor_carno c on c.car_no = t.containerNo
|where t.rn = 1
|""".stripMargin*/
/*val sql =
s"""
|select t.waybillNo waybill_No,
| t.bagNo bag_No,
| t.containerNo container_No,
| t.expType exp_Type,
| t.orgCode org_Code,
| t.createTime create_Time,
| t.weight weight,
| t.createUserCode create_User_Code,
| t.createUserName create_User_Name,
| t.line_name line_Name,
| t.truckNo truck_No,
| t.truckTypeNew truck_Type_New,
| t.transCodeLink trans_Code_Link,
| t.predictDepartTime predict_Depart_Time,
| t.returnFlag return_Flag,
| t.datoubi datoubi,
| t.desRoute des_Route,
| t.desOrgCode des_Org_Code,
| t.masterNodeCode master_Node_Code,
| t.targetWeight target_Weight,
| t.startRegionCode start_Region_Code,
| t.desRegionCode des_Region_Code,
| t.peizaiseoncdOrgCode peizaiseoncd_Org_Code,
| t.peizaisecondRegionCode peizaisecond_Region_Code,
| t.peizaistartOrgCode peizaistart_Org_Code,
| t.peizaistartRegionCode peizaistart_Region_Code,
| t.startProvCode start_Prov_Code,
| t.waybillNob waybill_Nob,
| t.firstNodeCode first_Node_Code,
| t.centerCarType center_Car_Type,
| t.routeLineName route_Line_Name,
| t.car_Type car_Type,
| t.fache_Time fache_Time,
| t.batchTime batch_Time,
| '${batchTime}' rpt_date
| from (
| select * from (
| select *,
| row_number() over(partition by waybillNo,orgCode,containerNo order by createtime desc) rn
| from (
| select t1.*, c.*
| from tidb_ytrpt.upcar_monitor_detail t1
| left join (select * from temp_upcar_monitor_carno where create_time<='${batchTime}') c on c.car_no = t1.containerno
| where t1.createTime>= '${ctStartDate}' and t1.createTime < '${batchTime}'
| union all
| select t2.*, c.*
| from tidb_ytrpt.upcar_monitor_detail t2
| join (select * from temp_upcar_monitor_carno where status = 'INVALID' and car_type='待发车') c on c.car_no = t2.containerno
| where t2.createTime>= '${initDate}' and t2.createTime < '${ctStartDate}'
| ) tab
| ) temp where temp.rn = 1 and not exists (select 1 from temp_upcar_monitor_carno c where status = 'INVALID' and c.car_no=temp.containerNo)
|) t
|""".stripMargin
println(sql)
spark.sql(sql).repartition(100).persist(StorageLevel.MEMORY_ONLY_SER).createOrReplaceTempView("upcar_monitor_detail_temp")*/
// At the 09:00 batch, persist the departure status
if ("09:00:00".equals(batchTime.split(" ", -1)(1))) {
val sql =
s"""
|select '${DateHelper.convertDate(batchTime, "yyyy-MM-dd")}' rpt_date
| ,t.car_no
| ,t.create_time
| ,t.car_type
| ,t.status
| ,t.fache_time
| ,t.line_name
| from temp_upcar_monitor_carno t
|""".stripMargin
spark.sql(sql).write.mode(saveMode = "append").format("jdbc")
.option("driver", "com.mysql.jdbc.Driver")
.option("url", "jdbc:mysql://tidb1.prd.db:3390/ytrpt?rewriteBatchedStatements=true&useUnicode=true&characterEncoding=UTF-8")
.option("useSSL", false)
.option(JDBCOptions.JDBC_BATCH_INSERT_SIZE, 10000)
.option("dbtable", "upcar_monitor_carno_day") // database name and table name here
.option("isolationLevel", "NONE") // set isolationLevel to NONE
.option("user", "root") // TiDB user here
.save()
}
val sql0 =
s"""
|select * from tidb_ytrpt.upcar_monitor_detail
|where createTime>= '${initDate}' and createTime < '${batchTime}'
|""".stripMargin
println(sql0)
// spark.sql(sql).createOrReplaceTempView("upcar_monitor_detail_incomplete")
spark.sql(sql0)
.repartition(256).persist(StorageLevel.MEMORY_ONLY_SER)
.createOrReplaceTempView("upcar_monitor_detail_base_temp")
val sql =
s"""
|select t.waybillNo waybill_No,
| t.bagNo bag_No,
| t.containerNo container_No,
| t.expType exp_Type,
| t.orgCode org_Code,
| t.createTime create_Time,
| t.weight weight,
| t.createUserCode create_User_Code,
| t.createUserName create_User_Name,
| t.line_name line_Name,
| t.truckNo truck_No,
| t.truckTypeNew truck_Type_New,
| t.transCodeLink trans_Code_Link,
| t.predictDepartTime predict_Depart_Time,
| t.returnFlag return_Flag,
| t.datoubi datoubi,
| t.desRoute des_Route,
| t.desOrgCode des_Org_Code,
| t.masterNodeCode master_Node_Code,
| t.startRegionCode start_Region_Code,
| t.desRegionCode des_Region_Code,
| t.peizaiseoncdOrgCode peizaiseoncd_Org_Code,
| t.peizaisecondRegionCode peizaisecond_Region_Code,
| t.peizaistartOrgCode peizaistart_Org_Code,
| t.peizaistartRegionCode peizaistart_Region_Code,
| t.startProvCode start_Prov_Code,
| t.waybillNob waybill_Nob,
| t.firstNodeCode first_Node_Code,
| t.centerCarType center_Car_Type,
| t.routeLineName route_Line_Name,
| case when t.car_Type is null or t.car_Type='' then t.carType else t.car_Type end as car_Type,
| t.run_mode run_mode,
| t.fache_Time fache_Time,
| t.batchTime batch_Time,
| '${batchTime}' rpt_date
| from (
| select tab.*, c.*,
| row_number() over(partition by tab.waybillNo,tab.orgCode,tab.containerNo order by tab.createtime desc) rn
| from (
| select t1.*
| from upcar_monitor_detail_base_temp t1
| where t1.createTime>= '${ctStartDate}' and t1.createTime < '${batchTime}'
| union all
| select t2.*
| from upcar_monitor_detail_base_temp t2
| join (select car_no from tidb_ytrpt.upcar_monitor_carno_day where rpt_date = '${DateHelper.convertDate(batchTime, "yyyy-MM-dd")}' and car_type='待发车') c on c.car_no = t2.containerno
| where t2.createTime>= '${initDate}' and t2.createTime < '${ctStartDate}'
| ) tab left join (select * from temp_upcar_monitor_carno where create_time<='${batchTime}') c on c.car_no = tab.containerno
|) t where rn = 1 and not exists (select 1 from temp_upcar_monitor_carno c where status = 'INVALID' and c.car_no=t.containerno)
|""".stripMargin
println(sql)
// spark.sql(sql).createOrReplaceTempView("upcar_monitor_detail_incomplete")
spark.sql(sql)
.repartition(256).persist(StorageLevel.MEMORY_ONLY_SER)
.createOrReplaceTempView("upcar_monitor_detail_temp")
}
def getBillDetail(spark: SparkSession, batchTime: String) = {
val sql =
s"""
|select '${batchTime}' rpt_date,
| t.center_code,
| m.org_name center_name,
| t.car_mark,
| t.waybill_num,
| t.route,
| t.car_time,
| t.kg_num,
| t.operate_name,
| t.operate_code,
| n.late_status
| from (
| select t1.master_node_code center_code,
| t1.container_no car_mark,
| t1.waybill_no waybill_num,
| t1.route_line_name route,
| t1.create_time car_time,
| t1.weight kg_num,
| t1.create_user_name operate_name,
| t1.create_user_code operate_code,
| t1.car_Type car_Type
| from upcar_monitor_detail_temp t1
| where t1.center_Car_Type <> '一致'
| and t1.center_Car_Type <> '特殊路由'
| ) t left join temp_upcar_monitor_orgdim m on t.center_code = m.org_code
| left join
| (
| select car_mark,max(late_status) as late_status from temp_now_car_details group by car_mark
| ) n on t.car_mark=n.car_mark
|""".stripMargin
println(sql)
// val df = spark.sql(sql).repartition(256).persist(StorageLevel.MEMORY_ONLY_SER)
// df.createOrReplaceTempView("upcar_monitor_waybill_details_temp")
spark.sql(sql).repartition(256).write.mode(saveMode = "append").format("jdbc")
.option("driver", "com.mysql.jdbc.Driver")
.option("url", "jdbc:mysql://tidb1.prd.db:3390/ytrpt?rewriteBatchedStatements=true&useUnicode=true&characterEncoding=UTF-8")
.option("useSSL", false)
.option(JDBCOptions.JDBC_BATCH_INSERT_SIZE, 10000)
.option("dbtable", "upcar_monitor_waybill_details") // database name and table name here
.option("isolationLevel", "NONE") // set isolationLevel to NONE
.option("user", "root") // TiDB user here
.save()
// // At the 09:00 batch, write into the daily dimension table
// DateCollectTotal.saveBillDay(spark, batchTime)
//
// df.unpersist()
}
def getCarDetail(spark: SparkSession, batchTime: String, date_t:String,date_t_5:String) = {
val simpleBatchTime = batchTime.replaceAll("-|:", "").replaceAll(" ", "")
// Car-tag level summary (not expanded)
/*var sql =
s"""
|select t.org_code start_center_code,
| t.des_route end_center_code,
| t.container_no car_mark,
| t.target_weight should_num,
| sum(t.weight) aboard_num,
| count(t.waybill_no) ticket_num,
| sum(case when t.center_car_type = '未按规划中转' then 1 else 0 end) fault_num,
| t.car_type car_type,
| max(t.truck_no) car_plate,
| t.line_name route,
| t.fache_time,
| t.predict_depart_time
| from upcar_monitor_detail_temp t
| group by t.org_code,
| t.des_route,
| t.container_no,
| t.target_weight,
| t.car_type,
| t.line_name,
| t.fache_time
|""".stripMargin*/
// Average load over the last 5 days for the same route and truck type
var sql =
s"""
|select
| route
| ,truck_type
| ,max(should_num) as should_num
| ,avg(aboard_num) as aboard_num
| ,count(1) as car_count
|from (
| select route,truck_type,car_mark,should_num ,aboard_num
| ,row_number() over(partition by route,truck_type,car_mark order by rpt_date desc) as sn
| from tidb_ytrpt.upcar_monitor_car_details
| where rpt_date>='${date_t_5}'
| and rpt_date< '${date_t}'
| and car_type='已发车'
|) tab where sn=1
|group by route,truck_type
|order by route,truck_type
|""".stripMargin
println(sql)
spark.sql(sql).createOrReplaceTempView("temp_t_upcar_monitor_sameline_sametype_should_numb")
// Get the T-2 date
val date_t_2 = DateHelper.defineDayBefore(date_t, -2, "yyyy-MM-dd")
// Read the "rated load weight" config table
sql =
s"""
|select source_center_code
| ,des_center_code
| ,car_type
| ,target_num
| ,target_weight
|from (
| select
| source_center_code
| ,des_center_code
| ,car_type
| ,target_num
| ,target_weight
| ,row_number() over(partition by source_center_code,des_center_code,car_type order by dt desc,c_id desc) rn
| from ods.rpt_db_input_cubed_out_new
| where dt >= regexp_replace('${date_t_2}','-','')
|) t0
|where t0.rn = 1
|""".stripMargin
println(sql)
spark.sql(sql).createOrReplaceTempView("temp_line_target")
sql =
"""
|select t.org_code start_center_code,
| t.des_route end_center_code,
| t.container_no car_mark,
| sum(t.weight) aboard_num,
| count(t.waybill_no) ticket_num,
| sum(case when t.center_car_type = '未按规划中转' then 1 else 0 end) fault_num,
| t.car_type car_type,
| t.car_plate,
| t.run_mode,
| t.line_name route,
| t.fache_time,
| t.truck_Type_New,
| max(t.predict_depart_time) predict_depart_time
| from (select *,
| first_value(truck_no) over(partition by container_no order by predict_depart_time desc) car_plate
| from upcar_monitor_detail_temp) t
| group by t.org_code,
| t.des_route,
| t.container_no,
| t.car_type,
| t.car_plate,
| t.run_mode,
| t.line_name,
| t.fache_time,
| t.truck_Type_New
|""".stripMargin
println(sql)
spark.sql(sql).createOrReplaceTempView("temp_t_lkn_upcar_monitor_base_car")
// Car-tag level summary (expanded)
sql =
"""
|select t.org_code start_center_code,
| t.des_route end_center_code,
| t.container_no car_mark,
| t.master_node_code center_code,
| count(t.waybill_no) center_ticket_num,
| case when t.center_car_type='未按规划中转' then '错装'
| when t.center_car_type in ('一致','特殊路由') then '一致'
| when t.center_car_type='临时路由' then '临时路由'
| when t.center_car_type='其他' then '其他'
| end center_car_type
| from upcar_monitor_detail_temp t
| group by t.org_code,
| t.des_route,
| t.container_no,
| t.master_node_code,
| case when t.center_car_type='未按规划中转' then '错装'
| when t.center_car_type in ('一致','特殊路由') then '一致'
| when t.center_car_type='临时路由' then '临时路由'
| when t.center_car_type='其他' then '其他'
| end
|""".stripMargin
println(sql)
spark.sql(sql).createOrReplaceTempView("temp_t_lkn_upcar_monitor_base_car_one")
sql =
s"""
|select m1.region_code,
| m1.region_name,
| tab.start_center_code,
| m1.org_name start_center_name,
| tab.end_center_code,
| m2.org_name end_center_name,
| tab.car_mark,
| tab.aboard_num,
| tab.ticket_num,
| tab.fault_num,
| tab.car_type,
| tab.run_mode,
| tab.car_plate,
| tab.route,
| tab.center_code,
| m3.org_name center_name,
| tab.center_ticket_num,
| tab.center_car_type,
| tab.fache_time start_car_time,
| tab.predict_depart_time should_start_time,
| tab.truck_Type_New,
| case when tab.fache_time is not null and unix_timestamp(tab.fache_time, 'yyyy-MM-dd HH:mm:ss') - unix_timestamp(tab.predict_depart_time, 'yyyy-MM-dd HH:mm:ss') > 0 then 1 when tab.fache_time is null and unix_timestamp('${simpleBatchTime}', 'yyyyMMddHHmmss') - coalesce(unix_timestamp(tab.predict_depart_time, 'yyyy-MM-dd HH:mm:ss'), unix_timestamp('2099-12-01 23:59:59', 'yyyy-MM-dd HH:mm:ss')) > 0 then 1 else 0 end as late_status
| from (select a.start_center_code,
| a.end_center_code,
| a.car_mark,
| a.aboard_num,
| a.ticket_num,
| a.fault_num,
| a.car_type car_type,
| a.run_mode,
| a.car_plate,
| a.route,
| a.fache_time fache_time,
| a.predict_depart_time predict_depart_time,
| a.truck_Type_New,
| b.center_code,
| b.center_ticket_num,
| b.center_car_type
| from temp_t_lkn_upcar_monitor_base_car a
| join temp_t_lkn_upcar_monitor_base_car_one b
| on a.start_center_code = b.start_center_code
| and a.end_center_code = b.end_center_code
| and a.car_mark = b.car_mark) tab
| left join temp_upcar_monitor_orgdim m1
| on tab.start_center_code = m1.org_code
| left join temp_upcar_monitor_orgdim m2
| on tab.end_center_code = m2.org_code
| left join temp_upcar_monitor_orgdim m3
| on tab.center_code = m3.org_code
|""".stripMargin
println(sql)
spark.sql(sql).createOrReplaceTempView("temp_now_car_details")
sql =
s"""
|select '${batchTime}' rpt_date,
| region_code,
| region_name,
| start_center_code,
| start_center_name,
| end_center_code,
| end_center_name,
| car_mark,
| cast(sum(aboard_num) as decimal(12,4)) aboard_num,
| cast(sum(ticket_num) as decimal(12)) ticket_num,
| cast(sum(fault_num) as decimal(12)) fault_num,
| car_type,
| run_mode,
| car_plate,
| route,
| center_code,
| center_name,
| cast(sum(center_ticket_num) as decimal(12)) center_ticket_num,
| center_car_type,
| start_car_time,
| should_start_time,
| late_status,
| truck_Type_New as truck_type
| from temp_now_car_details
| group by region_code,
| region_name,
| start_center_code,
| start_center_name,
| end_center_code,
| end_center_name,
| car_mark,
| car_type,
| run_mode,
| car_plate,
| route,
| center_code,
| center_name,
| center_car_type,
| start_car_time,
| should_start_time,
| late_status,
| truck_Type_New
|""".stripMargin
println(sql)
spark.sql(sql).createOrReplaceTempView("temp_t_lkn_upcar_monitor_base_car_three")
val sql_dfc =
s"""
|select *
| from (select waybillno,
| orgcode,
| date_format(createtime,'yyyy-MM-dd HH:mm:ss') createtime,
| status,
| auxopcode,
| ROW_NUMBER() OVER(PARTITION BY waybillno,orgcode order by createtime desc,uploadtime desc) rk
| from rtorc.t_exp_op_record_truck
| where dt >= regexp_replace('${date_t_2}','-','')
| and opcode = '140'
| ) a
| where rk=1
| and status>'0'
| and auxopcode <> 'DELETE'
|""".stripMargin
println(sql_dfc)
spark.sql(sql_dfc).createOrReplaceTempView("temp_exp_op_record_truck")
sql =
"""
|select
| ta.rpt_date
| ,ta.region_code
| ,ta.region_name
| ,ta.start_center_code
| ,ta.start_center_name
| ,ta.end_center_code
| ,ta.end_center_name
| ,ta.car_mark
| ,coalesce(tb.aboard_num,tc.target_weight,0) as should_num
| ,ta.aboard_num
| ,ta.ticket_num
| ,ta.fault_num
| ,ta.car_type
| ,ta.car_plate
| ,ta.route
| ,ta.center_code
| ,ta.center_name
| ,ta.center_ticket_num
| ,ta.center_car_type
| ,case when td.createtime is not null and td.createtime <> '' and td.createtime < ta.start_car_time and size(split(ta.route,'-')) > 2
| then td.createtime
| else ta.start_car_time
| end start_car_time
| ,ta.should_start_time
| ,ta.late_status
| ,ta.truck_type
| ,case when coalesce(tb.aboard_num,tc.target_weight,0)=0 then 0 else ta.aboard_num*1.0/coalesce(tb.aboard_num,tc.target_weight,0) end as loading_rate -- loading rate
| ,ta.run_mode
|from temp_t_lkn_upcar_monitor_base_car_three ta
|left join temp_t_upcar_monitor_sameline_sametype_should_numb tb
| on ta.route=tb.route and ta.truck_type=tb.truck_type
|left join temp_line_target tc
| on ta.start_center_code=tc.source_center_code and ta.end_center_code=tc.des_center_code and ta.truck_type=tc.car_type
|left join temp_exp_op_record_truck td
| on ta.car_mark = td.waybillno
| and ta.start_center_code = td.orgcode
|""".stripMargin
println(sql)
// spark.sql(sql).createOrReplaceTempView("temp_upcar_monitor_car_details")
val df = spark.sql(sql).repartition(100).persist(StorageLevel.MEMORY_ONLY_SER)
df.write.mode(saveMode = "append").format("jdbc")
.option("driver", "com.mysql.jdbc.Driver")
.option("url", "jdbc:mysql://tidb1.prd.db:3390/ytrpt?rewriteBatchedStatements=true&useUnicode=true&characterEncoding=UTF-8")
.option("useSSL", false)
.option(JDBCOptions.JDBC_BATCH_INSERT_SIZE, 10000)
.option("dbtable", "upcar_monitor_car_details") // database name and table name here
.option("isolationLevel", "NONE") // set isolationLevel to NONE
.option("user", "root") // TiDB user here
.save()
// // At the 09:00 batch, write into the daily dimension table
// DateCollectTotal.saveCarDay(spark, batchTime)
}
def getCenterTotal(spark: SparkSession, batchTime: String) = {
// Destination-center level summary: all / departed / pending departure
var sql =
"""
|select region_code,
| region_name,
| start_center_code,
| start_center_name,
| end_center_code,
| end_center_name,
| sum(center_ticket_num) car_num,
| sum(case when center_car_type = '一致' then center_ticket_num else 0 end) unanimous_num,
| sum(case when center_car_type = '错装' then center_ticket_num else 0 end) fault_num,
| sum(case when center_car_type = '临时路由' then center_ticket_num else 0 end) temporary_num,
| sum(case when center_car_type = '其他' then center_ticket_num else 0 end) other_num
| from temp_t_lkn_upcar_monitor_base_car_three
| group by region_code,
| region_name,
| start_center_code,
| start_center_name,
| end_center_code,
| end_center_name
|""".stripMargin
println(sql)
spark.sql(sql).createOrReplaceTempView("temp_t_lkn_upcar_monitor_base_center")
// Count car tags
sql =
"""
|select start_center_code,
| end_center_code,
| count(car_mark) use_car_num
| from temp_t_lkn_upcar_monitor_base_car
| group by start_center_code,
| end_center_code
|""".stripMargin
println(sql)
spark.sql(sql).createOrReplaceTempView("temp_t_lkn_upcar_monitor_base_center_carnum")
// Join the car-tag counts
sql =
"""
|select a.*,
| b.use_car_num
| from temp_t_lkn_upcar_monitor_base_center a
| join temp_t_lkn_upcar_monitor_base_center_carnum b
| on a.start_center_code=b.start_center_code
| and a.end_center_code=b.end_center_code
|""".stripMargin
println(sql)
spark.sql(sql).createOrReplaceTempView("temp_t_lkn_upcar_monitor_base_center_withnum")
// Origin-center level summary
sql =
"""
|select region_code,
| region_name,
| start_center_code,
| start_center_name,
| sum(car_num) car_num,
| sum(unanimous_num) unanimous_num,
| sum(fault_num) fault_num,
| sum(temporary_num) temporary_num,
| sum(other_num) other_num,
| sum(use_car_num) use_car_num
| from temp_t_lkn_upcar_monitor_base_center_withnum
|group by region_code,
| region_name,
| start_center_code,
| start_center_name
|""".stripMargin
println(sql)
spark.sql(sql).createOrReplaceTempView("temp_t_lkn_upcar_monitor_base_center_one")
// Merge destination and origin summaries
sql =
"""
|select region_code,
| region_name,
| start_center_code,
| start_center_name,
| end_center_code,
| end_center_name,
| car_num,
| unanimous_num,
| fault_num,
| temporary_num,
| other_num,
| '目的中心' center_type,
| use_car_num,
| '全部' car_type
| from temp_t_lkn_upcar_monitor_base_center_withnum
|union all
|select region_code,
| region_name,
| start_center_code,
| start_center_name,
| null end_center_code,
| null end_center_name,
| car_num,
| unanimous_num,
| fault_num,
| temporary_num,
| other_num,
| '始发中心' center_type,
| use_car_num,
| '全部' car_type
| from temp_t_lkn_upcar_monitor_base_center_one
|""".stripMargin
println(sql)
spark.sql(sql).createOrReplaceTempView("temp_t_lkn_upcar_monitor_base_center_two")
// Destination-center level summary: departed
sql =
"""
|select region_code,
| region_name,
| start_center_code,
| start_center_name,
| end_center_code,
| end_center_name,
| sum(center_ticket_num) car_num,
| sum(case when center_car_type='一致' then center_ticket_num else 0 end) unanimous_num,
| sum(case when center_car_type='错装' then center_ticket_num else 0 end) fault_num,
| sum(case when center_car_type='临时路由' then center_ticket_num else 0 end) temporary_num,
| sum(case when center_car_type='其他' then center_ticket_num else 0 end) other_num
| from temp_t_lkn_upcar_monitor_base_car_three
| where car_type='已发车'
|group by region_code,
| region_name,
| start_center_code,
| start_center_name,
| end_center_code,
| end_center_name
|""".stripMargin
println(sql)
spark.sql(sql).createOrReplaceTempView("temp_t_lkn_upcar_monitor_base_center_yifa")
// Count car tags
sql =
"""
|select start_center_code,
| end_center_code,
| count(car_mark) use_car_num
| from temp_t_lkn_upcar_monitor_base_car
| where car_type='已发车'
| group by start_center_code,
| end_center_code
|""".stripMargin
println(sql)
spark.sql(sql).createOrReplaceTempView("temp_t_lkn_upcar_monitor_base_center_carnum_yifa")
// Join the car-tag counts
sql =
"""
|select a.*,
| b.use_car_num
| from temp_t_lkn_upcar_monitor_base_center_yifa a
| join temp_t_lkn_upcar_monitor_base_center_carnum_yifa b
| on a.start_center_code=b.start_center_code
| and a.end_center_code=b.end_center_code
|""".stripMargin
println(sql)
spark.sql(sql).createOrReplaceTempView("temp_t_lkn_upcar_monitor_base_center_withnum_yifa")
// Origin-center level summary
sql =
"""
|select region_code,
| region_name,
| start_center_code,
| start_center_name,
| sum(car_num) car_num,
| sum(unanimous_num) unanimous_num,
| sum(fault_num) fault_num,
| sum(temporary_num) temporary_num,
| sum(other_num) other_num,
| sum(use_car_num) use_car_num
| from temp_t_lkn_upcar_monitor_base_center_withnum_yifa
|group by region_code,
| region_name,
| start_center_code,
| start_center_name
|""".stripMargin
println(sql)
spark.sql(sql).createOrReplaceTempView("temp_t_lkn_upcar_monitor_base_center_one_yifa")
// Merge destination and origin summaries
sql =
"""
|select region_code,
| region_name,
| start_center_code,
| start_center_name,
| end_center_code,
| end_center_name,
| car_num,
| unanimous_num,
| fault_num,
| temporary_num,
| other_num,
| '目的中心' center_type,
| use_car_num,
| '已发车' car_type
| from temp_t_lkn_upcar_monitor_base_center_withnum_yifa
|union all
|select region_code,
| region_name,
| start_center_code,
| start_center_name,
| null end_center_code,
| null end_center_name,
| car_num,
| unanimous_num,
| fault_num,
| temporary_num,
| other_num,
| '始发中心' center_type,
| use_car_num,
| '已发车' car_type
| from temp_t_lkn_upcar_monitor_base_center_one_yifa
|""".stripMargin
println(sql)
spark.sql(sql).createOrReplaceTempView("temp_t_lkn_upcar_monitor_base_center_two_yifa")
// Aggregate at the destination-center level, pending-departure ('待发车') rows
sql =
"""
|select region_code,
| region_name,
| start_center_code,
| start_center_name,
| end_center_code,
| end_center_name,
| sum(center_ticket_num) car_num,
| sum(case when center_car_type='一致' then center_ticket_num else 0 end) unanimous_num,
| sum(case when center_car_type='错装' then center_ticket_num else 0 end) fault_num,
| sum(case when center_car_type='临时路由' then center_ticket_num else 0 end) temporary_num,
| sum(case when center_car_type='其他' then center_ticket_num else 0 end) other_num
| from temp_t_lkn_upcar_monitor_base_car_three
| where car_type='待发车'
|group by region_code,
| region_name,
| start_center_code,
| start_center_name,
| end_center_code,
| end_center_name
|""".stripMargin
println(sql)
spark.sql(sql).createOrReplaceTempView("temp_t_lkn_upcar_monitor_base_center_daifa")
// Count the number of car tags
sql =
"""
|select start_center_code,
| end_center_code,
| count(car_mark) use_car_num
| from temp_t_lkn_upcar_monitor_base_car
| where car_type='待发车'
| group by start_center_code,
| end_center_code
|""".stripMargin
println(sql)
spark.sql(sql).createOrReplaceTempView("temp_t_lkn_upcar_monitor_base_center_carnum_daifa")
// Join in the car-tag counts
sql =
"""
|select a.*,
| b.use_car_num
| from temp_t_lkn_upcar_monitor_base_center_daifa a
| join temp_t_lkn_upcar_monitor_base_center_carnum_daifa b
| on a.start_center_code=b.start_center_code
| and a.end_center_code=b.end_center_code
|""".stripMargin
println(sql)
spark.sql(sql).createOrReplaceTempView("temp_t_lkn_upcar_monitor_base_center_withnum_daifa")
// Aggregate at the origin-center level
sql =
"""
|select region_code,
| region_name,
| start_center_code,
| start_center_name,
| sum(car_num) car_num,
| sum(unanimous_num) unanimous_num,
| sum(fault_num) fault_num,
| sum(temporary_num) temporary_num,
| sum(other_num) other_num,
| sum(use_car_num) use_car_num
| from temp_t_lkn_upcar_monitor_base_center_withnum_daifa
|group by region_code,
| region_name,
| start_center_code,
| start_center_name
|""".stripMargin
println(sql)
spark.sql(sql).createOrReplaceTempView("temp_t_lkn_upcar_monitor_base_center_one_daifa")
// Combine destination- and origin-center summaries
sql =
"""
|select region_code,
| region_name,
| start_center_code,
| start_center_name,
| end_center_code,
| end_center_name,
| car_num,
| unanimous_num,
| fault_num,
| temporary_num,
| other_num,
| '目的中心' center_type,
| use_car_num,
| '待发车' car_type
| from temp_t_lkn_upcar_monitor_base_center_withnum_daifa
|union all
|select region_code,
| region_name,
| start_center_code,
| start_center_name,
| null end_center_code,
| null end_center_name,
| car_num,
| unanimous_num,
| fault_num,
| temporary_num,
| other_num,
| '始发中心' center_type,
| use_car_num,
| '待发车' car_type
| from temp_t_lkn_upcar_monitor_base_center_one_daifa
|""".stripMargin
println(sql)
spark.sql(sql).createOrReplaceTempView("temp_t_lkn_upcar_monitor_base_center_two_daifa")
// Merge the three status datasets
sql =
"""
|select *
| from temp_t_lkn_upcar_monitor_base_center_two
|union all
|select *
| from temp_t_lkn_upcar_monitor_base_center_two_yifa
|union all
|select *
| from temp_t_lkn_upcar_monitor_base_center_two_daifa
|""".stripMargin
println(sql)
spark.sql(sql).createOrReplaceTempView("temp_t_lkn_upcar_monitor_base_center_two_all")
// Persist the center-level results
sql =
"""
|select region_code, --province region code
| region_name, --province region name
| start_center_code, --origin center code
| start_center_name, --origin center name
| end_center_code, --destination center code
| end_center_name, --destination center name
| car_num, --loaded ticket volume
| unanimous_num, --consistent ticket volume
| fault_num, --mis-loaded ticket volume
| temporary_num, --temporary-route volume
| other_num, --other
| center_type, --center type: '始发中心' (origin) or '目的中心' (destination)
| use_car_num, --vehicles used
| car_type --type: '全部' (all), '已发车' (departed), '待发车' (pending departure)
| from temp_t_lkn_upcar_monitor_base_center_two_all
|""".stripMargin
println(sql)
spark.sql(sql).createOrReplaceTempView("temp_now_center_details")
sql =
s"""
|select '${batchTime}' rpt_date,
| region_code,
| region_name,
| start_center_code,
| start_center_name,
| end_center_code,
| end_center_name,
| cast(sum(car_num) as decimal(12)) car_num,
| cast(sum(unanimous_num) as decimal(12)) unanimous_num,
| cast(sum(fault_num) as decimal(12)) fault_num,
| cast(sum(temporary_num) as decimal(12)) temporary_num,
| cast(sum(other_num) as decimal(12)) other_num,
| center_type,
| cast(sum(use_car_num) as decimal(12)) use_car_num,
| car_type
| from temp_now_center_details t
| group by region_code,
| region_name,
| start_center_code,
| start_center_name,
| end_center_code,
| end_center_name,
| center_type,
| car_type
|""".stripMargin
println(sql)
// val df = spark.sql(sql).persist(StorageLevel.MEMORY_ONLY_SER)
// df.createOrReplaceTempView("upcar_monitor_center_details_temp")
spark.sql(sql).write.mode(saveMode = "append").format("jdbc")
.option("driver", "com.mysql.jdbc.Driver")
.option("url", "jdbc:mysql://ip:3390/ytrpt?rewriteBatchedStatements=true&useUnicode=true&characterEncoding=UTF-8")
.option("useSSL", false)
.option(JDBCOptions.JDBC_BATCH_INSERT_SIZE, 10000)
.option("dbtable", "upcar_monitor_center_details") // database name and table name here
.option("isolationLevel", "NONE") // set isolationLevel to NONE
.option("user", "root") // TiDB user here
.save()
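The write block above is repeated, options and all, for every target table in this job. A possible refactor is sketched below; this is a hypothetical helper that is not part of the original code, and the environment-variable names and fallback URL are placeholders. It also keeps the JDBC URL and user out of the call sites, which addresses the sensitive-information concern raised later in the thread:

```scala
import org.apache.spark.sql.DataFrame

// Hypothetical helper: centralizes the repeated JDBC write options so the
// URL and credentials live in one place and can come from configuration
// instead of being hard-coded at every call site.
object TidbWriter {
  // Assumed placeholders; in practice read these from spark-submit --conf
  // or a properties file rather than hard-coding them.
  val jdbcUrl: String = sys.env.getOrElse("TIDB_JDBC_URL",
    "jdbc:mysql://ip:3390/ytrpt?rewriteBatchedStatements=true&useUnicode=true&characterEncoding=UTF-8")
  val jdbcUser: String = sys.env.getOrElse("TIDB_USER", "root")

  def append(df: DataFrame, table: String): Unit = {
    df.write.mode("append").format("jdbc")
      .option("driver", "com.mysql.jdbc.Driver")
      .option("url", jdbcUrl)
      .option("useSSL", "false")
      .option("batchsize", "10000") // same key as JDBCOptions.JDBC_BATCH_INSERT_SIZE
      .option("dbtable", table)
      .option("isolationLevel", "NONE")
      .option("user", jdbcUser)
      .save()
  }
}
```

Each write block in the job would then collapse to a single line such as `TidbWriter.append(spark.sql(sql), "upcar_monitor_center_details")`.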
// // The 9 o'clock batch writes into the daily dimension table
// DateCollectTotal.saveCenterDay(spark, batchTime)
// Aggregate and persist at the origin-province level; nationwide totals and rate fields are computed by the backend
sql =
"""
|select region_code,
| region_name,
| sum(car_num) car_num,
| sum(unanimous_num) unanimous_num,
| sum(fault_num) fault_num,
| sum(temporary_num) temporary_num,
| sum(other_num) other_num,
| car_type
| from temp_t_lkn_upcar_monitor_base_center_two_all
| where center_type='始发中心'
| group by region_code,
| region_name,
| car_type
|""".stripMargin
spark.sql(sql).createOrReplaceTempView("temp_now_region_details")
sql =
s"""
|select '${batchTime}' rpt_date,
| region_code,
| region_name,
| cast(sum(car_num) as decimal(12)) car_num,
| cast(sum(unanimous_num) as decimal(12)) unanimous_num,
| cast(sum(fault_num) as decimal(12)) fault_num,
| cast(sum(temporary_num) as decimal(12)) temporary_num,
| cast(sum(other_num) as decimal(12)) other_num,
| car_type
| from temp_now_region_details tab
|group by region_code,
| region_name,
| car_type
|""".stripMargin
println(sql)
// val df2 = spark.sql(sql).persist(StorageLevel.MEMORY_ONLY_SER)
// df2.createOrReplaceTempView("upcar_monitor_region_details_temp")
spark.sql(sql).write.mode(saveMode = "append").format("jdbc")
.option("driver", "com.mysql.jdbc.Driver")
.option("url", "jdbc:mysql://ip:3390/ytrpt?rewriteBatchedStatements=true&useUnicode=true&characterEncoding=UTF-8")
.option("useSSL", false)
.option(JDBCOptions.JDBC_BATCH_INSERT_SIZE, 10000)
.option("dbtable", "upcar_monitor_region_details") // database name and table name here
.option("isolationLevel", "NONE") // set isolationLevel to NONE
.option("user", "root") // TiDB user here
.save()
// // The 9 o'clock batch writes into the daily dimension table
// DateCollectTotal.saveRegionDay(spark, batchTime)
//------ "pending but late departure" (待发-晚发) logic, by sgp at 2022-09-22
sql =
"""
|select region_code,
| region_name,
| start_center_code,
| start_center_name,
| end_center_code,
| end_center_name,
| sum(center_ticket_num) car_num,
| sum(case when center_car_type='一致' then center_ticket_num else 0 end) unanimous_num,
| sum(case when center_car_type='错装' then center_ticket_num else 0 end) fault_num,
| sum(case when center_car_type='临时路由' then center_ticket_num else 0 end) temporary_num,
| sum(case when center_car_type='其他' then center_ticket_num else 0 end) other_num
| from temp_t_lkn_upcar_monitor_base_car_three
| where car_type='待发车'
| and late_status='1'
|group by region_code,
| region_name,
| start_center_code,
| start_center_name,
| end_center_code,
| end_center_name
|""".stripMargin
spark.sql(sql).createOrReplaceTempView("temp_t_lkn_upcar_monitor_base_center_daifa_wanfa")
sql =
"""
|select ta.start_center_code,
| ta.end_center_code,
| count(ta.car_mark) use_car_num
|from
|(
| select * from temp_t_lkn_upcar_monitor_base_car
| where car_type='待发车'
|) ta inner join (
| select car_mark,max(late_status) as late_status from temp_now_car_details
| where late_status='1'
| group by car_mark
|) tb on ta.car_mark=tb.car_mark
|group by start_center_code
| ,end_center_code
|""".stripMargin
spark.sql(sql).createOrReplaceTempView("temp_t_lkn_upcar_monitor_base_center_carnum_daifa_wanfa")
sql =
"""
|select a.*,
| b.use_car_num
| from temp_t_lkn_upcar_monitor_base_center_daifa_wanfa a
| join temp_t_lkn_upcar_monitor_base_center_carnum_daifa_wanfa b
| on a.start_center_code=b.start_center_code
| and a.end_center_code=b.end_center_code
|""".stripMargin
spark.sql(sql).createOrReplaceTempView("temp_t_lkn_upcar_monitor_base_center_withnum_daifa_wanfa")
sql =
"""
|select region_code,
| region_name,
| start_center_code,
| start_center_name,
| sum(car_num) car_num,
| sum(unanimous_num) unanimous_num,
| sum(fault_num) fault_num,
| sum(temporary_num) temporary_num,
| sum(other_num) other_num,
| sum(use_car_num) use_car_num
| from temp_t_lkn_upcar_monitor_base_center_withnum_daifa_wanfa
|group by region_code,
| region_name,
| start_center_code,
| start_center_name
|""".stripMargin
spark.sql(sql).createOrReplaceTempView("temp_t_lkn_upcar_monitor_base_center_one_daifa_wanfa")
sql =
"""
|select region_code,
| region_name,
| start_center_code,
| start_center_name,
| end_center_code,
| end_center_name,
| car_num,
| unanimous_num,
| fault_num,
| temporary_num,
| other_num,
| '目的中心' center_type,
| use_car_num,
| '待发车' car_type
| from temp_t_lkn_upcar_monitor_base_center_withnum_daifa_wanfa
|union all
|select region_code,
| region_name,
| start_center_code,
| start_center_name,
| null end_center_code,
| null end_center_name,
| car_num,
| unanimous_num,
| fault_num,
| temporary_num,
| other_num,
| '始发中心' center_type,
| use_car_num,
| '待发车' car_type
| from temp_t_lkn_upcar_monitor_base_center_one_daifa_wanfa
|""".stripMargin
spark.sql(sql).createOrReplaceTempView("temp_t_lkn_upcar_monitor_base_center_two_daifa_wanfa")
sql =
"""
|select *
| from temp_t_lkn_upcar_monitor_base_center_two_daifa_wanfa
|""".stripMargin
spark.sql(sql).persist(StorageLevel.MEMORY_ONLY_SER).createOrReplaceTempView("temp_now_center_details_wanfa")
sql =
s"""
|select '${batchTime}' rpt_date,
| region_code,
| region_name,
| start_center_code,
| start_center_name,
| end_center_code,
| end_center_name,
| cast(sum(car_num) as decimal(12)) car_num,
| cast(sum(unanimous_num) as decimal(12)) unanimous_num,
| cast(sum(fault_num) as decimal(12)) fault_num,
| cast(sum(temporary_num) as decimal(12)) temporary_num,
| cast(sum(other_num) as decimal(12)) other_num,
| center_type,
| cast(sum(use_car_num) as decimal(12)) use_car_num,
| car_type
| from temp_now_center_details_wanfa t
| group by region_code,
| region_name,
| start_center_code,
| start_center_name,
| end_center_code,
| end_center_name,
| center_type,
| car_type
|""".stripMargin
println(sql)
spark.sql(sql).write.mode(saveMode = "append").format("jdbc")
.option("driver", "com.mysql.jdbc.Driver")
.option("url", "jdbc:mysql://ip:3390/ytrpt?rewriteBatchedStatements=true&useUnicode=true&characterEncoding=UTF-8")
.option("useSSL", false)
.option(JDBCOptions.JDBC_BATCH_INSERT_SIZE, 10000)
.option("dbtable", "upcar_monitor_center_details_late") // database name and table name here
.option("isolationLevel", "NONE") // set isolationLevel to NONE
.option("user", "root") // TiDB user here
.save()
sql =
s"""
|select
| '${batchTime}' rpt_date,
| region_code,
| region_name,
| sum(car_num) car_num,
| sum(unanimous_num) unanimous_num,
| sum(fault_num) fault_num,
| sum(temporary_num) temporary_num,
| sum(other_num) other_num,
| car_type
| from temp_now_center_details_wanfa
| where center_type='始发中心'
| group by region_code,
| region_name,
| car_type
|""".stripMargin
println(sql)
spark.sql(sql).write.mode(saveMode = "append").format("jdbc")
.option("driver", "com.mysql.jdbc.Driver")
.option("url", "jdbc:mysql://ip:3390/ytrpt?rewriteBatchedStatements=true&useUnicode=true&characterEncoding=UTF-8")
.option("useSSL", false)
.option(JDBCOptions.JDBC_BATCH_INSERT_SIZE, 10000)
.option("dbtable", "upcar_monitor_region_details_late") // database name and table name here
.option("isolationLevel", "NONE") // set isolationLevel to NONE
.option("user", "root") // TiDB user here
.save()
}
def getCarPeriod(spark: SparkSession, batchTime: String) = {
val sql1 =
"""
|select
| containerNo
| ,orgCode
| ,hour_
| ,tickects
| ,weight
| ,first_value(hour_) over(partition by containerNo,orgCode order by hour_) as first_hour
| ,predictDepartTime
|from (
| select
| containerNo
| ,orgCode
| ,date_format(createTime,'yyyy-MM-dd HH') as hour_
| ,count(waybillNo) as tickects
| ,sum(weight) as weight
| ,max(predictDepartTime) as predictDepartTime
| from (
| select container_No containerNo,org_Code orgCode,create_Time createTime,waybill_No waybillNo,predict_Depart_Time predictDepartTime
| ,weight
| ,row_number() over(partition by container_No,org_Code,waybill_No order by create_Time desc) as sn
| from upcar_monitor_detail_temp
| ) tab where sn = 1
| group by containerNo,orgCode,date_format(createTime,'yyyy-MM-dd HH')
|) tab2
|""".stripMargin
println("sql is :" + sql1)
spark.sql(sql1).createOrReplaceTempView("temp_middle_result_20220815")
// Generate the 8+1 time-bucket rows for each car tag and center
val sql2 =
s"""
|select
| containerNo
| ,orgCode
| ,from_unixtime(unix_timestamp(cast(concat(first_hour,':00:00') as timestamp))+3600*hour_num,'yyyy-MM-dd HH') as hour_per
| ,SUBSTRING(from_unixtime(unix_timestamp(cast(concat(first_hour,':00:00') as timestamp))+3600*(hour_num+1),'yyyy-MM-dd HH'),12,2) as end_hour
| ,hour_num
|from (
| select
| containerNo
| ,orgCode
| ,first_hour
| ,cast(hour_num as bigint) as hour_num
| from (SELECT explode(split('0,1,2,3,4,5,6,7,8,9,10,11',',')) AS hour_num) tab1
| cross join (select containerNo,orgCode,first_hour from temp_middle_result_20220815 group by containerNo,orgCode,first_hour)
|) tab
|""".stripMargin
spark.sql(sql2).createOrReplaceTempView("temp_middle_result2_20220815")
println("sql2 is : " + sql2)
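For reference, the bucket arithmetic in sql2 (each `hour_per` is `first_hour` plus `hour_num` hours, and `end_hour` is the two-digit hour of the next bucket) can be reproduced in plain Scala. This is an illustration only; the function name is made up and is not part of the job:

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

// Illustration of the sql2 bucket arithmetic. `firstHour` arrives in
// "yyyy-MM-dd HH" form; bucket n starts at firstHour + n hours, and
// end_hour is the two-digit hour of bucket n + 1.
val bucketFmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH")
val parseFmt  = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm")

def hourBucket(firstHour: String, hourNum: Long): (String, String) = {
  val start   = LocalDateTime.parse(firstHour + ":00", parseFmt)
  val hourPer = start.plusHours(hourNum).format(bucketFmt)
  val endHour = f"${start.plusHours(hourNum + 1).getHour}%02d"
  (hourPer, endHour)
}

// hourBucket("2022-09-22 08", 3) yields ("2022-09-22 11", "12")
```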
/*val dataframe2 = spark.sql("select * from temp_middle_result2_20220815").limit(10)
dataframe2.collect().map(ele=>{
logger.info("row is:"+ele.getString(0)+"|"+ele.getString(1)+"|"+ele.getString(2)+"|"+ele.getString(3)+"|"+ele.getLong(4))
})*/
val sql3 =
"""
|--- time buckets 0~7
|select
| ta.containerNo
| ,ta.orgCode
| ,concat(ta.hour_per,':00-',ta.end_hour,':00') as hour_per
| ,tb.tickects
| ,tb.weight
| ,ta.hour_num+1 as order_num
| ,tb.predictDepartTime
|from temp_middle_result2_20220815 ta left join temp_middle_result_20220815 tb
|on ta.containerNo=tb.containerNo
|and ta.orgCode=tb.orgCode
|and ta.hour_per=tb.hour_
|union all
|--- time buckets > 13
|select ta.containerNo
| ,ta.orgCode
| ,'其他' as hour_per
| ,sum(ta.tickects) as tickects
| ,sum(ta.weight) as weight
| ,'13' as order_num
| ,max(ta.predictDepartTime) as predictDepartTime
|from temp_middle_result_20220815 ta left join temp_middle_result2_20220815 tb
|on ta.containerNo=tb.containerNo
|and ta.orgCode=tb.orgCode
|and ta.hour_=tb.hour_per
|where tb.hour_per is null
|group by ta.containerNo
| ,ta.orgCode
|""".stripMargin
spark.sql(sql3).createOrReplaceTempView("temp_middle_result3_20220815")
println("sql3 is : " + sql3)
// spark.sql("select * from temp_middle_result3_20220815").show(1)
// val dataFrame = spark.sql("select count(1) from temp_middle_result3_20220815")
// logger.error("sgp say datacount is : " + d )
val sql4 =
s"""
|select
| '${batchTime}' as rpt_date
| ,ta.orgCode as center_code
| ,tc.org_name as center_name
| ,ta.containerNo as car_mark
| ,ta.hour_per as period
| ,ta.tickects as ticket_num
| ,ta.order_num as order_numb
| ,tb.car_type as car_type
| ,ta.predictDepartTime as plan_depart_time
| ,from_unixtime(unix_timestamp()) as create_time
| ,td.late_status
| ,ta.weight as weight
|from temp_middle_result3_20220815 ta left join
|(
|select car_no,status,car_type,ROW_NUMBER() over(partition by car_no order by create_time desc) as sn from tidb_ytrpt.upcar_monitor_carno
|) tb
|on ta.containerNo=tb.car_no and tb.sn=1
|left join
|(
| select org_code,max(org_name) as org_name from (
| select org_code,org_name,row_number() over(partition by org_code order by dt desc) as sn
| from dim.t03_user_org_mdm_org
| where dt >= date_format(date_add(current_date,-2),'yyyyMMdd')
| ) tab
| group by org_code
|) tc
|on ta.orgCode=tc.org_code
|left join (
| select car_mark,max(late_status) as late_status from temp_now_car_details group by car_mark
|) td on ta.containerNo=td.car_mark
|""".stripMargin
println("sql4 is :" + sql4)
// Write to the real-time table
spark.sql(sql4).write.mode(saveMode = "append").format("jdbc")
.option("driver", "com.mysql.jdbc.Driver")
// production
.option("url", "jdbc:mysql://ip:3390/ytrpt?rewriteBatchedStatements=true&useUnicode=true&characterEncoding=UTF-8")
// test
//.option("url", "jdbc:mysql://tidb:4000/ytrpt?rewriteBatchedStatements=true&useUnicode=true&characterEncoding=UTF-8")
// .option("url", "jdbc:mysql://ip:4000/test?rewriteBatchedStatements=true&useUnicode=true&characterEncoding=UTF-8")
.option("useSSL", false)
.option(JDBCOptions.JDBC_BATCH_INSERT_SIZE, 10000)
.option("dbtable", "upcar_monitor_car_details_period") // database name and table name here
.option("isolationLevel", "NONE") // set isolationLevel to NONE
.option("user", "root") // TiDB user here
.save()
}
}
Your code contains sensitive information; remember to remove it. Your IPs and the like are all in there...
Judging from the stack trace, only this part looks like your own code; everything else comes from dependency packages.
OK, it's been removed. Thanks for the reminder.
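Since the "Table or view not found" error is intermittent, one thing worth ruling out is that some job launches build the SparkSession without the TiSpark extension registered, in which case the `tidb_*` catalog is simply invisible for that run. A minimal sketch follows, assuming TiSpark 2.x on Spark 2.4; the app name and PD address are placeholders, and the catalog check is a suggestion that assumes the composite catalog exposes the TiDB databases:

```scala
import org.apache.spark.sql.SparkSession

// Ensure every launch registers the TiSpark extension and PD endpoints;
// if either is missing, TiDB tables resolve only on runs where spark-defaults
// happens to supply them, which shows up as an intermittent failure.
val spark = SparkSession.builder()
  .appName("upcar-monitor") // placeholder
  .config("spark.sql.extensions", "org.apache.spark.sql.TiExtensions")
  .config("spark.tispark.pd.addresses", "pd-host:2379") // placeholder PD endpoint
  .getOrCreate()

// Fail fast with a clearer message than an AnalysisException buried deep
// in a long batch job.
require(spark.catalog.databaseExists("tidb_rpt"),
  "TiSpark catalog not visible: database tidb_rpt not found")
```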