You can use this temporary jar for now; once the official release is published, switch over to that.
OK, thanks.
Test code:
// Imports required by the snippet below.
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.streaming.StreamingQuery;

binlog.createOrReplaceGlobalTempView("binlog_test");
Dataset<Row> d = sqlContext.sql(
        "select * from global_temp.binlog_test a left join hydee.dm_test b on a.id = b.id");
StreamingQuery streamingQuery = d.writeStream()
        .outputMode("append")
        .format("console")
        .option("truncate", "false")
        .start();
// Poll the running query every 2 seconds and print its physical plan,
// so we can watch whether the plan changes between micro-batches.
Thread listener = new Thread(() -> {
    while (true) {
        System.out.println("streamingQuery.explain");
        System.out.println(System.currentTimeMillis());
        streamingQuery.explain();
        System.out.println();
        System.out.println();
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
    }
});
listener.start();
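For context, `binlog` above is a streaming Dataset that the snippet never defines; the Kafka columns (key/value/topic/partition/offset/timestamp) and the MapElements to com.hydee.entity.BinglogTest in the plans below suggest it is read from Kafka and parsed into a bean. A minimal sketch of how such a source might be built, assuming a SparkSession `spark`; the broker address, topic name, and `parseBinlog` decoder are assumptions, not taken from the original post:

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;

// Hypothetical construction of the `binlog` Dataset; values marked "assumed"
// do not appear in the original post.
Dataset<BinglogTest> binlog = spark.readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092") // assumed broker
        .option("subscribe", "binlog_test")                  // assumed topic
        .load()
        .select("value", "timestamp") // matches the Project in the plans below
        .map((MapFunction<Row, BinglogTest>) row -> {
            // Decode the binlog payload into a BinglogTest bean; the real
            // decoding logic used in the original program is unknown.
            String payload = new String((byte[]) row.getAs("value"));
            return parseBinlog(payload); // hypothetical parser
        }, Encoders.bean(BinglogTest.class));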
Log of the query plan changing over time during the test:
streamingQuery.explain
1568684744781
== Physical Plan ==
WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter@a9b9211
+- *(1) BroadcastHashJoin [cast(id#28 as bigint)], [id#33L], LeftOuter, BuildRight
:- *(1) SerializeFromObject [assertnotnull(input[0, com.hydee.entity.BinglogTest, true]).getId AS id#28, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, com.hydee.entity.BinglogTest, true]).getName, true, false) AS name#29]
: +- *(1) MapElements com.hydee.App$$Lambda$41/1608757336@fb2c2f3, obj#27: com.hydee.entity.BinglogTest
: +- *(1) Filter com.hydee.App$$Lambda$40/675002551@148fca83.call
: +- *(1) DeserializeToObject createexternalrow(value#652, staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, ObjectType(class java.sql.Timestamp), toJavaTimestamp, timestamp#656, true, false), StructField(value,BinaryType,true), StructField(timestamp,TimestampType,true)), obj#26: org.apache.spark.sql.Row
: +- *(1) Project [value#652, timestamp#656]
: +- Scan ExistingRDD[key#651,value#652,topic#653,partition#654,offset#655L,timestamp#656,timestampType#657]
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]))
+- TiDB CoprocessorRDD{[table: dm_test] TableScan, Columns: [id], [name], KeyRange: ([t200\000\000\000\000\000\000-_r\000\000\000\000\000\000\000\000], [t200\000\000\000\000\000\000-_s\000\000\000\000\000\000\000\000]), startTs: 411221289384542210}
(The same plan, with the same startTs 411221289384542210, is printed again at 1568684746786, 1568684748790, 1568684750795, and 1568684752800; the four identical dumps are omitted here.)
-------------------------------------------
Batch: 12
-------------------------------------------
+---+----+----+----+
|id |name|id |name|
+---+----+----+----+
|5 |e |null|null|
+---+----+----+----+
streamingQuery.explain
1568684754805
== Physical Plan ==
WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter@40d43acc
+- *(1) BroadcastHashJoin [cast(id#28 as bigint)], [id#33L], LeftOuter, BuildRight
:- *(1) SerializeFromObject [assertnotnull(input[0, com.hydee.entity.BinglogTest, true]).getId AS id#28, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, com.hydee.entity.BinglogTest, true]).getName, true, false) AS name#29]
: +- *(1) MapElements com.hydee.App$$Lambda$41/1608757336@fb2c2f3, obj#27: com.hydee.entity.BinglogTest
: +- *(1) Filter com.hydee.App$$Lambda$40/675002551@148fca83.call
: +- *(1) DeserializeToObject createexternalrow(value#702, staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, ObjectType(class java.sql.Timestamp), toJavaTimestamp, timestamp#706, true, false), StructField(value,BinaryType,true), StructField(timestamp,TimestampType,true)), obj#26: org.apache.spark.sql.Row
: +- *(1) Project [value#702, timestamp#706]
: +- Scan ExistingRDD[key#701,value#702,topic#703,partition#704,offset#705L,timestamp#706,timestampType#707]
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]))
+- TiDB CoprocessorRDD{[table: dm_test] TableScan, Columns: [id], [name], KeyRange: ([t200\000\000\000\000\000\000-_r\000\000\000\000\000\000\000\000], [t200\000\000\000\000\000\000-_s\000\000\000\000\000\000\000\000]), startTs: 411221295885713410}
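Note how after Batch 12 the TiDB CoprocessorRDD's startTs advances from 411221289384542210 to 411221295885713410 (and a new MicroBatchWriter instance appears), which suggests the TiDB side of the join takes a fresh snapshot for each micro-batch rather than reusing the plan from when the query started.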