time
Timer and Ticker
- go version: v1.12.1
定时器的实现一般有以下几种:
- 最小堆,go 使用的这种。
- 红黑树,nginx 使用的这种。
- 链表,redis 使用的这种。
- 时间轮,linux 使用的这种。
创建定时器
创建 Timer 的代码:
func NewTimer(d Duration) *Timer { c := make(chan Time, 1) t := &Timer{ C: c, r: runtimeTimer{ when: when(d), f: sendTime, arg: c, }, } startTimer(&t.r) return t}
创建 Ticker 的代码:
func NewTicker(d Duration) *Ticker { if d <= 0 { panic(errors.New("non-positive interval for NewTicker")) } // Give the channel a 1-element time buffer. // If the client falls behind while reading, we drop ticks // on the floor until the client catches up. c := make(chan Time, 1) t := &Ticker{ C: c, r: runtimeTimer{ when: when(d), period: int64(d), f: sendTime, arg: c, }, } startTimer(&t.r) return t}
Timer 和 Ticker 都是调用的 startTimer(*runtimeTimer)
,区别是 Ticker 比 Timer 多了一个 period 字段。
其中 startTimer()
的声明如下:
func startTimer(*runtimeTimer)
没有实现,对应的是 runtime/time.go 中的如下函数:
// startTimer adds t to the timer heap.//go:linkname startTimer time.startTimerfunc startTimer(t *timer) { if raceenabled { racerelease(unsafe.Pointer(t)) } addtimer(t)}
这里的参数是 runtime.timer,而传进来时是 time.runtimeTimer,这两个结构体字段是一一对应的:
// Interface to timers implemented in package runtime.// Must be in sync with ../runtime/time.go:/^type timertype runtimeTimer struct { tb uintptr i int when int64 period int64 f func(interface{}, uintptr) // NOTE: must not be closure arg interface{} seq uintptr}type timer struct { tb *timersBucket // the bucket the timer lives in i int // heap index // Timer wakes up at when, and then at when+period, ... (period > 0 only) // each time calling f(arg, now) in the timer goroutine, so f must be // a well-behaved function and not block. when int64 period int64 f func(interface{}, uintptr) arg interface{} seq uintptr}
在 startTimer()
中调用了 addtimer()
:
func addtimer(t *timer) { tb := t.assignBucket() lock(&tb.lock) ok := tb.addtimerLocked(t) unlock(&tb.lock) if !ok { badTimer() }}
在 runtime/time.go 中有一个全局变量 timers:
var timers [timersLen]struct { timersBucket // The padding should eliminate false sharing // between timersBucket values. pad [cpu.CacheLinePadSize - unsafe.Sizeof(timersBucket{})%cpu.CacheLinePadSize]byte}
它的结构大概是这样子的:
timers 包含固定的 64 个 timersBucket,而每个 timersBucket 中包含多个 *timer
(字段 t)。timersBucket 中的多个 timer 使用最小堆来组织的。
为什么是 64 个?
个数最好应该是 GOMAXPROCS 个,但是这样的话就需要动态分配了,64 是根据内存使用和性能之间平衡得出的。
addtimer()
首先确定一个 timersBucket,然后将 timer 放入这个 bucket 中。
怎么确定 bucket 的?
func (t *timer) assignBucket() *timersBucket { id := uint8(getg().m.p.ptr().id) % timersLen t.tb = &timers[id].timersBucket return t.tb}
根据当前 G 所在的 P 的 id。
然后是放入 bucket 中的逻辑:
func (tb *timersBucket) addtimerLocked(t *timer) bool { // when must never be negative; otherwise timerproc will overflow // during its delta calculation and never expire other runtime timers. if t.when < 0 { t.when = 1<<63 - 1 } t.i = len(tb.t) tb.t = append(tb.t, t) if !siftupTimer(tb.t, t.i) { return false } if t.i == 0 { // siftup moved to top: new earliest deadline. if tb.sleeping && tb.sleepUntil > t.when { tb.sleeping = false notewakeup(&tb.waitnote) } if tb.rescheduling { tb.rescheduling = false goready(tb.gp, 0) } if !tb.created { tb.created = true go timerproc(tb) } } return true}
首先是加入到 t 切片中,然后使用 siftupTimer()
来维护最小堆的性质。t.i == 0
说明当前 bucket 中没有其他 timer。
bucket 第一个添加 timer 时会启动一个协程调用 timerproc
,代码如下:
func timerproc(tb *timersBucket) { tb.gp = getg() for { lock(&tb.lock) tb.sleeping = false now := nanotime() delta := int64(-1) for { // 列表是空的,跳出循环 if len(tb.t) == 0 { delta = -1 break } // 堆上最小的 timer,最老的那个 t := tb.t[0] delta = t.when - now // 还没到时间 if delta > 0 { break } ok := true // ticker,重新计算到期时间,不从堆上删除 if t.period > 0 { // leave in heap but adjust next time to fire t.when += t.period * (1 + -delta/t.period) if !siftdownTimer(tb.t, 0) { ok = false } } else { // timer, remove from heap last := len(tb.t) - 1 if last > 0 { tb.t[0] = tb.t[last] tb.t[0].i = 0 } tb.t[last] = nil tb.t = tb.t[:last] if last > 0 { if !siftdownTimer(tb.t, 0) { ok = false } } t.i = -1 // mark as removed } f := t.f arg := t.arg seq := t.seq unlock(&tb.lock) if !ok { badTimer() } if raceenabled { raceacquire(unsafe.Pointer(t)) } f(arg, seq) lock(&tb.lock) } if delta < 0 || faketime > 0 { // No timers left - put goroutine to sleep. tb.rescheduling = true goparkunlock(&tb.lock, waitReasonTimerGoroutineIdle, traceEvGoBlock, 1) continue } // At least one timer pending. Sleep until then. tb.sleeping = true tb.sleepUntil = now + delta noteclear(&tb.waitnote) unlock(&tb.lock) notetsleepg(&tb.waitnote, delta) }}
如果当前 t 列表是空的,那么 rescheduling = true
,然后将当前协程挂起。什么时候再唤醒呢? 在 addtimer()
中如果 rescheduling
为 true,那么就将协程唤醒继续 for 循环。
如果堆上最小的元素(最先到期的)还没到期,那么 sleeping = true
,同时会 sleep 知道该元素到期。如果在 sleep 期间又添加了一个元素,而这个元素比堆上所有的 timer 都更快到期,在 addtimer()
中会通过 waitnote 来唤醒,继续 for 循环来处理。
如果堆上最小的元素已经到期了,应该给这个到期的 timer.C 发送当前时间。如果 timer 是一个 Ticker,那么会修改它的到期时间,不从堆上移走。如果 timer 是一个 Timer,是一次性的,那么会从堆上删除它。
如何计算 Ticker 的下次到期时间?
t.when += t.period * (1 + -delta/t.period)
这里的 delta 是 t.when - now
的结果,表示距离过期时间已经过去了多久,计算新的过期时间时将这个值减去了。
处理 timer 就是调用 timer.f()
,对应的是 timer.sendTime()
:
func sendTime(c interface{}, seq uintptr) { // Non-blocking send of time on c. // Used in NewTimer, it cannot block anyway (buffer). // Used in NewTicker, dropping sends on the floor is // the desired behavior when the reader gets behind, // because the sends are periodic. select { case c.(chan Time) <- Now(): default: }}
Timer 和 Ticker 的 c 都是 make(chan Time, 1)
。对于 Timer 来说,因为有一个缓存,所以会执行到 case 分支。对于 Ticker 来说,因为会多次调用这个方法,如果一直没有从 Ticker.C 中拿取时间,那么这里会调用 default 分支,也就是后面的时间会被丢弃,以此来保证 timerproc
不会阻塞。
停止定时器
Timer 和 Ticker 都是通过 runtime/time.go 中的 stopTimer()
来停止的:
// stopTimer removes t from the timer heap if it is there.// It returns true if t was removed, false if t wasn't even there.//go:linkname stopTimer time.stopTimerfunc stopTimer(t *timer) bool { return deltimer(t)}// Delete timer t from the heap.// Do not need to update the timerproc: if it wakes up early, no big deal.func deltimer(t *timer) bool { if t.tb == nil { // t.tb can be nil if the user created a timer // directly, without invoking startTimer e.g // time.Ticker{C: c} // In this case, return early without any deletion. // See Issue 21874. return false } tb := t.tb lock(&tb.lock) removed, ok := tb.deltimerLocked(t) unlock(&tb.lock) if !ok { badTimer() } return removed}func (tb *timersBucket) deltimerLocked(t *timer) (removed, ok bool) { // t may not be registered anymore and may have // a bogus i (typically 0, if generated by Go). // Verify it before proceeding. i := t.i last := len(tb.t) - 1 if i < 0 || i > last || tb.t[i] != t { return false, true } if i != last { tb.t[i] = tb.t[last] tb.t[i].i = i } tb.t[last] = nil tb.t = tb.t[:last] ok = true if i != last { if !siftupTimer(tb.t, i) { ok = false } if !siftdownTimer(tb.t, i) { ok = false } } return true, ok}
timer.i
表示这个 timer 在堆上的索引。对于 Timer 来说,在到期后可能会从堆上删掉了,这时 timerproc()
函数会将 timer.i 标记为 -1。
删除就是将 timer 和堆上最后一个元素交换,然后从 t 中删除,最后重新维护下堆的性质。
如果不调用 Timer.Stop()/Ticker.Stop() 会发生什么?
Timer 在到期后会被 timerproc()
函数删除,但及时主动删除可以减轻 timersBucket 的压力,尤其是在定时器比较多的情况下。
Ticker 如果不调用 Stop 会一直存在堆上。