Code refactoring for bpa operator
[icn.git] / cmd / bpa-operator / vendor / github.com / beorn7 / perks / quantile / stream.go
1 // Package quantile computes approximate quantiles over an unbounded data
2 // stream within low memory and CPU bounds.
3 //
4 // A small amount of accuracy is traded to achieve the above properties.
5 //
6 // Multiple streams can be merged before calling Query to generate a single set
7 // of results. This is meaningful when the streams represent the same type of
8 // data. See Merge and Samples.
9 //
10 // For more detailed information about the algorithm used, see:
11 //
12 // Effective Computation of Biased Quantiles over Data Streams
13 //
14 // http://www.cs.rutgers.edu/~muthu/bquant.pdf
15 package quantile
16
17 import (
18         "math"
19         "sort"
20 )
21
22 // Sample holds an observed value and meta information for compression. JSON
23 // tags have been added for convenience.
24 type Sample struct {
25         Value float64 `json:",string"`
26         Width float64 `json:",string"`
27         Delta float64 `json:",string"`
28 }
29
30 // Samples represents a slice of samples. It implements sort.Interface.
31 type Samples []Sample
32
33 func (a Samples) Len() int           { return len(a) }
34 func (a Samples) Less(i, j int) bool { return a[i].Value < a[j].Value }
35 func (a Samples) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
36
37 type invariant func(s *stream, r float64) float64
38
39 // NewLowBiased returns an initialized Stream for low-biased quantiles
40 // (e.g. 0.01, 0.1, 0.5) where the needed quantiles are not known a priori, but
41 // error guarantees can still be given even for the lower ranks of the data
42 // distribution.
43 //
44 // The provided epsilon is a relative error, i.e. the true quantile of a value
45 // returned by a query is guaranteed to be within (1±Epsilon)*Quantile.
46 //
47 // See http://www.cs.rutgers.edu/~muthu/bquant.pdf for time, space, and error
48 // properties.
49 func NewLowBiased(epsilon float64) *Stream {
50         ƒ := func(s *stream, r float64) float64 {
51                 return 2 * epsilon * r
52         }
53         return newStream(ƒ)
54 }
55
56 // NewHighBiased returns an initialized Stream for high-biased quantiles
57 // (e.g. 0.01, 0.1, 0.5) where the needed quantiles are not known a priori, but
58 // error guarantees can still be given even for the higher ranks of the data
59 // distribution.
60 //
61 // The provided epsilon is a relative error, i.e. the true quantile of a value
62 // returned by a query is guaranteed to be within 1-(1±Epsilon)*(1-Quantile).
63 //
64 // See http://www.cs.rutgers.edu/~muthu/bquant.pdf for time, space, and error
65 // properties.
66 func NewHighBiased(epsilon float64) *Stream {
67         ƒ := func(s *stream, r float64) float64 {
68                 return 2 * epsilon * (s.n - r)
69         }
70         return newStream(ƒ)
71 }
72
73 // NewTargeted returns an initialized Stream concerned with a particular set of
74 // quantile values that are supplied a priori. Knowing these a priori reduces
75 // space and computation time. The targets map maps the desired quantiles to
76 // their absolute errors, i.e. the true quantile of a value returned by a query
77 // is guaranteed to be within (Quantile±Epsilon).
78 //
79 // See http://www.cs.rutgers.edu/~muthu/bquant.pdf for time, space, and error properties.
80 func NewTargeted(targetMap map[float64]float64) *Stream {
81         // Convert map to slice to avoid slow iterations on a map.
82         // ƒ is called on the hot path, so converting the map to a slice
83         // beforehand results in significant CPU savings.
84         targets := targetMapToSlice(targetMap)
85
86         ƒ := func(s *stream, r float64) float64 {
87                 var m = math.MaxFloat64
88                 var f float64
89                 for _, t := range targets {
90                         if t.quantile*s.n <= r {
91                                 f = (2 * t.epsilon * r) / t.quantile
92                         } else {
93                                 f = (2 * t.epsilon * (s.n - r)) / (1 - t.quantile)
94                         }
95                         if f < m {
96                                 m = f
97                         }
98                 }
99                 return m
100         }
101         return newStream(ƒ)
102 }
103
104 type target struct {
105         quantile float64
106         epsilon  float64
107 }
108
109 func targetMapToSlice(targetMap map[float64]float64) []target {
110         targets := make([]target, 0, len(targetMap))
111
112         for quantile, epsilon := range targetMap {
113                 t := target{
114                         quantile: quantile,
115                         epsilon:  epsilon,
116                 }
117                 targets = append(targets, t)
118         }
119
120         return targets
121 }
122
123 // Stream computes quantiles for a stream of float64s. It is not thread-safe by
124 // design. Take care when using across multiple goroutines.
125 type Stream struct {
126         *stream
127         b      Samples
128         sorted bool
129 }
130
131 func newStream(ƒ invariant) *Stream {
132         x := &stream{ƒ: ƒ}
133         return &Stream{x, make(Samples, 0, 500), true}
134 }
135
136 // Insert inserts v into the stream.
137 func (s *Stream) Insert(v float64) {
138         s.insert(Sample{Value: v, Width: 1})
139 }
140
141 func (s *Stream) insert(sample Sample) {
142         s.b = append(s.b, sample)
143         s.sorted = false
144         if len(s.b) == cap(s.b) {
145                 s.flush()
146         }
147 }
148
149 // Query returns the computed qth percentiles value. If s was created with
150 // NewTargeted, and q is not in the set of quantiles provided a priori, Query
151 // will return an unspecified result.
152 func (s *Stream) Query(q float64) float64 {
153         if !s.flushed() {
154                 // Fast path when there hasn't been enough data for a flush;
155                 // this also yields better accuracy for small sets of data.
156                 l := len(s.b)
157                 if l == 0 {
158                         return 0
159                 }
160                 i := int(math.Ceil(float64(l) * q))
161                 if i > 0 {
162                         i -= 1
163                 }
164                 s.maybeSort()
165                 return s.b[i].Value
166         }
167         s.flush()
168         return s.stream.query(q)
169 }
170
171 // Merge merges samples into the underlying streams samples. This is handy when
172 // merging multiple streams from separate threads, database shards, etc.
173 //
174 // ATTENTION: This method is broken and does not yield correct results. The
175 // underlying algorithm is not capable of merging streams correctly.
176 func (s *Stream) Merge(samples Samples) {
177         sort.Sort(samples)
178         s.stream.merge(samples)
179 }
180
181 // Reset reinitializes and clears the list reusing the samples buffer memory.
182 func (s *Stream) Reset() {
183         s.stream.reset()
184         s.b = s.b[:0]
185 }
186
187 // Samples returns stream samples held by s.
188 func (s *Stream) Samples() Samples {
189         if !s.flushed() {
190                 return s.b
191         }
192         s.flush()
193         return s.stream.samples()
194 }
195
196 // Count returns the total number of samples observed in the stream
197 // since initialization.
198 func (s *Stream) Count() int {
199         return len(s.b) + s.stream.count()
200 }
201
202 func (s *Stream) flush() {
203         s.maybeSort()
204         s.stream.merge(s.b)
205         s.b = s.b[:0]
206 }
207
208 func (s *Stream) maybeSort() {
209         if !s.sorted {
210                 s.sorted = true
211                 sort.Sort(s.b)
212         }
213 }
214
215 func (s *Stream) flushed() bool {
216         return len(s.stream.l) > 0
217 }
218
219 type stream struct {
220         n float64
221         l []Sample
222         ƒ invariant
223 }
224
225 func (s *stream) reset() {
226         s.l = s.l[:0]
227         s.n = 0
228 }
229
230 func (s *stream) insert(v float64) {
231         s.merge(Samples{{v, 1, 0}})
232 }
233
234 func (s *stream) merge(samples Samples) {
235         // TODO(beorn7): This tries to merge not only individual samples, but
236         // whole summaries. The paper doesn't mention merging summaries at
237         // all. Unittests show that the merging is inaccurate. Find out how to
238         // do merges properly.
239         var r float64
240         i := 0
241         for _, sample := range samples {
242                 for ; i < len(s.l); i++ {
243                         c := s.l[i]
244                         if c.Value > sample.Value {
245                                 // Insert at position i.
246                                 s.l = append(s.l, Sample{})
247                                 copy(s.l[i+1:], s.l[i:])
248                                 s.l[i] = Sample{
249                                         sample.Value,
250                                         sample.Width,
251                                         math.Max(sample.Delta, math.Floor(s.ƒ(s, r))-1),
252                                         // TODO(beorn7): How to calculate delta correctly?
253                                 }
254                                 i++
255                                 goto inserted
256                         }
257                         r += c.Width
258                 }
259                 s.l = append(s.l, Sample{sample.Value, sample.Width, 0})
260                 i++
261         inserted:
262                 s.n += sample.Width
263                 r += sample.Width
264         }
265         s.compress()
266 }
267
268 func (s *stream) count() int {
269         return int(s.n)
270 }
271
272 func (s *stream) query(q float64) float64 {
273         t := math.Ceil(q * s.n)
274         t += math.Ceil(s.ƒ(s, t) / 2)
275         p := s.l[0]
276         var r float64
277         for _, c := range s.l[1:] {
278                 r += p.Width
279                 if r+c.Width+c.Delta > t {
280                         return p.Value
281                 }
282                 p = c
283         }
284         return p.Value
285 }
286
287 func (s *stream) compress() {
288         if len(s.l) < 2 {
289                 return
290         }
291         x := s.l[len(s.l)-1]
292         xi := len(s.l) - 1
293         r := s.n - 1 - x.Width
294
295         for i := len(s.l) - 2; i >= 0; i-- {
296                 c := s.l[i]
297                 if c.Width+x.Width+x.Delta <= s.ƒ(s, r) {
298                         x.Width += c.Width
299                         s.l[xi] = x
300                         // Remove element at i.
301                         copy(s.l[i:], s.l[i+1:])
302                         s.l = s.l[:len(s.l)-1]
303                         xi -= 1
304                 } else {
305                         x = c
306                         xi = i
307                 }
308                 r -= c.Width
309         }
310 }
311
312 func (s *stream) samples() Samples {
313         samples := make(Samples, len(s.l))
314         copy(samples, s.l)
315         return samples
316 }