-
Notifications
You must be signed in to change notification settings - Fork 1.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Use a single code server #26818
Open
dpeng817
wants to merge
1
commit into
dpeng817/use_code_server_start
Choose a base branch
from
dpeng817/single_code_server
base: dpeng817/use_code_server_start
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+246
−136
Open
Use a single code server #26818
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,11 +2,14 @@ | |
import os | ||
import subprocess | ||
import sys | ||
import tempfile | ||
import time | ||
from contextlib import contextmanager | ||
from pathlib import Path | ||
from typing import Optional | ||
from typing import Iterator, Optional | ||
|
||
import click | ||
import yaml | ||
|
||
import dagster._check as check | ||
from dagster._annotations import deprecated | ||
|
@@ -21,6 +24,12 @@ | |
working_directory_option, | ||
workspace_option, | ||
) | ||
from dagster._core.remote_representation.origin import ( | ||
GrpcServerCodeLocationOrigin, | ||
ManagedGrpcPythonEnvCodeLocationOrigin, | ||
) | ||
from dagster._core.workspace.context import WorkspaceProcessContext | ||
from dagster._grpc.server import GrpcServerCommand | ||
from dagster._serdes import serialize_value | ||
from dagster._serdes.ipc import interrupt_ipc_subprocess, open_ipc_subprocess | ||
from dagster._utils.log import configure_loggers | ||
|
@@ -122,8 +131,7 @@ def dev_command( | |
configure_loggers(formatter=log_format, log_level=log_level.upper()) | ||
logger = logging.getLogger("dagster") | ||
|
||
# Sanity check workspace args | ||
get_workspace_load_target(kwargs) | ||
workspace_target = get_workspace_load_target(kwargs) | ||
|
||
dagster_home_path = os.getenv("DAGSTER_HOME") | ||
|
||
|
@@ -140,103 +148,117 @@ def dev_command( | |
) | ||
|
||
with get_possibly_temporary_instance_for_cli("dagster dev", logger=logger) as instance: | ||
logger.info("Launching Dagster services...") | ||
|
||
args = [ | ||
"--instance-ref", | ||
serialize_value(instance.get_ref()), | ||
"--code-server-log-level", | ||
code_server_log_level, | ||
] | ||
|
||
if kwargs.get("workspace"): | ||
for workspace in check.tuple_elem(kwargs, "workspace"): | ||
args.extend(["--workspace", workspace]) | ||
|
||
if kwargs.get("python_file"): | ||
for python_file in check.tuple_elem(kwargs, "python_file"): | ||
args.extend(["--python-file", python_file]) | ||
|
||
if kwargs.get("module_name"): | ||
for module_name in check.tuple_elem(kwargs, "module_name"): | ||
args.extend(["--module-name", module_name]) | ||
|
||
if kwargs.get("working_directory"): | ||
args.extend(["--working-directory", check.str_elem(kwargs, "working_directory")]) | ||
|
||
if kwargs.get("grpc_port"): | ||
args.extend(["--grpc-port", str(kwargs["grpc_port"])]) | ||
|
||
if kwargs.get("grpc_host"): | ||
args.extend(["--grpc-host", str(kwargs["grpc_host"])]) | ||
|
||
if kwargs.get("grpc_socket"): | ||
args.extend(["--grpc-socket", str(kwargs["grpc_socket"])]) | ||
|
||
if kwargs.get("use_ssl"): | ||
args.extend(["--use-ssl"]) | ||
|
||
webserver_process = open_ipc_subprocess( | ||
[sys.executable, "-m", "dagster_webserver"] | ||
+ (["--port", port] if port else []) | ||
+ (["--host", host] if host else []) | ||
+ (["--dagster-log-level", log_level]) | ||
+ (["--log-format", log_format]) | ||
+ (["--live-data-poll-rate", live_data_poll_rate] if live_data_poll_rate else []) | ||
+ args | ||
) | ||
daemon_process = open_ipc_subprocess( | ||
[ | ||
sys.executable, | ||
"-m", | ||
"dagster._daemon", | ||
"run", | ||
"--log-level", | ||
log_level, | ||
"--log-format", | ||
log_format, | ||
] | ||
+ args | ||
) | ||
try: | ||
while True: | ||
time.sleep(_CHECK_SUBPROCESS_INTERVAL) | ||
|
||
if webserver_process.poll() is not None: | ||
raise Exception( | ||
"dagster-webserver process shut down unexpectedly with return code" | ||
f" {webserver_process.returncode}" | ||
) | ||
with WorkspaceProcessContext( | ||
instance, | ||
workspace_target, | ||
code_server_log_level=code_server_log_level, | ||
server_command=GrpcServerCommand.CODE_SERVER_START, | ||
) as context: | ||
with _temp_grpc_socket_workspace_file(context) as workspace_file: | ||
logger.info("Launching Dagster services...") | ||
|
||
if daemon_process.poll() is not None: | ||
raise Exception( | ||
"dagster-daemon process shut down unexpectedly with return code" | ||
f" {daemon_process.returncode}" | ||
) | ||
args = [ | ||
"--instance-ref", | ||
serialize_value(instance.get_ref()), | ||
"--workspace", | ||
workspace_file, | ||
"--code-server-log-level", | ||
code_server_log_level, | ||
] | ||
|
||
except KeyboardInterrupt: | ||
logger.info("KeyboardInterrupt received") | ||
except: | ||
logger.exception("An unexpected exception has occurred") | ||
finally: | ||
logger.info("Shutting down Dagster services...") | ||
interrupt_ipc_subprocess(daemon_process) | ||
interrupt_ipc_subprocess(webserver_process) | ||
|
||
try: | ||
webserver_process.wait(timeout=_SUBPROCESS_WAIT_TIMEOUT) | ||
except subprocess.TimeoutExpired: | ||
logger.warning( | ||
"dagster-webserver process did not terminate cleanly, killing the process" | ||
) | ||
webserver_process.kill() | ||
if kwargs.get("use_ssl"): | ||
args.extend(["--use-ssl"]) | ||
|
||
try: | ||
daemon_process.wait(timeout=_SUBPROCESS_WAIT_TIMEOUT) | ||
except subprocess.TimeoutExpired: | ||
logger.warning( | ||
"dagster-daemon process did not terminate cleanly, killing the process" | ||
webserver_process = open_ipc_subprocess( | ||
[sys.executable, "-m", "dagster_webserver"] | ||
+ (["--port", port] if port else []) | ||
+ (["--host", host] if host else []) | ||
+ (["--dagster-log-level", log_level]) | ||
+ (["--log-format", log_format]) | ||
+ ( | ||
["--live-data-poll-rate", live_data_poll_rate] | ||
if live_data_poll_rate | ||
else [] | ||
) | ||
+ args | ||
) | ||
daemon_process = open_ipc_subprocess( | ||
[ | ||
sys.executable, | ||
"-m", | ||
"dagster._daemon", | ||
"run", | ||
"--log-level", | ||
log_level, | ||
"--log-format", | ||
log_format, | ||
] | ||
+ args | ||
) | ||
daemon_process.kill() | ||
try: | ||
while True: | ||
time.sleep(_CHECK_SUBPROCESS_INTERVAL) | ||
|
||
if webserver_process.poll() is not None: | ||
raise Exception( | ||
"dagster-webserver process shut down unexpectedly with return code" | ||
f" {webserver_process.returncode}" | ||
) | ||
|
||
if daemon_process.poll() is not None: | ||
raise Exception( | ||
"dagster-daemon process shut down unexpectedly with return code" | ||
f" {daemon_process.returncode}" | ||
) | ||
|
||
except KeyboardInterrupt: | ||
logger.info("KeyboardInterrupt received") | ||
except: | ||
logger.exception("An unexpected exception has occurred") | ||
finally: | ||
logger.info("Shutting down Dagster services...") | ||
interrupt_ipc_subprocess(daemon_process) | ||
interrupt_ipc_subprocess(webserver_process) | ||
|
||
try: | ||
webserver_process.wait(timeout=_SUBPROCESS_WAIT_TIMEOUT) | ||
except subprocess.TimeoutExpired: | ||
logger.warning( | ||
"dagster-webserver process did not terminate cleanly, killing the process" | ||
) | ||
webserver_process.kill() | ||
|
||
try: | ||
daemon_process.wait(timeout=_SUBPROCESS_WAIT_TIMEOUT) | ||
except subprocess.TimeoutExpired: | ||
logger.warning( | ||
"dagster-daemon process did not terminate cleanly, killing the process" | ||
) | ||
daemon_process.kill() | ||
|
||
logger.info("Dagster services shut down.") | ||
|
||
|
||
logger.info("Dagster services shut down.") | ||
@contextmanager | ||
def _temp_grpc_socket_workspace_file(context: WorkspaceProcessContext) -> Iterator[str]: | ||
location_specs = [] | ||
with tempfile.NamedTemporaryFile(mode="w+") as temp_file: | ||
for origin in context._origins: # noqa: SLF001 | ||
if isinstance(origin, ManagedGrpcPythonEnvCodeLocationOrigin): | ||
grpc_endpoint = context._grpc_server_registry.get_grpc_endpoint(origin) # noqa: SLF001 | ||
server_spec = { | ||
"location_name": origin.location_name, | ||
"socket": grpc_endpoint.socket, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. local grpc servers can also use ports (notably on windows) |
||
} | ||
elif isinstance(origin, GrpcServerCodeLocationOrigin): | ||
server_spec = { | ||
"location_name": origin.location_name, | ||
"host": origin.host, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we can include host either way (it will just be localhost in the managed case) |
||
"port": origin.port, | ||
} | ||
else: | ||
check.failed(f"Unexpected origin type {origin}") | ||
location_specs.append({"grpc_server": server_spec}) | ||
temp_file.write(yaml.dump({"load_from": location_specs})) | ||
temp_file.flush() | ||
yield temp_file.name |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we add this as a real code_location_origins property on WorkspaceProcessContext?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe a method on WorkspaceProcessContext that returns a list of server specs and handles the origin switching there?