diff --git a/docs/release_notes.rst b/docs/release_notes.rst index 06a526e..5c2b460 100644 --- a/docs/release_notes.rst +++ b/docs/release_notes.rst @@ -4,6 +4,14 @@ Release notes Change log ---------- +Changes from 0.12.1 to 0.12.2 +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +- **Internal WPS fixes which may be required to access the server in the future** +- Improved robustness during asynchronous requests (the client now repeats the failed job status polling 3 times with 20 seconds interval) + +See `PR#121 `_ for details + Changes from 0.12.0 to 0.12.1 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ diff --git a/src/viresclient/__init__.py b/src/viresclient/__init__.py index 44d9efc..9db756d 100644 --- a/src/viresclient/__init__.py +++ b/src/viresclient/__init__.py @@ -35,4 +35,4 @@ from ._config import ClientConfig, set_token from ._data_handling import ReturnedData, ReturnedDataFile -__version__ = "0.12.1" +__version__ = "0.12.2" diff --git a/src/viresclient/_client.py b/src/viresclient/_client.py index c64e83f..4f5946d 100644 --- a/src/viresclient/_client.py +++ b/src/viresclient/_client.py @@ -70,6 +70,8 @@ "NO_LOGGING": CRITICAL + 1, } +DEFAULT_LOGGING_LEVEL = "ERROR" + # File type to WPS output name RESPONSE_TYPES = { "csv": "text/csv", @@ -258,7 +260,7 @@ def __init__( url=None, token=None, config=None, - logging_level="NO_LOGGING", + logging_level=DEFAULT_LOGGING_LEVEL, server_type=None, ): self._server_type = server_type diff --git a/src/viresclient/_client_aeolus.py b/src/viresclient/_client_aeolus.py index edddfcd..857a1e4 100644 --- a/src/viresclient/_client_aeolus.py +++ b/src/viresclient/_client_aeolus.py @@ -3,7 +3,7 @@ import pandas as pd -from ._client import ClientRequest, WPSInputs +from ._client import DEFAULT_LOGGING_LEVEL, ClientRequest, WPSInputs from ._data import CONFIG_AEOLUS from ._data_handling import ReturnedDataFile @@ -222,7 +222,9 @@ class AeolusRequest(ClientRequest): """ - def __init__(self, url=None, token=None, config=None, logging_level="NO_LOGGING"): + def __init__( + self, url=None, token=None, config=None, logging_level=DEFAULT_LOGGING_LEVEL + ): super().__init__(url, token, config, logging_level, server_type="Aeolus") # self._available = self._set_available_data() self._request_inputs = AeolusWPSInputs() diff --git a/src/viresclient/_client_swarm.py b/src/viresclient/_client_swarm.py index 8fd16ba..480e4d6 100644 --- a/src/viresclient/_client_swarm.py +++ b/src/viresclient/_client_swarm.py @@ -12,7 +12,7 @@ from pandas import read_csv from tqdm import tqdm -from ._client import TEMPLATE_FILES, ClientRequest, WPSInputs +from ._client import DEFAULT_LOGGING_LEVEL, TEMPLATE_FILES, ClientRequest, WPSInputs from ._data import CONFIG_SWARM from ._data_handling import ReturnedDataFile from ._wps.environment import JINJA2_ENVIRONMENT @@ -1344,7 +1344,9 @@ class SwarmRequest(ClientRequest): "SwarmCI", ] - def __init__(self, url=None, token=None, config=None, logging_level="NO_LOGGING"): + def __init__( + self, url=None, token=None, config=None, logging_level=DEFAULT_LOGGING_LEVEL + ): super().__init__(url, token, config, logging_level, server_type="Swarm") self._available = self._get_available_data() diff --git a/src/viresclient/_wps/wps.py b/src/viresclient/_wps/wps.py index fd40ca1..6cf7464 100644 --- a/src/viresclient/_wps/wps.py +++ b/src/viresclient/_wps/wps.py @@ -27,16 +27,12 @@ # THE SOFTWARE. # ------------------------------------------------------------------------------- -try: - from urllib.error import HTTPError - from urllib.request import Request, urlopen -except ImportError: - # Python 2 backward compatibility - from urllib2 import urlopen, Request, HTTPError - from contextlib import closing from logging import LoggerAdapter, getLogger from time import sleep +from urllib.error import HTTPError +from urllib.parse import urljoin +from urllib.request import Request, urlopen from xml.etree import ElementTree from .time_util import Timer @@ -104,6 +100,10 @@ class WPS10Service: request """ + DEFAULT_CONTENT_TYPE = "application/xml; charset=utf-8" + RETRY_TIME = 20 # seconds + STATUS_POLL_RETRIES = 3 # re-try attempts + STATUS = { "{http://www.opengis.net/wps/1.0.0}ProcessAccepted": "ACCEPTED", "{http://www.opengis.net/wps/1.0.0}ProcessFailed": "FAILED", @@ -120,12 +120,17 @@ def __init__(self, url, headers=None, logger=None): self.headers = headers or {} self.logger = self._LoggerAdapter(logger or getLogger(__name__), {}) - def retrieve(self, request, handler=None): + def retrieve(self, request, handler=None, content_type=None): """Send a synchronous POST WPS request to a server and retrieve the output. """ + headers = { + **self.headers, + "Content-Type": content_type or self.DEFAULT_CONTENT_TYPE, + } + return self._retrieve( - Request(self.url, request, self.headers), handler, self.error_handler + Request(self.url, request, headers), handler, self.error_handler ) def retrieve_async( @@ -136,16 +141,20 @@ def retrieve_async( cleanup_handler=None, polling_interval=1, output_name="output", + content_type=None, ): """Send an asynchronous POST WPS request to a server and retrieve the output. """ timer = Timer() status, percentCompleted, status_url, execute_response = self.submit_async( - request + request, + content_type=content_type, ) wpsstatus = WPSStatus() - wpsstatus.update(status, percentCompleted, status_url, execute_response) + wpsstatus.update( + status, percentCompleted, urljoin(self.url, status_url), execute_response + ) def log_wpsstatus(wpsstatus): self.logger.info( @@ -169,7 +178,7 @@ def log_wpsstatus_percentCompleted(wpsstatus): last_status = wpsstatus.status last_percentCompleted = wpsstatus.percentCompleted - wpsstatus.update(*self.poll_status(wpsstatus.url)) + wpsstatus.update(*self.poll_status(urljoin(self.url, wpsstatus.url))) if wpsstatus.status != last_status: log_wpsstatus(wpsstatus) @@ -197,7 +206,9 @@ def retrieve_async_output(self, status_url, output_name, handler=None): """Retrieve asynchronous job output reference.""" self.logger.debug("Retrieving asynchronous job output '%s'.", output_name) output_url = self.parse_output_reference(status_url, output_name) - return self._retrieve(Request(output_url, None, self.headers), handler) + return self._retrieve( + Request(urljoin(self.url, output_url), None, self.headers), handler + ) @staticmethod def parse_output_reference(xml, identifier): @@ -210,15 +221,22 @@ def parse_output_reference(xml, identifier): elm_reference = elm.find( "./{http://www.opengis.net/wps/1.0.0}Reference" ) - return elm_reference.attrib["href"] + return ( + elm_reference.attrib.get("{http://www.w3.org/1999/xlink}href") + or elm_reference.attrib["href"] + ) - def submit_async(self, request): + def submit_async(self, request, content_type=None): """Send a POST WPS asynchronous request to a server and retrieve the status URL. """ self.logger.debug("Submitting asynchronous job.") + headers = { + **self.headers, + "Content-Type": content_type or self.DEFAULT_CONTENT_TYPE, + } return self._retrieve( - Request(self.url, request, self.headers), + Request(self.url, request, headers), self.parse_status, self.error_handler, ) @@ -226,9 +244,37 @@ def submit_async(self, request): def poll_status(self, status_url): """Poll status of an asynchronous WPS job.""" self.logger.debug("Polling asynchronous job status.") - return self._retrieve( - Request(status_url, None, self.headers), self.parse_status - ) + + for index in range(self.STATUS_POLL_RETRIES + 1): + + if index == 0: + self.logger.debug("Polling asynchronous job status.") + else: + self.logger.debug( + "Polling asynchronous job status. Retry attempt #%s.", index + ) + + try: + return self._retrieve( + Request(status_url, None, self.headers), self.parse_status + ) + except Exception as error: + if index < self.STATUS_POLL_RETRIES: + self.logger.error( + "Status poll failed. Retrying in %s seconds. %s: %s", + self.RETRY_TIME, + error.__class__.__name__, + error, + ) + else: + self.logger.error( + "Status poll failed. No more retries. %s: %s", + error.__class__.__name__, + error, + ) + raise + + sleep(self.RETRY_TIME) @classmethod def parse_status(cls, response):