1 回顾

上一篇文章对Flink简单应用做了介绍,本文继续介绍Flink的核心概念。根据上一篇介绍,Flink的两个核心概念Time和Window是对最深刻的描述。先来回顾一下:

Time为数据打上一个维度标签,使得数据集合转化为。Window利用Time对进行进一步抽象,按照一定的区间组合来处理这种数据,从中发现特征,寻找商业价值。

Time是的关键维度,无Time不成。Time是Window的基础,因为Window是依据Time的值来进行划分(Global Window可以看作Time从0到无穷大)。剖析Window前,首先需要明确Time是否足够清晰。或者说上一篇文章介绍的EventTime、ProcessingTime、IngestTime是否足够完备?答案是否定的。那么Time还有哪些模糊地带,或者是还有哪些问题需要考虑?

首先无论是EventTime还是ProcessingTime或IngestTime,对最终的数据处理都存在不准确问题,下面详细分析Flink对Time的建模。

2 Time的建模

2.1 Time的完备性

先看EventTime。EventTime由事件的发生处产生,这通常和数据的采集、事件生成有关。数据采集最普遍的情况就是各种传感器,而通常他们的时间存在一定的偏差,并且,从采集节点到Flink应用的计算节点之间,也存在传输延迟,导致EventTime可能与window间隔发生交错,进而导致计算错误。

再看ProcessingTime。ProcessingTime对于计算任务的积压存在偏差,在传输延迟也同样会导致不准确。

而IngestTime也有类似的问题。

从而不难得出结论,Time具有很好的目标,但现实使得EventTime、ProcessingTime和IngestTime都无法完美的解决计算的准确性问题,从而Time的这3种表达是不完备的。如果只依赖这3种Time来建模,那么Flink基于的模型自然就是不正确的了。

2.2 Time的反演性

是否具有反演性划分是计算模型的重要分水岭。一个具有反演性的计算模型,在任何时候都可以基于事件推演出最终状态,这在现代的数据计算系统中非常重要。

例如,计算模型的修正、优化升级等等时常发生,这时就需要对过去事件数据进行重新推算。否则,计算系统的升级将不可进行或者只能接受升级前的错误计算结果。

从Time的3种形式来看,具备反演性的就只有EventTime了,只要事件数据中的EventTime作为计算的时间基准,那么该计算模型就具有反演性,计算逻辑的进化自然就不是问题了。

2.3 Flink对Time的建模

对于不需要反演性的系统,如接受历史数据不可重新计算、接受系统不可升级等可以采用ProcessingTime或IngestTime作为时间基准。而在需要反演性的系统中,只能选择EventTime作为基准,这时EventTime的不准确性将不可避免。

EventTime的不准确性表现在两个方面。首先,事件的生成端的时钟错误;其次,事件的传输延迟导致窗口错过。其中第一个问题需要通过对时解决。而第二个问题更为严重一些,例如事件发生传输延迟,直到当前计算窗口结束后才到达,这些需要计算系统给予一定的设计来处理。

下面分析Flink对窗口错过的应对方案。

Flink在选择EventTime为时间基准的系统中,引入watermark来解决窗口错过问题。watermark定义如下:

对于晚于watermark的事件,系统认为不会发生。

不难看出,watermark是在计算模型中增加一个容忍时间偏移,事件中EventTime超过该偏移则不会纳入窗口计算。所以watermark是一个需要领域知识才能获取到的值。Flink通过指定或启发式生成的方式来指定。

所以开放一个watermark给计算应用系统的设计者,是Flink给出的答案。

解决了Time的问题,Window就更加清晰了。下面再重新看看Window。

3 Window的建模

TumblingWindow, SlidingWindow, SessionWindow, GlobalWindow是Flink对的Time模型稳固后,使用计算区间来对进行进一步抽象,以便于不同计算目的的实现。

除此之外,Flink还支持countWindow,这是一种基于计数的window,在满足计数值的时候重新开窗,而这个计数是和Time无关的,但这类开窗策略对于某些预警机制非常有用。

所以,开窗条件是window的设计核心之一,上述几种窗口的介绍详见上一篇文章。

另外,还有一个窗口计算时机的设计。这就是触发器

3.1 触发器(Trigger)

触发器控制生成结果的时机,它决定了何时将窗口内的计算结果进行输出。在使用EventTime作为时间基准时,默认将使用watermark作为触发条件。这时对窗口的计算结果是准确的。

在Flink的设计中,还会提供触发器之前就输出近似计算结果的机制,这将由自定义触发器来实现。

例如:

<window>.trigger(ContinuousEventTimeTrigger.of(Time.seconds(1)))

