激情久久久_欧美视频区_成人av免费_不卡视频一二三区_欧美精品在欧美一区二区少妇_欧美一区二区三区的

服務器之家:專注于服務器技術及軟件下載分享
分類導航

PHP教程|ASP.NET教程|Java教程|ASP教程|編程技術|正則表達式|C/C++|IOS|C#|Swift|Android|VB|R語言|JavaScript|易語言|vb.net|

服務器之家 - 編程語言 - 編程技術 - 徹底搞清 Flink 中的 Window 機制

徹底搞清 Flink 中的 Window 機制

2021-11-02 23:27大數(shù)據(jù)老哥 編程技術

在流處理應用中,數(shù)據(jù)是連續(xù)不斷的,有時我們需要做一些聚合類的處理,例如:在過去的1分鐘內(nèi)有多少用戶點擊了我們的網(wǎng)頁。

徹底搞清 Flink 中的 Window 機制

一、 為什么需要Window

在流處理應用中,數(shù)據(jù)是連續(xù)不斷的,有時我們需要做一些聚合類的處理,例如:在過去的1分鐘內(nèi)有多少用戶點擊了我們的網(wǎng)頁。

在這種情況下,我們必須定義一個窗口(window),用來收集最近1分鐘內(nèi)的數(shù)據(jù),并對這個窗口內(nèi)的數(shù)據(jù)進行計算

二、Window的分類

2.1 按照time和count分類

time-window:時間窗口:根據(jù)時間劃分窗口,如:每xx分鐘統(tǒng)計最近xx分鐘的數(shù)據(jù)

count-window:數(shù)量窗口:根據(jù)數(shù)量劃分窗口,如:每xx個數(shù)據(jù)統(tǒng)計最近xx個數(shù)據(jù)

徹底搞清 Flink 中的 Window 機制

2.2 按照slide和size分類

窗口有兩個重要的屬性: 窗口大小size和滑動間隔slide,根據(jù)它們的大小關系可分為:

tumbling-window:滾動窗口:size=slide,如:每隔10s統(tǒng)計最近10s的數(shù)據(jù)

徹底搞清 Flink 中的 Window 機制

sliding-window:滑動窗口:size>slide,如:每隔5s統(tǒng)計最近10s數(shù)據(jù)

徹底搞清 Flink 中的 Window 機制

注意:當size

小結

按照上面窗口的分類方式進行組合,可以得出如下的窗口:

  • 基于時間的滾動窗口tumbling-time-window--用的較多
  • 基于時間的滑動窗口sliding-time-window--用的較多
  • 基于數(shù)量的滾動窗口tumbling-count-window--用的較少
  • 基于數(shù)量的滑動窗口sliding-count-window--用的較少

注意:Flink還支持一個特殊的窗口:Session會話窗口,需要設置一個會話超時時間,如30s,則表示30s內(nèi)沒有數(shù)據(jù)到來,則觸發(fā)上個窗口的計算

三、WindowAPI

3.1 window和windowAll

徹底搞清 Flink 中的 Window 機制

使用keyby的流,應該使用window方法

未使用keyby的流,應該調(diào)用windowAll方法

區(qū)別:

Window算子:是可以設置并行度的

WindowAll 算子:并行度始終為1

3.2 WindowAssigner

Windows Assigner的作用是指定窗口的類型,定義如何將數(shù)據(jù)流分配到一個或者多個窗口,API中通過window (WindowsAssigner assigner)指定。在Flink中支持兩種類型的窗口,一種是基于時間的窗口(TimeWindow),另一種是基于數(shù)量的窗口(countWindow)。窗口所表現(xiàn)出的類型特性取決于window assigner的定義。

徹底搞清 Flink 中的 Window 機制

Flink底層Window模型僅有TimeWindow以及GlobalWindow。

徹底搞清 Flink 中的 Window 機制

Flink提供了很多各種場景用的WindowAssigner:

徹底搞清 Flink 中的 Window 機制

如果需要自定制數(shù)據(jù)分發(fā)策略,則可以實現(xiàn)一個 class,繼承自 WindowAssigner。

3.3 evictor

evictor 主要用于做一些數(shù)據(jù)的自定義操作,可以在執(zhí)行用戶代碼之前,也可以在執(zhí)行

用戶代碼之后,更詳細的描述可以參考org.apache.flink.streaming.api.windowing.evictors.Evictor 的 evicBefore 和 evicAfter兩個方法。

Flink 提供了如下三種通用的 evictor:

CountEvictor 保留指定數(shù)量的元素

TimeEvictor 設定一個閾值 interval,刪除所有不再 max_ts - interval 范圍內(nèi)的元

素,其中 max_ts 是窗口內(nèi)時間戳的最大值。

