Kafka中的时间轮算法

[TOC]
在kafka中,有许多请求并不是立即返回,而且处理完一些异步操作或者等待某些条件达成后才返回,这些请求一般都会带有timeout参数,表示如果timeout时间后服务端还不满足返回的条件,就判定此次请求为超时,这时候kafka同样要返回超时的响应给客户端,这样客户端才知道此次请求超时了。比如ack=-1的producer请求,就需要等待所有的isr备份完成了才可以返回给客户端,或者到达timeout时间了返回超时响应给客户端。

上面的场景,可以用延迟任务来实现。也就是定义一个任务,在timeout时间后执行,执行的内容一般就是先检查返回条件是否满足,满足的话就返回客户端需要的响应,如果还是不满足,就发送超时响应给客户端。

对于延迟操作,java自带的实现有Timer和ScheduledThreadPoolExecutor。这两个的底层数据结构都是基于一个延迟队列,在准备执行一个延迟任务时,将其插入到延迟队列中。这些延迟队列其实就是一个用最小堆实现的优先级队列,因此,插入一个任务的时间复杂度是O(logN),取出一个任务执行后调整堆的时间也是O(logN)。

如果要执行的延迟任务不多,O(logN)的速度已经够快了。但是对于kafka这样一个高吞吐量的系统来说,O(logN)的速度还不够,为了追求更快的速度,kafka的设计者使用了Timing Wheel的数据结构,让任务的插入时间复杂度达到了O(1)。

Kafka中的时间轮算法_第1张图片
image.png

上面是时间轮的一个结构图,该时间轮有8个槽,当前时间指向0号槽。

我们再看一下Kafka里面TimingWheel的数据结构

private[timer] class TimingWheel(tickMs: Long, wheelSize: Int, startMs: Long, taskCounter: AtomicInteger, queue: DelayQueue[TimerTaskList]) {

  private[this] val interval = tickMs * wheelSize
  private[this] val buckets = Array.tabulate[TimerTaskList](wheelSize) { _ => new TimerTaskList(taskCounter) }

  private[this] var currentTime = startMs - (startMs % tickMs) // rounding down to multiple of tickMs
}

tickMs:表示一个槽所代表的时间范围,kafka的默认值的1ms

wheelSize:表示该时间轮有多少个槽,kafka的默认值是20

startMs:表示该时间轮的开始时间

taskCounter:表示该时间轮的任务总数

queue:是一个TimerTaskList的延迟队列。每个槽都有它一个对应的TimerTaskList,TimerTaskList是一个双向链表,有一个expireTime的值,这些TimerTaskList都被加到这个延迟队列中,expireTime最小的槽会排在队列的最前面。

interval:时间轮所能表示的时间跨度,也就是tickMs*wheelSize

buckets:表示TimerTaskList的数组,即各个槽。

currentTime:表示当前时间,也就是时间轮指针指向的时间

运行原理

当新增一个延迟任务时,通过buckets[expiration / tickMs % wheelSize]先计算出它应该属于哪个槽。比如延迟任务的delayMs=2ms,当前时间currentTime是0ms,则expiration=delayMs+startMs=2ms,通过前面的公式算出它应该落于2号槽。并把任务封装成TimerTaskEntry然后加入到TimerTaskList链表中。

之后,kafka会启动一个线程,去推动时间轮的指针转动。其实现原理其实就是通过queue.poll()取出放在最前面的槽的TimerTaskList。由于queue是一个延迟队列,如果队列中的expireTime没有到达,该操作会阻塞住,直到expireTime到达。如果通过queue.poll()取到了TimerTaskList,说明该槽里面的任务时间都已经到达。这时候就可以遍历该TimerTaskList中的任务,然后执行对应的操作了。

针对上面的例子,就2号槽有任务,所以当取出2号槽的TimerTaskList后,会先将currentTime = timeMs - (timeMs % tickMs),其中timeMs也就是该TimerTaskList的expireTime,也就是2Ms。所以,这时currentTime=2ms,也就是时间轮指针指向2Ms。

时间溢出处理

在kafka的默认实现中,tickMs=1Ms,wheelSize=20,这就表示该时间轮所能表示的延迟时间范围是0~20Ms,那如果延迟时间超过20Ms要如何处理呢?Kafka对时间轮做了一层改进,使时间轮变成层级的时间轮。

一开始,第一层的时间轮所能表示时间范围是0~20Ms之间,假设现在出现一个任务的延迟时间是200Ms,那么kafka会再创建一层时间轮,我们称之为第二层时间轮。

