区块链技术博客
www.b2bchain.cn

彻底搞清Flink中的Window求职学习资料

本文介绍了彻底搞清Flink中的Window求职学习资料,有助于帮助完成毕业设计以及求职,是一篇很好的资料。

对技术面试,学习经验等有一些体会,在此分享。

  • 窗口
    • 窗口的组成
      • 窗口分配器
      • State
      • 窗口函数
        • 增量聚合函数(窗口只维护状态)
        • 全量聚合函数(窗口维护窗口内的数据)
      • 触发器
        • 触发器分类
          • CountTrigger
          • ProcessingTimeTrigger
          • EventTimeTrigger
          • PurgingTrigger
          • DeltaTrigger
            • DeltaTrigger 的应用
        • 触发器原型
        • 说明
    • 窗口的分类
      • 被Keys化Windows
      • 非被Keys化Windows
        • 区别
      • Time-Based window(基于时间的窗口)
        • Tumbling windows(滚动窗口)
        • Sliding windows(滑动窗口)

彻底搞清Flink中的Window

flink-window

关注公众号:Java大数据与数据仓库,回复“资料”,领取资料,学习大数据技术。

窗口

在流处理应用中,数据是连续不断的,因此我们不可能等到所有数据都到了才开始处理。当然我们可以每来一个消息就处理一次,但是有时我们需要做一些聚合类的处理,例如:在过去的1分钟内有多少用户点击了我们的网页。在这种情况下,我们必须定义一个窗口,用来收集最近一分钟内的数据,并对这个窗口内的数据进行计算。

Flink 认为 Batch 是 Streaming 的一个特例,所以 Flink 底层引擎是一个流式引擎,在上面实现了流处理和批处理。而窗口(window)就是从 Streaming 到 Batch 的一个桥梁。

  • 一个Window代表有限对象的集合。一个窗口有一个最大的时间戳,该时间戳意味着在其代表的某时间点——所有应该进入这个窗口的元素都已经到达
  • Window就是用来对一个无限的流设置一个有限的集合,在有界的数据集上进行操作的一种机制。window又可以分为基于时间(Time-based)的window以及基于数量(Count-based)的window。
  • Flink DataStream API提供了Time和Count的window,同时增加了基于Session的window。同时,由于某些特殊的需要,DataStream API也提供了定制化的window操作,供用户自定义window。

窗口的组成

窗口分配器

  • assignWindows将某个带有时间戳timestamp的元素element分配给一个或多个窗口,并返回窗口集合

  • getDefaultTrigger 返回跟WindowAssigner关联的默认触发器

  • getWindowSerializer返回WindowAssigner分配的窗口的序列化器

    • 窗口分配器定义如何将数据元分配给窗口。这是通过WindowAssigner 在window(…)(对于被Keys化流)或windowAll()(对于非被Keys化流)调用中指定您的选择来完成的。
  • WindowAssigner负责将每个传入数据元分配给一个或多个窗口。Flink带有预定义的窗口分配器,用于最常见的用例
    即翻滚窗口, 滑动窗口,会话窗口和全局窗口。

  • 您还可以通过扩展WindowAssigner类来实现自定义窗口分配器。

  • 所有内置窗口分配器(全局窗口除外)都根据时间为窗口分配数据元,这可以是处理时间或事件时间。

State

  • 状态,用来存储窗口内的元素,如果有 AggregateFunction,则存储的是增量聚合的中间结果。

窗口函数

选择合适的计算函数,减少开发代码量提高系统性能

增量聚合函数(窗口只维护状态)

  • ReduceFunction
  • AggregateFunction
  • FoldFunction

全量聚合函数(窗口维护窗口内的数据)

  • ProcessWindowFunction
    • 全量计算
    • 支持功能更加灵活
    • 支持状态操作

触发器

彻底搞清Flink中的Window

image-20210202200655485

  • EventTimeTrigger基于事件时间的触发器,对应onEventTime

  • ProcessingTimeTrigger
    基于当前系统时间的触发器,对应onProcessingTime
    ProcessingTime 有最好的性能和最低的延迟。但在分布式计算环境中ProcessingTime具有不确定性,相同数据流多次运行有可能产生不同的计算结果。

  • ContinuousEventTimeTrigger

  • ContinuousProcessingTimeTrigger

  • CountTrigger

    • Trigger确定何时窗口函数准备好处理窗口(由窗口分配器形成)。每个都有默认值。
      如果默认触发器不符合您的需要,您可以使用指定自定义触发器。WindowAssignerTriggertrigger(…)
    • 触发器界面有五种方法可以Trigger对不同的事件做出反应:
      • onElement()为添加到窗口的每个数据元调用该方法。
      • onEventTime()在注册的事件时间计时器触发时调用该方法。
      • onProcessingTime()在注册的处理时间计时器触发时调用该方法。
      • 该onMerge()方法与状态触发器相关,并且当它们的相应窗口合并时合并两个触发器的状态,例如当使用会话窗口时。
      • 最后,该clear()方法在移除相应窗口时执行所需的任何动作。
    • 默认触发器
      • 默认触发器GlobalWindow是NeverTrigger从不触发的。因此,在使用时必须定义自定义触发器GlobalWindow。
      • 通过使用trigger()您指定触发器会覆盖a的默认触发器WindowAssigner。例如,如果指定a CountTrigger,TumblingEventTimeWindows则不再根据时间进度获取窗口,
        而是仅按计数。现在,如果你想根据时间和数量做出反应,你必须编写自己的自定义触发器。
      • event-time窗口分配器都有一个EventTimeTrigger作为默认触发器。该触发器在watermark通过窗口末尾时出发。

