From 802184941571c514f5414ae05d3700832792686a Mon Sep 17 00:00:00 2001 From: Chris DeCarolis Date: Mon, 30 Dec 2024 15:28:00 -0500 Subject: [PATCH] GCS Live tests --- .../pipelines/dagster_oss_nightly_pipeline.py | 6 + .../dagster_buildkite/steps/integration.py | 18 +++ .../gcp_test_proj/__init__.py | 0 .../gcp_test_proj/defs.py | 12 ++ .../integration_tests/__init__.py | 0 .../integration_tests/conftest.py | 118 ++++++++++++++++++ .../dagster-yamls/json-credentials.yaml | 11 ++ .../test_compute_log_manager.py | 61 +++++++++ .../dagster-gcp-live-tests/setup.py | 12 ++ .../dagster-gcp-live-tests/tox.ini | 28 +++++ 10 files changed, 266 insertions(+) create mode 100644 integration_tests/test_suites/dagster-gcp-live-tests/gcp_test_proj/__init__.py create mode 100644 integration_tests/test_suites/dagster-gcp-live-tests/gcp_test_proj/defs.py create mode 100644 integration_tests/test_suites/dagster-gcp-live-tests/integration_tests/__init__.py create mode 100644 integration_tests/test_suites/dagster-gcp-live-tests/integration_tests/conftest.py create mode 100644 integration_tests/test_suites/dagster-gcp-live-tests/integration_tests/dagster-yamls/json-credentials.yaml create mode 100644 integration_tests/test_suites/dagster-gcp-live-tests/integration_tests/test_compute_log_manager.py create mode 100644 integration_tests/test_suites/dagster-gcp-live-tests/setup.py create mode 100644 integration_tests/test_suites/dagster-gcp-live-tests/tox.ini diff --git a/.buildkite/dagster-buildkite/dagster_buildkite/pipelines/dagster_oss_nightly_pipeline.py b/.buildkite/dagster-buildkite/dagster_buildkite/pipelines/dagster_oss_nightly_pipeline.py index 9e7117715bfb8..4cea252484b86 100644 --- a/.buildkite/dagster-buildkite/dagster_buildkite/pipelines/dagster_oss_nightly_pipeline.py +++ b/.buildkite/dagster-buildkite/dagster_buildkite/pipelines/dagster_oss_nightly_pipeline.py @@ -83,6 +83,12 @@ def build_dagster_oss_nightly_steps() -> List[BuildkiteStep]: ], always_run_if=lambda: True, ), + PackageSpec( + "integration_tests/test_suites/dagster-gcp-live-tests", + name="gcp-live-tests", + env_vars=["GCP_LIVE_TEST_CREDENTIALS"], + always_run_if=lambda: True, + ), ] ) diff --git a/.buildkite/dagster-buildkite/dagster_buildkite/steps/integration.py b/.buildkite/dagster-buildkite/dagster_buildkite/steps/integration.py index ad10aae2449c9..9e59bc08debf5 100644 --- a/.buildkite/dagster-buildkite/dagster_buildkite/steps/integration.py +++ b/.buildkite/dagster-buildkite/dagster_buildkite/steps/integration.py @@ -176,6 +176,15 @@ def skip_if_not_azure_commit(): ) +def skip_if_not_gcp_commit(): + """If no dagster-gcp files are changed, skip the gcp live tests.""" + return ( + None + if (any("dagster-gcp" in str(path) for path in ChangedFiles.all)) + else "Not a dagster-gcp commit" + ) + + def build_azure_live_test_suite_steps() -> List[BuildkiteTopLevelStep]: return PackageSpec( os.path.join("integration_tests", "test_suites", "dagster-azure-live-tests"), @@ -191,6 +200,15 @@ def build_azure_live_test_suite_steps() -> List[BuildkiteTopLevelStep]: ).build_steps() +def build_gcp_live_test_suite_steps() -> List[BuildkiteTopLevelStep]: + return PackageSpec( + os.path.join("integration_tests", "test_suites", "dagster-gcp-live-tests"), + env_vars=[ + "GCP_LIVE_TEST_CREDENTIALS", + ], + ).build_steps() + + def daemon_pytest_extra_cmds(version: AvailablePythonVersion, _): return [ "export DAGSTER_DOCKER_IMAGE_TAG=$${BUILDKITE_BUILD_ID}-" + version.value, diff --git a/integration_tests/test_suites/dagster-gcp-live-tests/gcp_test_proj/__init__.py b/integration_tests/test_suites/dagster-gcp-live-tests/gcp_test_proj/__init__.py new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/integration_tests/test_suites/dagster-gcp-live-tests/gcp_test_proj/defs.py b/integration_tests/test_suites/dagster-gcp-live-tests/gcp_test_proj/defs.py new file mode 100644 index 0000000000000..507bcfae53314 --- /dev/null +++ b/integration_tests/test_suites/dagster-gcp-live-tests/gcp_test_proj/defs.py @@ -0,0 +1,12 @@ +from dagster import AssetExecutionContext, asset + + +@asset +def my_asset(context: AssetExecutionContext) -> None: + for i in range(10): + print(f"Printing without context {i}") # noqa: T201 + context.log.info(f"Logging using context {i}") + try: + raise Exception("This is an exception") + except: + return diff --git a/integration_tests/test_suites/dagster-gcp-live-tests/integration_tests/__init__.py b/integration_tests/test_suites/dagster-gcp-live-tests/integration_tests/__init__.py new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/integration_tests/test_suites/dagster-gcp-live-tests/integration_tests/conftest.py b/integration_tests/test_suites/dagster-gcp-live-tests/integration_tests/conftest.py new file mode 100644 index 0000000000000..3cafc547bcfa0 --- /dev/null +++ b/integration_tests/test_suites/dagster-gcp-live-tests/integration_tests/conftest.py @@ -0,0 +1,118 @@ +import json +import os +import shutil +import signal +import subprocess +import time +import uuid +from contextlib import contextmanager +from pathlib import Path +from tempfile import TemporaryDirectory +from typing import Any, Generator, List, Mapping + +import pytest +import requests +from dagster._core.test_utils import environ +from dagster._time import get_current_timestamp +from dagster._utils import process_is_alive +from google.cloud import storage as gcs + + +def integration_test_dir() -> Path: + return Path(__file__).parent.parent + + +def _dagster_is_ready(port: int) -> bool: + try: + response = requests.get(f"http://localhost:{port}") + return response.status_code == 200 + except: + return False + + +def path_to_dagster_yamls() -> Path: + return Path(__file__).parent / "dagster-yamls" + + +def delete_blobs_with_prefix(prefix: str) -> None: + bucket_client = get_bucket_client(get_credentials()) + for blob in bucket_client.list_blobs(prefix=prefix): + bucket_client.delete_blob(blob.name) + + +@pytest.fixture(name="dagster_yaml") +def dagster_yaml_path(request) -> Generator[Path, None, None]: + yield path_to_dagster_yamls() / request.param + + +@pytest.fixture(name="dagster_home") +def setup_dagster_home(dagster_yaml: Path) -> Generator[str, None, None]: + """Instantiate a temporary directory to serve as the DAGSTER_HOME.""" + with TemporaryDirectory() as tmpdir: + # Copy over dagster.yaml + shutil.copy2(dagster_yaml, Path(tmpdir) / "dagster.yaml") + with environ({"DAGSTER_HOME": tmpdir}): + yield tmpdir + + +@pytest.fixture +def prefix_env() -> Generator[str, None, None]: + prefix = f"prefix_{uuid.uuid4().hex}" + try: + with environ({"TEST_GCP_LOG_PREFIX": prefix}): + yield prefix + finally: + delete_blobs_with_prefix(prefix) + + +@pytest.fixture(name="dagster_dev") +def setup_dagster(dagster_home: str, prefix_env: str) -> Generator[Any, None, None]: + with stand_up_dagster(["dagster", "dev", "-m", "gcp_test_proj.defs"]) as process: + yield process + + +@contextmanager +def stand_up_dagster( + dagster_dev_cmd: List[str], port: int = 3000 +) -> Generator[subprocess.Popen, None, None]: + """Stands up a dagster instance using the dagster dev CLI. dagster_defs_path must be provided + by a fixture included in the callsite. + """ + process = subprocess.Popen( + dagster_dev_cmd, + env=os.environ.copy(), + shell=False, + preexec_fn=os.setsid, # noqa + ) + try: + dagster_ready = False + initial_time = get_current_timestamp() + while get_current_timestamp() - initial_time < 60: + if _dagster_is_ready(port): + dagster_ready = True + break + time.sleep(1) + + assert dagster_ready, "Dagster did not start within 30 seconds..." + yield process + finally: + if process_is_alive(process.pid): + os.killpg(process.pid, signal.SIGKILL) + + +def get_credentials() -> Mapping: + return json.loads(os.environ["GCP_LIVE_TEST_CREDENTIALS"]) + + +def get_bucket_client(credentials: Mapping) -> gcs.Bucket: + return gcs.Client.from_service_account_info(credentials).get_bucket("computelogmanager-tests") + + +@pytest.fixture(name="credentials") +def setup_credentials() -> Generator[Mapping, None, None]: + yield get_credentials() + + +@pytest.fixture(name="bucket_client") +def setup_container_client() -> Generator[gcs.Bucket, None, None]: + yield get_bucket_client(get_credentials()) diff --git a/integration_tests/test_suites/dagster-gcp-live-tests/integration_tests/dagster-yamls/json-credentials.yaml b/integration_tests/test_suites/dagster-gcp-live-tests/integration_tests/dagster-yamls/json-credentials.yaml new file mode 100644 index 0000000000000..48319c0fd3250 --- /dev/null +++ b/integration_tests/test_suites/dagster-gcp-live-tests/integration_tests/dagster-yamls/json-credentials.yaml @@ -0,0 +1,11 @@ +compute_logs: + module: dagster_gcp.gcs.compute_log_manager + class: GCSComputeLogManager + config: + json_credentials_envvar: GCP_LIVE_TEST_CREDENTIALS + prefix: + env: TEST_GCP_LOG_PREFIX + bucket: computelogmanager-tests + local_dir: "/tmp/cool" + upload_interval: 30 + show_url_only: true \ No newline at end of file diff --git a/integration_tests/test_suites/dagster-gcp-live-tests/integration_tests/test_compute_log_manager.py b/integration_tests/test_suites/dagster-gcp-live-tests/integration_tests/test_compute_log_manager.py new file mode 100644 index 0000000000000..6094b563b567f --- /dev/null +++ b/integration_tests/test_suites/dagster-gcp-live-tests/integration_tests/test_compute_log_manager.py @@ -0,0 +1,61 @@ +import subprocess +from pathlib import Path +from typing import Mapping + +import pytest +from dagster import ( + DagsterEventType, + DagsterInstance, + EventRecordsFilter, + _check as check, +) +from google.cloud import storage as gcs + +from .conftest import get_bucket_client, get_credentials # noqa: TID252 + + +@pytest.mark.parametrize( + "dagster_yaml", + ["json-credentials.yaml"], + indirect=True, +) +def test_compute_log_manager( + dagster_dev: subprocess.Popen, + bucket_client: gcs.Bucket, + prefix_env: str, + credentials: Mapping, + dagster_yaml: Path, +) -> None: + subprocess.run( + ["dagster", "asset", "materialize", "--select", "my_asset", "-m", "gcp_test_proj.defs"], + check=True, + ) + logs_captured_data = check.not_none( + DagsterInstance.get() + .get_event_records( + EventRecordsFilter( + event_type=DagsterEventType.LOGS_CAPTURED, + ) + )[0] + .event_log_entry.dagster_event + ).logs_captured_data + + assert logs_captured_data.external_stderr_url + assert logs_captured_data.external_stdout_url + stderr = get_content_from_url(logs_captured_data.external_stderr_url) + stdout = get_content_from_url(logs_captured_data.external_stdout_url) + assert stdout.count("Printing without context") == 10 + assert stderr.count("Logging using context") == 10 + + +def _parse_gcs_url_into_uri(url: str) -> str: + return url.removeprefix( + "https://console.cloud.google.com/storage/browser/_details/computelogmanager-tests/" + ) + + +def get_content_from_url(url: str) -> str: + uri = _parse_gcs_url_into_uri(url) + client = get_bucket_client(get_credentials()) + blob = client.blob(uri) + return blob.download_as_string().decode() diff --git a/integration_tests/test_suites/dagster-gcp-live-tests/setup.py b/integration_tests/test_suites/dagster-gcp-live-tests/setup.py new file mode 100644 index 0000000000000..596bf7401b356 --- /dev/null +++ b/integration_tests/test_suites/dagster-gcp-live-tests/setup.py @@ -0,0 +1,12 @@ +from setuptools import find_packages, setup + +setup( + name="gcp-test-proj", + packages=find_packages(), + install_requires=[ + "dagster", + "dagster-webserver", + "dagster-gcp", + ], + extras_require={"test": ["pytest"]}, +) diff --git a/integration_tests/test_suites/dagster-gcp-live-tests/tox.ini b/integration_tests/test_suites/dagster-gcp-live-tests/tox.ini new file mode 100644 index 0000000000000..94cd3e0414fab --- /dev/null +++ b/integration_tests/test_suites/dagster-gcp-live-tests/tox.ini @@ -0,0 +1,28 @@ +[tox] +skipsdist = true + +[testenv] +download = True +passenv = + CI_* + COVERALLS_REPO_TOKEN + BUILDKITE* + TEST_GCP* + GCP_LIVE_TEST_CREDENTIALS +install_command = uv pip install {opts} {packages} +deps = + -e ../../../python_modules/dagster[test] + -e ../../../python_modules/dagster-webserver + -e ../../../python_modules/dagster-test + -e ../../../python_modules/dagster-pipes + -e ../../../python_modules/dagster-graphql + -e ../../../python_modules/libraries/dagster-gcp + -e . +allowlist_externals = + /bin/bash + uv +commands = + # We need to rebuild the UI to ensure that the dagster-webserver can run + make -C ../../.. rebuild_ui + !windows: /bin/bash -c '! pip list --exclude-editable | grep -e dagster' + pytest ./integration_tests --snapshot-warn-unused -vv -s {posargs} \ No newline at end of file