1
- # Import `BoostApp` class
2
- from temporal_boost import BoostApp , BoostLoggerConfig , BoostHTTPRoute
1
+ """
2
+ For development purposes
3
+ """
3
4
5
+ # Import `BoostApp` class
6
+ from temporal_boost import BoostApp , BoostLoggerConfig
4
7
from temporalio import activity
5
8
from temporalio import workflow
9
+ from datetime import timedelta
10
+ from dataclasses import dataclass
6
11
7
-
8
- from aiohttp import web
12
+ from example_asgi_app import fastapi_app
9
13
10
14
# Create `BoostApp` object
11
- app : BoostApp = BoostApp (
12
- logger_config = BoostLoggerConfig (json = False ),
13
- )
15
+ app : BoostApp = BoostApp (logger_config = BoostLoggerConfig (json = False ), use_pydantic = True )
16
+
17
+
18
+ @dataclass
19
+ class TestModel :
20
+ foo : str
21
+ bar : int
14
22
15
23
16
24
# Describe your activities/workflows
17
25
@activity .defn (name = "test_boost_activity_1" )
18
- async def test_boost_activity_1 (foo : str , bar : str ) -> str :
19
- app .logger .info ("This is built-in logger" )
20
- return f"1_{ foo } { bar } "
26
+ async def test_boost_activity_1 (payload : TestModel ) -> TestModel :
27
+ payload .foo = f"{ payload .foo } +activity1"
28
+ payload .bar = payload .bar + 1
29
+ return payload
21
30
22
31
23
32
@activity .defn (name = "test_boost_activity_2" )
24
- async def test_boost_activity_2 (foo : str , bar : str ) -> str :
25
- return f"2_{ foo } { bar } "
33
+ async def test_boost_activity_2 (payload : TestModel ) -> TestModel :
34
+ payload .foo = f"{ payload .foo } +activity2"
35
+ payload .bar = payload .bar + 1
36
+ return payload
26
37
27
38
28
- @workflow .defn (name = "TestCronWorkflow" , sandboxed = False )
29
- class TestCronWorkflow :
39
+ @workflow .defn (sandboxed = False )
40
+ class MyWorkflow :
30
41
@workflow .run
31
- async def run (self ) -> None :
32
- print ("With is cron workflow" )
33
- return None
42
+ async def run (self , foo : str ):
43
+ start_payload : TestModel = TestModel (foo = "hello" , bar = 0 )
44
+ print (type (start_payload ))
45
+ result_1 = await workflow .execute_activity (
46
+ test_boost_activity_1 ,
47
+ start_payload ,
48
+ task_queue = "task_q_1" ,
49
+ start_to_close_timeout = timedelta (minutes = 1 ),
50
+ )
51
+ print (type (result_1 ))
52
+ result_2 = await workflow .execute_activity (
53
+ test_boost_activity_2 ,
54
+ result_1 ,
55
+ task_queue = "task_q_2" ,
56
+ start_to_close_timeout = timedelta (minutes = 1 ),
57
+ )
58
+ print (type (result_2 ))
59
+ return result_2
34
60
35
61
36
62
# Add async workers to your app
@@ -42,28 +68,12 @@ async def run(self) -> None:
42
68
metrics_endpoint = "0.0.0.0:9000" ,
43
69
)
44
70
app .add_worker ("worker_2" , "task_q_2" , activities = [test_boost_activity_2 ])
45
- # Example of CRON worker
46
- app .add_worker (
47
- "test_cron" ,
48
- "task_q_3" ,
49
- workflows = [TestCronWorkflow ],
50
- cron_schedule = "* * * * *" ,
51
- cron_runner = TestCronWorkflow .run ,
52
- )
53
71
72
+ app .add_worker ("worker_3" , "task_q_3" , workflows = [MyWorkflow ])
54
73
55
- async def aaa (request ):
56
- import json
57
- q : dict = {"foo" : "bar" }
58
- return web .Response (text = json .dumps (q ), content_type = "application/json" )
74
+ # app.add_http_worker("test_http_worker_!", "0.0.0.0", 8000, routes=[])
59
75
60
-
61
- app .add_http_worker (
62
- "test_http_1" , host = "127.0.0.1" , port = 8899 ,
63
- routes = [
64
- BoostHTTPRoute ("/" , aaa )
65
- ]
66
- )
76
+ app .add_asgi_worker ("asgi_worker" , fastapi_app , "0.0.0.0" , 8000 )
67
77
68
78
# Run your app and start workers with CLI
69
79
app .run ()
0 commit comments