大數(shù)據(jù)Spark物聯(lián)網(wǎng)設(shè)備數(shù)據(jù)分析

| 2022-09-15 admin

目錄


1 設(shè)備監(jiān)控?cái)?shù)據(jù)

在物聯(lián)網(wǎng)時(shí)代,大量的感知器每天都在收集并產(chǎn)生著涉及各個(gè)領(lǐng)域的數(shù)據(jù)。物聯(lián)網(wǎng)提供源源不斷的數(shù)據(jù)流,使實(shí)時(shí)數(shù)據(jù)分析成為分析數(shù)據(jù)的理想工具。大數(shù)據(jù)Spark物聯(lián)網(wǎng)設(shè)備數(shù)據(jù)分析_big data

模擬一個(gè)智能物聯(lián)網(wǎng)系統(tǒng)的數(shù)據(jù)統(tǒng)計(jì)分析,產(chǎn)生設(shè)備數(shù)據(jù)發(fā)送到Kafka,結(jié)構(gòu)化流Structured

Streaming實(shí)時(shí)消費(fèi)統(tǒng)計(jì)。對(duì)物聯(lián)網(wǎng)設(shè)備狀態(tài)信號(hào)數(shù)據(jù),實(shí)時(shí)統(tǒng)計(jì)分析:

  •  
  • 1)、信號(hào)強(qiáng)度大于30的設(shè)備;
  • 2)、各種設(shè)備類型的數(shù)量;
  • 3)、各種設(shè)備類型的平均信號(hào)強(qiáng)度;

編寫程序模擬生成物聯(lián)網(wǎng)設(shè)備監(jiān)控?cái)?shù)據(jù),發(fā)送到Kafka Topic中,此處為了演示字段較少,實(shí)際

生產(chǎn)項(xiàng)目中字段很多。

1.1 創(chuàng)建 Topic

啟動(dòng)Kafka Broker服務(wù),創(chuàng)建Topic【search-log-topic】,命令如下所示:

# 啟動(dòng)Zookeeper
/export/server/zookeeper/bin/zkServer.sh start
# 啟動(dòng)Kafka Broker
/export/server/kafka/bin/kafka-server-start.sh -daemon /export/server/kafka/config/server.properties
rm -rf /export/server/kafka/logs/*
# 創(chuàng)建topic
/export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1.oldlu.cn:2181/kafka200 --replication-fa
ctor 1 --partitions 3 --topic iotTopic
# 模擬生產(chǎn)者
/export/server/kafka/bin/kafka-console-producer.sh --broker-list node1.oldlu.cn:9092 --topic iotTopic
# 模擬消費(fèi)者
/export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1.oldlu.cn:9092 --topic iotTopic
--from-beginning
# 刪除topic
/export/server/kafka/bin/kafka-topics.sh --delete --zookeeper node1.oldlu.cn:2181/kafka200 --topic iotTopic

1.2 模擬數(shù)據(jù)

模擬設(shè)備監(jiān)控日志數(shù)據(jù),字段信息封裝到CaseClass樣例類【DeviceData】類,代碼如下

package cn.oldlu.spark.iot
 * 物聯(lián)網(wǎng)設(shè)備發(fā)送狀態(tài)數(shù)據(jù)
 * @param device     設(shè)備標(biāo)識(shí)符ID
 * @param deviceType 設(shè)備類型,如服務(wù)器mysql, redis, kafka或路由器route
 * @param signal     設(shè)備信號(hào)
 * @param time       發(fā)送數(shù)據(jù)時(shí)間
 */
case class DeviceData(
                       device: String, //
                       deviceType: String, //
                       signal: Double, //
                       time: Long //
                     )

模擬產(chǎn)生日志數(shù)據(jù)類【MockIotDatas】具體代碼如下:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import org.json4s.jackson.Json
import scala.util.Random

object MockIotDatas {
  def main(args: Array[String]): Unit = {
    // 發(fā)送Kafka Topic
    val props = new Properties()
    props.put("bootstrap.servers", "node1.oldlu.cn:9092")
    props.put("acks", "1")
    props.put("retries", "3")
    props.put("key.serializer", classOf[StringSerializer].getName)
    props.put("value.serializer", classOf[StringSerializer].getName)
    val producer = new KafkaProducer[String, String](props)
    val deviceTypes = Array(
      "db", "bigdata", "kafka", "route", "bigdata", "db", "bigdata", "bigdata", "bigdata"
    )
    val random: Random = new Random()
    while (true) {
      val index: Int = random.nextInt(deviceTypes.length)
      val deviceId: String = s"device_${(index + 1) * 10 + random.nextInt(index + 1)}"
      val deviceType: String = deviceTypes(index)
      val deviceSignal: Int = 10 + random.nextInt(90)
      // 模擬構(gòu)造設(shè)備數(shù)據(jù)
      val deviceData = DeviceData(deviceId, deviceType, deviceSignal, System.currentTimeMillis())
      // 轉(zhuǎn)換為JSON字符串
      val deviceJson: String = new Json(org.json4s.DefaultFormats).write(deviceData)
      println(deviceJson)
      Thread.sleep(100 + random.nextInt(500))
      val record = new ProducerRecord[String, String]("iotTopic", deviceJson)
      producer.send(record)
    }
    // 關(guān)閉連接
    producer.close()
  }
}

