1 // Package quantile computes approximate quantiles over an unbounded data
2 // stream within low memory and CPU bounds.
4 // A small amount of accuracy is traded to achieve the above properties.
6 // Multiple streams can be merged before calling Query to generate a single set
7 // of results. This is meaningful when the streams represent the same type of
8 // data. See Merge and Samples.
10 // For more detailed information about the algorithm used, see:
12 // Effective Computation of Biased Quantiles over Data Streams
14 // http://www.cs.rutgers.edu/~muthu/bquant.pdf
22 // Sample holds an observed value and meta information for compression. JSON
23 // tags have been added for convenience.
25 Value float64 `json:",string"`
26 Width float64 `json:",string"`
27 Delta float64 `json:",string"`
30 // Samples represents a slice of samples. It implements sort.Interface.
33 func (a Samples) Len() int { return len(a) }
34 func (a Samples) Less(i, j int) bool { return a[i].Value < a[j].Value }
35 func (a Samples) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
37 type invariant func(s *stream, r float64) float64
39 // NewLowBiased returns an initialized Stream for low-biased quantiles
40 // (e.g. 0.01, 0.1, 0.5) where the needed quantiles are not known a priori, but
41 // error guarantees can still be given even for the lower ranks of the data
44 // The provided epsilon is a relative error, i.e. the true quantile of a value
45 // returned by a query is guaranteed to be within (1±Epsilon)*Quantile.
47 // See http://www.cs.rutgers.edu/~muthu/bquant.pdf for time, space, and error
49 func NewLowBiased(epsilon float64) *Stream {
50 ƒ := func(s *stream, r float64) float64 {
51 return 2 * epsilon * r
// NewHighBiased returns an initialized Stream for high-biased quantiles
// (e.g. 0.5, 0.9, 0.99) where the needed quantiles are not known a priori, but
58 // error guarantees can still be given even for the higher ranks of the data
61 // The provided epsilon is a relative error, i.e. the true quantile of a value
62 // returned by a query is guaranteed to be within 1-(1±Epsilon)*(1-Quantile).
64 // See http://www.cs.rutgers.edu/~muthu/bquant.pdf for time, space, and error
66 func NewHighBiased(epsilon float64) *Stream {
67 ƒ := func(s *stream, r float64) float64 {
68 return 2 * epsilon * (s.n - r)
73 // NewTargeted returns an initialized Stream concerned with a particular set of
74 // quantile values that are supplied a priori. Knowing these a priori reduces
75 // space and computation time. The targets map maps the desired quantiles to
76 // their absolute errors, i.e. the true quantile of a value returned by a query
77 // is guaranteed to be within (Quantile±Epsilon).
79 // See http://www.cs.rutgers.edu/~muthu/bquant.pdf for time, space, and error properties.
80 func NewTargeted(targetMap map[float64]float64) *Stream {
81 // Convert map to slice to avoid slow iterations on a map.
82 // ƒ is called on the hot path, so converting the map to a slice
83 // beforehand results in significant CPU savings.
84 targets := targetMapToSlice(targetMap)
86 ƒ := func(s *stream, r float64) float64 {
87 var m = math.MaxFloat64
89 for _, t := range targets {
90 if t.quantile*s.n <= r {
91 f = (2 * t.epsilon * r) / t.quantile
93 f = (2 * t.epsilon * (s.n - r)) / (1 - t.quantile)
109 func targetMapToSlice(targetMap map[float64]float64) []target {
110 targets := make([]target, 0, len(targetMap))
112 for quantile, epsilon := range targetMap {
117 targets = append(targets, t)
123 // Stream computes quantiles for a stream of float64s. It is not thread-safe by
124 // design. Take care when using across multiple goroutines.
131 func newStream(ƒ invariant) *Stream {
133 return &Stream{x, make(Samples, 0, 500), true}
136 // Insert inserts v into the stream.
137 func (s *Stream) Insert(v float64) {
138 s.insert(Sample{Value: v, Width: 1})
141 func (s *Stream) insert(sample Sample) {
142 s.b = append(s.b, sample)
144 if len(s.b) == cap(s.b) {
// Query returns the value computed for the qth quantile (e.g. q=0.5 for the median). If s was created with
150 // NewTargeted, and q is not in the set of quantiles provided a priori, Query
151 // will return an unspecified result.
152 func (s *Stream) Query(q float64) float64 {
154 // Fast path when there hasn't been enough data for a flush;
155 // this also yields better accuracy for small sets of data.
160 i := int(math.Ceil(float64(l) * q))
168 return s.stream.query(q)
// Merge merges samples into the underlying stream's samples. This is handy when
172 // merging multiple streams from separate threads, database shards, etc.
174 // ATTENTION: This method is broken and does not yield correct results. The
175 // underlying algorithm is not capable of merging streams correctly.
176 func (s *Stream) Merge(samples Samples) {
178 s.stream.merge(samples)
// Reset reinitializes and clears the list, reusing the samples buffer memory.
182 func (s *Stream) Reset() {
187 // Samples returns stream samples held by s.
188 func (s *Stream) Samples() Samples {
193 return s.stream.samples()
196 // Count returns the total number of samples observed in the stream
197 // since initialization.
198 func (s *Stream) Count() int {
199 return len(s.b) + s.stream.count()
202 func (s *Stream) flush() {
208 func (s *Stream) maybeSort() {
215 func (s *Stream) flushed() bool {
216 return len(s.stream.l) > 0
225 func (s *stream) reset() {
230 func (s *stream) insert(v float64) {
231 s.merge(Samples{{v, 1, 0}})
234 func (s *stream) merge(samples Samples) {
235 // TODO(beorn7): This tries to merge not only individual samples, but
236 // whole summaries. The paper doesn't mention merging summaries at
237 // all. Unittests show that the merging is inaccurate. Find out how to
238 // do merges properly.
241 for _, sample := range samples {
242 for ; i < len(s.l); i++ {
244 if c.Value > sample.Value {
245 // Insert at position i.
246 s.l = append(s.l, Sample{})
247 copy(s.l[i+1:], s.l[i:])
251 math.Max(sample.Delta, math.Floor(s.ƒ(s, r))-1),
252 // TODO(beorn7): How to calculate delta correctly?
259 s.l = append(s.l, Sample{sample.Value, sample.Width, 0})
268 func (s *stream) count() int {
272 func (s *stream) query(q float64) float64 {
273 t := math.Ceil(q * s.n)
274 t += math.Ceil(s.ƒ(s, t) / 2)
277 for _, c := range s.l[1:] {
279 if r+c.Width+c.Delta > t {
287 func (s *stream) compress() {
293 r := s.n - 1 - x.Width
295 for i := len(s.l) - 2; i >= 0; i-- {
297 if c.Width+x.Width+x.Delta <= s.ƒ(s, r) {
300 // Remove element at i.
301 copy(s.l[i:], s.l[i+1:])
302 s.l = s.l[:len(s.l)-1]
312 func (s *stream) samples() Samples {
313 samples := make(Samples, len(s.l))