Structured streaming使用sql关联数据库中表,数据库中表更新不显示

structured streaming使用sql关联数据库中表,当数据库中表更新时,sql查询的结果不发生改变。

 binlog.createOrReplaceGlobalTempView("binlog_test");
        Dataset<Row> d = sqlContext.sql("select * from global_temp.binlog_test a left join test.dm_test b on a.id = b.id");

global_temp.binlog_test 为structured streaming获取的实时数据创建的临时表,test.dm_test为数据库中表
当test.dm_test数据改变,该sql结果不该变

请描述一下 Structured streaming 在使用 TiDB 过程如何配置的,其他操作是否正常 ?

通过kafka接入流式处理 源码如下:

package com.hydee;

import com.hydee.entity.BinglogTest;
import com.hydee.proto.BinlogOuterClass;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;


import java.sql.*;

/**
 * test spark!
 */
public class App {


    public static void main(String[] args) {
        final Connection conn = getConnect();

        SparkConf conf = new SparkConf().setAppName("StructuredStream")
                .setMaster("spark://192.168.10.201:7077")
                .set("spark.driver.cores","1")
                .set("spark.driver.memory","1G")
                .set("spark.deploy.defaultCores","3")
                .set("spark.cores.max","3")
                .set("spark.executor.cores","1")
                .set("spark.executor.memory","2G");


        SparkSession spark = SparkSession
                .builder()
                .config(conf)
                .getOrCreate();
        spark.sparkContext().setLogLevel("WARN");
        SQLContext sqlContext = spark.sqlContext();

        Dataset<Row> df = spark.readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "192.168.10.201:9092,192.168.10.202:9092,192.168.10.203:9092,192.168.10.204:9092")
                .option("subscribe","kafka_obinlog")
                .option("startingOffsets","latest")
                .load();
        Dataset<Row> data = df.select(functions.col("value"),functions.col("timestamp"));

        Encoder<BinglogTest> rowEncoder = Encoders.bean(BinglogTest.class);
        Dataset<Row> binlog = data.map(line->{
            BinlogOuterClass.Binlog binlogText = BinlogOuterClass.Binlog.parseFrom((byte[])line.get(0));
            BinlogOuterClass.Table table = binlogText.getDmlData().getTables(0);
            BinlogOuterClass.Row row = table.getMutations(0).getRow();
            return new BinglogTest((int)row.getColumns(0).getInt64Value(),row.getColumns(1).getStringValue());
        },rowEncoder).toDF();

        binlog.createOrReplaceGlobalTempView("binlog_test");
        Dataset<Row> d = sqlContext.sql("select * from global_temp.binlog_test a left join hydee.dm_test b on a.id = b.id");
        sqlContext.tables().show();
        StreamingQuery streamingQuery = d.writeStream()
                .outputMode("append")
                .format("console")
                .option("truncate", "false")
                .start();

        try {
            streamingQuery.awaitTermination();
            streamingQuery2.awaitTermination();
        } catch (StreamingQueryException e) {
            e.printStackTrace();
        }finally {
            System.out.println("streamingQuery stop");
            streamingQuery.stop();
            streamingQuery2.stop();
            spark.close();
        }
    }
}

@zhengyunpeng

使用 TiSpark 是需要配置 pd addrs 和 spark.sql.extensions 的。 具体可以参考官网文档。

配置了,tispark能正常的使用的,能查到数据,但是数据库里数据变更sql的结果不变

structured streaming的执行计划:

WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter@7dfa0785
12-09-2019 15:41:46 CST StructuredStream INFO - +- *(1) BroadcastHashJoin [cast(id#28 as bigint)], [id#33L], LeftOuter, BuildRight
12-09-2019 15:41:46 CST StructuredStream INFO -    :- *(1) SerializeFromObject [assertnotnull(input[0, com.hydee.entity.BinglogTest, true]).getId AS id#28, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, com.hydee.entity.BinglogTest, true]).getName, true, false) AS name#29]
12-09-2019 15:41:46 CST StructuredStream INFO -    :  +- *(1) MapElements com.hydee.App$$Lambda$39/1684572536@46a97805, obj#27: com.hydee.entity.BinglogTest
12-09-2019 15:41:46 CST StructuredStream INFO -    :     +- *(1) Filter com.hydee.App$$Lambda$38/2017890623@667dd150.call
12-09-2019 15:41:46 CST StructuredStream INFO -    :        +- *(1) DeserializeToObject createexternalrow(value#328, staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, ObjectType(class java.sql.Timestamp), toJavaTimestamp, timestamp#335, true, false), StructField(value,BinaryType,true), StructField(timestamp,TimestampType,true)), obj#26: org.apache.spark.sql.Row
12-09-2019 15:41:46 CST StructuredStream INFO -    :           +- *(1) Project [value#328, timestamp#335]
12-09-2019 15:41:46 CST StructuredStream INFO -    :              +- Scan ExistingRDD[key#326,value#328,topic#330,partition#332,offset#333L,timestamp#335,timestampType#337]
12-09-2019 15:41:46 CST StructuredStream INFO -    +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]))
12-09-2019 15:41:46 CST StructuredStream INFO -       +- TiSpark CoprocessorRDD{[table: dm_test] TableScan, Columns: [id], [name], KeyRange: ([t200\000\000\000\000\000\003333_r\000\000\000\000\000\000\000\000], [t200\000\000\000\000\000\003333_s\000\000\000\000\000\000\000\000]), startTs: 411113641089695745}

每次流数据关联的时候startTs: 411113641089695745都是一样的,是不是tidb的快照策略导致每次查询的都是startTs: 411113641089695745时候的数据?

是的, 如果 ts 不变,那么查到的数据也是一样的。

这种情况怎么解决?我目前是想要关联到tidb的表数据是最新的,这是tispark里增加的吗?structured streaming没有这个问题的,他对接hdfs的时候查的数据是最新的

每次使用新的 DataFrame 应当可以解决这一问题。

目前我们公司规划是使用sql来处理所有逻辑,易于开发和管理,所以暂时不考虑使用新的 DataFrame。如果在流式处理里使用新的 DataFrame会增加开发、系统复杂度。 而且如果简单的关联查询修改为创建新的 DataFrame再进行关联查询对性能也有一定影响吧?(猜测)
这个问题是tispark里进行处理时使用了ts查快照数据导致的吗,能如何不使用快照时间戳查询?

请关注这个issue https://github.com/pingcap/tispark/issues/1102

后续修复了这个问题也会更新到这个帖子

好的,谢谢

@zhengyunpeng 请问您是哪个公司的?

@zhengyunpeng 请参考这个PR https://github.com/pingcap/tispark/pull/1104

我编译了一个jar包,能否请您帮我测试一下,感谢

链接: https://pan.baidu.com/s/1OTu_Kr9PBOHt_HzqUqSjmA 提取码: 9w3i 复制这段内容后打开百度网盘手机App,操作更方便哦

1 个赞

好的,我测试一下

java.io.InvalidClassException: org.apache.spark.sql.tispark.TiRDD; local class incompatible: stream classdesc serialVersionUID = -6650751149050193034, local class serialVersionUID = 1833683067262546845

这是版本问题导致的吗?执行stuctured streaming任务时报错 我们用的ansible-playbook安装的tidb2.1,自动安装的spark是2.11_2.3.2

可以先用local模式运行,在本地安装spark-2.3.3或者spark-2.4.3

因为最新tispark只支持spark-2.3和spark-2.4

spark下载链接 https://archive.apache.org/dist/spark/spark-2.4.3/spark-2.4.3-bin-hadoop2.7.tgz

1 个赞

嗯,好的

可以刷新数据了,谢谢,问题解决了