Skip to content

Commit

Permalink
GCS Live tests
Browse files Browse the repository at this point in the history
  • Loading branch information
dpeng817 committed Dec 31, 2024
1 parent 942e7a0 commit 9449650
Show file tree
Hide file tree
Showing 10 changed files with 266 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,12 @@ def build_dagster_oss_nightly_steps() -> List[BuildkiteStep]:
],
always_run_if=lambda: True,
),
PackageSpec(
"integration_tests/test_suites/dagster-gcp-live-tests",
name="gcp-live-tests",
env_vars=["GCP_LIVE_TEST_CREDENTIALS"],
always_run_if=lambda: True,
),
]
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,15 @@ def skip_if_not_azure_commit():
)


def skip_if_not_gcp_commit():
"""If no dagster-gcp files are changed, skip the gcp live tests."""
return (
None
if (any("dagster-gcp" in str(path) for path in ChangedFiles.all))
else "Not a dagster-gcp commit"
)


def build_azure_live_test_suite_steps() -> List[BuildkiteTopLevelStep]:
return PackageSpec(
os.path.join("integration_tests", "test_suites", "dagster-azure-live-tests"),
Expand All @@ -191,6 +200,15 @@ def build_azure_live_test_suite_steps() -> List[BuildkiteTopLevelStep]:
).build_steps()


def build_gcp_live_test_suite_steps() -> List[BuildkiteTopLevelStep]:
return PackageSpec(
os.path.join("integration_tests", "test_suites", "dagster-gcp-live-tests"),
env_vars=[
"GCP_LIVE_TEST_CREDENTIALS",
],
).build_steps()


