EventTimeTrigger and ProcessingTimeTrigger in Flink Explained

2023-01-03


EventTimeTrigger

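EventTimeTrigger fires purely on the basis of watermarks. In other words, if the stream carries no watermarks, EventTimeTrigger never fires.

Since watermarks matter this much for event time, let's first look at how a watermark is defined.

A watermark marks the point at which all the data for an event-time window has arrived.

Watermarks flow as part of the data stream and carry a timestamp t. Watermark(t) asserts that no more events with a timestamp less than t will appear in the stream.

In other words, once Watermark(t) arrives, every event with a timestamp less than t is considered to have arrived, so actions such as aggregation and window closing can safely be applied to all events before t.

Here is the original definition of a watermark from the book Streaming Systems: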

Watermarks are temporal notions of input completeness in the event-time domain. Worded differently, they are the way the system measures progress and completeness relative to the event times of the records being processed in a stream of events.


That point in event time, E, is the point up to which the system believes all inputs with event times less than E have been observed. In other words, it’s an assertion that no more data with event times less than E will ever be seen again.


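As the figure below shows, watermarks flow through the pipeline just like events, and both carry timestamps. W(4) means that no event with an event time less than 4 will be received from then on, and W(9) means that no event with an event time less than 9 will be received from then on.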

Figure: watermark diagram

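Suppose we want to run a windowed aggregation over this stream, with a time window of 4 time units and a sum over the elements in each window. The sample code looks like this: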

    input.window(TumblingEventTimeWindows.of(Time.minutes(4L)))
        .trigger(EventTimeTrigger.create())
        .reduce(new SumReduceFunction());

Let's trace the Flink source code to see how EventTimeTrigger is implemented:

    /**
     * A Trigger that fires once the watermark passes the end of the window
     * to which a pane belongs.
     */
    public class EventTimeTrigger extends Trigger<Object, TimeWindow> {

        private static final long serialVersionUID = 1L;

        private EventTimeTrigger() {}

        @Override
        public TriggerResult onElement(
                Object element, long timestamp, TimeWindow window, TriggerContext ctx)
                throws Exception {
            if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
                // if the watermark is already past the window fire immediately
                return TriggerResult.FIRE;
            } else {
                ctx.registerEventTimeTimer(window.maxTimestamp());
                return TriggerResult.CONTINUE;
            }
        }

        @Override
        public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
            return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
        }

        // ... remaining methods omitted in this excerpt

        public static EventTimeTrigger create() {
            return new EventTimeTrigger();
        }
    }

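The onElement method is called every time an element enters the window. It first checks whether the current watermark has already reached the window's maximum timestamp (the window's right boundary). If it has, the method returns FIRE; if not, it registers an event-time timer at the window's right boundary and returns CONTINUE.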
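The decision described above can be sketched in plain Java without any Flink dependency. This is only an illustration: the class OnElementSketch, its Result enum, and the timers set (a stand-in for the trigger context) are hypothetical names and are not part of Flink.

```java
import java.util.Set;
import java.util.TreeSet;

/** Plain-Java sketch of the onElement decision; not Flink code. */
class OnElementSketch {
    enum Result { CONTINUE, FIRE }

    /**
     * Mirrors the branch in onElement: fire at once when the watermark has
     * already reached the window's max timestamp, otherwise register a timer.
     * The timers set is a hypothetical stand-in for the trigger context.
     */
    static Result onElement(long windowMaxTimestamp, long currentWatermark, Set<Long> timers) {
        if (windowMaxTimestamp <= currentWatermark) {
            return Result.FIRE;
        }
        timers.add(windowMaxTimestamp);
        return Result.CONTINUE;
    }

    public static void main(String[] args) {
        Set<Long> timers = new TreeSet<>();
        // Watermark 2 has not reached the window end 4: a timer is registered.
        System.out.println(onElement(4L, 2L, timers)); // CONTINUE
        // Watermark 4 has reached the window end: a late element fires immediately.
        System.out.println(onElement(4L, 4L, timers)); // FIRE
        System.out.println(timers);                    // [4]
    }
}
```

The second call shows the case that is easy to overlook: an element that arrives after the watermark has passed its window does not register a timer, it fires the window right away.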

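Tracing registerEventTimeTimer further shows that the timer is placed in a priority queue, eventTimeTimersQueue. The queue is ordered by timer timestamp: the smaller the timestamp, the earlier the timer should fire, so it sits nearer the front of the queue.

So when is onEventTime called? Tracing the calls leads to the InternalTimerServiceImpl class, where onEventTime is triggered only from advanceWatermark. When a watermark arrives carrying timestamp t, every timer whose timestamp is less than or equal to t is dequeued from the event-time timer queue, and onEventTime is invoked for each of them. The onEventTime implementation then checks whether the timer's time equals the end time of its own window: if so it returns FIRE, otherwise CONTINUE.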

    public class InternalTimerServiceImpl<K, N> implements InternalTimerService<N> {

        /** Event time timers that are currently in-flight. */
        private final KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>>
                eventTimeTimersQueue;

        @Override
        public void registerEventTimeTimer(N namespace, long time) {
            eventTimeTimersQueue.add(
                    new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace));
        }

        public void advanceWatermark(long time) throws Exception {
            currentWatermark = time;

            InternalTimer<K, N> timer;

            // Dequeue and fire every timer whose timestamp is <= the watermark.
            while ((timer = eventTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
                eventTimeTimersQueue.poll();
                keyContext.setCurrentKey(timer.getKey());
                triggerTarget.onEventTime(timer);
            }
        }
    }

Using the sample data above, let's walk through the execution step by step:

When an event arrives, it is assigned to a window according to its event time. Event '2' is assigned to time window T1-T4, as shown in the figure below.
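To make the assignment step concrete, here is a minimal sketch of how a timestamp could be mapped to a tumbling window. It assumes windows of size 4 aligned at offset 1, so that the windows come out as T1-T4, T5-T8 and T9-T12 like in this walkthrough. The class and method names are hypothetical, and this is an illustration rather than Flink's own assigner code.

```java
/** Plain-Java sketch of tumbling-window assignment; names are hypothetical. */
class WindowAssignSketch {

    /** Start of the tumbling window of length size, aligned at offset, that contains timestamp. */
    static long windowStart(long timestamp, long offset, long size) {
        return timestamp - Math.floorMod(timestamp - offset, size);
    }

    /** Largest timestamp that still belongs to the window starting at start. */
    static long maxTimestamp(long start, long size) {
        return start + size - 1;
    }

    public static void main(String[] args) {
        long size = 4;
        long offset = 1; // assumption: windows start at T1, T5, T9, ...
        for (long ts : new long[] {2, 3, 1, 7, 9}) {
            long start = windowStart(ts, offset, size);
            System.out.println("event " + ts + " -> T" + start + "-T" + maxTimestamp(start, size));
        }
    }
}
```

With these assumptions, events 2, 3 and 1 land in T1-T4, event 7 in T5-T8, and event 9 in T9-T12. The max timestamp of each window (4, 8, 12) is the value at which the event-time timer is registered.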

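The following events arrive in turn. Events '2', '3', '1' and '3' are all assigned to window T1-T4; because currentWatermark has not yet reached the window end time 4, an event-time timer Timer(4) is registered. Event '7' is assigned to window T5-T8, and because currentWatermark has not reached the window end time 8 either, Timer(8) is registered. At this point the event-time timer queue holds 2 timers.

Now comes the key step. The next arrival is watermark(4), so every timer with a time less than or equal to 4 is looked up in the event-time timer queue. Timer(4) is dequeued and onEventTime is triggered; the end time of window T1-T4 equals the timestamp of Timer(4), so window T1-T4 FIREs (marked green in the figure). Only 1 timer is left in the queue.

The same process repeats. Events '5', '9' and '6' follow: '5' and '6' are assigned to window T5-T8, while '9' is assigned to a new window, T9-T12. '5' and '6' register Timer(8), which is already in the queue, and '9' registers a new timer, Timer(12). The timer queue now holds 2 timers.

The arrival of watermark(9) dequeues Timer(8), which in turn FIREs window T5-T8. And so on.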
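The whole walkthrough can be replayed with a small plain-Java simulation. It is a sketch under stated assumptions: the input sequence is taken to be 2, 3, 1, 3, 7, W(4), 5, 9, 6, W(9); each event's value doubles as its event time; windows have size 4 aligned at offset 1; and a TreeSet stands in for the deduplicating timer queue. EventTimeWalkthrough and its members are hypothetical names, not Flink classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

/** Plain-Java replay of the event-time walkthrough; not Flink code. */
class EventTimeWalkthrough {
    static final long SIZE = 4;
    static final long OFFSET = 1; // assumption: windows T1-T4, T5-T8, T9-T12

    private final Map<Long, Long> sums = new TreeMap<>(); // window max timestamp -> running sum
    private final TreeSet<Long> timers = new TreeSet<>(); // event-time timers, smallest first
    private long currentWatermark = Long.MIN_VALUE;
    final List<String> fired = new ArrayList<>();

    /** Assigns the event to its window, then fires or registers a timer like onElement. */
    void onElement(long value) {
        long start = value - Math.floorMod(value - OFFSET, SIZE);
        long max = start + SIZE - 1;
        sums.merge(max, value, Long::sum);
        if (max <= currentWatermark) {
            fire(max);       // watermark already reached the window end
        } else {
            timers.add(max); // duplicates collapse: one timer per window
        }
    }

    /** Dequeues and fires every timer with a timestamp <= the watermark. */
    void advanceWatermark(long time) {
        currentWatermark = time;
        while (!timers.isEmpty() && timers.first() <= time) {
            fire(timers.pollFirst());
        }
    }

    private void fire(long max) {
        fired.add("T" + (max - SIZE + 1) + "-T" + max + " sum=" + sums.get(max));
    }

    int pendingTimers() {
        return timers.size();
    }

    public static void main(String[] args) {
        EventTimeWalkthrough w = new EventTimeWalkthrough();
        for (long e : new long[] {2, 3, 1, 3, 7}) w.onElement(e);
        w.advanceWatermark(4);
        for (long e : new long[] {5, 9, 6}) w.onElement(e);
        w.advanceWatermark(9);
        w.fired.forEach(System.out::println);
    }
}
```

Under these assumptions the program prints "T1-T4 sum=9" followed by "T5-T8 sum=18", and Timer(12) remains pending until a watermark of at least 12 arrives.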

ProcessingTimeTrigger

Compared with EventTimeTrigger, ProcessingTimeTrigger is simpler: it fires based on system time alone. Let's trace the Flink source code to see how ProcessingTimeTrigger is implemented:

    /**
     * A Trigger that fires once the current system time passes the end of the
     * window to which a pane belongs.
     */
    public class ProcessingTimeTrigger extends Trigger<Object, TimeWindow> {

        private static final long serialVersionUID = 1L;

        private ProcessingTimeTrigger() {}

        @Override
        public TriggerResult onElement(
                Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
            ctx.registerProcessingTimeTimer(window.maxTimestamp());
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
            return TriggerResult.FIRE;
        }

        // ... remaining methods omitted in this excerpt

        /** Creates a new trigger that fires once system time passes the end of the window. */
        public static ProcessingTimeTrigger create() {
            return new ProcessingTimeTrigger();
        }
    }

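The onElement method is called every time an element enters the window. It simply registers a processing-time timer at the window end time, which is likewise placed in a priority queue of processing-time timers.

As system time advances, the timer service takes every timer whose timestamp is less than or equal to the current system time out of the processing-time timer queue, then calls onProcessingTime, which FIREs the window.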

    public class InternalTimerServiceImpl<K, N> implements InternalTimerService<N> {

        /** Processing time timers that are currently in-flight. */
        private final KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>>
                processingTimeTimersQueue;

        @Override
        public void registerProcessingTimeTimer(N namespace, long time) {
            InternalTimer<K, N> oldHead = processingTimeTimersQueue.peek();
            if (processingTimeTimersQueue.add(
                    new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace))) {
                long nextTriggerTime = oldHead != null ? oldHead.getTimestamp() : Long.MAX_VALUE;
                // check if we need to reschedule our timer to earlier
                if (time < nextTriggerTime) {
                    if (nextTimer != null) {
                        nextTimer.cancel(false);
                    }
                    nextTimer = processingTimeService.registerTimer(time, this::onProcessingTime);
                }
            }
        }

        private void onProcessingTime(long time) throws Exception {
            // null out the timer in case the Triggerable calls registerProcessingTimeTimer
            // inside the callback.
            nextTimer = null;

            InternalTimer<K, N> timer;

            // Dequeue and fire every timer whose timestamp is <= the current time.
            while ((timer = processingTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
                processingTimeTimersQueue.poll();
                keyContext.setCurrentKey(timer.getKey());
                triggerTarget.onProcessingTime(timer);
            }

            // Schedule the next physical timer for the new head of the queue, if any.
            if (timer != null && nextTimer == null) {
                nextTimer =
                        processingTimeService.registerTimer(
                                timer.getTimestamp(), this::onProcessingTime);
            }
        }
    }
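The scheduling logic in this class keeps many logical timers in the queue but only one physical timer (nextTimer) scheduled at a time, always for the head of the queue. The following plain-Java sketch imitates that behaviour with a manually advanced clock. ProcessingTimeSketch, advanceClock and scheduledAt are hypothetical names introduced for illustration; a TreeSet stands in for the priority queue and a nullable Long for the scheduled callback.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Plain-Java sketch of processing-time timer scheduling; not Flink code. */
class ProcessingTimeSketch {
    private final TreeSet<Long> queue = new TreeSet<>(); // logical timers, smallest first
    private Long scheduled = null;                       // stand-in for nextTimer
    final List<Long> fired = new ArrayList<>();

    /** Adds a logical timer and reschedules the physical one if the new timer is earlier. */
    void registerProcessingTimeTimer(long time) {
        Long oldHead = queue.isEmpty() ? null : queue.first();
        if (queue.add(time)) {
            long nextTriggerTime = oldHead != null ? oldHead : Long.MAX_VALUE;
            if (time < nextTriggerTime) {
                scheduled = time; // replaces the later physical timer with the earlier one
            }
        }
    }

    /** Simulates the system clock reaching now; runs the callback while it is due. */
    void advanceClock(long now) {
        while (scheduled != null && scheduled <= now) {
            onProcessingTime(scheduled);
        }
    }

    private void onProcessingTime(long time) {
        scheduled = null;
        while (!queue.isEmpty() && queue.first() <= time) {
            fired.add(queue.pollFirst()); // corresponds to FIRE-ing the window
        }
        if (!queue.isEmpty()) {
            scheduled = queue.first();    // schedule for the new head of the queue
        }
    }

    Long scheduledAt() {
        return scheduled;
    }

    public static void main(String[] args) {
        ProcessingTimeSketch s = new ProcessingTimeSketch();
        s.registerProcessingTimeTimer(8);
        s.registerProcessingTimeTimer(12); // later than the head: no rescheduling
        s.registerProcessingTimeTimer(4);  // earlier than the head: reschedule to 4
        System.out.println(s.scheduledAt()); // 4
        s.advanceClock(5);
        System.out.println(s.fired);         // [4]
        s.advanceClock(12);
        System.out.println(s.fired);         // [4, 8, 12]
    }
}
```

Keeping a single scheduled callback for the earliest timer means that registering a later timer costs only a queue insertion, and rescheduling is needed only when a new timer becomes the head of the queue.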