触发器分类

CountTrigger

一旦窗口中的数据元数量超过给定限制,就会触发。所以其触发机制实现在onElement中

ProcessingTimeTrigger

基于处理时间的触发。

EventTimeTrigger

根据 watermarks 度量的事件时间进度进行触发。

PurgingTrigger
  • 另一个触发器作为参数作为参数并将其转换为清除触发器。

  • 其作用是在 Trigger 触发窗口计算之后将窗口的 State 中的数据清除。

  • 彻底搞清Flink中的Window

    image-20210202200710573

    前两条数据先后于20:01和20:02进入窗口,此时 State 中的值更新为3,同时到了Trigger的触发时间,输出结果为3。

    彻底搞清Flink中的Window

    image-20210202200733128

  • 由于 PurgingTrigger 的作用,State 中的数据会被清除。

彻底搞清Flink中的Window

image-20210202200744793

DeltaTrigger
DeltaTrigger 的应用
  • 有这样一个车辆区间测试的需求,车辆每分钟上报当前位置与车速,每行进10公里,计算区间内最高车速。

彻底搞清Flink中的Window

image-20210202200802480

触发器原型

  • onElement
  • onProcessingTime
  • onEventTime
  • onMerge
  • clear

说明

  • TriggerResult可以是以下之一
    • CONTINUE 什么都不做
    • FIRE_AND_PURGE 触发计算,然后清除窗口中的元素
    • FIRE 触发计算 默认情况下,内置的触发器只返回 FIRE,不会清除窗口状态。
    • PURGE 清除窗口中的元素
  • 所有的事件时间窗口分配器都有一个 EventTimeTrigger 作为默认触发器。一旦 watermark 到达窗口末尾,这个触发器就会被触发。
  • 全局窗口(GlobalWindow)的默认触发器是永不会被触发的 NeverTrigger。因此,在使用全局窗口时,必须自定义一个触发器。
  • 通过使用 trigger() 方法指定触发器,将会覆盖窗口分配器的默认触发器。例如,如果你为 TumblingEventTimeWindows 指定 CountTrigger,
    那么不会再根据时间进度触发窗口,而只能通过计数。目前为止,如果你希望基于时间以及计数进行触发,则必须编写自己的自定义触发器。

窗口的分类

  • 根据窗口是否调用keyBy算子key化,分为被Keys化Windows和非被Keys化Windows;

彻底搞清Flink中的Window

flink window图解

  • 根据窗口的驱动方式,分为时间驱动(Time Window)、数据驱动(Count Window);
  • 根据窗口的元素分配方式,分为滚动窗口(tumbling windows)、滑动窗口(sliding windows)、会话窗口(session windows)以及全局窗口(global windows)

被Keys化Windows

可以理解为按照原始数据流中的某个key进行分类,拥有同一个key值的数据流将为进入同一个window,多个窗口并行的逻辑流

stream        .keyBy(...)               <-  keyed versus non-keyed windows        .window(...)              <-  required: "assigner"       [.trigger(...)]            <-  optional: "trigger" (else default trigger)       [.evictor(...)]            <-  optional: "evictor" (else no evictor)       [.allowedLateness(...)]    <-  optional: "lateness" (else zero)       [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)        .reduce/aggregate/fold/apply()      <-  required: "function"       [.getSideOutput(...)]      <-  optional: "output tag"

非被Keys化Windows

  • 不做分类,每进入一条数据即增加一个窗口,多个窗口并行,每个窗口处理1条数据

  • WindowAll 将元素按照某种特性聚集在一起,该函数不支持并行操作,默认的并行度就是1,所以如果使用这个算子的话需要注意一下性能问题

    stream      .windowAll(...)           <-  required: "assigner"     [.trigger(...)]            <-  optional: "trigger" (else default trigger)     [.evictor(...)]            <-  optional: "evictor" (else no evictor)     [.allowedLateness(...)]    <-  optional: "lateness" (else zero)     [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)      .reduce/aggregate/fold/apply()      <-  required: "function"     [.getSideOutput(...)]      <-  optional: "output tag"

