Golang限流器rate包源码详细分析
源码地址:golang.org/x/time/rate
rate包是Google基于令牌桶的算法实现的限流器,可以在服务的限流中使用。
一、数据结构
1.常量变量
//定义某个时间的最大频率
//表示每秒的事件数
type Limit float64
//Inf表示无速率限制
const Inf = Limit(math.MaxFloat64)
2. 结构体
Limiter结构体
type Limiter struct {
limit Limit //每秒允许处理的事件数量,即每秒处理事件的频率
burst int //令牌桶的最大数量, 如果burst为0,则除非limit == Inf,否则不允许处理任何事件。
mu sync.Mutex
tokens float64 //令牌桶中可用的令牌数量
// last is the last time the limiter's tokens field was updated
//记录上次limiter的tokens被更新的时间
last time.Time
// lastEvent is the latest time of a rate-limited event (past or future)
//lastEvent记录速率受限制(桶中没有令牌)的时间点,该时间点可能是过去的,也可能是将来的(Reservation预定的结束时间点)
lastEvent time.Time
}
Limiter是限流器中最核心的结构体,用于限流(控制事件发生的频率),在初始化后默认是满的,并以每秒r个令牌的速率重新填充直到达到桶的容量(burst),如果r == Inf表示无限制速率。
Limiter有三个主要的方法 Allow、Reserve和Wait,最常用的是Wait和Allow方法
这三个方法每调用一次都会消耗一个令牌,这三个方法的区别在于没有令牌时,他们的处理方式不同
Allow: 如果没有令牌,则直接返回false
Reserve:如果没有令牌,则返回一个reservation,
Wait:如果没有令牌,则等待直到获取一个令牌或者其上下文被取消。
tokens更新的策略:
1). 成功获取到令牌或成功预约(Reserve)到令牌
2). 预约取消时(Cancel)并且需要还原令牌到令牌桶中时
3). 重新设置限流器的速率时(SetLimit)
- 重新设置限流器的容量时(SetBurst)
lastEvent表示速率受限制的时间点,它可能时过去的时间,也可能时将来的时间。
如果没有预约令牌的话,该时间等于last,是过去的
如果有预约令牌的话,该时间等于最新的预约的截至时间。
注意:由于令牌桶的令牌可以预约,所有令牌桶中的tokens可能为负数。
Reservation结构体
type Reservation struct {
ok bool //到截至时间是否可以获取足够的令牌
lim *Limiter
tokens int //需要获取的令牌数量
timeToAct time.Time //需要等待的时间点
// This is the Limit at reservation time, it can change later.
limit Limit
}
Reservation可以理解成预定令牌的操作,timeToAct是本次预约需要等待到的指定时间点才有足够预约的令牌。
#
二、常用方法
1.令牌桶Limiter相关的方法
1) Limiter初始化
func NewLimiter(r Limit, b int) *Limiter {
return &Limiter{
limit: r,
burst: b,
}
}
初始化Limiter,指定每秒允许处理事件的上限为r,允许令牌桶的最大容量为b
func Every(interval time.Duration) Limit {
if interval <= 0 {
return Inf
}
return 1 / Limit(interval.Seconds())
}
Every将事件的最小时间间隔转换为限制
2)Limiter使用
func (lim *Limiter) Allow() bool {
return lim.AllowN(time.Now(), 1)
}
从令牌桶中获取一个令牌,成功获取到则返回true
func (lim *Limiter) AllowN(now time.Time, n int) bool {
return lim.reserveN(now, n, 0).ok
}
从令牌桶中获取n个令牌,成功获取到则返回true
func (lim *Limiter) Wait(ctx context.Context) (err error) {
return lim.WaitN(ctx, 1)
}
获取一个令牌,如果没有则等待直到获取令牌或者上下文ctx取消
func (lim *Limiter) WaitN(ctx context.Context, n int) (err error) {
//同步获取令牌桶的最大容量burst和限流器的速率limit
lim.mu.Lock()
burst := lim.burst
limit := lim.limit
lim.mu.Unlock()
//如果n大于令牌桶的最大容量,则返回error
if n > burst && limit != Inf {
return fmt.Errorf("rate: Wait(n=%d) exceeds limiter's burst %d", n, lim.burst)
}
// Check if ctx is already cancelled
//判断上下文ctx是否已经被取消,如果已经取消则返回error
select {
case <-ctx.Done():
return ctx.Err()
default:
}
// Determine wait limit
now := time.Now()
waitLimit := InfDuration
if deadline, ok := ctx.Deadline(); ok { //如果可以获取上下文的截至时间,则更新可以等待的时间waitLimit
waitLimit = deadline.Sub(now)
}
// Reserve
//调用reserveN获取Reversation
r := lim.reserveN(now, n, waitLimit)
if !r.ok { //没有足够的时间获取令牌,则返回error
return fmt.Errorf("rate: Wait(n=%d) would exceed context deadline", n)
}
// Wait if necessary
//需要等待的时间
delay := r.DelayFrom(now)
if delay == 0 {
return nil
}
t := time.NewTimer(delay)
defer t.Stop()
select {
case <-t.C:
// We can proceed.
return nil
case <-ctx.Done():
// Context was canceled before we could proceed. Cancel the
// reservation, which may permit other events to proceed sooner.
r.Cancel()
return ctx.Err()
}
}
WaitN方法获取n个令牌,直到成功获取或者ctx取消
如果n大于令牌桶的最大容量则返回error
如果上下文被取消或者等待的时间大于上下文的截至时间,则返回error
如果速率限制为Inf则不会限流
无论是Wait、Allow或者Reserve其实都会调用advance和reserveN方法,所以这两个方法是整个限流器rate实现的核心。
advance方法
func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64) {
//last不能在当前时间now之后,否则计算出来的elapsed为负数,会导致令牌桶数量减少
last := lim.last
if now.Before(last) {
last = now
}
// Avoid making delta overflow below when last is very old.
//根据令牌桶的缺数计算出令牌桶未进行更新的最大时间
maxElapsed := lim.limit.durationFromTokens(float64(lim.burst) - lim.tokens)
elapsed := now.Sub(last) //令牌桶未进行更新的时间段
if elapsed > maxElapsed {
elapsed = maxElapsed
}
// Calculate the new number of tokens, due to time that passed.
//根据未更新的时间(未向桶中加入令牌的时间段)计算出产生的令牌数。
delta := lim.limit.tokensFromDuration(elapsed)
tokens := lim.tokens + delta //计算出可用的令牌数
if burst := float64(lim.burst); tokens > burst {
tokens = burst
}
return now, last, tokens
}
advance方法的作用是更新令牌桶的状态,计算出令牌桶未更新的时间(elapsed),根据elapsed算出需要向桶中加入的令牌数delta,然后算出桶中可用的令牌数newTokens
now:当前时间
lastNow: 上次更新令牌的时间
newTokens:计算出桶中可用的令牌数
reserveN方法
func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation {
lim.mu.Lock()
//如果没有限流则直接返回
if lim.limit == Inf {
lim.mu.Unlock()
return Reservation{
ok: true, //桶中有足够的令牌
lim: lim,
tokens: n,
timeToAct: now,
}
}
//更新令牌桶的状态,tokens为目前可用的令牌数量
now, last, tokens := lim.advance(now)
// Calculate the remaining number of tokens resulting from the request.
//可用的令牌数tokens减去需要获取的令牌数(n)
tokens -= float64(n)
// Calculate the wait duration
//如果tokens小于0,则说明桶中没有足够的令牌,计算出产生这些缺数的令牌需要多久(waitDuration)
//计算出产生出缺数的令牌(即-tokens)需要多长时间
var waitDuration time.Duration
if tokens < 0 {
waitDuration = lim.limit.durationFromTokens(-tokens)
}
// Decide result
//如果n小于等于令牌桶的容量,并且可以等待到足够的令牌(即 waitDuration <= maxFutureReserve),则ok为true。表示可以获取到足够的令牌
ok := n <= lim.burst && waitDuration <= maxFutureReserve
// Prepare reservation
r := Reservation{
ok: ok,
lim: lim,
limit: lim.limit,
}
if ok {
r.tokens = n // 需要的令牌数
r.timeToAct = now.Add(waitDuration) //计算获取到足够令牌的结束时间点
}
// Update state
if ok {
lim.last = now //更新tokens的时间
lim.tokens = tokens //更新令牌桶目前可用的令牌数tokens
lim.lastEvent = r.timeToAct //下次事件时间(即获取到足够令牌的时刻)
} else {
lim.last = last
}
lim.mu.Unlock()
return r
}
参数:
now:当前时间
n: 想要获取的令牌数量
maxFutureReserve: 最大的等待时间
reserveN是 AllowN, ReserveN及 WaitN的辅助方法,用于判断在maxFutureReserve时间内是否有足够的令牌。
durationFromTokens和tokensFromDuration工具转换方法
func (limit Limit) durationFromTokens(tokens float64) time.Duration {
seconds := tokens / float64(limit)
return time.Nanosecond * time.Duration(1e9*seconds)
}
根据根据令牌数量tokens计算出产生该数量的令牌需要的时长
// tokensFromDuration is a unit conversion function from a time duration to the number of tokens
// which could be accumulated during that duration at a rate of limit tokens per second.
func (limit Limit) tokensFromDuration(d time.Duration) float64 {
// Split the integer and fractional parts ourself to minimize rounding errors.
// See golang.org/issues/34861.
sec := float64(d/time.Second) * float64(limit)
nsec := float64(d%time.Second) * float64(limit)
return sec + nsec/1e9
}
获取指定期间d内产生的令牌数量
另外:
Limiter的Limit方法用于获取限流的速率即结构体中limit的值,Burst方法用于返回桶的最大容量。
#
2. Reservation相关的方法
Reservation相关的方法即预约令牌需要使用的方法
Reserve和ReserveN分别用于预约1个或者n个令牌
func (lim *Limiter) Reserve() *Reservation {
return lim.ReserveN(time.Now(), 1)
}
// ReserveN returns a Reservation that indicates how long the caller must wait before n events happen.
// The Limiter takes this Reservation into account when allowing future events.
// ReserveN returns false if n exceeds the Limiter's burst size.
// Usage example:
// r := lim.ReserveN(time.Now(), 1)
// if !r.OK() {
// // Not allowed to act! Did you remember to set lim.burst to be > 0 ?
// return
// }
// time.Sleep(r.Delay())
// Act()
// Use this method if you wish to wait and slow down in accordance with the rate limit without dropping events.
// If you need to respect a deadline or cancel the delay, use Wait instead.
// To drop or skip events exceeding rate limit, use Allow instead.
func (lim *Limiter) ReserveN(now time.Time, n int) *Reservation {
r := lim.reserveN(now, n, InfDuration)
return &r
}
返回一个Reservation,该Reservation指示调用者在n个事件发生前需要等待多长事件
如果n超出了限流器的burst,则返回false
func (r *Reservation) OK() bool {
return r.ok
}
返回限流器limiter是否可以在最大等待时间内提供请求数量的令牌。
如果Ok为false,则Delay返回InfDuration,Cancel不执行任何操作
func (r *Reservation) Delay() time.Duration {
return r.DelayFrom(time.Now())
}
返回到截至时间的时间段 ,即需要等待的时间
func (r *Reservation) DelayFrom(now time.Time) time.Duration {
if !r.ok { //ok为false,则返回InfDuration
return InfDuration
}
delay := r.timeToAct.Sub(now) //截止时间
if delay < 0 { //如果截至时间已过,则返回0
return 0
}
return delay
}
DelayFrom方法用于返回当前时间now到截至时间的时间段
如果为0,表示有足够的令牌,需要立即执行
如果返回InfDuration,表示到截至时间时仍然没有足够的令牌
func (r *Reservation) Cancel() {
r.CancelAt(time.Now())
return
}
func (r *Reservation) CancelAt(now time.Time) {
if !r.ok {
return
}
r.lim.mu.Lock()
defer r.lim.mu.Unlock()
/*
1.如果无需限流
2. tokens为0 (需要获取的令牌数量为0)
3. 已经过了截至时间
以上三种情况无需处理取消操作
*/
if r.lim.limit == Inf || r.tokens == 0 || r.timeToAct.Before(now) {
return
}
// calculate tokens to restore
// The duration between lim.lastEvent and r.timeToAct tells us how many tokens were reserved
// after r was obtained. These tokens should not be restored.
//计算出出需要还原的令牌数量
//这里的r.lim.lastEvent可能是本次Reservation的结束时间,也可能是后来的Reservation的结束时间,所以要把本次结束时间点(r.timeToAct)之后产生的令牌数减去
restoreTokens := float64(r.tokens) - r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct))
if restoreTokens <= 0 {
return
}
// advance time to now
//从新计算令牌桶的状态
now, _, tokens := r.lim.advance(now)
// calculate new number of tokens
//还原当前令牌桶的令牌数量,当前的令牌数tokens加上需要还原的令牌数restoreTokens
tokens += restoreTokens
//如果tokens大于桶的最大容量,则将tokens置为桶的最大容量
if burst := float64(r.lim.burst); tokens > burst {
tokens = burst
}
// update state
r.lim.last = now //记录桶的更新时间
r.lim.tokens = tokens //更新令牌数量
//还原lastEvent,即上次速率受限制的时间
if r.timeToAct == r.lim.lastEvent {
prevEvent := r.timeToAct.Add(r.limit.durationFromTokens(float64(-r.tokens)))
if !prevEvent.Before(now) {
r.lim.lastEvent = prevEvent
}
}
return
}
CancelAt用于取消预约令牌操作,如果有需要还原的令牌,则将需要还原的令牌重新放入到令牌桶中。
还没有评论,来说两句吧...