From: xinhuili Date: Wed, 14 Aug 2019 06:33:13 +0000 (+0800) Subject: Customise kafka-topic-exporter X-Git-Url: https://gerrit.akraino.org/r/gitweb?a=commitdiff_plain;h=f963aebfae5d868317912342cf651092171dbf91;p=iec%2Fxconnect.git Customise kafka-topic-exporter This patch is customized kafka-topic-exporter Signed-off-by: XINHUI LI Change-Id: I2a66ea206fd80a729745b2ecb77fbe2df6c0d286 --- diff --git a/src/kafka-topic-exporter/.gitreview b/src/kafka-topic-exporter/.gitreview new file mode 100644 index 0000000..dc78107 --- /dev/null +++ b/src/kafka-topic-exporter/.gitreview @@ -0,0 +1,6 @@ +[gerrit] +host=gerrit.opencord.org +port=29418 +project=kafka-topic-exporter.git +defaultremote=origin +defaultbranch=master diff --git a/src/kafka-topic-exporter/Dockerfile b/src/kafka-topic-exporter/Dockerfile new file mode 100644 index 0000000..29e46da --- /dev/null +++ b/src/kafka-topic-exporter/Dockerfile @@ -0,0 +1,29 @@ +# Copyright 2018-present Open Networking Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# docker build -t opencord/kafka-topic-exporter:latest . +# docker build -t 10.90.0.101:30500/opencord/kafka-topic-exporter:latest . + +FROM golang:1.10-stretch as builder +RUN mkdir /app +ADD . /app/ +WORKDIR /app +RUN go get github.com/prometheus/client_golang/prometheus +RUN go get github.com/Shopify/sarama +RUN CGO_ENABLED=0 GOOS=linux go build -o main . + +FROM alpine:3.8 +WORKDIR /app/ +COPY --from=builder /app/main . +ENTRYPOINT ["./main"] \ No newline at end of file diff --git a/src/kafka-topic-exporter/README.md b/src/kafka-topic-exporter/README.md new file mode 100644 index 0000000..7b10c8f --- /dev/null +++ b/src/kafka-topic-exporter/README.md @@ -0,0 +1,27 @@ +# Kafka topic exported + +## Expected format + +```json +{ + "type": "slice", + "ts": 1536617075.762331, + "slice_data": [ + { + "metrics": { + "deferreds": 119.0, + "rss-mb": 106.0 + }, + "metadata": { + "logical_device_id": "", + "title": "voltha.internal", + "serial_no": "", + "ts": 1536617075.762331, + "context": { + "instance_id": "vcore-0" + }, "device_id": "" + + } + ] +} +``` \ No newline at end of file diff --git a/src/kafka-topic-exporter/VERSION b/src/kafka-topic-exporter/VERSION new file mode 100644 index 0000000..45a1b3f --- /dev/null +++ b/src/kafka-topic-exporter/VERSION @@ -0,0 +1 @@ +1.1.2 diff --git a/src/kafka-topic-exporter/main.go b/src/kafka-topic-exporter/main.go new file mode 100644 index 0000000..0eba8ba --- /dev/null +++ b/src/kafka-topic-exporter/main.go @@ -0,0 +1,97 @@ +// Copyright 2018 Open Networking Foundation +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "flag" + "fmt" + "net/http" + "sync" + + "github.com/Shopify/sarama" + "github.com/prometheus/client_golang/prometheus" +) + +var ( + broker = flag.String("broker", "voltha-kafka.default.svc.cluster.local:9092", "The Kafka broker") + volthaTopic = "voltha.kpis" + onosTopic = "onos.kpis" + + volthaTopicPointer = &volthaTopic + onosTopicPointer = &onosTopic +) + +var brokers []string + +func kafkaInit(brokers []string) { + config := sarama.NewConfig() + config.Consumer.Return.Errors = true + var wg sync.WaitGroup + + wg.Add(2) // we are spinning up two thread and we need to wait for them to exit before stopping the kafka connection + + master, err := sarama.NewConsumer(brokers, config) + if err != nil { + fmt.Println("kafkaInit panic") + panic(err) + } + defer func() { + fmt.Println("kafkaInit close connection") + if err := master.Close(); err != nil { + panic(err) + } + }() + go VOLTHAListener(volthaTopicPointer, master, wg) + go ONOSListener(onosTopicPointer, master, wg) + + wg.Wait() +} + +func runServer() { + fmt.Println("Starting Server") + http.Handle("/metrics", prometheus.Handler()) + http.ListenAndServe(":8080", nil) +} + +func init() { + + // read config from cli flags + flag.Parse() + brokers = make([]string, 0) + brokers = []string{*broker} + fmt.Println("Connecting to broker: ", brokers) + fmt.Println("Listening to voltha on topic: ", *volthaTopicPointer) + fmt.Println("Listening to onos on topic: ", *onosTopicPointer) + + // register metrics within Prometheus + prometheus.MustRegister(volthaTxBytesTotal) + prometheus.MustRegister(volthaRxBytesTotal) + prometheus.MustRegister(volthaTxPacketsTotal) + prometheus.MustRegister(volthaRxPacketsTotal) + prometheus.MustRegister(volthaTxErrorPacketsTotal) + prometheus.MustRegister(volthaRxErrorPacketsTotal) + + prometheus.MustRegister(onosTxBytesTotal) + prometheus.MustRegister(onosRxBytesTotal) + prometheus.MustRegister(onosTxPacketsTotal) + prometheus.MustRegister(onosRxPacketsTotal) + prometheus.MustRegister(onosTxDropPacketsTotal) + prometheus.MustRegister(onosRxDropPacketsTotal) +} + +func main() { + go kafkaInit(brokers) + runServer() +} diff --git a/src/kafka-topic-exporter/onos-kpi.go b/src/kafka-topic-exporter/onos-kpi.go new file mode 100644 index 0000000..da7b120 --- /dev/null +++ b/src/kafka-topic-exporter/onos-kpi.go @@ -0,0 +1,63 @@ +// Copyright 2018 Open Networking Foundation +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "encoding/json" + "fmt" + "log" + "os" + "os/signal" + "sync" + + "github.com/Shopify/sarama" +) + +func ONOSListener(topic *string, master sarama.Consumer, wg sync.WaitGroup) { + fmt.Println("Starting ONOSListener") + defer wg.Done() + consumer, err := master.ConsumePartition(*topic, 0, sarama.OffsetOldest) + if err != nil { + fmt.Println("ONOSListener panic") + panic(err) + } + signals := make(chan os.Signal, 1) + signal.Notify(signals, os.Interrupt) + doneCh := make(chan struct{}) + go func() { + for { + select { + case err := <-consumer.Errors(): + fmt.Println(err) + case msg := <-consumer.Messages(): + + kpi := OnosKPI{} + + err := json.Unmarshal(msg.Value, &kpi) + + if err != nil { + log.Fatal(err) + } + + exportOnosKPI(kpi) + + case <-signals: + fmt.Println("Interrupt is detected") + doneCh <- struct{}{} + } + } + }() + <-doneCh +} diff --git a/src/kafka-topic-exporter/onos-prometheus.go b/src/kafka-topic-exporter/onos-prometheus.go new file mode 100644 index 0000000..b383b55 --- /dev/null +++ b/src/kafka-topic-exporter/onos-prometheus.go @@ -0,0 +1,103 @@ +// Copyright 2018 Open Networking Foundation +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "github.com/prometheus/client_golang/prometheus" +) + +var ( + onosTxBytesTotal = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "onos_tx_bytes_total", + Help: "Number of total bytes transmitted", + }, + []string{"device_id", "port_id"}, + ) + onosRxBytesTotal = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "onos_rx_bytes_total", + Help: "Number of total bytes received", + }, + []string{"device_id", "port_id"}, + ) + onosTxPacketsTotal = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "onos_tx_packets_total", + Help: "Number of total packets transmitted", + }, + []string{"device_id", "port_id"}, + ) + onosRxPacketsTotal = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "onos_rx_packets_total", + Help: "Number of total packets received", + }, + []string{"device_id", "port_id"}, + ) + + onosTxDropPacketsTotal = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "onos_tx_drop_packets_total", + Help: "Number of total transmitted packets dropped", + }, + []string{"device_id", "port_id"}, + ) + + onosRxDropPacketsTotal = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "onos_rx_drop_packets_total", + Help: "Number of total received packets dropped", + }, + []string{"device_id", "port_id"}, + ) +) + +func exportOnosKPI(kpi OnosKPI) { + + for _, data := range kpi.Ports { + + onosTxBytesTotal.WithLabelValues( + kpi.DeviceID, + data.PortID, + ).Set(data.TxBytes) + + onosRxBytesTotal.WithLabelValues( + kpi.DeviceID, + data.PortID, + ).Set(data.RxBytes) + + onosTxPacketsTotal.WithLabelValues( + kpi.DeviceID, + data.PortID, + ).Set(data.TxPackets) + + onosRxPacketsTotal.WithLabelValues( + kpi.DeviceID, + data.PortID, + ).Set(data.RxPackets) + + onosTxDropPacketsTotal.WithLabelValues( + kpi.DeviceID, + data.PortID, + ).Set(data.TxPacketsDrop) + + onosRxDropPacketsTotal.WithLabelValues( + kpi.DeviceID, + data.PortID, + ).Set(data.RxPacketsDrop) + + } +} diff --git a/src/kafka-topic-exporter/types.go b/src/kafka-topic-exporter/types.go new file mode 100644 index 0000000..b3ecd59 --- /dev/null +++ b/src/kafka-topic-exporter/types.go @@ -0,0 +1,81 @@ +// Copyright 2018 Open Networking Foundation +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +// KPI Events format + +type Metrics struct { + TxBytes float64 `json:"tx_bytes"` + TxPackets float64 `json:"tx_packets"` + TxErrorPackets float64 `json:"tx_error_packets"` + TxBcastPackets float64 `json:"tx_bcast_packets"` + TxUnicastPackets float64 `json:"tx_ucast_packets"` + TxMulticastPackets float64 `json:"tx_mcast_packets"` + RxBytes float64 `json:"rx_bytes"` + RxPackets float64 `json:"rx_packets"` + RxErrorPackets float64 `json:"rx_error_packets"` + RxBcastPackets float64 `json:"rx_bcast_packets"` + RxMulticastPackets float64 `json:"rx_mcast_packets"` + + // ONU Ethernet_Bridge_Port_history + Packets float64 `json:"packets"` + Octets float64 `json:"octets"` +} + +type Context struct { + InterfaceID string `json:"intf_id"` + PonID string `json:"pon_id"` + PortNumber string `json:"port_no"` + + // ONU Performance Metrics + ParentClassId string `json:"parent_class_id"` + ParentEntityId string `json:"parent_entity_id"` + Upstream string `json:"upstream"` +} + +type Metadata struct { + LogicalDeviceID string `json:"logical_device_id"` + Title string `json:"title"` + SerialNumber string `json:"serial_no"` + Timestamp float64 `json:"ts"` + DeviceID string `json:"device_id"` + Context *Context `json:"context"` +} + +type SliceData struct { + Metrics *Metrics `json:"metrics"` + Metadata *Metadata `json:"metadata"` +} + +type VolthaKPI struct { + Type string `json:"type"` + Timestamp float64 `json:"ts"` + SliceDatas []*SliceData `json:"slice_data"` +} + +type OnosPort struct { + PortID string `json:"portId"` + RxPackets float64 `json:"pktRx"` + TxPackets float64 `json:"pktTx"` + RxBytes float64 `json:"bytesRx"` + TxBytes float64 `json:"bytesTx"` + RxPacketsDrop float64 `json:"pktRxDrp"` + TxPacketsDrop float64 `json:"pktTxDrp"` +} + +type OnosKPI struct { + DeviceID string `json:"deviceId"` + Ports []*OnosPort `json:"ports"` +} diff --git a/src/kafka-topic-exporter/voltha-kpi.go b/src/kafka-topic-exporter/voltha-kpi.go new file mode 100644 index 0000000..122f3f0 --- /dev/null +++ b/src/kafka-topic-exporter/voltha-kpi.go @@ -0,0 +1,64 @@ +// Copyright 2018 Open Networking Foundation +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "encoding/json" + "fmt" + "log" + "os" + "os/signal" + "sync" + + "github.com/Shopify/sarama" +) + +func VOLTHAListener(topic *string, master sarama.Consumer, wg sync.WaitGroup) { + fmt.Println("Starting VOLTHAListener") + defer wg.Done() + consumer, err := master.ConsumePartition(*topic, 0, sarama.OffsetOldest) + if err != nil { + fmt.Println("VOLTHAListener panic") + panic(err) + } + signals := make(chan os.Signal, 1) + signal.Notify(signals, os.Interrupt) + doneCh := make(chan struct{}) + go func() { + for { + select { + case err := <-consumer.Errors(): + fmt.Println(err) + case msg := <-consumer.Messages(): + // fmt.Println(string(msg.Value)) + + kpi := VolthaKPI{} + + err := json.Unmarshal(msg.Value, &kpi) + + if err != nil { + log.Fatal(err) + } + + exportVolthaKPI(kpi) + + case <-signals: + fmt.Println("Interrupt is detected") + doneCh <- struct{}{} + } + } + }() + <-doneCh +} diff --git a/src/kafka-topic-exporter/voltha-prometheus.go b/src/kafka-topic-exporter/voltha-prometheus.go new file mode 100644 index 0000000..526010b --- /dev/null +++ b/src/kafka-topic-exporter/voltha-prometheus.go @@ -0,0 +1,262 @@ +// Copyright 2018 Open Networking Foundation +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "github.com/prometheus/client_golang/prometheus" +) + +var ( + volthaTxBytesTotal = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "voltha_tx_bytes_total", + Help: "Number of total bytes transmitted", + }, + []string{"logical_device_id", "serial_number", "device_id", "interface_id", "pon_id", "port_number", "title"}, + ) + volthaRxBytesTotal = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "voltha_rx_bytes_total", + Help: "Number of total bytes received", + }, + []string{"logical_device_id", "serial_number", "device_id", "interface_id", "pon_id", "port_number", "title"}, + ) + volthaTxPacketsTotal = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "voltha_tx_packets_total", + Help: "Number of total packets transmitted", + }, + []string{"logical_device_id", "serial_number", "device_id", "interface_id", "pon_id", "port_number", "title"}, + ) + volthaRxPacketsTotal = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "voltha_rx_packets_total", + Help: "Number of total packets received", + }, + []string{"logical_device_id", "serial_number", "device_id", "interface_id", "pon_id", "port_number", "title"}, + ) + + volthaTxErrorPacketsTotal = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "voltha_tx_error_packets_total", + Help: "Number of total transmitted packets error", + }, + []string{"logical_device_id", "serial_number", "device_id", "interface_id", "pon_id", "port_number", "title"}, + ) + + volthaRxErrorPacketsTotal = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "voltha_rx_error_packets_total", + Help: "Number of total received packets error", + }, + []string{"logical_device_id", "serial_number", "device_id", "interface_id", "pon_id", "port_number", "title"}, + ) +) + +func exportVolthaKPI(kpi VolthaKPI) { + + for _, data := range kpi.SliceDatas { + switch title := data.Metadata.Title; title { + case "Ethernet", "PON": + volthaTxBytesTotal.WithLabelValues( + data.Metadata.LogicalDeviceID, + data.Metadata.SerialNumber, + data.Metadata.DeviceID, + data.Metadata.Context.InterfaceID, + data.Metadata.Context.PonID, + data.Metadata.Context.PortNumber, + data.Metadata.Title, + ).Set(data.Metrics.TxBytes) + + volthaRxBytesTotal.WithLabelValues( + data.Metadata.LogicalDeviceID, + data.Metadata.SerialNumber, + data.Metadata.DeviceID, + data.Metadata.Context.InterfaceID, + data.Metadata.Context.PonID, + data.Metadata.Context.PortNumber, + data.Metadata.Title, + ).Set(data.Metrics.RxBytes) + + volthaTxPacketsTotal.WithLabelValues( + data.Metadata.LogicalDeviceID, + data.Metadata.SerialNumber, + data.Metadata.DeviceID, + data.Metadata.Context.InterfaceID, + data.Metadata.Context.PonID, + data.Metadata.Context.PortNumber, + data.Metadata.Title, + ).Set(data.Metrics.TxPackets) + + volthaRxPacketsTotal.WithLabelValues( + data.Metadata.LogicalDeviceID, + data.Metadata.SerialNumber, + data.Metadata.DeviceID, + data.Metadata.Context.InterfaceID, + data.Metadata.Context.PonID, + data.Metadata.Context.PortNumber, + data.Metadata.Title, + ).Set(data.Metrics.RxPackets) + + volthaTxErrorPacketsTotal.WithLabelValues( + data.Metadata.LogicalDeviceID, + data.Metadata.SerialNumber, + data.Metadata.DeviceID, + data.Metadata.Context.InterfaceID, + data.Metadata.Context.PonID, + data.Metadata.Context.PortNumber, + data.Metadata.Title, + ).Set(data.Metrics.TxErrorPackets) + + volthaRxErrorPacketsTotal.WithLabelValues( + data.Metadata.LogicalDeviceID, + data.Metadata.SerialNumber, + data.Metadata.DeviceID, + data.Metadata.Context.InterfaceID, + data.Metadata.Context.PonID, + data.Metadata.Context.PortNumber, + data.Metadata.Title, + ).Set(data.Metrics.RxErrorPackets) + + // TODO add metrics for: + // TxBcastPackets + // TxUnicastPackets + // TxMulticastPackets + // RxBcastPackets + // RxMulticastPackets + + case "Ethernet_Bridge_Port_History": + if data.Metadata.Context.Upstream == "True" { + // ONU. Extended Ethernet statistics. + volthaTxPacketsTotal.WithLabelValues( + data.Metadata.LogicalDeviceID, + data.Metadata.SerialNumber, + data.Metadata.DeviceID, + "NA", // InterfaceID + "NA", // PonID + "NA", // PortNumber + data.Metadata.Title, + ).Add(data.Metrics.Packets) + + volthaTxBytesTotal.WithLabelValues( + data.Metadata.LogicalDeviceID, + data.Metadata.SerialNumber, + data.Metadata.DeviceID, + "NA", // InterfaceID + "NA", // PonID + "NA", // PortNumber + data.Metadata.Title, + ).Add(data.Metrics.Octets) + } else { + // ONU. Extended Ethernet statistics. + volthaRxPacketsTotal.WithLabelValues( + data.Metadata.LogicalDeviceID, + data.Metadata.SerialNumber, + data.Metadata.DeviceID, + "NA", // InterfaceID + "NA", // PonID + "NA", // PortNumber + data.Metadata.Title, + ).Add(data.Metrics.Packets) + + volthaRxBytesTotal.WithLabelValues( + data.Metadata.LogicalDeviceID, + data.Metadata.SerialNumber, + data.Metadata.DeviceID, + "NA", // InterfaceID + "NA", // PonID + "NA", // PortNumber + data.Metadata.Title, + ).Add(data.Metrics.Octets) + } + + case "Ethernet_UNI_History": + // ONU. Do nothing. + + case "FEC_History": + // ONU. Do Nothing. + + volthaTxBytesTotal.WithLabelValues( + data.Metadata.LogicalDeviceID, + data.Metadata.SerialNumber, + data.Metadata.DeviceID, + data.Metadata.Context.InterfaceID, + data.Metadata.Context.PonID, + data.Metadata.Context.PortNumber, + data.Metadata.Title, + ).Set(data.Metrics.TxBytes) + + volthaRxBytesTotal.WithLabelValues( + data.Metadata.LogicalDeviceID, + data.Metadata.SerialNumber, + data.Metadata.DeviceID, + data.Metadata.Context.InterfaceID, + data.Metadata.Context.PonID, + data.Metadata.Context.PortNumber, + data.Metadata.Title, + ).Set(data.Metrics.RxBytes) + + volthaTxPacketsTotal.WithLabelValues( + data.Metadata.LogicalDeviceID, + data.Metadata.SerialNumber, + data.Metadata.DeviceID, + data.Metadata.Context.InterfaceID, + data.Metadata.Context.PonID, + data.Metadata.Context.PortNumber, + data.Metadata.Title, + ).Set(data.Metrics.TxPackets) + + volthaRxPacketsTotal.WithLabelValues( + data.Metadata.LogicalDeviceID, + data.Metadata.SerialNumber, + data.Metadata.DeviceID, + data.Metadata.Context.InterfaceID, + data.Metadata.Context.PonID, + data.Metadata.Context.PortNumber, + data.Metadata.Title, + ).Set(data.Metrics.RxPackets) + + volthaTxErrorPacketsTotal.WithLabelValues( + data.Metadata.LogicalDeviceID, + data.Metadata.SerialNumber, + data.Metadata.DeviceID, + data.Metadata.Context.InterfaceID, + data.Metadata.Context.PonID, + data.Metadata.Context.PortNumber, + data.Metadata.Title, + ).Set(data.Metrics.TxErrorPackets) + + volthaRxErrorPacketsTotal.WithLabelValues( + data.Metadata.LogicalDeviceID, + data.Metadata.SerialNumber, + data.Metadata.DeviceID, + data.Metadata.Context.InterfaceID, + data.Metadata.Context.PonID, + data.Metadata.Context.PortNumber, + data.Metadata.Title, + ).Set(data.Metrics.RxErrorPackets) + + // TODO add metrics for: + // TxBcastPackets + // TxUnicastPackets + // TxMulticastPackets + // RxBcastPackets + // RxMulticastPackets + + case "voltha.internal": + // Voltha Internal. Do nothing. + } + } +}