-
Notifications
You must be signed in to change notification settings - Fork 1.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[dagster-gcp] live tests #26761
[dagster-gcp] live tests #26761
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
from dagster import AssetExecutionContext, asset | ||
|
||
|
||
@asset | ||
def my_asset(context: AssetExecutionContext) -> None: | ||
for i in range(10): | ||
print(f"Printing without context {i}") # noqa: T201 | ||
context.log.info(f"Logging using context {i}") | ||
try: | ||
raise Exception("This is an exception") | ||
except: | ||
return |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,118 @@ | ||
import json | ||
import os | ||
import shutil | ||
import signal | ||
import subprocess | ||
import time | ||
import uuid | ||
from contextlib import contextmanager | ||
from pathlib import Path | ||
from tempfile import TemporaryDirectory | ||
from typing import Any, Generator, List, Mapping | ||
|
||
import pytest | ||
import requests | ||
from dagster._core.test_utils import environ | ||
from dagster._time import get_current_timestamp | ||
from dagster._utils import process_is_alive | ||
from google.cloud import storage as gcs | ||
|
||
|
||
def integration_test_dir() -> Path: | ||
return Path(__file__).parent.parent | ||
|
||
|
||
def _dagster_is_ready(port: int) -> bool: | ||
try: | ||
response = requests.get(f"http://localhost:{port}") | ||
return response.status_code == 200 | ||
except: | ||
return False | ||
|
||
|
||
def path_to_dagster_yamls() -> Path: | ||
return Path(__file__).parent / "dagster-yamls" | ||
|
||
|
||
def delete_blobs_with_prefix(prefix: str) -> None: | ||
bucket_client = get_bucket_client(get_credentials()) | ||
for blob in bucket_client.list_blobs(prefix=prefix): | ||
bucket_client.delete_blob(blob.name) | ||
|
||
|
||
@pytest.fixture(name="dagster_yaml") | ||
def dagster_yaml_path(request) -> Generator[Path, None, None]: | ||
yield path_to_dagster_yamls() / request.param | ||
|
||
|
||
@pytest.fixture(name="dagster_home") | ||
def setup_dagster_home(dagster_yaml: Path) -> Generator[str, None, None]: | ||
"""Instantiate a temporary directory to serve as the DAGSTER_HOME.""" | ||
with TemporaryDirectory() as tmpdir: | ||
# Copy over dagster.yaml | ||
shutil.copy2(dagster_yaml, Path(tmpdir) / "dagster.yaml") | ||
with environ({"DAGSTER_HOME": tmpdir}): | ||
yield tmpdir | ||
|
||
|
||
@pytest.fixture | ||
def prefix_env() -> Generator[str, None, None]: | ||
prefix = f"prefix_{uuid.uuid4().hex}" | ||
try: | ||
with environ({"TEST_GCP_LOG_PREFIX": prefix}): | ||
yield prefix | ||
finally: | ||
delete_blobs_with_prefix(prefix) | ||
|
||
|
||
@pytest.fixture(name="dagster_dev") | ||
def setup_dagster(dagster_home: str, prefix_env: str) -> Generator[Any, None, None]: | ||
with stand_up_dagster(["dagster", "dev", "-m", "gcp_test_proj.defs"]) as process: | ||
yield process | ||
|
||
|
||
@contextmanager | ||
def stand_up_dagster( | ||
dagster_dev_cmd: List[str], port: int = 3000 | ||
) -> Generator[subprocess.Popen, None, None]: | ||
"""Stands up a dagster instance using the dagster dev CLI. dagster_defs_path must be provided | ||
by a fixture included in the callsite. | ||
""" | ||
process = subprocess.Popen( | ||
dagster_dev_cmd, | ||
env=os.environ.copy(), | ||
shell=False, | ||
preexec_fn=os.setsid, # noqa | ||
) | ||
try: | ||
dagster_ready = False | ||
initial_time = get_current_timestamp() | ||
while get_current_timestamp() - initial_time < 60: | ||
if _dagster_is_ready(port): | ||
dagster_ready = True | ||
break | ||
time.sleep(1) | ||
|
||
assert dagster_ready, "Dagster did not start within 30 seconds..." | ||
yield process | ||
finally: | ||
if process_is_alive(process.pid): | ||
os.killpg(process.pid, signal.SIGKILL) | ||
|
||
|
||
def get_credentials() -> Mapping: | ||
return json.loads(os.environ["GCP_LIVE_TEST_CREDENTIALS"]) | ||
|
||
|
||
def get_bucket_client(credentials: Mapping) -> gcs.Bucket: | ||
return gcs.Client.from_service_account_info(credentials).get_bucket("computelogmanager-tests") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is pulled straight from the compute log manager implementation |
||
|
||
|
||
@pytest.fixture(name="credentials") | ||
def setup_credentials() -> Generator[Mapping, None, None]: | ||
yield get_credentials() | ||
|
||
|
||
@pytest.fixture(name="bucket_client") | ||
def setup_container_client() -> Generator[gcs.Bucket, None, None]: | ||
yield get_bucket_client(get_credentials()) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
compute_logs: | ||
module: dagster_gcp.gcs.compute_log_manager | ||
class: GCSComputeLogManager | ||
config: | ||
json_credentials_envvar: GCP_LIVE_TEST_CREDENTIALS | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. these are new creds I created for this test suite |
||
prefix: | ||
env: TEST_GCP_LOG_PREFIX | ||
bucket: computelogmanager-tests | ||
local_dir: "/tmp/cool" | ||
upload_interval: 30 | ||
show_url_only: true |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,61 @@ | ||
import subprocess | ||
from pathlib import Path | ||
from typing import Mapping | ||
|
||
import pytest | ||
from dagster import ( | ||
DagsterEventType, | ||
DagsterInstance, | ||
EventRecordsFilter, | ||
_check as check, | ||
) | ||
from google.cloud import storage as gcs | ||
|
||
from .conftest import get_bucket_client, get_credentials # noqa: TID252 | ||
|
||
|
||
@pytest.mark.parametrize( | ||
"dagster_yaml", | ||
["json-credentials.yaml"], | ||
indirect=True, | ||
) | ||
def test_compute_log_manager( | ||
dagster_dev: subprocess.Popen, | ||
bucket_client: gcs.Bucket, | ||
prefix_env: str, | ||
credentials: Mapping, | ||
dagster_yaml: Path, | ||
) -> None: | ||
subprocess.run( | ||
["dagster", "asset", "materialize", "--select", "my_asset", "-m", "gcp_test_proj.defs"], | ||
check=True, | ||
) | ||
logs_captured_data = check.not_none( | ||
DagsterInstance.get() | ||
.get_event_records( | ||
EventRecordsFilter( | ||
event_type=DagsterEventType.LOGS_CAPTURED, | ||
) | ||
)[0] | ||
.event_log_entry.dagster_event | ||
).logs_captured_data | ||
|
||
assert logs_captured_data.external_stderr_url | ||
assert logs_captured_data.external_stdout_url | ||
stderr = get_content_from_url(logs_captured_data.external_stderr_url) | ||
stdout = get_content_from_url(logs_captured_data.external_stdout_url) | ||
assert stdout.count("Printing without context") == 10 | ||
assert stderr.count("Logging using context") == 10 | ||
Comment on lines
+29
to
+48
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is all mostly the same as azure |
||
|
||
|
||
def _parse_gcs_url_into_uri(url: str) -> str: | ||
return url.removeprefix( | ||
"https://console.cloud.google.com/storage/browser/_details/computelogmanager-tests/" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the url format can't be directly downloaded; so we need to do some gross munging to pull out the downloadable log key. We actually hard code a similar string in the compute log manager itself, we should turn that into a constant and use it in both places ideally. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i think this is overlap with my work item: https://github.com/dagster-io/dagster/pull/26723/files#diff-305fa8635f1d64e4c949271ee4a40747faf8e778da55094423b783bb6e76c36dR266 but i think it's fine to leave as is in context of this test, the test will break if the prefix changes, and it can be addressed then. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yup sounds good to me |
||
) | ||
|
||
|
||
def get_content_from_url(url: str) -> str: | ||
uri = _parse_gcs_url_into_uri(url) | ||
client = get_bucket_client(get_credentials()) | ||
blob = client.blob(uri) | ||
return blob.download_as_string().decode() |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
from setuptools import find_packages, setup | ||
|
||
setup( | ||
name="gcp-test-proj", | ||
packages=find_packages(), | ||
install_requires=[ | ||
"dagster", | ||
"dagster-webserver", | ||
"dagster-gcp", | ||
], | ||
extras_require={"test": ["pytest"]}, | ||
) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
[tox] | ||
skipsdist = true | ||
|
||
[testenv] | ||
download = True | ||
passenv = | ||
CI_* | ||
COVERALLS_REPO_TOKEN | ||
BUILDKITE* | ||
TEST_GCP* | ||
GCP_LIVE_TEST_CREDENTIALS | ||
install_command = uv pip install {opts} {packages} | ||
deps = | ||
-e ../../../python_modules/dagster[test] | ||
-e ../../../python_modules/dagster-webserver | ||
-e ../../../python_modules/dagster-test | ||
-e ../../../python_modules/dagster-pipes | ||
-e ../../../python_modules/dagster-graphql | ||
-e ../../../python_modules/libraries/dagster-gcp | ||
-e . | ||
allowlist_externals = | ||
/bin/bash | ||
uv | ||
commands = | ||
# We need to rebuild the UI to ensure that the dagster-webserver can run | ||
make -C ../../.. rebuild_ui | ||
!windows: /bin/bash -c '! pip list --exclude-editable | grep -e dagster' | ||
pytest ./integration_tests --snapshot-warn-unused -vv -s {posargs} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i am not sure how this is being used / tested.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yea I ought to add a line for it but it shows up in stderr.