Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve server-client communication error handling #6578

Open
wants to merge 4 commits into
base: 8.4.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes.d/6578.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Improved handling of any internal errors when executing commands against a running workflow.
70 changes: 56 additions & 14 deletions cylc/flow/network/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,12 @@
import asyncio
import getpass
import json
from typing import Optional, Tuple
from typing import (
TYPE_CHECKING,
Optional,
Tuple,
Union,
)

import zmq
import zmq.asyncio
Expand All @@ -30,34 +35,71 @@
CylcError,
CylcVersionError,
ServiceFileError,
WorkflowStopped
WorkflowStopped,
)
from cylc.flow.hostuserutil import get_fqdn_by_host
from cylc.flow.workflow_files import (
ContactFileFields,
KeyType,
KeyOwner,
KeyInfo,
KeyOwner,
KeyType,
get_workflow_srv_dir,
load_contact_file,
get_workflow_srv_dir
)


if TYPE_CHECKING:
# BACK COMPAT: typing_extensions.TypedDict
# FROM: Python 3.7
# TO: Python 3.11
from typing_extensions import TypedDict


API = 5 # cylc API version
MSG_TIMEOUT = "TIMEOUT"

if TYPE_CHECKING:
class ResponseDict(TypedDict, total=False):
"""Structure of server response messages.

def encode_(message):
"""Convert the structure holding a message field from JSON to a string."""
try:
return json.dumps(message)
except TypeError as exc:
return json.dumps({'errors': [{'message': str(exc)}]})
Confusingly, has similar format to GraphQL execution result.
But if we change this now we could break compatibility for
issuing commands to/receiving responses from workflows running in
different versions of Cylc 8.
"""
data: object
"""For most Cylc commands that issue GQL mutations, the data field will
look like:
data: {
<mutationName1>: {
result: [
{
id: <workflow/task ID>,
response: [<success_bool>, <message>]
},
...
]
}
}
but this is not 100% consistent unfortunately
"""
error: Union[Exception, str, dict]
"""If an error occurred that could not be handled.
(usually a dict {message: str, traceback?: str}).
"""
user: str
cylc_version: str
"""Server (i.e. running workflow) Cylc version.

Going forward, we include this so we can more easily handle any future
back-compat issues."""


def decode_(message):
"""Convert an encoded message string to JSON with an added 'user' field."""
def load_server_response(message: str) -> 'ResponseDict':
"""Convert a JSON message string to dict with an added 'user' field."""
msg = json.loads(message)
msg['user'] = getpass.getuser() # assume this is the user
if 'user' not in msg:
msg['user'] = getpass.getuser() # assume this is the user
return msg


Expand Down
70 changes: 43 additions & 27 deletions cylc/flow/network/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,31 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Client for workflow runtime API."""

from abc import ABCMeta, abstractmethod
from abc import (
ABCMeta,
abstractmethod,
)
import asyncio
import json
import os
from shutil import which
import socket
import sys
from typing import Any, Optional, Union, Dict
from typing import (
TYPE_CHECKING,
Any,
Dict,
Optional,
Union,
)

import zmq
import zmq.asyncio

from cylc.flow import LOG
from cylc.flow import (
LOG,
__version__ as CYLC_VERSION,
)
from cylc.flow.exceptions import (
ClientError,
ClientTimeout,
Expand All @@ -36,16 +49,17 @@
)
from cylc.flow.hostuserutil import get_fqdn_by_host
from cylc.flow.network import (
encode_,
decode_,
ZMQSocketBase,
get_location,
ZMQSocketBase
load_server_response,
)
from cylc.flow.network.client_factory import CommsMeth
from cylc.flow.network.server import PB_METHOD_MAP
from cylc.flow.workflow_files import (
detect_old_contact_file,
)
from cylc.flow.workflow_files import detect_old_contact_file


if TYPE_CHECKING:
from cylc.flow.network import ResponseDict


class WorkflowRuntimeClientBase(metaclass=ABCMeta):
Expand Down Expand Up @@ -270,7 +284,7 @@ async def async_request(
args: Optional[Dict[str, Any]] = None,
timeout: Optional[float] = None,
req_meta: Optional[Dict[str, Any]] = None
) -> object:
) -> Union[bytes, object]:
"""Send an asynchronous request using asyncio.

