Skip to content

Commit

Permalink
VirES WPS fixes (#121)
Browse files Browse the repository at this point in the history
* Sending WPS POST requests with the proper content type header.

* Gracefully handle relative response URLs.

* Gracefully handle xlink:href references.

* Implementing retries for failed WPS asynchronous job status requests.

- up to 3 retries, waiting 20 seconds before each one
- changing the default logging level to ERROR

* Style autofixes by pre-commit

* Version bump and release notes for v0.12.2

---------

Co-authored-by: Ashley Smith <[email protected]>
  • Loading branch information
pacesm and smithara authored Feb 20, 2025
1 parent 3570b7e commit af39718
Show file tree
Hide file tree
Showing 6 changed files with 85 additions and 25 deletions.
8 changes: 8 additions & 0 deletions docs/release_notes.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,14 @@ Release notes
Change log
----------

Changes from 0.12.1 to 0.12.2
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

- **Internal WPS fixes which may be required to access the server in the future**
- Improved robustness during asynchronous requests (the client now retries a failed job status poll up to 3 times, waiting 20 seconds between attempts)

See `PR#121 <https://github.com/ESA-VirES/VirES-Python-Client/pull/121>`_ for details

Changes from 0.12.0 to 0.12.1
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Expand Down
2 changes: 1 addition & 1 deletion src/viresclient/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,4 @@
from ._config import ClientConfig, set_token
from ._data_handling import ReturnedData, ReturnedDataFile

__version__ = "0.12.1"
__version__ = "0.12.2"
4 changes: 3 additions & 1 deletion src/viresclient/_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@
"NO_LOGGING": CRITICAL + 1,
}

DEFAULT_LOGGING_LEVEL = "ERROR"

