Customise kafka-topic-exporter 06/1406/1
authorxinhuili <lxinhui@vmware.com>
Wed, 14 Aug 2019 06:33:13 +0000 (14:33 +0800)
committerxinhuili <lxinhui@vmware.com>
Wed, 14 Aug 2019 06:33:13 +0000 (14:33 +0800)
This patch is customized kafka-topic-exporter

Signed-off-by: XINHUI LI <lxinhui@vmware.com>
Change-Id: I2a66ea206fd80a729745b2ecb77fbe2df6c0d286

src/kafka-topic-exporter/.gitreview [new file with mode: 0644]
src/kafka-topic-exporter/Dockerfile [new file with mode: 0644]
src/kafka-topic-exporter/README.md [new file with mode: 0644]
src/kafka-topic-exporter/VERSION [new file with mode: 0644]
src/kafka-topic-exporter/main.go [new file with mode: 0644]
src/kafka-topic-exporter/onos-kpi.go [new file with mode: 0644]
src/kafka-topic-exporter/onos-prometheus.go [new file with mode: 0644]
src/kafka-topic-exporter/types.go [new file with mode: 0644]
src/kafka-topic-exporter/voltha-kpi.go [new file with mode: 0644]
src/kafka-topic-exporter/voltha-prometheus.go [new file with mode: 0644]

