
csp ( Composable Stream Processing ) is a functional-like reactive language that makes time-series stream processing simple to do.  The main reactive engine is a C++ based engine which has been exposed to Python ( support for other languages may be added in future versions ).  CSP applications define a connected graph of components using a declarative language ( which is essentially Python ).  Once a graph is constructed it can be run using the C++ engine.  Graphs are composed of some number of "input" adapters, a set of connected calculation "nodes", and at the end are sent off to "output" adapters.  Inputs, as well as the engine itself, can be seamlessly run in simulation mode using historical input adapters or in realtime mode using realtime input adapters.

Graph building concepts

When writing csp code there will be runtime components in the form of csp.node methods, as well as graph-building components in the form of csp.graph components.

It is important to understand that csp.graph components will only be executed once at application startup in order to construct the graph.  Once the graph is constructed, csp.graph code is no longer needed.  Once the graph is run, only inputs, csp.nodes and outputs will be active as data flows through the graph, driven by input ticks.  For example, this is a simple bit of graph code:

import csp
from csp import ts
from datetime import datetime


@csp.node
def spread(bid: ts[float], ask: ts[float]) -> ts[float]:
    if csp.valid(bid, ask):
        return ask - bid


@csp.graph
def my_graph():
    bid = csp.const(1.0)
    ask = csp.const(2.0)
    bid = csp.multiply( bid, csp.const(4) )
    ask = csp.multiply( ask, csp.const(3) )
    s = spread(bid, ask)

    csp.print('spread', s)
    csp.print('bid', bid)
    csp.print('ask', ask)


if __name__ == '__main__':
    csp.run(my_graph, starttime=datetime.utcnow())

In this simple example my_graph is defined as a csp.graph component.  This method will be called once by csp.run in order to construct the graph.  csp.const defines a constant value as a timeseries which will tick once upon startup; this is effectively an input.

bid = csp.multiply(bid, csp.const(4) ) will insert a csp.multiply node to do timeseries multiplication.  bid and ask are then connected to the user defined spread csp.node.  bid/ask and the calculated spread are then linked to the csp.print output to print the results.

In order to help visualize this graph, you can call csp.show_graph:
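For example, a call along these lines renders the graph to an image instead of running it ( a sketch; it assumes the optional graphviz dependency is installed and uses an arbitrary output filename ):

import csp

# Render the wiring of my_graph to a file rather than executing it
csp.show_graph(my_graph, graph_filename='my_graph.png')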

( image: visualization of my_graph )

The result of running this graph would be:

2020-04-02 15:33:38.256724 bid:4.0  
2020-04-02 15:33:38.256724 ask:6.0  
2020-04-02 15:33:38.256724 spread:2.0

Anatomy of a csp.node

The heart of a calculation graph is the set of csp.nodes that run the computations.  csp.node methods can take any number of scalar and timeseries arguments, and can return 0 → N timeseries outputs.  Timeseries inputs/outputs should be thought of as the edges that connect components of the graph.  These "edges" can tick whenever they have a new value.  Every tick is associated with a value and the time of the tick.  csp.nodes can have various other features; here is an example of a csp.node that demonstrates many of them.  Keep in mind that nodes will execute repeatedly as inputs tick with new data.  They may ( or may not ) generate an output as a result of an input tick.

@csp.node
def demo_node( n : int, xs : ts[ float ], ys : ts[ float ] ) -> ts[ float ]:

    with csp.alarms():
        # Define an alarm time-series of type bool
        alarm = csp.alarm( bool )

    with csp.state():
        # Create a state variable bound to the node
        s_sum = 0.0

    with csp.start():
        # Code block that executes once on start of the engine
        # one can set timeseries properties here as well, such as
        # csp.set_buffering_policy( xs, tick_count = 5 )
        # csp.set_buffering_policy( xs, tick_history = timedelta( minutes = 1 ) )
        # csp.make_passive( xs )
        csp.schedule_alarm( alarm, timedelta( seconds = 1 ), True )

    with csp.stop():
        pass  # code block to execute when the engine is done

    if csp.ticked( xs, ys ) and csp.valid( xs,ys ):
        s_sum += xs * ys   

    if csp.ticked(alarm):
        csp.schedule_alarm( alarm, timedelta( seconds = 1 ), True )
        return s_sum

Let's review it line by line:

1) Every csp node must start with the @csp.node decorator

2) CSP nodes are fully typed and type-checking is strictly enforced.  All arguments must be typed, as well as all outputs. Outputs are typed using function annotation syntax. 

Single outputs can be unnamed, for multiple outputs they must be named. When using multiple outputs, annotate the type using def my_node(inputs) → csp.Outputs(name1=ts[<T>], name2=ts[<V>]) where T and V are the respective types of name1 and name2.

Note the syntax of timeseries inputs: they are denoted by ts[ type ].  Scalars can be passed in as regular types; in this example we pass in n, which expects a value of type int.

4) with csp.alarms() - nodes can (optionally) declare internal alarms; every instance of the node will get its own alarm, which can be scheduled and acts just like a timeseries input. All alarms must be declared within the alarms context.

6) Instantiate an alarm in the alarms context using the csp.alarm( typ ) function. This creates an alarm which is a time-series of type typ.

8) with csp.state() - optional state variables can be defined under the state context. Note that variables declared in state will live across invocations of the method.

10) An example declaration and initialization of the state variable s_sum. It is good practice to prefix state variable names with s_.

12) with csp.start() - an optional block used to execute code at the start of the engine.  Generally this is used to set up initial timers, set input timeseries properties such as buffer sizes, or make inputs passive.

15-16) csp.set_buffering_policy - nodes can request that a certain amount of history be kept on the incoming time series; this can be denoted in number of ticks or in time.  By setting a buffering policy, nodes can access historical values of the timeseries ( by default only the last value is kept ).

17) csp.make_passive / csp.make_active - Nodes may not need to react to all of their inputs; they may just need their latest value.  For performance purposes the node can mark an input as passive to avoid triggering the node unnecessarily.  csp.make_active can be called to reactivate an input.

18) csp.schedule_alarm - schedules a one-shot tick on the given alarm input.  The values given are the timedelta before the alarm triggers and the value it will have when it triggers.  Note that schedule_alarm can be called multiple times on the same alarm to schedule multiple triggers.

20) with csp.stop() - an optional block that executes when the engine is done running.

23) All nodes will have if conditions to react to different inputs.  csp.ticked() takes any number of inputs and returns true if any of the inputs ticked.  csp.valid() similarly takes any number of inputs, however it only returns true if all of the inputs are valid.  Valid means that an input has had at least one tick and so it has a "current value".

24) One of the benefits of CSP is that you always have easy access to the latest value of all inputs.  xs and ys will always hold the latest value of both inputs, even if only one of them just ticked.

26) This demonstrates how an alarm can be treated like any other input

28) We tick our running "sum" as an output here every second.
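To tie the pieces together, here is a minimal sketch of how demo_node might be wired into a graph and run.  The csp.timer inputs and the ten second run window are purely illustrative choices, not part of the example above:

from datetime import datetime, timedelta
import csp

@csp.graph
def demo_graph():
    # drive xs and ys with simple repeating timers, just to give the node something to consume
    xs = csp.timer(timedelta(seconds=1), 1.0)
    ys = csp.timer(timedelta(seconds=2), 2.0)
    csp.print('sum', demo_node(10, xs, ys))

if __name__ == '__main__':
    csp.run(demo_graph, starttime=datetime.utcnow(), endtime=timedelta(seconds=10))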

Basket inputs

In addition to single time-series inputs, a node can also accept a basket of time series as an argument.  A basket is essentially a collection of timeseries which can be passed in as a single argument.  Baskets can either be list baskets or dict baskets.  Individual timeseries in a basket can tick independently, and they can be looked at and reacted to individually or as a collection.  For example:

@csp.node
def demo_basket_node( list_basket : [ ts[ int ] ], dict_basket : { str : ts[ int ] } ) -> ts[ float ]:
    
    if csp.ticked( list_basket ):
        return sum( list_basket.validvalues() )
    
    if csp.ticked( list_basket[ 3 ] ):
        return list_basket[3]

    if csp.ticked( dict_basket ):
        # can iterate over ticked key,items
        # for key,value in dict_basket.tickeditems(): ...
        return sum( dict_basket.tickedvalues() )

2) Note the syntax of basket inputs.  list baskets are noted as [ts[type]]  ( a list of time series ) and dict baskets are {key_type : ts[ts_type]} ( a dictionary of timeseries keyed by type key_type  ).

4) Just like single timeseries, we can react to a basket if it ticked.  The convention is the same as passing multiple inputs to csp.ticked: csp.ticked is true if any basket input ticked, and csp.valid is true if all basket inputs are valid.

5) baskets have various iterators to access their inputs:

  • tickedvalues - iterator of values of all ticked inputs
  • tickedkeys - iterator of keys of all ticked inputs ( keys are list indices for list baskets )
  • tickeditems - iterator of (key, value) tuples of ticked inputs
  • validvalues - iterator of values of all valid inputs
  • validkeys - iterator of keys of all valid inputs
  • validitems - iterator of (key, value) tuples of valid inputs
  • keys - list of keys on the basket ( dictionary baskets only )

6-7) This demonstrates the ability to access an individual element of a basket and react to it as well as access its current value
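At graph building time a list basket is passed as a plain python list of edges, and a dict basket as a python dict of edges.  A minimal sketch of wiring up demo_basket_node ( the constant inputs are placeholders ):

@csp.graph
def basket_graph():
    # a list basket is just a python list of timeseries edges ( at least 4 here, since the node indexes element 3 )
    list_basket = [csp.const(i) for i in range(4)]
    # a dict basket is a python dict of timeseries edges keyed by the declared key type
    dict_basket = {'a': csp.const(1), 'b': csp.const(2)}
    csp.print('out', demo_basket_node(list_basket, dict_basket))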

Node Outputs

Nodes can return any number of outputs ( including no outputs, in which case it is considered an "output" or sink node [ see Graph Pruning ] ).  Nodes with a single output can return it unnamed.  Nodes returning multiple outputs must have them named.  When a node is called at graph building time, if it has a single unnamed output the return value is an edge representing that output, which can be passed as an input to other nodes.  If the outputs are named, the return value is an object with the outputs available as attributes.  For example ( the examples below demonstrate various ways to output the data as well ):

@csp.node
def single_unnamed_outputs( n : ts[ int ] ) -> ts[ int ]:
    # can either do
    # return value
    # or
    # csp.output( value ) to continue processing after the output
    return n


@csp.node
def multiple_named_outputs( n : ts[ int ] ) -> csp.Outputs( y = ts[ int ], z = ts[ float ] ):
    # can do
    # csp.output( y = v, z = v2 ) to output to multiple outputs
    # or separate the outputs to tick out at separate points:
    # csp.output( y = v )
    # ...
    # csp.output( z = v2 )
    # or can return multiple values with:
    return csp.output( y = n, z = float( n ) )

@csp.graph
def my_graph( n : ts[ int ] ):
    x = single_unnamed_outputs( n )
    # x represents the output edge of single_unnamed_outputs, 
    # we can pass it as a time series input to other nodes
    csp.print( 'x', x )


    result = multiple_named_outputs( n )
    # result holds all the outputs of multiple_named_outputs, which can be accessed as attributes
    csp.print( 'y', result.y )
    csp.print( 'z', result.z )

Basket Outputs

Similarly to inputs, a node can also produce a basket of timeseries as an output. For example:

class MyStruct( csp.Struct ):
    symbol : str
    index  : int
    value  : float

@csp.node
def demo_basket_output_node( in_: ts[ MyStruct ], symbols: [ str ], num_symbols: int ) -> csp.Outputs(
        dict_basket = csp.OutputBasket(Dict[str, ts[ float ]], shape="symbols"),
        list_basket = csp.OutputBasket(List[ts[ float ]], shape="num_symbols" )
    ):           

    if csp.ticked( in_ ):
        # output to dict basket
        csp.output( dict_basket[in_.symbol], in_.value )
        # csp.output( dict_basket = { in_.symbol: in_.value } ) # alternate output syntax, can output multiple keys at once
        
        # output to list basket
        csp.output( list_basket[in_.index], in_.value )
        # csp.output( list_basket = { in_.index: in_.value } ) # alternate output syntax, can output multiple keys at once

7) Note the output declaration syntax. A basket output can be either named or unnamed (both examples here are named), and its shape can be specified in two ways. The shape parameter takes either a scalar value that defines the shape of the basket, or the name of a scalar argument that provides it ( a dict basket expects shape to be a list of keys; a list basket expects shape to be an int ).  shape_of is used to take the shape of an input basket and apply it to the output basket.

12-17) There are several choices for output syntax. The following work for both list and dict baskets:

  • csp.output( basket = { key: value, key2 : value2, ... } ) 
  • csp.output( basket[key], value ) 
  • csp.output( { key: value } ) # only works if the basket is the only output 

Generic Types

CSP supports syntax for generic types as well.  A generic type is denoted by a string ( typically 'T' ).  When a node is called, the type of the argument gets bound to the given type variable, and further inputs / outputs will be checked and bound against that typevar.  Note that the string syntax '~T' denotes that the argument expects the value of a type, rather than a type itself:

@csp.node
def sample( trigger : ts[ object ], x : ts[ 'T' ] ) -> ts[ 'T' ]:
    '''will return current value of x on trigger ticks'''
    with csp.start():
        csp.make_passive( x )

    if csp.ticked( trigger ) and csp.valid( x ):
        return x


@csp.node
def const( value : '~T' ) -> ts[ 'T' ]:
    ...

sample  takes a timeseries of type 'T'  as an input, and returns a timeseries of type 'T' .  This allows us to pass in a ts[int]  for example, and get a ts[int]  as an output, or ts[bool]  → ts[bool] 

const  takes value as an instance of type T , and returns a timeseries of type T .  So we can call const(5)  and get a ts[int]  output, or const('hello!')  and get a ts[str]  output, etc...
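As a small usage sketch, at graph building time the type variable is bound from whatever edge is passed in ( csp.count and csp.timer are used here purely for illustration ):

from datetime import timedelta
import csp

@csp.graph
def sample_graph():
    trigger = csp.timer(timedelta(seconds=1))
    x = csp.count(trigger)           # ts[int]
    sampled = sample(trigger, x)     # 'T' binds to int, so sampled is a ts[int]
    csp.print('sampled', sampled)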

Engine Time

The CSP engine always maintains its current view of time.  The current time of the engine can be accessed at any time within a csp.node by calling csp.now().
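For example, a small illustrative node that stamps each input tick with the engine time:

@csp.node
def stamp(x: ts[float]) -> ts[str]:
    if csp.ticked(x):
        # csp.now() returns the engine time of the current cycle,
        # in both simulation and realtime mode
        return f'{csp.now()}: {x}'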

Graph Propagation and Single-dispatch

The CSP graph propagation algorithm ensures that within a given engine cycle nodes are executed only once, and in the correct order.  Correct order means that all input dependencies of a given node are guaranteed to have been evaluated before the node itself is executed.  Take this graph for example:

( image: graph propagation example with bid/ask feeding mid, spread and quote )

On a given cycle, let's say the bid input ticks.  The CSP engine will ensure that mid is executed, followed by spread, and only once spread's output is updated will quote be called.  When quote executes, it will have the latest values of the mid and spread calcs for this cycle.
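As a rough sketch of the kind of wiring described above ( the mid and quote nodes here are illustrative stand-ins, not taken from the figure ):

@csp.node
def mid(bid: ts[float], ask: ts[float]) -> ts[float]:
    if csp.valid(bid, ask):
        return (bid + ask) / 2.0

@csp.node
def quote(mid_px: ts[float], spread_px: ts[float]) -> ts[str]:
    if csp.valid(mid_px, spread_px):
        return f'mid={mid_px} spread={spread_px}'

@csp.graph
def quote_graph(bid: ts[float], ask: ts[float]):
    m = mid(bid, ask)
    s = spread(bid, ask)   # the spread node defined at the top of this page
    # quote depends on both m and s, so on any given cycle it executes only after both have updated
    csp.print('quote', quote(m, s))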

Graph Pruning

One should note a subtle optimization technique in CSP graphs.  Any part of a graph that is created at graph building time but is NOT connected to any output nodes will be pruned from the graph and will not exist during runtime.  An output is defined as either an output adapter or a csp.node without any outputs of its own.  The idea here is that we can avoid doing work if it doesn't result in any output being generated.  In general it's best practice for all csp.nodes to be side-effect free; in other words, they shouldn't mutate any state outside of the node.  Assuming all nodes are side-effect free, pruning the graph would not have any noticeable effects.

Anatomy of a csp.graph

To reiterate, csp.graph methods are called in order to construct the graph and are only executed before the engine is run.  csp.graph methods don't do anything special; they are essentially regular python methods, but they can be defined to accept inputs and generate outputs similar to csp.nodes.  This is solely used for type checking.  csp.graph methods can be created to encapsulate components of a graph, and can be called from other csp.graph methods in order to help facilitate graph building.  Simple example:

@csp.graph
def calc_symbol_pnl( symbol : str, trades : ts[ Trade ] ) -> ts[ float ]:
    # sub-graph code needed to compute pnl for given symbol and symbol's trades
    # sub-graph can subscribe to market data for the symbol as needed
    ...


@csp.graph
def calc_portfolio_pnl( symbols : [ str ] ) -> ts[ float ]:
    symbol_pnl = []
    for symbol in symbols:
        symbol_trades = trade_adapter.subscribe( symbol )
        symbol_pnl.append( calc_symbol_pnl( symbol, symbol_trades ) )

    return csp.sum( symbol_pnl )

In this simple example we have a csp.graph component calc_symbol_pnl which encapsulates computing pnl for a single symbol.  calc_portfolio_pnl is a graph that computes portfolio-level pnl; it invokes the symbol-level pnl calc for every symbol, then sums up the results for the portfolio-level pnl.

Historical Buffers

CSP provides access to historical input data as well. By default only the last value of an input is kept in memory, however one can request history to be kept on an input either by number of ticks or by time using csp.set_buffering_policy.

The methods csp.value_at, csp.time_at and csp.item_at can be used to retrieve historical input values.  Each node should call csp.set_buffering_policy to make sure that its inputs are configured to store a sufficiently long history for a correct implementation.  For example, let's assume that we have a stream of data and we want to create equally sized buckets from the data.  A possible implementation of such a node would be:

@csp.node
def data_bin_generator(bin_size: int, input: ts['T']) -> ts[['T']]:
    with csp.start():
        assert bin_size > 0
        # This makes sure that input stores at least bin_size entries
        csp.set_buffering_policy(input, tick_count=bin_size)
    if csp.ticked(input) and (csp.num_ticks(input) % bin_size == 0):
        return [csp.value_at(input, -i) for i in range(bin_size)]

In this example, we use csp.set_buffering_policy(input, tick_count=bin_size) to ensure that the buffer history contains at least bin_size elements.  Note that an input can be shared by multiple nodes; if multiple nodes provide size requirements, the buffer size will be resolved to the maximum size needed to support all of the requests.

Alternatively, csp.set_buffering_policy supports a timedelta parameter tick_history instead of tick_count.  If tick_history is provided, the buffer will scale dynamically to ensure that any period of length tick_history will fit into the history buffer. 

To identify when there are enough samples to construct a bin we use csp.num_ticks(input) % bin_size == 0.  The function csp.num_ticks returns the total number of ticks for a given time series.  NOTE: The actual size of the history buffer is usually less than csp.num_ticks, as the buffer is dynamically truncated to satisfy the set policy.

The past values in this example are accessed using csp.value_at. The various historical access methods take the same arguments and return the value, time and tuple of (time,value) respectively:

  • csp.value_at( ts, index_or_time, duplicate_policy = DuplicatePolicy.LAST_VALUE, default = UNSET ) - returns the value at the requested index_or_time
  • csp.time_at( ts, index_or_time, duplicate_policy = DuplicatePolicy.LAST_VALUE, default = UNSET ) - returns the datetime at the requested index_or_time
  • csp.item_at( ts, index_or_time, duplicate_policy = DuplicatePolicy.LAST_VALUE, default = UNSET ) - returns a tuple of (datetime, value) at the requested index_or_time
    • ts - the name of the input
    • index_or_time:
      • If providing an index, this represents how many ticks back to retrieve and should be <= 0.  0 indicates the current value, -1 is the previous value, etc.
      • If providing a time, one can either provide a datetime for absolute time, or a timedelta for how far back to access.  NOTE that the timedelta must be negative to represent time in the past.
    • duplicate_policy: when requesting history by datetime or timedelta, it is possible that there could be multiple values that match the given time.  duplicate_policy can be provided to control which value to return in this case.  The default policy is to return the LAST_VALUE that exists at the given time.
    • default: the value to be returned if the requested time is out of the history bounds ( if default is not provided and a request is out of bounds, an exception will be raised ).

To illustrate history access using timedelta indexing, consider a possible implementation of a function that sums up samples taken every second over each period of n_seconds of the input time series.  If the value ticks slower than every second, this implementation could sample the same value more than once ( this is just an illustration; it is NOT recommended to use such an implementation in a real application, as it could be implemented more efficiently ):

@csp.node
def sample_sum(n_seconds: int, input: ts[int], default_sample_value: int = 0) -> ts[int]:
    with csp.alarms():
        a = csp.alarm( bool )
    with csp.start():
        assert n_seconds > 0
        # This makes sure that input stores at least n_seconds seconds
        csp.set_buffering_policy(input, tick_history=timedelta(seconds=n_seconds))
        # Flag the input as passive since we don't need to react to its ticks
        csp.make_passive(input)
        # Schedule the first sample in n_seconds-1 from start, to also capture the initial value
        csp.schedule_alarm(a, timedelta(seconds=n_seconds - 1), True)
    if csp.ticked(a):
        # Schedule the next sample in n_seconds from start
        csp.schedule_alarm(a, timedelta(seconds=n_seconds), True)
        res = 0
        for i in range(n_seconds):
            res += csp.value_at(input, timedelta(seconds=-i), default=default_sample_value)
        return res

Historical Range Access

In similar fashion, the methods csp.values_at, csp.times_at and csp.items_at can be used to retrieve a range of historical input values as numpy arrays.  The bin generator example above can be accomplished more efficiently with range access:

@csp.node
def data_bin_generator(bin_size: int, input: ts['T']) -> ts[['T']]:
    with csp.start():
        assert bin_size > 0
        # This makes sure that input stores at least bin_size entries
        csp.set_buffering_policy(input, tick_count=bin_size)
    if csp.ticked(input) and (csp.num_ticks(input) % bin_size == 0):
        return csp.values_at( input, -bin_size + 1, 0 ).tolist()

The past values in this example are accessed using csp.values_at.  The various historical range access methods take the same arguments and return the values, the times, and a tuple of (times, values) respectively:

  • csp.values_at( ts, start_index_or_time, end_index_or_time, start_index_policy = TimeIndexPolicy.INCLUSIVE, end_index_policy = TimeIndexPolicy.INCLUSIVE ) - returns values in the specified range as a numpy array
  • csp.times_at( ts, start_index_or_time, end_index_or_time, start_index_policy = TimeIndexPolicy.INCLUSIVE, end_index_policy = TimeIndexPolicy.INCLUSIVE ) - returns times in the specified range as a numpy array
  • csp.items_at( ts, start_index_or_time, end_index_or_time, start_index_policy = TimeIndexPolicy.INCLUSIVE, end_index_policy = TimeIndexPolicy.INCLUSIVE ) - returns a tuple of (times, values) numpy arrays
    • ts - the name of the input
    • start_index_or_time:
      • If providing an index, this represents how many ticks back to retrieve and should be <= 0.  0 indicates the current value, -1 is the previous value, etc.
      • If providing a time, one can either provide a datetime for absolute time, or a timedelta for how far back to access.  NOTE that the timedelta must be negative to represent time in the past.
      • If None is provided, the range will begin "from the beginning" - i.e., the oldest tick in the buffer.
    • end_index_or_time: same as start_index_or_time
      • If None is provided, the range will go "until the end" - i.e., the newest tick in the buffer.
    • start_index_policy: only for use with datetime/timedelta as the start and end parameters.
      • TimeIndexPolicy.INCLUSIVE: if there is a tick exactly at the requested time, include it
      • TimeIndexPolicy.EXCLUSIVE: if there is a tick exactly at the requested time, exclude it
      • TimeIndexPolicy.EXTRAPOLATE: if there is a tick at the beginning timestamp, include it. Otherwise, if there is a tick before the beginning timestamp, force a tick at the beginning timestamp with the prevailing value at the time
    • end_index_policy: only for use with datetime/timedelta as the start and end parameters.
      • TimeIndexPolicy.INCLUSIVE: if there is a tick exactly at the requested time, include it
      • TimeIndexPolicy.EXCLUSIVE: if there is a tick exactly at the requested time, exclude it
      • TimeIndexPolicy.EXTRAPOLATE: if there is a tick at the end timestamp, include it. Otherwise, if there is a tick before the end timestamp, force a tick at the end timestamp with the prevailing value at the time

Range access is optimized at the C++ layer, and for this reason it is far more efficient than calling the single-value access methods in a loop; it should be substituted in where possible.

Below is a rolling average example to illustrate the use of timedelta indexing. Note that timedelta( seconds = -n_seconds ) is equivalent to csp.now() - timedelta( seconds = n_seconds ), since datetime indexing is supported. 

@csp.node
def rolling_average( x : ts[ float ], n_seconds : int ) -> ts[ float ]:
    with csp.start():
        assert n_seconds > 0
        csp.set_buffering_policy( x, tick_history = timedelta( seconds = n_seconds ) )
    if csp.ticked( x ):
        avg = np.mean( csp.values_at( x, timedelta( seconds = -n_seconds ), timedelta( seconds = 0 ),
                                      csp.TimeIndexPolicy.INCLUSIVE, csp.TimeIndexPolicy.INCLUSIVE) )
        csp.output( avg )

When accessing all elements within the buffering policy window like this, it would be more succinct to pass None as the start and end time, but datetime/timedelta indexing allows for more general use ( e.g. a rolling average between 5 seconds ago and 1 second ago, or an average specifically between 9:30:00 and 10:00:00 ).
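For instance, the csp.values_at call in rolling_average above could equivalently be written with open bounds to grab the entire retained window ( a sketch of the same computation ):

# None for both bounds means "everything currently held by the buffering policy"
avg = np.mean( csp.values_at( x, None, None ) )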

Cyclical graph - csp.feedback

By definition of the graph building code, csp graphs can only produce acyclical graphs.  However, there are many occasions where a cycle may be required.  For example, let's say you want part of your graph to simulate an exchange.  That part of the graph would need to accept new orders and return acks and executions.  However, the acks / executions would likely need to feed back into the same part of the graph that generated the orders.  For this reason, the csp.feedback construct exists.  Using csp.feedback one can wire a feedback as an input to a node, and effectively bind the actual edge that feeds it later in the graph.  Note that internally the graph is still acyclical.  Internally, csp.feedback creates a pair of output and input adapters that are bound together.  When a timeseries that is bound to a feedback ticks, it is fed to the feedback, which then schedules the tick on its bound input for the next engine cycle.  That next engine cycle will execute with the same engine time as the cycle that generated the tick, but as a separate, subsequent cycle.

  • csp.feedback( ts_type ) - ts_type is the type of the timeseries ( ie int, str ).  This returns an instance of a feedback object
    • out() - this method returns the timeseries edge which can be passed as an input to your node
    • bind( ts ) - this method is called to bind an edge as the source of the feedback after the fact

A simple example should help demonstrate a possible usage.  Let's say we want to simulate acking orders that are generated from a node called my_algo.  In addition to generating the orders, my_algo also needs to receive the execution reports ( this is demonstrated in example e_13_feedback.py ).

The graph code would look something like this:

# Simulate acking an order
@csp.node
def my_exchange( order: ts[ Order ] ) -> ts[ ExecReport ]:
    # ... impl details ...
    ...

@csp.node
def my_algo( exec_report: ts[ ExecReport ] ) -> ts[ Order ]:
    # ... impl details ...
    ...

@csp.graph
def my_graph():
    # create the feedback first so that we can refer to it later
    exec_report_fb = csp.feedback( ExecReport )

    # generate orders, passing feedback out() which isn't bound yet
    orders = my_algo(exec_report_fb.out() )

    # get exec_reports from "simulator"
    exec_report = my_exchange( orders )

    # now bind the exec reports to the feedback, finishing the "loop"
    exec_report_fb.bind( exec_report )

The graph would end up looking like this.  It remains acyclical, but the FeedbackOutputDef is bound to the FeedbackInputDef here; any tick to out will push the tick to in on the next cycle:

( image: feedback graph with FeedbackOutputDef bound to FeedbackInputDef )

Collecting Graph Outputs

If the csp.graph passed to csp.run has outputs, the full timeseries will be returned from csp.run like so:

outputs example

import csp
from csp import ts
from datetime import datetime, timedelta

@csp.graph
def my_graph() -> ts[int]:
    return csp.merge(csp.const(1), csp.const(2, timedelta(seconds=1)))

if __name__ == '__main__':
    res = csp.run(my_graph, starttime=datetime(2021,11,8))
    print(res)

result:

{0: [(datetime.datetime(2021, 11, 8, 0, 0), 1), (datetime.datetime(2021, 11, 8, 0, 0, 1), 2)]}

Note that the result for each output key is a list of (datetime, value) tuples.

You can also use csp.add_graph_output to add outputs.  These do not need to be in the top-level graph called directly from csp.run.  This gives the same result:

add_graph_output example

@csp.graph
def my_graph():
    csp.add_graph_output('a', csp.merge(csp.const(1), csp.const(2, timedelta(seconds=1))))

In addition to python outputs like above, you can set the optional csp.run argument output_numpy to True to get outputs as numpy arrays:

numpy outputs

result = csp.run(my_graph, starttime=datetime(2021,11,8), output_numpy=True)

result:

{0: (array(['2021-11-08T00:00:00.000000000', '2021-11-08T00:00:01.000000000'], dtype='datetime64[ns]'), array([1, 2], dtype=int64))}

Note that the result there is a tuple per output, containing two numpy arrays, one with the datetimes and one with the values.  
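For example, the arrays for output 0 above can be unpacked directly:

times, values = result[0]   # parallel numpy arrays of datetime64 timestamps and values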

Realtime / Simulation Modes

The csp engine can be run in two flavors, realtime and simulation. 

In simulation mode, the engine always runs at full speed, slurping in time-based data from its input adapters and running it through the graph.  All inputs in simulation are driven off the timestamped data provided by the input adapters.

In realtime mode, the engine runs in wallclock time as of "now".  Realtime engines can get data from realtime adapters which source data on separate threads and pass it through to the engine ( i.e. think of ActiveMQ events happening on an ActiveMQ thread and being passed along to the engine in "realtime" ).

Since engines can run in both simulation and realtime mode, users should always use csp.now() to get the current time in csp.nodes.

Simulation Mode

Simulation mode is the default mode of the engine.  As stated above, simulation mode is used when you want your engine to crunch through historical data as fast as possible.  In simulation mode, the engine runs on historical data that is fed in through various adapters.  The adapters provide events by time, and they are streamed into the engine via the adapter timeseries in time order.  csp.timer and csp.node alarms are scheduled and executed in "historical time" as well.  Note that there is no strict requirement for simulated runs to run on historical dates.  As long as the engine is not in realtime mode, it remains in simulation mode until the provided endtime, even if endtime is in the future.
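For example, a run over a fixed historical window ( the dates here are arbitrary ) just passes a starttime and endtime, and the engine crunches through it as fast as possible:

from datetime import datetime
import csp

csp.run(my_graph, starttime=datetime(2021, 11, 8), endtime=datetime(2021, 11, 9))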

Realtime Mode

Realtime mode is opted into by passing realtime=True to csp.run(...).  When run in realtime mode, the engine will run in simulation mode from the provided starttime → wallclock "now" as of the time of calling run.  Once the simulation run is done, the engine switches into realtime mode.  Under realtime mode, external realtime adapters will be able to send data into the engine thread.  All time based inputs such as csp.timer and alarms will switch to executing in wallclock time as well.

As always, csp.now() should still be used in csp.node code, even when running in realtime mode.  csp.now() will be the time assigned to the current engine cycle.
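For example, a sketch of a realtime run that starts at wallclock "now" and runs for ten seconds:

from datetime import datetime, timedelta
import csp

# realtime=True: the engine switches to wallclock time once it has caught up to "now"
csp.run(my_graph, starttime=datetime.utcnow(), endtime=timedelta(seconds=10), realtime=True)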

csp.PushMode

When consuming data from input adapters there are three choices on how one can consume the data:

| PushMode | EngineMode | Description |
| --- | --- | --- |
| LAST_VALUE | Simulation | all ticks from the input source with duplicate timestamps ( on the same timeseries ) will tick once with the last value at a given timestamp |
| LAST_VALUE | Realtime | all ticks that occurred since the previous engine cycle will collapse / conflate to the latest value |
| NON_COLLAPSING | Simulation | all ticks from the input source with duplicate timestamps ( on the same timeseries ) will tick once per engine cycle; subsequent cycles will execute with the same time |
| NON_COLLAPSING | Realtime | all ticks that occurred since the previous engine cycle will be ticked across subsequent engine cycles as fast as possible |
| BURST | Simulation | all ticks from the input source with duplicate timestamps ( on the same timeseries ) will tick once with a list of all the values |
| BURST | Realtime | all ticks that occurred since the previous engine cycle will tick once with a list of all the values |

Realtime Group Event Synchronization

The CSP framework supports properly synchronizing events across multiple timeseries that are sourced from the same realtime adapter.  A classic example of this is a market data feed.  Say you consume bid, ask and trade as 3 separate time series for the same product / exchange.  Since the data flows in asynchronously from a separate thread, bid, ask and trade events could end up executing in the engine at arbitrary slices of time, leading to crossed books and trades that are out of range of the bid/ask.  The engine can properly provide a correct synchronous view of all the inputs, regardless of their PushModes.  It's up to adapter implementations to determine which inputs are part of a synchronous "PushGroup".

Here's a classic example.  An application wants to consume a conflating bid/ask as LAST_VALUE, but it doesn't want to conflate trades, so they are consumed as NON_COLLAPSING.

Let's say we have this sequence of events on the actual market data feed's thread, coming in on the wire in this order.  The columns denote the time the callbacks come in off the market data thread.

| Event | T | T+1 | T+2 | T+3 | T+4 | T+5 | T+6 |
| --- | --- | --- | --- | --- | --- | --- | --- |
| BID | 100.00 | 100.01 | | 99.97 | 99.98 | 99.99 | |
| ASK | 100.02 | | 100.03 | | | | 100.00 |
| TRADE | | | 100.02 | | | 100.03 | |

Without any synchronization you can end up with nonsensical views based on random timing.  Here's one such possibility ( bid/ask are still LAST_VALUE, trade is NON_COLLAPSING ). 

Here ET is engine time.  Let's assume the engine had a huge delay and hasn't processed any of the data submitted above yet.  Without any synchronization, bid/ask would completely conflate, and trade would unroll over multiple engine cycles:

| Event | ET | ET+1 |
| --- | --- | --- |
| BID | 99.99 | |
| ASK | 100.00 | |
| TRADE | 100.02 | 100.03 |

However, since market data adapters will group bid/ask/trade inputs together, the engine won't let bid/ask events advance ahead of trade events, since trade is NON_COLLAPSING.  NON_COLLAPSING inputs essentially act as a barrier, not allowing events that arrive after the barrier to tick before the barrier is complete.  Let's assume again that the engine had a huge delay and hasn't processed any data submitted above.  With proper barrier synchronization, the engine cycles would look like this under the same conditions:

| Event | ET | ET+1 | ET+2 |
| --- | --- | --- | --- |
| BID | 100.01 | 99.99 | |
| ASK | 100.03 | | 100.00 |
| TRADE | 100.02 | 100.03 | |

Note how the last ask tick of 100.00 got held up to a separate cycle (ET+2) so that trade could tick with the correct view of bid/ask at the time of the second trade (ET+1)

As another example, let's say the engine got delayed briefly at wire time T, so it was able to process the T+1 data together with it.  Similarly, it got briefly delayed at time T+4 until after T+6.  The engine would be able to process all of the data at times T+1, T+2, T+3 and T+6, leading to this sequence of engine cycles.  The equivalent "wire time" is denoted in parentheses:

| Event | ET (T+1) | ET+1 (T+2) | ET+2 (T+3) | ET+3 (T+5) | ET+4 (T+6) |
| --- | --- | --- | --- | --- | --- |
| BID | 100.01 | | 99.97 | 99.99 | |
| ASK | 100.02 | 100.03 | | | 100.00 |
| TRADE | | 100.02 | | 100.03 | |