Remove BPA from Makefile
[icn.git] / cmd / bpa-operator / vendor / google.golang.org / api / support / bundler / bundler.go
1 // Copyright 2016 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //      http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 // Package bundler supports bundling (batching) of items. Bundling amortizes an
16 // action with fixed costs over multiple items. For example, if an API provides
17 // an RPC that accepts a list of items as input, but clients would prefer
18 // adding items one at a time, then a Bundler can accept individual items from
19 // the client and bundle many of them into a single RPC.
20 //
21 // This package is experimental and subject to change without notice.
22 package bundler
23
24 import (
25         "context"
26         "errors"
27         "math"
28         "reflect"
29         "sync"
30         "time"
31
32         "golang.org/x/sync/semaphore"
33 )
34
35 const (
36         DefaultDelayThreshold       = time.Second
37         DefaultBundleCountThreshold = 10
38         DefaultBundleByteThreshold  = 1e6 // 1M
39         DefaultBufferedByteLimit    = 1e9 // 1G
40 )
41
42 var (
43         // ErrOverflow indicates that Bundler's stored bytes exceeds its BufferedByteLimit.
44         ErrOverflow = errors.New("bundler reached buffered byte limit")
45
46         // ErrOversizedItem indicates that an item's size exceeds the maximum bundle size.
47         ErrOversizedItem = errors.New("item size exceeds bundle byte limit")
48 )
49
50 // A Bundler collects items added to it into a bundle until the bundle
51 // exceeds a given size, then calls a user-provided function to handle the bundle.
52 type Bundler struct {
53         // Starting from the time that the first message is added to a bundle, once
54         // this delay has passed, handle the bundle. The default is DefaultDelayThreshold.
55         DelayThreshold time.Duration
56
57         // Once a bundle has this many items, handle the bundle. Since only one
58         // item at a time is added to a bundle, no bundle will exceed this
59         // threshold, so it also serves as a limit. The default is
60         // DefaultBundleCountThreshold.
61         BundleCountThreshold int
62
63         // Once the number of bytes in current bundle reaches this threshold, handle
64         // the bundle. The default is DefaultBundleByteThreshold. This triggers handling,
65         // but does not cap the total size of a bundle.
66         BundleByteThreshold int
67
68         // The maximum size of a bundle, in bytes. Zero means unlimited.
69         BundleByteLimit int
70
71         // The maximum number of bytes that the Bundler will keep in memory before
72         // returning ErrOverflow. The default is DefaultBufferedByteLimit.
73         BufferedByteLimit int
74
75         // The maximum number of handler invocations that can be running at once.
76         // The default is 1.
77         HandlerLimit int
78
79         handler       func(interface{}) // called to handle a bundle
80         itemSliceZero reflect.Value     // nil (zero value) for slice of items
81         flushTimer    *time.Timer       // implements DelayThreshold
82
83         mu        sync.Mutex
84         sem       *semaphore.Weighted // enforces BufferedByteLimit
85         semOnce   sync.Once
86         curBundle bundle // incoming items added to this bundle
87
88         // Each bundle is assigned a unique ticket that determines the order in which the
89         // handler is called. The ticket is assigned with mu locked, but waiting for tickets
90         // to be handled is done via mu2 and cond, below.
91         nextTicket uint64 // next ticket to be assigned
92
93         mu2         sync.Mutex
94         cond        *sync.Cond
95         nextHandled uint64 // next ticket to be handled
96
97         // In this implementation, active uses space proportional to HandlerLimit, and
98         // waitUntilAllHandled takes time proportional to HandlerLimit each time an acquire
99         // or release occurs, so large values of HandlerLimit max may cause performance
100         // issues.
101         active map[uint64]bool // tickets of bundles actively being handled
102 }
103
104 type bundle struct {
105         items reflect.Value // slice of item type
106         size  int           // size in bytes of all items
107 }
108
109 // NewBundler creates a new Bundler.
110 //
111 // itemExample is a value of the type that will be bundled. For example, if you
112 // want to create bundles of *Entry, you could pass &Entry{} for itemExample.
113 //
114 // handler is a function that will be called on each bundle. If itemExample is
115 // of type T, the argument to handler is of type []T. handler is always called
116 // sequentially for each bundle, and never in parallel.
117 //
118 // Configure the Bundler by setting its thresholds and limits before calling
119 // any of its methods.
120 func NewBundler(itemExample interface{}, handler func(interface{})) *Bundler {
121         b := &Bundler{
122                 DelayThreshold:       DefaultDelayThreshold,
123                 BundleCountThreshold: DefaultBundleCountThreshold,
124                 BundleByteThreshold:  DefaultBundleByteThreshold,
125                 BufferedByteLimit:    DefaultBufferedByteLimit,
126                 HandlerLimit:         1,
127
128                 handler:       handler,
129                 itemSliceZero: reflect.Zero(reflect.SliceOf(reflect.TypeOf(itemExample))),
130                 active:        map[uint64]bool{},
131         }
132         b.curBundle.items = b.itemSliceZero
133         b.cond = sync.NewCond(&b.mu2)
134         return b
135 }
136
137 func (b *Bundler) initSemaphores() {
138         // Create the semaphores lazily, because the user may set limits
139         // after NewBundler.
140         b.semOnce.Do(func() {
141                 b.sem = semaphore.NewWeighted(int64(b.BufferedByteLimit))
142         })
143 }
144
145 // Add adds item to the current bundle. It marks the bundle for handling and
146 // starts a new one if any of the thresholds or limits are exceeded.
147 //
148 // If the item's size exceeds the maximum bundle size (Bundler.BundleByteLimit), then
149 // the item can never be handled. Add returns ErrOversizedItem in this case.
150 //
151 // If adding the item would exceed the maximum memory allowed
152 // (Bundler.BufferedByteLimit) or an AddWait call is blocked waiting for
153 // memory, Add returns ErrOverflow.
154 //
155 // Add never blocks.
156 func (b *Bundler) Add(item interface{}, size int) error {
157         // If this item exceeds the maximum size of a bundle,
158         // we can never send it.
159         if b.BundleByteLimit > 0 && size > b.BundleByteLimit {
160                 return ErrOversizedItem
161         }
162         // If adding this item would exceed our allotted memory
163         // footprint, we can't accept it.
164         // (TryAcquire also returns false if anything is waiting on the semaphore,
165         // so calls to Add and AddWait shouldn't be mixed.)
166         b.initSemaphores()
167         if !b.sem.TryAcquire(int64(size)) {
168                 return ErrOverflow
169         }
170         b.add(item, size)
171         return nil
172 }
173
174 // add adds item to the current bundle. It marks the bundle for handling and
175 // starts a new one if any of the thresholds or limits are exceeded.
176 func (b *Bundler) add(item interface{}, size int) {
177         b.mu.Lock()
178         defer b.mu.Unlock()
179
180         // If adding this item to the current bundle would cause it to exceed the
181         // maximum bundle size, close the current bundle and start a new one.
182         if b.BundleByteLimit > 0 && b.curBundle.size+size > b.BundleByteLimit {
183                 b.startFlushLocked()
184         }
185         // Add the item.
186         b.curBundle.items = reflect.Append(b.curBundle.items, reflect.ValueOf(item))
187         b.curBundle.size += size
188
189         // Start a timer to flush the item if one isn't already running.
190         // startFlushLocked clears the timer and closes the bundle at the same time,
191         // so we only allocate a new timer for the first item in each bundle.
192         // (We could try to call Reset on the timer instead, but that would add a lot
193         // of complexity to the code just to save one small allocation.)
194         if b.flushTimer == nil {
195                 b.flushTimer = time.AfterFunc(b.DelayThreshold, b.Flush)
196         }
197
198         // If the current bundle equals the count threshold, close it.
199         if b.curBundle.items.Len() == b.BundleCountThreshold {
200                 b.startFlushLocked()
201         }
202         // If the current bundle equals or exceeds the byte threshold, close it.
203         if b.curBundle.size >= b.BundleByteThreshold {
204                 b.startFlushLocked()
205         }
206 }
207
208 // AddWait adds item to the current bundle. It marks the bundle for handling and
209 // starts a new one if any of the thresholds or limits are exceeded.
210 //
211 // If the item's size exceeds the maximum bundle size (Bundler.BundleByteLimit), then
212 // the item can never be handled. AddWait returns ErrOversizedItem in this case.
213 //
214 // If adding the item would exceed the maximum memory allowed (Bundler.BufferedByteLimit),
215 // AddWait blocks until space is available or ctx is done.
216 //
217 // Calls to Add and AddWait should not be mixed on the same Bundler.
218 func (b *Bundler) AddWait(ctx context.Context, item interface{}, size int) error {
219         // If this item exceeds the maximum size of a bundle,
220         // we can never send it.
221         if b.BundleByteLimit > 0 && size > b.BundleByteLimit {
222                 return ErrOversizedItem
223         }
224         // If adding this item would exceed our allotted memory footprint, block
225         // until space is available. The semaphore is FIFO, so there will be no
226         // starvation.
227         b.initSemaphores()
228         if err := b.sem.Acquire(ctx, int64(size)); err != nil {
229                 return err
230         }
231         // Here, we've reserved space for item. Other goroutines can call AddWait
232         // and even acquire space, but no one can take away our reservation
233         // (assuming sem.Release is used correctly). So there is no race condition
234         // resulting from locking the mutex after sem.Acquire returns.
235         b.add(item, size)
236         return nil
237 }
238
239 // Flush invokes the handler for all remaining items in the Bundler and waits
240 // for it to return.
241 func (b *Bundler) Flush() {
242         b.mu.Lock()
243         b.startFlushLocked()
244         // Here, all bundles with tickets < b.nextTicket are
245         // either finished or active. Those are the ones
246         // we want to wait for.
247         t := b.nextTicket
248         b.mu.Unlock()
249         b.initSemaphores()
250         b.waitUntilAllHandled(t)
251 }
252
253 func (b *Bundler) startFlushLocked() {
254         if b.flushTimer != nil {
255                 b.flushTimer.Stop()
256                 b.flushTimer = nil
257         }
258         if b.curBundle.items.Len() == 0 {
259                 return
260         }
261         // Here, both semaphores must have been initialized.
262         bun := b.curBundle
263         b.curBundle = bundle{items: b.itemSliceZero}
264         ticket := b.nextTicket
265         b.nextTicket++
266         go func() {
267                 defer func() {
268                         b.sem.Release(int64(bun.size))
269                         b.release(ticket)
270                 }()
271                 b.acquire(ticket)
272                 b.handler(bun.items.Interface())
273         }()
274 }
275
276 // acquire blocks until ticket is the next to be served, then returns. In order for N
277 // acquire calls to return, the tickets must be in the range [0, N). A ticket must
278 // not be presented to acquire more than once.
279 func (b *Bundler) acquire(ticket uint64) {
280         b.mu2.Lock()
281         defer b.mu2.Unlock()
282         if ticket < b.nextHandled {
283                 panic("bundler: acquire: arg too small")
284         }
285         for !(ticket == b.nextHandled && len(b.active) < b.HandlerLimit) {
286                 b.cond.Wait()
287         }
288         // Here,
289         // ticket == b.nextHandled: the caller is the next one to be handled;
290         // and len(b.active) < b.HandlerLimit: there is space available.
291         b.active[ticket] = true
292         b.nextHandled++
293         // Broadcast, not Signal: although at most one acquire waiter can make progress,
294         // there might be waiters in waitUntilAllHandled.
295         b.cond.Broadcast()
296 }
297
298 // If a ticket is used for a call to acquire, it must later be passed to release. A
299 // ticket must not be presented to release more than once.
300 func (b *Bundler) release(ticket uint64) {
301         b.mu2.Lock()
302         defer b.mu2.Unlock()
303         if !b.active[ticket] {
304                 panic("bundler: release: not an active ticket")
305         }
306         delete(b.active, ticket)
307         b.cond.Broadcast()
308 }
309
310 // waitUntilAllHandled blocks until all tickets < n have called release, meaning
311 // all bundles with tickets < n have been handled.
312 func (b *Bundler) waitUntilAllHandled(n uint64) {
313         // Proof of correctness of this function.
314         // "N is acquired" means acquire(N) has returned.
315         // "N is released" means release(N) has returned.
316         // 1. If N is acquired, N-1 is acquired.
317         //    Follows from the loop test in acquire, and the fact
318         //    that nextHandled is incremented by 1.
319         // 2. If nextHandled >= N, then N-1 is acquired.
320         //    Because we only increment nextHandled to N after N-1 is acquired.
321         // 3. If nextHandled >= N, then all n < N is acquired.
322         //    Follows from #1 and #2.
323         // 4. If N is acquired and N is not in active, then N is released.
324         //    Because we put N in active before acquire returns, and only
325         //    remove it when it is released.
326         // Let min(active) be the smallest member of active, or infinity if active is empty.
327         // 5. If nextHandled >= N and N <= min(active), then all n < N is released.
328         //    From nextHandled >= N and #3, all n < N is acquired.
329         //    N <= min(active) implies n < min(active) for all n < N. So all n < N is not in active.
330         //    So from #4, all n < N is released.
331         // The loop test below is the antecedent of #5.
332         b.mu2.Lock()
333         defer b.mu2.Unlock()
334         for !(b.nextHandled >= n && n <= min(b.active)) {
335                 b.cond.Wait()
336         }
337 }
338
339 // min returns the minimum value of the set s, or the largest uint64 if
340 // s is empty.
341 func min(s map[uint64]bool) uint64 {
342         var m uint64 = math.MaxUint64
343         for n := range s {
344                 if n < m {
345                         m = n
346                 }
347         }
348         return m
349 }