4.5 时间服务
时间是Flink中极其重要的概念,在Flink的开发层面上,会在KeyedProcessFunction中和Window中使用到时间概念。一般情况下,在KeyedProcessFunction#processElement()方法中会用到Timer,注册Timer然后重写其onTimer()方法,在Watermark超过Timer的时间点之后,触发回调onTimer()。根据时间类型的不同,可以注册事件时间和处理时间两种Timer。
说到此处,不得不提到Timer注册过程中使用到的定时器服务(TimerService)。TimeService是在算子中提供定时器的管理行为,包含定时器的注册和删除。TimerService在DataStream、State、Blink中都有应用。在DataStream和State模块中,一般会在Keyed算子中使用,在引入Blink之后,在Blink的一般算子中也会使用,如BaseTemporalSortOperator算子。
那么在执行层面上,时间服务TimerService具体是怎么发挥其作用的呢?
简单来讲,在算子中使用时间服务来创建定时器(Timer),并且在Timer触发的时候进行回调,从而进行业务逻辑处理。前边章节中延迟Join的示例中使用过Timer。
4.5.1 定时器服务
定时器服务在Flink中叫作TimerService,窗口算子(WindowOperator)中使用了InternalTimerService来管理定时器(Timer),其初始化是在WindowOperator#open()中实现的。
对于InternalTimerService而言,有几个元素比较重要:名称、命名空间类型N(及其序列化器)、键类型K(及其序列化器)和Triggerable对象(支持延时计算的算子,继承了Triggerable接口来实现回调)。
如代码清单4-5所示。
代码清单4-5 注册Timer入口
一个算子中可以有多个InternalTimeService,通过名称进行区分,如在WindowOperator中,InternalTimeService的名称是“window-timers”,在KeyedProcessOperator中名称是“user-timers”,在CepOperator中名称是“watermark-callbacks”。
InternalTimerService接口的实现类是InternalTimerServiceImpl,Timer的实现类是InternalTimer。InternalTimerServiceImpl使用了两个TimerHeapInternalTimer的优先队列(HeapPriorityQueueSet,该优先队列是Flink自己实现的),分别用于维护事件时间和处理时间的Timer。
InternalTimeServiceManager是Task级别提供的InternalTimeService集中管理器,其使用Map保存了当前所有的InternalTimeService,Map的Key是InternalTimerService的名字。
4.5.2 定时器
定时器在Flink中叫作Timer。窗口的触发器与定时器是紧密联系的。
Flink的定时器使用InternalTimer接口定义行为,如代码清单4-6所示。
代码清单4-6 InternalTimer接口
前面提到了Timer的注册和保存,那么Timer到底是如何触发然后回调用户逻辑的呢?答案在InternalTimerServiceImpl中。
对于事件时间,会根据Watermark的时间,从事件时间的定时器队列中找到比给定时间小的所有定时器,触发该Timer所在的算子,然后由算子去调用UDF中的onTime()方法,如代码清单4-7所示。
代码清单4-7 事件时间触发与回调
处理时间也是类似的逻辑,区别在于,处理时间是从处理时间Timer优先级队列中找到Timer。处理时间因为依赖于当前系统,所以其使用的是周期性调度。
4.5.3 优先级队列
直接使用Java的PriorityQueue看起来也能实现InternalTimer的需求,但是Flink在优先级队列中使用了KeyGroup,是按照KeGroup去重的,并不是按照全局的Key去重,如图4-13所示。
图4-13 Flink实现的优先级队列体系
Flink自己实现了优先级队列来管理Timer,共有2种实现。
1)基于堆内存的优先级队列HeapPriorityQueueSet:基于Java堆内存的优先级队列,其实现思路与Java的PriorityQueue类似,使用了二叉树。
2)基于RocksDB的优先级队列:分为Cache+RocksDB量级,Cache中保存了前N个元素,其余的保存在RocksDB中。写入的时候采用Write-through策略,即写入Cache的同时要更新RocksDB中的数据,可能需要访问磁盘。
基于堆内存的优先级队列比基于RocksDB的优先级队列性能好,但是受限于内存大小,无法容纳太多的数据;基于RocksDB的优先级队列牺牲了部分性能,可以容纳大量的数据。