Skip to content

Commit

Permalink
feat(parser): add parser opentsdb_json
Browse files Browse the repository at this point in the history
  • Loading branch information
zipper-meng committed Aug 1, 2024
1 parent 621864a commit b790eb9
Show file tree
Hide file tree
Showing 4 changed files with 213 additions and 0 deletions.
5 changes: 5 additions & 0 deletions plugins/parsers/all/opentsdb_json.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
//go:build !custom || parsers || parsers.opentsdbtelnet

package all

import _ "github.com/influxdata/telegraf/plugins/parsers/opentsdb_json" // register plugin
34 changes: 34 additions & 0 deletions plugins/parsers/opentsdb_json/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# OpenTSDB JSON Style Parser Plugin

## Configuration

```toml
[[inputs.file]]
files = ["example"]
data_format = "opentsdb_json"
```

## Example

```json
[
{
"metric": "sys.cpu.nice",
"timestamp": 1346846400,
"value": 18,
"tags": {
"host": "web01",
"dc": "lga"
}
},
{
"metric": "sys.cpu.nice",
"timestamp": 1346846400,
"value": 9,
"tags": {
"host": "web02",
"dc": "lga"
}
}
]
```
103 changes: 103 additions & 0 deletions plugins/parsers/opentsdb_json/parser.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package opentsdb

import (
"bytes"
"encoding/json"
"errors"
"fmt"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
"github.com/influxdata/telegraf/plugins/parsers"
)

func init() {
parsers.Add("opentsdb_json",
func(_ string) telegraf.Parser {
return &Parser{}
},
)
}

type point struct {
Metric string `json:"metric"`
Time int64 `json:"timestamp"`
Value float64 `json:"value"`
Tags map[string]string `json:"tags,omitempty"`
}

type Parser struct {
DefaultTags map[string]string `toml:"-"`
}

func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) {
var multi bool
switch buf[0] {
case '{':
case '[':
multi = true
default:
return nil, errors.New("expected JSON array or hash")
}

points := make([]point, 1)
if dec := json.NewDecoder(bytes.NewReader(buf)); multi {
if err := dec.Decode(&points); err != nil {
return nil, errors.New("json array decode error for data format: opentsdb")
}
} else {
if err := dec.Decode(&points[0]); err != nil {
return nil, errors.New("json object decode error for data format: opentsdb")
}
}

metrics := make([]telegraf.Metric, 0, len(points))
for i := range points {
pt := points[i]

// Convert timestamp to Go time.
// If time value is over a billion then it's microseconds.
var ts time.Time
if pt.Time < 10000000000 {
ts = time.Unix(pt.Time, 0)
} else {
ts = time.Unix(pt.Time/1000, (pt.Time%1000)*1000)
}

var tags map[string]string
if len(p.DefaultTags) > 0 {
tags = make(map[string]string)
for k, v := range p.DefaultTags {
tags[k] = v
}
for k, v := range pt.Tags {
tags[k] = v
}
} else {
tags = pt.Tags
}

mt := metric.New(pt.Metric, tags, map[string]interface{}{"value": pt.Value}, ts)
metrics = append(metrics, mt)
}

return metrics, nil
}

func (p *Parser) ParseLine(line string) (telegraf.Metric, error) {
metrics, err := p.Parse([]byte(line))
if err != nil {
return nil, err
}

if len(metrics) < 1 {
return nil, fmt.Errorf("can not parse the line: %s, for data format: opentsdb", line)
}

return metrics[0], nil
}

func (p *Parser) SetDefaultTags(tags map[string]string) {
p.DefaultTags = tags
}
71 changes: 71 additions & 0 deletions plugins/parsers/opentsdb_json/parser_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package opentsdb

import (
"testing"
"time"

"github.com/stretchr/testify/require"
)

const JsonArray = `[
{
"metric": "sys.cpu.nice",
"timestamp": 1346846400,
"value": 18,
"tags": {
"host": "web01",
"dc": "lga"
}
},
{
"metric": "sys.cpu.nice",
"timestamp": 1346846400,
"value": 9,
"tags": {
"host": "web02",
"dc": "lga"
}
}
]`

const JsonObject = `{
"metric": "sys.cpu.nice",
"timestamp": 1346846400,
"value": 18,
"tags": {
"host": "web01",
"dc": "lga"
}
}`

func TestParseJSONArray(t *testing.T) {
parser := &Parser{}
metrics, err := parser.Parse([]byte(JsonArray))
require.NoError(t, err)
require.Len(t, metrics, 2)
require.Equal(t, "sys.cpu.nice", metrics[0].Name())
require.Equal(t, map[string]string{
"host": "web01",
"dc": "lga",
}, metrics[0].Tags())
require.Equal(t, map[string]interface{}{
"value": float64(18),
}, metrics[0].Fields())
require.Equal(t, time.Unix(1346846400, 0), metrics[0].Time())
}

func TestParseJSONObject(t *testing.T) {
parser := &Parser{}
metrics, err := parser.Parse([]byte(JsonObject))
require.NoError(t, err)
require.Len(t, metrics, 1)
require.Equal(t, "sys.cpu.nice", metrics[0].Name())
require.Equal(t, map[string]string{
"host": "web01",
"dc": "lga",
}, metrics[0].Tags())
require.Equal(t, map[string]interface{}{
"value": float64(18),
}, metrics[0].Fields())
require.Equal(t, time.Unix(1346846400, 0), metrics[0].Time())
}

0 comments on commit b790eb9

Please sign in to comment.