区别

  • 对于被Key化的数据流,可以将传入事件的任何属性用作键(此处有更多详细信息)。
  • 拥有被Key化的数据流将允许您的窗口计算由多个任务并行执行,因为每个逻辑被Key化的数据流可以独立于其余任务进行处理。
    引用相同Keys的所有数据元将被发送到同一个并行任务。

Time-Based window(基于时间的窗口)

每一条记录来了以后会根据时间属性值采用不同的window assinger 方法分配给一个或者多个窗口,分为滚动窗口(Tumbling windows)和滑动窗口(Sliding windows)。

  • EventTime 数据本身携带的时间,默认的时间属性;

  • ProcessingTime 处理时间;

  • IngestionTime 数据进入flink程序的时间;

Tumbling windows(滚动窗口)

滚动窗口下窗口之间不重叠,且窗口长度是固定的。我们可以用TumblingEventTimeWindows和TumblingProcessingTimeWindows创建一个基于Event Time或Processing Time的滚动时间窗口。

彻底搞清Flink中的Window

tumb-window

下面示例以滚动时间窗口(TumblingEventTimeWindows)为例,默认模式是TimeCharacteristic.ProcessingTime处理时间

/** The time characteristic that is used if none other is set. */ private static final TimeCharacteristic DEFAULT_TIME_CHARACTERISTIC = TimeCharacteristic.ProcessingTime;

所以如果使用Event Time即数据的实际产生时间,需要通过senv.setStreamTimeCharacteristic指定

// 指定使用数据的实际时间 senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);  DataStream<T> input = ...;  // tumbling event-time windows input     .keyBy(<key selector>)     .window(TumblingEventTimeWindows.of(Time.seconds(5)))     .<windowed transformation>(<window function>);  // tumbling processing-time windows input     .keyBy(<key selector>)     .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))     .<windowed transformation>(<window function>);  // 这里减去8小时,表示用UTC世界时间 input     .keyBy(<key selector>)     .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))     .<windowed transformation>(<window function>);

Sliding windows(滑动窗口)

滑动窗口以一个步长(Slide)不断向前滑动,窗口的长度固定。使用时,我们要设置Slide和Size。Slide的大小决定了Flink以多大的频率来创建新的窗口,Slide较小,窗口的个数会很多。Slide小于窗口的Size时,相邻窗口会重叠,一个事件会被分配到多个窗口;Slide大于Size,有些事件可能被丢掉。

彻底搞清Flink中的Window

slide-window

同理,如果是滑动时间窗口,也是类似的:

// 窗口的大小是10s,每5s滑动一次,也就是5s计算一次 .timeWindow(Time.seconds(10), Time.seconds(5))

这里使用的是timeWindow,通常使用window,那么两者的区别是什么呢?

timeWindow其实判断时间的处理模式是ProcessingTime还是SlidingEventTimeWindows,帮我们判断好了,调用方法直接传入(Time size, Time slide)这两个参数就好了,如果是使用.window方法,则需要自己来判断,就是前者写法更简单一些。

