Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VirES WPS fixes #121

Merged
merged 6 commits into from
Feb 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions docs/release_notes.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,14 @@ Release notes
Change log
----------

Changes from 0.12.1 to 0.12.2
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

- **Internal WPS fixes which may be required to access the server in the future**
- Improved robustness during asynchronous requests (the client now repeats the failed job status polling 3 times with 20 seconds interval)

See `PR#121 <https://github.com/ESA-VirES/VirES-Python-Client/pull/121>`_ for details

Changes from 0.12.0 to 0.12.1
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Expand Down
2 changes: 1 addition & 1 deletion src/viresclient/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,4 @@
from ._config import ClientConfig, set_token
from ._data_handling import ReturnedData, ReturnedDataFile

__version__ = "0.12.1"
__version__ = "0.12.2"
4 changes: 3 additions & 1 deletion src/viresclient/_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@
"NO_LOGGING": CRITICAL + 1,
}

DEFAULT_LOGGING_LEVEL = "ERROR"

# File type to WPS output name
RESPONSE_TYPES = {
"csv": "text/csv",
Expand Down Expand Up @@ -258,7 +260,7 @@ def __init__(
url=None,
token=None,
config=None,
logging_level="NO_LOGGING",
logging_level=DEFAULT_LOGGING_LEVEL,
server_type=None,
):
self._server_type = server_type
Expand Down
6 changes: 4 additions & 2 deletions src/viresclient/_client_aeolus.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

import pandas as pd

from ._client import ClientRequest, WPSInputs
from ._client import DEFAULT_LOGGING_LEVEL, ClientRequest, WPSInputs
from ._data import CONFIG_AEOLUS
from ._data_handling import ReturnedDataFile

Expand Down Expand Up @@ -222,7 +222,9 @@ class AeolusRequest(ClientRequest):

"""

def __init__(self, url=None, token=None, config=None, logging_level="NO_LOGGING"):
def __init__(
self, url=None, token=None, config=None, logging_level=DEFAULT_LOGGING_LEVEL
):
super().__init__(url, token, config, logging_level, server_type="Aeolus")
# self._available = self._set_available_data()
self._request_inputs = AeolusWPSInputs()
Expand Down
6 changes: 4 additions & 2 deletions src/viresclient/_client_swarm.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from pandas import read_csv
from tqdm import tqdm

from ._client import TEMPLATE_FILES, ClientRequest, WPSInputs
from ._client import DEFAULT_LOGGING_LEVEL, TEMPLATE_FILES, ClientRequest, WPSInputs
from ._data import CONFIG_SWARM
from ._data_handling import ReturnedDataFile
from ._wps.environment import JINJA2_ENVIRONMENT
Expand Down Expand Up @@ -1344,7 +1344,9 @@ class SwarmRequest(ClientRequest):
"SwarmCI",
]

def __init__(self, url=None, token=None, config=None, logging_level="NO_LOGGING"):
def __init__(
self, url=None, token=None, config=None, logging_level=DEFAULT_LOGGING_LEVEL
):
super().__init__(url, token, config, logging_level, server_type="Swarm")

self._available = self._get_available_data()
Expand Down
84 changes: 65 additions & 19 deletions src/viresclient/_wps/wps.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,12 @@
# THE SOFTWARE.
# -------------------------------------------------------------------------------

try:
from urllib.error import HTTPError
from urllib.request import Request, urlopen
except ImportError:
# Python 2 backward compatibility
from urllib2 import urlopen, Request, HTTPError

from contextlib import closing
from logging import LoggerAdapter, getLogger
from time import sleep
from urllib.error import HTTPError
from urllib.parse import urljoin
from urllib.request import Request, urlopen
from xml.etree import ElementTree

from .time_util import Timer
Expand Down Expand Up @@ -104,6 +100,10 @@ class WPS10Service:
request
"""

DEFAULT_CONTENT_TYPE = "application/xml; charset=utf-8"
RETRY_TIME = 20 # seconds
STATUS_POLL_RETRIES = 3 # re-try attempts

STATUS = {
"{http://www.opengis.net/wps/1.0.0}ProcessAccepted": "ACCEPTED",
"{http://www.opengis.net/wps/1.0.0}ProcessFailed": "FAILED",
Expand All @@ -120,12 +120,17 @@ def __init__(self, url, headers=None, logger=None):
self.headers = headers or {}
self.logger = self._LoggerAdapter(logger or getLogger(__name__), {})

def retrieve(self, request, handler=None):
def retrieve(self, request, handler=None, content_type=None):
"""Send a synchronous POST WPS request to a server and retrieve
the output.
"""
headers = {
**self.headers,
"Content-Type": content_type or self.DEFAULT_CONTENT_TYPE,
}

return self._retrieve(
Request(self.url, request, self.headers), handler, self.error_handler
Request(self.url, request, headers), handler, self.error_handler
)

def retrieve_async(
Expand All @@ -136,16 +141,20 @@ def retrieve_async(
cleanup_handler=None,
polling_interval=1,
output_name="output",
content_type=None,
):
"""Send an asynchronous POST WPS request to a server and retrieve
the output.
"""
timer = Timer()
status, percentCompleted, status_url, execute_response = self.submit_async(
request
request,
content_type=content_type,
)
wpsstatus = WPSStatus()
wpsstatus.update(status, percentCompleted, status_url, execute_response)
wpsstatus.update(
status, percentCompleted, urljoin(self.url, status_url), execute_response
)

def log_wpsstatus(wpsstatus):
self.logger.info(
Expand All @@ -169,7 +178,7 @@ def log_wpsstatus_percentCompleted(wpsstatus):

last_status = wpsstatus.status
last_percentCompleted = wpsstatus.percentCompleted
wpsstatus.update(*self.poll_status(wpsstatus.url))
wpsstatus.update(*self.poll_status(urljoin(self.url, wpsstatus.url)))

if wpsstatus.status != last_status:
log_wpsstatus(wpsstatus)
Expand Down Expand Up @@ -197,7 +206,9 @@ def retrieve_async_output(self, status_url, output_name, handler=None):
"""Retrieve asynchronous job output reference."""
self.logger.debug("Retrieving asynchronous job output '%s'.", output_name)
output_url = self.parse_output_reference(status_url, output_name)
return self._retrieve(Request(output_url, None, self.headers), handler)
return self._retrieve(
Request(urljoin(self.url, output_url), None, self.headers), handler
)

@staticmethod
def parse_output_reference(xml, identifier):
Expand All @@ -210,25 +221,60 @@ def parse_output_reference(xml, identifier):
elm_reference = elm.find(
"./{http://www.opengis.net/wps/1.0.0}Reference"
)
return elm_reference.attrib["href"]
return (
elm_reference.attrib.get("{http://www.w3.org/1999/xlink}href")
or elm_reference.attrib["href"]
)

def submit_async(self, request):
def submit_async(self, request, content_type=None):
"""Send a POST WPS asynchronous request to a server and retrieve
the status URL.
"""
self.logger.debug("Submitting asynchronous job.")
headers = {
**self.headers,
"Content-Type": content_type or self.DEFAULT_CONTENT_TYPE,
}
return self._retrieve(
Request(self.url, request, self.headers),
Request(self.url, request, headers),
self.parse_status,
self.error_handler,
)

def poll_status(self, status_url):
"""Poll status of an asynchronous WPS job."""
self.logger.debug("Polling asynchronous job status.")
return self._retrieve(
Request(status_url, None, self.headers), self.parse_status
)

for index in range(self.STATUS_POLL_RETRIES + 1):

if index == 0:
self.logger.debug("Polling asynchronous job status.")
else:
self.logger.debug(
"Polling asynchronous job status. Retry attempt #%s.", index
)

try:
return self._retrieve(
Request(status_url, None, self.headers), self.parse_status
)
except Exception as error:
if index < self.STATUS_POLL_RETRIES:
self.logger.error(
"Status poll failed. Retrying in %s seconds. %s: %s",
self.RETRY_TIME,
error.__class__.__name__,
error,
)
else:
self.logger.error(
"Status poll failed. No more retries. %s: %s",
error.__class__.__name__,
error,
)
raise

sleep(self.RETRY_TIME)

@classmethod
def parse_status(cls, response):
Expand Down
Loading