When Structured Streaming uses SQL to join a table in the database, the result of the SQL query does not change after the table in the database is updated.
binlog.createOrReplaceGlobalTempView("binlog_test");
Dataset<Row> d = sqlContext.sql("select * from global_temp.binlog_test a left join test.dm_test b on a.id = b.id");
global_temp.binlog_test is a temporary view created over the real-time data that Structured Streaming receives; test.dm_test is a table in the database.
When the data in test.dm_test changes, the result of this SQL does not change.
Could you describe how Structured Streaming is configured to work with TiDB, and whether other operations behave normally?
The stream is ingested through Kafka.
Source code below:
package com.hydee;

import com.hydee.entity.BinglogTest;
import com.hydee.proto.BinlogOuterClass;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;

/**
 * test spark!
 */
public class App {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("StructuredStream")
                .setMaster("spark://192.168.10.201:7077")
                .set("spark.driver.cores", "1")
                .set("spark.driver.memory", "1G")
                .set("spark.deploy.defaultCores", "3")
                .set("spark.cores.max", "3")
                .set("spark.executor.cores", "1")
                .set("spark.executor.memory", "2G");
        SparkSession spark = SparkSession
                .builder()
                .config(conf)
                .getOrCreate();
        spark.sparkContext().setLogLevel("WARN");
        SQLContext sqlContext = spark.sqlContext();

        // Read the binlog stream from Kafka.
        Dataset<Row> df = spark.readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "192.168.10.201:9092,192.168.10.202:9092,192.168.10.203:9092,192.168.10.204:9092")
                .option("subscribe", "kafka_obinlog")
                .option("startingOffsets", "latest")
                .load();
        Dataset<Row> data = df.select(functions.col("value"), functions.col("timestamp"));

        // Decode each protobuf-encoded binlog message into a BinglogTest bean.
        Encoder<BinglogTest> rowEncoder = Encoders.bean(BinglogTest.class);
        Dataset<Row> binlog = data.map((MapFunction<Row, BinglogTest>) line -> {
            BinlogOuterClass.Binlog binlogText = BinlogOuterClass.Binlog.parseFrom((byte[]) line.get(0));
            BinlogOuterClass.Table table = binlogText.getDmlData().getTables(0);
            BinlogOuterClass.Row row = table.getMutations(0).getRow();
            return new BinglogTest((int) row.getColumns(0).getInt64Value(), row.getColumns(1).getStringValue());
        }, rowEncoder).toDF();

        // Join the stream against the TiDB table.
        binlog.createOrReplaceGlobalTempView("binlog_test");
        Dataset<Row> d = sqlContext.sql("select * from global_temp.binlog_test a left join hydee.dm_test b on a.id = b.id");
        sqlContext.tables().show();

        StreamingQuery streamingQuery = d.writeStream()
                .outputMode("append")
                .format("console")
                .option("truncate", "false")
                .start();
        try {
            streamingQuery.awaitTermination();
        } catch (StreamingQueryException e) {
            e.printStackTrace();
        } finally {
            System.out.println("streamingQuery stop");
            streamingQuery.stop();
            spark.close();
        }
    }
}
@zhengyunpeng
Using TiSpark requires configuring the PD addresses and spark.sql.extensions. See the official documentation for details.
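For reference, a minimal sketch of those two settings applied to the SparkConf in the code above (the PD address is a placeholder; check the TiSpark documentation for the exact keys supported by your version):

SparkConf conf = new SparkConf().setAppName("StructuredStream")
        // Enable TiSpark's catalog and planner extensions.
        .set("spark.sql.extensions", "org.apache.spark.sql.TiExtensions")
        // Placement Driver (PD) endpoints of the TiDB cluster (placeholder address).
        .set("spark.tispark.pd.addresses", "192.168.10.201:2379");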
They are configured; TiSpark works normally and can query data, but when the data in the database changes, the SQL result does not change.
The Structured Streaming execution plan:
WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter@7dfa0785
+- *(1) BroadcastHashJoin [cast(id#28 as bigint)], [id#33L], LeftOuter, BuildRight
   :- *(1) SerializeFromObject [assertnotnull(input[0, com.hydee.entity.BinglogTest, true]).getId AS id#28, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, com.hydee.entity.BinglogTest, true]).getName, true, false) AS name#29]
   :  +- *(1) MapElements com.hydee.App$$Lambda$39/1684572536@46a97805, obj#27: com.hydee.entity.BinglogTest
   :     +- *(1) Filter com.hydee.App$$Lambda$38/2017890623@667dd150.call
   :        +- *(1) DeserializeToObject createexternalrow(value#328, staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, ObjectType(class java.sql.Timestamp), toJavaTimestamp, timestamp#335, true, false), StructField(value,BinaryType,true), StructField(timestamp,TimestampType,true)), obj#26: org.apache.spark.sql.Row
   :           +- *(1) Project [value#328, timestamp#335]
   :              +- Scan ExistingRDD[key#326,value#328,topic#330,partition#332,offset#333L,timestamp#335,timestampType#337]
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]))
      +- TiSpark CoprocessorRDD{[table: dm_test] TableScan, Columns: [id], [name], KeyRange: ([t200\000\000\000\000\000\003333_r\000\000\000\000\000\000\000\000], [t200\000\000\000\000\000\003333_s\000\000\000\000\000\000\000\000]), startTs: 411113641089695745}
zhengyunpeng:
TiSpark CoprocessorRDD{[table: dm_test] TableScan, Columns: [id], [name], KeyRange: ([t200\000\000\000\000\000\003333_r\000\000\000\000\000\000\000\000], [t200\000\000\000\000\000\003333_s\000\000\000\000\000\000\000\000]), startTs: 411113641089695745}
Every time the streaming data is joined, startTs: 411113641089695745 is the same. Is TiDB's snapshot mechanism causing every query to read the data as of startTs: 411113641089695745?
Yes. If the ts does not change, the data read is also the same.
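(As a quick way to check this without digging through the job logs: once the query has processed at least one micro-batch, StreamingQuery.explain() prints the plan of the latest execution, including the TiSpark CoprocessorRDD line with its startTs, as in the plan above.)

// After at least one micro-batch has completed, print the physical plan of
// the most recent execution; the "TiSpark CoprocessorRDD{... startTs: ...}"
// line shows the snapshot timestamp the TiDB side was read at.
streamingQuery.explain();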
How can this be solved? I want the join to see the latest data in the TiDB table. Is this behavior something TiSpark adds? Structured Streaming does not have this problem by itself: when it reads from HDFS, it always queries the latest data.
Creating a new DataFrame each time should solve this problem.
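For illustration, a minimal sketch of that approach using foreachBatch (available since Spark 2.4). Because the batch-side SQL is planned anew inside every micro-batch, TiSpark should acquire a fresh startTs each time. The view and table names are the ones from this thread; the rest is an assumption, not tested code:

// requires: import org.apache.spark.api.java.function.VoidFunction2;
StreamingQuery streamingQuery = binlog.writeStream()
        .foreachBatch((VoidFunction2<Dataset<Row>, Long>) (batch, batchId) -> {
            // Register this micro-batch and re-run the join here, so the
            // TiDB side (hydee.dm_test) is re-planned with a new snapshot ts.
            batch.createOrReplaceTempView("binlog_batch");
            batch.sparkSession()
                 .sql("select * from binlog_batch a left join hydee.dm_test b on a.id = b.id")
                 .show(false);
        })
        .start();

Note that the SQL text itself is unchanged; only the point at which it is planned moves inside the batch.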
Our current plan at the company is to implement all logic in SQL, which is easier to develop and maintain, so for now we are not considering creating new DataFrames. Using new DataFrames inside the stream processing would add development and system complexity. And wouldn't rewriting a simple join as "create a new DataFrame, then join" also have some performance impact? (a guess)
Is this problem caused by TiSpark using a ts to read snapshot data during processing? Is there a way to query without the snapshot timestamp?
java.io.InvalidClassException: org.apache.spark.sql.tispark.TiRDD; local class incompatible: stream classdesc serialVersionUID = -6650751149050193034, local class serialVersionUID = 1833683067262546845
Is this caused by a version mismatch? The error occurs when running the Structured Streaming job.
We installed TiDB 2.1 with ansible-playbook; the Spark it installed automatically is 2.3.2 (Scala 2.11).
You can run in local mode first, installing spark-2.3.3 or spark-2.4.3 locally,
because the latest TiSpark only supports Spark 2.3 and Spark 2.4.
Spark download link:
https://archive.apache.org/dist/spark/spark-2.4.3/spark-2.4.3-bin-hadoop2.7.tgz
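(The InvalidClassException above is the standard Java serialization symptom of the driver and the executors loading different versions of the same class: the driver serializes TiRDD from one TiSpark jar and an executor deserializes it against another. One way to rule that out is to ship a single jar to both sides explicitly; a sketch, with the path and jar name as placeholders:)

SparkConf conf = new SparkConf().setAppName("StructuredStream")
        // Distribute one TiSpark jar to the driver and every executor so that
        // both sides deserialize TiRDD from the same class file.
        // (Path and jar name are placeholders; use the jar matching your cluster.)
        .set("spark.jars", "/opt/spark/jars/tispark-core.jar");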