# File type to WPS output name
RESPONSE_TYPES = {
"csv": "text/csv",
Expand Down Expand Up @@ -258,7 +260,7 @@ def __init__(
url=None,
token=None,
config=None,
logging_level="NO_LOGGING",
logging_level=DEFAULT_LOGGING_LEVEL,
server_type=None,
):
self._server_type = server_type
Expand Down
6 changes: 4 additions & 2 deletions src/viresclient/_client_aeolus.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

import pandas as pd

from ._client import ClientRequest, WPSInputs
from ._client import DEFAULT_LOGGING_LEVEL, ClientRequest, WPSInputs
from ._data import CONFIG_AEOLUS
from ._data_handling import ReturnedDataFile

Expand Down Expand Up @@ -222,7 +222,9 @@ class AeolusRequest(ClientRequest):
"""

def __init__(self, url=None, token=None, config=None, logging_level="NO_LOGGING"):
def __init__(
self, url=None, token=None, config=None, logging_level=DEFAULT_LOGGING_LEVEL
):
super().__init__(url, token, config, logging_level, server_type="Aeolus")
# self._available = self._set_available_data()
self._request_inputs = AeolusWPSInputs()
Expand Down
6 changes: 4 additions & 2 deletions src/viresclient/_client_swarm.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from pandas import read_csv
from tqdm import tqdm

from ._client import TEMPLATE_FILES, ClientRequest, WPSInputs
from ._client import DEFAULT_LOGGING_LEVEL, TEMPLATE_FILES, ClientRequest, WPSInputs
from ._data import CONFIG_SWARM
from ._data_handling import ReturnedDataFile
from ._wps.environment import JINJA2_ENVIRONMENT
Expand Down Expand Up @@ -1344,7 +1344,9 @@ class SwarmRequest(ClientRequest):
"SwarmCI",
]

def __init__(self, url=None, token=None, config=None, logging_level="NO_LOGGING"):
def __init__(
self, url=None, token=None, config=None, logging_level=DEFAULT_LOGGING_LEVEL
):
super().__init__(url, token, config, logging_level, server_type="Swarm")

self._available = self._get_available_data()
Expand Down
84 changes: 65 additions & 19 deletions src/viresclient/_wps/wps.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,12 @@
# THE SOFTWARE.
# -------------------------------------------------------------------------------

try:
from urllib.error import HTTPError
from urllib.request import Request, urlopen
except ImportError:
# Python 2 backward compatibility
from urllib2 import urlopen, Request, HTTPError

from contextlib import closing
from logging import LoggerAdapter, getLogger
from time import sleep
from urllib.error import HTTPError
from urllib.parse import urljoin
from urllib.request import Request, urlopen
from xml.etree import ElementTree

from .time_util import Timer
Expand Down Expand Up @@ -104,6 +100,10 @@ class WPS10Service:
request
"""

DEFAULT_CONTENT_TYPE = "application/xml; charset=utf-8"
RETRY_TIME = 20 # seconds
STATUS_POLL_RETRIES = 3 # re-try attempts

STATUS = {
"{http://www.opengis.net/wps/1.0.0}ProcessAccepted": "ACCEPTED",
"{http://www.opengis.net/wps/1.0.0}ProcessFailed": "FAILED",
Expand All @@ -120,12 +120,17 @@ def __init__(self, url, headers=None, logger=None):
self.headers = headers or {}
self.logger = self._LoggerAdapter(logger or getLogger(__name__), {})

def retrieve(self, request, handler=None):
def retrieve(self, request, handler=None, content_type=None):
"""Send a synchronous POST WPS request to a server and retrieve
the output.
"""
headers = {
**self.headers,
"Content-Type": content_type or self.DEFAULT_CONTENT_TYPE,
}

return self._retrieve(
Request(self.url, request, self.headers), handler, self.error_handler
Request(self.url, request, headers), handler, self.error_handler
)

def retrieve_async(
Expand All @@ -136,16 +141,20 @@ def retrieve_async(
cleanup_handler=None,
polling_interval=1,
output_name="output",
content_type=None,
):
"""Send an asynchronous POST WPS request to a server and retrieve
the output.
"""
timer = Timer()
status, percentCompleted, status_url, execute_response = self.submit_async(
request
request,
content_type=content_type,
)
wpsstatus = WPSStatus()
wpsstatus.update(status, percentCompleted, status_url, execute_response)
wpsstatus.update(
status, percentCompleted, urljoin(self.url, status_url), execute_response
)

def log_wpsstatus(wpsstatus):
self.logger.info(
Expand All @@ -169,7 +178,7 @@ def log_wpsstatus_percentCompleted(wpsstatus):

last_status = wpsstatus.status
last_percentCompleted = wpsstatus.percentCompleted
wpsstatus.update(*self.poll_status(wpsstatus.url))
wpsstatus.update(*self.poll_status(urljoin(self.url, wpsstatus.url)))

if wpsstatus.status != last_status:
log_wpsstatus(wpsstatus)
Expand Down Expand Up @@ -197,7 +206,9 @@ def retrieve_async_output(self, status_url, output_name, handler=None):
"""Retrieve asynchronous job output reference."""
self.logger.debug("Retrieving asynchronous job output '%s'.", output_name)
output_url = self.parse_output_reference(status_url, output_name)
return self._retrieve(Request(output_url, None, self.headers), handler)
return self._retrieve(
Request(urljoin(self.url, output_url), None, self.headers), handler
)

@staticmethod
def parse_output_reference(xml, identifier):
Expand All @@ -210,25 +221,60 @@ def parse_output_reference(xml, identifier):
elm_reference = elm.find(
"./{http://www.opengis.net/wps/1.0.0}Reference"
)
return elm_reference.attrib["href"]
return (
elm_reference.attrib.get("{http://www.w3.org/1999/xlink}href")
or elm_reference.attrib["href"]
)

def submit_async(self, request):
def submit_async(self, request, content_type=None):
"""Send a POST WPS asynchronous request to a server and retrieve
the status URL.
"""
self.logger.debug("Submitting asynchronous job.")
headers = {
**self.headers,
"Content-Type": content_type or self.DEFAULT_CONTENT_TYPE,
}
return self._retrieve(
Request(self.url, request, self.headers),
Request(self.url, request, headers),
self.parse_status,
self.error_handler,
)

def poll_status(self, status_url):
"""Poll status of an asynchronous WPS job."""
self.logger.debug("Polling asynchronous job status.")
return self._retrieve(
Request(status_url, None, self.headers), self.parse_status
)

for index in range(self.STATUS_POLL_RETRIES + 1):

if index == 0:
self.logger.debug("Polling asynchronous job status.")
else:
self.logger.debug(
"Polling asynchronous job status. Retry attempt #%s.", index
)

try:
return self._retrieve(
Request(status_url, None, self.headers), self.parse_status
)
except Exception as error:
if index < self.STATUS_POLL_RETRIES:
self.logger.error(
"Status poll failed. Retrying in %s seconds. %s: %s",
self.RETRY_TIME,
error.__class__.__name__,
error,
)
else:
self.logger.error(
"Status poll failed. No more retries. %s: %s",
error.__class__.__name__,
error,
)
raise

sleep(self.RETRY_TIME)

@classmethod
def parse_status(cls, response):
Expand Down

0 comments on commit af39718

Please sign in to comment.