-
-
Notifications
You must be signed in to change notification settings - Fork 69
LogQL Plugins
- WORK IN PROGRESS!
Missing a LogQL function in cLoki? Extend functionality in in no time using cLoki Plugins Need to alias a complex query? Use macros to turn complex queries into easy to use queries
Plugins are supported via plugnplay module https://github.com/e0ipso/plugnplay . To create a plugin you have to create a nodejs project with subfolders for each plugin or add them into your cLoki plugins folder:
/
|- package.json
|- plugin_name_folder
| |- plugnplay.yml
| |- index.js
|- plugin_2_folder
|- plugnplay.yml
...
There is a number of different types of plugins supported by cLoki. Each type extends particular functionality:
- Log-range aggregator over unwrapped range:
unwrap_registry
type (vanilla LogQL example: avg_over_time) - Custom macro function to wrap or shorten an existing request statement:
macros
type
In order to initialize the plugin we need the plugnplay.yml
file:
id: derivative
name: Derivative Plugin
description: Plugin to test pluggable extensions
loader: derivative.js
type: unwrap_registry
-
id
of the plugin should be unique. -
type
of the plugin should beunwrap_registry
. -
loader
field should specify the js file exporting the plugin loader class.
The js module specified in the loader
field should export a class extending PluginLoaderBase
class from the
plugnplay package.
const {PluginLoaderBase} = require('plugnplay');
module.exports = `class extends PluginLoaderBase {
exportSync() { return {...}; }
}
The exporting class should implement one function: exportSync() {...}
.
The exportSync
function should return an object representing API different for each type of plugin.
Finally, you have to add the path to your plugin root folder to the env variable PLUGINS_PATH
.
Different paths should be separated by comma sign ,
.
In this example we will add a new unwrapped range aggregator derivative
:
derivative=(last_unwrapped_value_in_range - first_unwrapped_value_in_range) / (last_time_in_range - first_time_in_range)
You need to init a plugin with the following loader:
const {PluginLoaderBase} = require('plugnplay');
module.exports = `class extends PluginLoaderBase {
exportSync(api) {
return {
derivative = {
run: () => {},
approx: () => {}
}
}
}
}
exportSync
is a function returning an object with the function name as key and two methods: run
and approx
.
The run
method is called every time new unwrapped value accepted by the stream processor. Its declaration is:
/**
*
* @param sum {any} previous value for the current time bucket
* @param val {{unwrapped: number}} current values
* @param time {number} timestamp in ms for the current value
* @returns {any}
*/
const run = (sum, val, time) => {
sum = sum || {};
sum.first = sum && sum.first && time > sum.first.time ? sum.first : {time: time, val: val.unwrapped};
sum.last = sum && sum.last && time < sum.last ? sum.last : {time: time, val: val.unwrapped};
return sum;
}
So the run function accepts the previous aggregated value. The initial value is 0. The second is an object with current unwrapped value. And the time when the unwrapped value appeared in the database. The run function should return the new sum. Data immutability is preferred but optional.
The approx
method is called for each bucket at the end of processing. Its declaration is:
/**
* @param sum {any} sum of the time bucket you have created during "run"
* @returns {number}
*/
const approx = (sum) => {
return sum && sum.last && sum.first && sum.last.time > sum.first.time ?
(sum.last.val - sum.first.val) / (sum.last.time - sum.first.time) * 1000 : 0;
}
The only argument is the result of the latest run
call for the bucket.
The function should return number as result of the operator calculation for the provided time bucket.
The full code of the derivative
plugin:
plugnplay.yml
id: derivative
name: Derivative Plugin
description: Plugin to test pluggable extensions
loader: derivative.js
type: unwrap_registry
derivative.js:
const {PluginLoaderBase} = require('plugnplay');
module.exports = class extends PluginLoaderBase {
exportSync(api) {
return {
derivative: {
/**
*
* @param sum {any} previous value for the current time bucket
* @param val {{unwrapped: number}} current values
* @param time {number} timestamp in ms for the current value
* @returns {any}
*/
run: (sum, val, time) => {
sum = sum || {};
sum.first = sum && sum.first && time > sum.first.time ? sum.first : {
time: time,
val: val.unwrapped
};
sum.last = sum && sum.last && time < sum.last ? sum.last : {time: time, val: val.unwrapped};
return sum;
},
/**
* @param sum {any} sum of the time bucket you have created during "run"
* @returns {number}
*/
approx: (sum) => {
return sum && sum.last && sum.first && sum.last.time > sum.first.time ?
(sum.last.val - sum.first.val) / (sum.last.time - sum.first.time) * 1000 : 0;
}
}
};
}
}
If you want to create your own parser of log lines or fix some of unsupported ones, you can create a plugin of "parser_registry" type.
The plugnplay.yml file of a plugin should have "parser_registry" type. No other requirements.
id: label_to_row
name: Label to Time Series
description: Convert label to extra time series
loader: index.js
type: parser_registry
The plugin loader class should implement exportSync
method returning an object with the special structure:
exportSync (options) {
return {
<your parser function name>: {
/**
* @param parameters {string[]} stringified (!but not unquoted!) generic parameters array like
* ['parameter1', 'label2="parameter2"']
*/
map: (parameters) => {
/**
* @param entry {{labels: Object<string, string>, string: string, timestamp_ms: number}}
* @returns {{labels: Object<string, string>, string: string, timestamp_ms: number}}
*/
return (entry) => {
... here goes the parser ...
}
},
/**
* @param parameters {string[]} stringified (!but not unquoted!) generic parameters array like
* ['parameter1', 'label2="parameter2"']
*/
remap: (parameters) => {
/**
* @param emit {function({labels: Object<string, string>, string: string, timestamp_ms: number})}
* @param entry {{labels: Object<string, string>, string: string, timestamp_ms: number}}
*/
return (emit, entry) => {
... here goes the parser ...
}
}
}
}
Your parser function should implement one of map and remap functions. Use map
if you want to exactly an incoming
entry (one entry incoming to generate one entry). Remap can afford you to generate more than one entry on one incoming
or to filter out some incoming entries. Use the emit
function if you want to output an entry down to the pipeline
in that case.
Here is an example of plugin that extract labels from incoming entry and creates multiple entries like
{label: <extracted label>}: value of the label
plugnplay.yml
id: label_to_row
name: Label to Time Series
description: Convert label to extra time series
loader: index.js
type: parser_registry
index.js
const { PluginLoaderBase } = require('plugnplay')
module.exports = class extends PluginLoaderBase {
exportSync (options) {
return {
label_to_row: {
/**
*
* @param parameters {string[]}
*/
remap: (parameters) => {
const labelsToRemap = parameters.length
? JSON.parse(parameters[0]).split(',').map(p => p.trim())
: undefined
return (emit, entry) => {
if (labelsToRemap) {
for (const l of labelsToRemap) {
if (entry.labels[l]) {
const rm = {
...entry,
labels: { label: l },
string: entry.labels[l]
}
emit(rm)
}
}
return
}
for (const [l, v] of Object.entries(entry)) {
emit({
...entry,
labels: { label: l },
string: v
})
}
}
}
}
}
}
}
cLoki parses logql requests using the bnf package https://github.com/daKuleMune/nodebnf#readme
You can provide a custom bnf token representation and map it to a relevant logql request via a plugin with macros
type.
The raw ABNF description: https://github.com/lmangani/cLoki/blob/master/parser/logql.bnf .
If you are unfamiliar BNF rules, here is a good resource to get a quick introduction: http://www.cs.umsl.edu/~janikow/cs4280/bnf.pdf
A bnf description in your plugin should follow the requirements:
- one bnf rule on a string
- no multiline rules
- no comments supported
- bnf rule name should start with MACRO_ prefix
- no bnf rule name collisions
A plugin should export two fields:
const exports = {
bnf: "... bnf rules ...",
/**
*
* @param token {Token}
* @returns {string}
*/
stringify: (token) => {}
}
The bnf
field should contain bnf rules.
The stringify
function should convert a parsed query token into a legit logQL request.
Token type is a request parsed by the BNF package. It has the following fields:
Field | Header | Description |
---|---|---|
value | token.value:string | part of the request expression corresponding to the token |
Child | token.Child(child_type: string): Token | function returning the first token child with the specified type. |
Children | token.Children(child_type: string): Token[] | function returning all the token children with the specified type. |
Let's review an example of macro translating test_macro("val1")
to {test_id="val1"}
The plugnplay.yml
file
id: test_macro
name: test macro
description: A macro to test
loader: index.js
type: macros
The BNF description of the macro: MACRO_test_macro_fn ::= "test_macro" <OWSP> "(" <OWSP> <quoted_str> <OWSP> ")"
The complete loader code:
const {PluginLoaderBase} = require('plugnplay');
module.exports = class extends PluginLoaderBase {
exportSync() {
return {
bnf: `MACRO_test_macro_fn ::= "test_macro" <OWSP> "(" <OWSP> <quoted_str> <OWSP> ")"`,
/**
*
* @param token {Token}
* @returns {string}
*/
stringify: (token) => {
return `{test_id=${token.Child('quoted_str').value}}`;
}
};
}
}
You can use the common rules already defined in the core BNF description.
The raw ABNF description with all the rules: https://github.com/lmangani/cLoki/blob/master/parser/logql.bnf .
The rules defined in the BNF package are here: https://github.com/daKuleMune/nodebnf#readme
Commonly used LogQL rules:
Rule name | Example | Description |
---|---|---|
log_stream_selector | {label1 = "val1", l2 =~ "v2"} |~ "re1" |
log stream selector with label selectors and all pipeline operators |
log_stream_selector_rule | label1 = "val1" |
one label selector rule |
label | label1 |
label name |
operator | = / != / =~ / !~ |
label selector operator |
quoted_str | "qstr\"" |
one properly quoted string |
line_filter_expression | |~ "re1" |
one line filter expression |
line_filter_operator | |= / |= / !~ / != |
string filter operator |
parser_expression | | json jlbl="l1[1].l2" |
one parser expression |
label_filter_expression | | jlbl = "val1" |
one label filter in the pipeline part |
line_format_expression | | line_format "l1: {{label1}}" |
line format expression |
labels_format_expression | | line_format lbl1="l1: {{label1}}" |
label format expression |
log_range_aggregation | rate({label1="val1"} [1m]) |
log range aggregation expression |
aggregation_operator | sum(rate({label1="val1"} [1m])) by (lbl1, lbl2) |
aggregation operator expression |
unwrap_expression | {label1="val1"} |~ "re1" | unwrap lbl2 |
line selector with pipeline ending with the unwrap expression |
unwrap_function | rate(rate({label1="val1"} | unwrap int_lbl2 [1m]) by (label3) |
unwrapped log-range aggregation |
compared_agg_statement | rate(rate({label1="val1"} | unwrap int_lbl2 [1m]) by (label3) > 5 |
wrapped or unwrapped log-range aggregation comparef to a numeric const |