目錄
- ??1 設(shè)備監(jiān)控?cái)?shù)據(jù)??
- ??1.1 創(chuàng)建 Topic??
- ??1.2 模擬數(shù)據(jù)??
- ??2 基于DataFrame分析??
- ??3 基于SQL分析??
- ??4 時(shí)間概念??
- ??5 event-time 窗口分析??
- ??6 event-time 窗口生成??
- ??7 延遲數(shù)據(jù)處理??
- ??7.1 延遲數(shù)據(jù)??
- ??7.2 Watermarking 水位??
- ??7.3 官方案例演示??
1 設(shè)備監(jiān)控?cái)?shù)據(jù)
在物聯(lián)網(wǎng)時(shí)代,大量的感知器每天都在收集并產(chǎn)生著涉及各個(gè)領(lǐng)域的數(shù)據(jù)。物聯(lián)網(wǎng)提供源源不斷的數(shù)據(jù)流,使實(shí)時(shí)數(shù)據(jù)分析成為分析數(shù)據(jù)的理想工具。
模擬一個(gè)智能物聯(lián)網(wǎng)系統(tǒng)的數(shù)據(jù)統(tǒng)計(jì)分析,產(chǎn)生設(shè)備數(shù)據(jù)發(fā)送到Kafka,結(jié)構(gòu)化流Structured
Streaming實(shí)時(shí)消費(fèi)統(tǒng)計(jì)。對(duì)物聯(lián)網(wǎng)設(shè)備狀態(tài)信號(hào)數(shù)據(jù),實(shí)時(shí)統(tǒng)計(jì)分析:
- 1)、信號(hào)強(qiáng)度大于30的設(shè)備;
- 2)、各種設(shè)備類型的數(shù)量;
- 3)、各種設(shè)備類型的平均信號(hào)強(qiáng)度;
編寫程序模擬生成物聯(lián)網(wǎng)設(shè)備監(jiān)控?cái)?shù)據(jù),發(fā)送到Kafka Topic中,此處為了演示字段較少,實(shí)際
生產(chǎn)項(xiàng)目中字段很多。
1.1 創(chuàng)建 Topic
啟動(dòng)Kafka Broker服務(wù),創(chuàng)建Topic【search-log-topic】,命令如下所示:
# 啟動(dòng)Zookeeper /export/server/zookeeper/bin/zkServer.sh start # 啟動(dòng)Kafka Broker /export/server/kafka/bin/kafka-server-start.sh -daemon /export/server/kafka/config/server.properties rm -rf /export/server/kafka/logs/* # 創(chuàng)建topic /export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1.oldlu.cn:2181/kafka200 --replication-fa ctor 1 --partitions 3 --topic iotTopic # 模擬生產(chǎn)者 /export/server/kafka/bin/kafka-console-producer.sh --broker-list node1.oldlu.cn:9092 --topic iotTopic # 模擬消費(fèi)者 /export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1.oldlu.cn:9092 --topic iotTopic --from-beginning # 刪除topic /export/server/kafka/bin/kafka-topics.sh --delete --zookeeper node1.oldlu.cn:2181/kafka200 --topic iotTopic
1.2 模擬數(shù)據(jù)
模擬設(shè)備監(jiān)控日志數(shù)據(jù),字段信息封裝到CaseClass樣例類【DeviceData】類,代碼如下
package cn.oldlu.spark.iot * 物聯(lián)網(wǎng)設(shè)備發(fā)送狀態(tài)數(shù)據(jù) * @param device 設(shè)備標(biāo)識(shí)符ID * @param deviceType 設(shè)備類型,如服務(wù)器mysql, redis, kafka或路由器route * @param signal 設(shè)備信號(hào) * @param time 發(fā)送數(shù)據(jù)時(shí)間 */ case class DeviceData( device: String, // deviceType: String, // signal: Double, // time: Long // )
模擬產(chǎn)生日志數(shù)據(jù)類【MockIotDatas】具體代碼如下:
import java.util.Properties import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} import org.apache.kafka.common.serialization.StringSerializer import org.json4s.jackson.Json import scala.util.Random object MockIotDatas { def main(args: Array[String]): Unit = { // 發(fā)送Kafka Topic val props = new Properties() props.put("bootstrap.servers", "node1.oldlu.cn:9092") props.put("acks", "1") props.put("retries", "3") props.put("key.serializer", classOf[StringSerializer].getName) props.put("value.serializer", classOf[StringSerializer].getName) val producer = new KafkaProducer[String, String](props) val deviceTypes = Array( "db", "bigdata", "kafka", "route", "bigdata", "db", "bigdata", "bigdata", "bigdata" ) val random: Random = new Random() while (true) { val index: Int = random.nextInt(deviceTypes.length) val deviceId: String = s"device_${(index + 1) * 10 + random.nextInt(index + 1)}" val deviceType: String = deviceTypes(index) val deviceSignal: Int = 10 + random.nextInt(90) // 模擬構(gòu)造設(shè)備數(shù)據(jù) val deviceData = DeviceData(deviceId, deviceType, deviceSignal, System.currentTimeMillis()) // 轉(zhuǎn)換為JSON字符串 val deviceJson: String = new Json(org.json4s.DefaultFormats).write(deviceData) println(deviceJson) Thread.sleep(100 + random.nextInt(500)) val record = new ProducerRecord[String, String]("iotTopic", deviceJson) producer.send(record) } // 關(guān)閉連接 producer.close() } }
相當(dāng)于大機(jī)房中各個(gè)服務(wù)器定時(shí)發(fā)送相關(guān)監(jiān)控?cái)?shù)據(jù)至Kafka中,服務(wù)器部署服務(wù)有數(shù)據(jù)庫db、大
數(shù)據(jù)集群bigdata、消息隊(duì)列kafka及路由器route等等,數(shù)據(jù)樣本:
{"device":"device_50","deviceType":"bigdata","signal":91.0,"time":1590660338429} {"device":"device_20","deviceType":"bigdata","signal":17.0,"time":1590660338790} {"device":"device_32","deviceType":"kafka","signal":93.0,"time":1590660338908} {"device":"device_82","deviceType":"bigdata","signal":72.0,"time":1590660339380} {"device":"device_32","deviceType":"kafka","signal":10.0,"time":1590660339972} {"device":"device_96","deviceType":"bigdata","signal":18.0,"time":1590660343554}
2 基于DataFrame分析
按照業(yè)務(wù)需求,從Kafka消費(fèi)日志數(shù)據(jù),基于DataFrame數(shù)據(jù)結(jié)構(gòu)調(diào)用函數(shù)分析,代碼如下:
package cn.oldlu.spark.iot import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery} import org.apache.spark.sql.types.{DoubleType, LongType} import org.apache.spark.sql.{DataFrame, SparkSession} /** * 對(duì)物聯(lián)網(wǎng)設(shè)備狀態(tài)信號(hào)數(shù)據(jù),實(shí)時(shí)統(tǒng)計(jì)分析: * 1)、信號(hào)強(qiáng)度大于30的設(shè)備 * 2)、各種設(shè)備類型的數(shù)量 * 3)、各種設(shè)備類型的平均信號(hào)強(qiáng)度 */ object IotStreamingOnline { def main(args: Array[String]): Unit = { // 1. 構(gòu)建SparkSession會(huì)話實(shí)例對(duì)象,設(shè)置屬性信息 val spark: SparkSession = SparkSession.builder() .appName(this.getClass.getSimpleName.stripSuffix("$")) .master("local[3]") .config("spark.sql.shuffle.partitions", "3") .getOrCreate() // 導(dǎo)入隱式轉(zhuǎn)換和函數(shù)庫 import org.apache.spark.sql.functions._ import spark.implicits._ // 2. 從Kafka讀取數(shù)據(jù),底層采用New Consumer API val iotStreamDF: DataFrame = spark.readStream .format("kafka") .option("kafka.bootstrap.servers", "node1.oldlu.cn:9092") .option("subscribe", "iotTopic") // 設(shè)置每批次消費(fèi)數(shù)據(jù)最大值 .option("maxOffsetsPerTrigger", "100000") .load() // 3. 對(duì)獲取數(shù)據(jù)進(jìn)行解析,封裝到DeviceData中 val etlStreamDF: DataFrame = iotStreamDF // 獲取value字段的值,轉(zhuǎn)換為String類型 .selectExpr("CAST(value AS STRING)") // 將數(shù)據(jù)轉(zhuǎn)換Dataset .as[String] // 內(nèi)部字段名為value // 過濾數(shù)據(jù) .filter(line => null != line && line.trim.length > 0) // 解析JSON數(shù)據(jù):{"device":"device_65","deviceType":"db","signal":12.0,"time":1589718910796} .select( get_json_object($"value", "$.device").as("device_id"), get_json_object($"value", "$.deviceType").as("device_type"), get_json_object($"value", "$.signal").cast(DoubleType).as("signal"), get_json_object($"value", "$.time").cast(LongType).as("time") ) // 4. 依據(jù)業(yè)務(wù),分析處理 // TODO: signal > 30 所有數(shù)據(jù),按照設(shè)備類型 分組,統(tǒng)計(jì)數(shù)量、平均信號(hào)強(qiáng)度 val resultStreamDF: DataFrame = etlStreamDF // 信號(hào)強(qiáng)度大于10 .filter($"signal" > 30) // 按照設(shè)備類型 分組 .groupBy($"device_type") // 統(tǒng)計(jì)數(shù)量、評(píng)價(jià)信號(hào)強(qiáng)度 .agg( count($"device_type").as("count_device"), round(avg($"signal"), 2).as("avg_signal") ) // 5. 啟動(dòng)流式應(yīng)用,結(jié)果輸出控制臺(tái) val query: StreamingQuery = resultStreamDF.writeStream .outputMode(OutputMode.Complete()) .format("console") .option("numRows", "10") .option("truncate", "false") .start() query.awaitTermination() query.stop() } }
其中使用函數(shù)get_json_object提取JSON字符串中字段值,將最終結(jié)果打印控制臺(tái)。
3 基于SQL分析
按照業(yè)務(wù)需求,從Kafka消費(fèi)日志數(shù)據(jù),提取字段信息,將DataFrame注冊(cè)為臨時(shí)視圖,編寫
SQL執(zhí)行分析,代碼如下:
package cn.oldlu.spark.iot import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery} import org.apache.spark.sql.types.{DoubleType, LongType} import org.apache.spark.sql.{DataFrame, SparkSession} /** * 對(duì)物聯(lián)網(wǎng)設(shè)備狀態(tài)信號(hào)數(shù)據(jù),實(shí)時(shí)統(tǒng)計(jì)分析,基于SQL編程 * 1)、信號(hào)強(qiáng)度大于30的設(shè)備 * 2)、各種設(shè)備類型的數(shù)量 * 3)、各種設(shè)備類型的平均信號(hào)強(qiáng)度 */ object IotStreamingOnlineSQL { def main(args: Array[String]): Unit = { // 1. 構(gòu)建SparkSession會(huì)話實(shí)例對(duì)象,設(shè)置屬性信息 val spark: SparkSession = SparkSession.builder() .appName(this.getClass.getSimpleName.stripSuffix("$")) .master("local[3]") .config("spark.sql.shuffle.partitions", "3") .getOrCreate() import org.apache.spark.sql.functions._ import spark.implicits._ // 2. 從Kafka讀取數(shù)據(jù),底層采用New Consumer API val iotStreamDF: DataFrame = spark.readStream .format("kafka") .option("kafka.bootstrap.servers", "node1.oldlu.cn:9092") .option("subscribe", "iotTopic") // 設(shè)置每批次消費(fèi)數(shù)據(jù)最大值 .option("maxOffsetsPerTrigger", "100000") .load() // 3. 對(duì)獲取數(shù)據(jù)進(jìn)行解析,封裝到DeviceData中 val etlStreamDF: DataFrame = iotStreamDF // 獲取value字段的值,轉(zhuǎn)換為String類型 .selectExpr("CAST(value AS STRING)") // 將數(shù)據(jù)轉(zhuǎn)換Dataset .as[String] // 內(nèi)部字段名為value // 過濾數(shù)據(jù) .filter(line => null != line && line.trim.length > 0) // 解析JSON數(shù)據(jù):{"device":"device_65","deviceType":"db","signal":12.0,"time":1589718910796} .select( get_json_object($"value", "$.device").as("device_id"), get_json_object($"value", "$.deviceType").as("device_type"), get_json_object($"value", "$.signal").cast(DoubleType).as("signal"), get_json_object($"value", "$.time").cast(LongType).as("time") ) // 4. 依據(jù)業(yè)務(wù),分析處理 // TODO: signal > 30 所有數(shù)據(jù),按照設(shè)備類型 分組,統(tǒng)計(jì)數(shù)量、平均信號(hào)強(qiáng)度 // 4.1 注冊(cè)DataFrame為臨時(shí)視圖 etlStreamDF.createOrReplaceTempView("view_tmp_stream_iots") // 4.2 編寫SQL執(zhí)行查詢 val resultStreamDF: DataFrame = spark.sql( """ |SELECT | device_type, COUNT(device_type) AS count_device, ROUND(AVG(signal), 2) AS avg_signal |FROM view_tmp_stream_iots |WHERE signal > 30 GROUP BY device_type |""".stripMargin) // 5. 啟動(dòng)流式應(yīng)用,結(jié)果輸出控制臺(tái) val query: StreamingQuery = resultStreamDF.writeStream .outputMode(OutputMode.Complete()) .foreachBatch { (batchDF: DataFrame, batchId: Long) => println("===========================================") println(s"BatchId = ${batchId}") println("===========================================") if (!batchDF.isEmpty) batchDF.coalesce(1).show(20, truncate = false) } .start() query.awaitTermination() query.stop() } }
運(yùn)行流式應(yīng)用,結(jié)果如下圖所示:
4 時(shí)間概念
在SparkStreaming中窗口統(tǒng)計(jì)分析:Window Operation(設(shè)置窗口大小WindowInterval和滑動(dòng)大小SlideInterval),按照Streaming 流式應(yīng)用接收數(shù)據(jù)的時(shí)間進(jìn)行窗口設(shè)計(jì)的,其實(shí)是不符合實(shí)際應(yīng)用場(chǎng)景的。
例如,在物聯(lián)網(wǎng)數(shù)據(jù)平臺(tái)中,每個(gè)設(shè)備產(chǎn)生的數(shù)據(jù),其中包含數(shù)據(jù)產(chǎn)生的時(shí)間,然而數(shù)據(jù)需要
經(jīng)過一系列采集傳輸才能被流式計(jì)算框架處理:SparkStreaming,此過程需要時(shí)間的,再按照處理
時(shí)間來統(tǒng)計(jì)業(yè)務(wù)的時(shí)候,準(zhǔn)確性降低,存在不合理性。
在結(jié)構(gòu)化流Structured Streaming中窗口數(shù)據(jù)統(tǒng)計(jì)時(shí)間是基于數(shù)據(jù)本身事件時(shí)間EventTime字
段統(tǒng)計(jì),更加合理性,官方文檔:
http://spark.apache.org/docs/2.4.5/structured-streaming-programming-guide.html#window-operations-on-event-time
在Streaming流式數(shù)據(jù)處理中,按照時(shí)間處理數(shù)據(jù),其中時(shí)間有三種概念:
- 1)、事件時(shí)間EventTime,表示數(shù)據(jù)本身產(chǎn)生的時(shí)間,該字段在數(shù)據(jù)本身中;
- 2)、注入時(shí)間IngestionTime,表示數(shù)據(jù)到達(dá)流式系統(tǒng)時(shí)間,簡而言之就是流式處理系統(tǒng)接收到
數(shù)據(jù)的時(shí)間; - 3)、處理時(shí)間ProcessingTime,表示數(shù)據(jù)被流式系統(tǒng)真正開始計(jì)算操作的時(shí)間。不同流式計(jì)算框架支持時(shí)間不一樣,SparkStreaming框架僅僅支持處理時(shí)間ProcessTime,StructuredStreaming支持事件時(shí)間和處理時(shí)間,F(xiàn)link框架支持三種時(shí)間數(shù)據(jù)操作,實(shí)際項(xiàng)目中往往針對(duì)【事件時(shí)間EventTime】進(jìn)行數(shù)據(jù)處理操作,更加合理化。
5 event-time 窗口分析
基于事件時(shí)間窗口聚合操作:基于窗口的聚合(例如每分鐘事件數(shù))只是事件時(shí)間列上特殊類型的分組和聚合,其中每個(gè)時(shí)間窗口都是一個(gè)組,并且每一行可以屬于多個(gè)窗口/組。事件時(shí)間EventTime是嵌入到數(shù)據(jù)本身中的時(shí)間,數(shù)據(jù)實(shí)際真實(shí)產(chǎn)生的時(shí)間。例如,如果希望獲得每分鐘由物聯(lián)網(wǎng)設(shè)備生成的事件數(shù),那么可能希望使用生成數(shù)據(jù)的時(shí)間(即數(shù)據(jù)中的事件時(shí)間event time),而不是Spark接收數(shù)據(jù)的時(shí)間(receive time/archive time)。
這個(gè)事件時(shí)間很自然地用這個(gè)模型表示,設(shè)備中的每個(gè)事件(Event)都是表中的一行(Row),而事件時(shí)間(Event Time)是行中的一列值(Column Value)。
因此,這種基于事件時(shí)間窗口的聚合查詢既可以在靜態(tài)數(shù)據(jù)集(例如,從收集的設(shè)備事件日志中)上定義,也可以在數(shù)據(jù)流上定義,從而使用戶的使用更加容易。修改詞頻統(tǒng)計(jì)程序,數(shù)據(jù)流包含每行數(shù)據(jù)以及生成每行行的時(shí)間。希望在10分鐘的窗口內(nèi)對(duì)單詞進(jìn)行計(jì)數(shù),每5分鐘更新一次,如下圖所示:
單詞在10分鐘窗口【12:00-12:10、12:05-12:15、12:10-12:20】等之間接收的單詞中計(jì)數(shù)。注意,
【12:00-12:10】表示處理數(shù)據(jù)的事件時(shí)間為12:00之后但12:10之前的數(shù)據(jù)。思考一下,12:07的一條數(shù)據(jù),應(yīng)該增加對(duì)應(yīng)于兩個(gè)窗口12:00-12:10和12:05-12:15的計(jì)數(shù)?;谑录r(shí)間窗口統(tǒng)計(jì)有兩個(gè)參數(shù)索引:分組鍵(如單詞)和窗口(事件時(shí)間字段)。
為了演示案例,將上述案例中的每5分鐘統(tǒng)計(jì)最近10分鐘窗口改為每5秒統(tǒng)計(jì)最近10秒窗口數(shù)
據(jù),測(cè)試數(shù)據(jù)集:
2019-10-12 09:00:02,cat dog 2019-10-12 09:00:03,dog dog 2019-10-12 09:00:07,owl cat 2019-10-12 09:00:11,dog 2019-10-12 09:00:13,owl
案例中三個(gè)時(shí)間范圍,說明如下:
1、觸發(fā)時(shí)間間隔,trigger interval:5秒 (案例:5分鐘) 2、事件時(shí)間窗口大小,window interval:10秒(案例:10分鐘) 3、滑動(dòng)大小,slider interval:5秒(案例:5分鐘)
官方案例演示代碼如下:
import java.sql.Timestamp import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, Trigger} import org.apache.spark.sql.{DataFrame, SparkSession} /** * 基于Structured Streaming 模塊讀取TCP Socket讀取數(shù)據(jù),進(jìn)行事件時(shí)間窗口統(tǒng)計(jì)詞頻WordCount,將結(jié)果打印到控制臺(tái) * TODO:每5秒鐘統(tǒng)計(jì)最近10秒內(nèi)的數(shù)據(jù)(詞頻:WordCount) * * EventTime即事件真正生成的時(shí)間: * 例如一個(gè)用戶在10:06點(diǎn)擊 了一個(gè)按鈕,記錄在系統(tǒng)中為10:06 * 這條數(shù)據(jù)發(fā)送到Kafka,又到了Spark Streaming中處理,已經(jīng)是10:08,這個(gè)處理的時(shí)間就是process Time。 * * 測(cè)試數(shù)據(jù): * 2019-10-12 09:00:02,cat dog * 2019-10-12 09:00:03,dog dog * 2019-10-12 09:00:07,owl cat * 2019-10-12 09:00:11,dog * 2019-10-12 09:00:13,owl */ object StructuredWindow { def main(args: Array[String]): Unit = { // 1. 構(gòu)建SparkSession實(shí)例對(duì)象,傳遞sparkConf參數(shù) val spark: SparkSession = SparkSession.builder() .appName(this.getClass.getSimpleName.stripSuffix("$")) .master("local[2]") .config("spark.sql.shuffle.partitions", "2") .getOrCreate() import org.apache.spark.sql.functions._ import spark.implicits._ // 2. 使用SparkSession從TCP Socket讀取流式數(shù)據(jù) val inputStreamDF: DataFrame = spark.readStream .format("socket") .option("host", "node1.oldlu.cn") .option("port", 9999) .load() // 3. 針對(duì)獲取流式DStream進(jìn)行詞頻統(tǒng)計(jì) val resultStreamDF = inputStreamDF // 將DataFrame轉(zhuǎn)換為Dataset操作,Dataset是類型安全,強(qiáng)類型 .as[String] .filter(line => null != line && line.trim.length > 0) // 將每行數(shù)據(jù)進(jìn)行分割單詞: 2019-10-12 09:00:02,cat dog .flatMap { line => val arr = line.trim.split(",") arr(1).split("\s+").map(word => (Timestamp.valueOf(arr(0)), word)) } // 設(shè)置列的名稱 .toDF("insert_timestamp", "word") // TODO:設(shè)置基于事件時(shí)間(event time)窗口 -> insert_timestamp, 每5秒統(tǒng)計(jì)最近10秒內(nèi)數(shù)據(jù) /* 1. 先按照窗口分組、2. 再對(duì)窗口中按照單詞分組、 3. 最后使用聚合函數(shù)聚合 */ .groupBy( window($"insert_timestamp", "10 seconds", "5 seconds"), $"word" ).count() .orderBy($"window") // 按照窗口字段降序排序 /* root |-- window: struct (nullable = true) | |-- start: timestamp (nullable = true) | |-- end: timestamp (nullable = true) |-- word: string (nullable = true) |-- count: long (nullable = false) */ //resultStreamDF.printSchema() // 4. 將計(jì)算的結(jié)果輸出,打印到控制臺(tái) val query: StreamingQuery = resultStreamDF.writeStream .outputMode(OutputMode.Complete()) .format("console") .option("numRows", "100") .option("truncate", "false") .trigger(Trigger.ProcessingTime("5 seconds")) .start() query.awaitTermination() query.stop() } }
運(yùn)行上述基于事件時(shí)間Event Time窗口統(tǒng)計(jì)流式應(yīng)用,演示效果圖如下所示:
6 event-time 窗口生成
Structured Streaming中如何依據(jù)EventTime事件時(shí)間生成窗口的呢?查看類TimeWindowing源碼中生成窗口規(guī)則:
org.apache.spark.sql.catalyst.analysis.TimeWindowing // 窗口個(gè)數(shù) /* 最大的窗口數(shù) = 向上取整(窗口長度/滑動(dòng)步長) */ maxNumOverlapping <- ceil(windowDuration / slideDuration) for (i <- 0 until maxNumOverlapping) /** timestamp是event-time 傳進(jìn)的時(shí)間戳 startTime是window窗口參數(shù),默認(rèn)是0 second 從時(shí)間的0s 含義:event-time從1970年...有多少個(gè)滑動(dòng)步長,如果說浮點(diǎn)數(shù)會(huì)向上取整 */ windowId <- ceil((timestamp - startTime) / slideDuration) /** windowId * slideDuration 向上取能整除滑動(dòng)步長的時(shí)間 (i - maxNumOverlapping) * slideDuration 每一個(gè)窗口開始時(shí)間相差一個(gè)步長 */ windowStart <- windowId * slideDuration + (i - maxNumOverlapping) * slideDuration + startTime windowEnd <- windowStart + windowDuration return windowStart, windowEnd
將【(event-time向上取 能整除 滑動(dòng)步長的時(shí)間) - (最大窗口數(shù)×滑動(dòng)步長)】作為"初始窗口"
的開始時(shí)間,然后按照窗口滑動(dòng)寬度逐漸向時(shí)間軸前方推進(jìn),直到某個(gè)窗口不再包含該event-time
為止,最終以"初始窗口"與"結(jié)束窗口"之間的若干個(gè)窗口作為最終生成的 event-time 的時(shí)間窗口。
每個(gè)窗口的起始時(shí)間start與結(jié)束時(shí)間end都是前閉后開(左閉右開)的區(qū)間,因此初始窗口和結(jié)束窗口都不會(huì)包含 event-time,最終不會(huì)被使用。假設(shè)數(shù)據(jù)為【2019-08-14 10:50:00, dog】,按照上述規(guī)則計(jì)算窗口示意圖如下:
得到窗口如下:
7 延遲數(shù)據(jù)處理
Structed Streaming與Spark Streaming相比一大特性就是支持基于數(shù)據(jù)中的時(shí)間戳的數(shù)據(jù)處理。也就是在處理數(shù)據(jù)時(shí),可以對(duì)記錄中的eventTime事件時(shí)間字段進(jìn)行考慮。因?yàn)閑ventTime更好的代表數(shù)據(jù)本身的信息,且可以借助eventTime處理比預(yù)期晚到達(dá)的數(shù)據(jù),但是需要有一個(gè)限度(閾值),不能一直等,應(yīng)該要設(shè)定最多等多久。
7.1 延遲數(shù)據(jù)
在很多流計(jì)算系統(tǒng)中,數(shù)據(jù)延遲到達(dá)(the events arrives late to the application)的情況很常見,并且很多時(shí)候是不可控的,因?yàn)楹芏鄷r(shí)候是外圍系統(tǒng)自身問題造成的。Structured Streaming可以保證一條舊的數(shù)據(jù)進(jìn)入到流上時(shí),依然可以基于這些“遲到”的數(shù)據(jù)重新計(jì)算并更新計(jì)算結(jié)果。
上圖中在12:04(即事件時(shí)間)生成的單詞可能在12:11被應(yīng)用程序接收,此時(shí),應(yīng)用程序應(yīng)使用時(shí)間12:04而不是12:11更新窗口12:00-12:10的舊計(jì)數(shù)。但是會(huì)出現(xiàn)如下兩個(gè)問題:
- 問題一:延遲數(shù)據(jù)計(jì)算是否有價(jià)值
-
- 如果某些數(shù)據(jù),延遲很長時(shí)間(如30分鐘)才到達(dá)流式處理系統(tǒng),數(shù)據(jù)還需要再次計(jì)算嗎?
計(jì)算的結(jié)果還有價(jià)值嗎?原因在于流式處理系統(tǒng)處理數(shù)據(jù)關(guān)鍵核心在于實(shí)時(shí)性; - 實(shí)踐表明,流計(jì)算關(guān)注的是近期數(shù)據(jù),更新一個(gè)很早之前的狀態(tài)往往已經(jīng)不再具有很大的業(yè)務(wù)價(jià)值;
- 問題二:以前狀態(tài)保存浪費(fèi)資源
- 實(shí)時(shí)統(tǒng)計(jì)來說,如果保存很久以前的數(shù)據(jù)狀態(tài),很多時(shí)候沒有作用的,反而浪費(fèi)大量資源;
Spark 2.1引入的watermarking允許用戶指定延遲數(shù)據(jù)的閾值,也允許引擎清除掉舊的狀態(tài)。即根據(jù)watermark機(jī)制來設(shè)置和判斷消息的有效性,如可以獲取消息本身的時(shí)間戳,然后根據(jù)該時(shí)間戳來判斷消息的到達(dá)是否延遲(亂序)以及延遲的時(shí)間是否在容忍的范圍內(nèi)(延遲的數(shù)據(jù)是否處理)。
7.2 Watermarking 水位
水位watermarking官方定義:
lets the engine automatically track the current event time in the data and attempt to clean up old state accordingly.
翻譯:讓Spark SQL引擎自動(dòng)追蹤數(shù)據(jù)中當(dāng)前事件時(shí)間EventTime,依據(jù)規(guī)則清除舊的狀態(tài)數(shù)據(jù)。通過指定event-time列(上一批次數(shù)據(jù)中EventTime最大值)和預(yù)估事件的延遲時(shí)間上限(Threshold)來定義一個(gè)查詢的水位線watermark。
Watermark = MaxEventTime - Threshod
- 第一點(diǎn):執(zhí)行第一批次數(shù)據(jù)時(shí),Watermarker為0,所以此批次中所有數(shù)據(jù)都參與計(jì)算;
- 第二點(diǎn):Watermarker值只能逐漸增加,不能減少;
- 第三點(diǎn):Watermark機(jī)制主要解決處理聚合延遲數(shù)據(jù)和減少內(nèi)存中維護(hù)的聚合狀態(tài);
- 第四點(diǎn):設(shè)置Watermark以后,輸出模式OutputMode只能是Append和Update;
如下方式設(shè)置閾值Threshold,計(jì)算每批次數(shù)據(jù)執(zhí)行時(shí)的水位Watermark:
看一下官方案例:詞頻統(tǒng)計(jì)WordCount,設(shè)置閾值Threshold為10分鐘,每5分鐘觸發(fā)執(zhí)行一次。
- 延遲到達(dá)但沒有超過watermark:(12:08, dog)
在12:20觸發(fā)執(zhí)行窗口(12:10-12:20)數(shù)據(jù)中,(12:08, dog) 數(shù)據(jù)是延遲數(shù)據(jù),閾值Threshold設(shè)定為10分鐘,此時(shí)水位線【W(wǎng)atermark = 12:14 - 10m = 12:04】,因?yàn)?2:14是上個(gè)窗口(12:05-12:15)中接收到的最大的事件時(shí)間,代表目標(biāo)系統(tǒng)最后時(shí)刻的狀態(tài),由于12:08在12:04之后,因此被視為“雖然遲到但尚且可以接收”的數(shù)據(jù)而被更新到了結(jié)果表中,也就是(12:00 - 12:10, dog, 2)和(12:05 - 12:11,dog, 3)。 - 超出watermark:(12:04, donkey)
在12:25觸發(fā)執(zhí)行窗口(12:15-12:25)數(shù)據(jù)中,(12:04, donkey)數(shù)據(jù)是延遲數(shù)據(jù),上個(gè)窗口中接收到最大的事件時(shí)間為12:21,此時(shí)水位線【W(wǎng)atermark = 12:21 - 10m = 12:11】,而(12:04, donkey)比這個(gè)值還要早,說明它”太舊了”,所以不會(huì)被更新到結(jié)果表中了。
設(shè)置水位線Watermark以后,不同輸出模式OutputMode,結(jié)果輸出不一樣: - Update模式:總是傾向于“盡可能早”的將處理結(jié)果更新到sink,當(dāng)出現(xiàn)遲到數(shù)據(jù)時(shí),早期的某個(gè)計(jì)算結(jié)果將會(huì)被更新;
- Append模式:推遲計(jì)算結(jié)果的輸出到一個(gè)相對(duì)較晚的時(shí)刻,確保結(jié)果是穩(wěn)定的,不會(huì)再被更新,
比如:12:00 - 12:10窗口的處理結(jié)果會(huì)等到watermark更新到12:11之后才會(huì)寫入到sink。如果用于接收處理結(jié)果的sink不支持更新操作,則只能選擇Append模式。
7.3 官方案例演示
編寫代碼,演示官方案例,如下幾點(diǎn)注意:
1、該outputMode為update模式,即只會(huì)輸出那些有更新的數(shù)據(jù)?。? 2、該開窗窗口長度為10 min,步長5 min,水印為eventtime-10 min,(需理解開窗規(guī)則) 3、官網(wǎng)案例trigger(Trigger.ProcessingTime("5 minutes")),但是測(cè)試的時(shí)候不建議使用這個(gè) 4、未輸出數(shù)據(jù)不代表已經(jīng)在內(nèi)存中被剔除,只是由于update模式的原因 5、建議比對(duì)append理解水印
測(cè)試數(shù)據(jù):
dog,2019-10-10 12:00:07 owl,2019-10-10 12:00:08 dog,2019-10-10 12:00:14 cat,2019-10-10 12:00:09 cat,2019-10-10 12:00:15 dog,2019-10-10 12:00:08 owl,2019-10-10 12:00:13 owl,2019-10-10 12:00:21 owl,2019-10-10 12:00:17
具體案例代碼如下:
import java.sql.Timestamp import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, Trigger} import org.apache.spark.sql.{DataFrame, SparkSession} /** * 基于Structured Streaming 讀取TCP Socket讀取數(shù)據(jù),事件時(shí)間窗口統(tǒng)計(jì)詞頻,將結(jié)果打印到控制臺(tái) * TODO:每5秒鐘統(tǒng)計(jì)最近10秒內(nèi)的數(shù)據(jù)(詞頻:WordCount),設(shè)置水位Watermark時(shí)間為10秒 * dog,2019-10-10 12:00:07 * owl,2019-10-10 12:00:08 * dog,2019-10-10 12:00:14 * cat,2019-10-10 12:00:09 * cat,2019-10-10 12:00:15 * dog,2019-10-10 12:00:08 * owl,2019-10-10 12:00:13 * owl,2019-10-10 12:00:21 * owl,2019-10-10 12:00:17 */ object StructuredWatermarkUpdate { def main(args: Array[String]): Unit = { // 1. 構(gòu)建SparkSession實(shí)例對(duì)象,傳遞sparkConf參數(shù) val spark: SparkSession = SparkSession.builder() .appName(this.getClass.getSimpleName.stripSuffix("$")) .master("local[2]") .config("spark.sql.shuffle.partitions", "2") .getOrCreate() // b. 導(dǎo)入隱式轉(zhuǎn)換及函數(shù)庫 import org.apache.spark.sql.functions._ import spark.implicits._ // 2. 使用SparkSession從TCP Socket讀取流式數(shù)據(jù) val inputStreamDF: DataFrame = spark.readStream .format("socket") .option("host", "node1.oldlu.cn") .option("port", 9999) .load() // 3. 針對(duì)獲取流式DStream設(shè)置EventTime窗口及Watermark水位限制 val resultStreamDF = inputStreamDF // 將DataFrame轉(zhuǎn)換為Dataset操作,Dataset是類型安全,強(qiáng)類型 .as[String] // 過濾無效數(shù)據(jù) .filter(line => null != line && line.trim.length > 0) // 將每行數(shù)據(jù)進(jìn)行分割單詞: 2019-10-12 09:00:02,cat dog .map { line => val arr = line.trim.split(",") (arr(0), Timestamp.valueOf(arr(1))) } // 設(shè)置列的名稱 .toDF("word", "time") // TODO:設(shè)置水位Watermark .withWatermark("time", "10 seconds") // TODO:設(shè)置基于事件時(shí)間(event time)窗口 -> time, 每5秒統(tǒng)計(jì)最近10秒內(nèi)數(shù)據(jù) .groupBy( window($"time", "10 seconds", "5 seconds"), $"word" ).count() // 4. 將計(jì)算的結(jié)果輸出,打印到控制臺(tái) val query: StreamingQuery = resultStreamDF.writeStream .outputMode(OutputMode.Update()) .format("console") .option("numRows", "100") .option("truncate", "false") .trigger(Trigger.ProcessingTime("5 seconds")) .start() // 流式DataFrame,需要啟動(dòng) // 查詢器一直等待流式應(yīng)用結(jié)束 query.awaitTermination() query.stop() } }