Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dagster-gcp] live tests #26761

Merged
merged 1 commit into from
Jan 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,12 @@ def build_dagster_oss_nightly_steps() -> List[BuildkiteStep]:
],
always_run_if=lambda: True,
),
PackageSpec(
"integration_tests/test_suites/dagster-gcp-live-tests",
name="gcp-live-tests",
env_vars=["GCP_LIVE_TEST_CREDENTIALS"],
always_run_if=lambda: True,
),
]
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,15 @@ def skip_if_not_azure_commit():
)


def skip_if_not_gcp_commit():
"""If no dagster-gcp files are changed, skip the gcp live tests."""
return (
None
if (any("dagster-gcp" in str(path) for path in ChangedFiles.all))
else "Not a dagster-gcp commit"
)


def build_azure_live_test_suite_steps() -> List[BuildkiteTopLevelStep]:
return PackageSpec(
os.path.join("integration_tests", "test_suites", "dagster-azure-live-tests"),
Expand All @@ -191,6 +200,15 @@ def build_azure_live_test_suite_steps() -> List[BuildkiteTopLevelStep]:
).build_steps()


def build_gcp_live_test_suite_steps() -> List[BuildkiteTopLevelStep]:
return PackageSpec(
os.path.join("integration_tests", "test_suites", "dagster-gcp-live-tests"),
env_vars=[
"GCP_LIVE_TEST_CREDENTIALS",
],
).build_steps()