第二层时间轮的创建代码如下

overflowWheel = new TimingWheel(
          tickMs = interval,
          wheelSize = wheelSize,
          startMs = currentTime,
          taskCounter = taskCounter,
          queue
)

也就是第二层时间轮每一个槽所能表示的时间是第一层时间轮所能表示的时间范围,也就是20Ms。槽的数量还是一样,其他的属性也是继承自第一层时间轮。这时第二层时间轮所能表示的时间范围就是0~400Ms了。

之后通过buckets[expiration / tickMs % wheelSize]算出延迟时间为200Ms的任务应该位于第二层时间轮的10号槽位。

同理,如果第二层时间轮的时间范围还容纳不了新的延迟任务,就会创建第三层、第四层...

值得注意的是,只有当前时间轮无法容纳目标延迟任务所能表示的时间时,才需要创建更高一级的时间轮,或者说把该任务加到更高一级的时间轮中(如果该时间轮已创建)。

一些细节
当时间轮的指针指向1号槽时,即currentTime=1Ms,说明0号槽的任务都已经到期了,这时0号槽就会被拿出来复用,可以容纳20~21Ms延迟时间的任务。也就是说,如果currentTime=0Ms时进来一个21Ms的延迟任务,就需要创建更高一级的时间轮,但是如果currentTime=1Ms时进来一个21Ms的延迟任务,就可以直接把它放到0号槽中,当currentTime=21时,指针又指向0号槽
细心的同学可能发现,第一层的0号槽所能表示的任务延迟时间范围是01Ms,对应的TimerTaskList的expireTime是0Ms。第二层的0号槽锁能表示的任务延迟时间范围是020Ms,对应的TimerTaskList的expireTime也是0Ms。他们的TimerTaskList又都是放在一个延迟队列中。这时候执行queue.poll()会把这两个TimerTaskList都取出来,然后遍历链表的时候还会判断该任务是否达到执行时间了,如果没有的话,这些任务还会被塞回时间轮中。这时由于第一层指针的转动,原先处于第二层时间轮中的任务可能会重新落到第一层时间轮上面。

源码解析

添加新的延迟任务

//SystemTimer.scala  
private def addTimerTaskEntry(timerTaskEntry: TimerTaskEntry): Unit = {
    if (!timingWheel.add(timerTaskEntry)) {
      // Already expired or cancelled
      if (!timerTaskEntry.cancelled)
        taskExecutor.submit(timerTaskEntry.timerTask)
    }
  }

往时间轮添加新的任务

//TimingWheel
def add(timerTaskEntry: TimerTaskEntry): Boolean = {
    //获取任务的延迟时间
    val expiration = timerTaskEntry.expirationMs
    //先判断任务是否已经完成
    if (timerTaskEntry.cancelled) {
      false
      //如果任务已经到期
    } else if (expiration < currentTime + tickMs) {
      false
      //判断当前时间轮所能表示的时间范围是否可以容纳该任务
    } else if (expiration < currentTime + interval) {
      // 根据任务的延迟时间算出应该位于哪个槽
      val virtualId = expiration / tickMs
      val bucket = buckets((virtualId % wheelSize.toLong).toInt)
      bucket.add(timerTaskEntry)

      // 设置TimerTaskList的expireTime
      if (bucket.setExpiration(virtualId * tickMs)) {
        //把TimerTaskList加入到延迟队列
        queue.offer(bucket)
      }
      true
    } else {
      //如果时间超出当前所能表示的最大范围,则创建新的时间轮,并把任务添加到那个时间轮上面
      if (overflowWheel == null) addOverflowWheel()
      overflowWheel.add(timerTaskEntry)
    }
  }
  private[this] def addOverflowWheel(): Unit = {
    synchronized {
      if (overflowWheel == null) {
        overflowWheel = new TimingWheel(
          tickMs = interval,
          wheelSize = wheelSize,
          startMs = currentTime,
          taskCounter = taskCounter,
          queue
        )
      }
    }
  }

从上面的代码可以看出,对于当前时间轮是否可以容纳目标任务,是通过expiration < currentTime + interval来计算的,也就是根据时间轮的指针往后推interval时间就是时间轮所能表示的时间范围。

时间轮指针的推进

 //SystemTimer.scala 