相當(dāng)于大機(jī)房中各個(gè)服務(wù)器定時(shí)發(fā)送相關(guān)監(jiān)控?cái)?shù)據(jù)至Kafka中,服務(wù)器部署服務(wù)有數(shù)據(jù)庫db、大

數(shù)據(jù)集群bigdata、消息隊(duì)列kafka及路由器route等等,數(shù)據(jù)樣本:

{"device":"device_50","deviceType":"bigdata","signal":91.0,"time":1590660338429}
{"device":"device_20","deviceType":"bigdata","signal":17.0,"time":1590660338790}
{"device":"device_32","deviceType":"kafka","signal":93.0,"time":1590660338908}
{"device":"device_82","deviceType":"bigdata","signal":72.0,"time":1590660339380}
{"device":"device_32","deviceType":"kafka","signal":10.0,"time":1590660339972}
{"device":"device_96","deviceType":"bigdata","signal":18.0,"time":1590660343554}

2 基于DataFrame分析

按照業(yè)務(wù)需求,從Kafka消費(fèi)日志數(shù)據(jù),基于DataFrame數(shù)據(jù)結(jié)構(gòu)調(diào)用函數(shù)分析,代碼如下:

package cn.oldlu.spark.iot

import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}
import org.apache.spark.sql.types.{DoubleType, LongType}
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * 對(duì)物聯(lián)網(wǎng)設(shè)備狀態(tài)信號(hào)數(shù)據(jù),實(shí)時(shí)統(tǒng)計(jì)分析:
 * 1)、信號(hào)強(qiáng)度大于30的設(shè)備
 * 2)、各種設(shè)備類型的數(shù)量
 * 3)、各種設(shè)備類型的平均信號(hào)強(qiáng)度
 */
object IotStreamingOnline {
  def main(args: Array[String]): Unit = {
    // 1. 構(gòu)建SparkSession會(huì)話實(shí)例對(duì)象,設(shè)置屬性信息
    val spark: SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      .master("local[3]")
      .config("spark.sql.shuffle.partitions", "3")
      .getOrCreate()
    // 導(dǎo)入隱式轉(zhuǎn)換和函數(shù)庫
    import org.apache.spark.sql.functions._
    import spark.implicits._
    // 2. 從Kafka讀取數(shù)據(jù),底層采用New Consumer API
    val iotStreamDF: DataFrame = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "node1.oldlu.cn:9092")
      .option("subscribe", "iotTopic")
      // 設(shè)置每批次消費(fèi)數(shù)據(jù)最大值
      .option("maxOffsetsPerTrigger", "100000")
      .load()
    // 3. 對(duì)獲取數(shù)據(jù)進(jìn)行解析,封裝到DeviceData中
    val etlStreamDF: DataFrame = iotStreamDF
      // 獲取value字段的值,轉(zhuǎn)換為String類型
      .selectExpr("CAST(value AS STRING)")
      // 將數(shù)據(jù)轉(zhuǎn)換Dataset
      .as[String] // 內(nèi)部字段名為value
      // 過濾數(shù)據(jù)
      .filter(line => null != line && line.trim.length > 0)
      // 解析JSON數(shù)據(jù):{"device":"device_65","deviceType":"db","signal":12.0,"time":1589718910796}
      .select(
        get_json_object($"value", "$.device").as("device_id"),
        get_json_object($"value", "$.deviceType").as("device_type"),
        get_json_object($"value", "$.signal").cast(DoubleType).as("signal"),
        get_json_object($"value", "$.time").cast(LongType).as("time")
      )
    // 4. 依據(jù)業(yè)務(wù),分析處理
    // TODO: signal > 30 所有數(shù)據(jù),按照設(shè)備類型 分組,統(tǒng)計(jì)數(shù)量、平均信號(hào)強(qiáng)度
    val resultStreamDF: DataFrame = etlStreamDF
      // 信號(hào)強(qiáng)度大于10
      .filter($"signal" > 30)
      // 按照設(shè)備類型 分組
      .groupBy($"device_type")
      // 統(tǒng)計(jì)數(shù)量、評(píng)價(jià)信號(hào)強(qiáng)度
      .agg(
        count($"device_type").as("count_device"),
        round(avg($"signal"), 2).as("avg_signal")
      )
    // 5. 啟動(dòng)流式應(yīng)用,結(jié)果輸出控制臺(tái)
    val query: StreamingQuery = resultStreamDF.writeStream
      .outputMode(OutputMode.Complete())
      .format("console")
      .option("numRows", "10")
      .option("truncate", "false")
      .start()
    query.awaitTermination()
    query.stop()
  }
}

