高效定时器的实现

场景

linux crontab在实现定时任务中起到重要的作用,那如果我们去实现定时一个contab又如何实现呢?怎么样才能高效呢?本文会介绍Nginx内部的定时器的实现和golang的Tick定时器的实现。

常规思路

在一个死循环里每秒(精度为秒)去轮询查看有没有到期的任务需要执行。

问题一

有必要每秒轮询一次吗?下一秒、下下一秒、下下下一秒都不一定会有到期执行的任务,那样不就浪费了CPU了,为何不一直休眠到最近一个到期的任务的时间点,再去遍历呢?

问题二

用什么样的数据结构去存储所有任务,才能起到高效的轮询呢?用数组?每次都遍历一遍?很明显,不高效。在Nginx里,使用了红黑树,golang中使用了小堆。

golang tick实现

1
2
3
4
5
6
7
8
type timer struct {
i int // heap index
when int64
period int64
f func(interface{}, uintptr)
arg interface{}
seq uintptr
}

golang用一个timer结构体来表示一个定时任务,其中when表示到期的绝对时间戳,也是用来进行排序构建小堆的关键字段。这样子,最先到期的任务,就会是小堆堆头的那个节点了。period字段用来表示是否循环执行这个定时任务,如果是循环任务,执行完后会重新修改when为下一次执行的时间点,调整小堆。否则,把定时任务从小堆中删除。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func addtimerLocked(t *timer) {
if t.when < 0 {
t.when = 1<<63 - 1
}
t.i = len(timers.t)
timers.t = append(timers.t, t) // 插入到小堆尾部
siftupTimer(t.i) // 堆调整
if t.i == 0 {
// 如果新加入的timer比之前最早到期的timer还早
// 更新后台运行的检查到期timer 的 goroutine 的睡眠时间
if timers.sleeping {
timers.sleeping = false
notewakeup(&timers.waitnote)
}
if timers.rescheduling {
timers.rescheduling = false
goready(timers.gp, 0)
}
}
if !timers.created {
// 如果是第一次创建的定时器维护
// 启动一条后台goroutine来维护定时器
timers.created = true
go timerproc()
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
func timerproc() {
timers.gp = getg()
for {
lock(&timers.lock)
timers.sleeping = false
now := nanotime()
delta := int64(-1)
for {
if len(timers.t) == 0 {
delta = -1
break
}
t := timers.t[0] // 获取小堆第一个定时任务,即最早到期的任务
delta = t.when - now
if delta > 0 {
break
}
if t.period > 0 {
// 如果是循环执行的定时任务,重新计算下次执行时间,重新调整堆内位置
t.when += t.period * (1 + -delta/t.period)
siftdownTimer(0)
} else {
// 如果是只执行一次,直接把它从堆内删除
last := len(timers.t) - 1
if last > 0 {
timers.t[0] = timers.t[last]
timers.t[0].i = 0
}
timers.t[last] = nil
timers.t = timers.t[:last]
if last > 0 {
siftdownTimer(0)
}
t.i = -1 // mark as removed
}
f := t.f
arg := t.arg
seq := t.seq
unlock(&timers.lock)
if raceenabled {
raceacquire(unsafe.Pointer(t))
}
// 找到到期的任务,并执行
f(arg, seq)
lock(&timers.lock)
}
if delta < 0 || faketime > 0 {
// No timers left - put goroutine to sleep.
timers.rescheduling = true
goparkunlock(&timers.lock, "timer goroutine (idle)", traceEvGoBlock, 1)
continue
}
// 休眠到最早到期的定时任务
timers.sleeping = true
noteclear(&timers.waitnote)
unlock(&timers.lock)
notetsleepg(&timers.waitnote, delta)
}
}

Nginx定时器

Nginx运用定时器的地方很多,例如读取http头部超时。Nginx使用红黑树维护所有定时任务事件,进程在每次事件轮询返回后,都会检查一遍红黑树,处理过期的定时事件,设置ngx_event_t结构体里的timedout字段为1。

1
2
3
4
5
6
7
8
// ngx_event_timer.h
ngx_int_t ngx_event_timer_init(ngx_log_t *log); // 初始化定时任务管理器
ngx_msec_t ngx_event_find_timer(void); // 查找定时恩物
void ngx_event_expire_timers(void); // 处理过期的定时任务
ngx_int_t ngx_event_no_timers_left(void);
extern ngx_rbtree_t ngx_event_timer_rbtree; // 全局变量,维护保存所有定时任务时间
  • Nginx woker进程检查处理过期的定时事件

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    // worker的事件和定时任务处理函数
    void
    ngx_process_events_and_timers(ngx_cycle_t *cycle)
    {
    ngx_uint_t flags;
    ngx_msec_t timer, delta;
    // ...
    delta = ngx_current_msec;
    (void) ngx_process_events(cycle, timer, flags);
    delta = ngx_current_msec - delta;
    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
    "timer delta: %M", delta);
    // 事件处理
    ngx_event_process_posted(cycle, &ngx_posted_accept_events);
    if (ngx_accept_mutex_held) {
    ngx_shmtx_unlock(&ngx_accept_mutex);
    }
    if (delta) {
    // 处理过期的定时任务
    ngx_event_expire_timers();
    }
    // ...
    }
  • ngx_event_expire_timers过期定时任务处理

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    void
    ngx_event_expire_timers(void)
    {
    ngx_event_t *ev;
    ngx_rbtree_node_t *node, *root, *sentinel;
    sentinel = ngx_event_timer_rbtree.sentinel;
    // 循环查找处理当前所有过期的时间
    for ( ;; ) {
    root = ngx_event_timer_rbtree.root;
    if (root == sentinel) {
    return;
    }
    node = ngx_rbtree_min(root, sentinel); // 在红黑树种查找当前时间最小的节点
    if ((ngx_msec_int_t) (node->key - ngx_current_msec) > 0) {
    // 如果时间最小的节点都还没过期,直接返回
    return;
    }
    ev = (ngx_event_t *) ((char *) node - offsetof(ngx_event_t, timer));
    ngx_log_debug2(NGX_LOG_DEBUG_EVENT, ev->log, 0,
    "event timer del: %d: %M",
    ngx_event_ident(ev->data), ev->timer.key);
    ngx_rbtree_delete(&ngx_event_timer_rbtree, &ev->timer); // 把时间从红黑树种删除
    ev->timer_set = 0;
    ev->timedout = 1; // 把时间设为过期
    ev->handler(ev); // 回调事件处理函数
    }
    }
  • 回调事件处理函数,拿http头部处理函数(ngx_http_process_request_headers)来举例吧

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    static void
    ngx_http_process_request_headers(ngx_event_t *rev)
    {
    // ...
    c = rev->data;
    r = c->data;
    ngx_log_debug0(NGX_LOG_DEBUG_HTTP, rev->log, 0,
    "http process request header line");
    // 如果过期了,在上一步ngx_event_expire_timers中,timedout字段会被设为1,表示头部处理超时了,就给客户端错误提示
    if (rev->timedout) {
    ngx_log_error(NGX_LOG_INFO, c->log, NGX_ETIMEDOUT, "client timed out");
    c->timedout = 1;
    // 超时,关闭链接,发送request timeout错误
    ngx_http_close_request(r, NGX_HTTP_REQUEST_TIME_OUT);
    return;
    }
    // ...
    }

总结

实现一个定时任务调度器,需要一个进程(线程或者协程) + 一个高效数据结构(满足高效查询、频繁插入删除),例如Nginx使用的红黑树、golang使用的小堆,或者skip list(跳跃表),跳跃表是有序链表,同时插入、删除、查找性能效率也不俗,实现起来还容易,可以考虑下。