DeltaEvictor 通過執(zhí)行用戶給定的 DeltaFunction 以及預設的 theshold,判斷是否刪 除一個元素。

3.4 trigger

trigger 用來判斷一個窗口是否需要被觸發(fā),每個 WindowAssigner 都自帶一個默認的trigger,

如果默認的 trigger 不能滿足你的需求,則可以自定義一個類,繼承自Trigger 即可,我們詳細描述下 Trigger 的接口以及含義:

onEventTime() 當 event-time timer 被觸發(fā)的時候會調(diào)用

onElement() 每次往 window 增加一個元素的時候都會觸發(fā)

onMerge() 對兩個 `rigger 的 state 進行 merge 操作

clear() window 銷毀的時候被調(diào)用

上面的接口中前三個會返回一個 TriggerResult, TriggerResult 有如下幾種可能的選 擇:

  • CONTINUE 不做任何事情
  • FIRE 觸發(fā) window
  • PURGE 清空整個 window 的元素并銷毀窗口
  • PURGE 清空整個 window 的元素并銷毀窗口

四、WindowAPI調(diào)用案例示例

4.1 基于時間的滾動和滑動窗口

測試數(shù)據(jù)

  1. 信號燈編號和通過該信號燈的車的數(shù)量
  2. 9,3
  3. 9,2
  4. 9,7
  5. 4,9
  6. 2,6
  7. 1,5
  8. 2,3
  9. 5,7
  10. 5,4

需求1:每5秒鐘統(tǒng)計一次,最近5秒鐘內(nèi),各個路口通過紅綠燈汽車的數(shù)量--基于時間的滾動窗口

需求2:每5秒鐘統(tǒng)計一次,最近10秒鐘內(nèi),各個路口通過紅綠燈汽車的數(shù)量--基于時間的滑動窗口

  1. packagecom.flink.source
  2. importorg.apache.flink.api.common.functions.MapFunction
  3. importorg.apache.flink.streaming.api.scala._
  4. importorg.apache.flink.streaming.api.windowing.assigners.{SlidingProcessingTimeWindows,TumblingProcessingTimeWindows}
  5. importorg.apache.flink.streaming.api.windowing.time.Time;
  6. /**
  7. *@Packagecom.flink.source
  8. *@File:WindowDemo_TimeWindow.java
  9. *@author大數(shù)據(jù)老哥
  10. *@date2021/10/2610:50
  11. *@versionV1.0
  12. */
  13. objectWindowDemo_TimeWindow{
  14. defmain(args:Array[String]):Unit={
  15. valenv=StreamExecutionEnvironment.getExecutionEnvironment
  16. valsocketData=env.socketTextStream("192.168.100.101",9999)
  17. valsocketMap=socketData.map(newMapFunction[String,CartInfo](){
  18. overridedefmap(t:String):CartInfo={
  19. valarr=t.split(",")
  20. CartInfo(arr(0),arr(1).toInt)
  21. }
  22. })
  23. //需求1:每5秒鐘統(tǒng)計一次,最近5秒鐘內(nèi),各個路口通過紅綠燈汽車的數(shù)量
  24. valresult=socketMap.keyBy(_.sensorId).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).sum("count")
  25. //需求2:每5秒鐘統(tǒng)計一次,最近10秒鐘內(nèi),各個路口通過紅綠燈汽車的數(shù)量
  26. valresult2=socketMap.keyBy(_.sensorId).window(SlidingProcessingTimeWindows.of(Time.seconds(5),Time.seconds(10))).sum("count")
  27. result.print()
  28. result2.print()
  29. env.execute("winds")
  30. }
  31. }
  32. caseclassCartInfo(varsensorId:String,varcount:Int)

4.2 基于數(shù)量的滾動和滑動窗口

測試數(shù)據(jù)

  1. 信號燈編號和通過該信號燈的車的數(shù)量
  2. 9,3
  3. 9,2
  4. 9,7
  5. 4,9
  6. 2,6
  7. 1,5
  8. 2,3
  9. 5,7
  10. 5,4

需求1:統(tǒng)計在最近5條消息中,各自路口通過的汽車數(shù)量,相同的key每出現(xiàn)5次進行統(tǒng)計--基于數(shù)量的滾動窗口

需求2:統(tǒng)計在最近5條消息中,各自路口通過的汽車數(shù)量,相同的key每出現(xiàn)3次進行統(tǒng)計--基于數(shù)量的滑動窗口

  1. packagecom.flink.source
  2. importorg.apache.flink.api.common.functions.MapFunction
  3. importorg.apache.flink.streaming.api.scala._
  4. /**
  5. *@Packagecom.flink.source
  6. *@File:WindosDemoo_CountWindos.java
  7. *@author大數(shù)據(jù)老哥
  8. *@date2021/10/2614:04
  9. *@versionV1.0
  10. */
  11. objectWindowDemo_CountWindow{
  12. defmain(args:Array[String]):Unit={
  13. valenv=StreamExecutionEnvironment.getExecutionEnvironment
  14. valsocketData=env.socketTextStream("192.168.100.101",9999)
  15. valsocketMap=socketData.map(newMapFunction[String,CartInfo]{
  16. overridedefmap(t:String):CartInfo={
  17. valarr=t.split(",")
  18. CartInfo(arr(0),arr(1).toInt)
  19. }
  20. })
  21. //需求1:統(tǒng)計在最近5條消息中,各自路口通過的汽車數(shù)量,相同的key每出現(xiàn)5次進行統(tǒng)計
  22. valresult=socketMap.keyBy(_.sensorId).countWindow(5L).sum("count")
  23. //需求2:統(tǒng)計在最近5條消息中,各自路口通過的汽車數(shù)量,相同的key每出現(xiàn)3次進行統(tǒng)計
  24. valresult2=socketMap.keyBy(_.sensorId).countWindow(5L,3L).sum("count")
  25. result.print("result")
  26. result2.print("result2")
  27. env.execute()
  28. }
  29. }
  30. caseclassCartInfo(varsensorId:String,varcount:Int)

case class CartInfo(var sensorId: String, var count: Int)

4.3 會話窗口

測試數(shù)據(jù)

  1. 信號燈編號和通過該信號燈的車的數(shù)量
  2. 9,3
  3. 9,2
  4. 9,7
  5. 4,9
  6. 2,6
  7. 1,5
  8. 2,3
  9. 5,7
  10. 5,4

設置會話超時時間為10s,10s內(nèi)沒有數(shù)據(jù)到來,則觸發(fā)上個窗口的計算

  1. packagecom.flink.source
  2. importorg.apache.flink.api.common.functions.MapFunction
  3. importorg.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
  4. importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment
  5. importorg.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows
  6. importorg.apache.flink.streaming.api.windowing.time.Time
  7. /**
  8. *@Packagecom.flink.source
  9. *@File:WindowDemo_SessionWindow.java
  10. *@author大數(shù)據(jù)老哥
  11. *@date2021/11/116:10
  12. *@versionV1.0
  13. */
  14. objectWindowDemo_SessionWindow{
  15. defmain(args:Array[String]):Unit={
  16. valenv=StreamExecutionEnvironment.getExecutionEnvironment
  17. valsocketData=env.socketTextStream("192.168.100.101",9999)
  18. valsocketMap:SingleOutputStreamOperator[CartInfo]=socketData.map(newMapFunction[String,CartInfo](){
  19. overridedefmap(t:String):CartInfo={
  20. valarr=t.split(",")
  21. CartInfo(arr(0),arr(1).toInt)
  22. }
  23. })
  24. //設置會話超時時間為10s,10s內(nèi)沒有數(shù)據(jù)到來,則觸發(fā)上個窗口的計算
  25. valresult=socketMap.keyBy(0)
  26. .window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
  27. .sum("count")
  28. result.print()
  29. env.execute("winds")
  30. }
  31. }
  32. caseclassCartInfo(varsensorId:String,varcount:Int)

原文鏈接:https://mp.weixin.qq.com/s/GPIcjPQUT1xAKAG6fBmvtg

延伸 · 閱讀

精彩推薦
主站蜘蛛池模板: 毛片在线视频观看 | 欧美性受xxx黑人xyx性爽 | 一区二区三区精品国产 | 亚洲成人福利在线观看 | 国产91片 | 成人福利在线看 | 国内精品久久久久久久久久 | 农村少妇吞精夜夜爽视频 | 黄色淫片 | 久久久一区二区精品 | 操毛片| 黑人一区二区三区四区五区 | 成人偷拍片视频在线观看 | 久久久人人爽 | 狠狠干五月天 | 亚洲福利视频52 | 激情小说激情电影 | 欧美一级小视频 | 国产免费永久在线观看 | 黑人一区 | 在线成人免费网站 | 性欧美大战久久久久久久免费观看 | 天天操天天插天天干 | 逼特逼视频在线观看 | 91成人在线免费观看 | 九九视频精品在线观看 | 黄色免费影片 | 欧美视屏一区二区 | 国产品久久 | 国产美女视频一区二区三区 | 欧美成人午夜影院 | 性aaa| 免费视频www在线观看 | 91,视频免费看 | 午夜a狂野欧美一区二区 | 中文字幕一区在线观看视频 | 久久国产精品久久精品国产演员表 | 亚洲精品日韩色噜噜久久五月 | a视频网站| 91在线播放国产 | 日韩大片在线永久观看视频网站免费 |