其中使用函數(shù)get_json_object提取JSON字符串中字段值,將最終結(jié)果打印控制臺(tái)。

3 基于SQL分析

按照業(yè)務(wù)需求,從Kafka消費(fèi)日志數(shù)據(jù),提取字段信息,將DataFrame注冊(cè)為臨時(shí)視圖,編寫

SQL執(zhí)行分析,代碼如下:

package cn.oldlu.spark.iot

import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}
import org.apache.spark.sql.types.{DoubleType, LongType}
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * 對(duì)物聯(lián)網(wǎng)設(shè)備狀態(tài)信號(hào)數(shù)據(jù),實(shí)時(shí)統(tǒng)計(jì)分析,基于SQL編程
 * 1)、信號(hào)強(qiáng)度大于30的設(shè)備
 * 2)、各種設(shè)備類型的數(shù)量
 * 3)、各種設(shè)備類型的平均信號(hào)強(qiáng)度
 */
object IotStreamingOnlineSQL {
  def main(args: Array[String]): Unit = {
    // 1. 構(gòu)建SparkSession會(huì)話實(shí)例對(duì)象,設(shè)置屬性信息
    val spark: SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      .master("local[3]")
      .config("spark.sql.shuffle.partitions", "3")
      .getOrCreate()
    import org.apache.spark.sql.functions._
    import spark.implicits._
    // 2. 從Kafka讀取數(shù)據(jù),底層采用New Consumer API
    val iotStreamDF: DataFrame = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "node1.oldlu.cn:9092")
      .option("subscribe", "iotTopic")
      // 設(shè)置每批次消費(fèi)數(shù)據(jù)最大值
      .option("maxOffsetsPerTrigger", "100000")
      .load()
    // 3. 對(duì)獲取數(shù)據(jù)進(jìn)行解析,封裝到DeviceData中
    val etlStreamDF: DataFrame = iotStreamDF
      // 獲取value字段的值,轉(zhuǎn)換為String類型
      .selectExpr("CAST(value AS STRING)")
      // 將數(shù)據(jù)轉(zhuǎn)換Dataset
      .as[String] // 內(nèi)部字段名為value
      // 過濾數(shù)據(jù)
      .filter(line => null != line && line.trim.length > 0)
      // 解析JSON數(shù)據(jù):{"device":"device_65","deviceType":"db","signal":12.0,"time":1589718910796}
      .select(
        get_json_object($"value", "$.device").as("device_id"),
        get_json_object($"value", "$.deviceType").as("device_type"),
        get_json_object($"value", "$.signal").cast(DoubleType).as("signal"),
        get_json_object($"value", "$.time").cast(LongType).as("time")
      )
    // 4. 依據(jù)業(yè)務(wù),分析處理
    // TODO: signal > 30 所有數(shù)據(jù),按照設(shè)備類型 分組,統(tǒng)計(jì)數(shù)量、平均信號(hào)強(qiáng)度
    // 4.1 注冊(cè)DataFrame為臨時(shí)視圖
    etlStreamDF.createOrReplaceTempView("view_tmp_stream_iots")
    // 4.2 編寫SQL執(zhí)行查詢
    val resultStreamDF: DataFrame = spark.sql(
      """
        |SELECT
        | device_type, COUNT(device_type) AS count_device, ROUND(AVG(signal), 2) AS avg_signal
        |FROM view_tmp_stream_iots
        |WHERE signal > 30 GROUP BY device_type
        |""".stripMargin)
    // 5. 啟動(dòng)流式應(yīng)用,結(jié)果輸出控制臺(tái)
    val query: StreamingQuery = resultStreamDF.writeStream
      .outputMode(OutputMode.Complete())
      .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
        println("===========================================")
        println(s"BatchId = ${batchId}")
        println("===========================================")
        if (!batchDF.isEmpty) batchDF.coalesce(1).show(20, truncate = false)
      }
      .start()
    query.awaitTermination()
    query.stop()
  }
}

運(yùn)行流式應(yīng)用,結(jié)果如下圖所示:

大數(shù)據(jù)Spark物聯(lián)網(wǎng)設(shè)備數(shù)據(jù)分析_kafka_02

4 時(shí)間概念

在SparkStreaming中窗口統(tǒng)計(jì)分析:Window Operation(設(shè)置窗口大小WindowInterval和滑動(dòng)大小SlideInterval),按照Streaming 流式應(yīng)用接收數(shù)據(jù)的時(shí)間進(jìn)行窗口設(shè)計(jì)的,其實(shí)是不符合實(shí)際應(yīng)用場(chǎng)景的。