def advanceClock(timeoutMs: Long): Boolean = {
      //从延迟队列中取出最近的一个槽,如果槽的expireTime没到,此操作会阻塞timeoutMs
    var bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS)
    if (bucket != null) {
      writeLock.lock()
      try {
        while (bucket != null) {
            //推进时间轮的指针
          timingWheel.advanceClock(bucket.getExpiration())
            //把TimerTaskList的任务都取出来重新add一遍,add的时候会检查任务是否已经到期
          bucket.flush(reinsert)
          bucket = delayQueue.poll()
        }
      } finally {
        writeLock.unlock()
      }
      true
    } else {
      false
    }
  }
//TimingWheel
def advanceClock(timeMs: Long): Unit = {
    if (timeMs >= currentTime + tickMs) {
        //推进时间轮的指针
      currentTime = timeMs - (timeMs % tickMs)

      // 推进上层时间轮的指针
      if (overflowWheel != null) overflowWheel.advanceClock(currentTime)
    }
  }

相比于常用的DelayQueue的时间复杂度O(logN),TimingWheel的数据结构在插入任务时只要O(1),获取到达任务的时间复杂度也远低于O(logN)。另外,kafka的TimingWheel在插入任务之前还会先检查任务是否完成,对于那些在任务超时直接就完成指定操作的场景,TimingWheel的表现更加优秀。

零、时间轮定义

Kafka中的时间轮(TimingWheel)是一个存储定时任务的环形队列,底层采用数组实现,数组中的每个元素可以存放一个定时任务列表(TimerTaskList)。TimerTaskList是一个环形的双向链表,链表中的每一项表示的都是定时任务项(TimerTaskEntry),其中封装了真正的定时任务TimerTask。

Kafka中的时间轮算法_第2张图片
image.png

时间轮定时器最大的优点:

  1. 是任务的添加与移除,都是O(1)级的复杂度;
  2. 不会占用大量的资源;
  3. 只需要有一个线程去推进时间轮就可以工作了。

我们将对时间轮做层层推进的解析:

一、为什么使用环形队列

假设我们现在有一个很大的数组,专门用于存放延时任务。它的精度达到了毫秒级!那么我们的延迟任务实际上需要将定时的那个时间简单转换为毫秒即可,然后将定时任务存入其中:

比如说当前的时间是2018/10/24 19:43:45,那么就将任务存入Task[1540381425000],value则是定时任务的内容。

private Task[很长] tasks;

public List getTaskList(long timestamp) {
    return task.get(timestamp)
}

// 假装这里真的能一毫秒一个循环
public void run(){
    while (true){
        getTaskList(System.currentTimeMillis()).后台执行()
        Thread.sleep(1);
    }
}

假如这个数组长度达到了亿亿级,我们确实可以这么干。 那如果将精度缩减到秒级呢?我们也需要一个百亿级长度的数组。

先不说内存够不够,显然你的定时器要这么大的内存显然很浪费。

当然如果我们自己写一个map,并保证它不存在hash冲突问题,那也是完全可行的。(我不确定我的想法是否正确,如果错误,请指出)

/* 一个精度为秒级的延时任务管理类 */
private Map taskMap;

public List getTaskList(long timestamp) {
    return taskMap.get(timestamp - timestamp % 1000)
}

// 新增一个任务
public void addTask(long timestamp, Task task) {
    List taskList = getTaskList(timestamp - timestamp % 1000);
        if (taskList == null){
            taskList = new ArrayList();
        }
    taskList.add(task);
}

// 假装这里真的能一秒一个循环
public void run(){
    while (true){
        getTaskList(System.currentTimeMillis()).后台执行()
        Thread.sleep(1000);
    }
}

其实时间轮就是一个不存在hash冲突的数据结构

抛开其他疑问,我们看看手腕上的手表(如果没有去找个钟表,或者想象一个),是不是无论当前是什么时间,总能用我们的表盘去表示它(忽略精度)

就拿秒表来说,它总是落在 0 - 59 秒,每走一圈,又会重新开始。

用伪代码模拟一下我们这个秒表:

private Bucket[60] buckets;// 表示60秒

public void addTask(long timestamp, Task task) {
    Bucket bucket = buckets[timestamp / 1000 % 60];
    bucket.add(task);
}

public Bucket getBucket(long timestamp) {
    return buckets[timestamp / 1000 % 60];
}

// 假装这里真的能一秒一个循环
public void run(){
    while (true){
        getBucket(System.currentTimeMillis()).后台执行()
        Thread.sleep(1000);
    }
}

这样,我们的时间总能落在0 - 59任意一个bucket上,就如同我们的秒钟总是落在0 - 59刻度上一样,这便是时间轮的环形队列。

