From d00c2228c544fcac4ee640db055e94ea8f44ac80 Mon Sep 17 00:00:00 2001 From: Oleg Don <31502412+HeadHunter483@users.noreply.github.com> Date: Thu, 20 Feb 2025 13:43:53 +0500 Subject: [PATCH] 775 parse unixtime microseconds (#776) * Add parse unixtime milli, micro, nanoseconds * Update gelf output ts format options * Fix docs links for github pages * Add pipeline and related docs * Add datetime parse doc in pipeline --- cfg/matchrule/README.md | 53 ++++++++ docs/architecture.md | 6 +- docs/examples.md | 6 +- docs/installation.md | 2 +- pipeline/README.idoc.md | 150 +++++++++++++++++++++ pipeline/README.md | 150 +++++++++++++++++++++ pipeline/antispam/README.md | 39 ++++++ pipeline/doif/README.idoc.md | 18 +-- pipeline/doif/README.md | 18 +-- pipeline/util.go | 67 +++++++-- pipeline/util_test.go | 45 +++++++ plugin/action/convert_date/README.md | 6 +- plugin/action/convert_date/convert_date.go | 17 ++- plugin/action/set_time/README.md | 3 +- plugin/action/set_time/set_time.go | 12 +- plugin/action/throttle/README.md | 3 +- plugin/action/throttle/throttle.go | 3 +- plugin/output/gelf/README.md | 5 +- plugin/output/gelf/gelf.go | 6 +- 19 files changed, 552 insertions(+), 57 deletions(-) create mode 100644 cfg/matchrule/README.md create mode 100644 pipeline/antispam/README.md diff --git a/cfg/matchrule/README.md b/cfg/matchrule/README.md new file mode 100644 index 000000000..e3531db7b --- /dev/null +++ b/cfg/matchrule/README.md @@ -0,0 +1,53 @@ +# Match rules + +Match rules are lightweight checks for the raw byte contents. The rules are combined in rulesets, they can be used with logical `and` or `or` applied to all rules in the rulset, its result might be inverted, they can check values in case insensitive mode. + +## Rule + +**`values`** *`[]string`* + +List of values to check the content against. + +
+ +**`mode`** *`string`* *`required`* *`options=prefix|suffix|contains`* + +Content check mode. In `prefix` mode only the first bytes of the content are checked, in `suffix` mode only the last bytes are checked, and in `contains` mode the content is searched for the value as a substring. + +
+ +**`case_insensitive`** *`bool`* *`default=false`* + +When `case_insensitive` is set to `true`, all `values` and the content being checked are converted to lowercase. It is better to avoid this mode because it can reduce the throughput and performance of log collection. + +
+ +**`invert`** *`bool`* *`default=false`* + +Flag indicating whether to negate the match result. For example, if all of the rules match but `invert` is set to `true`, the whole ruleset is reported as not matched. It is useful when it is easier to list the items that should not match the rules. + +
+ +## RuleSet + +**`name`** *`string`* + +The name of the ruleset. Has some additional semantics in [antispam exceptions](/pipeline/antispam/README.md#exception-parameters). + +
+ +**`cond`** *`string`* *`default=and`* *`options=and|or`* + +Logical operation used to combine the rules. If set to `and`, the ruleset only matches when all rules are matched. If set to `or`, the ruleset matches when at least one of the rules is matched. + +
+ +**`rules`** *`[]`[Rule](/cfg/matchrule/README.md#rule)* + +List of rules to check the log against. + +
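
For illustration, below is a minimal sketch of a single ruleset in YAML, assuming it is embedded in a config section that accepts rulesets (such as antispam exceptions); the field names are the ones described on this page, while the ruleset name and values are hypothetical.

```yaml
# Hypothetical ruleset: matches logs that either start with "/healthz"
# or contain "kube-probe", ignoring case.
name: skip_health_checks
cond: or
rules:
  - mode: prefix
    values: ["/healthz"]
    case_insensitive: true
  - mode: contains
    values: ["kube-probe"]
    case_insensitive: true
```

With `cond: or`, a log matches this ruleset as soon as either rule matches.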
+ +## RuleSets + +List of [RuleSet](/cfg/matchrule/README.md#ruleset). Always combined with logical `or`, meaning it matches when at least one of the rulesets match. diff --git a/docs/architecture.md b/docs/architecture.md index e78ac56f7..de0228dcf 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -6,13 +6,13 @@ Here is a bit simplified architecture of the **file.d** solution. What's going on here: -- **Input plugin** pulls data from external systems and pushes it next to the pipeline controller. Full list of input plugins available is [here](../plugin/input). +- **Input plugin** pulls data from external systems and pushes it next to the pipeline controller. Full list of input plugins available is [here](/plugin/input/README.md). - The **pipeline controller** creates **streams** of the data and is in charge of converting data to event and subsequent routing. - The **event pool** provides fast event instancing. - Events are processed by one or more **processors**. Every processor holds all **action plugins** from the configuration. - Every moment the processor gets a stream of data, process 1 or more events and returns the stream to a **streamer** that is a pool of streams. -- Action plugins act on the events which meet particular criteria. -- Finally, the event goes to the **output plugins** and is dispatched to the external system. +- Action plugins act on the events which meet particular criteria. Full list of action plugins available is [here](/plugin/action/README.md). +- Finally, the event goes to the **output plugins** and is dispatched to the external system. Full list of output plugins available is [here](/plugin/output/README.md). You can extend `file.d` by adding your own input, action, and output plugins. diff --git a/docs/examples.md b/docs/examples.md index 80fef6620..a4200e528 100644 --- a/docs/examples.md +++ b/docs/examples.md @@ -74,7 +74,7 @@ pipelines: ``` ## What's next? -1. [Input](/plugin/input) plugins documentation -2. [Action](/plugin/action) plugins documentation -3. [Output](/plugin/output) plugins documentation +1. [Input](/plugin/input/README.md) plugins documentation +2. [Action](/plugin/action/README.md) plugins documentation +3. [Output](/plugin/output/README.md) plugins documentation 4. [Helm-chart](/charts/filed/README.md) and examples for running in Kubernetes diff --git a/docs/installation.md b/docs/installation.md index 2658b74cd..defa0abdb 100644 --- a/docs/installation.md +++ b/docs/installation.md @@ -6,7 +6,7 @@ Images are available on [GitHub container registry](https://github.com/ozontech/file.d/pkgs/container/file.d/versions?filters%5Bversion_type%5D=tagged). **Note**: -If you are using [journalctl](https://github.com/ozontech/file.d/tree/master/plugin/input/journalctl) input plugin, we +If you are using [journalctl](https://github.com/ozontech/file.d/tree/master/plugin/input/journalctl/README.md) input plugin, we recommend choosing the ubuntu version that matches the host machine version. For example, if the host machine with which you want to collect logs using journald has a version of Ubuntu 18.04, you diff --git a/pipeline/README.idoc.md b/pipeline/README.idoc.md index 20521b6ea..de4838c6c 100644 --- a/pipeline/README.idoc.md +++ b/pipeline/README.idoc.md @@ -1,2 +1,152 @@ +# Pipeline + +Pipeline is an entity which handles data. It consists of input plugin, list of action plugins and output plugin. The input plugin sends the data to `pipeline.In` controller. 
There the data is validated: empty data is discarded, the data size is checked, and the behaviour for overly long logs is defined by the `cut_off_event_by_limit` setting. Then the data is checked by the `antispam` if it is enabled. After all checks have passed, the data is converted to the `Event` structure, the number of events is limited by the `EventPool`, and the events are decoded depending on the [pipeline settings](#settings). The event is sent to a stream, and streams are handled by `processors`. In the processors the event is passed through the list of action plugins and sent to the output plugin. The output plugin commits the `Event` by calling the `pipeline.Commit` function, and after the commit is finished the data is considered processed. More details on the architecture are presented on the [architecture page](/docs/architecture.md). + +## Settings + +**`capacity`** *`int`* *`default=1024`* + +Capacity of the `EventPool`. No more than `capacity` events can be processed at the same time. It can be considered as one of the rate limiting tools, but its primary role is to control the amount of RAM used by File.d. + +
+ +**`avg_log_size`** *`int`* *`default=4096`* + +Expected average size of the input logs in bytes. Used in standard event pool to release buffer memory when its size exceeds this value. + +
+ +**`max_event_size`** *`int`* *`default=0`* + +Maximum allowed size of the input logs in bytes. If set to 0, logs of any size are allowed. If set to a value greater than 0, logs larger than `max_event_size` are discarded unless `cut_off_event_by_limit` is set to `true`. + +
+ +**`cut_off_event_by_limit`** *`bool`* *`default=false`* + +Flag indicating whether to cut logs which have exceeded the `max_event_size`. If set to `true`, huge logs are cut and only the first `max_event_size` bytes of the logs are passed further. If set to `false`, huge logs are discarded. Only works if `max_event_size` is greater than 0, otherwise does nothing. Useful when there are huge logs which affect the logging system but it is preferable to deliver them at least partially. + +
+ +**`cut_off_event_by_limit_field`** *`string`* + +Field to add to log if it was cut by `max_event_size`. E.g. with `cut_off_event_by_limit_field: _cropped`, if the log was cut, the output event will have field `"_cropped":true`. Only works if `cut_off_event_by_limit` is set to `true` and `max_event_size` is greater than 0. Useful for marking cut logs. + +
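
As a rough sketch of how these size limits combine (assuming the usual `pipelines.<name>.settings` layout; the pipeline name and numbers are hypothetical):

```yaml
pipelines:
  example:
    settings:
      max_event_size: 32768                    # logs larger than 32 KiB are affected
      cut_off_event_by_limit: true             # cut oversized logs instead of discarding them
      cut_off_event_by_limit_field: _cropped   # cut logs get the field "_cropped": true
    # input, actions and output sections omitted
```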
+ +**`decoder`** *`string`* *`default=auto`* + +Which decoder to use on every log from the input plugin. Defaults to `auto`, meaning the decoder suggested by the input plugin is used. Currently the `json` decoder is suggested most of the time; the only exception is the [k8s input plugin](/plugin/input/k8s/README.md) with a CRI type other than docker, in which case the `cri` decoder is suggested. The full list of the decoders is available on the [decoders page](/decoder/readme.md). + +
+ +**`decoder_params`** *`map[string]any`* + +Additional parameters for the chosen decoder. The params list varies. It can be found on the [decoders page](/decoder/readme.md) for each of them. + +
+ +**`stream_field`** *`string`* *`default=stream`* + +Which field in the log indicates `stream`. Mostly used for distinguishing `stdout` from `stderr` in k8s logs. + +
+ +**`maintenance_interval`** *`string`* *`default=5s`* + +How often to perform maintenance. Maintenance includes antispammer maintenance and metric cleanup, metric holder maintenance, increasing basic pipeline metrics with accumulated deltas, logging pipeline stats. The value must be passed in format of duration (`(ms|s|m|h)`). + +
+ +**`event_timeout`** *`string`* *`default=30s`* + +How long an event can be processed by action plugins and block a stream in the streamer until it is marked as a timeout event and the stream is unlocked, so that the whole pipeline does not get stuck. The value must be passed in duration format (`(ms|s|m|h)`). + +
+ +**`antispam_threshold`** *`int`* *`default=0`* + +Threshold value for the [antispammer](/pipeline/antispam/README.md#antispammer) to ban sources. If set to 0, the antispammer is disabled. If set to a value greater than 0, the antispammer is enabled and bans sources which write `antispam_threshold` or more logs in `maintenance_interval` time. + +
+ +**`antispam_exceptions`** *`[]`[antispam.Exception](/pipeline/antispam/README.md#exception-parameters)* + +The list of antispammer exceptions. If the log matches at least one of the exceptions it is not accounted in antispammer. + +
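
A hedged sketch of `antispam_threshold` and `antispam_exceptions` used together; the exception itself is hypothetical, and its fields are the ones described in the [antispam README](/pipeline/antispam/README.md#exception-parameters):

```yaml
pipelines:
  k8s_logs:
    settings:
      antispam_threshold: 2000
      antispam_exceptions:
        - name: keep_critical_component
          check_source_name: true       # match the rules against the source name
          cond: and
          rules:
            - mode: contains
              values: ["kube-apiserver"]
              case_insensitive: true
    # input, actions and output sections omitted
```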
+ +**`meta_cache_size`** *`int`* *`default=1024`* + +Amount of entries in metadata cache. + +
+ +**`source_name_meta_field`** *`string`* + +The metadata field used to retrieve the name or origin of a data source. You can use it for antispam. Metadata is configured via `meta` parameter in input plugin. For example: + +```yaml +input: + type: k8s + meta: + pod_namespace: '{{ .pod_name }}.{{ .namespace_name }}' +pipeline: + antispam_threshold: 2000 + source_name_meta_field: pod_namespace +``` + +
+ +**`is_strict`** *`bool`* *`default=false`* + +Whether to fatal on decoding error. + +
+ +**`metric_hold_duration`** *`string`* *`default=30m`* + +The amount of time the metric can be idle until it is deleted. Used for deleting rarely updated metrics to save metrics storage resources. The value must be passed in format of duration (`(ms|s|m|h)`). + +
+ +**`pool`** *`string`* *`options=std|low_memory`* + +Type of `EventPool`. The `std` pool is the original pool built on a slice of `Event` pointers and slices of free-event indicators. The `low_memory` pool is a leveled pool based on multiple `sync.Pool` instances for events of different sizes. The latter is experimental. + +
+ +## Datetime parse formats + +Most of the plugins which work with parsing datetime call `pipeline.ParseTime` function. It accepts datetime layouts the same way as Go `time.Parse` (in format of datetime like `2006-01-02T15:04:05.999999999Z07:00`) except unix timestamp formats, they can only be specified via aliases. + +For the comfort of use there are aliases to some datetime formats: + ++ `ansic` - Mon Jan _2 15:04:05 2006 ++ `unixdate` - Mon Jan _2 15:04:05 MST 2006 ++ `rubydate` - Mon Jan 02 15:04:05 -0700 2006 ++ `rfc822` - 02 Jan 06 15:04 MST ++ `rfc822z` - 02 Jan 06 15:04 -0700 ++ `rfc850` - Monday, 02-Jan-06 15:04:05 MST ++ `rfc1123` - Mon, 02 Jan 2006 15:04:05 MST ++ `rfc1123z` - Mon, 02 Jan 2006 15:04:05 -0700 ++ `rfc3339` - 2006-01-02T15:04:05Z07:00 ++ `rfc3339nano` - 2006-01-02T15:04:05.999999999Z07:00 ++ `kitchen` - 3:04PM ++ `stamp` - Jan _2 15:04:05 ++ `stampmilli` - Jan _2 15:04:05.000 ++ `stampmicro` - Jan _2 15:04:05.000000 ++ `stampnano` - Jan _2 15:04:05.000000000 ++ `nginx_errorlog` - 2006/01/02 15:04:05 ++ `unixtime` - unix timestamp in seconds: 1739959880 ++ `unixtimemilli` - unix timestamp in milliseconds: 1739959880999 ++ `unixtimemicro` - unix timestamp in microseconds: 1739959880999999 (e.g. `journalctl` writes timestamp in that format in `__REALTIME_TIMESTAMP` field when using json output format) ++ `unixtimenano` - unix timestamp in nanoseconds: 1739959880999999999 + +**Note**: when using `unixtime(|milli|micro|nano)` if there is a float value its whole part is always considered as seconds and the fractional part is fractions of a second. + ## Match modes + +> Note: consider using [DoIf match rules](/pipeline/doif/README.md) instead, since it is an advanced version of match modes. + @match-modes|header-description diff --git a/pipeline/README.md b/pipeline/README.md index 293e39e1e..e65aaa8f2 100755 --- a/pipeline/README.md +++ b/pipeline/README.md @@ -1,4 +1,154 @@ +# Pipeline + +Pipeline is an entity which handles data. It consists of input plugin, list of action plugins and output plugin. The input plugin sends the data to `pipeline.In` controller. There the data is validated, if the data is empty, it is discarded, the data size is also checked, the behaviour for the long logs is defined by `cut_off_event_by_limit` setting. Then the data is checked in `antispam` if it is enabled. After all checks are passed the data is converted to the `Event` structure, the events are limited by the `EventPool`, and decoded depending on the [pipeline settings](#settings). The event is sent to stream which are handled with `processors`. In the processors the event is passed through the list of action plugins and sent to the output plugin. Output plugin commits the `Event` by calling `pipeline.Commit` function and after the commit is finished the data is considered as processed. More details and architecture is presented in [architecture page](/docs/architecture.md). + +## Settings + +**`capacity`** *`int`* *`default=1024`* + +Capacity of the `EventPool`. There can only be processed no more than `capacity` events at the same time. It can be considered as one of the rate limiting tools, but its primary role is to control the amount of RAM used by File.d. + +
+ +**`avg_log_size`** *`int`* *`default=4096`* + +Expected average size of the input logs in bytes. Used in standard event pool to release buffer memory when its size exceeds this value. + +
+ +**`max_event_size`** *`int`* *`default=0`* + +Maximum allowed size of the input logs in bytes. If set to 0, logs of any size are allowed. If set to a value greater than 0, logs larger than `max_event_size` are discarded unless `cut_off_event_by_limit` is set to `true`. + +
+ +**`cut_off_event_by_limit`** *`bool`* *`default=false`* + +Flag indicating whether to cut logs which have exceeded the `max_event_size`. If set to `true`, huge logs are cut and only the first `max_event_size` bytes of the logs are passed further. If set to `false`, huge logs are discarded. Only works if `max_event_size` is greater than 0, otherwise does nothing. Useful when there are huge logs which affect the logging system but it is preferable to deliver them at least partially. + +
+ +**`cut_off_event_by_limit_field`** *`string`* + +Field to add to log if it was cut by `max_event_size`. E.g. with `cut_off_event_by_limit_field: _cropped`, if the log was cut, the output event will have field `"_cropped":true`. Only works if `cut_off_event_by_limit` is set to `true` and `max_event_size` is greater than 0. Useful for marking cut logs. + +
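
As a rough sketch of how these size limits combine (assuming the usual `pipelines.<name>.settings` layout; the pipeline name and numbers are hypothetical):

```yaml
pipelines:
  example:
    settings:
      max_event_size: 32768                    # logs larger than 32 KiB are affected
      cut_off_event_by_limit: true             # cut oversized logs instead of discarding them
      cut_off_event_by_limit_field: _cropped   # cut logs get the field "_cropped": true
    # input, actions and output sections omitted
```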
+ +**`decoder`** *`string`* *`default=auto`* + +Which decoder to use on every log from the input plugin. Defaults to `auto`, meaning the decoder suggested by the input plugin is used. Currently the `json` decoder is suggested most of the time; the only exception is the [k8s input plugin](/plugin/input/k8s/README.md) with a CRI type other than docker, in which case the `cri` decoder is suggested. The full list of the decoders is available on the [decoders page](/decoder/readme.md). + +
+ +**`decoder_params`** *`map[string]any`* + +Additional parameters for the chosen decoder. The params list varies. It can be found on the [decoders page](/decoder/readme.md) for each of them. + +
+ +**`stream_field`** *`string`* *`default=stream`* + +Which field in the log indicates `stream`. Mostly used for distinguishing `stdout` from `stderr` in k8s logs. + +
+ +**`maintenance_interval`** *`string`* *`default=5s`* + +How often to perform maintenance. Maintenance includes antispammer maintenance and metric cleanup, metric holder maintenance, increasing basic pipeline metrics with accumulated deltas, logging pipeline stats. The value must be passed in format of duration (`(ms|s|m|h)`). + +
+ +**`event_timeout`** *`string`* *`default=30s`* + +How long an event can be processed by action plugins and block a stream in the streamer until it is marked as a timeout event and the stream is unlocked, so that the whole pipeline does not get stuck. The value must be passed in duration format (`(ms|s|m|h)`). + +
+ +**`antispam_threshold`** *`int`* *`default=0`* + +Threshold value for the [antispammer](/pipeline/antispam/README.md#antispammer) to ban sources. If set to 0, the antispammer is disabled. If set to a value greater than 0, the antispammer is enabled and bans sources which write `antispam_threshold` or more logs in `maintenance_interval` time. + +
+ +**`antispam_exceptions`** *`[]`[antispam.Exception](/pipeline/antispam/README.md#exception-parameters)* + +The list of antispammer exceptions. If the log matches at least one of the exceptions it is not accounted in antispammer. + +
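
A hedged sketch of `antispam_threshold` and `antispam_exceptions` used together; the exception itself is hypothetical, and its fields are the ones described in the [antispam README](/pipeline/antispam/README.md#exception-parameters):

```yaml
pipelines:
  k8s_logs:
    settings:
      antispam_threshold: 2000
      antispam_exceptions:
        - name: keep_critical_component
          check_source_name: true       # match the rules against the source name
          cond: and
          rules:
            - mode: contains
              values: ["kube-apiserver"]
              case_insensitive: true
    # input, actions and output sections omitted
```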
+ +**`meta_cache_size`** *`int`* *`default=1024`* + +Amount of entries in metadata cache. + +
+ +**`source_name_meta_field`** *`string`* + +The metadata field used to retrieve the name or origin of a data source. You can use it for antispam. Metadata is configured via `meta` parameter in input plugin. For example: + +```yaml +input: + type: k8s + meta: + pod_namespace: '{{ .pod_name }}.{{ .namespace_name }}' +pipeline: + antispam_threshold: 2000 + source_name_meta_field: pod_namespace +``` + +
+ +**`is_strict`** *`bool`* *`default=false`* + +Whether to fatal on decoding error. + +
+ +**`metric_hold_duration`** *`string`* *`default=30m`* + +The amount of time the metric can be idle until it is deleted. Used for deleting rarely updated metrics to save metrics storage resources. The value must be passed in format of duration (`(ms|s|m|h)`). + +
+ +**`pool`** *`string`* *`options=std|low_memory`* + +Type of `EventPool`. The `std` pool is the original pool built on a slice of `Event` pointers and slices of free-event indicators. The `low_memory` pool is a leveled pool based on multiple `sync.Pool` instances for events of different sizes. The latter is experimental. + +
+ +## Datetime parse formats + +Most of the plugins which work with parsing datetime call `pipeline.ParseTime` function. It accepts datetime layouts the same way as Go `time.Parse` (in format of datetime like `2006-01-02T15:04:05.999999999Z07:00`) except unix timestamp formats, they can only be specified via aliases. + +For the comfort of use there are aliases to some datetime formats: + ++ `ansic` - Mon Jan _2 15:04:05 2006 ++ `unixdate` - Mon Jan _2 15:04:05 MST 2006 ++ `rubydate` - Mon Jan 02 15:04:05 -0700 2006 ++ `rfc822` - 02 Jan 06 15:04 MST ++ `rfc822z` - 02 Jan 06 15:04 -0700 ++ `rfc850` - Monday, 02-Jan-06 15:04:05 MST ++ `rfc1123` - Mon, 02 Jan 2006 15:04:05 MST ++ `rfc1123z` - Mon, 02 Jan 2006 15:04:05 -0700 ++ `rfc3339` - 2006-01-02T15:04:05Z07:00 ++ `rfc3339nano` - 2006-01-02T15:04:05.999999999Z07:00 ++ `kitchen` - 3:04PM ++ `stamp` - Jan _2 15:04:05 ++ `stampmilli` - Jan _2 15:04:05.000 ++ `stampmicro` - Jan _2 15:04:05.000000 ++ `stampnano` - Jan _2 15:04:05.000000000 ++ `nginx_errorlog` - 2006/01/02 15:04:05 ++ `unixtime` - unix timestamp in seconds: 1739959880 ++ `unixtimemilli` - unix timestamp in milliseconds: 1739959880999 ++ `unixtimemicro` - unix timestamp in microseconds: 1739959880999999 (e.g. `journalctl` writes timestamp in that format in `__REALTIME_TIMESTAMP` field when using json output format) ++ `unixtimenano` - unix timestamp in nanoseconds: 1739959880999999999 + +**Note**: when using `unixtime(|milli|micro|nano)` if there is a float value its whole part is always considered as seconds and the fractional part is fractions of a second. + ## Match modes + +> Note: consider using [DoIf match rules](/pipeline/doif/README.md) instead, since it is an advanced version of match modes. + #### And `match_mode: and` — matches fields with AND operator diff --git a/pipeline/antispam/README.md b/pipeline/antispam/README.md new file mode 100644 index 000000000..64d1a033f --- /dev/null +++ b/pipeline/antispam/README.md @@ -0,0 +1,39 @@ +# Antispam + +In some systems services might explode with logs due to different circumstances. If there are a lot of services to collect logs from and some of them suddenly start writing too much logs while the others operate normally, antispam system can help reduce the impact from the spamming services on the others. Usually it is used when there is no room for increasing File.d throughput or capacity, e.g. when File.d is used as daemonset on k8s nodes with limited resources. + +## Antispammer + +The main entity is `Antispammer`. It counts input data from the sources (e.g. if data comes from [file input plugin](/plugin/input/file/README.md), source can be filename) and decides whether to ban it or not. For each source it counts how many logs it has got, in other words the counter for the source is incremented for each incoming log. When the counter is greater or equal to the threshold value the source is banned until its counter is less than the threshold value. The counter value is decremented once in maintenance interval by the threshold value. The maintenance interval for antispam is the same as for the pipeline (see `maintenance_interval` in [pipeline settings](/pipeline/README.md#settings)). + +## Exceptions + +Antispammer has some exception rules which can be applied by checking source name or log as raw bytes contents. If the log is matched by the rules it is not accounted for in the antispammer. It might be helpful for the logs from critical infrastructure services which must not be banned at all. 
+ +### Exception parameters + +The exception parameters are an extension of [RuleSet](/cfg/matchrule/README.md). + +**`name`** *`string`* + +The name of the exception ruleset. If set to a non-empty string, it is used as the value of the `name` label in the `antispam_exceptions` metric. + +
+ +**`cond`** *`string`* *`default=and`* *`options=and|or`* + +Logical operation used to combine the rules. If set to `and`, the exception only matches when all rules are matched. If set to `or`, the exception matches when at least one of the rules is matched. + +
+ +**`rules`** *`[]`Rule* + +List of rules to check the log against. + +
+ +**`check_source_name`** *`bool`* *`default=false`* + +Flag indicating whether to check the source name. If set to `true`, the source name is checked against all rules. If set to `false`, the raw bytes of the log are checked against all rules. + +
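
To make the parameters concrete, here is a hedged sketch of one exception in YAML; the parameter names are the ones listed above, while the rule values are hypothetical:

```yaml
antispam_exceptions:
  - name: ignore_debug_noise
    cond: or
    check_source_name: false   # match against the raw log contents, not the source name
    rules:
      - mode: prefix
        values: ['{"level":"debug"']
      - mode: contains
        values: ["healthcheck"]
        case_insensitive: true
```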
diff --git a/pipeline/doif/README.idoc.md b/pipeline/doif/README.idoc.md index 666b80904..aea515be0 100644 --- a/pipeline/doif/README.idoc.md +++ b/pipeline/doif/README.idoc.md @@ -1,30 +1,30 @@ -## Experimental: Do If rules +# Experimental: Do If rules (logs content matching rules) This is experimental feature and represents an advanced version of `match_fields`. The Do If rules are a tree of nodes. The tree is stored in the Do If Checker instance. When Do If Checker's Match func is called it calls to the root Match func and then the chain of Match func calls are performed across the whole tree. -### Node types +## Node types @do-if-node|description -### Field op node +## Field op node @do-if-field-op-node -### Field operations +## Field operations @do-if-field-op|description -### Logical op node +## Logical op node @do-if-logical-op-node -### Logical operations +## Logical operations @do-if-logical-op|description -### Length comparison op node +## Length comparison op node @do-if-len-cmp-op-node -### Timestamp comparison op node +## Timestamp comparison op node @do-if-ts-cmp-op-node -### Check type op node +## Check type op node @do-if-check-type-op-node diff --git a/pipeline/doif/README.md b/pipeline/doif/README.md index 388c55b08..9153420f0 100755 --- a/pipeline/doif/README.md +++ b/pipeline/doif/README.md @@ -1,11 +1,11 @@ -## Experimental: Do If rules +# Experimental: Do If rules (logs content matching rules) This is experimental feature and represents an advanced version of `match_fields`. The Do If rules are a tree of nodes. The tree is stored in the Do If Checker instance. When Do If Checker's Match func is called it calls to the root Match func and then the chain of Match func calls are performed across the whole tree. -### Node types +## Node types **`FieldOp`** Type of node where matching rules for fields are stored.
@@ -27,7 +27,7 @@ the chain of Match func calls are performed across the whole tree.
-### Field op node +## Field op node DoIf field op node is considered to always be a leaf in the DoIf tree. It checks byte representation of the value by the given field path. Array and object values are considered as not matched since encoding them to bytes leads towards large CPU and memory consumption. @@ -53,7 +53,7 @@ pipelines: ``` -### Field operations +## Field operations **`Equal`** checks whether the field value is equal to one of the elements in the values list. Example: @@ -177,7 +177,7 @@ result:
-### Logical op node +## Logical op node DoIf logical op node is a node considered to be the root or an edge between nodes. It always has at least one operand which are other nodes and calls their checks to apply logical operation on their results. @@ -206,7 +206,7 @@ pipelines: ``` -### Logical operations +## Logical operations **`Or`** accepts at least one operand and returns true on the first returned true from its operands. Example: @@ -293,7 +293,7 @@ result:
-### Length comparison op node +## Length comparison op node DoIf length comparison op node is considered to always be a leaf in the DoIf tree like DoIf field op node. It contains operation that compares field length in bytes or array length (for array fields) with certain value. @@ -360,7 +360,7 @@ They denote corresponding comparison operations. | `eq` | `==` | | `ne` | `!=` | -### Timestamp comparison op node +## Timestamp comparison op node DoIf timestamp comparison op node is considered to always be a leaf in the DoIf tree like DoIf field op node. It contains operation that compares timestamps with certain value. @@ -403,7 +403,7 @@ Result: {"timestamp":"2011-01-01T00:00:00Z"} # not discarded (condition is not met) ``` -### Check type op node +## Check type op node DoIf check type op node checks whether the type of the field node is the one from the list. Params: diff --git a/pipeline/util.go b/pipeline/util.go index 2180d1d17..112a70b20 100644 --- a/pipeline/util.go +++ b/pipeline/util.go @@ -2,6 +2,7 @@ package pipeline import ( "fmt" + "math" "reflect" "strconv" "strings" @@ -56,13 +57,17 @@ func StringToByteUnsafe(s string) (b []byte) { */ const ( - formats = "ansic|unixdate|rubydate|rfc822|rfc822z|rfc850|rfc1123|rfc1123z|rfc3339|rfc3339nano|kitchen|stamp|stampmilli|stampmicro|stampnano|unixtime|nginx_errorlog" - UnixTime = "unixtime" - nginxDateFmt = "2006/01/02 15:04:05" + formats = "ansic|unixdate|rubydate|rfc822|rfc822z|rfc850|rfc1123|rfc1123z|rfc3339|rfc3339nano|kitchen|stamp|stampmilli|stampmicro|stampnano|unixtime|unixtimemilli|unixtimemicro|unixtimenano|nginx_errorlog" + UnixTime = "unixtime" + UnixTimeMilli = "unixtimemilli" + UnixTimeMicro = "unixtimemicro" + UnixTimeNano = "unixtimenano" + nginxDateFmt = "2006/01/02 15:04:05" ) func ParseFormatName(formatName string) (string, error) { - switch strings.ToLower(strings.TrimSpace(formatName)) { + formatNameProcessed := strings.ToLower(strings.TrimSpace(formatName)) + switch formatNameProcessed { case "ansic": return time.ANSIC, nil case "unixdate": @@ -95,31 +100,63 @@ func ParseFormatName(formatName string) (string, error) { return time.StampNano, nil case "nginx_errorlog": return nginxDateFmt, nil - case UnixTime: - return UnixTime, nil + case UnixTime, UnixTimeMilli, UnixTimeMicro, UnixTimeNano: + return formatNameProcessed, nil default: return "", fmt.Errorf("unknown format name %q, should be one of %s", formatName, formats) } } +type unixTimeFormat int + +const ( + unixTimeSec unixTimeFormat = iota + unixTimeMilli + unixTimeMicro + unixTimeNano +) + func ParseTime(format, value string) (time.Time, error) { - if format == UnixTime { - return parseUnixTime(value) + switch format { + case UnixTime: + return parseUnixTime(value, unixTimeSec) + case UnixTimeMilli: + return parseUnixTime(value, unixTimeMilli) + case UnixTimeMicro: + return parseUnixTime(value, unixTimeMicro) + case UnixTimeNano: + return parseUnixTime(value, unixTimeNano) + default: + return time.Parse(format, value) } - return time.Parse(format, value) } -func parseUnixTime(value string) (time.Time, error) { +func parseUnixTime(value string, format unixTimeFormat) (time.Time, error) { numbers := strings.Split(value, ".") - var sec, nsec int64 + var sec, nsec, val int64 var err error switch len(numbers) { case 1: - sec, err = strconv.ParseInt(numbers[0], 10, 64) + val, err = strconv.ParseInt(numbers[0], 10, 64) if err != nil { return time.Time{}, err } + switch format { + case unixTimeSec: + sec = val + case unixTimeMilli: + sec = val / 1e3 + nsec = (val % 
1e3) * 1e6 + case unixTimeMicro: + sec = val / 1e6 + nsec = (val % 1e6) * 1e3 + case unixTimeNano: + sec = val / 1e9 + nsec = val % 1e9 + } case 2: + // when timestamp is presented as a float number its whole part is always considered as seconds + // and the fractional part is fractions of a second sec, err = strconv.ParseInt(numbers[0], 10, 64) if err != nil { return time.Time{}, err @@ -128,6 +165,12 @@ func parseUnixTime(value string) (time.Time, error) { if err != nil { return time.Time{}, err } + // if there are less than 9 digits to the right of the decimal point + // it must be multiplied by 10^(9 - digits) to get nsec value + digits := len(numbers[1]) + if digits < 9 { + nsec *= int64(math.Pow10(9 - digits)) + } default: return time.Time{}, fmt.Errorf("unexpected time format") } diff --git a/pipeline/util_test.go b/pipeline/util_test.go index 394e0710e..70cd261f5 100644 --- a/pipeline/util_test.go +++ b/pipeline/util_test.go @@ -1,7 +1,9 @@ package pipeline import ( + "strconv" "testing" + "time" insaneJSON "github.com/ozontech/insane-json" "github.com/stretchr/testify/require" @@ -105,3 +107,46 @@ func TestLevelParsing(t *testing.T) { require.Equal(t, got, expected) } } + +func TestParseTime(t *testing.T) { + testTime := time.Now() + tests := []struct { + name string + format string + value string + want int64 + }{ + { + name: "unixtime_ok", + format: "unixtime", + value: strconv.FormatInt(testTime.Unix(), 10), + want: testTime.Unix() * 1e9, + }, + { + name: "unixtimemilli_ok", + format: "unixtimemilli", + value: strconv.FormatInt(testTime.UnixMilli(), 10), + want: testTime.UnixMilli() * 1e6, + }, + { + name: "unixtimemicro_ok", + format: "unixtimemicro", + value: strconv.FormatInt(testTime.UnixMicro(), 10), + want: testTime.UnixMicro() * 1e3, + }, + { + name: "unixtimenano_ok", + format: "unixtimenano", + value: strconv.FormatInt(testTime.UnixNano(), 10), + want: testTime.UnixNano(), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := ParseTime(tt.format, tt.value) + require.NoError(t, err, "must be no error") + require.Equal(t, tt.want, got.UnixNano()) + }) + } +} diff --git a/plugin/action/convert_date/README.md b/plugin/action/convert_date/README.md index 716cf0fd3..58b7e2963 100755 --- a/plugin/action/convert_date/README.md +++ b/plugin/action/convert_date/README.md @@ -10,13 +10,15 @@ The event field name which contains date information. **`source_formats`** *`[]string`* *`default=rfc3339nano,rfc3339`* -List of date formats to parse a field. Available list items should be one of `ansic|unixdate|rubydate|rfc822|rfc822z|rfc850|rfc1123|rfc1123z|rfc3339|rfc3339nano|kitchen|stamp|stampmilli|stampmicro|stampnano|unixtime|nginx_errorlog`. +List of date formats to parse a field. Can be specified as a datetime layout in Go [time.Parse](https://pkg.go.dev/time#Parse) format or by alias. +List of available datetime format aliases can be found [here](/pipeline/README.md#datetime-parse-formats).
**`target_format`** *`string`* *`default=unixtime`* -Date format to convert to. +Date format to convert to. Can be specified as a datetime layout in Go [time.Parse](https://pkg.go.dev/time#Parse) format or by alias. +List of available datetime format aliases can be found [here](/pipeline/README.md#datetime-parse-formats).
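
As a usage sketch tied to the new aliases: a `convert_date` action that parses a microsecond unix timestamp and re-emits it in milliseconds might be configured like this, assuming the date field parameter is named `field` and the pipeline name and `ts` field are hypothetical:

```yaml
pipelines:
  example:
    actions:
      - type: convert_date
        field: ts                      # assumed event field holding the date
        source_formats: ["unixtimemicro", "rfc3339nano"]
        target_format: unixtimemilli
```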
diff --git a/plugin/action/convert_date/convert_date.go b/plugin/action/convert_date/convert_date.go index 66f2ba0d2..e1cbbc532 100644 --- a/plugin/action/convert_date/convert_date.go +++ b/plugin/action/convert_date/convert_date.go @@ -25,13 +25,15 @@ type Config struct { // > @3@4@5@6 // > - // > List of date formats to parse a field. Available list items should be one of `ansic|unixdate|rubydate|rfc822|rfc822z|rfc850|rfc1123|rfc1123z|rfc3339|rfc3339nano|kitchen|stamp|stampmilli|stampmicro|stampnano|unixtime|nginx_errorlog`. + // > List of date formats to parse a field. Can be specified as a datetime layout in Go [time.Parse](https://pkg.go.dev/time#Parse) format or by alias. + // > List of available datetime format aliases can be found [here](/pipeline/README.md#datetime-parse-formats). SourceFormats []string `json:"source_formats" default:"rfc3339nano,rfc3339"` // * SourceFormats_ []string // > @3@4@5@6 // > - // > Date format to convert to. + // > Date format to convert to. Can be specified as a datetime layout in Go [time.Parse](https://pkg.go.dev/time#Parse) format or by alias. + // > List of available datetime format aliases can be found [here](/pipeline/README.md#datetime-parse-formats). TargetFormat string `json:"target_format" default:"unixtime"` // * TargetFormat_ string @@ -85,9 +87,16 @@ func (p *Plugin) Do(event *pipeline.Event) pipeline.ActionResult { for _, format := range p.config.SourceFormats_ { t, err := pipeline.ParseTime(format, date) if err == nil { - if p.config.TargetFormat_ == pipeline.UnixTime { + switch p.config.TargetFormat_ { + case pipeline.UnixTime: dateNode.MutateToInt(int(t.Unix())) - } else { + case pipeline.UnixTimeMilli: + dateNode.MutateToInt(int(t.UnixMilli())) + case pipeline.UnixTimeMicro: + dateNode.MutateToInt(int(t.UnixMicro())) + case pipeline.UnixTimeNano: + dateNode.MutateToInt(int(t.UnixNano())) + default: dateNode.MutateToString(t.Format(p.config.TargetFormat_)) } diff --git a/plugin/action/set_time/README.md b/plugin/action/set_time/README.md index dcd80303c..c713ebe93 100755 --- a/plugin/action/set_time/README.md +++ b/plugin/action/set_time/README.md @@ -11,7 +11,8 @@ The event field to put the time. **`format`** *`string`* *`default=rfc3339nano`* *`required`* -Date format to parse a field. This could be one of +Date format to parse a field. Can be specified as a datetime layout in Go [time.Parse](https://pkg.go.dev/time#Parse) format or by alias. +List of available datetime format aliases can be found [here](/pipeline/README.md#datetime-parse-formats).
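
Similarly, a hedged sketch of `set_time` writing the current time as a millisecond unix timestamp; the pipeline layout is assumed, while `field` and `format` are the parameters documented in the set_time README above:

```yaml
pipelines:
  example:
    actions:
      - type: set_time
        field: timestamp
        format: unixtimemilli
```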
diff --git a/plugin/action/set_time/set_time.go b/plugin/action/set_time/set_time.go index 93e5ffcea..1b3d42557 100644 --- a/plugin/action/set_time/set_time.go +++ b/plugin/action/set_time/set_time.go @@ -25,10 +25,8 @@ type Config struct { // > @3@4@5@6 // > - // > Date format to parse a field. This could be one of - // `unixtime|timestampmilli|timestampmicro|timestampnano|ansic|unixdate|rubydate|rfc822|rfc822z|rfc850|rfc1123|rfc1123z|rfc3339|rfc3339nano|kitchen|stamp|stampmilli|stampmicro|stampnano` - // or custom time format. - // See: https://pkg.go.dev/time#Parse + // > Date format to parse a field. Can be specified as a datetime layout in Go [time.Parse](https://pkg.go.dev/time#Parse) format or by alias. + // > List of available datetime format aliases can be found [here](/pipeline/README.md#datetime-parse-formats). Format string `json:"format" default:"rfc3339nano" required:"true"` // * Format_ string @@ -80,11 +78,11 @@ func (p *Plugin) do(event *pipeline.Event, t time.Time) pipeline.ActionResult { switch p.config.Format_ { case pipeline.UnixTime: dateNode.MutateToInt64(t.Unix()) - case "timestampmilli": + case pipeline.UnixTimeMilli, "timestampmilli": // timestamp(milli|micro|nano) are left for backward compatibility dateNode.MutateToInt64(t.UnixMilli()) - case "timestampmicro": + case pipeline.UnixTimeMicro, "timestampmicro": dateNode.MutateToInt64(t.UnixMicro()) - case "timestampnano": + case pipeline.UnixTimeNano, "timestampnano": dateNode.MutateToInt64(t.UnixNano()) default: dateNode.MutateToString(t.Format(p.config.Format_)) diff --git a/plugin/action/throttle/README.md b/plugin/action/throttle/README.md index cddfd25af..0b4caa07c 100755 --- a/plugin/action/throttle/README.md +++ b/plugin/action/throttle/README.md @@ -20,7 +20,8 @@ If not set, the current time will be taken. **`time_field_format`** *`string`* *`default=rfc3339nano`* -It defines how to parse the time field format. +It defines how to parse the time field format. Can be specified as a datetime layout in Go [time.Parse](https://pkg.go.dev/time#Parse) format or by alias. +List of available datetime format aliases can be found [here](/pipeline/README.md#datetime-parse-formats).
diff --git a/plugin/action/throttle/throttle.go b/plugin/action/throttle/throttle.go index 8191e550e..342e14193 100644 --- a/plugin/action/throttle/throttle.go +++ b/plugin/action/throttle/throttle.go @@ -74,7 +74,8 @@ type Config struct { // > @3@4@5@6 // > - // > It defines how to parse the time field format. + // > It defines how to parse the time field format. Can be specified as a datetime layout in Go [time.Parse](https://pkg.go.dev/time#Parse) format or by alias. + // > List of available datetime format aliases can be found [here](/pipeline/README.md#datetime-parse-formats). TimeFieldFormat string `json:"time_field_format" default:"rfc3339nano"` // * // > @3@4@5@6 diff --git a/plugin/output/gelf/README.md b/plugin/output/gelf/README.md index 41ff37bc6..3e5738d3c 100755 --- a/plugin/output/gelf/README.md +++ b/plugin/output/gelf/README.md @@ -71,9 +71,10 @@ Which field of the event should be used as `timestamp` GELF field.
-**`timestamp_field_format`** *`string`* *`default=rfc3339nano`* *`options=ansic|unixdate|rubydate|rfc822|rfc822z|rfc850|rfc1123|rfc1123z|rfc3339|rfc3339nano|kitchen|stamp|stampmilli|stampmicro|stampnano|unixtime`* +**`timestamp_field_format`** *`string`* *`default=rfc3339nano`* -In which format timestamp field should be parsed. +In which format timestamp field should be parsed. Can be specified as a datetime layout in Go [time.Parse](https://pkg.go.dev/time#Parse) format or by alias. +List of available datetime format aliases can be found [here](/pipeline/README.md#datetime-parse-formats).
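
A hedged sketch of the relevant part of a gelf output config using one of the new aliases; the `endpoint` value is hypothetical, and other required output parameters are not shown here:

```yaml
output:
  type: gelf
  endpoint: graylog.example.com:12201     # assumed endpoint parameter and address
  timestamp_field: time
  timestamp_field_format: unixtimemicro
```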
diff --git a/plugin/output/gelf/gelf.go b/plugin/output/gelf/gelf.go index 034ade23b..3fbfb0a61 100644 --- a/plugin/output/gelf/gelf.go +++ b/plugin/output/gelf/gelf.go @@ -102,8 +102,9 @@ type Config struct { // > @3@4@5@6 // > - // > In which format timestamp field should be parsed. - TimestampFieldFormat string `json:"timestamp_field_format" default:"rfc3339nano" options:"ansic|unixdate|rubydate|rfc822|rfc822z|rfc850|rfc1123|rfc1123z|rfc3339|rfc3339nano|kitchen|stamp|stampmilli|stampmicro|stampnano|unixtime"` // * + // > In which format timestamp field should be parsed. Can be specified as a datetime layout in Go [time.Parse](https://pkg.go.dev/time#Parse) format or by alias. + // > List of available datetime format aliases can be found [here](/pipeline/README.md#datetime-parse-formats). + TimestampFieldFormat string `json:"timestamp_field_format" default:"rfc3339nano"` // * // > @3@4@5@6 // > @@ -209,6 +210,7 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP p.config.timestampField = pipeline.ByteToStringUnsafe(p.formatExtraField(nil, p.config.TimestampField)) format, err := pipeline.ParseFormatName(p.config.TimestampFieldFormat) if err != nil { + // TODO: refactor plugin code to unify with datetime parsing in other plugins params.Logger.Errorf("unknown time format: %s", err.Error()) } p.config.timestampFieldFormat = format