例如,在物聯(lián)網(wǎng)數(shù)據(jù)平臺(tái)中,每個(gè)設(shè)備產(chǎn)生的數(shù)據(jù),其中包含數(shù)據(jù)產(chǎn)生的時(shí)間,然而數(shù)據(jù)需要

經(jīng)過一系列采集傳輸才能被流式計(jì)算框架處理:SparkStreaming,此過程需要時(shí)間的,再按照處理

時(shí)間來統(tǒng)計(jì)業(yè)務(wù)的時(shí)候,準(zhǔn)確性降低,存在不合理性。

在結(jié)構(gòu)化流Structured Streaming中窗口數(shù)據(jù)統(tǒng)計(jì)時(shí)間是基于數(shù)據(jù)本身事件時(shí)間EventTime字

段統(tǒng)計(jì),更加合理性,官方文檔:

http://spark.apache.org/docs/2.4.5/structured-streaming-programming-guide.html#window-operations-on-event-time

在Streaming流式數(shù)據(jù)處理中,按照時(shí)間處理數(shù)據(jù),其中時(shí)間有三種概念:

  •  
  • 1)、事件時(shí)間EventTime,表示數(shù)據(jù)本身產(chǎn)生的時(shí)間,該字段在數(shù)據(jù)本身中;
  • 2)、注入時(shí)間IngestionTime,表示數(shù)據(jù)到達(dá)流式系統(tǒng)時(shí)間,簡而言之就是流式處理系統(tǒng)接收到
    數(shù)據(jù)的時(shí)間;
  • 3)、處理時(shí)間ProcessingTime,表示數(shù)據(jù)被流式系統(tǒng)真正開始計(jì)算操作的時(shí)間。大數(shù)據(jù)Spark物聯(lián)網(wǎng)設(shè)備數(shù)據(jù)分析_kafka_03不同流式計(jì)算框架支持時(shí)間不一樣,SparkStreaming框架僅僅支持處理時(shí)間ProcessTime,StructuredStreaming支持事件時(shí)間和處理時(shí)間,F(xiàn)link框架支持三種時(shí)間數(shù)據(jù)操作,實(shí)際項(xiàng)目中往往針對(duì)【事件時(shí)間EventTime】進(jìn)行數(shù)據(jù)處理操作,更加合理化。

5 event-time 窗口分析

基于事件時(shí)間窗口聚合操作:基于窗口的聚合(例如每分鐘事件數(shù))只是事件時(shí)間列上特殊類型的分組和聚合,其中每個(gè)時(shí)間窗口都是一個(gè)組,并且每一行可以屬于多個(gè)窗口/組。事件時(shí)間EventTime是嵌入到數(shù)據(jù)本身中的時(shí)間,數(shù)據(jù)實(shí)際真實(shí)產(chǎn)生的時(shí)間。例如,如果希望獲得每分鐘由物聯(lián)網(wǎng)設(shè)備生成的事件數(shù),那么可能希望使用生成數(shù)據(jù)的時(shí)間(即數(shù)據(jù)中的事件時(shí)間event time),而不是Spark接收數(shù)據(jù)的時(shí)間(receive time/archive time)。

這個(gè)事件時(shí)間很自然地用這個(gè)模型表示,設(shè)備中的每個(gè)事件(Event)都是表中的一行(Row),而事件時(shí)間(Event Time)是行中的一列值(Column Value)。 

因此,這種基于事件時(shí)間窗口的聚合查詢既可以在靜態(tài)數(shù)據(jù)集(例如,從收集的設(shè)備事件日志中)上定義,也可以在數(shù)據(jù)流上定義,從而使用戶的使用更加容易。修改詞頻統(tǒng)計(jì)程序,數(shù)據(jù)流包含每行數(shù)據(jù)以及生成每行行的時(shí)間。希望在10分鐘的窗口內(nèi)對(duì)單詞進(jìn)行計(jì)數(shù),每5分鐘更新一次,如下圖所示:大數(shù)據(jù)Spark物聯(lián)網(wǎng)設(shè)備數(shù)據(jù)分析_kafka_04

單詞在10分鐘窗口【12:00-12:10、12:05-12:15、12:10-12:20】等之間接收的單詞中計(jì)數(shù)。注意,

【12:00-12:10】表示處理數(shù)據(jù)的事件時(shí)間為12:00之后但12:10之前的數(shù)據(jù)。思考一下,12:07的一條數(shù)據(jù),應(yīng)該增加對(duì)應(yīng)于兩個(gè)窗口12:00-12:10和12:05-12:15的計(jì)數(shù)?;谑录r(shí)間窗口統(tǒng)計(jì)有兩個(gè)參數(shù)索引:分組鍵(如單詞)和窗口(事件時(shí)間字段)。大數(shù)據(jù)Spark物聯(lián)網(wǎng)設(shè)備數(shù)據(jù)分析_數(shù)據(jù)_05

