Closed
Description
Hi, can you please help with the below please, ta!
Describe the bug
I recall that AdapterManager are not yet supported in DynamicGraphManager.
To Reproduce
I am trying to combine the examples: https://github.com/Point72/csp/blob/main/examples/06_advanced/e1_dynamic.py and https://github.com/Point72/csp/blob/main/examples/04_writing_adapters/e5_adaptermanager_pushinput.py as below:
- Create adapter manager
- Inside a sub-graph, I am hoping I can use the updates from the adapter
from datetime import datetime, timedelta
import csp
from csp import ts
class Order(csp.Struct):
symbol: str
size: int
price: float
@csp.graph
def process_symbol(symbol: str, order: ts[Order], initial_order: Order, timer: ts[MyData], scalar: str) -> ts[int]:
print("Starting sub-graph to process symbol ", symbol, " with initial order: ", initial_order, " scalar: ", scalar)
csp.print(symbol + " orders", order)
csp.print(symbol + " timer", timer)
cum_size = csp.accum(timer.value)
return cum_size
@csp.node
def process_results(x: {ts[str]: ts[int]}):
if csp.ticked(x):
print(csp.now(), "cum_sizes:", dict(x.tickeditems()))
@csp.graph
def main_graph():
# We have a stream of incoming orders to deal with, we dont know the symbols up front
adapter_manager = MyAdapterManager(timedelta(seconds=0.75))
symbols = ["AAPL", "IBM", "TSLA", "GS", "JPM"]
orders = csp.curve(
Order,
[
(timedelta(seconds=0), Order(symbol="AAPL", price=135, size=100)),
(timedelta(seconds=1), Order(symbol="FB", price=350, size=-200)),
(timedelta(seconds=2), Order(symbol="GME", price=210, size=1000)),
(timedelta(seconds=3), Order(symbol="AAPL", price=138, size=-100)),
(timedelta(seconds=4), Order(symbol="FB", price=330, size=100)),
(timedelta(seconds=5), Order(symbol="AMC", price=57, size=400)),
(timedelta(seconds=6), Order(symbol="GME", price=200, size=800)),
],
)
# Get a dynamic basket keys by symbol
trigger = csp.dynamic_demultiplex(orders, orders.symbol)
some_ts = csp.count(csp.timer(timedelta(seconds=1)))
some_scalar = "howdy"
# dynamic graphs
cum_sizes = csp.dynamic(
trigger,
process_symbol,
csp.snapkey(), # csp.snapkey() provides the key that triggers a new dynamic graph as a scalar argument
csp.attach(), # csp.attach() will pass the corresponding timeseries of the key for the graph instance
csp.snap(
orders
), # csp.snap will provide the current value of the given timeseries at the time of dynamic graph instantiation
adapter_manager.subscribe("GS"), # regular time series can be passed along, which will be shared across all instances
some_scalar, # regular scalar values can be passed as arguments to the sub-graph as well
)
# cum_sizes is a dynamic basket of results, keyed by the trigger keys
process_results(cum_sizes)
def main():
csp.run(main_graph, starttime=datetime.utcnow().replace(microsecond=0), endtime=timedelta(seconds=10))
if __name__ == "__main__":
main()
Expected behavior
I was hoping I can use the adapter this way with dynamic graph manager. in the future, the adapter would subscribe to all symbols and I will be able to filter for the pertinent symbol
Error Message
No error message, just no output from
csp.print(symbol + " timer", timer)
Runtime Environment
>>> import sys, csp; print(csp.__version__); print(sys.version); print(sys.platform)
0.0.5
3.10.14 | packaged by Anaconda, Inc. | (main, May 6 2024, 19:44:50) [MSC v.1916 64 bit (AMD64)]