“`java
public WindowedStream timeWindow(Time size, Time slide) {

  • 窗口
    • 窗口的组成
      • 窗口分配器
      • State
      • 窗口函数
        • 增量聚合函数(窗口只维护状态)
        • 全量聚合函数(窗口维护窗口内的数据)
      • 触发器
        • 触发器分类
          • CountTrigger
          • ProcessingTimeTrigger
          • EventTimeTrigger
          • PurgingTrigger
          • DeltaTrigger
            • DeltaTrigger 的应用
        • 触发器原型
        • 说明
    • 窗口的分类
      • 被Keys化Windows
      • 非被Keys化Windows
        • 区别
      • Time-Based window(基于时间的窗口)
        • Tumbling windows(滚动窗口)
        • Sliding windows(滑动窗口)

彻底搞清Flink中的Window

flink-window

关注公众号:Java大数据与数据仓库,回复“资料”,领取资料,学习大数据技术。

窗口

在流处理应用中,数据是连续不断的,因此我们不可能等到所有数据都到了才开始处理。当然我们可以每来一个消息就处理一次,但是有时我们需要做一些聚合类的处理,例如:在过去的1分钟内有多少用户点击了我们的网页。在这种情况下,我们必须定义一个窗口,用来收集最近一分钟内的数据,并对这个窗口内的数据进行计算。

Flink 认为 Batch 是 Streaming 的一个特例,所以 Flink 底层引擎是一个流式引擎,在上面实现了流处理和批处理。而窗口(window)就是从 Streaming 到 Batch 的一个桥梁。

  • 一个Window代表有限对象的集合。一个窗口有一个最大的时间戳,该时间戳意味着在其代表的某时间点——所有应该进入这个窗口的元素都已经到达
  • Window就是用来对一个无限的流设置一个有限的集合,在有界的数据集上进行操作的一种机制。window又可以分为基于时间(Time-based)的window以及基于数量(Count-based)的window。
  • Flink DataStream API提供了Time和Count的window,同时增加了基于Session的window。同时,由于某些特殊的需要,DataStream API也提供了定制化的window操作,供用户自定义window。

窗口的组成

窗口分配器

  • assignWindows将某个带有时间戳timestamp的元素element分配给一个或多个窗口,并返回窗口集合

  • getDefaultTrigger 返回跟WindowAssigner关联的默认触发器

  • getWindowSerializer返回WindowAssigner分配的窗口的序列化器

    • 窗口分配器定义如何将数据元分配给窗口。这是通过WindowAssigner 在window(…)(对于被Keys化流)或windowAll()(对于非被Keys化流)调用中指定您的选择来完成的。
  • WindowAssigner负责将每个传入数据元分配给一个或多个窗口。Flink带有预定义的窗口分配器,用于最常见的用例
    即翻滚窗口, 滑动窗口,会话窗口和全局窗口。

  • 您还可以通过扩展WindowAssigner类来实现自定义窗口分配器。

  • 所有内置窗口分配器(全局窗口除外)都根据时间为窗口分配数据元,这可以是处理时间或事件时间。

State

  • 状态,用来存储窗口内的元素,如果有 AggregateFunction,则存储的是增量聚合的中间结果。

窗口函数

选择合适的计算函数,减少开发代码量提高系统性能

增量聚合函数(窗口只维护状态)

  • ReduceFunction
  • AggregateFunction
  • FoldFunction

全量聚合函数(窗口维护窗口内的数据)

  • ProcessWindowFunction
    • 全量计算
    • 支持功能更加灵活
    • 支持状态操作

触发器

彻底搞清Flink中的Window

image-20210202200655485

  • EventTimeTrigger基于事件时间的触发器,对应onEventTime

  • ProcessingTimeTrigger
    基于当前系统时间的触发器,对应onProcessingTime
    ProcessingTime 有最好的性能和最低的延迟。但在分布式计算环境中ProcessingTime具有不确定性,相同数据流多次运行有可能产生不同的计算结果。

  • ContinuousEventTimeTrigger

  • ContinuousProcessingTimeTrigger

  • CountTrigger

    • Trigger确定何时窗口函数准备好处理窗口(由窗口分配器形成)。每个都有默认值。
      如果默认触发器不符合您的需要,您可以使用指定自定义触发器。WindowAssignerTriggertrigger(…)
    • 触发器界面有五种方法可以Trigger对不同的事件做出反应:
      • onElement()为添加到窗口的每个数据元调用该方法。
      • onEventTime()在注册的事件时间计时器触发时调用该方法。
      • onProcessingTime()在注册的处理时间计时器触发时调用该方法。
      • 该onMerge()方法与状态触发器相关,并且当它们的相应窗口合并时合并两个触发器的状态,例如当使用会话窗口时。
      • 最后,该clear()方法在移除相应窗口时执行所需的任何动作。
    • 默认触发器
      • 默认触发器GlobalWindow是NeverTrigger从不触发的。因此,在使用时必须定义自定义触发器GlobalWindow。
      • 通过使用trigger()您指定触发器会覆盖a的默认触发器WindowAssigner。例如,如果指定a CountTrigger,TumblingEventTimeWindows则不再根据时间进度获取窗口,
        而是仅按计数。现在,如果你想根据时间和数量做出反应,你必须编写自己的自定义触发器。
      • event-time窗口分配器都有一个EventTimeTrigger作为默认触发器。该触发器在watermark通过窗口末尾时出发。

触发器分类

CountTrigger

一旦窗口中的数据元数量超过给定限制,就会触发。所以其触发机制实现在onElement中

ProcessingTimeTrigger

基于处理时间的触发。

EventTimeTrigger

根据 watermarks 度量的事件时间进度进行触发。

PurgingTrigger
  • 另一个触发器作为参数作为参数并将其转换为清除触发器。

  • 其作用是在 Trigger 触发窗口计算之后将窗口的 State 中的数据清除。

  • 彻底搞清Flink中的Window

    image-20210202200710573

    前两条数据先后于20:01和20:02进入窗口,此时 State 中的值更新为3,同时到了Trigger的触发时间,输出结果为3。

    彻底搞清Flink中的Window

    image-20210202200733128

  • 由于 PurgingTrigger 的作用,State 中的数据会被清除。

彻底搞清Flink中的Window

image-20210202200744793

DeltaTrigger
DeltaTrigger 的应用
  • 有这样一个车辆区间测试的需求,车辆每分钟上报当前位置与车速,每行进10公里,计算区间内最高车速。

彻底搞清Flink中的Window

image-20210202200802480

触发器原型

  • onElement
  • onProcessingTime
  • onEventTime
  • onMerge
  • clear

说明

  • TriggerResult可以是以下之一
    • CONTINUE 什么都不做
    • FIRE_AND_PURGE 触发计算,然后清除窗口中的元素
    • FIRE 触发计算 默认情况下,内置的触发器只返回 FIRE,不会清除窗口状态。
    • PURGE 清除窗口中的元素
  • 所有的事件时间窗口分配器都有一个 EventTimeTrigger 作为默认触发器。一旦 watermark 到达窗口末尾,这个触发器就会被触发。
  • 全局窗口(GlobalWindow)的默认触发器是永不会被触发的 NeverTrigger。因此,在使用全局窗口时,必须自定义一个触发器。
  • 通过使用 trigger() 方法指定触发器,将会覆盖窗口分配器的默认触发器。例如,如果你为 TumblingEventTimeWindows 指定 CountTrigger,
    那么不会再根据时间进度触发窗口,而只能通过计数。目前为止,如果你希望基于时间以及计数进行触发,则必须编写自己的自定义触发器。

窗口的分类

  • 根据窗口是否调用keyBy算子key化,分为被Keys化Windows和非被Keys化Windows;

彻底搞清Flink中的Window

flink window图解

  • 根据窗口的驱动方式,分为时间驱动(Time Window)、数据驱动(Count Window);
  • 根据窗口的元素分配方式,分为滚动窗口(tumbling windows)、滑动窗口(sliding windows)、会话窗口(session windows)以及全局窗口(global windows)

被Keys化Windows

可以理解为按照原始数据流中的某个key进行分类,拥有同一个key值的数据流将为进入同一个window,多个窗口并行的逻辑流

stream        .keyBy(...)               <-  keyed versus non-keyed windows        .window(...)              <-  required: "assigner"       [.trigger(...)]            <-  optional: "trigger" (else default trigger)       [.evictor(...)]            <-  optional: "evictor" (else no evictor)       [.allowedLateness(...)]    <-  optional: "lateness" (else zero)       [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)        .reduce/aggregate/fold/apply()      <-  required: "function"       [.getSideOutput(...)]      <-  optional: "output tag"

非被Keys化Windows

  • 不做分类,每进入一条数据即增加一个窗口,多个窗口并行,每个窗口处理1条数据

  • WindowAll 将元素按照某种特性聚集在一起,该函数不支持并行操作,默认的并行度就是1,所以如果使用这个算子的话需要注意一下性能问题

    stream      .windowAll(...)           <-  required: "assigner"     [.trigger(...)]            <-  optional: "trigger" (else default trigger)     [.evictor(...)]            <-  optional: "evictor" (else no evictor)     [.allowedLateness(...)]    <-  optional: "lateness" (else zero)     [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)      .reduce/aggregate/fold/apply()      <-  required: "function"     [.getSideOutput(...)]      <-  optional: "output tag"

区别

  • 对于被Key化的数据流,可以将传入事件的任何属性用作键(此处有更多详细信息)。
  • 拥有被Key化的数据流将允许您的窗口计算由多个任务并行执行,因为每个逻辑被Key化的数据流可以独立于其余任务进行处理。
    引用相同Keys的所有数据元将被发送到同一个并行任务。

Time-Based window(基于时间的窗口)

每一条记录来了以后会根据时间属性值采用不同的window assinger 方法分配给一个或者多个窗口,分为滚动窗口(Tumbling windows)和滑动窗口(Sliding windows)。

  • EventTime 数据本身携带的时间,默认的时间属性;

  • ProcessingTime 处理时间;

  • IngestionTime 数据进入flink程序的时间;

Tumbling windows(滚动窗口)

滚动窗口下窗口之间不重叠,且窗口长度是固定的。我们可以用TumblingEventTimeWindows和TumblingProcessingTimeWindows创建一个基于Event Time或Processing Time的滚动时间窗口。

彻底搞清Flink中的Window

tumb-window

下面示例以滚动时间窗口(TumblingEventTimeWindows)为例,默认模式是TimeCharacteristic.ProcessingTime处理时间

/** The time characteristic that is used if none other is set. */ private static final TimeCharacteristic DEFAULT_TIME_CHARACTERISTIC = TimeCharacteristic.ProcessingTime;

所以如果使用Event Time即数据的实际产生时间,需要通过senv.setStreamTimeCharacteristic指定

// 指定使用数据的实际时间 senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);  DataStream<T> input = ...;  // tumbling event-time windows input     .keyBy(<key selector>)     .window(TumblingEventTimeWindows.of(Time.seconds(5)))     .<windowed transformation>(<window function>);  // tumbling processing-time windows input     .keyBy(<key selector>)     .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))     .<windowed transformation>(<window function>);  // 这里减去8小时,表示用UTC世界时间 input     .keyBy(<key selector>)     .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))     .<windowed transformation>(<window function>);

Sliding windows(滑动窗口)

滑动窗口以一个步长(Slide)不断向前滑动,窗口的长度固定。使用时,我们要设置Slide和Size。Slide的大小决定了Flink以多大的频率来创建新的窗口,Slide较小,窗口的个数会很多。Slide小于窗口的Size时,相邻窗口会重叠,一个事件会被分配到多个窗口;Slide大于Size,有些事件可能被丢掉。

彻底搞清Flink中的Window

slide-window

同理,如果是滑动时间窗口,也是类似的:

// 窗口的大小是10s,每5s滑动一次,也就是5s计算一次 .timeWindow(Time.seconds(10), Time.seconds(5))

这里使用的是timeWindow,通常使用window,那么两者的区别是什么呢?

timeWindow其实判断时间的处理模式是ProcessingTime还是SlidingEventTimeWindows,帮我们判断好了,调用方法直接传入(Time size, Time slide)这两个参数就好了,如果是使用.window方法,则需要自己来判断,就是前者写法更简单一些。

“`java
public WindowedStream timeWindow(Time size, Time slide) {

  • 窗口
    • 窗口的组成
      • 窗口分配器
      • State
      • 窗口函数
        • 增量聚合函数(窗口只维护状态)
        • 全量聚合函数(窗口维护窗口内的数据)
      • 触发器
        • 触发器分类
          • CountTrigger
          • ProcessingTimeTrigger
          • EventTimeTrigger
          • PurgingTrigger
          • DeltaTrigger
            • DeltaTrigger 的应用
        • 触发器原型
        • 说明
    • 窗口的分类
      • 被Keys化Windows
      • 非被Keys化Windows
        • 区别
      • Time-Based window(基于时间的窗口)
        • Tumbling windows(滚动窗口)
        • Sliding windows(滑动窗口)

彻底搞清Flink中的Window

flink-window

关注公众号:Java大数据与数据仓库,回复“资料”,领取资料,学习大数据技术。

窗口

在流处理应用中,数据是连续不断的,因此我们不可能等到所有数据都到了才开始处理。当然我们可以每来一个消息就处理一次,但是有时我们需要做一些聚合类的处理,例如:在过去的1分钟内有多少用户点击了我们的网页。在这种情况下,我们必须定义一个窗口,用来收集最近一分钟内的数据,并对这个窗口内的数据进行计算。

Flink 认为 Batch 是 Streaming 的一个特例,所以 Flink 底层引擎是一个流式引擎,在上面实现了流处理和批处理。而窗口(window)就是从 Streaming 到 Batch 的一个桥梁。

  • 一个Window代表有限对象的集合。一个窗口有一个最大的时间戳,该时间戳意味着在其代表的某时间点——所有应该进入这个窗口的元素都已经到达
  • Window就是用来对一个无限的流设置一个有限的集合,在有界的数据集上进行操作的一种机制。window又可以分为基于时间(Time-based)的window以及基于数量(Count-based)的window。
  • Flink DataStream API提供了Time和Count的window,同时增加了基于Session的window。同时,由于某些特殊的需要,DataStream API也提供了定制化的window操作,供用户自定义window。

窗口的组成

窗口分配器

  • assignWindows将某个带有时间戳timestamp的元素element分配给一个或多个窗口,并返回窗口集合

  • getDefaultTrigger 返回跟WindowAssigner关联的默认触发器

  • getWindowSerializer返回WindowAssigner分配的窗口的序列化器

    • 窗口分配器定义如何将数据元分配给窗口。这是通过WindowAssigner 在window(…)(对于被Keys化流)或windowAll()(对于非被Keys化流)调用中指定您的选择来完成的。
  • WindowAssigner负责将每个传入数据元分配给一个或多个窗口。Flink带有预定义的窗口分配器,用于最常见的用例
    即翻滚窗口, 滑动窗口,会话窗口和全局窗口。

  • 您还可以通过扩展WindowAssigner类来实现自定义窗口分配器。

  • 所有内置窗口分配器(全局窗口除外)都根据时间为窗口分配数据元,这可以是处理时间或事件时间。

State

  • 状态,用来存储窗口内的元素,如果有 AggregateFunction,则存储的是增量聚合的中间结果。

窗口函数

选择合适的计算函数,减少开发代码量提高系统性能

增量聚合函数(窗口只维护状态)

  • ReduceFunction
  • AggregateFunction
  • FoldFunction

全量聚合函数(窗口维护窗口内的数据)

  • ProcessWindowFunction
    • 全量计算
    • 支持功能更加灵活
    • 支持状态操作

触发器

彻底搞清Flink中的Window

image-20210202200655485

  • EventTimeTrigger基于事件时间的触发器,对应onEventTime

  • ProcessingTimeTrigger
    基于当前系统时间的触发器,对应onProcessingTime
    ProcessingTime 有最好的性能和最低的延迟。但在分布式计算环境中ProcessingTime具有不确定性,相同数据流多次运行有可能产生不同的计算结果。

  • ContinuousEventTimeTrigger

  • ContinuousProcessingTimeTrigger

  • CountTrigger

    • Trigger确定何时窗口函数准备好处理窗口(由窗口分配器形成)。每个都有默认值。
      如果默认触发器不符合您的需要,您可以使用指定自定义触发器。WindowAssignerTriggertrigger(…)
    • 触发器界面有五种方法可以Trigger对不同的事件做出反应:
      • onElement()为添加到窗口的每个数据元调用该方法。
      • onEventTime()在注册的事件时间计时器触发时调用该方法。
      • onProcessingTime()在注册的处理时间计时器触发时调用该方法。
      • 该onMerge()方法与状态触发器相关,并且当它们的相应窗口合并时合并两个触发器的状态,例如当使用会话窗口时。
      • 最后,该clear()方法在移除相应窗口时执行所需的任何动作。
    • 默认触发器
      • 默认触发器GlobalWindow是NeverTrigger从不触发的。因此,在使用时必须定义自定义触发器GlobalWindow。
      • 通过使用trigger()您指定触发器会覆盖a的默认触发器WindowAssigner。例如,如果指定a CountTrigger,TumblingEventTimeWindows则不再根据时间进度获取窗口,
        而是仅按计数。现在,如果你想根据时间和数量做出反应,你必须编写自己的自定义触发器。
      • event-time窗口分配器都有一个EventTimeTrigger作为默认触发器。该触发器在watermark通过窗口末尾时出发。

触发器分类

CountTrigger

一旦窗口中的数据元数量超过给定限制,就会触发。所以其触发机制实现在onElement中

ProcessingTimeTrigger

基于处理时间的触发。

EventTimeTrigger

根据 watermarks 度量的事件时间进度进行触发。

PurgingTrigger
  • 另一个触发器作为参数作为参数并将其转换为清除触发器。

  • 其作用是在 Trigger 触发窗口计算之后将窗口的 State 中的数据清除。

  • 彻底搞清Flink中的Window

    image-20210202200710573

    前两条数据先后于20:01和20:02进入窗口,此时 State 中的值更新为3,同时到了Trigger的触发时间,输出结果为3。

    彻底搞清Flink中的Window

    image-20210202200733128

  • 由于 PurgingTrigger 的作用,State 中的数据会被清除。

彻底搞清Flink中的Window

image-20210202200744793

DeltaTrigger
DeltaTrigger 的应用
  • 有这样一个车辆区间测试的需求,车辆每分钟上报当前位置与车速,每行进10公里,计算区间内最高车速。

彻底搞清Flink中的Window

image-20210202200802480

触发器原型

  • onElement
  • onProcessingTime
  • onEventTime
  • onMerge
  • clear

说明

  • TriggerResult可以是以下之一
    • CONTINUE 什么都不做
    • FIRE_AND_PURGE 触发计算,然后清除窗口中的元素
    • FIRE 触发计算 默认情况下,内置的触发器只返回 FIRE,不会清除窗口状态。
    • PURGE 清除窗口中的元素
  • 所有的事件时间窗口分配器都有一个 EventTimeTrigger 作为默认触发器。一旦 watermark 到达窗口末尾,这个触发器就会被触发。
  • 全局窗口(GlobalWindow)的默认触发器是永不会被触发的 NeverTrigger。因此,在使用全局窗口时,必须自定义一个触发器。
  • 通过使用 trigger() 方法指定触发器,将会覆盖窗口分配器的默认触发器。例如,如果你为 TumblingEventTimeWindows 指定 CountTrigger,
    那么不会再根据时间进度触发窗口,而只能通过计数。目前为止,如果你希望基于时间以及计数进行触发,则必须编写自己的自定义触发器。

窗口的分类

  • 根据窗口是否调用keyBy算子key化,分为被Keys化Windows和非被Keys化Windows;

彻底搞清Flink中的Window

flink window图解

  • 根据窗口的驱动方式,分为时间驱动(Time Window)、数据驱动(Count Window);
  • 根据窗口的元素分配方式,分为滚动窗口(tumbling windows)、滑动窗口(sliding windows)、会话窗口(session windows)以及全局窗口(global windows)

被Keys化Windows

可以理解为按照原始数据流中的某个key进行分类,拥有同一个key值的数据流将为进入同一个window,多个窗口并行的逻辑流

stream        .keyBy(...)               <-  keyed versus non-keyed windows        .window(...)              <-  required: "assigner"       [.trigger(...)]            <-  optional: "trigger" (else default trigger)       [.evictor(...)]            <-  optional: "evictor" (else no evictor)       [.allowedLateness(...)]    <-  optional: "lateness" (else zero)       [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)        .reduce/aggregate/fold/apply()      <-  required: "function"       [.getSideOutput(...)]      <-  optional: "output tag"

非被Keys化Windows

  • 不做分类,每进入一条数据即增加一个窗口,多个窗口并行,每个窗口处理1条数据

  • WindowAll 将元素按照某种特性聚集在一起,该函数不支持并行操作,默认的并行度就是1,所以如果使用这个算子的话需要注意一下性能问题

    stream      .windowAll(...)           <-  required: "assigner"     [.trigger(...)]            <-  optional: "trigger" (else default trigger)     [.evictor(...)]            <-  optional: "evictor" (else no evictor)     [.allowedLateness(...)]    <-  optional: "lateness" (else zero)     [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)      .reduce/aggregate/fold/apply()      <-  required: "function"     [.getSideOutput(...)]      <-  optional: "output tag"

区别

  • 对于被Key化的数据流,可以将传入事件的任何属性用作键(此处有更多详细信息)。
  • 拥有被Key化的数据流将允许您的窗口计算由多个任务并行执行,因为每个逻辑被Key化的数据流可以独立于其余任务进行处理。
    引用相同Keys的所有数据元将被发送到同一个并行任务。

Time-Based window(基于时间的窗口)

每一条记录来了以后会根据时间属性值采用不同的window assinger 方法分配给一个或者多个窗口,分为滚动窗口(Tumbling windows)和滑动窗口(Sliding windows)。

  • EventTime 数据本身携带的时间,默认的时间属性;

  • ProcessingTime 处理时间;

  • IngestionTime 数据进入flink程序的时间;

Tumbling windows(滚动窗口)

滚动窗口下窗口之间不重叠,且窗口长度是固定的。我们可以用TumblingEventTimeWindows和TumblingProcessingTimeWindows创建一个基于Event Time或Processing Time的滚动时间窗口。

彻底搞清Flink中的Window

tumb-window

下面示例以滚动时间窗口(TumblingEventTimeWindows)为例,默认模式是TimeCharacteristic.ProcessingTime处理时间

/** The time characteristic that is used if none other is set. */ private static final TimeCharacteristic DEFAULT_TIME_CHARACTERISTIC = TimeCharacteristic.ProcessingTime;

所以如果使用Event Time即数据的实际产生时间,需要通过senv.setStreamTimeCharacteristic指定

// 指定使用数据的实际时间 senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);  DataStream<T> input = ...;  // tumbling event-time windows input     .keyBy(<key selector>)     .window(TumblingEventTimeWindows.of(Time.seconds(5)))     .<windowed transformation>(<window function>);  // tumbling processing-time windows input     .keyBy(<key selector>)     .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))     .<windowed transformation>(<window function>);  // 这里减去8小时,表示用UTC世界时间 input     .keyBy(<key selector>)     .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))     .<windowed transformation>(<window function>);

Sliding windows(滑动窗口)

滑动窗口以一个步长(Slide)不断向前滑动,窗口的长度固定。使用时,我们要设置Slide和Size。Slide的大小决定了Flink以多大的频率来创建新的窗口,Slide较小,窗口的个数会很多。Slide小于窗口的Size时,相邻窗口会重叠,一个事件会被分配到多个窗口;Slide大于Size,有些事件可能被丢掉。

彻底搞清Flink中的Window

slide-window

同理,如果是滑动时间窗口,也是类似的:

// 窗口的大小是10s,每5s滑动一次,也就是5s计算一次 .timeWindow(Time.seconds(10), Time.seconds(5))

这里使用的是timeWindow,通常使用window,那么两者的区别是什么呢?

timeWindow其实判断时间的处理模式是ProcessingTime还是SlidingEventTimeWindows,帮我们判断好了,调用方法直接传入(Time size, Time slide)这两个参数就好了,如果是使用.window方法,则需要自己来判断,就是前者写法更简单一些。

“`java
public WindowedStream timeWindow(Time size, Time slide) {

部分转自互联网,侵权删除联系

赞(0) 打赏
部分文章转自网络,侵权联系删除b2bchain区块链学习技术社区 » 彻底搞清Flink中的Window求职学习资料
分享到: 更多 (0)

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

b2b链

联系我们联系我们