為了演示案例,將上述案例中的每5分鐘統(tǒng)計(jì)最近10分鐘窗口改為每5秒統(tǒng)計(jì)最近10秒窗口數(shù)

據(jù),測(cè)試數(shù)據(jù)集:

2019-10-12 09:00:02,cat dog
2019-10-12 09:00:03,dog dog
2019-10-12 09:00:07,owl cat
2019-10-12 09:00:11,dog
2019-10-12 09:00:13,owl

案例中三個(gè)時(shí)間范圍,說明如下:

1、觸發(fā)時(shí)間間隔,trigger interval:5秒 (案例:5分鐘)
2、事件時(shí)間窗口大小,window interval:10秒(案例:10分鐘)
3、滑動(dòng)大小,slider interval:5秒(案例:5分鐘)

官方案例演示代碼如下:

import java.sql.Timestamp
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * 基于Structured Streaming 模塊讀取TCP Socket讀取數(shù)據(jù),進(jìn)行事件時(shí)間窗口統(tǒng)計(jì)詞頻WordCount,將結(jié)果打印到控制臺(tái)
 * TODO:每5秒鐘統(tǒng)計(jì)最近10秒內(nèi)的數(shù)據(jù)(詞頻:WordCount)
 *
 * EventTime即事件真正生成的時(shí)間:
 * 例如一個(gè)用戶在10:06點(diǎn)擊 了一個(gè)按鈕,記錄在系統(tǒng)中為10:06
 * 這條數(shù)據(jù)發(fā)送到Kafka,又到了Spark Streaming中處理,已經(jīng)是10:08,這個(gè)處理的時(shí)間就是process Time。
 *
 * 測(cè)試數(shù)據(jù):
 * 2019-10-12 09:00:02,cat dog
 * 2019-10-12 09:00:03,dog dog
 * 2019-10-12 09:00:07,owl cat
 * 2019-10-12 09:00:11,dog
 * 2019-10-12 09:00:13,owl
 */
object StructuredWindow {
  def main(args: Array[String]): Unit = {
    // 1. 構(gòu)建SparkSession實(shí)例對(duì)象,傳遞sparkConf參數(shù)
    val spark: SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      .master("local[2]")
      .config("spark.sql.shuffle.partitions", "2")
      .getOrCreate()
    import org.apache.spark.sql.functions._
    import spark.implicits._
    // 2. 使用SparkSession從TCP Socket讀取流式數(shù)據(jù)
    val inputStreamDF: DataFrame = spark.readStream
      .format("socket")
      .option("host", "node1.oldlu.cn")
      .option("port", 9999)
      .load()
    // 3. 針對(duì)獲取流式DStream進(jìn)行詞頻統(tǒng)計(jì)
    val resultStreamDF = inputStreamDF
      // 將DataFrame轉(zhuǎn)換為Dataset操作,Dataset是類型安全,強(qiáng)類型
      .as[String]
      .filter(line => null != line && line.trim.length > 0)
      // 將每行數(shù)據(jù)進(jìn)行分割單詞: 2019-10-12 09:00:02,cat dog
      .flatMap { line =>
        val arr = line.trim.split(",")
        arr(1).split("\s+").map(word => (Timestamp.valueOf(arr(0)), word))
      }
      // 設(shè)置列的名稱
      .toDF("insert_timestamp", "word")
      // TODO:設(shè)置基于事件時(shí)間(event time)窗口 -> insert_timestamp, 每5秒統(tǒng)計(jì)最近10秒內(nèi)數(shù)據(jù)
      /*
      1. 先按照窗口分組、2. 再對(duì)窗口中按照單詞分組、 3. 最后使用聚合函數(shù)聚合
      */
      .groupBy(
        window($"insert_timestamp", "10 seconds", "5 seconds"), $"word"
      ).count()
      .orderBy($"window") // 按照窗口字段降序排序
    /*
    root
    |-- window: struct (nullable = true)
    | |-- start: timestamp (nullable = true)
    | |-- end: timestamp (nullable = true)
    |-- word: string (nullable = true)
    |-- count: long (nullable = false)
    */
    //resultStreamDF.printSchema()
    // 4. 將計(jì)算的結(jié)果輸出,打印到控制臺(tái)
    val query: StreamingQuery = resultStreamDF.writeStream
      .outputMode(OutputMode.Complete())
      .format("console")
      .option("numRows", "100")
      .option("truncate", "false")
      .trigger(Trigger.ProcessingTime("5 seconds"))
      .start()
    query.awaitTermination()
    query.stop()
  }
}

運(yùn)行上述基于事件時(shí)間Event Time窗口統(tǒng)計(jì)流式應(yīng)用,演示效果圖如下所示:大數(shù)據(jù)Spark物聯(lián)網(wǎng)設(shè)備數(shù)據(jù)分析_kafka_06

