Structured Streaming: joining a database table via SQL, but updates to the database table are not reflected

You can use this temporary jar for now, and switch to the official release once it is published.

OK, thanks.

Test code:

binlog.createOrReplaceGlobalTempView("binlog_test");
// Left-join the streaming view against the TiDB table hydee.dm_test.
Dataset<Row> d = sqlContext.sql(
        "select * from global_temp.binlog_test a left join hydee.dm_test b on a.id = b.id");
StreamingQuery streamingQuery = d.writeStream()
        .outputMode("append")
        .format("console")
        .option("truncate", "false")
        .start();

// Poll the physical plan every two seconds to see whether (and when)
// the TiDB snapshot timestamp (startTs) in the plan changes.
Thread listener = new Thread(() -> {
    while (true) {
        System.out.println("streamingQuery.explain");
        System.out.println(System.currentTimeMillis());
        streamingQuery.explain();
        System.out.println();
        System.out.println();
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
});
listener.start(); // missing in the original snippet, but the log below shows the loop running
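
For context, the explain output below scans the standard Kafka source columns (key, value, topic, partition, offset, timestamp, timestampType) and maps records to com.hydee.entity.BinglogTest, so the binlog stream was presumably built roughly like the following sketch. The broker address, topic name, and parse() helper are hypothetical placeholders, not taken from the original post:

import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;

// Read the binlog topic as a streaming Dataset (Kafka source).
Dataset<Row> kafka = sqlContext.sparkSession().readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "broker:9092") // hypothetical address
        .option("subscribe", "binlog_test_topic")         // hypothetical topic
        .load()
        .select("value", "timestamp"); // the two columns the plan projects

// Drop records that cannot be parsed, then map each Kafka value to the
// entity, matching the Filter and MapElements nodes in the plan below.
Dataset<BinglogTest> binlog = kafka
        .filter((FilterFunction<Row>) row -> row.get(0) != null)
        .map((MapFunction<Row, BinglogTest>) row -> parse(row), // parse() is a hypothetical deserializer
             Encoders.bean(BinglogTest.class));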

Test log tracking plan changes over time (watch the startTs value in the TiDB CoprocessorRDD line):

streamingQuery.explain
1568684744781
== Physical Plan ==
WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter@a9b9211
+- *(1) BroadcastHashJoin [cast(id#28 as bigint)], [id#33L], LeftOuter, BuildRight
   :- *(1) SerializeFromObject [assertnotnull(input[0, com.hydee.entity.BinglogTest, true]).getId AS id#28, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, com.hydee.entity.BinglogTest, true]).getName, true, false) AS name#29]
   :  +- *(1) MapElements com.hydee.App$$Lambda$41/1608757336@fb2c2f3, obj#27: com.hydee.entity.BinglogTest
   :     +- *(1) Filter com.hydee.App$$Lambda$40/675002551@148fca83.call
   :        +- *(1) DeserializeToObject createexternalrow(value#652, staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, ObjectType(class java.sql.Timestamp), toJavaTimestamp, timestamp#656, true, false), StructField(value,BinaryType,true), StructField(timestamp,TimestampType,true)), obj#26: org.apache.spark.sql.Row
   :           +- *(1) Project [value#652, timestamp#656]
   :              +- Scan ExistingRDD[key#651,value#652,topic#653,partition#654,offset#655L,timestamp#656,timestampType#657]
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]))
      +- TiDB CoprocessorRDD{[table: dm_test] TableScan, Columns: [id], [name], KeyRange: ([t200\000\000\000\000\000\000-_r\000\000\000\000\000\000\000\000], [t200\000\000\000\000\000\000-_s\000\000\000\000\000\000\000\000]), startTs: 411221289384542210}


streamingQuery.explain at 1568684746786, 1568684748790, 1568684750795, and 1568684752800 prints the identical physical plan, still with startTs: 411221289384542210.

-------------------------------------------
Batch: 12
-------------------------------------------
+---+----+----+----+
|id |name|id  |name|
+---+----+----+----+
|5  |e   |null|null|
+---+----+----+----+

streamingQuery.explain
1568684754805
== Physical Plan ==
WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter@40d43acc
+- *(1) BroadcastHashJoin [cast(id#28 as bigint)], [id#33L], LeftOuter, BuildRight
   :- *(1) SerializeFromObject [assertnotnull(input[0, com.hydee.entity.BinglogTest, true]).getId AS id#28, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, com.hydee.entity.BinglogTest, true]).getName, true, false) AS name#29]
   :  +- *(1) MapElements com.hydee.App$$Lambda$41/1608757336@fb2c2f3, obj#27: com.hydee.entity.BinglogTest
   :     +- *(1) Filter com.hydee.App$$Lambda$40/675002551@148fca83.call
   :        +- *(1) DeserializeToObject createexternalrow(value#702, staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, ObjectType(class java.sql.Timestamp), toJavaTimestamp, timestamp#706, true, false), StructField(value,BinaryType,true), StructField(timestamp,TimestampType,true)), obj#26: org.apache.spark.sql.Row
   :           +- *(1) Project [value#702, timestamp#706]
   :              +- Scan ExistingRDD[key#701,value#702,topic#703,partition#704,offset#705L,timestamp#706,timestampType#707]
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]))
      +- TiDB CoprocessorRDD{[table: dm_test] TableScan, Columns: [id], [name], KeyRange: ([t200\000\000\000\000\000\000-_r\000\000\000\000\000\000\000\000], [t200\000\000\000\000\000\000-_s\000\000\000\000\000\000\000\000]), startTs: 411221295885713410}
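
Note how the startTs in the CoprocessorRDD line has advanced from 411221289384542210 to 411221295885713410 once Batch 12 ran: the TiDB snapshot is only re-resolved when a new micro-batch is planned. As an aside, instead of polling explain() from a timer thread, per-batch progress can also be observed with Spark's StreamingQueryListener; a minimal sketch, not part of the original test:

import org.apache.spark.sql.streaming.StreamingQueryListener;

sqlContext.sparkSession().streams().addListener(new StreamingQueryListener() {
    @Override
    public void onQueryStarted(QueryStartedEvent event) { }

    @Override
    public void onQueryProgress(QueryProgressEvent event) {
        // One event fires per completed micro-batch; batchId increments
        // exactly when a new plan (and hence a new startTs) is built.
        System.out.println(event.progress().batchId());
        System.out.println(event.progress().prettyJson());
    }

    @Override
    public void onQueryTerminated(QueryTerminatedEvent event) { }
});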