From 533d08c7a695ef972afa0f6a887b3b452bf5d858 Mon Sep 17 00:00:00 2001 From: Martin Paces Date: Wed, 19 Feb 2025 19:12:41 +0100 Subject: [PATCH 1/6] Sending WPS POST requests with the proper content type header. --- src/viresclient/_wps/wps.py | 21 ++++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/src/viresclient/_wps/wps.py b/src/viresclient/_wps/wps.py index fd40ca1..cc07532 100644 --- a/src/viresclient/_wps/wps.py +++ b/src/viresclient/_wps/wps.py @@ -103,6 +103,7 @@ class WPS10Service: headers - optional dictionary of the HTTP headers sent with each request """ + DEFAULT_CONTENT_TYPE = "application/xml; charset=utf-8" STATUS = { "{http://www.opengis.net/wps/1.0.0}ProcessAccepted": "ACCEPTED", @@ -120,12 +121,17 @@ def __init__(self, url, headers=None, logger=None): self.headers = headers or {} self.logger = self._LoggerAdapter(logger or getLogger(__name__), {}) - def retrieve(self, request, handler=None): + def retrieve(self, request, handler=None, content_type=None): """Send a synchronous POST WPS request to a server and retrieve the output. """ + headers = { + **self.headers, + "Content-Type": content_type or self.DEFAULT_CONTENT_TYPE, + } + return self._retrieve( - Request(self.url, request, self.headers), handler, self.error_handler + Request(self.url, request, headers), handler, self.error_handler ) def retrieve_async( @@ -136,13 +142,14 @@ def retrieve_async( cleanup_handler=None, polling_interval=1, output_name="output", + content_type=None, ): """Send an asynchronous POST WPS request to a server and retrieve the output. 
""" timer = Timer() status, percentCompleted, status_url, execute_response = self.submit_async( - request + request, content_type=content_type, ) wpsstatus = WPSStatus() wpsstatus.update(status, percentCompleted, status_url, execute_response) @@ -212,13 +219,17 @@ def parse_output_reference(xml, identifier): ) return elm_reference.attrib["href"] - def submit_async(self, request): + def submit_async(self, request, content_type=None): """Send a POST WPS asynchronous request to a server and retrieve the status URL. """ self.logger.debug("Submitting asynchronous job.") + headers = { + **self.headers, + "Content-Type": content_type or self.DEFAULT_CONTENT_TYPE, + } return self._retrieve( - Request(self.url, request, self.headers), + Request(self.url, request, headers), self.parse_status, self.error_handler, ) From c3985114c454086efccf9491f694c61e58ee0ec8 Mon Sep 17 00:00:00 2001 From: Martin Paces Date: Wed, 19 Feb 2025 19:22:47 +0100 Subject: [PATCH 2/6] Handle gracefully relative response URLs. --- src/viresclient/_wps/wps.py | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/src/viresclient/_wps/wps.py b/src/viresclient/_wps/wps.py index cc07532..e6dd34c 100644 --- a/src/viresclient/_wps/wps.py +++ b/src/viresclient/_wps/wps.py @@ -27,12 +27,9 @@ # THE SOFTWARE. 
# ------------------------------------------------------------------------------- -try: - from urllib.error import HTTPError - from urllib.request import Request, urlopen -except ImportError: - # Python 2 backward compatibility - from urllib2 import urlopen, Request, HTTPError +from urllib.error import HTTPError +from urllib.request import Request, urlopen +from urllib.parse import urljoin from contextlib import closing from logging import LoggerAdapter, getLogger @@ -152,7 +149,10 @@ def retrieve_async( request, content_type=content_type, ) wpsstatus = WPSStatus() - wpsstatus.update(status, percentCompleted, status_url, execute_response) + wpsstatus.update( + status, percentCompleted, urljoin(self.url, status_url), + execute_response + ) def log_wpsstatus(wpsstatus): self.logger.info( @@ -176,7 +176,9 @@ def log_wpsstatus_percentCompleted(wpsstatus): last_status = wpsstatus.status last_percentCompleted = wpsstatus.percentCompleted - wpsstatus.update(*self.poll_status(wpsstatus.url)) + wpsstatus.update( + *self.poll_status(urljoin(self.url, wpsstatus.url)) + ) if wpsstatus.status != last_status: log_wpsstatus(wpsstatus) @@ -204,7 +206,9 @@ def retrieve_async_output(self, status_url, output_name, handler=None): """Retrieve asynchronous job output reference.""" self.logger.debug("Retrieving asynchronous job output '%s'.", output_name) output_url = self.parse_output_reference(status_url, output_name) - return self._retrieve(Request(output_url, None, self.headers), handler) + return self._retrieve( + Request(urljoin(self.url, output_url), None, self.headers), handler + ) @staticmethod def parse_output_reference(xml, identifier): From 1533d5ac296694ce237d424cfd22a5d25ccb4f47 Mon Sep 17 00:00:00 2001 From: Martin Paces Date: Wed, 19 Feb 2025 19:34:58 +0100 Subject: [PATCH 3/6] Handle gracefully xlink:href references. 
--- src/viresclient/_wps/wps.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/viresclient/_wps/wps.py b/src/viresclient/_wps/wps.py index e6dd34c..cc3c46e 100644 --- a/src/viresclient/_wps/wps.py +++ b/src/viresclient/_wps/wps.py @@ -221,7 +221,10 @@ def parse_output_reference(xml, identifier): elm_reference = elm.find( "./{http://www.opengis.net/wps/1.0.0}Reference" ) - return elm_reference.attrib["href"] + return ( + elm_reference.attrib.get("{http://www.w3.org/1999/xlink}href") or + elm_reference.attrib["href"] + ) def submit_async(self, request, content_type=None): """Send a POST WPS asynchronous request to a server and retrieve From bf41e4b901c3c6d641f76983f3ce19fe06f82672 Mon Sep 17 00:00:00 2001 From: Martin Paces Date: Wed, 19 Feb 2025 21:49:55 +0100 Subject: [PATCH 4/6] Implementing re-trials for failed WPS asynchronous job status requests. - 3 re-tries after 20 seconds - changing the default logging level to ERROR --- src/viresclient/_client.py | 4 +++- src/viresclient/_client_aeolus.py | 4 ++-- src/viresclient/_client_swarm.py | 4 ++-- src/viresclient/_wps/wps.py | 33 ++++++++++++++++++++++++++----- 4 files changed, 35 insertions(+), 10 deletions(-) diff --git a/src/viresclient/_client.py b/src/viresclient/_client.py index c64e83f..4f5946d 100644 --- a/src/viresclient/_client.py +++ b/src/viresclient/_client.py @@ -70,6 +70,8 @@ "NO_LOGGING": CRITICAL + 1, } +DEFAULT_LOGGING_LEVEL = "ERROR" + # File type to WPS output name RESPONSE_TYPES = { "csv": "text/csv", @@ -258,7 +260,7 @@ def __init__( url=None, token=None, config=None, - logging_level="NO_LOGGING", + logging_level=DEFAULT_LOGGING_LEVEL, server_type=None, ): self._server_type = server_type diff --git a/src/viresclient/_client_aeolus.py b/src/viresclient/_client_aeolus.py index edddfcd..c20568d 100644 --- a/src/viresclient/_client_aeolus.py +++ b/src/viresclient/_client_aeolus.py @@ -3,7 +3,7 @@ import pandas as pd -from ._client import ClientRequest, WPSInputs +from 
._client import ClientRequest, WPSInputs, DEFAULT_LOGGING_LEVEL from ._data import CONFIG_AEOLUS from ._data_handling import ReturnedDataFile @@ -222,7 +222,7 @@ class AeolusRequest(ClientRequest): """ - def __init__(self, url=None, token=None, config=None, logging_level="NO_LOGGING"): + def __init__(self, url=None, token=None, config=None, logging_level=DEFAULT_LOGGING_LEVEL): super().__init__(url, token, config, logging_level, server_type="Aeolus") # self._available = self._set_available_data() self._request_inputs = AeolusWPSInputs() diff --git a/src/viresclient/_client_swarm.py b/src/viresclient/_client_swarm.py index 8fd16ba..e5895d8 100644 --- a/src/viresclient/_client_swarm.py +++ b/src/viresclient/_client_swarm.py @@ -12,7 +12,7 @@ from pandas import read_csv from tqdm import tqdm -from ._client import TEMPLATE_FILES, ClientRequest, WPSInputs +from ._client import TEMPLATE_FILES, ClientRequest, WPSInputs, DEFAULT_LOGGING_LEVEL from ._data import CONFIG_SWARM from ._data_handling import ReturnedDataFile from ._wps.environment import JINJA2_ENVIRONMENT @@ -1344,7 +1344,7 @@ class SwarmRequest(ClientRequest): "SwarmCI", ] - def __init__(self, url=None, token=None, config=None, logging_level="NO_LOGGING"): + def __init__(self, url=None, token=None, config=None, logging_level=DEFAULT_LOGGING_LEVEL): super().__init__(url, token, config, logging_level, server_type="Swarm") self._available = self._get_available_data() diff --git a/src/viresclient/_wps/wps.py b/src/viresclient/_wps/wps.py index cc3c46e..26fe7f1 100644 --- a/src/viresclient/_wps/wps.py +++ b/src/viresclient/_wps/wps.py @@ -30,12 +30,10 @@ from urllib.error import HTTPError from urllib.request import Request, urlopen from urllib.parse import urljoin - from contextlib import closing from logging import LoggerAdapter, getLogger from time import sleep from xml.etree import ElementTree - from .time_util import Timer NS_OWS11 = "http://www.opengis.net/ows/1.1" @@ -101,6 +99,8 @@ class WPS10Service: request 
""" DEFAULT_CONTENT_TYPE = "application/xml; charset=utf-8" + RETRY_TIME = 20 # seconds + STATUS_POLL_RETRIES = 3 # re-try attempts STATUS = { "{http://www.opengis.net/wps/1.0.0}ProcessAccepted": "ACCEPTED", @@ -244,9 +244,32 @@ def submit_async(self, request, content_type=None): def poll_status(self, status_url): """Poll status of an asynchronous WPS job.""" self.logger.debug("Polling asynchronous job status.") - return self._retrieve( - Request(status_url, None, self.headers), self.parse_status - ) + + for index in range(self.STATUS_POLL_RETRIES + 1): + + if index == 0: + self.logger.debug("Polling asynchronous job status.") + else: + self.logger.debug("Polling asynchronous job status. Retry attempt #%s.", index) + + try: + return self._retrieve( + Request(status_url, None, self.headers), self.parse_status + ) + except Exception as error: + if index < self.STATUS_POLL_RETRIES: + self.logger.error( + "Status poll failed. Retrying in %s seconds. %s: %s", + self.RETRY_TIME, error.__class__.__name__, error + ) + else: + self.logger.error( + "Status poll failed. No more retries. 
%s: %s", + error.__class__.__name__, error + ) + raise + + sleep(self.RETRY_TIME) @classmethod def parse_status(cls, response): From 71b3973bf2490d695f21e5771707b86070d3f24d Mon Sep 17 00:00:00 2001 From: Ashley Smith Date: Thu, 20 Feb 2025 11:38:29 +0000 Subject: [PATCH 5/6] Style autofixes by pre-commit --- src/viresclient/_client_aeolus.py | 6 +++-- src/viresclient/_client_swarm.py | 6 +++-- src/viresclient/_wps/wps.py | 37 ++++++++++++++++++------------- 3 files changed, 29 insertions(+), 20 deletions(-) diff --git a/src/viresclient/_client_aeolus.py b/src/viresclient/_client_aeolus.py index c20568d..857a1e4 100644 --- a/src/viresclient/_client_aeolus.py +++ b/src/viresclient/_client_aeolus.py @@ -3,7 +3,7 @@ import pandas as pd -from ._client import ClientRequest, WPSInputs, DEFAULT_LOGGING_LEVEL +from ._client import DEFAULT_LOGGING_LEVEL, ClientRequest, WPSInputs from ._data import CONFIG_AEOLUS from ._data_handling import ReturnedDataFile @@ -222,7 +222,9 @@ class AeolusRequest(ClientRequest): """ - def __init__(self, url=None, token=None, config=None, logging_level=DEFAULT_LOGGING_LEVEL): + def __init__( + self, url=None, token=None, config=None, logging_level=DEFAULT_LOGGING_LEVEL + ): super().__init__(url, token, config, logging_level, server_type="Aeolus") # self._available = self._set_available_data() self._request_inputs = AeolusWPSInputs() diff --git a/src/viresclient/_client_swarm.py b/src/viresclient/_client_swarm.py index e5895d8..480e4d6 100644 --- a/src/viresclient/_client_swarm.py +++ b/src/viresclient/_client_swarm.py @@ -12,7 +12,7 @@ from pandas import read_csv from tqdm import tqdm -from ._client import TEMPLATE_FILES, ClientRequest, WPSInputs, DEFAULT_LOGGING_LEVEL +from ._client import DEFAULT_LOGGING_LEVEL, TEMPLATE_FILES, ClientRequest, WPSInputs from ._data import CONFIG_SWARM from ._data_handling import ReturnedDataFile from ._wps.environment import JINJA2_ENVIRONMENT @@ -1344,7 +1344,9 @@ class SwarmRequest(ClientRequest): "SwarmCI", 
] - def __init__(self, url=None, token=None, config=None, logging_level=DEFAULT_LOGGING_LEVEL): + def __init__( + self, url=None, token=None, config=None, logging_level=DEFAULT_LOGGING_LEVEL + ): super().__init__(url, token, config, logging_level, server_type="Swarm") self._available = self._get_available_data() diff --git a/src/viresclient/_wps/wps.py b/src/viresclient/_wps/wps.py index 26fe7f1..6cf7464 100644 --- a/src/viresclient/_wps/wps.py +++ b/src/viresclient/_wps/wps.py @@ -27,13 +27,14 @@ # THE SOFTWARE. # ------------------------------------------------------------------------------- -from urllib.error import HTTPError -from urllib.request import Request, urlopen -from urllib.parse import urljoin from contextlib import closing from logging import LoggerAdapter, getLogger from time import sleep +from urllib.error import HTTPError +from urllib.parse import urljoin +from urllib.request import Request, urlopen from xml.etree import ElementTree + from .time_util import Timer NS_OWS11 = "http://www.opengis.net/ows/1.1" @@ -98,9 +99,10 @@ class WPS10Service: headers - optional dictionary of the HTTP headers sent with each request """ + DEFAULT_CONTENT_TYPE = "application/xml; charset=utf-8" - RETRY_TIME = 20 # seconds - STATUS_POLL_RETRIES = 3 # re-try attempts + RETRY_TIME = 20 # seconds + STATUS_POLL_RETRIES = 3 # re-try attempts STATUS = { "{http://www.opengis.net/wps/1.0.0}ProcessAccepted": "ACCEPTED", @@ -146,12 +148,12 @@ def retrieve_async( """ timer = Timer() status, percentCompleted, status_url, execute_response = self.submit_async( - request, content_type=content_type, + request, + content_type=content_type, ) wpsstatus = WPSStatus() wpsstatus.update( - status, percentCompleted, urljoin(self.url, status_url), - execute_response + status, percentCompleted, urljoin(self.url, status_url), execute_response ) def log_wpsstatus(wpsstatus): @@ -176,9 +178,7 @@ def log_wpsstatus_percentCompleted(wpsstatus): last_status = wpsstatus.status last_percentCompleted 
= wpsstatus.percentCompleted - wpsstatus.update( - *self.poll_status(urljoin(self.url, wpsstatus.url)) - ) + wpsstatus.update(*self.poll_status(urljoin(self.url, wpsstatus.url))) if wpsstatus.status != last_status: log_wpsstatus(wpsstatus) @@ -222,8 +222,8 @@ def parse_output_reference(xml, identifier): "./{http://www.opengis.net/wps/1.0.0}Reference" ) return ( - elm_reference.attrib.get("{http://www.w3.org/1999/xlink}href") or - elm_reference.attrib["href"] + elm_reference.attrib.get("{http://www.w3.org/1999/xlink}href") + or elm_reference.attrib["href"] ) def submit_async(self, request, content_type=None): @@ -250,7 +250,9 @@ def poll_status(self, status_url): if index == 0: self.logger.debug("Polling asynchronous job status.") else: - self.logger.debug("Polling asynchronous job status. Retry attempt #%s.", index) + self.logger.debug( + "Polling asynchronous job status. Retry attempt #%s.", index + ) try: return self._retrieve( @@ -260,12 +262,15 @@ def poll_status(self, status_url): if index < self.STATUS_POLL_RETRIES: self.logger.error( "Status poll failed. Retrying in %s seconds. %s: %s", - self.RETRY_TIME, error.__class__.__name__, error + self.RETRY_TIME, + error.__class__.__name__, + error, ) else: self.logger.error( "Status poll failed. No more retries. 
%s: %s", - error.__class__.__name__, error + error.__class__.__name__, + error, ) raise From b3073109013bea6f4939e9f2fa812066ba5e9490 Mon Sep 17 00:00:00 2001 From: Ashley Smith Date: Thu, 20 Feb 2025 12:00:35 +0000 Subject: [PATCH 6/6] Version bump and release notes for v0.12.2 --- docs/release_notes.rst | 8 ++++++++ src/viresclient/__init__.py | 2 +- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/docs/release_notes.rst b/docs/release_notes.rst index 06a526e..5c2b460 100644 --- a/docs/release_notes.rst +++ b/docs/release_notes.rst @@ -4,6 +4,14 @@ Release notes Change log ---------- +Changes from 0.12.1 to 0.12.2 +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +- **Internal WPS fixes which may be required to access the server in the future** +- Improved robustness during asynchronous requests (the client now retries a failed job status poll up to 3 times, at 20-second intervals) + +See `PR#121 <https://github.com/ESA-VirES/VirES-Python-Client/pull/121>`_ for details + Changes from 0.12.0 to 0.12.1 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ diff --git a/src/viresclient/__init__.py b/src/viresclient/__init__.py index 44d9efc..9db756d 100644 --- a/src/viresclient/__init__.py +++ b/src/viresclient/__init__.py @@ -35,4 +35,4 @@ from ._config import ClientConfig, set_token from ._data_handling import ReturnedData, ReturnedDataFile -__version__ = "0.12.1" +__version__ = "0.12.2"