6 event-time 窗口生成

Structured Streaming中如何依據(jù)EventTime事件時(shí)間生成窗口的呢?查看類TimeWindowing源碼中生成窗口規(guī)則:

org.apache.spark.sql.catalyst.analysis.TimeWindowing
// 窗口個(gè)數(shù)
/* 最大的窗口數(shù) = 向上取整(窗口長度/滑動(dòng)步長) */
maxNumOverlapping <- ceil(windowDuration / slideDuration)
for (i <- 0 until maxNumOverlapping)
/**
timestamp是event-time 傳進(jìn)的時(shí)間戳
startTime是window窗口參數(shù),默認(rèn)是0 second 從時(shí)間的0s
含義:event-time從1970年...有多少個(gè)滑動(dòng)步長,如果說浮點(diǎn)數(shù)會(huì)向上取整
*/
windowId <- ceil((timestamp - startTime) / slideDuration)
/**
windowId * slideDuration 向上取能整除滑動(dòng)步長的時(shí)間
(i - maxNumOverlapping) * slideDuration 每一個(gè)窗口開始時(shí)間相差一個(gè)步長
*/
windowStart <- windowId * slideDuration + (i - maxNumOverlapping) * slideDuration + startTime
windowEnd <- windowStart + windowDuration
return windowStart, windowEnd

將【(event-time向上取 能整除 滑動(dòng)步長的時(shí)間) - (最大窗口數(shù)×滑動(dòng)步長)】作為"初始窗口"

的開始時(shí)間,然后按照窗口滑動(dòng)寬度逐漸向時(shí)間軸前方推進(jìn),直到某個(gè)窗口不再包含該event-time

為止,最終以"初始窗口"與"結(jié)束窗口"之間的若干個(gè)窗口作為最終生成的 event-time 的時(shí)間窗口。大數(shù)據(jù)Spark物聯(lián)網(wǎng)設(shè)備數(shù)據(jù)分析_kafka_07

每個(gè)窗口的起始時(shí)間start與結(jié)束時(shí)間end都是前閉后開(左閉右開)的區(qū)間,因此初始窗口和結(jié)束窗口都不會(huì)包含 event-time,最終不會(huì)被使用。假設(shè)數(shù)據(jù)為【2019-08-14 10:50:00, dog】,按照上述規(guī)則計(jì)算窗口示意圖如下:大數(shù)據(jù)Spark物聯(lián)網(wǎng)設(shè)備數(shù)據(jù)分析_數(shù)據(jù)_08

得到窗口如下:大數(shù)據(jù)Spark物聯(lián)網(wǎng)設(shè)備數(shù)據(jù)分析_sql_09

7 延遲數(shù)據(jù)處理

Structed Streaming與Spark Streaming相比一大特性就是支持基于數(shù)據(jù)中的時(shí)間戳的數(shù)據(jù)處理。也就是在處理數(shù)據(jù)時(shí),可以對(duì)記錄中的eventTime事件時(shí)間字段進(jìn)行考慮。因?yàn)閑ventTime更好的代表數(shù)據(jù)本身的信息,且可以借助eventTime處理比預(yù)期晚到達(dá)的數(shù)據(jù),但是需要有一個(gè)限度(閾值),不能一直等,應(yīng)該要設(shè)定最多等多久。

7.1 延遲數(shù)據(jù)

在很多流計(jì)算系統(tǒng)中,數(shù)據(jù)延遲到達(dá)(the events arrives late to the application)的情況很常見,并且很多時(shí)候是不可控的,因?yàn)楹芏鄷r(shí)候是外圍系統(tǒng)自身問題造成的。Structured Streaming可以保證一條舊的數(shù)據(jù)進(jìn)入到流上時(shí),依然可以基于這些“遲到”的數(shù)據(jù)重新計(jì)算并更新計(jì)算結(jié)果。大數(shù)據(jù)Spark物聯(lián)網(wǎng)設(shè)備數(shù)據(jù)分析_sql_10

