Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ci: use fal.App for addition app #438

Merged
merged 1 commit into from
Mar 5, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 40 additions & 14 deletions projects/fal/tests/test_apps.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,16 +180,18 @@ class SleepInput(BaseModel):


class SleepOutput(BaseModel):
pass
slept: bool = True


class SleepApp(fal.App, keep_alive=300, max_concurrency=1):
machine_type = "XS"

@fal.endpoint("/")
async def sleep(self, input: SleepInput) -> SleepOutput:
await asyncio.sleep(input.wait_time)
return SleepOutput()
for _ in range(input.wait_time):
print("sleeping...", flush=True)
await asyncio.sleep(1)
return SleepOutput(slept=True)


class ExceptionApp(fal.App, keep_alive=300, max_concurrency=1):
Expand Down Expand Up @@ -511,34 +513,58 @@ def test_app_cancellation(test_app: str, test_cancellable_app: str):


@pytest.mark.flaky(max_runs=3)
def test_app_client_async(test_app: str):
request_handle = apps.submit(test_app, arguments={"lhs": 1, "rhs": 2})
assert request_handle.get() == {"result": 3}
def test_app_client_async():
import uuid

request_handle = apps.submit(
test_app, arguments={"lhs": 2, "rhs": 3, "wait_time": 5}
app_alias = str(uuid.uuid4()) + "-client-async-alias"

app = fal.wrap_app(SleepApp)
app_revision = app.host.register(
func=app.func,
options=app.options,
application_name=app_alias,
application_auth_mode="private",
)

for event in request_handle.iter_events(logs=True):
user = _get_user()

handle = apps.submit(f"{user.user_id}/{app_revision}", arguments={"wait_time": 1})
with pytest.raises(HTTPStatusError) as e:
# Not yet completed
handle.fetch_result()
assert e.value.response.status_code == 400

# Wait until the app is completed
assert handle.get() == {"slept": True}

# New request
handle = apps.submit(f"{user.user_id}/{app_revision}", arguments={"wait_time": 5})

for event in handle.iter_events(logs=True):
assert isinstance(event, (apps.Queued, apps.InProgress))
if isinstance(event, apps.InProgress) and event.logs:
logs = [log["message"] for log in event.logs]
assert "sleeping..." in logs
elif isinstance(event, apps.Queued):
assert event.position == 0

status = request_handle.status(logs=True)
status = handle.status(logs=True)
assert isinstance(status, apps.Completed)
if not status.logs:
# Sometimes fetching logs times out to get faster results
status = handle.status(logs=True)

assert status.logs, "Logs missing from Completed status"
assert any("sleeping..." in log["message"] for log in status.logs)

# It is safe to use fetch_result when we know for a fact the request itself
# is completed.
result = request_handle.fetch_result()
result = handle.fetch_result()

# .get() can still be used and will return the same value
result_alternative = request_handle.get()
assert result == result_alternative
assert result == {"result": 5}
get_result = handle.get()
assert result == get_result
assert result == {"slept": True}


def test_app_openapi_spec_metadata(test_app: str, request: pytest.FixtureRequest):
Expand Down
Loading