Hi experts! I'm using Spark SQL to write 170 million rows from a Hive table into a TiDB table. Can the write speed be optimized?

Spark code:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions

object Xunzhan2ToTidb {

  def main(args: Array[String]) {

    val spark = SparkSession.builder
      .appName("Xunzhan2ToTidb")
      //.master("local[*]")
      .config("spark.driver.allowMultipleContexts", true)
      .config("hive.exec.dynamic.partition.mode", "nonstrict")
      .enableHiveSupport()
      .getOrCreate()

    //spark.sparkContext.setLogLevel("ERROR")

    val part = args(0)

    val customer = spark.sql(
      s"""
         |select
         |      master_user_id as user_id   -- user ID
         |      ,story_id                   -- story ID
         |      ,100 as ks_play_end_time    -- play end time
         |      ,100 as ks_media_file_play_start_time -- media file play start time
         |      ,1   as ks_media_finish_state -- play finish state
         |      ,listen_times  as ks_media_duration_info -- play duration
         |      ,100 as ks_play_start_time -- play start time
         |      ,100 as ks_media_file_play_end_time -- media file play end time
         |      ,duration  as ks_media_file_duration -- total file duration
         |      ,100 as ks_media_current_progress -- play progress percentage
         |      ,from_unixtime(unix_timestamp(), 'yyyy-MM-dd HH:mm:ss') as create_time -- creation time
         |      ,from_unixtime(unix_timestamp(), 'yyyy-MM-dd HH:mm:ss') as update_time -- update time
         |      ,1   as media_type -- media type
         |      ,unix_timestamp() as local_time_ms -- local time
         |      ,unix_timestamp() as server_time -- server time
         |from tmp.tmp_xunzhan_history_play_2year_active_sum_a where user_part = '${part}'
         |
         |""".stripMargin)
    println("分区"+part+",要写入的数据量"+customer.count())
    // repartition the source so writes are balanced across nodes and run with higher concurrency
    val df1 = customer.repartition(32)

    df1.write
      .mode(saveMode = "append")
      .format("jdbc")
      .option("driver", "com.mysql.jdbc.Driver")
      // replace host and port with your own, and keep rewriteBatchedStatements=true for batched inserts
      .option("url", "jdbc:mysql://:4000/ks_mission_data?rewriteBatchedStatements=true")
      .option("useSSL", "false")
      // batch insert size (the original sample comment suggests 150 as a tested value; 1000 is used here)
      .option(JDBCOptions.JDBC_BATCH_INSERT_SIZE, 1000)
      .option("dbtable", "") // database name and table name here
      .option("isolationLevel", "NONE") //不开启事务 recommended to set isolationLevel to NONE if you have a large DF to load.
      .option("user", "") // TiDB user here
      .option("password", "")
      .save()
    println("写入成功")


    spark.stop()
  }
}

Parameters:
--master yarn --deploy-mode client --queue hive2 --driver-memory 2g --executor-memory 30g --num-executors 20 --executor-cores 20
--class com.kaishu.warehouse.service.Xunzhan2ToTidb ossref://bigdata-mapreduce/res/spark/data_market_test-3c22fb43fb8.jar 8

At best the write finishes in about 45 minutes; slower runs take a bit over an hour.
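
As a rough throughput check (assuming 170 million rows and the 45-minute best case): 170,000,000 rows / (45 × 60 s) ≈ 63,000 rows per second.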

YARN UI: (screenshots not reproduced here)

What versions?

Which Spark version, which TiSpark version, and how many TiKV servers with what specs?

Post those details so everyone can take a look. 45 minutes for 200 million rows is already quite fast.


You can watch TiDB's load and system resources while experimenting with larger concurrency settings and a bigger batch size per insert, then share the results as a reference for everyone.
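
A minimal sketch of those knobs, assuming the same imports and the `customer` DataFrame from the code above. The writeParallelism and batchSize values and the <host>/<db.table>/<user>/<password> placeholders are illustrative assumptions to fill in and tune against your own TiDB load, not tested recommendations.

    // Hypothetical tuning values: increase gradually while watching TiDB/TiKV load.
    val writeParallelism = 128  // number of partitions = number of concurrent JDBC connections
    val batchSize = 2000        // rows per batched INSERT

    customer
      .repartition(writeParallelism)
      .write
      .mode("append")
      .format("jdbc")
      .option("driver", "com.mysql.jdbc.Driver")
      .option("url", "jdbc:mysql://<host>:4000/ks_mission_data?rewriteBatchedStatements=true")
      .option("useSSL", "false")
      .option(JDBCOptions.JDBC_BATCH_INSERT_SIZE, batchSize)
      .option("dbtable", "<db.table>")
      .option("isolationLevel", "NONE") // no transaction wrapping for the large load
      .option("user", "<user>")
      .option("password", "<password>")
      .save()

Raising the partition count only helps while TiDB and TiKV still have headroom, so re-measure after each change rather than jumping straight to large values.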
