一、 為什么需要Window
在流處理應用中,數(shù)據(jù)是連續(xù)不斷的,有時我們需要做一些聚合類的處理,例如:在過去的1分鐘內(nèi)有多少用戶點擊了我們的網(wǎng)頁。
在這種情況下,我們必須定義一個窗口(window),用來收集最近1分鐘內(nèi)的數(shù)據(jù),并對這個窗口內(nèi)的數(shù)據(jù)進行計算
二、Window的分類
2.1 按照time和count分類
time-window:時間窗口:根據(jù)時間劃分窗口,如:每xx分鐘統(tǒng)計最近xx分鐘的數(shù)據(jù)
count-window:數(shù)量窗口:根據(jù)數(shù)量劃分窗口,如:每xx個數(shù)據(jù)統(tǒng)計最近xx個數(shù)據(jù)
2.2 按照slide和size分類
窗口有兩個重要的屬性: 窗口大小size和滑動間隔slide,根據(jù)它們的大小關系可分為:
tumbling-window:滾動窗口:size=slide,如:每隔10s統(tǒng)計最近10s的數(shù)據(jù)
sliding-window:滑動窗口:size>slide,如:每隔5s統(tǒng)計最近10s數(shù)據(jù)
注意:當size
小結
按照上面窗口的分類方式進行組合,可以得出如下的窗口:
-
基于時間的滾動窗口tumbling-time-window--用的較多
-
基于時間的滑動窗口sliding-time-window--用的較多
-
基于數(shù)量的滾動窗口tumbling-count-window--用的較少
-
基于數(shù)量的滑動窗口sliding-count-window--用的較少
注意:Flink還支持一個特殊的窗口:Session會話窗口,需要設置一個會話超時時間,如30s,則表示30s內(nèi)沒有數(shù)據(jù)到來,則觸發(fā)上個窗口的計算
三、WindowAPI
3.1 window和windowAll
使用keyby的流,應該使用window方法
未使用keyby的流,應該調(diào)用windowAll方法
區(qū)別:
Window算子:是可以設置并行度的
WindowAll 算子:并行度始終為1
3.2 WindowAssigner
Windows Assigner的作用是指定窗口的類型,定義如何將數(shù)據(jù)流分配到一個或者多個窗口,API中通過window (WindowsAssigner assigner)指定。在Flink中支持兩種類型的窗口,一種是基于時間的窗口(TimeWindow),另一種是基于數(shù)量的窗口(countWindow)。窗口所表現(xiàn)出的類型特性取決于window assigner的定義。
Flink底層Window模型僅有TimeWindow以及GlobalWindow。
Flink提供了很多各種場景用的WindowAssigner:
如果需要自定制數(shù)據(jù)分發(fā)策略,則可以實現(xiàn)一個 class,繼承自 WindowAssigner。
3.3 evictor
evictor 主要用于做一些數(shù)據(jù)的自定義操作,可以在執(zhí)行用戶代碼之前,也可以在執(zhí)行
用戶代碼之后,更詳細的描述可以參考org.apache.flink.streaming.api.windowing.evictors.Evictor 的 evicBefore 和 evicAfter兩個方法。
Flink 提供了如下三種通用的 evictor:
CountEvictor 保留指定數(shù)量的元素
TimeEvictor 設定一個閾值 interval,刪除所有不再 max_ts - interval 范圍內(nèi)的元
素,其中 max_ts 是窗口內(nèi)時間戳的最大值。
DeltaEvictor 通過執(zhí)行用戶給定的 DeltaFunction 以及預設的 theshold,判斷是否刪 除一個元素。
3.4 trigger
trigger 用來判斷一個窗口是否需要被觸發(fā),每個 WindowAssigner 都自帶一個默認的trigger,
如果默認的 trigger 不能滿足你的需求,則可以自定義一個類,繼承自Trigger 即可,我們詳細描述下 Trigger 的接口以及含義:
onEventTime() 當 event-time timer 被觸發(fā)的時候會調(diào)用
onElement() 每次往 window 增加一個元素的時候都會觸發(fā)
onMerge() 對兩個 `rigger 的 state 進行 merge 操作
clear() window 銷毀的時候被調(diào)用
上面的接口中前三個會返回一個 TriggerResult, TriggerResult 有如下幾種可能的選 擇:
-
CONTINUE 不做任何事情
-
FIRE 觸發(fā) window
-
PURGE 清空整個 window 的元素并銷毀窗口
-
PURGE 清空整個 window 的元素并銷毀窗口
四、WindowAPI調(diào)用案例示例
4.1 基于時間的滾動和滑動窗口
測試數(shù)據(jù)
-
信號燈編號和通過該信號燈的車的數(shù)量
-
9,3
-
9,2
-
9,7
-
4,9
-
2,6
-
1,5
-
2,3
-
5,7
-
5,4
需求1:每5秒鐘統(tǒng)計一次,最近5秒鐘內(nèi),各個路口通過紅綠燈汽車的數(shù)量--基于時間的滾動窗口
需求2:每5秒鐘統(tǒng)計一次,最近10秒鐘內(nèi),各個路口通過紅綠燈汽車的數(shù)量--基于時間的滑動窗口
-
packagecom.flink.source
-
-
importorg.apache.flink.api.common.functions.MapFunction
-
importorg.apache.flink.streaming.api.scala._
-
importorg.apache.flink.streaming.api.windowing.assigners.{SlidingProcessingTimeWindows,TumblingProcessingTimeWindows}
-
importorg.apache.flink.streaming.api.windowing.time.Time;
-
-
/**
-
*@Packagecom.flink.source
-
*@File:WindowDemo_TimeWindow.java
-
*@author大數(shù)據(jù)老哥
-
*@date2021/10/2610:50
-
*@versionV1.0
-
*/
-
objectWindowDemo_TimeWindow{
-
defmain(args:Array[String]):Unit={
-
valenv=StreamExecutionEnvironment.getExecutionEnvironment
-
-
valsocketData=env.socketTextStream("192.168.100.101",9999)
-
valsocketMap=socketData.map(newMapFunction[String,CartInfo](){
-
overridedefmap(t:String):CartInfo={
-
valarr=t.split(",")
-
CartInfo(arr(0),arr(1).toInt)
-
}
-
})
-
//需求1:每5秒鐘統(tǒng)計一次,最近5秒鐘內(nèi),各個路口通過紅綠燈汽車的數(shù)量
-
valresult=socketMap.keyBy(_.sensorId).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).sum("count")
-
//需求2:每5秒鐘統(tǒng)計一次,最近10秒鐘內(nèi),各個路口通過紅綠燈汽車的數(shù)量
-
valresult2=socketMap.keyBy(_.sensorId).window(SlidingProcessingTimeWindows.of(Time.seconds(5),Time.seconds(10))).sum("count")
-
result.print()
-
result2.print()
-
env.execute("winds")
-
}
-
-
}
-
-
caseclassCartInfo(varsensorId:String,varcount:Int)
4.2 基于數(shù)量的滾動和滑動窗口
測試數(shù)據(jù)
-
信號燈編號和通過該信號燈的車的數(shù)量
-
9,3
-
9,2
-
9,7
-
4,9
-
2,6
-
1,5
-
2,3
-
5,7
-
5,4
需求1:統(tǒng)計在最近5條消息中,各自路口通過的汽車數(shù)量,相同的key每出現(xiàn)5次進行統(tǒng)計--基于數(shù)量的滾動窗口
需求2:統(tǒng)計在最近5條消息中,各自路口通過的汽車數(shù)量,相同的key每出現(xiàn)3次進行統(tǒng)計--基于數(shù)量的滑動窗口
-
packagecom.flink.source
-
-
importorg.apache.flink.api.common.functions.MapFunction
-
importorg.apache.flink.streaming.api.scala._
-
-
/**
-
*@Packagecom.flink.source
-
*@File:WindosDemoo_CountWindos.java
-
*@author大數(shù)據(jù)老哥
-
*@date2021/10/2614:04
-
*@versionV1.0
-
*/
-
objectWindowDemo_CountWindow{
-
defmain(args:Array[String]):Unit={
-
valenv=StreamExecutionEnvironment.getExecutionEnvironment
-
valsocketData=env.socketTextStream("192.168.100.101",9999)
-
valsocketMap=socketData.map(newMapFunction[String,CartInfo]{
-
overridedefmap(t:String):CartInfo={
-
valarr=t.split(",")
-
CartInfo(arr(0),arr(1).toInt)
-
}
-
})
-
//需求1:統(tǒng)計在最近5條消息中,各自路口通過的汽車數(shù)量,相同的key每出現(xiàn)5次進行統(tǒng)計
-
valresult=socketMap.keyBy(_.sensorId).countWindow(5L).sum("count")
-
//需求2:統(tǒng)計在最近5條消息中,各自路口通過的汽車數(shù)量,相同的key每出現(xiàn)3次進行統(tǒng)計
-
valresult2=socketMap.keyBy(_.sensorId).countWindow(5L,3L).sum("count")
-
result.print("result")
-
result2.print("result2")
-
env.execute()
-
-
}
-
}
-
caseclassCartInfo(varsensorId:String,varcount:Int)
case class CartInfo(var sensorId: String, var count: Int)
4.3 會話窗口
測試數(shù)據(jù)
-
信號燈編號和通過該信號燈的車的數(shù)量
-
9,3
-
9,2
-
9,7
-
4,9
-
2,6
-
1,5
-
2,3
-
5,7
-
5,4
設置會話超時時間為10s,10s內(nèi)沒有數(shù)據(jù)到來,則觸發(fā)上個窗口的計算
-
packagecom.flink.source
-
-
importorg.apache.flink.api.common.functions.MapFunction
-
importorg.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
-
importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment
-
importorg.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows
-
importorg.apache.flink.streaming.api.windowing.time.Time
-
-
-
/**
-
*@Packagecom.flink.source
-
*@File:WindowDemo_SessionWindow.java
-
*@author大數(shù)據(jù)老哥
-
*@date2021/11/116:10
-
*@versionV1.0
-
*/
-
objectWindowDemo_SessionWindow{
-
defmain(args:Array[String]):Unit={
-
valenv=StreamExecutionEnvironment.getExecutionEnvironment
-
-
valsocketData=env.socketTextStream("192.168.100.101",9999)
-
valsocketMap:SingleOutputStreamOperator[CartInfo]=socketData.map(newMapFunction[String,CartInfo](){
-
overridedefmap(t:String):CartInfo={
-
valarr=t.split(",")
-
CartInfo(arr(0),arr(1).toInt)
-
}
-
})
-
//設置會話超時時間為10s,10s內(nèi)沒有數(shù)據(jù)到來,則觸發(fā)上個窗口的計算
-
valresult=socketMap.keyBy(0)
-
.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
-
.sum("count")
-
result.print()
-
env.execute("winds")
-
}
-
}
-
-
-
caseclassCartInfo(varsensorId:String,varcount:Int)
原文鏈接:https://mp.weixin.qq.com/s/GPIcjPQUT1xAKAG6fBmvtg