diff --git a/src/kafka-topic-exporter/.gitreview b/src/kafka-topic-exporter/.gitreview
new file mode 100644 (file)
index 0000000..dc78107
--- /dev/null
@@ -0,0 +1,6 @@
+[gerrit]
+host=gerrit.opencord.org
+port=29418
+project=kafka-topic-exporter.git
+defaultremote=origin
+defaultbranch=master
diff --git a/src/kafka-topic-exporter/Dockerfile b/src/kafka-topic-exporter/Dockerfile
new file mode 100644 (file)
index 0000000..29e46da
--- /dev/null
@@ -0,0 +1,29 @@
+# Copyright 2018-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# docker build -t opencord/kafka-topic-exporter:latest .
+# docker build -t 10.90.0.101:30500/opencord/kafka-topic-exporter:latest .
+
+FROM golang:1.10-stretch as builder
+RUN mkdir /app
+ADD . /app/
+WORKDIR /app
+RUN go get github.com/prometheus/client_golang/prometheus
+RUN go get github.com/Shopify/sarama
+RUN CGO_ENABLED=0 GOOS=linux go build -o main .
+
+FROM alpine:3.8
+WORKDIR /app/
+COPY --from=builder /app/main .
+ENTRYPOINT ["./main"]
\ No newline at end of file
diff --git a/src/kafka-topic-exporter/README.md b/src/kafka-topic-exporter/README.md
new file mode 100644 (file)
index 0000000..7b10c8f
--- /dev/null
@@ -0,0 +1,27 @@
+# Kafka topic exported
+
+## Expected format
+
+```json
+{
+    "type": "slice",
+    "ts": 1536617075.762331,
+    "slice_data": [
+        {
+            "metrics": {
+                "deferreds": 119.0,
+                "rss-mb": 106.0
+            },
+            "metadata": {
+                "logical_device_id": "",
+                "title": "voltha.internal",
+                "serial_no": "",
+                "ts": 1536617075.762331,
+                "context": {
+                    "instance_id": "vcore-0"
+                },     "device_id": ""
+           
+            }
+    ]
+}
+```
\ No newline at end of file
diff --git a/src/kafka-topic-exporter/VERSION b/src/kafka-topic-exporter/VERSION
new file mode 100644 (file)
index 0000000..45a1b3f
--- /dev/null
@@ -0,0 +1 @@
+1.1.2
diff --git a/src/kafka-topic-exporter/main.go b/src/kafka-topic-exporter/main.go
new file mode 100644 (file)
index 0000000..0eba8ba
--- /dev/null
@@ -0,0 +1,97 @@
+// Copyright 2018 Open Networking Foundation
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+       "flag"
+       "fmt"
+       "net/http"
+       "sync"
+
+       "github.com/Shopify/sarama"
+       "github.com/prometheus/client_golang/prometheus"
+)
+
+var (
+       broker      = flag.String("broker", "voltha-kafka.default.svc.cluster.local:9092", "The Kafka broker")
+       volthaTopic = "voltha.kpis"
+       onosTopic   = "onos.kpis"
+
+       volthaTopicPointer = &volthaTopic
+       onosTopicPointer   = &onosTopic
+)
+
+var brokers []string
+
+func kafkaInit(brokers []string) {
+       config := sarama.NewConfig()
+       config.Consumer.Return.Errors = true
+       var wg sync.WaitGroup
+
+       wg.Add(2) // we are spinning up two thread and we need to wait for them to exit before stopping the kafka connection
+
+       master, err := sarama.NewConsumer(brokers, config)
+       if err != nil {
+               fmt.Println("kafkaInit panic")
+               panic(err)
+       }
+       defer func() {
+               fmt.Println("kafkaInit close connection")
+               if err := master.Close(); err != nil {
+                       panic(err)
+               }
+       }()
+       go VOLTHAListener(volthaTopicPointer, master, wg)
+       go ONOSListener(onosTopicPointer, master, wg)
+
+       wg.Wait()
+}
+
+func runServer() {
+       fmt.Println("Starting Server")
+       http.Handle("/metrics", prometheus.Handler())
+       http.ListenAndServe(":8080", nil)
+}
+
+func init() {
+
+       // read config from cli flags
+       flag.Parse()
+       brokers = make([]string, 0)
+       brokers = []string{*broker}
+       fmt.Println("Connecting to broker: ", brokers)
+       fmt.Println("Listening to voltha on topic: ", *volthaTopicPointer)
+       fmt.Println("Listening to onos on topic: ", *onosTopicPointer)
+
+       // register metrics within Prometheus
+       prometheus.MustRegister(volthaTxBytesTotal)
+       prometheus.MustRegister(volthaRxBytesTotal)
+       prometheus.MustRegister(volthaTxPacketsTotal)
+       prometheus.MustRegister(volthaRxPacketsTotal)
+       prometheus.MustRegister(volthaTxErrorPacketsTotal)
+       prometheus.MustRegister(volthaRxErrorPacketsTotal)
+
+       prometheus.MustRegister(onosTxBytesTotal)
+       prometheus.MustRegister(onosRxBytesTotal)
+       prometheus.MustRegister(onosTxPacketsTotal)
+       prometheus.MustRegister(onosRxPacketsTotal)
+       prometheus.MustRegister(onosTxDropPacketsTotal)
+       prometheus.MustRegister(onosRxDropPacketsTotal)
+}
+
+func main() {
+       go kafkaInit(brokers)
+       runServer()
+}
diff --git a/src/kafka-topic-exporter/onos-kpi.go b/src/kafka-topic-exporter/onos-kpi.go
new file mode 100644 (file)
index 0000000..da7b120
--- /dev/null
@@ -0,0 +1,63 @@
+// Copyright 2018 Open Networking Foundation
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+       "encoding/json"
+       "fmt"
+       "log"
+       "os"
+       "os/signal"
+       "sync"
+
+       "github.com/Shopify/sarama"
+)
+
+func ONOSListener(topic *string, master sarama.Consumer, wg sync.WaitGroup) {
+       fmt.Println("Starting ONOSListener")
+       defer wg.Done()
+       consumer, err := master.ConsumePartition(*topic, 0, sarama.OffsetOldest)
+       if err != nil {
+               fmt.Println("ONOSListener panic")
+               panic(err)
+       }
+       signals := make(chan os.Signal, 1)
+       signal.Notify(signals, os.Interrupt)
+       doneCh := make(chan struct{})
+       go func() {
+               for {
+                       select {
+                       case err := <-consumer.Errors():
+                               fmt.Println(err)
+                       case msg := <-consumer.Messages():
+
+                               kpi := OnosKPI{}
+
+                               err := json.Unmarshal(msg.Value, &kpi)
+
+                               if err != nil {
+                                       log.Fatal(err)
+                               }
+
+                               exportOnosKPI(kpi)
+
+                       case <-signals:
+                               fmt.Println("Interrupt is detected")
+                               doneCh <- struct{}{}
+                       }
+               }
+       }()
+       <-doneCh
+}
diff --git a/src/kafka-topic-exporter/onos-prometheus.go b/src/kafka-topic-exporter/onos-prometheus.go
new file mode 100644 (file)
index 0000000..b383b55
--- /dev/null
@@ -0,0 +1,103 @@
+// Copyright 2018 Open Networking Foundation
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+       "github.com/prometheus/client_golang/prometheus"
+)
+
+var (
+       onosTxBytesTotal = prometheus.NewGaugeVec(
+               prometheus.GaugeOpts{
+                       Name: "onos_tx_bytes_total",
+                       Help: "Number of total bytes transmitted",
+               },
+               []string{"device_id", "port_id"},
+       )
+       onosRxBytesTotal = prometheus.NewGaugeVec(
+               prometheus.GaugeOpts{
+                       Name: "onos_rx_bytes_total",
+                       Help: "Number of total bytes received",
+               },
+               []string{"device_id", "port_id"},
+       )
+       onosTxPacketsTotal = prometheus.NewGaugeVec(
+               prometheus.GaugeOpts{
+                       Name: "onos_tx_packets_total",
+                       Help: "Number of total packets transmitted",
+               },
+               []string{"device_id", "port_id"},
+       )
+       onosRxPacketsTotal = prometheus.NewGaugeVec(
+               prometheus.GaugeOpts{
+                       Name: "onos_rx_packets_total",
+                       Help: "Number of total packets received",
+               },
+               []string{"device_id", "port_id"},
+       )
+
+       onosTxDropPacketsTotal = prometheus.NewGaugeVec(
+               prometheus.GaugeOpts{
+                       Name: "onos_tx_drop_packets_total",
+                       Help: "Number of total transmitted packets dropped",
+               },
+               []string{"device_id", "port_id"},
+       )
+
+       onosRxDropPacketsTotal = prometheus.NewGaugeVec(
+               prometheus.GaugeOpts{
+                       Name: "onos_rx_drop_packets_total",
+                       Help: "Number of total received packets dropped",
+               },
+               []string{"device_id", "port_id"},
+       )
+)
+
+func exportOnosKPI(kpi OnosKPI) {
+
+       for _, data := range kpi.Ports {
+
+               onosTxBytesTotal.WithLabelValues(
+                       kpi.DeviceID,
+                       data.PortID,
+               ).Set(data.TxBytes)
+
+               onosRxBytesTotal.WithLabelValues(
+                       kpi.DeviceID,
+                       data.PortID,
+               ).Set(data.RxBytes)
+
+               onosTxPacketsTotal.WithLabelValues(
+                       kpi.DeviceID,
+                       data.PortID,
+               ).Set(data.TxPackets)
+
+               onosRxPacketsTotal.WithLabelValues(
+                       kpi.DeviceID,
+                       data.PortID,
+               ).Set(data.RxPackets)
+
+               onosTxDropPacketsTotal.WithLabelValues(
+                       kpi.DeviceID,
+                       data.PortID,
+               ).Set(data.TxPacketsDrop)
+
+               onosRxDropPacketsTotal.WithLabelValues(
+                       kpi.DeviceID,
+                       data.PortID,
+               ).Set(data.RxPacketsDrop)
+
+       }
+}
diff --git a/src/kafka-topic-exporter/types.go b/src/kafka-topic-exporter/types.go
new file mode 100644 (file)
index 0000000..b3ecd59
--- /dev/null
@@ -0,0 +1,81 @@
+// Copyright 2018 Open Networking Foundation
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+// KPI Events format
+
+type Metrics struct {
+       TxBytes            float64 `json:"tx_bytes"`
+       TxPackets          float64 `json:"tx_packets"`
+       TxErrorPackets     float64 `json:"tx_error_packets"`
+       TxBcastPackets     float64 `json:"tx_bcast_packets"`
+       TxUnicastPackets   float64 `json:"tx_ucast_packets"`
+       TxMulticastPackets float64 `json:"tx_mcast_packets"`
+       RxBytes            float64 `json:"rx_bytes"`
+       RxPackets          float64 `json:"rx_packets"`
+       RxErrorPackets     float64 `json:"rx_error_packets"`
+       RxBcastPackets     float64 `json:"rx_bcast_packets"`
+       RxMulticastPackets float64 `json:"rx_mcast_packets"`
+
+       // ONU Ethernet_Bridge_Port_history
+       Packets            float64 `json:"packets"`
+       Octets             float64 `json:"octets"`
+}
+
+type Context struct {
+       InterfaceID string `json:"intf_id"`
+       PonID       string `json:"pon_id"`
+       PortNumber  string `json:"port_no"`
+
+       // ONU Performance Metrics
+       ParentClassId string `json:"parent_class_id"`
+       ParentEntityId string `json:"parent_entity_id"`
+       Upstream    string `json:"upstream"`
+}
+
+type Metadata struct {
+       LogicalDeviceID string   `json:"logical_device_id"`
+       Title           string   `json:"title"`
+       SerialNumber    string   `json:"serial_no"`
+       Timestamp       float64  `json:"ts"`
+       DeviceID        string   `json:"device_id"`
+       Context         *Context `json:"context"`
+}
+
+type SliceData struct {
+       Metrics  *Metrics  `json:"metrics"`
+       Metadata *Metadata `json:"metadata"`
+}
+
+type VolthaKPI struct {
+       Type       string       `json:"type"`
+       Timestamp  float64      `json:"ts"`
+       SliceDatas []*SliceData `json:"slice_data"`
+}
+
+type OnosPort struct {
+       PortID        string  `json:"portId"`
+       RxPackets     float64 `json:"pktRx"`
+       TxPackets     float64 `json:"pktTx"`
+       RxBytes       float64 `json:"bytesRx"`
+       TxBytes       float64 `json:"bytesTx"`
+       RxPacketsDrop float64 `json:"pktRxDrp"`
+       TxPacketsDrop float64 `json:"pktTxDrp"`
+}
+
+type OnosKPI struct {
+       DeviceID string      `json:"deviceId"`
+       Ports    []*OnosPort `json:"ports"`
+}
diff --git a/src/kafka-topic-exporter/voltha-kpi.go b/src/kafka-topic-exporter/voltha-kpi.go
new file mode 100644 (file)
index 0000000..122f3f0
--- /dev/null
@@ -0,0 +1,64 @@
+// Copyright 2018 Open Networking Foundation
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+       "encoding/json"
+       "fmt"
+       "log"
+       "os"
+       "os/signal"
+       "sync"
+
+       "github.com/Shopify/sarama"
+)
+
+func VOLTHAListener(topic *string, master sarama.Consumer, wg sync.WaitGroup) {
+       fmt.Println("Starting VOLTHAListener")
+       defer wg.Done()
+       consumer, err := master.ConsumePartition(*topic, 0, sarama.OffsetOldest)
+       if err != nil {
+               fmt.Println("VOLTHAListener panic")
+               panic(err)
+       }
+       signals := make(chan os.Signal, 1)
+       signal.Notify(signals, os.Interrupt)
+       doneCh := make(chan struct{})
+       go func() {
+               for {
+                       select {
+                       case err := <-consumer.Errors():
+                               fmt.Println(err)
+                       case msg := <-consumer.Messages():
+                               // fmt.Println(string(msg.Value))
+
+                               kpi := VolthaKPI{}
+
+                               err := json.Unmarshal(msg.Value, &kpi)
+
+                               if err != nil {
+                                       log.Fatal(err)
+                               }
+
+                               exportVolthaKPI(kpi)
+
+                       case <-signals:
+                               fmt.Println("Interrupt is detected")
+                               doneCh <- struct{}{}
+                       }
+               }
+       }()
+       <-doneCh
+}
diff --git a/src/kafka-topic-exporter/voltha-prometheus.go b/src/kafka-topic-exporter/voltha-prometheus.go
new file mode 100644 (file)
index 0000000..526010b
--- /dev/null
@@ -0,0 +1,262 @@
+// Copyright 2018 Open Networking Foundation
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+       "github.com/prometheus/client_golang/prometheus"
+)
+
+var (
+       volthaTxBytesTotal = prometheus.NewGaugeVec(
+               prometheus.GaugeOpts{
+                       Name: "voltha_tx_bytes_total",
+                       Help: "Number of total bytes transmitted",
+               },
+               []string{"logical_device_id", "serial_number", "device_id", "interface_id", "pon_id", "port_number", "title"},
+       )
+       volthaRxBytesTotal = prometheus.NewGaugeVec(
+               prometheus.GaugeOpts{
+                       Name: "voltha_rx_bytes_total",
+                       Help: "Number of total bytes received",
+               },
+               []string{"logical_device_id", "serial_number", "device_id", "interface_id", "pon_id", "port_number", "title"},
+       )
+       volthaTxPacketsTotal = prometheus.NewGaugeVec(
+               prometheus.GaugeOpts{
+                       Name: "voltha_tx_packets_total",
+                       Help: "Number of total packets transmitted",
+               },
+               []string{"logical_device_id", "serial_number", "device_id", "interface_id", "pon_id", "port_number", "title"},
+       )
+       volthaRxPacketsTotal = prometheus.NewGaugeVec(
+               prometheus.GaugeOpts{
+                       Name: "voltha_rx_packets_total",
+                       Help: "Number of total packets received",
+               },
+               []string{"logical_device_id", "serial_number", "device_id", "interface_id", "pon_id", "port_number", "title"},
+       )
+
+       volthaTxErrorPacketsTotal = prometheus.NewGaugeVec(
+               prometheus.GaugeOpts{
+                       Name: "voltha_tx_error_packets_total",
+                       Help: "Number of total transmitted packets error",
+               },
+               []string{"logical_device_id", "serial_number", "device_id", "interface_id", "pon_id", "port_number", "title"},
+       )
+
+       volthaRxErrorPacketsTotal = prometheus.NewGaugeVec(
+               prometheus.GaugeOpts{
+                       Name: "voltha_rx_error_packets_total",
+                       Help: "Number of total received packets error",
+               },
+               []string{"logical_device_id", "serial_number", "device_id", "interface_id", "pon_id", "port_number", "title"},
+       )
+)
+
+func exportVolthaKPI(kpi VolthaKPI) {
+
+       for _, data := range kpi.SliceDatas {
+               switch title := data.Metadata.Title; title {
+               case "Ethernet", "PON":
+                       volthaTxBytesTotal.WithLabelValues(
+                               data.Metadata.LogicalDeviceID,
+                               data.Metadata.SerialNumber,
+                               data.Metadata.DeviceID,
+                               data.Metadata.Context.InterfaceID,
+                               data.Metadata.Context.PonID,
+                               data.Metadata.Context.PortNumber,
+                               data.Metadata.Title,
+                       ).Set(data.Metrics.TxBytes)
+
+                       volthaRxBytesTotal.WithLabelValues(
+                               data.Metadata.LogicalDeviceID,
+                               data.Metadata.SerialNumber,
+                               data.Metadata.DeviceID,
+                               data.Metadata.Context.InterfaceID,
+                               data.Metadata.Context.PonID,
+                               data.Metadata.Context.PortNumber,
+                               data.Metadata.Title,
+                       ).Set(data.Metrics.RxBytes)
+
+                       volthaTxPacketsTotal.WithLabelValues(
+                               data.Metadata.LogicalDeviceID,
+                               data.Metadata.SerialNumber,
+                               data.Metadata.DeviceID,
+                               data.Metadata.Context.InterfaceID,
+                               data.Metadata.Context.PonID,
+                               data.Metadata.Context.PortNumber,
+                               data.Metadata.Title,
+                       ).Set(data.Metrics.TxPackets)
+
+                       volthaRxPacketsTotal.WithLabelValues(
+                               data.Metadata.LogicalDeviceID,
+                               data.Metadata.SerialNumber,
+                               data.Metadata.DeviceID,
+                               data.Metadata.Context.InterfaceID,
+                               data.Metadata.Context.PonID,
+                               data.Metadata.Context.PortNumber,
+                               data.Metadata.Title,
+                       ).Set(data.Metrics.RxPackets)
+
+                       volthaTxErrorPacketsTotal.WithLabelValues(
+                               data.Metadata.LogicalDeviceID,
+                               data.Metadata.SerialNumber,
+                               data.Metadata.DeviceID,
+                               data.Metadata.Context.InterfaceID,
+                               data.Metadata.Context.PonID,
+                               data.Metadata.Context.PortNumber,
+                               data.Metadata.Title,
+                       ).Set(data.Metrics.TxErrorPackets)
+
+                       volthaRxErrorPacketsTotal.WithLabelValues(
+                               data.Metadata.LogicalDeviceID,
+                               data.Metadata.SerialNumber,
+                               data.Metadata.DeviceID,
+                               data.Metadata.Context.InterfaceID,
+                               data.Metadata.Context.PonID,
+                               data.Metadata.Context.PortNumber,
+                               data.Metadata.Title,
+                       ).Set(data.Metrics.RxErrorPackets)
+
+                       // TODO add metrics for:
+                       // TxBcastPackets
+                       // TxUnicastPackets
+                       // TxMulticastPackets
+                       // RxBcastPackets
+                       // RxMulticastPackets
+
+               case "Ethernet_Bridge_Port_History":
+                       if data.Metadata.Context.Upstream == "True" {
+                               // ONU. Extended Ethernet statistics.
+                               volthaTxPacketsTotal.WithLabelValues(
+                                       data.Metadata.LogicalDeviceID,
+                                       data.Metadata.SerialNumber,
+                                       data.Metadata.DeviceID,
+                                       "NA", // InterfaceID
+                                       "NA", // PonID
+                                       "NA", // PortNumber
+                                       data.Metadata.Title,
+                               ).Add(data.Metrics.Packets)
+
+                               volthaTxBytesTotal.WithLabelValues(
+                                       data.Metadata.LogicalDeviceID,
+                                       data.Metadata.SerialNumber,
+                                       data.Metadata.DeviceID,
+                                       "NA", // InterfaceID
+                                       "NA", // PonID
+                                       "NA", // PortNumber
+                                       data.Metadata.Title,
+                               ).Add(data.Metrics.Octets)
+                       } else {
+                               // ONU. Extended Ethernet statistics.
+                               volthaRxPacketsTotal.WithLabelValues(
+                                       data.Metadata.LogicalDeviceID,
+                                       data.Metadata.SerialNumber,
+                                       data.Metadata.DeviceID,
+                                       "NA", // InterfaceID
+                                       "NA", // PonID
+                                       "NA", // PortNumber
+                                       data.Metadata.Title,
+                               ).Add(data.Metrics.Packets)
+
+                               volthaRxBytesTotal.WithLabelValues(
+                                       data.Metadata.LogicalDeviceID,
+                                       data.Metadata.SerialNumber,
+                                       data.Metadata.DeviceID,
+                                       "NA", // InterfaceID
+                                       "NA", // PonID
+                                       "NA", // PortNumber
+                                       data.Metadata.Title,
+                               ).Add(data.Metrics.Octets)
+                       }
+
+               case "Ethernet_UNI_History":
+                       // ONU. Do nothing.
+
+               case "FEC_History":
+                       // ONU. Do Nothing.
+
+                       volthaTxBytesTotal.WithLabelValues(
+                               data.Metadata.LogicalDeviceID,
+                               data.Metadata.SerialNumber,
+                               data.Metadata.DeviceID,
+                               data.Metadata.Context.InterfaceID,
+                               data.Metadata.Context.PonID,
+                               data.Metadata.Context.PortNumber,
+                               data.Metadata.Title,
+                       ).Set(data.Metrics.TxBytes)
+
+                       volthaRxBytesTotal.WithLabelValues(
+                               data.Metadata.LogicalDeviceID,
+                               data.Metadata.SerialNumber,
+                               data.Metadata.DeviceID,
+                               data.Metadata.Context.InterfaceID,
+                               data.Metadata.Context.PonID,
+                               data.Metadata.Context.PortNumber,
+                               data.Metadata.Title,
+                       ).Set(data.Metrics.RxBytes)
+
+                       volthaTxPacketsTotal.WithLabelValues(
+                               data.Metadata.LogicalDeviceID,
+                               data.Metadata.SerialNumber,
+                               data.Metadata.DeviceID,
+                               data.Metadata.Context.InterfaceID,
+                               data.Metadata.Context.PonID,
+                               data.Metadata.Context.PortNumber,
+                               data.Metadata.Title,
+                       ).Set(data.Metrics.TxPackets)
+
+                       volthaRxPacketsTotal.WithLabelValues(
+                               data.Metadata.LogicalDeviceID,
+                               data.Metadata.SerialNumber,
+                               data.Metadata.DeviceID,
+                               data.Metadata.Context.InterfaceID,
+                               data.Metadata.Context.PonID,
+                               data.Metadata.Context.PortNumber,
+                               data.Metadata.Title,
+                       ).Set(data.Metrics.RxPackets)
+
+                       volthaTxErrorPacketsTotal.WithLabelValues(
+                               data.Metadata.LogicalDeviceID,
+                               data.Metadata.SerialNumber,
+                               data.Metadata.DeviceID,
+                               data.Metadata.Context.InterfaceID,
+                               data.Metadata.Context.PonID,
+                               data.Metadata.Context.PortNumber,
+                               data.Metadata.Title,
+                       ).Set(data.Metrics.TxErrorPackets)
+
+                       volthaRxErrorPacketsTotal.WithLabelValues(
+                               data.Metadata.LogicalDeviceID,
+                               data.Metadata.SerialNumber,
+                               data.Metadata.DeviceID,
+                               data.Metadata.Context.InterfaceID,
+                               data.Metadata.Context.PonID,
+                               data.Metadata.Context.PortNumber,
+                               data.Metadata.Title,
+                       ).Set(data.Metrics.RxErrorPackets)
+
+                       // TODO add metrics for:
+                       // TxBcastPackets
+                       // TxUnicastPackets
+                       // TxMulticastPackets
+                       // RxBcastPackets
+                       // RxMulticastPackets
+
+               case "voltha.internal":
+                       // Voltha Internal. Do nothing.
+               }
+       }
+}