Skip to content

Dynamic Subgraphs with Adapters #379

Closed
@Roh-codeur

Description

@Roh-codeur

Hi, can you please help with the below please, ta!

Describe the bug
I recall that AdapterManager are not yet supported in DynamicGraphManager.

To Reproduce
I am trying to combine the examples: https://github.com/Point72/csp/blob/main/examples/06_advanced/e1_dynamic.py and https://github.com/Point72/csp/blob/main/examples/04_writing_adapters/e5_adaptermanager_pushinput.py as below:

  1. Create adapter manager
  2. Inside a sub-graph, I am hoping I can use the updates from the adapter
from datetime import datetime, timedelta

import csp
from csp import ts


class Order(csp.Struct):
    symbol: str
    size: int
    price: float


@csp.graph
def process_symbol(symbol: str, order: ts[Order], initial_order: Order, timer: ts[MyData], scalar: str) -> ts[int]:
    print("Starting sub-graph to process symbol ", symbol, " with initial order: ", initial_order, " scalar: ", scalar)

    csp.print(symbol + " orders", order)
    csp.print(symbol + " timer", timer)

    cum_size = csp.accum(timer.value)
    return cum_size


@csp.node
def process_results(x: {ts[str]: ts[int]}):
    if csp.ticked(x):
        print(csp.now(), "cum_sizes:", dict(x.tickeditems()))


@csp.graph
def main_graph():
    # We have a stream of incoming orders to deal with, we dont know the symbols up front
    adapter_manager = MyAdapterManager(timedelta(seconds=0.75))
    symbols = ["AAPL", "IBM", "TSLA", "GS", "JPM"]

    orders = csp.curve(
        Order,
        [
            (timedelta(seconds=0), Order(symbol="AAPL", price=135, size=100)),
            (timedelta(seconds=1), Order(symbol="FB", price=350, size=-200)),
            (timedelta(seconds=2), Order(symbol="GME", price=210, size=1000)),
            (timedelta(seconds=3), Order(symbol="AAPL", price=138, size=-100)),
            (timedelta(seconds=4), Order(symbol="FB", price=330, size=100)),
            (timedelta(seconds=5), Order(symbol="AMC", price=57, size=400)),
            (timedelta(seconds=6), Order(symbol="GME", price=200, size=800)),
        ],
    )

    # Get a dynamic basket keys by symbol
    trigger = csp.dynamic_demultiplex(orders, orders.symbol)

    some_ts = csp.count(csp.timer(timedelta(seconds=1)))
    some_scalar = "howdy"

    # dynamic graphs
    cum_sizes = csp.dynamic(
        trigger,
        process_symbol,
        csp.snapkey(),  # csp.snapkey() provides the key that triggers a new dynamic graph as a scalar argument
        csp.attach(),  # csp.attach() will pass the corresponding timeseries of the key for the graph instance
        csp.snap(
            orders
        ),  # csp.snap will provide the current value of the given timeseries at the time of dynamic graph instantiation
        adapter_manager.subscribe("GS"),  # regular time series can be passed along, which will be shared across all instances
        some_scalar,  # regular scalar values can be passed as arguments to the sub-graph as well
    )

    # cum_sizes is a dynamic basket of results, keyed by the trigger keys
    process_results(cum_sizes)


def main():
    csp.run(main_graph, starttime=datetime.utcnow().replace(microsecond=0), endtime=timedelta(seconds=10))


if __name__ == "__main__":
    main()

Expected behavior
I was hoping I can use the adapter this way with dynamic graph manager. in the future, the adapter would subscribe to all symbols and I will be able to filter for the pertinent symbol

Error Message
No error message, just no output from

csp.print(symbol + " timer", timer) 

Runtime Environment

>>> import sys, csp; print(csp.__version__); print(sys.version); print(sys.platform)
0.0.5
3.10.14 | packaged by Anaconda, Inc. | (main, May  6 2024, 19:44:50) [MSC v.1916 64 bit (AMD64)]


Metadata

Metadata

Assignees

No one assigned

    Labels

    adapter: generalIssues and PRs related to input/output adapters in generaltype: featureIssues and PRs related to new features

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions