2 Copyright 2016 The Kubernetes Authors.
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
8 http://www.apache.org/licenses/LICENSE-2.0
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
24 "golang.org/x/time/rate"
// RateLimiter is the contract a workqueue rate limiter implements: it decides
// how long an item has to wait and reports how often that item has failed.
type RateLimiter interface {
	// When is handed an item and decides how long that item should wait.
	When(item interface{}) time.Duration
	// NumRequeues reports how many failures the item has had.
	NumRequeues(item interface{}) int
	// Forget signals that retrying of the item is finished. It does not matter
	// whether that is because it failed permanently or because it succeeded;
	// either way the item stops being tracked.
	Forget(item interface{})
}
37 // DefaultControllerRateLimiter is a no-arg constructor for a default rate limiter for a workqueue. It has
38 // both overall and per-item rate limitting. The overall is a token bucket and the per-item is exponential
39 func DefaultControllerRateLimiter() RateLimiter {
40 return NewMaxOfRateLimiter(
41 NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
42 // 10 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item)
43 &BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
47 // BucketRateLimiter adapts a standard bucket to the workqueue ratelimiter API
48 type BucketRateLimiter struct {
52 var _ RateLimiter = &BucketRateLimiter{}
54 func (r *BucketRateLimiter) When(item interface{}) time.Duration {
55 return r.Limiter.Reserve().Delay()
58 func (r *BucketRateLimiter) NumRequeues(item interface{}) int {
62 func (r *BucketRateLimiter) Forget(item interface{}) {
65 // ItemExponentialFailureRateLimiter does a simple baseDelay*10^<num-failures> limit
66 // dealing with max failures and expiration are up to the caller
67 type ItemExponentialFailureRateLimiter struct {
68 failuresLock sync.Mutex
69 failures map[interface{}]int
71 baseDelay time.Duration
72 maxDelay time.Duration
75 var _ RateLimiter = &ItemExponentialFailureRateLimiter{}
77 func NewItemExponentialFailureRateLimiter(baseDelay time.Duration, maxDelay time.Duration) RateLimiter {
78 return &ItemExponentialFailureRateLimiter{
79 failures: map[interface{}]int{},
85 func DefaultItemBasedRateLimiter() RateLimiter {
86 return NewItemExponentialFailureRateLimiter(time.Millisecond, 1000*time.Second)
89 func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration {
91 defer r.failuresLock.Unlock()
93 exp := r.failures[item]
94 r.failures[item] = r.failures[item] + 1
96 // The backoff is capped such that 'calculated' value never overflows.
97 backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp))
98 if backoff > math.MaxInt64 {
102 calculated := time.Duration(backoff)
103 if calculated > r.maxDelay {
110 func (r *ItemExponentialFailureRateLimiter) NumRequeues(item interface{}) int {
111 r.failuresLock.Lock()
112 defer r.failuresLock.Unlock()
114 return r.failures[item]
117 func (r *ItemExponentialFailureRateLimiter) Forget(item interface{}) {
118 r.failuresLock.Lock()
119 defer r.failuresLock.Unlock()
121 delete(r.failures, item)
124 // ItemFastSlowRateLimiter does a quick retry for a certain number of attempts, then a slow retry after that
125 type ItemFastSlowRateLimiter struct {
126 failuresLock sync.Mutex
127 failures map[interface{}]int
130 fastDelay time.Duration
131 slowDelay time.Duration
134 var _ RateLimiter = &ItemFastSlowRateLimiter{}
136 func NewItemFastSlowRateLimiter(fastDelay, slowDelay time.Duration, maxFastAttempts int) RateLimiter {
137 return &ItemFastSlowRateLimiter{
138 failures: map[interface{}]int{},
139 fastDelay: fastDelay,
140 slowDelay: slowDelay,
141 maxFastAttempts: maxFastAttempts,
145 func (r *ItemFastSlowRateLimiter) When(item interface{}) time.Duration {
146 r.failuresLock.Lock()
147 defer r.failuresLock.Unlock()
149 r.failures[item] = r.failures[item] + 1
151 if r.failures[item] <= r.maxFastAttempts {
158 func (r *ItemFastSlowRateLimiter) NumRequeues(item interface{}) int {
159 r.failuresLock.Lock()
160 defer r.failuresLock.Unlock()
162 return r.failures[item]
165 func (r *ItemFastSlowRateLimiter) Forget(item interface{}) {
166 r.failuresLock.Lock()
167 defer r.failuresLock.Unlock()
169 delete(r.failures, item)
172 // MaxOfRateLimiter calls every RateLimiter and returns the worst case response
173 // When used with a token bucket limiter, the burst could be apparently exceeded in cases where particular items
174 // were separately delayed a longer time.
175 type MaxOfRateLimiter struct {
176 limiters []RateLimiter
179 func (r *MaxOfRateLimiter) When(item interface{}) time.Duration {
180 ret := time.Duration(0)
181 for _, limiter := range r.limiters {
182 curr := limiter.When(item)
191 func NewMaxOfRateLimiter(limiters ...RateLimiter) RateLimiter {
192 return &MaxOfRateLimiter{limiters: limiters}
195 func (r *MaxOfRateLimiter) NumRequeues(item interface{}) int {
197 for _, limiter := range r.limiters {
198 curr := limiter.NumRequeues(item)
207 func (r *MaxOfRateLimiter) Forget(item interface{}) {
208 for _, limiter := range r.limiters {