上圖中在12:04(即事件時(shí)間)生成的單詞可能在12:11被應(yīng)用程序接收,此時(shí),應(yīng)用程序應(yīng)使用時(shí)間12:04而不是12:11更新窗口12:00-12:10的舊計(jì)數(shù)。但是會(huì)出現(xiàn)如下兩個(gè)問題:

  • 問題一:延遲數(shù)據(jù)計(jì)算是否有價(jià)值
  1. 如果某些數(shù)據(jù),延遲很長時(shí)間(如30分鐘)才到達(dá)流式處理系統(tǒng),數(shù)據(jù)還需要再次計(jì)算嗎?
    計(jì)算的結(jié)果還有價(jià)值嗎?原因在于流式處理系統(tǒng)處理數(shù)據(jù)關(guān)鍵核心在于實(shí)時(shí)性;
  2. 實(shí)踐表明,流計(jì)算關(guān)注的是近期數(shù)據(jù),更新一個(gè)很早之前的狀態(tài)往往已經(jīng)不再具有很大的業(yè)務(wù)價(jià)值;
  • 問題二:以前狀態(tài)保存浪費(fèi)資源
  1. 實(shí)時(shí)統(tǒng)計(jì)來說,如果保存很久以前的數(shù)據(jù)狀態(tài),很多時(shí)候沒有作用的,反而浪費(fèi)大量資源;
    Spark 2.1引入的watermarking允許用戶指定延遲數(shù)據(jù)的閾值,也允許引擎清除掉舊的狀態(tài)。即根據(jù)watermark機(jī)制來設(shè)置和判斷消息的有效性,如可以獲取消息本身的時(shí)間戳,然后根據(jù)該時(shí)間戳來判斷消息的到達(dá)是否延遲(亂序)以及延遲的時(shí)間是否在容忍的范圍內(nèi)(延遲的數(shù)據(jù)是否處理)。

7.2 Watermarking 水位

水位watermarking官方定義:

lets the engine automatically track the current event time in the data and attempt to clean up old state accordingly.

翻譯:讓Spark SQL引擎自動(dòng)追蹤數(shù)據(jù)中當(dāng)前事件時(shí)間EventTime,依據(jù)規(guī)則清除舊的狀態(tài)數(shù)據(jù)。通過指定event-time列(上一批次數(shù)據(jù)中EventTime最大值)和預(yù)估事件的延遲時(shí)間上限(Threshold)來定義一個(gè)查詢的水位線watermark。

Watermark = MaxEventTime - Threshod
  •  
  • 第一點(diǎn):執(zhí)行第一批次數(shù)據(jù)時(shí),Watermarker為0,所以此批次中所有數(shù)據(jù)都參與計(jì)算;
  • 第二點(diǎn):Watermarker值只能逐漸增加,不能減少;
  • 第三點(diǎn):Watermark機(jī)制主要解決處理聚合延遲數(shù)據(jù)和減少內(nèi)存中維護(hù)的聚合狀態(tài);
  • 第四點(diǎn):設(shè)置Watermark以后,輸出模式OutputMode只能是Append和Update;
    如下方式設(shè)置閾值Threshold,計(jì)算每批次數(shù)據(jù)執(zhí)行時(shí)的水位Watermark:大數(shù)據(jù)Spark物聯(lián)網(wǎng)設(shè)備數(shù)據(jù)分析_sql_11

看一下官方案例:詞頻統(tǒng)計(jì)WordCount,設(shè)置閾值Threshold為10分鐘,每5分鐘觸發(fā)執(zhí)行一次。大數(shù)據(jù)Spark物聯(lián)網(wǎng)設(shè)備數(shù)據(jù)分析_spark_12

  •  
  • 延遲到達(dá)但沒有超過watermark:(12:08, dog)
    在12:20觸發(fā)執(zhí)行窗口(12:10-12:20)數(shù)據(jù)中,(12:08, dog) 數(shù)據(jù)是延遲數(shù)據(jù),閾值Threshold設(shè)定為10分鐘,此時(shí)水位線【W(wǎng)atermark = 12:14 - 10m = 12:04】,因?yàn)?2:14是上個(gè)窗口(12:05-12:15)中接收到的最大的事件時(shí)間,代表目標(biāo)系統(tǒng)最后時(shí)刻的狀態(tài),由于12:08在12:04之后,因此被視為“雖然遲到但尚且可以接收”的數(shù)據(jù)而被更新到了結(jié)果表中,也就是(12:00 - 12:10, dog, 2)和(12:05 - 12:11,dog, 3)。大數(shù)據(jù)Spark物聯(lián)網(wǎng)設(shè)備數(shù)據(jù)分析_kafka_13
  • 超出watermark:(12:04, donkey)
    在12:25觸發(fā)執(zhí)行窗口(12:15-12:25)數(shù)據(jù)中,(12:04, donkey)數(shù)據(jù)是延遲數(shù)據(jù),上個(gè)窗口中接收到最大的事件時(shí)間為12:21,此時(shí)水位線【W(wǎng)atermark = 12:21 - 10m = 12:11】,而(12:04, donkey)比這個(gè)值還要早,說明它”太舊了”,所以不會(huì)被更新到結(jié)果表中了。大數(shù)據(jù)Spark物聯(lián)網(wǎng)設(shè)備數(shù)據(jù)分析_spark_14
    設(shè)置水位線Watermark以后,不同輸出模式OutputMode,結(jié)果輸出不一樣:
  • Update模式:總是傾向于“盡可能早”的將處理結(jié)果更新到sink,當(dāng)出現(xiàn)遲到數(shù)據(jù)時(shí),早期的某個(gè)計(jì)算結(jié)果將會(huì)被更新;
  • Append模式:推遲計(jì)算結(jié)果的輸出到一個(gè)相對(duì)較晚的時(shí)刻,確保結(jié)果是穩(wěn)定的,不會(huì)再被更新,
    比如:12:00 - 12:10窗口的處理結(jié)果會(huì)等到watermark更新到12:11之后才會(huì)寫入到sink。如果用于接收處理結(jié)果的sink不支持更新操作,則只能選擇Append模式。