二、表示的时间有限

但是细心的小伙伴也会发现这么一个问题:如果只能表示60秒内的定时任务应该怎么存储与取出,那是不是太有局限性了?如果想要加入一小时后的延迟任务,该怎么办?

其实还是可以看一看钟表,对于只有三个指针的表(一般的表)来说,最大能表示12个小时,超过了12小时这个范围,时间就会产生歧义。如果我们加多几个指针呢?比如说我们有秒针,分针,时针,上下午针,天针,月针,年针...... 那不就能表示很长很长的一段时间了?而且,它并不需要占用很大的内存。

比如说秒针我们可以用一个长度为60的数组来表示,分针也同样可以用一个长度为60的数组来表示,时针可以用一个长度为24的数组来表示。那么表示一天内的所有时间,只需要三个数组即可。

动手来做吧,我们将这个数据结构称作时间轮,tickMs表示一个刻度,比如说上面说的一秒。wheelSize表示一圈有多少个刻度,即上面说的60。interval表示一圈能表示多少时间,即 tickMs * wheelSize = 60秒。

overflowWheel表示上一层的时间轮,比如说,对于秒钟来说,overflowWheel就表示分钟,以此类推。

public class TimeWheel {

    /** 一个时间槽的时间 */
    private long tickMs;

    /** 时间轮大小 */
    private int wheelSize;

    /** 时间跨度 */
    private long interval;

    /** 槽 */
    private Bucket[] buckets;

    /** 时间轮指针 */
    private long currentTimestamp;

    /** 上层时间轮 */
    private volatile TimeWheel overflowWheel;

    public TimeWheel(long tickMs, int wheelSize, long currentTimestamp) {
        this.currentTimestamp = currentTimestamp;
        this.tickMs = tickMs;
        this.wheelSize = wheelSize;
        this.interval = tickMs * wheelSize;
        this.buckets = new Bucket[wheelSize];
        this.currentTimestamp = currentTimestamp - (currentTimestamp % tickMs);

        for (int i = 0; i < wheelSize; i++) {
            buckets[i] = new Bucket();
        }
    }
}

将任务添加到时间轮中十分简单,对于每个时间轮来说,比如说秒级时间轮,和分级时间轮,都有它自己的过期槽。也就是delayMs < tickMs的时候。

添加延时任务的时候一共就这几种情况:

  • 一、时间到期
    1)比如说有一个任务要在 16:29:07 执行,从秒级时间轮中来看,当我们的当前时间走到16:29:06的时候,则表示这个任务已经过期了。因为它的delayMs = 1000ms,小于了我们的秒级时间轮的tickMs(1000ms)。
  1. 比如说有一个任务要在 16:41:25 执行,从分级时间轮中来看,当我们的当前时间走到 16:41的时候(分级时间轮没有秒针!它的最小精度是分钟(一定要理解这一点)),则表示这个任务已经到期,因为它的delayMs = 25000ms,小于了我们的分级时间轮的tickMs(60000ms)。

二、时间未到期,且delayMs小于interval。
对于秒级时间轮来说,就是延迟时间小于60s,那么肯定能找到一个秒钟槽扔进去。

三、时间未到期,且delayMs大于interval。
对于妙级时间轮来说,就是延迟时间大于等于60s,这时候就需要借助上层时间轮的力量了,很简单的代码实现,就是拿到上层时间轮,然后类似递归一样,把它扔进去。

比如说一个有一个延时为一年后的定时任务,就会在这个递归中不断创建更上层的时间轮,直到找到满足delayMs小于interval的那个时间轮。

这里为了不把代码写的那么复杂,我们每一层时间轮的刻度都一样,也就是秒级时间轮表示60秒,上面则表示60分钟,再上面则表示60小时,再上层则表示60个60小时,再上层则表示60个60个60小时 = 216000小时。