Has the same arguments and return values as ``serial_request``.
Expand All @@ -292,12 +306,12 @@ async def async_request(
if req_meta:
msg['meta'].update(req_meta)
LOG.debug('zmq:send %s', msg)
message = encode_(msg)
message = json.dumps(msg)
self.socket.send_string(message)

# receive response
if self.poller.poll(timeout):
res = await self.socket.recv()
res: bytes = await self.socket.recv()
else:
self.timeout_handler()
raise ClientTimeout(
Expand All @@ -307,26 +321,28 @@ async def async_request(
' --comms-timeout option;'
'\n* or check the workflow log.'
)
LOG.debug('zmq:recv %s', res)

if msg['command'] in PB_METHOD_MAP:
response = {'data': res}
else:
response = decode_(
res.decode() if isinstance(res, bytes) else res
)
LOG.debug('zmq:recv %s', response)
if command in PB_METHOD_MAP:
return res

response: ResponseDict = load_server_response(res.decode())

try:
return response['data']
except KeyError:
error = response.get(
'error',
{'message': f'Received invalid response: {response}'},
)
raise ClientError(
error.get('message'), # type: ignore
error.get('traceback'), # type: ignore
) from None
error = response.get('error')
if not error:
error = (
f"Received invalid response for Cylc {CYLC_VERSION}: "
f"{response}"
)
wflow_cylc_ver = response.get('cylc_version')
if wflow_cylc_ver:
error += (
f"\n(Workflow is running in Cylc {wflow_cylc_ver})"
)
raise ClientError(str(error)) from None

def get_header(self) -> dict:
"""Return "header" data to attach to each request for traceability.
Expand Down
44 changes: 39 additions & 5 deletions cylc/flow/network/multi.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,36 @@

import asyncio
import sys
from typing import Callable, Dict, List, Tuple, Optional, Union, Type
from typing import (
Callable,
Dict,
List,
Optional,
Tuple,
Type,
Union,
)

from ansimarkup import ansiprint

from cylc.flow import __version__ as CYLC_VERSION
from cylc.flow.async_util import unordered_map
from cylc.flow.exceptions import CylcError, WorkflowStopped
from cylc.flow.exceptions import (
CylcError,
WorkflowStopped,
)
import cylc.flow.flags
from cylc.flow.id_cli import parse_ids_async
from cylc.flow.terminal import DIM


# Known error messages for incompatibilites between this version of Cylc (that
# is running the command) and the version of Cylc running the workflow:
KNOWN_INCOMPAT = {
'Unknown argument "onResume" on field "trigger" of type "Mutations".',
}


def call_multi(*args, **kwargs):
"""Call a function for each workflow in a list of IDs.

Expand Down Expand Up @@ -220,21 +239,36 @@


def _report(
response: dict,
response: Union[dict, list],
) -> Tuple[Optional[str], Optional[str], bool]:
"""Report the result of a GraphQL operation.

This analyses GraphQL mutation responses to determine the outcome.

Args:
response: The GraphQL response.
response: The workflow server response (NOT necessarily conforming to
GraphQL execution result spec).

Returns:
(stdout, stderr, outcome)

"""
try:
ret: List[Tuple[Optional[str], Optional[str], bool]] = []
if not isinstance(response, dict):
if isinstance(response, list) and response[0].get('error'):
# If operating on workflow running in older Cylc version,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know that you are trying to catch as much as possible for the future, but might we be able to identify particular problems and provide a more obvious error?

--- a/cylc/flow/network/multi.py
+++ b/cylc/flow/network/multi.py
@@ -38,6 +38,13 @@ from cylc.flow.id_cli import parse_ids_async
 from cylc.flow.terminal import DIM
 
 
+KNOWN_ERRS = {
+    'Unknown argument "onResume" on field "trigger" of type "Mutations".':
+        'Tasks in workflows running using versions of Cylc older than 8.4.0'
+        ' cannot be triggered using Cylc 8.4.0.'
+}
+
+
 def call_multi(*args, **kwargs):
     """Call a function for each workflow in a list of IDs.
 
@@ -251,6 +258,10 @@ def _report(
             if isinstance(response, list) and response[0].get('error'):
                 # If operating on workflow running in older Cylc version,
                 # may get a error response like [{'error': '...'}]
+                if response[0].get('error').get('message') in KNOWN_ERRS:
+                    raise CylcError(
+                        KNOWN_ERRS[response[0].get('error').get('message')]
+                    )
                 raise Exception(response)
             raise Exception(f"Unexpected response: {response}")
         for mutation_response in response.values():

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've gone for a slightly simpler version

# may get a error response like [{'error': '...'}]
if response[0]['error'].get('message') in KNOWN_INCOMPAT:
raise Exception(

Check warning on line 263 in cylc/flow/network/multi.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/network/multi.py#L263

Added line #L263 was not covered by tests
"This command is no longer compatible with the "
"version of Cylc running the workflow. Please stop & "
f"restart the workflow with Cylc {CYLC_VERSION} "
"or higher."
f"\n\n{response}"
)
raise Exception(response)
raise Exception(f"Unexpected response: {response}")

Check warning on line 271 in cylc/flow/network/multi.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/network/multi.py#L270-L271

Added lines #L270 - L271 were not covered by tests
for mutation_response in response.values():
# extract the result of each mutation result in the response
success, msg = mutation_response['result'][0]['response']
Expand Down Expand Up @@ -268,7 +302,7 @@
# response returned is not in the expected format - this shouldn't
# happen but we need to protect against it
err_msg = ''
if cylc.flow.flags.verbosity > 1: # debug mode
if cylc.flow.flags.verbosity > 0: # verbose mode
# print the full result to stderr
err_msg += f'\n <{DIM}>response={response}</{DIM}>'
return (
Expand Down
Loading
Loading