博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
【源码学习】time.Timer 和 time.Ticker
阅读量:6708 次
发布时间:2019-06-25

本文共 7455 字,大约阅读时间需要 24 分钟。

hot3.png

time

Timer and Ticker

  • go version: v1.12.1

定时器的实现一般有以下几种:

  • 最小堆,go 使用的这种。
  • 红黑树,nginx 使用的这种。
  • 链表,redis 使用的这种。
  • 时间轮,linux 使用的这种。

创建定时器

创建 Timer 的代码:

func NewTimer(d Duration) *Timer {	c := make(chan Time, 1)	t := &Timer{		C: c,		r: runtimeTimer{			when: when(d),			f:    sendTime,			arg:  c,		},	}	startTimer(&t.r)	return t}

创建 Ticker 的代码:

func NewTicker(d Duration) *Ticker {	if d <= 0 {		panic(errors.New("non-positive interval for NewTicker"))	}	// Give the channel a 1-element time buffer.	// If the client falls behind while reading, we drop ticks	// on the floor until the client catches up.	c := make(chan Time, 1)	t := &Ticker{		C: c,		r: runtimeTimer{			when:   when(d),			period: int64(d),			f:      sendTime,			arg:    c,		},	}	startTimer(&t.r)	return t}

Timer 和 Ticker 都是调用的 startTimer(*runtimeTimer),区别是 Ticker 比 Timer 多了一个 period 字段。

其中 startTimer() 的声明如下:

func startTimer(*runtimeTimer)

没有实现,对应的是 runtime/time.go 中的如下函数:

// startTimer adds t to the timer heap.//go:linkname startTimer time.startTimerfunc startTimer(t *timer) {	if raceenabled {		racerelease(unsafe.Pointer(t))	}	addtimer(t)}

这里的参数是 runtime.timer,而传进来时是 time.runtimeTimer,这两个结构体字段是一一对应的:

// Interface to timers implemented in package runtime.// Must be in sync with ../runtime/time.go:/^type timertype runtimeTimer struct {	tb uintptr	i  int	when   int64	period int64	f      func(interface{}, uintptr) // NOTE: must not be closure	arg    interface{}	seq    uintptr}type timer struct {	tb *timersBucket // the bucket the timer lives in	i  int           // heap index	// Timer wakes up at when, and then at when+period, ... (period > 0 only)	// each time calling f(arg, now) in the timer goroutine, so f must be	// a well-behaved function and not block.	when   int64	period int64	f      func(interface{}, uintptr)	arg    interface{}	seq    uintptr}

startTimer() 中调用了 addtimer():

func addtimer(t *timer) {	tb := t.assignBucket()	lock(&tb.lock)	ok := tb.addtimerLocked(t)	unlock(&tb.lock)	if !ok {		badTimer()	}}

在 runtime/time.go 中有一个全局变量 timers:

var timers [timersLen]struct {	timersBucket	// The padding should eliminate false sharing	// between timersBucket values.	pad [cpu.CacheLinePadSize - unsafe.Sizeof(timersBucket{})%cpu.CacheLinePadSize]byte}

它的结构大概是这样子的:

timers 包含固定的 64 个 timersBucket,而每个 timersBucket 中包含多个 *timer(字段 t)。timersBucket 中的多个 timer 使用最小堆来组织的。

为什么是 64 个?

个数最好应该是 GOMAXPROCS 个,但是这样的话就需要动态分配了,64 是根据内存使用和性能之间平衡得出的。

addtimer() 首先确定一个 timersBucket,然后将 timer 放入这个 bucket 中。

怎么确定 bucket 的?

func (t *timer) assignBucket() *timersBucket {	id := uint8(getg().m.p.ptr().id) % timersLen	t.tb = &timers[id].timersBucket	return t.tb}

根据当前 G 所在的 P 的 id。

然后是放入 bucket 中的逻辑:

func (tb *timersBucket) addtimerLocked(t *timer) bool {	// when must never be negative; otherwise timerproc will overflow	// during its delta calculation and never expire other runtime timers.	if t.when < 0 {		t.when = 1<<63 - 1	}	t.i = len(tb.t)	tb.t = append(tb.t, t)	if !siftupTimer(tb.t, t.i) {		return false	}	if t.i == 0 {		// siftup moved to top: new earliest deadline.		if tb.sleeping && tb.sleepUntil > t.when {			tb.sleeping = false			notewakeup(&tb.waitnote)		}		if tb.rescheduling {			tb.rescheduling = false			goready(tb.gp, 0)		}		if !tb.created {			tb.created = true			go timerproc(tb)		}	}	return true}

首先是加入到 t 切片中,然后使用 siftupTimer() 来维护最小堆的性质。t.i == 0 说明当前 bucket 中没有其他 timer。

bucket 第一个添加 timer 时会启动一个协程调用 timerproc,代码如下:

func timerproc(tb *timersBucket) {	tb.gp = getg()	for {		lock(&tb.lock)		tb.sleeping = false		now := nanotime()		delta := int64(-1)		for {            // 列表是空的,跳出循环			if len(tb.t) == 0 {				delta = -1				break            }            // 堆上最小的 timer,最老的那个			t := tb.t[0]            delta = t.when - now            // 还没到时间			if delta > 0 {				break			}            ok := true            // ticker,重新计算到期时间,不从堆上删除			if t.period > 0 {				// leave in heap but adjust next time to fire				t.when += t.period * (1 + -delta/t.period)				if !siftdownTimer(tb.t, 0) {					ok = false				}			} else {				// timer, remove from heap				last := len(tb.t) - 1				if last > 0 {					tb.t[0] = tb.t[last]					tb.t[0].i = 0				}				tb.t[last] = nil				tb.t = tb.t[:last]				if last > 0 {					if !siftdownTimer(tb.t, 0) {						ok = false					}				}				t.i = -1 // mark as removed			}			f := t.f			arg := t.arg			seq := t.seq			unlock(&tb.lock)			if !ok {				badTimer()			}			if raceenabled {				raceacquire(unsafe.Pointer(t))			}			f(arg, seq)			lock(&tb.lock)		}		if delta < 0 || faketime > 0 {			// No timers left - put goroutine to sleep.			tb.rescheduling = true			goparkunlock(&tb.lock, waitReasonTimerGoroutineIdle, traceEvGoBlock, 1)			continue		}		// At least one timer pending. Sleep until then.		tb.sleeping = true		tb.sleepUntil = now + delta		noteclear(&tb.waitnote)		unlock(&tb.lock)		notetsleepg(&tb.waitnote, delta)	}}

如果当前 t 列表是空的,那么 rescheduling = true,然后将当前协程挂起。什么时候再唤醒呢?addtimer() 中如果 rescheduling 为 true,那么就将协程唤醒继续 for 循环。

如果堆上最小的元素(最先到期的)还没到期,那么 sleeping = true,同时会 sleep 知道该元素到期。如果在 sleep 期间又添加了一个元素,而这个元素比堆上所有的 timer 都更快到期,在 addtimer() 中会通过 waitnote 来唤醒,继续 for 循环来处理。

如果堆上最小的元素已经到期了,应该给这个到期的 timer.C 发送当前时间。如果 timer 是一个 Ticker,那么会修改它的到期时间,不从堆上移走。如果 timer 是一个 Timer,是一次性的,那么会从堆上删除它。

如何计算 Ticker 的下次到期时间?

t.when += t.period * (1 + -delta/t.period)

这里的 delta 是 t.when - now 的结果,表示距离过期时间已经过去了多久,计算新的过期时间时将这个值减去了。

处理 timer 就是调用 timer.f(),对应的是 timer.sendTime():

func sendTime(c interface{}, seq uintptr) {	// Non-blocking send of time on c.	// Used in NewTimer, it cannot block anyway (buffer).	// Used in NewTicker, dropping sends on the floor is	// the desired behavior when the reader gets behind,	// because the sends are periodic.	select {	case c.(chan Time) <- Now():	default:	}}

Timer 和 Ticker 的 c 都是 make(chan Time, 1)。对于 Timer 来说,因为有一个缓存,所以会执行到 case 分支。对于 Ticker 来说,因为会多次调用这个方法,如果一直没有从 Ticker.C 中拿取时间,那么这里会调用 default 分支,也就是后面的时间会被丢弃,以此来保证 timerproc 不会阻塞。

停止定时器

Timer 和 Ticker 都是通过 runtime/time.go 中的 stopTimer() 来停止的:

// stopTimer removes t from the timer heap if it is there.// It returns true if t was removed, false if t wasn't even there.//go:linkname stopTimer time.stopTimerfunc stopTimer(t *timer) bool {	return deltimer(t)}// Delete timer t from the heap.// Do not need to update the timerproc: if it wakes up early, no big deal.func deltimer(t *timer) bool {	if t.tb == nil {		// t.tb can be nil if the user created a timer		// directly, without invoking startTimer e.g		//    time.Ticker{C: c}		// In this case, return early without any deletion.		// See Issue 21874.		return false	}	tb := t.tb	lock(&tb.lock)	removed, ok := tb.deltimerLocked(t)	unlock(&tb.lock)	if !ok {		badTimer()	}	return removed}func (tb *timersBucket) deltimerLocked(t *timer) (removed, ok bool) {	// t may not be registered anymore and may have	// a bogus i (typically 0, if generated by Go).	// Verify it before proceeding.	i := t.i	last := len(tb.t) - 1	if i < 0 || i > last || tb.t[i] != t {		return false, true	}	if i != last {		tb.t[i] = tb.t[last]		tb.t[i].i = i	}	tb.t[last] = nil	tb.t = tb.t[:last]	ok = true	if i != last {		if !siftupTimer(tb.t, i) {			ok = false		}		if !siftdownTimer(tb.t, i) {			ok = false		}	}	return true, ok}

timer.i 表示这个 timer 在堆上的索引。对于 Timer 来说,在到期后可能会从堆上删掉了,这时 timerproc() 函数会将 timer.i 标记为 -1。

删除就是将 timer 和堆上最后一个元素交换,然后从 t 中删除,最后重新维护下堆的性质。

如果不调用 Timer.Stop()/Ticker.Stop() 会发生什么?

Timer 在到期后会被 timerproc() 函数删除,但及时主动删除可以减轻 timersBucket 的压力,尤其是在定时器比较多的情况下。

Ticker 如果不调用 Stop 会一直存在堆上。

转载于:https://my.oschina.net/u/2004526/blog/3042442

你可能感兴趣的文章
Java调用Https
查看>>
用户定位 User Location
查看>>
图文混排 文字垂直居中对齐
查看>>
Qt Socket简单通信
查看>>
如何优雅地过滤敏感词
查看>>
windows EFS加密
查看>>
那些年遇到过的面试题
查看>>
sublime汉化
查看>>
Netfilter/iptables的一些新进展
查看>>
关于Netfilter NF_HOOK宏的outdev参数bug
查看>>
VNC配置
查看>>
RIPv2与EIGRP的自动汇总区别
查看>>
python Flask w2ui sidebar json数据加载方法
查看>>
双向实时远程同步文件(inotify+rsync与nfs)
查看>>
SIM_AT_Command
查看>>
Windows-- ×××安装与配置过程
查看>>
rsync生产实战考试题模拟09
查看>>
Session详解
查看>>
基于centOS6.7搭建LAMP(httpd-2.4.18+mysql-5.5.47+php-5.6.16)环境
查看>>
C# Directory和DirectoryInfo类(文件目录操作)
查看>>