-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
8 changed files
with
242 additions
and
0 deletions.
There are no files selected for viewing
Empty file.
12 changes: 12 additions & 0 deletions
12
integration_tests/test_suites/dagster-gcp-live-tests/gcp_test_proj/defs.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
from dagster import AssetExecutionContext, asset | ||
|
||
|
||
@asset | ||
def my_asset(context: AssetExecutionContext) -> None: | ||
for i in range(10): | ||
print(f"Printing without context {i}") # noqa: T201 | ||
context.log.info(f"Logging using context {i}") | ||
try: | ||
raise Exception("This is an exception") | ||
except: | ||
return |
Empty file.
118 changes: 118 additions & 0 deletions
118
integration_tests/test_suites/dagster-gcp-live-tests/integration_tests/conftest.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,118 @@ | ||
import json | ||
import os | ||
import shutil | ||
import signal | ||
import subprocess | ||
import time | ||
import uuid | ||
from contextlib import contextmanager | ||
from pathlib import Path | ||
from tempfile import TemporaryDirectory | ||
from typing import Any, Generator, List, Mapping | ||
|
||
import pytest | ||
import requests | ||
from dagster._core.test_utils import environ | ||
from dagster._time import get_current_timestamp | ||
from dagster._utils import process_is_alive | ||
from google.cloud import storage as gcs | ||
|
||
|
||
def integration_test_dir() -> Path: | ||
return Path(__file__).parent.parent | ||
|
||
|
||
def _dagster_is_ready(port: int) -> bool: | ||
try: | ||
response = requests.get(f"http://localhost:{port}") | ||
return response.status_code == 200 | ||
except: | ||
return False | ||
|
||
|
||
def path_to_dagster_yamls() -> Path: | ||
return Path(__file__).parent / "dagster-yamls" | ||
|
||
|
||
def delete_blobs_with_prefix(prefix: str) -> None: | ||
bucket_client = get_bucket_client(get_credentials()) | ||
for blob in bucket_client.list_blobs(prefix=prefix): | ||
bucket_client.delete_blob(blob.name) | ||
|
||
|
||
@pytest.fixture(name="dagster_yaml") | ||
def dagster_yaml_path(request) -> Generator[Path, None, None]: | ||
yield path_to_dagster_yamls() / request.param | ||
|
||
|
||
@pytest.fixture(name="dagster_home") | ||
def setup_dagster_home(dagster_yaml: Path) -> Generator[str, None, None]: | ||
"""Instantiate a temporary directory to serve as the DAGSTER_HOME.""" | ||
with TemporaryDirectory() as tmpdir: | ||
# Copy over dagster.yaml | ||
shutil.copy2(dagster_yaml, Path(tmpdir) / "dagster.yaml") | ||
with environ({"DAGSTER_HOME": tmpdir}): | ||
yield tmpdir | ||
|
||
|
||
@pytest.fixture | ||
def prefix_env() -> Generator[str, None, None]: | ||
prefix = f"prefix_{uuid.uuid4().hex}" | ||
try: | ||
with environ({"TEST_GCP_LOG_PREFIX": prefix}): | ||
yield prefix | ||
finally: | ||
delete_blobs_with_prefix(prefix) | ||
|
||
|
||
@pytest.fixture(name="dagster_dev") | ||
def setup_dagster(dagster_home: str, prefix_env: str) -> Generator[Any, None, None]: | ||
with stand_up_dagster(["dagster", "dev", "-m", "gcp_test_proj.defs"]) as process: | ||
yield process | ||
|
||
|
||
@contextmanager | ||
def stand_up_dagster( | ||
dagster_dev_cmd: List[str], port: int = 3000 | ||
) -> Generator[subprocess.Popen, None, None]: | ||
"""Stands up a dagster instance using the dagster dev CLI. dagster_defs_path must be provided | ||
by a fixture included in the callsite. | ||
""" | ||
process = subprocess.Popen( | ||
dagster_dev_cmd, | ||
env=os.environ.copy(), | ||
shell=False, | ||
preexec_fn=os.setsid, # noqa | ||
) | ||
try: | ||
dagster_ready = False | ||
initial_time = get_current_timestamp() | ||
while get_current_timestamp() - initial_time < 60: | ||
if _dagster_is_ready(port): | ||
dagster_ready = True | ||
break | ||
time.sleep(1) | ||
|
||
assert dagster_ready, "Dagster did not start within 30 seconds..." | ||
yield process | ||
finally: | ||
if process_is_alive(process.pid): | ||
os.killpg(process.pid, signal.SIGKILL) | ||
|
||
|
||
def get_credentials() -> Mapping: | ||
return json.loads(os.environ["GCP_LIVE_TEST_CREDENTIALS"]) | ||
|
||
|
||
def get_bucket_client(credentials: Mapping) -> gcs.Bucket: | ||
return gcs.Client.from_service_account_info(credentials).get_bucket("computelogmanager-tests") | ||
|
||
|
||
@pytest.fixture(name="credentials") | ||
def setup_credentials() -> Generator[Mapping, None, None]: | ||
yield get_credentials() | ||
|
||
|
||
@pytest.fixture(name="bucket_client") | ||
def setup_container_client() -> Generator[gcs.Bucket, None, None]: | ||
yield get_bucket_client(get_credentials()) |
11 changes: 11 additions & 0 deletions
11
.../test_suites/dagster-gcp-live-tests/integration_tests/dagster-yamls/json-credentials.yaml
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
compute_logs: | ||
module: dagster_gcp.gcs.compute_log_manager | ||
class: GCSComputeLogManager | ||
config: | ||
json_credentials_envvar: GCP_LIVE_TEST_CREDENTIALS | ||
prefix: | ||
env: TEST_GCP_LOG_PREFIX | ||
bucket: computelogmanager-tests | ||
local_dir: "/tmp/cool" | ||
upload_interval: 30 | ||
show_url_only: true |
61 changes: 61 additions & 0 deletions
61
...on_tests/test_suites/dagster-gcp-live-tests/integration_tests/test_compute_log_manager.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,61 @@ | ||
import subprocess | ||
from pathlib import Path | ||
from typing import Mapping | ||
|
||
import pytest | ||
from dagster import ( | ||
DagsterEventType, | ||
DagsterInstance, | ||
EventRecordsFilter, | ||
_check as check, | ||
) | ||
from google.cloud import storage as gcs | ||
|
||
from .conftest import get_bucket_client, get_credentials # noqa: TID252 | ||
|
||
|
||
@pytest.mark.parametrize( | ||
"dagster_yaml", | ||
["json-credentials.yaml"], | ||
indirect=True, | ||
) | ||
def test_compute_log_manager( | ||
dagster_dev: subprocess.Popen, | ||
bucket_client: gcs.Bucket, | ||
prefix_env: str, | ||
credentials: Mapping, | ||
dagster_yaml: Path, | ||
) -> None: | ||
subprocess.run( | ||
["dagster", "asset", "materialize", "--select", "my_asset", "-m", "gcp_test_proj.defs"], | ||
check=True, | ||
) | ||
logs_captured_data = check.not_none( | ||
DagsterInstance.get() | ||
.get_event_records( | ||
EventRecordsFilter( | ||
event_type=DagsterEventType.LOGS_CAPTURED, | ||
) | ||
)[0] | ||
.event_log_entry.dagster_event | ||
).logs_captured_data | ||
|
||
assert logs_captured_data.external_stderr_url | ||
assert logs_captured_data.external_stdout_url | ||
stderr = get_content_from_url(logs_captured_data.external_stderr_url) | ||
stdout = get_content_from_url(logs_captured_data.external_stdout_url) | ||
assert stdout.count("Printing without context") == 10 | ||
assert stderr.count("Logging using context") == 10 | ||
|
||
|
||
def _parse_gcs_url_into_uri(url: str) -> str: | ||
return url.removeprefix( | ||
"https://console.cloud.google.com/storage/browser/_details/computelogmanager-tests/" | ||
) | ||
|
||
|
||
def get_content_from_url(url: str) -> str: | ||
uri = _parse_gcs_url_into_uri(url) | ||
client = get_bucket_client(get_credentials()) | ||
blob = client.blob(uri) | ||
return blob.download_as_string().decode() |
12 changes: 12 additions & 0 deletions
12
integration_tests/test_suites/dagster-gcp-live-tests/setup.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
from setuptools import find_packages, setup | ||
|
||
setup( | ||
name="gcp-test-proj", | ||
packages=find_packages(), | ||
install_requires=[ | ||
"dagster", | ||
"dagster-webserver", | ||
"dagster-gcp", | ||
], | ||
extras_require={"test": ["pytest"]}, | ||
) |
28 changes: 28 additions & 0 deletions
28
integration_tests/test_suites/dagster-gcp-live-tests/tox.ini
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
[tox] | ||
skipsdist = true | ||
|
||
[testenv] | ||
download = True | ||
passenv = | ||
CI_* | ||
COVERALLS_REPO_TOKEN | ||
BUILDKITE* | ||
TEST_GCP* | ||
GCP_LIVE_TEST_CREDENTIALS | ||
install_command = uv pip install {opts} {packages} | ||
deps = | ||
-e ../../../python_modules/dagster[test] | ||
-e ../../../python_modules/dagster-webserver | ||
-e ../../../python_modules/dagster-test | ||
-e ../../../python_modules/dagster-pipes | ||
-e ../../../python_modules/dagster-graphql | ||
-e ../../../python_modules/libraries/dagster-gcp | ||
-e . | ||
allowlist_externals = | ||
/bin/bash | ||
uv | ||
commands = | ||
# We need to rebuild the UI to ensure that the dagster-webserver can run | ||
make -C ../../.. rebuild_ui | ||
!windows: /bin/bash -c '! pip list --exclude-editable | grep -e dagster' | ||
pytest ./integration_tests --snapshot-warn-unused -vv -s {posargs} |