-
Notifications
You must be signed in to change notification settings - Fork 1.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[single-grpc-server][rfc] introduce a heartbeat to proxy server #26777
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,7 @@ | ||
import logging | ||
import sys | ||
import threading | ||
import time | ||
from contextlib import ExitStack | ||
from typing import TYPE_CHECKING, Dict, Optional | ||
|
||
|
@@ -48,9 +49,10 @@ def __init__( | |
server_termination_event: threading.Event, | ||
instance_ref: Optional[InstanceRef], | ||
logger: logging.Logger, | ||
server_heartbeat: bool, | ||
server_heartbeat_timeout: int, | ||
): | ||
super(DagsterProxyApiServicer, self).__init__() | ||
|
||
self._loadable_target_origin = loadable_target_origin | ||
self._fixed_server_id = fixed_server_id | ||
self._container_image = container_image | ||
|
@@ -63,8 +65,8 @@ def __init__( | |
|
||
self._client = None | ||
self._load_error = None | ||
self._heartbeat_shutdown_event = None | ||
self._heartbeat_thread = None | ||
self._client_heartbeat_shutdown_event = None | ||
self._client_heartbeat_thread = None | ||
|
||
self._exit_stack = ExitStack() | ||
|
||
|
@@ -100,6 +102,17 @@ def __init__( | |
daemon=True, | ||
) | ||
|
||
self.__last_heartbeat_time = time.time() | ||
self.__server_heartbeat_thread = None | ||
if server_heartbeat: | ||
self.__server_heartbeat_thread = threading.Thread( | ||
target=self._server_heartbeat_thread, | ||
args=(server_heartbeat_timeout,), | ||
name="grpc-server-heartbeat", | ||
daemon=True, | ||
) | ||
self.__server_heartbeat_thread.start() | ||
|
||
self.__cleanup_thread.start() | ||
|
||
# Map runs to the client that launched them, so that we can route | ||
|
@@ -121,22 +134,22 @@ def _reload_location(self): | |
self._logger.exception("Failure while loading code") | ||
|
||
if self._client: | ||
self._heartbeat_shutdown_event = threading.Event() | ||
self._heartbeat_thread = threading.Thread( | ||
self._client_heartbeat_shutdown_event = threading.Event() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. rename these properties because now it's confusing that there's a client and server heartbeat thread |
||
self._client_heartbeat_thread = threading.Thread( | ||
target=client_heartbeat_thread, | ||
args=( | ||
self._client, | ||
self._heartbeat_shutdown_event, | ||
self._client_heartbeat_shutdown_event, | ||
), | ||
name="grpc-client-heartbeat", | ||
daemon=True, | ||
) | ||
self._heartbeat_thread.start() | ||
self._client_heartbeat_thread.start() | ||
|
||
def ReloadCode(self, request, context): | ||
with self._reload_lock: # can only call this method once at a time | ||
old_heartbeat_shutdown_event = self._heartbeat_shutdown_event | ||
old_heartbeat_thread = self._heartbeat_thread | ||
old_heartbeat_shutdown_event = self._client_heartbeat_shutdown_event | ||
old_heartbeat_thread = self._client_heartbeat_thread | ||
old_client = self._client | ||
|
||
self._reload_location() # Creates and starts a new heartbeat thread | ||
|
@@ -156,13 +169,13 @@ def cleanup(self): | |
# In case ShutdownServer was not called | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. clean up the server one here too There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @dpeng817 did you see this one? you want something like this with the server heartbeat thread I think: https://github.com/dagster-io/dagster/blob/master/python_modules/dagster/dagster/_grpc/server.py#L462-L463 |
||
self._shutdown_once_executions_finish_event.set() | ||
|
||
if self._heartbeat_shutdown_event: | ||
self._heartbeat_shutdown_event.set() | ||
self._heartbeat_shutdown_event = None | ||
if self._client_heartbeat_shutdown_event: | ||
self._client_heartbeat_shutdown_event.set() | ||
self._client_heartbeat_shutdown_event = None | ||
|
||
if self._heartbeat_thread: | ||
self._heartbeat_thread.join() | ||
self._heartbeat_thread = None | ||
if self._client_heartbeat_thread: | ||
self._client_heartbeat_thread.join() | ||
self._client_heartbeat_thread = None | ||
|
||
self._exit_stack.close() | ||
|
||
|
@@ -186,6 +199,18 @@ def _query(self, api_name: str, request, _context, timeout: int = DEFAULT_GRPC_T | |
raise Exception("No available client to code serer") | ||
return check.not_none(self._client)._get_response(api_name, request, timeout) # noqa | ||
|
||
def _server_heartbeat_thread(self, heartbeat_timeout: int) -> None: | ||
while True: | ||
if self._server_termination_event.is_set(): | ||
break | ||
|
||
self._shutdown_once_executions_finish_event.wait(heartbeat_timeout) | ||
if self._shutdown_once_executions_finish_event.is_set(): | ||
break | ||
|
||
if self.__last_heartbeat_time < time.time() - heartbeat_timeout: | ||
self._shutdown_once_executions_finish_event.set() | ||
|
||
def _streaming_query( | ||
self, api_name: str, request, _context, timeout: int = DEFAULT_GRPC_TIMEOUT | ||
): | ||
|
@@ -216,6 +241,7 @@ def StreamingExternalRepository(self, request, context): | |
return self._streaming_query("StreamingExternalRepository", request, context) | ||
|
||
def Heartbeat(self, request, context): | ||
self.__last_heartbeat_time = time.time() | ||
return self._query("Heartbeat", request, context) | ||
|
||
def StreamingPing(self, request, context): | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,6 +3,7 @@ | |
import re | ||
import subprocess | ||
import sys | ||
import time | ||
|
||
import pytest | ||
from dagster import _seven | ||
|
@@ -548,7 +549,46 @@ def test_load_timeout(): | |
assert "StatusCode.UNAVAILABLE" in str(timeout_exception) | ||
|
||
|
||
def test_load_timeout_code_server_cli(): | ||
def test_server_heartbeat_timeout_code_server_cli() -> None: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. looks like this new test is failing in CI? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. hmm weird. Passed for me locally. |
||
"""Test that without a heartbeat from the calling process, the server will eventually time out.""" | ||
port = find_free_port() | ||
python_file = file_relative_path(__file__, "grpc_repo.py") | ||
|
||
subprocess_args = [ | ||
"dagster", | ||
"code-server", | ||
"start", | ||
"--port", | ||
str(port), | ||
"--python-file", | ||
python_file, | ||
"--heartbeat", | ||
"--heartbeat-timeout", | ||
"1", | ||
] | ||
|
||
process = subprocess.Popen(subprocess_args) | ||
|
||
try: | ||
client = DagsterGrpcClient(port=port, host="localhost") | ||
wait_for_grpc_server( | ||
process, | ||
DagsterGrpcClient(port=port, host="localhost"), | ||
subprocess_args, | ||
) | ||
# Send out an initial heartbeat, ensure server is alive to begin with. | ||
client.ping("foobar") | ||
client.shutdown_server() | ||
assert process.poll() is None | ||
time.sleep(2) | ||
assert process.poll() == 0 | ||
|
||
finally: | ||
process.terminate() | ||
process.wait() | ||
|
||
|
||
def test_load_timeout_code_server_cli() -> None: | ||
port = find_free_port() | ||
python_file = file_relative_path(__file__, "grpc_repo_that_times_out.py") | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe worth clarifying here that this only comes into play if --heartbeat is set