将会在收到事件后1秒时计算近似结果。

Flink有如下内建触发器。

3.1.1 EventTimeTrigger

该触发器以EventTime为基准,由watermark机制触发。即watermark确定的时间到了就触发。

3.1.2 ProcessingTimeTrigger

该触发器以ProcessingTime为基准,时间到了就触发。

3.1.3 CountTrigger

该触发器以当前窗口的元素个数为触发条件,超过指定的Count就触发。

3.1.4 PurgingTrigger

当前窗口被标记为清除时触发。

除此之外,Flink还可以自定义触发器,这里暂不展开。

3.2 窗口函数(WindowFunction)

触发器决定了何时该对window内的数据进行计算。而窗口函数则定义了如何进行计算,Flink将窗口函数分为4种形式。

3.2.1 ReduceFunction

Reduce函数主要用于将当前窗口内的两个元素转化为一个元素。而转化的计算逻辑可以自己定义。如下所示。

<window>.reduce { (v1, v2) => (a1, a2) }

3.2.2 AggregateFunction

Aggregate函数是更通用的Reduce函数。将会实现更复杂的窗口计算逻辑。参考官方文档,代码如下所示。

class AverageAggregate extends AggregateFunction[(String, Long), (Long, Long), Double] {
  override def createAccumulator() = (0L, 0L)
  override def add(value: (String, Long), accumulator: (Long, Long)) =
    (accumulator._1 + value._2, accumulator._2 + 1L)
  override def getResult(accumulator: (Long, Long)) = accumulator._1 / accumulator._2
  override def merge(a: (Long, Long), b: (Long, Long)) =
    (a._1 + b._1, a._2 + b._2)
}
val input: DataStream[(String, Long)] = ...
input
  .keyBy(<key selector>)
  .window(<window assigner>)
  .aggregate(new AverageAggregate)

AverageAggregate通过override几个函数来实现对Add/Merge/Result的计算,来最终实现求平均值。

3.2.3 FoldFunction

Fold函数是逐步将窗口中的元素转换成计算结果的处理函数。通过指定窗口的输入元素如何与输出类型的元素组合来完成。对于添加到窗口的每个元素和当前输出值,将逐步调用Fold函数。第一个元素与输出类型的预定义初始值组合。

val input: DataStream[(String, Long)] = ...

input
  .keyBy(<key selector>)
  .window(<window assigner>)
  .fold("") { (acc, v) => acc + v._2 }

该代码将fold初始值置为"",然后每个窗口中的元素v将与上一次的累计结果acc进行acc+v._2操作合并到新的acc中。所以上述代码就是

result = ""
for i in window:
  result += i[2]

3.2.4 ProcessWindowFunction

ProcessWindow是最通用的窗口处理函数。用法如下所示。

val input: DataStream[(String, Long)] = ...
input
  .keyBy(_._1)
  .timeWindow(Time.minutes(5))
  .process(new MyProcessWindowFunction())

class MyProcessWindowFunction extends ProcessWindowFunction[(String, Long), String, String, TimeWindow] {

  def process(key: String, context: Context, input: Iterable[(String, Long)], out: Collector[String]): () = {
    var count = 0L
    for (in <- input) {
      count = count + 1
    }
    out.collect(s"Window ${context.window} count: $count")
  }
}

通过在process中定义计算逻辑,来进行count运算。

3.3 逐出器(Evictor)

逐出器在触发器触发时开始工作,并且可以在窗口函数执行前或执行后进行操作。主要是对窗口内的元素进行移除操作(即逐出)。

3.4 再看迟到的事件

新版本的Flink中提供了Lateness来处理迟到的事件。在默认情况下Lateness值为0,即watermark之后的事件不会纳入窗口进行计算。但Lateness机制的存在可以使得迟到的元素得以处理。参考官方文档,有如下代码来获取迟到的事件。

val lateOutputTag = OutputTag[T]("late-data")
val input: DataStream[T] = ...
val result = input
  .keyBy(<key selector>)
  .window(<window assigner>)
  .allowedLateness(<time>)
  .sideOutputLateData(lateOutputTag)
  .<windowed transformation>(<window function>)

val lateStream = result.getSideOutput(lateOutputTag)

4 我们走到了哪里

Flink对有着深刻的理解,并基于此构建了坚实的Time模型和灵活的Window模型。通过提供Window模型上的丰富计算机制,让的计算变得简单而生动。

本篇对Flink中Time和Window的机制进行了进一步的分析,而这些都是使用Flink进行复杂计算的基石。接下来将继续探讨Flink中的其他核心概念。