也就是如果将最底层时间轮的tickMs(精度)设置为1000ms。wheelSize设置为60。那么只需要5层时间轮,可表示的时间跨度已经长达24年(216000小时)。

 /**
     * 添加任务到某个时间轮
     */
    public boolean addTask(TimedTask timedTask) {
        long expireTimestamp = timedTask.getExpireTimestamp();
        long delayMs = expireTimestamp - currentTimestamp;
        if (delayMs < tickMs) {// 到期了
            return false;
        } else {

            // 扔进当前时间轮的某个槽中,只有时间【大于某个槽】,才会放进去
            if (delayMs < interval) {
                int bucketIndex = (int) (((delayMs + currentTimestamp) / tickMs) % wheelSize);

                Bucket bucket = buckets[bucketIndex];
                bucket.addTask(timedTask);
            } else {
            // 当maybeInThisBucket大于等于wheelSize时,需要将它扔到上一层的时间轮
                TimeWheel timeWheel = getOverflowWheel();
                timeWheel.addTask(timedTask);
            }
        }
        return true;
    }


   /**
     * 获取或创建一个上层时间轮
     */
    private TimeWheel getOverflowWheel() {
        if (overflowWheel == null) {
            synchronized (this) {
                if (overflowWheel == null) {
                    overflowWheel = new TimeWheel(interval, wheelSize, currentTimestamp, delayQueue);
                }
            }
        }
        return overflowWheel;
    }

当然我们的时间轮还需要一个指针的推进机制,总不能让时间永远停留在当前吧?推进的时候,同时类似递归,去推进一下上一层的时间轮。

注意:要强调一点的是,我们这个时间轮更像是电子表,它不存在时间的中间状态,也就是精度这个概念一定要理解好。比如说,对于秒级时间轮来说,它的精度只能保证到1秒,小于1秒的,都会当成是已到期

对于分级时间轮来说,它的精度只能保证到1分,小于1分的,都会当成是已到期

  /**
     * 尝试推进一下指针
     */
    public void advanceClock(long timestamp) {
        if (timestamp >= currentTimestamp + tickMs) {
            currentTimestamp = timestamp - (timestamp % tickMs);

            if (overflowWheel != null) {
                this.getOverflowWheel()
                    .advanceClock(timestamp);
            }
        }
    }
  • 三、对于高层时间轮来说,精度越来越不准,会不会有影响?
    上面说到,分级时间轮,精度只有分钟级,总不能延迟1秒的定时任务和延迟59秒的定时任务同时执行吧?

有这个疑问的同学很好!实际上很好解决,只需再入时间轮即可。比如说,对于分钟级时间轮来说,delayMs为1秒和delayMs为59秒的都已经过期,我们将其取出,再扔进底层的时间轮不就可以了?

1秒的会被扔到秒级时间轮的下一个执行槽中,而59秒的会被扔到秒级时间轮的后59个时间槽中。

细心的同学会发现,我们的添加任务方法,返回的是一个bool

public boolean addTask(TimedTask timedTask)

再倒回去好好看看,添加到最底层时间轮失败的(我们只能直接操作最底层的时间轮,不能直接操作上层的时间轮),是不是会直接返回flase?对于再入失败的任务,我们直接执行即可。

    /**
     * 将任务添加到时间轮
     */
    public void addOrSubmitTask(TimedTask timedTask) {
        if (!timeWheel.addTask(timedTask)) {
            taskExecutor.submit(timedTask.getTask());
        }
    }
    

  • 四、如何知道一个任务已经过期?
    记得我们将任务存储在槽中嘛?比如说秒级时间轮中,有60个槽,那么一共有60个槽。如果时间轮共有两层,也仅仅只有120个槽。我们只需将槽扔进一个delayedQueue之中即可。

我们轮询地从delayedQueue取出已经过期的槽即可。(前面的所有代码,为了简单说明,并没有引入这个DelayQueue的概念,所以不用去上面翻了,并没有。博主觉得...已经看到这里了,应该很明白这个DelayQueue的意义了。)

其实简单来说,实际上定时任务单单使用DelayQueue来实现,也是可以的,但是一旦任务的数量多了起来,达到了百万级,千万级,针对这个delayQueue的增删,将非常的慢。

一、面向槽的delayQueue

而对于时间轮来说,它只需要往delayQueue里面扔各种槽即可,比如我们的定时任务长短不一,最长的跨度到了24年,这个delayQueue也仅仅只有300个元素。

二、处理过期的槽
而这个槽到期后,也就是被我们从delayQueue中poll出来后,我们只需要将槽中的所有任务循环一次,重新加到新的槽中(添加失败则直接执行)即可。

 /**
     * 推进一下时间轮的指针,并且将delayQueue中的任务取出来再重新扔进去
     */
    public void advanceClock(long timeout) {
        try {
            Bucket bucket = delayQueue.poll(timeout, TimeUnit.MILLISECONDS);
            if (bucket != null) {
                timeWheel.advanceClock(bucket.getExpire());
                bucket.flush(this::addTask);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

还是得结合代码来看,debug一下 不然不太容易看懂!

你可能感兴趣的