
Pipelines

Jaron Grigat edited this page May 22, 2024 · 24 revisions

Brief

Pipelines are a framework for performing complex processing tasks on the data. Alarm processing is implemented as a pipeline, PID control is a pipeline, and other tasks can be built the same way. The framework also allows data from multiple sensors to be combined when generating alarms, which was not possible previously. In short, almost any complex, automatable task that relies on the freshest data can be done via the Pipeline framework.

Usage

Pipelines have one of two states: active and inactive. The silent_until field defines whether a pipeline is silent or not. If it is a timestamp in the future, or -1 (silenced forever), anything that involves affecting the outside world is suppressed. This means that silent control pipelines keep their buffers full and stay apprised of developments in whatever they control, but don't actually do anything. Silent alarm pipelines keep watching for deviations from normal conditions but don't log new alarms.
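
A minimal sketch of this silencing check (is_silent is a hypothetical helper illustrating the semantics above, not Doberman's actual implementation):

```python
import time

def is_silent(silent_until: float) -> bool:
    """Mirror the silent_until semantics: -1 means silenced forever,
    a future timestamp means silenced until then, anything else means
    the pipeline is active."""
    if silent_until == -1:
        return True  # silenced forever
    return silent_until > time.time()  # silent while the timestamp is in the future
```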

Pipelines only run as fast as the sensors they depend on. Various problems show up if identical timestamps start appearing for the same quantity, so an input node will throw an error if it gets the same value twice from the database. If you speed up the readout_interval of the sensors a pipeline depends on, the pipeline will speed up to match automatically; pipelines don't have an independent processing interval. Additionally, each pipeline deliberately drifts by 1 ms per cycle (more if it misses a new value), because you don't want a pipeline to get slightly ahead of the sensors it needs.

When a pipeline starts, it takes a few cycles for the internal buffers to fill up. If one node errors out, that processing cycle is terminated, and some buffer nodes will error if their buffers don't hold enough values yet. Pipelines automatically silence themselves for as many cycles as they think are necessary.

How to create a pipeline

A pipeline is a directed graph, so start by drawing a picture of what that graph should look like. As an example, we'll look at a PID feedback controller for the thermosiphon. Normally this is the cryocon's job, which is why this is only an example.

First, what data inputs are we going to need? For the thermosiphon we want the temperature of the bottom copper block (T_TS_01) and the temperature setpoint (T_TS_03). Let's also low-pass filter the temperature so we don't try to control on noise. The variable we want to control on is the difference between the setpoint and current value (known as the "error"), so we'll want to calculate that. Then we'll need the PID node that actually does the PID control of the heater. Here's what the graph could look like:

Basic graph

There are a couple of rules that must be followed. First, graph nodes that feed data into the pipeline must be endpoints, i.e. you can't have anything upstream of them, so we can't put the bottom temperature and its setpoint into the same leg of the graph. Second, to join input streams coming from different input nodes you have to do a merging operation. This is how the pipeline deals with the inputs all having their own timestamps, and other rare cases where multiple variables with the same name are floating around. Third, this is a directed graph: data flows from input nodes to the things downstream of them, and nodes have at best a vague idea of what exists downstream.
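
These rules lend themselves to a simple sanity check. Here is an illustrative sketch (validate_pipeline is hypothetical, not part of Doberman) that enforces unique names, source nodes as endpoints, and valid upstream references on a node list shaped like the configs below:

```python
def validate_pipeline(nodes):
    """Sketch of a rules check for a pipeline definition: a list of
    node dicts with 'name', 'type', and optionally 'upstream' fields,
    as in the config examples on this page."""
    names = [n["name"] for n in nodes]
    # node names must be unique, or things behave strangely
    assert len(names) == len(set(names)), "node names must be unique"
    for n in nodes:
        upstream = n.get("upstream", [])
        # Rule 1: source nodes are endpoints -- nothing upstream of them
        if n["type"].endswith("SourceNode"):
            assert not upstream, f"{n['name']} is a source, nothing can be upstream"
        # upstream must reference the names of other nodes in this pipeline
        for u in upstream:
            assert u in names, f"{n['name']} references unknown node {u}"
    return True
```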

With these rules in mind, we can transform Figure 1 into something more concrete.

More concrete graph

We have two InfluxSourceNodes because we're pulling two values from Influx. We lowpass filter the temperature, merge the two input streams, evaluate the error, and then let the PID controller do its magic. Now we need to turn this into a configuration that Doberman will understand. There are, for the most part, three different parts of the config and we'll look at each in turn.

First is the pipeline definition, where we tell Doberman what the graph looks like. All nodes must specify a type and a name; most also need an input_var and a list of which nodes are upstream of them. See below for the complete documentation.

"pipeline": [
    {
        "name": "source_temp",
        "type": "InfluxSourceNode",
        "input_var": "T_TS_01",
        "output_var": "temp_bot"
    },
    {
        "name": "source_sp",
        "type": "InfluxSourceNode",
        "input_var": "T_TS_03",
        "output_var": "temp_sp"
    },
    {
        "name": "lowpass_filter",
        "type": "MedianFilterNode",
        "input_var": "temp_bot",
        "upstream": ["source_temp"]
    },
    {
        "name": "merge",
        "type": "MergeNode",
        "upstream": ["lowpass_filter", "source_sp"],
        "input_var": ""
    },
    {
        "name": "eval_error",
        "type": "EvalNode",
        "upstream": ["merge"],
        "input_var": ["temp_sp", "temp_bot"],
        "operation": "v['temp_sp'] - (c['scale'] * v['temp_bot'] + c['offset'])",
        "output_var": "err"
    },
    {
        "name": "pid",
        "type": "PIDControl",
        "upstream": ["eval_error"],
        "control_target": "revpi1",
        "control_value": "thermosiphon_heater",
        "input_var": "err"
    }
]

Important things:

  • The order in which you specify nodes doesn't matter.
  • The PIDControl node doesn't exist at time of writing.
  • The quotes in the operation of the EvalNode are important. Use single quotes inside the string, as shown above.
  • upstream must be a list, even if there's only one node, and it references the names of other nodes. If you don't specify unique names for nodes in a pipeline, expect things to behave strangely.
  • Things specified in the pipeline field can't be changed without restarting the pipeline.
  • The filter could be downstream of the merge node without affecting the overall behavior.

Many nodes also take "runtime" parameters that are updated before every cycle of the pipeline. For a full description see below. The nodes here take the following options:

"node_config": {
    "general": {
        # general options
    },
    "lowpass_filter": {
        "length": 5
    },
    "eval_error": {
        "scale": 2.4,
        "offset": 273.14
    },
    "pid": {
        # something reasonable here
    }
}

Again, the names of nodes need to match. Options in the general field are issued to all nodes. Do not specify length here.

Finally, the pipeline needs a name:

"name": "example_pipeline"

If you aren't doing this from the website you'll also need to specify status (inactive), depends_on (a list of the names of the sensors this pipeline needs), and silent_until (0). I recommend using the website because it will update the cross-references.

All together, the config looks like this. Note that fields automatically added by the website are not listed.

{
    "name": "example_pipeline",
    "node_config": {
        "lowpass_filter": {
            "length": 5
        },
        "eval_error": {
            "scale": 2.4,
            "offset": 273.14
        },
        "pid": {
            # something reasonable here
        }
    },
    "pipeline": [
        {
            "name": "source_temp",
            "type": "InfluxSourceNode",
            "input_var": "T_TS_01",
            "output_var": "temp_bot"
        },
        {
            "name": "source_sp",
            "type": "InfluxSourceNode",
            "input_var": "T_TS_03",
            "output_var": "temp_sp"
        },
        {
            "name": "lowpass_filter",
            "type": "MedianFilterNode",
            "input_var": "temp_bot",
            "upstream": ["source_temp"]
        },
        {
            "name": "merge",
            "type": "MergeNode",
            "upstream": ["lowpass_filter", "source_sp"]
        },
        {
            "name": "eval_error",
            "type": "EvalNode",
            "upstream": ["merge"],
            "input_var": ["temp_sp", "temp_bot"],
            "operation": "v['temp_sp'] - (c['scale'] * v['temp_bot'] + c['offset'])",
            "output_var": "err"
        },
        {
            "name": "pid",
            "type": "PIDControl",
            "upstream": ["eval_error"],
            "control_target": "revpi1",
            "control_value": "thermosiphon_heater",
            "input_var": "err"
        }
    ]
}

Note that the PIDControl node doesn't exist as of writing, but I hope it's otherwise clear.

Node options

Nodes take some options during construction and others during runtime. Construction options must be specified in the pipeline field, runtime or config options in the node_config field.

Common options

Construction options:

  • type: the type of node (probably listed on this page)
  • upstream: a list of names of upstream nodes, required for everything except SourceNodes
  • input_var: the name (i.e. a Doberman sensor name, or the output_var of something further up the graph) of the input variable this node expects. MergeNodes ignore this field; EvalNodes require a list of such quantities. Not all nodes actually use this field, but it must be specified (blank is ok in these cases).
  • (Optional) output_var: the name of the output variable. Defaults to input_var

Config options:

Do not specify length as a config value at the highest scope (i.e. in node_config.general); things will behave in unexpected ways. This field should only be specified for the specific nodes that need it, and different types of node will do different things with this value.

InfluxSourceNode

This node pulls the most recent value from Influx.

Config options:

None

MedianFilterNode

This node filters its input by taking the median of its buffer.

Construction options:

  • strict_length: is the node allowed to run without a full buffer (defaults to False if not set)

Config options:

  • length: The median of this number of values will be propagated downstream. If an even number is specified, the average of the two numbers adjacent to the middle is used.
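
As a sketch of this behavior, Python's statistics.median follows the same even-length convention (the buffering here is illustrative, not Doberman's code):

```python
from collections import deque
from statistics import median

# A length-5 buffer: a single noise spike (99.0) never propagates downstream.
buf = deque(maxlen=5)
out = []
for value in [10.0, 10.2, 99.0, 10.1, 10.3, 10.2]:
    buf.append(value)
    out.append(median(buf))  # even-length buffers average the two middle values
```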

MergeNode

This node merges streams originating from different SourceNodes. You can use one anywhere, but it is required whenever you have multiple SourceNodes, otherwise the timestamps will get out of sync.

Construction options:

  • merge_how: how do you want timestamps (and other common fields) to be merged? Options are avg min max newest oldest. Default is avg.
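
A sketch of what the merge_how options could mean for timestamps (this helper is hypothetical; for plain timestamps, min/oldest and max/newest coincide):

```python
def merge_timestamps(timestamps, how="avg"):
    """Illustrative merge of timestamps from several input streams."""
    if how == "avg":
        return sum(timestamps) / len(timestamps)
    if how in ("min", "oldest"):
        return min(timestamps)
    if how in ("max", "newest"):
        return max(timestamps)
    raise ValueError(f"unknown merge_how: {how}")
```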

Config options:

None. Do not allow a length field to reach this node (which would most likely happen by specifying length in node_config.general).

IntegralNode

This node calculates the average of its input buffer using the trapezoid rule. The result is scaled to the time interval spanned by the data.

Construction options:

  • strict_length: is the node allowed to run without a full buffer (defaults to False if not set)

Config options:

  • length: how many values the buffer should contain (not time, you'll need to do the math yourself). Don't set it to 1 or it will crash
  • t_offset: how many of the most recent values do you skip? Useful for effecting a delayed response.
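
The description above can be sketched as follows (buffer_average is a hypothetical helper; the real node's details may differ):

```python
def buffer_average(times, values, t_offset=0):
    """Trapezoid-rule integral of (t, v) pairs divided by the time span;
    t_offset skips that many of the most recent values."""
    if t_offset:
        times, values = times[:-t_offset], values[:-t_offset]
    integral = 0.0
    for i in range(len(values) - 1):
        dt = times[i + 1] - times[i]
        integral += 0.5 * (values[i] + values[i + 1]) * dt
    # a length-1 buffer divides by zero here, hence "don't set length to 1"
    return integral / (times[-1] - times[0])
```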

DerivativeNode

This node calculates the derivative of its input buffer via a least-squares fit.

Construction options:

  • strict_length: is the node allowed to run without a full buffer (defaults to False if not set)

Config options:

  • length: how many items the buffer should contain. Don't set this to 1 or it will crash.
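
The least-squares slope over the buffer can be sketched like this (illustrative helper, not Doberman's implementation):

```python
def slope(times, values):
    """Least-squares fit of values vs. times; returns the fitted slope."""
    n = len(times)
    mt = sum(times) / n
    mv = sum(values) / n
    num = sum((t - mt) * (v - mv) for t, v in zip(times, values))
    den = sum((t - mt) ** 2 for t in times)  # zero for a length-1 buffer
    return num / den
```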

PolynomialNode

This node performs some polynomial transformation of its input.

Config options:

  • transform: a list of polynomial coefficients applied to the input in the form sum(a_i*x**i), where x is the input value and a_i are the coefficients. [0,1] gives the unchanged value; keep adding terms for a higher-order polynomial. Little-endian ordering (i.e. the first term is the constant, the second the linear coefficient, the third quadratic, etc). Technically optional, but kinda pointless otherwise
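
The little-endian convention can be sketched as (apply_transform is a hypothetical name):

```python
def apply_transform(x, transform):
    """Evaluate sum(a_i * x**i) with little-endian coefficients:
    transform[0] is the constant, transform[1] the linear term, etc."""
    return sum(a * x**i for i, a in enumerate(transform))
```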

InfluxSinkNode

This node writes a value back to Influx. The sensor name will be the output_var and the value will be the input_var. You must specify output_var as a valid (i.e. existing!) sensor name; the corresponding sensor document determines the organisational parameters.

Construction options:

None

Config options:

None

DeviceRespondingInfluxNode and DeviceRespondingSyncNode

These nodes throw an error if a value shows up later than it's supposed to. While the DeviceRespondingInfluxNode queries the database for new values, the DeviceRespondingSyncNode gets them directly from the device through the internal communication system.

Config options:

None

CheckRemoteHeartbeatNode

This node checks a remote heartbeat file and issues alarms to the phone numbers in the file when the timestamp is too old.

Config options:

  • silence_duration: how long the pipeline should be silenced after this node issues an alarm
  • max_delay_sms: how old in seconds the timestamp can be before this node sends SMS to the given phone numbers (defaults to 180s if not set)
  • max_delay_phone: how old in seconds the timestamp can be before this node sends SMS and phone calls to the given phone numbers (defaults to 600s if not set)
  • directory: where the remote heartbeat file is located (defaults to /scratch if not set)
  • experiment_name: look for the file called remote_hb_<experiment_name>

SimpleAlarmNode

This node does a simple alarm calculation based on low <= val <= high.
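
The condition can be sketched as (out_of_range is a hypothetical helper; a True result would correspond to logging an alarm):

```python
def out_of_range(val, low, high):
    """A value is fine while low <= val <= high; anything outside
    that window would trigger the alarm."""
    return not (low <= val <= high)
```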

Config options:

None

IntegerAlarmNode

This node checks whether an integer value is equal to one of the defined alarm_values.

Config options:

None

BitmaskIntegerAlarmNode

Sometimes a status is given as a byte where each bit stands for a different state (e.g. 6 = 0b110 -> status 0 is OFF, statuses 1 and 2 are ON).

Config options:

None
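
The bit decoding in the example above can be sketched as (decode_bits is illustrative, not Doberman's code):

```python
def decode_bits(value, n_bits):
    """Unpack a status byte into per-bit booleans, least significant
    bit first: 6 = 0b110 -> status 0 OFF, statuses 1 and 2 ON."""
    return [bool(value >> i & 1) for i in range(n_bits)]
```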

TriggeredAlarmsNode

This node checks if the alarm_is_triggered field of any sensor in sensors_to_check is True and returns 1 or 0, accordingly.

Config options:

  • sensors_to_check: Either list of sensor names or any to check all sensors in the database

TimeSinceAlarmNode

Checks whether a sensor was at an alarm_value for longer than a max_duration.

Construction options: None

Config options:

  • alarm_value: value that should trigger the alarm
  • max_duration: maximal duration for which the value is allowed to be at alarm_value before logging an alarm
  • alarm_level: base level of the alarm (note we don't take the one from the sensor, since that is reserved for the IntegerAlarmNode/SimpleAlarmNode).

EvalNode

This node executes a user-specified operation. Input values are collected into a dict named v and config values into a dict named c, so the operation can access them. For instance, v['input_var1'] + 0.3*math.sin(v['input_var2'] + c['offset']) is a valid two-input operation. The math library is available.
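
A sketch of how such an operation could be evaluated, reusing the operation from the example pipeline (the eval call here illustrates the mechanism; Doberman's exact evaluation code may differ):

```python
import math

# The operation string from the thermosiphon example; single quotes inside.
operation = "v['temp_sp'] - (c['scale'] * v['temp_bot'] + c['offset'])"

# Input values land in v, config constants in c, and math is available.
v = {"temp_sp": 170.0, "temp_bot": 168.5}
c = {"scale": 1.0, "offset": 0.5}
err = eval(operation, {"math": math}, {"v": v, "c": c})
```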

Construction options:

  • operation: the mathematical operation you want performed. For reasons, it'll work best if you use single quotes inside the string.
  • input_var: this needs to be a list of names of output_var quantities from further up the processing chain
  • output_var: if you don't include this expect crashes

Config options:

  • c: a dict of constants to be used in your operation.

Generic control nodes

This covers things like the valve control nodes and anything else wishing to effect a change in the system.

Construction options:

  • control_target: the name of the sensor that handles the thing that needs to be controlled, eg revpi1
  • control_value: the name of the quantity that is being controlled, for instance fast_cooling_valve

Config options:

  • default_output: what should this node do if something goes wrong during pipeline operation? The value of this field (if present) will be used as the output, so you can have valves close etc when things go awry. If not specified, this feature is disabled.

Additional arguments determined by the child class.

DigitalControlNode

This node controls a binary output, probably a solenoid valve, but it could be a pump or anything else with a binary operational state. The input can be either a single variable (which causes the output to be set to 1 if it evaluates to True, and 0 if False) or two variables (one for setting the output to 1, one for setting it to 0; if both are False the output is left untouched).

Construction options:

See 'Generic Control Nodes'

Config options:

  • one_input: If 1 (or evaluates True), expect a single input variable which defines the output (True: 1, False: 0). If 0 (or evaluates False), two input variables are needed. One decides when the output goes on, one when it goes off. If both input variables are False, the output is left unchanged.
  • input_var: Either the name of a single input variable from higher up the pipeline (if one_input = 1) or a list of two input variables (if one_input = 0)
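
The two-input case can be sketched like this (hypothetical helper; None stands for "leave the output unchanged", and the behavior when both inputs are True is not specified above, so this sketch simply favors switching on):

```python
def digital_output(set_on, set_off):
    """Two-input digital control: one variable switches the output on,
    one switches it off; if neither fires, nothing changes."""
    if set_on:
        return 1
    if set_off:
        return 0
    return None  # leave the output untouched
```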

AnalogControlNode

This node controls an analog output like a setpoint or something.

Construction options:

See 'Generic Control Nodes'

Config options:

  • min_output: the minimum value the output should take. Don't specify to disable.
  • max_output: the maximum value the output should take. Don't specify to disable.
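
The clamping can be sketched as (hypothetical helper; an unset bound disables that side, mirroring "don't specify to disable"):

```python
def clamp(value, min_output=None, max_output=None):
    """Constrain an analog output to [min_output, max_output];
    None disables the corresponding bound."""
    if min_output is not None:
        value = max(value, min_output)
    if max_output is not None:
        value = min(value, max_output)
    return value
```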

PipelineControlNode

This node controls another pipeline based on the value of input variables.

Construction options:

  • actions: a dictionary of {input_var: actions_if_true}, where actions_if_true is a list of actions to be carried out if input_var evaluates to True. Each item in the list should be a (command, target) pair, where target is the name of the pipeline to which command is sent.
  • Unlike other control nodes, control_value and control_target are not required.

Disjoint pipelines

While the above example forms a single graph, there is no reason why two or more disjoint graphs or sub-pipelines cannot form one "pipeline". This affords some organizational conveniences, such as aggregating alarms from certain subsystems. For instance, all alarms based on UPS parameters can be lumped together into one disjoint UPS alarm pipeline, rather than having one alarm pipeline for each of a half-dozen parameters. Or, automated control for an array of valves can be lumped together, so if the control needs to be paused temporarily, it only needs to be done once rather than once per valve.

A couple of additional features support such disjoint pipelines. In the case of an error in a sub-pipeline, only that sub-pipeline is affected; the other sub-pipelines run as "normal". For disjoint alarm pipelines, if the pipeline has been automatically silenced due to the logging of an alarm and another alarm at a higher level comes in, the second alarm will still be issued. For instance, if a level 1 alarm is logged, and then a level 2 alarm arrives while the pipeline is still silenced after the level 1, the level 2 alarm is issued and the "silence level" is increased to 2.