Skip to content

Commit 3f6c0c0

Browse files
authored
Add coverage for flow run with agent to integration test (PrefectHQ#8374)
1 parent 98c4cf0 commit 3f6c0c0

File tree

3 files changed

+107
-0
lines changed

3 files changed

+107
-0
lines changed

.github/workflows/integration-tests.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,9 @@ jobs:
163163
- name: Run integration flows with client@dev, server@${{ matrix.prefect-version }}
164164
if: ${{ ! matrix.server-incompatible }}
165165
run: >
166+
TEST_SERVER_VERSION=${{ matrix.prefect-version }}
166167
PREFECT_API_URL="http://127.0.0.1:4200/api"
168+
TEST_CLIENT_VERSION=$(python -c 'import prefect; print(prefect.__version__)')
167169
./scripts/run-integration-flows.py
168170
169171
- name: Start server@dev
@@ -181,6 +183,8 @@ jobs:
181183
run: >
182184
docker run
183185
--env PREFECT_API_URL="http://127.0.0.1:4200/api"
186+
--env TEST_SERVER_VERSION=$(python -c 'import prefect; print(prefect.__version__)')
187+
--env TEST_CLIENT_VERSION=${{ matrix.client_version }}
184188
--volume $(pwd)/flows:/opt/prefect/integration/flows
185189
--volume $(pwd)/scripts:/opt/prefect/integration/scripts
186190
--network host

flows/agent.py

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
import os
2+
import pathlib
3+
import subprocess
4+
5+
import anyio
6+
from packaging.version import Version
7+
8+
import prefect
9+
from prefect.deployments import Deployment
10+
from prefect.utilities.callables import parameter_schema
11+
12+
13+
@prefect.flow
14+
def hello(name: str = "world"):
15+
prefect.get_run_logger().info(f"Hello {name}!")
16+
17+
18+
async def apply_deployment_20(deployment):
19+
async with prefect.get_client() as client:
20+
flow_id = await client.create_flow_from_name(deployment.flow_name)
21+
return await client.create_deployment(
22+
flow_id=flow_id,
23+
name=deployment.name,
24+
path=deployment.path,
25+
entrypoint=deployment.entrypoint,
26+
)
27+
28+
29+
async def create_flow_run(deployment_id):
30+
async with prefect.get_client() as client:
31+
return await client.create_flow_run_from_deployment(
32+
deployment_id, parameters={"name": "integration tests"}
33+
)
34+
35+
36+
async def read_flow_run(flow_run_id):
37+
async with prefect.get_client() as client:
38+
return await client.read_flow_run(flow_run_id)
39+
40+
41+
def main():
42+
# Create deployment
43+
if Version(prefect.__version__) < Version("2.1.0"):
44+
deployment = Deployment(
45+
name="test-deployment",
46+
flow_name=hello.name,
47+
parameter_openapi_schema=parameter_schema(hello),
48+
path=str(pathlib.Path(__file__).parent),
49+
entrypoint=f"{__file__}:hello",
50+
)
51+
deployment_id = anyio.run(apply_deployment_20, deployment)
52+
else:
53+
deployment = Deployment.build_from_flow(flow=hello, name="test-deployment")
54+
deployment_id = deployment.apply()
55+
56+
# Create a flow run
57+
flow_run = anyio.run(create_flow_run, deployment_id)
58+
59+
TEST_SERVER_VERSION = os.environ.get("TEST_SERVER_VERSION")
60+
61+
if Version(prefect.__version__) < Version("2.1"):
62+
# work queue is positional instead of a flag and requires creation
63+
subprocess.run(["prefect", "work-queue", "create", "test"])
64+
try:
65+
subprocess.run(["prefect", "agent", "start", "test"], timeout=30)
66+
except subprocess.TimeoutExpired:
67+
pass
68+
elif (
69+
Version(prefect.__version__) > Version("2.6")
70+
and TEST_SERVER_VERSION
71+
and Version(TEST_SERVER_VERSION) < Version("2.7")
72+
and Version(TEST_SERVER_VERSION) > Version("2.5")
73+
):
74+
print(
75+
"A CANCELLING state type was added in 2.7 so when the agent (client) "
76+
"is running 2.7+ and the server is running 2.6 checks for cancelled flows "
77+
"will fail. This is a known incompatibility."
78+
)
79+
return
80+
elif Version(prefect.__version__) < Version("2.6"):
81+
# --run-once is not available so just run for a bit
82+
try:
83+
subprocess.run(
84+
["prefect", "agent", "start", "--work-queue", "default"], timeout=30
85+
)
86+
except subprocess.TimeoutExpired:
87+
pass
88+
else:
89+
subprocess.check_call(
90+
["prefect", "agent", "start", "--run-once", "--work-queue", "default"],
91+
)
92+
93+
flow_run = anyio.run(read_flow_run, flow_run.id)
94+
assert flow_run.state.is_completed()
95+
96+
97+
if __name__ == "__main__":
98+
main()

scripts/run-integration-flows.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
1313
PREFECT_API_URL="http://localhost:4200" ./scripts/run-integration-flows.py
1414
"""
15+
import os
1516
import runpy
1617
import sys
1718
from pathlib import Path
@@ -25,6 +26,10 @@
2526
def run_flows(search_path: Union[str, Path]):
2627
count = 0
2728
print(f"Running integration tests with client version: {__version__}")
29+
server_version = os.environ.get("TEST_SERVER_VERSION")
30+
if server_version:
31+
print(f"and server version: {server_version}")
32+
2833
for file in sorted(Path(search_path).glob("*.py")):
2934
print(f" {file.relative_to(search_path)} ".center(90, "-"), flush=True)
3035
runpy.run_path(file, run_name="__main__")

0 commit comments

Comments
 (0)