Pipelines
Pipelines are a framework to allow for complex processing tasks to be done on the data. Alarm processing is implemented as a pipeline, PID control is a pipeline, and other tasks can also be done. This framework also allows for data from multiple sensors to be considered in the generation of alarms, which was not possible previously. Basically almost any complex automatable task that relies on the freshest of data can be done via the Pipeline framework.
Pipelines have one of two states: `active` and `inactive`.
`silent_until` defines whether a pipeline is silent or not. If it is a timestamp in the future, or -1 (silenced forever), anything that involves affecting the outside world is suppressed.
This means that silent control pipelines keep their buffers full and are appraised of developments in whatever they control, but don't actually do anything. Silent alarm pipelines keep watching for deviations from normal conditions but don't log new alarms.
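The silencing rule above can be sketched in a few lines. This is an illustration of the described semantics, not Doberman's actual implementation; the function name is hypothetical.

```python
import time

def is_silent(silent_until, now=None):
    """Sketch of the silencing check described above: a pipeline is silent
    if silent_until is -1 (silenced forever) or a timestamp still in the
    future. Function name is hypothetical, not Doberman's API."""
    now = time.time() if now is None else now
    return silent_until == -1 or silent_until > now
```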
Pipelines only run as fast as the sensors they depend on.
Various problems show up if identical timestamps start appearing for the same quantity, so an input node will throw an error if it gets the same value twice from the database.
If you speed up the `readout_interval` of the sensors a pipeline depends on, the pipeline will speed up to match automatically; pipelines don't have an independent processing interval.
Additionally, the pipeline maintains a constant drift of 1 ms per cycle (more if a new value is missed), because you don't want a pipeline to get slightly ahead of the sensors it needs.
When pipelines start it takes a few cycles for the internal buffers to fill up. If one node errors out, that process cycle is terminated, and some buffer nodes will error if they don't have enough in the buffer. Pipelines automatically silence themselves for as many cycles as they think are necessary.
A pipeline is a directed graph. You'll want to start with drawing a picture of what that graph should look like. As an example we'll look at a PID feedback controller for the thermosiphon. Normally this is the job of the cryocon, but that's why this is an example.
First, what data inputs are we going to need?
For the thermosiphon we want the temperature of the bottom copper block (`T_TS_01`) and the temperature setpoint (`T_TS_03`).
Let's also low-pass filter the temperature so we don't try to control on noise.
The variable we want to control on is the difference between the setpoint and current value (known as the "error"), so we'll want to calculate that.
Then we'll need the PID node that actually does the PID control of the heater. Here's what the graph could look like:
There are a couple of rules that must be followed. First, graph nodes inputting data into the pipeline must be endpoints, ie you can't have anything upstream of them, so we aren't able to put bottom temp and bottom setpoint into the same leg of the graph. Second, to join input streams coming from different input nodes you have to do a merging operation. This is how the pipeline deals with the inputs all having their own timestamps, and other rare cases where you have multiple variables with the same name floating around. Third, this is a directed graph. Data flows from input nodes to things downstream of them. Nodes only have at best a vague idea of the existence of things downstream.
With these rules in mind, we can transform Figure 1 into something more concrete.
We have two `InfluxSourceNode`s because we're pulling two values from Influx.
We lowpass filter the temperature, merge the two input streams, evaluate the error, and then let the PID controller do its magic.
Now we need to turn this into a configuration that Doberman will understand.
There are, for the most part, three different parts of the config and we'll look at each in turn.
First is the pipeline definition, where we tell Doberman what the graph looks like.
All nodes are required to specify a `type` and a `name`; most also need an `input_var` and who is `upstream` of them. See below for the complete documentation.
"pipeline": [
{
"name": "source_temp",
"type": "InfluxSourceNode",
"input_var": "T_TS_01",
"output_var": "temp_bot"
},
{
"name": "source_sp",
"type": "InfluxSourceNode",
"input_var": "T_TS_03",
"output_var": "temp_sp"
},
{
"name": "lowpass_filter",
"type": "LowPassFilter",
"input_var": "temp_bot",
"upstream": ["source_temp"]
},
{
"name": "merge",
"type": "MergeNode",
"upstream": ["lowpass_filter", "source_sp"],
"input_var": "",
},
{
"name": "eval_error",
"type": "EvalNode",
"upstream": ["merge"],
"input_var": ["temp_sp", "temp_bot"],
"operation": "v['temp_sp'] - (c['scale'] * v['temp_bot'] + c['offset'])",
"output_var": "err"
},
{
"name": "pid",
"type": "PIDControl",
"upstream": ["eval_error"],
"control_target": "revpi1",
"control_value": "thermosiphon_heater",
"input_var": "err"
}
]
Important things:
- The order in which you specify nodes doesn't matter.
- The PIDControl node doesn't exist at time of writing.
- The quotes in the operation of the EvalNode are important. Use single quotes inside the string, as shown above.
- `upstream` must be a list, even if there's only one node, and it references the names of other nodes. If you don't specify unique names for nodes in a pipeline, expect things to behave strangely.
- Things specified in the `pipeline` field can't be changed without restarting the pipeline.
- The filter could be downstream of the merge node without affecting the overall behavior.
Many nodes also take "runtime" parameters that are updated before every cycle of the pipeline. For a full description see below. The nodes here take the following options:
"node_config": {
"general": {
# general options
},
"lowpass_filter": {
"length": 5
},
"eval_error": {
"scale": 2.4,
"offset": 273.14
},
"pid": {
# something reasonable here
}
}
Again, the names of nodes need to match. Options in the `general` field are issued to all nodes. Do not specify `length` here.
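How the `general` options combine with a node's own section can be sketched as a simple dict merge. The override order (per-node options winning over `general`) and the function name are assumptions for illustration.

```python
def options_for(node_name, node_config):
    """Sketch: start from node_config['general'] (issued to all nodes),
    then overlay the node's own section. Override order is an assumption."""
    opts = dict(node_config.get("general", {}))
    opts.update(node_config.get(node_name, {}))
    return opts

# hypothetical node_config mirroring the example above
node_config = {
    "general": {"verbose": False},
    "lowpass_filter": {"length": 5},
}
```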
Finally, the pipeline needs a `name`:
"name": "example_pipeline"
If you aren't doing this from the website you'll also need to specify `status` (`inactive`), `depends_on` (a list of the names of the sensors this pipeline needs), and `silent_until` (`0`). I recommend using the website because it will update the cross-references.
All together, the config looks like this. Note that fields automatically added by the website are not listed.
{
"name": "example_pipeline",
"node_config": {
"lowpass_filter": {
"length": 5
},
"eval_error": {
"scale": 2.4,
"offset": 273.14
},
"pid": {
# something reasonable here
}
},
"pipeline": [
{
"name": "source_temp",
"type": "InfluxSourceNode",
"input_var": "T_TS_01",
"output_var": "temp_bot"
},
{
"name": "source_sp",
"type": "InfluxSourceNode",
"input_var": "T_TS_03",
"output_var": "temp_sp"
},
{
"name": "filter",
"type": "MedianFilterNode",
"input_var": "temp_bot",
"upstream": ["source_temp"]
},
{
"name": "merge",
"type": "MergeNode",
"upstream": ["filter", "source_sp"]
},
{
"name": "eval_error",
"type": "EvalNode",
"upstream": ["merge"],
"input_var": ["temp_sp", "temp_bot"],
"operation": "v['temp_sp'] - (c['scale'] * v['temp_bot'] + c['offset'])",
"output_var": "err"
},
{
"name": "pid",
"type": "PIDControl",
"upstream": ["eval_error"],
"control_target": "revpi1",
"control_value": "thermosiphon_heater",
"input_var": "err"
}
]
}
Note that the PIDControl node doesn't exist as of writing, but I hope it's otherwise clear.
Nodes take some options during construction and others during runtime. Construction options must be specified in the `pipeline` field, runtime or config options in the `node_config` field.
Construction options:
- `type`: the type of node (probably listed on this page)
- `upstream`: a list of names of upstream nodes, required for everything except SourceNodes
- `input_var`: the name (ie, Doberman sensor name, or `output_var` of something further up the graph) of the input variable this node expects. MergeNodes ignore this field, EvalNodes require a list of such quantities. Not all nodes will actually use this field but it must be specified (blank is ok in these cases).
- (Optional) `output_var`: the name of the output variable. Defaults to `input_var`.
Config options:
Do not specify `length` as a config value at the highest scope (ie, `node_config.general`); things will behave in unexpected ways. This field must be specified only for specific nodes, and different types of node will do different things with this value.
This node pulls the most recent value from Influx.
Config options:
None
This node filters its input by taking the median of its buffer.
Construction options:
- `strict_length`: is the node allowed to run without a full buffer (defaults to False if not set)
Config options:
- `length`: the median of this number of values will be propagated downstream. If an even number is specified, the average of the two middle values is used.
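The even-length behavior described above can be sketched as follows; this is an illustration of the documented behavior, not the node's actual code.

```python
def median_of_buffer(buf):
    """Sketch of the described median: sort the buffer; for an odd length
    take the middle value, for an even length average the two middle ones."""
    s = sorted(buf)
    n = len(s)
    mid = n // 2
    if n % 2:
        return s[mid]
    return (s[mid - 1] + s[mid]) / 2
```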
This node merges streams originating from different SourceNodes. You can use them whenever, but they are required when you have multiple SourceNodes; otherwise the timestamps will go wonky.
Construction options:
- `merge_how`: how do you want timestamps (and other common fields) to be merged? Options are `avg`, `min`, `max`, `newest`, `oldest`. Default is `avg`.
Config options:
None. Do not allow a `length` field to be passed to this node (most probably by specifying `length` in `node_config.general`).
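The `merge_how` options for combining timestamps can be sketched like this. Interpreting `newest`/`oldest` as max/min of the timestamps is an assumption, as is the function name.

```python
def merge_timestamps(timestamps, how="avg"):
    """Sketch of merging a common field (here, timestamps) from several
    input streams, per the merge_how options listed above."""
    if how == "avg":
        return sum(timestamps) / len(timestamps)
    if how in ("min", "oldest"):   # assumption: oldest == smallest timestamp
        return min(timestamps)
    if how in ("max", "newest"):   # assumption: newest == largest timestamp
        return max(timestamps)
    raise ValueError(f"unknown merge_how: {how}")
```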
This node calculates the average of its input buffer using the trapezoid rule. The result is scaled to the time interval present in the data.
Construction options:
- `strict_length`: is the node allowed to run without a full buffer (defaults to False if not set)
Config options:
- `length`: how many values the buffer should contain (not time, you'll need to do the math yourself). Don't set it to 1 or it will crash.
- `t_offset`: how many of the most recent values to skip. Useful for effecting a delayed response.
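The trapezoid-rule average described above can be sketched as the integral over the buffer divided by the spanned time interval, with `t_offset` dropping the newest points. This is an illustration of the description, not the node's actual code.

```python
def buffer_average(times, values, t_offset=0):
    """Sketch: trapezoid-rule integral over (times, values), scaled by the
    time interval present in the data. t_offset skips the newest points."""
    if t_offset:
        times, values = times[:-t_offset], values[:-t_offset]
    integral = sum(
        0.5 * (values[i] + values[i + 1]) * (times[i + 1] - times[i])
        for i in range(len(times) - 1)
    )
    return integral / (times[-1] - times[0])
```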
This node calculates the derivative of its input buffer via a least-squares fit.
Construction options:
- `strict_length`: is the node allowed to run without a full buffer (defaults to False if not set)
Config options:
- `length`: how many items the buffer should contain. Don't set this to 1 or it will crash.
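A least-squares derivative, as described above, is the slope of a straight-line fit over the buffer. A minimal sketch (not the node's actual code):

```python
def buffer_derivative(times, values):
    """Sketch: slope of a least-squares straight-line fit to the buffer,
    i.e. cov(t, v) / var(t)."""
    n = len(times)
    mt = sum(times) / n
    mv = sum(values) / n
    num = sum((t - mt) * (v - mv) for t, v in zip(times, values))
    den = sum((t - mt) ** 2 for t in times)
    return num / den
```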
This node performs some polynomial transformation of its input.
Config options:
- `transform`: a polynomial transform to apply to the input, in the form `sum(a_i*x**i)`, where `x` is the input value and `a_i` are the polynomial coefficients. `[0,1]` gives the unchanged value; keep adding more terms for a higher-order polynomial. Little-endian ordering (ie, first term is the constant, second term is the linear coefficient, third quadratic, etc). Technically optional, but kinda pointless otherwise.
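The little-endian convention above can be sketched directly from the formula `sum(a_i*x**i)`:

```python
def polynomial_transform(x, transform):
    """Little-endian polynomial sum(a_i * x**i): transform[0] is the
    constant term, transform[1] the linear coefficient, and so on.
    [0, 1] is the identity transform."""
    return sum(a * x ** i for i, a in enumerate(transform))
```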
This node writes a value back to Influx. The sensor name will be the `output_var` and the value will be the `input_var`. You must specify `output_var` as a valid (ie, existing!) Sensor name; the corresponding sensor document will determine the organisational parameters.
Construction options:
None
Config options:
None
These nodes throw an error if a value shows up later than it's supposed to. While the `DeviceRespondingInfluxNode` queries the database for new values, the `DeviceRespondingSyncNode` gets them directly from the device through the internal communication system.
Config options:
None
This node checks a remote heartbeat file and issues alarms to the phone numbers in the file when the timestamp is too old.
Config options:
- `silence_duration`: how long the pipeline should be silenced after this node issues an alarm
- `max_delay_sms`: how old in seconds the timestamp can be before this node sends SMS to the given phone numbers (defaults to 180s if not set)
- `max_delay_phone`: how old in seconds the timestamp can be before this node sends SMS and phone calls to the given phone numbers (defaults to 600s if not set)
- `directory`: where the remote heartbeat file is located (defaults to `/scratch` if not set)
- `experiment_name`: look at the file called `remote_hb_<experiment_name>`
This node does a simple alarm calculation, based on `low <= val <= high`.
Config options:
None
This node checks whether an integer value is equal to one of the defined `alarm_values`.
Config options:
None
Sometimes a status is given as a byte where each bit stands for a different state (e.g. 6 = 110 -> status 0 is OFF, statuses 1 and 2 are ON).
Config options:
None
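The bit-unpacking described above (6 = 110 -> status 0 OFF, statuses 1 and 2 ON) can be sketched as:

```python
def decode_status_bits(value, n_bits):
    """Sketch: expand a packed status byte into per-bit ON/OFF flags,
    bit 0 first. E.g. 6 = 0b110 -> [False, True, True]."""
    return [bool((value >> i) & 1) for i in range(n_bits)]
```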
This node checks whether the `alarm_is_triggered` field of any sensor in `sensors_to_check` is `True` and returns 1 or 0, accordingly.
Config options:
- `sensors_to_check`: either a list of sensor names or `any` to check all sensors in the database
Checks whether a sensor was at an `alarm_value` for longer than a `max_duration`.
Construction options: None
Config options:
- `alarm_value`: value that should trigger the alarm
- `max_duration`: maximal duration for which the value is allowed to be at `alarm_value` before logging an alarm
- `alarm_level`: base level of the alarm (note we don't take the one from the sensor since this is reserved for the IntegerAlarmNode (SimpleAlarmNode))
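The duration check described above can be sketched by tracking when the current run at `alarm_value` began. This is an illustration of the behavior, not the node's actual code.

```python
def check_duration(samples, alarm_value, max_duration):
    """Sketch: walk (timestamp, value) samples and report whether the most
    recent uninterrupted run at alarm_value has exceeded max_duration."""
    run_start = None
    for t, v in samples:
        if v == alarm_value:
            if run_start is None:
                run_start = t   # a new run at alarm_value begins
        else:
            run_start = None    # run broken, reset
    return run_start is not None and samples[-1][0] - run_start > max_duration
```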
This node executes a user-specified operation. Input values are collected into a dict named `v` and config values into a dict named `c`, so the operation can access them. For instance, `v['input_var1'] + 0.3*math.sin(v['input_var2'] + c['offset'])` is a valid two-input operation. The math library is available.
Construction options:
- `operation`: the mathematical operation you want performed. For reasons, it'll work best if you use single quotes inside the string.
- `input_var`: this needs to be a list of names of `output_var` quantities from further up the processing chain
- `output_var`: if you don't include this, expect crashes
Config options:
- `c`: a dict of constants to be used in your operation.
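Evaluating such an operation can be sketched as below. How Doberman actually sandboxes the expression is an assumption; only the `v`/`c` dicts and the availability of `math` are taken from the description above.

```python
import math

def run_operation(operation, v, c):
    """Sketch of an EvalNode-style evaluation: input values in dict v,
    constants in dict c, with the math library available."""
    return eval(operation, {"math": math}, {"v": v, "c": c})

# the operation string from the example pipeline above
op = "v['temp_sp'] - (c['scale'] * v['temp_bot'] + c['offset'])"
```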
This covers things like the valve control nodes and anything else wishing to effect a change in the system.
Construction options:
- `control_target`: the name of the sensor that handles the thing that needs to be controlled, eg `revpi1`
- `control_value`: the name of the quantity that is being controlled, for instance `fast_cooling_valve`
Config options:
- `default_output`: what should this node do if something goes wrong during pipeline operation? The value of this field (if present) will be used as the output, so you can have valves close etc when things go awry. If not specified, this feature is disabled.
Additional arguments determined by the child class.
This node controls a binary output, probably a solenoid valve, but it could be a pump or another thing that has a binary operational state. The input can be either a single variable (which causes the output to be set to 1 if it evaluates to True, and 0 if False) or two variables (one for setting the output to 1, one for setting the output to 0; if both are False the output is left untouched).
Construction options:
See 'Generic Control Nodes'
Config options:
- `one_input`: if 1 (or evaluates True), expect a single input variable which defines the output (True: 1, False: 0). If 0 (or evaluates False), two input variables are needed: one decides when the output goes on, one when it goes off. If both input variables are False, the output is left unchanged.
- `input_var`: either the name of a single input variable from higher up the pipeline (if one_input = 1) or a list of two input variables (if one_input = 0)
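The one-input and two-input behaviors described above can be sketched as follows. The behavior when both inputs are True is not defined above; this sketch assumes the "on" input wins.

```python
def digital_output(one_input, values, current=0):
    """Sketch of the described logic: one input drives the output directly;
    with two inputs, the first turns it on, the second turns it off, and if
    neither fires the output is left unchanged."""
    if one_input:
        return 1 if values[0] else 0
    on, off = values
    if on:       # assumption: 'on' takes priority if both are True
        return 1
    if off:
        return 0
    return current
```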
This node controls an analog output like a setpoint or something.
Construction options:
See 'Generic Control Nodes'
Config options:
- `min_output`: the minimum value the output should take. Don't specify to disable.
- `max_output`: the maximum value the output should take. Don't specify to disable.
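The clamping behavior above can be sketched in a few lines; omitting a bound disables it, matching the description.

```python
def clamp_output(value, min_output=None, max_output=None):
    """Sketch: clamp the analog output to [min_output, max_output];
    a bound left as None is disabled."""
    if min_output is not None:
        value = max(value, min_output)
    if max_output is not None:
        value = min(value, max_output)
    return value
```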
This node controls another pipeline based on the value of input variables.
Construction options:
- `actions`: a dictionary of {input_var: actions_if_true}, where actions_if_true is a list of actions to be carried out if input_var evaluates to True. Each item in the list should be a (command, target) pair, where target is the name of the pipeline to which command is sent.
- Unlike other control pipelines, `control_value` and `control_target` are not required.
While the above example forms a single graph, there is no reason why two or more disjoint graphs or sub-pipelines cannot form one "pipeline". This affords some organizational conveniences, such as aggregating alarms from certain subsystems. For instance, all alarms issued based on UPS parameters can be lumped together into one disjoint UPS alarm pipeline, rather than having one alarm pipeline for each of a half-dozen parameters. Or, automated control for an array of valves can be lumped together, so if the control needs to be paused temporarily, it only needs to be done once rather than once for each valve.
A couple of additional features are provided to support such disjoint pipelines. In the case of an error in a sub-pipeline, only that sub-pipeline is affected; other sub-pipelines will run as "normal". For disjoint alarm pipelines, if the pipeline has been automatically silenced due to the logging of an alarm and another alarm at a higher level comes, this second alarm will be issued. For instance, if a level 1 alarm gets logged, and then a level 2 alarm arrives while the pipeline is still silenced after the level 1, the level 2 alarm will be issued and the "silence level" increased to 2.