7.3 官方案例演示

編寫代碼,演示官方案例,如下幾點(diǎn)注意:

1、該outputMode為update模式,即只會(huì)輸出那些有更新的數(shù)據(jù)?。?
2、該開窗窗口長度為10 min,步長5 min,水印為eventtime-10 min,(需理解開窗規(guī)則)
3、官網(wǎng)案例trigger(Trigger.ProcessingTime("5 minutes")),但是測(cè)試的時(shí)候不建議使用這個(gè)
4、未輸出數(shù)據(jù)不代表已經(jīng)在內(nèi)存中被剔除,只是由于update模式的原因
5、建議比對(duì)append理解水印

測(cè)試數(shù)據(jù):

dog,2019-10-10 12:00:07
owl,2019-10-10 12:00:08
dog,2019-10-10 12:00:14
cat,2019-10-10 12:00:09
cat,2019-10-10 12:00:15
dog,2019-10-10 12:00:08
owl,2019-10-10 12:00:13
owl,2019-10-10 12:00:21
owl,2019-10-10 12:00:17

具體案例代碼如下:

import java.sql.Timestamp
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * 基于Structured Streaming 讀取TCP Socket讀取數(shù)據(jù),事件時(shí)間窗口統(tǒng)計(jì)詞頻,將結(jié)果打印到控制臺(tái)
 * TODO:每5秒鐘統(tǒng)計(jì)最近10秒內(nèi)的數(shù)據(jù)(詞頻:WordCount),設(shè)置水位Watermark時(shí)間為10秒
 * dog,2019-10-10 12:00:07
 * owl,2019-10-10 12:00:08
 * dog,2019-10-10 12:00:14
 * cat,2019-10-10 12:00:09
 * cat,2019-10-10 12:00:15
 * dog,2019-10-10 12:00:08
 * owl,2019-10-10 12:00:13
 * owl,2019-10-10 12:00:21
 * owl,2019-10-10 12:00:17
 */
object StructuredWatermarkUpdate {
  def main(args: Array[String]): Unit = {
    // 1. 構(gòu)建SparkSession實(shí)例對(duì)象,傳遞sparkConf參數(shù)
    val spark: SparkSession = SparkSession.builder()
      .appName(this.getClass.getSimpleName.stripSuffix("$"))
      .master("local[2]")
      .config("spark.sql.shuffle.partitions", "2")
      .getOrCreate()
    // b. 導(dǎo)入隱式轉(zhuǎn)換及函數(shù)庫
    import org.apache.spark.sql.functions._
    import spark.implicits._
    // 2. 使用SparkSession從TCP Socket讀取流式數(shù)據(jù)
    val inputStreamDF: DataFrame = spark.readStream
      .format("socket")
      .option("host", "node1.oldlu.cn")
      .option("port", 9999)
      .load()
    // 3. 針對(duì)獲取流式DStream設(shè)置EventTime窗口及Watermark水位限制
    val resultStreamDF = inputStreamDF
      // 將DataFrame轉(zhuǎn)換為Dataset操作,Dataset是類型安全,強(qiáng)類型
      .as[String]
      // 過濾無效數(shù)據(jù)
      .filter(line => null != line && line.trim.length > 0)
      // 將每行數(shù)據(jù)進(jìn)行分割單詞: 2019-10-12 09:00:02,cat dog
      .map { line =>
        val arr = line.trim.split(",")
        (arr(0), Timestamp.valueOf(arr(1)))
      }
      // 設(shè)置列的名稱
      .toDF("word", "time")
      // TODO:設(shè)置水位Watermark
      .withWatermark("time", "10 seconds")
      // TODO:設(shè)置基于事件時(shí)間(event time)窗口 -> time, 每5秒統(tǒng)計(jì)最近10秒內(nèi)數(shù)據(jù)
      .groupBy(
        window($"time", "10 seconds", "5 seconds"),
        $"word"
      ).count()
    // 4. 將計(jì)算的結(jié)果輸出,打印到控制臺(tái)
    val query: StreamingQuery = resultStreamDF.writeStream
      .outputMode(OutputMode.Update())
      .format("console")
      .option("numRows", "100")
      .option("truncate", "false")
      .trigger(Trigger.ProcessingTime("5 seconds"))
      .start() // 流式DataFrame,需要啟動(dòng)
    // 查詢器一直等待流式應(yīng)用結(jié)束
    query.awaitTermination()
    query.stop()
  }
}