json_extract partial json and prefix #773

Merged: 8 commits, Feb 18, 2025
6 changes: 6 additions & 0 deletions plugin/README.md
@@ -398,6 +398,12 @@ It transforms `{"server":{"os":"linux","arch":"amd64"}}` into `{"server":"{\"os\
[More details...](plugin/action/json_encode/README.md)
## json_extract
It extracts fields from a JSON-encoded event field and adds them to the event root.

The plugin extracts fields on the fly and can work with incomplete JSON (e.g. a value that was cut off by a max size limit).
If the field value is an incomplete JSON string, fields can still be extracted as long as the surviving part is the beginning of the JSON document,
e.g. fields can be extracted from `{"service":"test","message":"long message"`, but not from `"service":"test","message":"long message"}`,
because the value must start as valid JSON.

> If an extracted field already exists in the event root, it will be overwritten.

[More details...](plugin/action/json_extract/README.md)
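
To illustrate why only the beginning of the document is usable, here is a minimal sketch of prefix-only extraction. It uses the standard library's `encoding/json` token stream, not the decoder the plugin itself uses, and `extractTopLevel` is a hypothetical helper that does not exist in the plugin.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// extractTopLevel is a hypothetical helper (not part of the plugin).
// It reads the top-level fields of a possibly truncated JSON object and keeps
// the wanted ones that were read completely before the input ended.
func extractTopLevel(input string, wanted map[string]bool) map[string]string {
	out := make(map[string]string)
	dec := json.NewDecoder(strings.NewReader(input))

	// The input must begin with '{'; otherwise nothing can be extracted.
	tok, err := dec.Token()
	if err != nil {
		return out
	}
	if d, ok := tok.(json.Delim); !ok || d != '{' {
		return out
	}

	for dec.More() {
		keyTok, err := dec.Token()
		if err != nil {
			break
		}
		key, ok := keyTok.(string)
		if !ok {
			break
		}
		// Read the whole value as raw JSON; this fails if the value is cut off.
		var raw json.RawMessage
		if err := dec.Decode(&raw); err != nil {
			break // stop at the first incomplete value, keep what was read
		}
		if wanted[key] {
			out[key] = string(raw)
		}
	}
	return out
}

func main() {
	wanted := map[string]bool{"service": true, "message": true}
	// Valid start, truncated tail: complete fields are still available.
	fmt.Println(extractTopLevel(`{"service":"test","message":"long mess`, wanted))
	// Missing start: the input does not open an object, so nothing is extracted.
	fmt.Println(extractTopLevel(`"service":"test","message":"long message"}`, wanted))
}
```

In this sketch the first call yields only `service`, because `message` is cut off mid-string, and the second call yields nothing because the input does not open with `{`.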
6 changes: 6 additions & 0 deletions plugin/action/README.md
@@ -241,6 +241,12 @@ It transforms `{"server":{"os":"linux","arch":"amd64"}}` into `{"server":"{\"os\
[More details...](plugin/action/json_encode/README.md)
## json_extract
It extracts fields from a JSON-encoded event field and adds them to the event root.

The plugin extracts fields on the fly and can work with incomplete JSON (e.g. a value that was cut off by a max size limit).
If the field value is an incomplete JSON string, fields can still be extracted as long as the surviving part is the beginning of the JSON document,
e.g. fields can be extracted from `{"service":"test","message":"long message"`, but not from `"service":"test","message":"long message"}`,
because the value must start as valid JSON.

> If an extracted field already exists in the event root, it will be overwritten.

[More details...](plugin/action/json_extract/README.md)
5 changes: 1 addition & 4 deletions plugin/action/decode/decode.go
@@ -570,10 +570,7 @@ func (p *Plugin) decodeProtobuf(root *insaneJSON.Root, node *insaneJSON.Node, bu
 }

 func (p *Plugin) addFieldPrefix(root *insaneJSON.Root, key string, val []byte) {
-	if p.config.Prefix != "" {
-		key = fmt.Sprintf("%s%s", p.config.Prefix, key)
-	}
-	root.AddFieldNoAlloc(root, key).MutateToBytesCopy(root, val)
+	root.AddFieldNoAlloc(root, p.config.Prefix+key).MutateToBytesCopy(root, val)
 }

 func (p *Plugin) checkError(err error, node *insaneJSON.Node) bool {
41 changes: 41 additions & 0 deletions plugin/action/json_extract/README.md
@@ -1,5 +1,11 @@
# JSON extract plugin
It extracts fields from a JSON-encoded event field and adds them to the event root.

The plugin extracts fields on the fly and can work with incomplete JSON (e.g. a value that was cut off by a max size limit).
If the field value is an incomplete JSON string, fields can still be extracted as long as the surviving part is the beginning of the JSON document,
e.g. fields can be extracted from `{"service":"test","message":"long message"`, but not from `"service":"test","message":"long message"}`,
because the value must start as valid JSON.

> If an extracted field already exists in the event root, it will be overwritten.

## Examples
@@ -38,6 +44,35 @@ The resulting event:
"flags": ["flag1", "flag2"]
}
```
---
```yaml
pipelines:
  example_pipeline:
    ...
    actions:
    - type: json_extract
      field: log
      extract_fields:
        - extract1
        - extract2
      prefix: ext_
    ...
```
The original event:
```json
{
  "log": "{\"level\":\"error\",\"extract1\":\"data1\",\"extract2\":\"long message ...",
  "time": "2024-03-01T10:49:28.263317941Z"
}
```
The resulting event:
```json
{
  "log": "{\"level\":\"error\",\"extract1\":\"data1\",\"extract2\":\"long message ...",
  "time": "2024-03-01T10:49:28.263317941Z",
  "ext_extract1": "data1"
}
```
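
The way the prefix and the overwrite rule combine in this example can be sketched with plain maps. This is only an illustration under assumptions: `addWithPrefix` is a hypothetical helper, the event root is modelled as a `map[string]string`, and the pre-existing `ext_extract1` value is made up to show the overwrite.

```go
package main

import "fmt"

// addWithPrefix is a hypothetical helper (not part of the plugin).
// It copies extracted fields into the event root, prepending prefix to each
// key. A key that already exists in root is overwritten.
func addWithPrefix(root, extracted map[string]string, prefix string) {
	for key, val := range extracted {
		root[prefix+key] = val
	}
}

func main() {
	root := map[string]string{
		"time":         "2024-03-01T10:49:28.263317941Z",
		"ext_extract1": "stale", // assumed pre-existing value, for illustration
	}
	// Only extract1 was read completely; extract2 was cut off in the source JSON.
	extracted := map[string]string{"extract1": "data1"}

	addWithPrefix(root, extracted, "ext_")
	fmt.Println(root["ext_extract1"])
}
```

With an empty prefix the keys are written unchanged, which matches the `default:""` behaviour of the `prefix` option.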

## Benchmarks
Performance comparison of `json_extract` and `json_decode` plugins.
@@ -82,5 +117,11 @@ Fields to extract.

<br>

**`prefix`** *`string`*

A prefix to add to extracted field keys.

<br>


<br>*Generated using [__insane-doc__](https://github.com/vitkovskii/insane-doc)*
95 changes: 81 additions & 14 deletions plugin/action/json_extract/json_extract.go
@@ -10,6 +10,12 @@ import (

/*{ introduction
It extracts fields from a JSON-encoded event field and adds them to the event root.

The plugin extracts fields on the fly and can work with incomplete JSON (e.g. a value that was cut off by a max size limit).
If the field value is an incomplete JSON string, fields can still be extracted as long as the surviving part is the beginning of the JSON document,
e.g. fields can be extracted from `{"service":"test","message":"long message"`, but not from `"service":"test","message":"long message"}`,
because the value must start as valid JSON.

> If an extracted field already exists in the event root, it will be overwritten.
}*/

@@ -49,6 +55,35 @@ The resulting event:
"flags": ["flag1", "flag2"]
}
```
---
```yaml
pipelines:
  example_pipeline:
    ...
    actions:
    - type: json_extract
      field: log
      extract_fields:
        - extract1
        - extract2
      prefix: ext_
    ...
```
The original event:
```json
{
  "log": "{\"level\":\"error\",\"extract1\":\"data1\",\"extract2\":\"long message ...",
  "time": "2024-03-01T10:49:28.263317941Z"
}
```
The resulting event:
```json
{
  "log": "{\"level\":\"error\",\"extract1\":\"data1\",\"extract2\":\"long message ...",
  "time": "2024-03-01T10:49:28.263317941Z",
  "ext_extract1": "data1"
}
```
}*/

/*{ benchmarks
@@ -102,6 +137,11 @@ type Config struct {
 	// >
 	// > Fields to extract.
 	ExtractFields []cfg.FieldSelector `json:"extract_fields" slice:"true"` // *
+
+	// > @3@4@5@6
+	// >
+	// > A prefix to add to extracted field keys.
+	Prefix string `json:"prefix" default:""` // *
 }

 func init() {
@@ -145,14 +185,14 @@ func (p *Plugin) Do(event *pipeline.Event) pipeline.ActionResult {
 	}

 	p.decoder.ResetBytes(jsonNode.AsBytes())
-	extract(event.Root, p.decoder, p.extractFields.root.children, false)
+	extract(event.Root, p.decoder, p.extractFields.root.children, p.config.Prefix, false)
 	return pipeline.ActionPass
 }

 // extract extracts fields from decoder and adds it to the root.
 //
 // [skipAddField] flag is required for proper benchmarking.
-func extract(root *insaneJSON.Root, d *jx.Decoder, fields pathNodes, skipAddField bool) {
+func extract(root *insaneJSON.Root, d *jx.Decoder, fields pathNodes, prefix string, skipAddField bool) {
 	objIter, err := d.ObjIter()
 	if err != nil {
 		return
@@ -171,15 +211,19 @@ func extract(root *insaneJSON.Root, d *jx.Decoder, fields pathNodes, skipAddFiel

 		if len(n.children) == 0 { // last field in path, add to root
 			if skipAddField {
-				_ = d.Skip()
+				if err = d.Skip(); err != nil {
+					break
+				}
 			} else {
-				addField(root, n.data, d)
+				if err = addField(root, prefix+n.data, d); err != nil {
+					break
+				}
 			}
 		} else { // go deep
 			// Capture calls f and then rolls back to state before call
 			_ = d.Capture(func(d *jx.Decoder) error {
 				// recursively extract child fields
-				extract(root, d, n.children, skipAddField)
+				extract(root, d, n.children, prefix, skipAddField)
 				return nil
 			})
 			// skip the current field because we have processed it
@@ -196,32 +240,55 @@ func extract(root *insaneJSON.Root, d *jx.Decoder, fields pathNodes, skipAddFiel
 	}
 }

-func addField(root *insaneJSON.Root, field string, d *jx.Decoder) {
+func addField(root *insaneJSON.Root, field string, d *jx.Decoder) error {
 	switch d.Next() {
 	case jx.Number:
-		num, _ := d.Num()
-		intVal, err := num.Int64()
+		num, err := d.Num()
+		if err != nil {
+			return err
+		}
+		var (
+			intVal   int64
+			floatVal float64
+		)
+		intVal, err = num.Int64()
 		if err == nil {
 			root.AddFieldNoAlloc(root, field).MutateToInt64(intVal)
 		} else {
-			floatVal, err := num.Float64()
+			floatVal, err = num.Float64()
 			if err == nil {
 				root.AddFieldNoAlloc(root, field).MutateToFloat(floatVal)
 			}
 		}
+		if err != nil {
+			return err
+		}
 	case jx.String:
-		s, _ := d.StrBytes()
+		s, err := d.StrBytes()
+		if err != nil {
+			return err
+		}
 		root.AddFieldNoAlloc(root, field).MutateToBytesCopy(root, s)
 	case jx.Null:
-		_ = d.Null()
+		err := d.Null()
+		if err != nil {
+			return err
+		}
 		root.AddFieldNoAlloc(root, field).MutateToNull()
 	case jx.Bool:
-		b, _ := d.Bool()
+		b, err := d.Bool()
+		if err != nil {
+			return err
+		}
 		root.AddFieldNoAlloc(root, field).MutateToBool(b)
 	case jx.Object, jx.Array:
-		raw, _ := d.Raw()
+		raw, err := d.Raw()
+		if err != nil {
+			return err
+		}
 		root.AddFieldNoAlloc(root, field).MutateToJSON(root, raw.String())
 	default:
-		_ = d.Skip()
+		return d.Skip()
 	}
+	return nil
 }
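
The number branch of `addField` tries an integer first and falls back to a float, returning an error only if both fail. A rough standalone sketch of that ordering follows. It uses `strconv` from the standard library instead of the `jx` number type, and `parseNumber` is a hypothetical name, so the exact inputs accepted may differ from the plugin's behaviour.

```go
package main

import (
	"fmt"
	"strconv"
)

// parseNumber is a hypothetical helper (not part of the plugin).
// It tries int64 first and falls back to float64; the error of the float
// parse is returned only when both attempts fail.
func parseNumber(s string) (interface{}, error) {
	if i, err := strconv.ParseInt(s, 10, 64); err == nil {
		return i, nil
	}
	f, err := strconv.ParseFloat(s, 64)
	if err != nil {
		return nil, err
	}
	return f, nil
}

func main() {
	for _, in := range []string{"42", "1.5", "12abc"} {
		v, err := parseNumber(in)
		fmt.Printf("%q -> %v (%T), err=%v\n", in, v, v, err)
	}
}
```

Returning the error instead of dropping it is what lets the caller stop at the first value that cannot be decoded, which is the case for a number cut off by truncation.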
33 changes: 32 additions & 1 deletion plugin/action/json_extract/json_extract_test.go
@@ -142,6 +142,37 @@ func TestJsonExtract(t *testing.T) {
 				"extracted": "text",
 			},
 		},
+		{
+			name: "partial_json",
+			config: &Config{
+				Field: "json_field",
+				ExtractFields: []cfg.FieldSelector{
+					"extracted1",
+					"extracted2",
+				},
+			},
+			in: `{"field1":"value1","json_field":"{\"test\":\"test_value\",\"extracted1\":\"text\",\"extracted2\":\"long text ..."}`,
+			want: map[string]string{
+				"extracted1": "text",
+				"extracted2": "",
+			},
+		},
+		{
+			name: "extract_with_prefix",
+			config: &Config{
+				Field: "json_field",
+				ExtractFields: []cfg.FieldSelector{
+					"extracted1",
+					"extracted2",
+				},
+				Prefix: "ext_",
+			},
+			in: `{"field1":"value1","json_field":"{\"test\":\"test_value\",\"extracted1\":\"text1\",\"extracted2\":\"text2\"}","field3":3}`,
+			want: map[string]string{
+				"ext_extracted1": "text1",
+				"ext_extracted2": "text2",
+			},
+		},
 	}
 	for _, tt := range cases {
 		tt := tt
@@ -275,7 +306,7 @@ func BenchmarkExtract(b *testing.B) {
 			for i := 0; i < b.N; i++ {
 				d.ResetBytes(benchCase.json)
 				// remove allocs for adding new fields to root by passing `skipAddField` flag for correct benching
-				extract(nil, d, extractFields.root.children, true)
+				extract(nil, d, extractFields.root.children, "", true)
 			}
 		})
 	}