1 // Copyright 2015 The Prometheus Authors
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
6 // http://www.apache.org/licenses/LICENSE-2.0
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
23 dto "github.com/prometheus/client_model/go"
25 "github.com/matttproud/golang_protobuf_extensions/pbutil"
26 "github.com/prometheus/common/model"
29 // Decoder types decode an input stream into metric families.
// Decode takes a *dto.MetricFamily and reports failure through its error
// result. The protoDecoder in this file hands that argument to
// pbutil.ReadDelimited together with its reader.
30 type Decoder interface {
31 Decode(*dto.MetricFamily) error
34 // DecodeOptions contains options used by the Decoder and in sample extraction.
// The extract* helpers in this file read o.Timestamp as the sample timestamp
// and use the metric's own TimestampMs instead when that pointer is non-nil.
35 type DecodeOptions struct {
36 // Timestamp is added to each value from the stream that has no explicit timestamp set.
40 // ResponseFormat extracts the correct format from an HTTP response header.
41 // If no matching format can be found FormatUnknown is returned.
//
// The header named by hdrContentType is read from h and split into a media
// type and its parameters with mime.ParseMediaType. The "proto", "encoding"
// and "version" parameters are optional: each check below only fires when the
// parameter is present and differs from the expected value.
42 func ResponseFormat(h http.Header) Format {
43 ct := h.Get(hdrContentType)
45 mediatype, params, err := mime.ParseMediaType(ct)
// NOTE(review): presumably the media type that selects the text format; the
// comparison against mediatype is not visible in this listing — confirm.
50 const textType = "text/plain"
// A "proto" parameter is present but is not ProtoProtocol.
54 if p, ok := params["proto"]; ok && p != ProtoProtocol {
// An "encoding" parameter is present but is not "delimited".
57 if e, ok := params["encoding"]; ok && e != "delimited" {
// A "version" parameter is present but is not TextVersion.
63 if v, ok := params["version"]; ok && v != TextVersion {
72 // NewDecoder returns a new decoder based on the given input format.
73 // If the input format does not imply otherwise, a text format decoder is returned.
//
// Both decoder kinds are constructed with r as their input reader.
74 func NewDecoder(r io.Reader, format Format) Decoder {
// Protocol buffer decoder reading from r.
77 return &protoDecoder{r: r}
// Text decoder reading from r; per the doc comment above this is the default.
79 return &textDecoder{r: r}
82 // protoDecoder implements the Decoder interface for protocol buffers.
// Its r field is set by NewDecoder and passed to pbutil.ReadDelimited by Decode.
83 type protoDecoder struct {
87 // Decode implements the Decoder interface.
//
// It calls pbutil.ReadDelimited with d.r and v, then validates the family's
// metric name and the name and value of every label on every metric. The first
// invalid item found is reported as a formatted error that quotes it.
88 func (d *protoDecoder) Decode(v *dto.MetricFamily) error {
89 _, err := pbutil.ReadDelimited(d.r, v)
// The family name is converted to a model.LabelValue for the validity check.
93 if !model.IsValidMetricName(model.LabelValue(v.GetName())) {
94 return fmt.Errorf("invalid metric name %q", v.GetName())
96 for _, m := range v.GetMetric() {
100 for _, l := range m.GetLabel() {
// For each label the value is checked before the name.
104 if !model.LabelValue(l.GetValue()).IsValid() {
105 return fmt.Errorf("invalid label value %q", l.GetValue())
107 if !model.LabelName(l.GetName()).IsValid() {
108 return fmt.Errorf("invalid label name %q", l.GetName())
115 // textDecoder implements the Decoder interface for the text protocol.
116 type textDecoder struct {
// fams caches the metric families that Decode has parsed from the input.
119 fams []*dto.MetricFamily
122 // Decode implements the Decoder interface.
//
// When d.fams is empty, d.r is parsed with d.p.TextToMetricFamilies and the
// resulting families are stored in d.fams.
123 func (d *textDecoder) Decode(v *dto.MetricFamily) error {
124 // TODO(fabxc): Wrap this as a line reader to make streaming safer.
125 if len(d.fams) == 0 {
126 // No cached metric families, read everything and parse metrics.
127 fams, err := d.p.TextToMetricFamilies(d.r)
// Copy the parsed families into a slice pre-sized to hold all of them.
134 d.fams = make([]*dto.MetricFamily, 0, len(fams))
135 for _, f := range fams {
136 d.fams = append(d.fams, f)
146 // SampleDecoder wraps a Decoder to extract samples from the metric families
147 // decoded by the wrapped Decoder.
//
// Its Decode method uses three members: Dec, the wrapped Decoder; Opts, which
// is passed to sample extraction; and f, the family that Dec decodes into.
148 type SampleDecoder struct {
155 // Decode calls the Decode method of the wrapped Decoder and then extracts the
156 // samples from the decoded MetricFamily into the provided model.Vector.
157 func (sd *SampleDecoder) Decode(s *model.Vector) error {
// The family is decoded into the decoder's own sd.f field.
158 err := sd.Dec.Decode(&sd.f)
// *s is overwritten with the samples extracted from sd.f using sd.Opts.
162 *s, err = extractSamples(&sd.f, sd.Opts)
166 // ExtractSamples builds a slice of samples from the provided metric
167 // families. If an error occurs during sample extraction, it continues to
168 // extract from the remaining metric families. The returned error is the last
169 // error that has occurred.
//
// o is forwarded unchanged to extractSamples for every family.
170 func ExtractSamples(o *DecodeOptions, fams ...*dto.MetricFamily) (model.Vector, error) {
175 for _, f := range fams {
176 some, err := extractSamples(f, o)
// Add this family's samples to the combined result.
181 all = append(all, some...)
// extractSamples converts a single metric family into samples by delegating to
// the extract* helper for its type. Counter, gauge, summary, untyped and
// histogram families are handled and always return a nil error; otherwise a nil
// vector is returned with an error naming f.GetType().
// NOTE(review): the switch header is not visible in this listing; presumably it
// switches on f.GetType() — confirm.
186 func extractSamples(f *dto.MetricFamily, o *DecodeOptions) (model.Vector, error) {
188 case dto.MetricType_COUNTER:
189 return extractCounter(o, f), nil
190 case dto.MetricType_GAUGE:
191 return extractGauge(o, f), nil
192 case dto.MetricType_SUMMARY:
193 return extractSummary(o, f), nil
194 case dto.MetricType_UNTYPED:
195 return extractUntyped(o, f), nil
196 case dto.MetricType_HISTOGRAM:
197 return extractHistogram(o, f), nil
// Any type without a case above is reported as an error.
199 return nil, fmt.Errorf("expfmt.extractSamples: unknown metric family type %v", f.GetType())
// extractCounter builds samples from the counter metrics of f. Each sample
// carries the metric's labels plus the family name under
// model.MetricNameLabel, and the counter's value.
202 func extractCounter(o *DecodeOptions, f *dto.MetricFamily) model.Vector {
// At most one sample is appended per metric, so len(f.Metric) is enough capacity.
203 samples := make(model.Vector, 0, len(f.Metric))
205 for _, m := range f.Metric {
// NOTE(review): the body of this nil check is not visible in this listing;
// presumably metrics without a Counter are skipped — confirm.
206 if m.Counter == nil {
// The +1 leaves room for the metric name label added after the loop.
210 lset := make(model.LabelSet, len(m.Label)+1)
211 for _, p := range m.Label {
212 lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
214 lset[model.MetricNameLabel] = model.LabelValue(f.GetName())
216 smpl := &model.Sample{
217 Metric: model.Metric(lset),
218 Value: model.SampleValue(m.Counter.GetValue()),
// A metric-level timestamp is scaled by 1000000 before being handed to
// model.TimeFromUnixNano.
// NOTE(review): that multiplication can overflow int64 for very large
// TimestampMs values — confirm inputs are bounded.
221 if m.TimestampMs != nil {
222 smpl.Timestamp = model.TimeFromUnixNano(*m.TimestampMs * 1000000)
// Timestamp taken from the decode options (presumably the else branch).
224 smpl.Timestamp = o.Timestamp
227 samples = append(samples, smpl)
// extractGauge builds samples from the gauge metrics of f. Each sample carries
// the metric's labels plus the family name under model.MetricNameLabel, and
// the gauge's value.
233 func extractGauge(o *DecodeOptions, f *dto.MetricFamily) model.Vector {
// At most one sample is appended per metric, so len(f.Metric) is enough capacity.
234 samples := make(model.Vector, 0, len(f.Metric))
236 for _, m := range f.Metric {
// The +1 leaves room for the metric name label added after the loop.
241 lset := make(model.LabelSet, len(m.Label)+1)
242 for _, p := range m.Label {
243 lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
245 lset[model.MetricNameLabel] = model.LabelValue(f.GetName())
247 smpl := &model.Sample{
248 Metric: model.Metric(lset),
249 Value: model.SampleValue(m.Gauge.GetValue()),
// A metric-level timestamp is scaled by 1000000 before being handed to
// model.TimeFromUnixNano.
252 if m.TimestampMs != nil {
253 smpl.Timestamp = model.TimeFromUnixNano(*m.TimestampMs * 1000000)
// Timestamp taken from the decode options (presumably the else branch).
255 smpl.Timestamp = o.Timestamp
258 samples = append(samples, smpl)
// extractUntyped builds samples from the untyped metrics of f. Each sample
// carries the metric's labels plus the family name under
// model.MetricNameLabel, and the untyped value.
264 func extractUntyped(o *DecodeOptions, f *dto.MetricFamily) model.Vector {
// At most one sample is appended per metric, so len(f.Metric) is enough capacity.
265 samples := make(model.Vector, 0, len(f.Metric))
267 for _, m := range f.Metric {
// NOTE(review): the body of this nil check is not visible in this listing;
// presumably metrics without an Untyped value are skipped — confirm.
268 if m.Untyped == nil {
// The +1 leaves room for the metric name label added after the loop.
272 lset := make(model.LabelSet, len(m.Label)+1)
273 for _, p := range m.Label {
274 lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
276 lset[model.MetricNameLabel] = model.LabelValue(f.GetName())
278 smpl := &model.Sample{
279 Metric: model.Metric(lset),
280 Value: model.SampleValue(m.Untyped.GetValue()),
// A metric-level timestamp is scaled by 1000000 before being handed to
// model.TimeFromUnixNano.
283 if m.TimestampMs != nil {
284 smpl.Timestamp = model.TimeFromUnixNano(*m.TimestampMs * 1000000)
// Timestamp taken from the decode options (presumably the else branch).
286 smpl.Timestamp = o.Timestamp
289 samples = append(samples, smpl)
// extractSummary builds samples from the summary metrics of f. For each metric
// it appends one sample per quantile under the family name, labelled with
// model.QuantileLabel, then one sample named "<family>_sum" and one named
// "<family>_count". All samples of a metric share the same timestamp.
295 func extractSummary(o *DecodeOptions, f *dto.MetricFamily) model.Vector {
// Initial capacity only; each metric appends more than one sample.
296 samples := make(model.Vector, 0, len(f.Metric))
298 for _, m := range f.Metric {
// NOTE(review): the body of this nil check is not visible in this listing;
// presumably metrics without a Summary are skipped — confirm.
299 if m.Summary == nil {
// Default to the timestamp from the options; a metric-level timestamp
// replaces it after being scaled by 1000000 for model.TimeFromUnixNano.
303 timestamp := o.Timestamp
304 if m.TimestampMs != nil {
305 timestamp = model.TimeFromUnixNano(*m.TimestampMs * 1000000)
// One sample per quantile.
308 for _, q := range m.Summary.Quantile {
// The +2 leaves room for the quantile label and the metric name label.
309 lset := make(model.LabelSet, len(m.Label)+2)
310 for _, p := range m.Label {
311 lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
313 // BUG(matt): Update other names to "quantile".
// The quantile is rendered with fmt.Sprint to form the label value.
314 lset[model.LabelName(model.QuantileLabel)] = model.LabelValue(fmt.Sprint(q.GetQuantile()))
315 lset[model.MetricNameLabel] = model.LabelValue(f.GetName())
317 samples = append(samples, &model.Sample{
318 Metric: model.Metric(lset),
319 Value: model.SampleValue(q.GetValue()),
320 Timestamp: timestamp,
// "_sum" sample: the metric's labels plus the suffixed family name.
324 lset := make(model.LabelSet, len(m.Label)+1)
325 for _, p := range m.Label {
326 lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
328 lset[model.MetricNameLabel] = model.LabelValue(f.GetName() + "_sum")
330 samples = append(samples, &model.Sample{
331 Metric: model.Metric(lset),
332 Value: model.SampleValue(m.Summary.GetSampleSum()),
333 Timestamp: timestamp,
// "_count" sample: a fresh label set is built so the "_sum" sample's
// metric is not modified.
336 lset = make(model.LabelSet, len(m.Label)+1)
337 for _, p := range m.Label {
338 lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
340 lset[model.MetricNameLabel] = model.LabelValue(f.GetName() + "_count")
342 samples = append(samples, &model.Sample{
343 Metric: model.Metric(lset),
344 Value: model.SampleValue(m.Summary.GetSampleCount()),
345 Timestamp: timestamp,
352 func extractHistogram(o *DecodeOptions, f *dto.MetricFamily) model.Vector {
353 samples := make(model.Vector, 0, len(f.Metric))
355 for _, m := range f.Metric {
356 if m.Histogram == nil {
360 timestamp := o.Timestamp
361 if m.TimestampMs != nil {
362 timestamp = model.TimeFromUnixNano(*m.TimestampMs * 1000000)
367 for _, q := range m.Histogram.Bucket {
368 lset := make(model.LabelSet, len(m.Label)+2)
369 for _, p := range m.Label {
370 lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
372 lset[model.LabelName(model.BucketLabel)] = model.LabelValue(fmt.Sprint(q.GetUpperBound()))
373 lset[model.MetricNameLabel] = model.LabelValue(f.GetName() + "_bucket")
375 if math.IsInf(q.GetUpperBound(), +1) {
379 samples = append(samples, &model.Sample{
380 Metric: model.Metric(lset),
381 Value: model.SampleValue(q.GetCumulativeCount()),
382 Timestamp: timestamp,
386 lset := make(model.LabelSet, len(m.Label)+1)
387 for _, p := range m.Label {
388 lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
390 lset[model.MetricNameLabel] = model.LabelValue(f.GetName() + "_sum")
392 samples = append(samples, &model.Sample{
393 Metric: model.Metric(lset),
394 Value: model.SampleValue(m.Histogram.GetSampleSum()),
395 Timestamp: timestamp,
398 lset = make(model.LabelSet, len(m.Label)+1)
399 for _, p := range m.Label {
400 lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
402 lset[model.MetricNameLabel] = model.LabelValue(f.GetName() + "_count")
404 count := &model.Sample{
405 Metric: model.Metric(lset),
406 Value: model.SampleValue(m.Histogram.GetSampleCount()),
407 Timestamp: timestamp,
409 samples = append(samples, count)
412 // Append an infinity bucket sample.
413 lset := make(model.LabelSet, len(m.Label)+2)
414 for _, p := range m.Label {
415 lset[model.LabelName(p.GetName())] = model.LabelValue(p.GetValue())
417 lset[model.LabelName(model.BucketLabel)] = model.LabelValue("+Inf")
418 lset[model.MetricNameLabel] = model.LabelValue(f.GetName() + "_bucket")
420 samples = append(samples, &model.Sample{
421 Metric: model.Metric(lset),
423 Timestamp: timestamp,