def daemon_pytest_extra_cmds(version: AvailablePythonVersion, _):
return [
"export DAGSTER_DOCKER_IMAGE_TAG=$${BUILDKITE_BUILD_ID}-" + version.value,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from dagster import AssetExecutionContext, asset


@asset
def my_asset(context: AssetExecutionContext) -> None:
for i in range(10):
print(f"Printing without context {i}") # noqa: T201
context.log.info(f"Logging using context {i}")
try:
raise Exception("This is an exception")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i am not sure how this is being used / tested.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea I ought to add a line for it but it shows up in stderr.

except:
return
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
import json
import os
import shutil
import signal
import subprocess
import time
import uuid
from contextlib import contextmanager
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Any, Generator, List, Mapping

import pytest
import requests
from dagster._core.test_utils import environ
from dagster._time import get_current_timestamp
from dagster._utils import process_is_alive
from google.cloud import storage as gcs


def integration_test_dir() -> Path:
return Path(__file__).parent.parent


def _dagster_is_ready(port: int) -> bool:
try:
response = requests.get(f"http://localhost:{port}")
return response.status_code == 200
except:
return False


def path_to_dagster_yamls() -> Path:
return Path(__file__).parent / "dagster-yamls"


def delete_blobs_with_prefix(prefix: str) -> None:
bucket_client = get_bucket_client(get_credentials())
for blob in bucket_client.list_blobs(prefix=prefix):
bucket_client.delete_blob(blob.name)


@pytest.fixture(name="dagster_yaml")
def dagster_yaml_path(request) -> Generator[Path, None, None]:
yield path_to_dagster_yamls() / request.param


@pytest.fixture(name="dagster_home")
def setup_dagster_home(dagster_yaml: Path) -> Generator[str, None, None]:
"""Instantiate a temporary directory to serve as the DAGSTER_HOME."""
with TemporaryDirectory() as tmpdir:
# Copy over dagster.yaml
shutil.copy2(dagster_yaml, Path(tmpdir) / "dagster.yaml")
with environ({"DAGSTER_HOME": tmpdir}):
yield tmpdir


@pytest.fixture
def prefix_env() -> Generator[str, None, None]:
prefix = f"prefix_{uuid.uuid4().hex}"
try:
with environ({"TEST_GCP_LOG_PREFIX": prefix}):
yield prefix
finally:
delete_blobs_with_prefix(prefix)


@pytest.fixture(name="dagster_dev")
def setup_dagster(dagster_home: str, prefix_env: str) -> Generator[Any, None, None]:
with stand_up_dagster(["dagster", "dev", "-m", "gcp_test_proj.defs"]) as process:
yield process


@contextmanager
def stand_up_dagster(
dagster_dev_cmd: List[str], port: int = 3000
) -> Generator[subprocess.Popen, None, None]:
"""Stands up a dagster instance using the dagster dev CLI. dagster_defs_path must be provided
by a fixture included in the callsite.
"""
process = subprocess.Popen(
dagster_dev_cmd,
env=os.environ.copy(),
shell=False,
preexec_fn=os.setsid, # noqa
)
try:
dagster_ready = False
initial_time = get_current_timestamp()
while get_current_timestamp() - initial_time < 60:
if _dagster_is_ready(port):
dagster_ready = True
break
time.sleep(1)

assert dagster_ready, "Dagster did not start within 30 seconds..."
yield process
finally:
if process_is_alive(process.pid):
os.killpg(process.pid, signal.SIGKILL)


def get_credentials() -> Mapping:
return json.loads(os.environ["GCP_LIVE_TEST_CREDENTIALS"])


def get_bucket_client(credentials: Mapping) -> gcs.Bucket:
return gcs.Client.from_service_account_info(credentials).get_bucket("computelogmanager-tests")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is pulled straight from the compute log manager implementation



@pytest.fixture(name="credentials")
def setup_credentials() -> Generator[Mapping, None, None]:
yield get_credentials()


@pytest.fixture(name="bucket_client")
def setup_container_client() -> Generator[gcs.Bucket, None, None]:
yield get_bucket_client(get_credentials())
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
compute_logs:
module: dagster_gcp.gcs.compute_log_manager
class: GCSComputeLogManager
config:
json_credentials_envvar: GCP_LIVE_TEST_CREDENTIALS
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these are new creds I created for this test suite

prefix:
env: TEST_GCP_LOG_PREFIX
bucket: computelogmanager-tests
local_dir: "/tmp/cool"
upload_interval: 30
show_url_only: true
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import subprocess
from pathlib import Path
from typing import Mapping

import pytest
from dagster import (
DagsterEventType,
DagsterInstance,
EventRecordsFilter,
_check as check,
)
from google.cloud import storage as gcs

from .conftest import get_bucket_client, get_credentials # noqa: TID252


@pytest.mark.parametrize(
"dagster_yaml",
["json-credentials.yaml"],
indirect=True,
)
def test_compute_log_manager(
dagster_dev: subprocess.Popen,
bucket_client: gcs.Bucket,
prefix_env: str,
credentials: Mapping,
dagster_yaml: Path,
) -> None:
subprocess.run(
["dagster", "asset", "materialize", "--select", "my_asset", "-m", "gcp_test_proj.defs"],
check=True,
)
logs_captured_data = check.not_none(
DagsterInstance.get()
.get_event_records(
EventRecordsFilter(
event_type=DagsterEventType.LOGS_CAPTURED,
)
)[0]
.event_log_entry.dagster_event
).logs_captured_data

assert logs_captured_data.external_stderr_url
assert logs_captured_data.external_stdout_url
stderr = get_content_from_url(logs_captured_data.external_stderr_url)
stdout = get_content_from_url(logs_captured_data.external_stdout_url)
assert stdout.count("Printing without context") == 10
assert stderr.count("Logging using context") == 10
Comment on lines +29 to +48
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is all mostly the same as azure



def _parse_gcs_url_into_uri(url: str) -> str:
return url.removeprefix(
"https://console.cloud.google.com/storage/browser/_details/computelogmanager-tests/"
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the url format can't be directly downloaded; so we need to do some gross munging to pull out the downloadable log key.

We actually hard code a similar string in the compute log manager itself, we should turn that into a constant and use it in both places ideally.

Copy link
Contributor

@mlarose mlarose Jan 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think this is overlap with my work item: https://github.com/dagster-io/dagster/pull/26723/files#diff-305fa8635f1d64e4c949271ee4a40747faf8e778da55094423b783bb6e76c36dR266 but i think it's fine to leave as is in context of this test, the test will break if the prefix changes, and it can be addressed then.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yup sounds good to me

)


def get_content_from_url(url: str) -> str:
uri = _parse_gcs_url_into_uri(url)
client = get_bucket_client(get_credentials())
blob = client.blob(uri)
return blob.download_as_string().decode()
12 changes: 12 additions & 0 deletions integration_tests/test_suites/dagster-gcp-live-tests/setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from setuptools import find_packages, setup

setup(
name="gcp-test-proj",
packages=find_packages(),
install_requires=[
"dagster",
"dagster-webserver",
"dagster-gcp",
],
extras_require={"test": ["pytest"]},
)
28 changes: 28 additions & 0 deletions integration_tests/test_suites/dagster-gcp-live-tests/tox.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
[tox]
skipsdist = true

[testenv]
download = True
passenv =
CI_*
COVERALLS_REPO_TOKEN
BUILDKITE*
TEST_GCP*
GCP_LIVE_TEST_CREDENTIALS
install_command = uv pip install {opts} {packages}
deps =
-e ../../../python_modules/dagster[test]
-e ../../../python_modules/dagster-webserver
-e ../../../python_modules/dagster-test
-e ../../../python_modules/dagster-pipes
-e ../../../python_modules/dagster-graphql
-e ../../../python_modules/libraries/dagster-gcp
-e .
allowlist_externals =
/bin/bash
uv
commands =
# We need to rebuild the UI to ensure that the dagster-webserver can run
make -C ../../.. rebuild_ui
!windows: /bin/bash -c '! pip list --exclude-editable | grep -e dagster'
pytest ./integration_tests --snapshot-warn-unused -vv -s {posargs}