def daemon_pytest_extra_cmds(version: AvailablePythonVersion, _):
return [
"export DAGSTER_DOCKER_IMAGE_TAG=$${BUILDKITE_BUILD_ID}-" + version.value,
Expand Down
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from dagster import AssetExecutionContext, asset


@asset
def my_asset(context: AssetExecutionContext) -> None:
for i in range(10):
print(f"Printing without context {i}") # noqa: T201
context.log.info(f"Logging using context {i}")
try:
raise Exception("This is an exception")
except:
return
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
import json
import os
import shutil
import signal
import subprocess
import time
import uuid
from contextlib import contextmanager
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Any, Generator, List, Mapping

import pytest
import requests
from dagster._core.test_utils import environ
from dagster._time import get_current_timestamp
from dagster._utils import process_is_alive
from google.cloud import storage as gcs


def integration_test_dir() -> Path:
return Path(__file__).parent.parent


def _dagster_is_ready(port: int) -> bool:
try:
response = requests.get(f"http://localhost:{port}")
return response.status_code == 200
except:
return False


def path_to_dagster_yamls() -> Path:
return Path(__file__).parent / "dagster-yamls"


def delete_blobs_with_prefix(prefix: str) -> None:
bucket_client = get_bucket_client(get_credentials())
for blob in bucket_client.list_blobs(prefix=prefix):
bucket_client.delete_blob(blob.name)


@pytest.fixture(name="dagster_yaml")
def dagster_yaml_path(request) -> Generator[Path, None, None]:
yield path_to_dagster_yamls() / request.param


@pytest.fixture(name="dagster_home")
def setup_dagster_home(dagster_yaml: Path) -> Generator[str, None, None]:
"""Instantiate a temporary directory to serve as the DAGSTER_HOME."""
with TemporaryDirectory() as tmpdir:
# Copy over dagster.yaml
shutil.copy2(dagster_yaml, Path(tmpdir) / "dagster.yaml")
with environ({"DAGSTER_HOME": tmpdir}):
yield tmpdir


@pytest.fixture
def prefix_env() -> Generator[str, None, None]:
prefix = f"prefix_{uuid.uuid4().hex}"
try:
with environ({"TEST_GCP_LOG_PREFIX": prefix}):
yield prefix
finally:
delete_blobs_with_prefix(prefix)


@pytest.fixture(name="dagster_dev")
def setup_dagster(dagster_home: str, prefix_env: str) -> Generator[Any, None, None]:
with stand_up_dagster(["dagster", "dev", "-m", "gcp_test_proj.defs"]) as process:
yield process


@contextmanager
def stand_up_dagster(
dagster_dev_cmd: List[str], port: int = 3000
) -> Generator[subprocess.Popen, None, None]:
"""Stands up a dagster instance using the dagster dev CLI. dagster_defs_path must be provided
by a fixture included in the callsite.
"""
process = subprocess.Popen(
dagster_dev_cmd,
env=os.environ.copy(),
shell=False,
preexec_fn=os.setsid, # noqa
)
try:
dagster_ready = False
initial_time = get_current_timestamp()
while get_current_timestamp() - initial_time < 60:
if _dagster_is_ready(port):
dagster_ready = True
break
time.sleep(1)

assert dagster_ready, "Dagster did not start within 30 seconds..."
yield process
finally:
if process_is_alive(process.pid):
os.killpg(process.pid, signal.SIGKILL)


def get_credentials() -> Mapping:
return json.loads(os.environ["GCP_LIVE_TEST_CREDENTIALS"])


def get_bucket_client(credentials: Mapping) -> gcs.Bucket:
return gcs.Client.from_service_account_info(credentials).get_bucket("computelogmanager-tests")


@pytest.fixture(name="credentials")
def setup_credentials() -> Generator[Mapping, None, None]:
yield get_credentials()


@pytest.fixture(name="bucket_client")
def setup_container_client() -> Generator[gcs.Bucket, None, None]:
yield get_bucket_client(get_credentials())
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
compute_logs:
module: dagster_gcp.gcs.compute_log_manager
class: GCSComputeLogManager
config:
json_credentials_envvar: GCP_LIVE_TEST_CREDENTIALS
prefix:
env: TEST_GCP_LOG_PREFIX
bucket: computelogmanager-tests
local_dir: "/tmp/cool"
upload_interval: 30
show_url_only: true
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import subprocess
from pathlib import Path
from typing import Mapping

import pytest
from dagster import (
DagsterEventType,
DagsterInstance,
EventRecordsFilter,
_check as check,
)
from google.cloud import storage as gcs

from .conftest import get_bucket_client, get_credentials # noqa: TID252


@pytest.mark.parametrize(
"dagster_yaml",
["json-credentials.yaml"],
indirect=True,
)
def test_compute_log_manager(
dagster_dev: subprocess.Popen,
bucket_client: gcs.Bucket,
prefix_env: str,
credentials: Mapping,
dagster_yaml: Path,
) -> None:
subprocess.run(
["dagster", "asset", "materialize", "--select", "my_asset", "-m", "gcp_test_proj.defs"],
check=True,
)
logs_captured_data = check.not_none(
DagsterInstance.get()
.get_event_records(
EventRecordsFilter(
event_type=DagsterEventType.LOGS_CAPTURED,
)
)[0]
.event_log_entry.dagster_event
).logs_captured_data

assert logs_captured_data.external_stderr_url
assert logs_captured_data.external_stdout_url
stderr = get_content_from_url(logs_captured_data.external_stderr_url)
stdout = get_content_from_url(logs_captured_data.external_stdout_url)
assert stdout.count("Printing without context") == 10
assert stderr.count("Logging using context") == 10


def _parse_gcs_url_into_uri(url: str) -> str:
return url.removeprefix(
"https://console.cloud.google.com/storage/browser/_details/computelogmanager-tests/"
)


def get_content_from_url(url: str) -> str:
uri = _parse_gcs_url_into_uri(url)
client = get_bucket_client(get_credentials())
blob = client.blob(uri)
return blob.download_as_string().decode()
12 changes: 12 additions & 0 deletions integration_tests/test_suites/dagster-gcp-live-tests/setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from setuptools import find_packages, setup

setup(
name="gcp-test-proj",
packages=find_packages(),
install_requires=[
"dagster",
"dagster-webserver",
"dagster-gcp",
],
extras_require={"test": ["pytest"]},
)
28 changes: 28 additions & 0 deletions integration_tests/test_suites/dagster-gcp-live-tests/tox.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
[tox]
skipsdist = true

[testenv]
download = True
passenv =
CI_*
COVERALLS_REPO_TOKEN
BUILDKITE*
TEST_GCP*
GCP_LIVE_TEST_CREDENTIALS
install_command = uv pip install {opts} {packages}
deps =
-e ../../../python_modules/dagster[test]
-e ../../../python_modules/dagster-webserver
-e ../../../python_modules/dagster-test
-e ../../../python_modules/dagster-pipes
-e ../../../python_modules/dagster-graphql
-e ../../../python_modules/libraries/dagster-gcp
-e .
allowlist_externals =
/bin/bash
uv
commands =
# We need to rebuild the UI to ensure that the dagster-webserver can run
make -C ../../.. rebuild_ui
!windows: /bin/bash -c '! pip list --exclude-editable | grep -e dagster'
pytest ./integration_tests --snapshot-warn-unused -vv -s {posargs}

0 comments on commit 9449650

Please sign in to comment.