Skip to content

Commit

Permalink
CADC-10715 - code review comments - remove CadcDataClient support fro…
Browse files Browse the repository at this point in the history
…m the StorageClientWrapper class.
  • Loading branch information
SharonGoliath committed Nov 16, 2022
1 parent 4627f24 commit 49e3d01
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 215 deletions.
10 changes: 1 addition & 9 deletions caom2utils/caom2utils/caom2blueprint.py
Original file line number Diff line number Diff line change
Expand Up @@ -5271,15 +5271,7 @@ def proc(args, obs_blueprints):
raise RuntimeError(msg)

subject = net.Subject.from_cmd_line_args(args)
if args.resource_id == 'ivo://cadc.nrc.ca/fits2caom2':
# if the resource_id is the default value, using CadcDataClient
client = data_util.StorageClientWrapper(
subject, using_storage_inventory=False)
else:
# using the new Storage Inventory system, since it's the one that
# depends on a resource_id
client = data_util.StorageClientWrapper(
subject, resource_id=args.resource_id)
client = data_util.StorageClientWrapper(subject, resource_id=args.resource_id)
validate_wcs = True
if args.no_validate:
validate_wcs = False
Expand Down
175 changes: 49 additions & 126 deletions caom2utils/caom2utils/data_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@

from astropy.io import fits
from urllib.parse import urlparse
from cadcdata import FileInfo, CadcDataClient, StorageInventoryClient
from cadcdata import FileInfo, StorageInventoryClient
from cadcutils import exceptions


Expand All @@ -92,61 +92,31 @@

class StorageClientWrapper:
"""
Wrap the choice between CadcDataClient and StorageInventoryClient.
Wrap the metrics collection with StorageInventoryClient.
"""

def __init__(
self,
subject,
using_storage_inventory=True,
resource_id='ivo://cadc.nrc.ca/uvic/minoc',
metrics=None,
):
def __init__(self, subject, resource_id='ivo://cadc.nrc.ca/uvic/minoc', metrics=None):
"""
:param subject: net.Subject instance for authentication and
authorization
:param using_storage_inventory: if True will use
StorageInventoryClient for file operations at CADC. If False will
use CadcDataClient.
:param resource_id: str identifies the StorageInventoryClient
endpoint. If using_storage_inventory is set to False, it's
un-necessary.
:param metrics: caom2pipe.manaage_composable.Metrics instance. If set,
will track execution times, by action, from the beginning of
the method invocation to the end of the method invocation,
success or failure. Defaults to None, because fits2caom2 is
a stand-alone application.
:param subject: net.Subject instance for authentication and authorization
:param resource_id: str identifies the StorageInventoryClient endpoint. Defaults to the installation closest to
most of the current invocations.
:param metrics: caom2pipe.manaage_composable.Metrics instance. If set, will track execution times, by action,
from the beginning of the method invocation to the end of the method invocation, success or failure.
Defaults to None, because fits2caom2 is a stand-alone application.
"""
if using_storage_inventory:
self._cadc_client = StorageInventoryClient(
subject=subject, resource_id=resource_id
)
else:
self._cadc_client = CadcDataClient(subject=subject)
self._use_si = using_storage_inventory
self._cadc_client = StorageInventoryClient(subject=subject, resource_id=resource_id)
self._metrics = metrics
self._logger = logging.getLogger(self.__class__.__name__)

def _add_fail_metric(self, action, name):
"""Single location for the check for a self._metrics member in the
failure case."""
"""Single location for the check for a self._metrics member in the failure case."""
if self._metrics is not None:
client_name = 'si' if self._use_si else 'data'
self._metrics.observe_failure(action, client_name, name)
self._metrics.observe_failure(action, 'si', name)

def _add_metric(self, action, name, start, value):
"""Single location for the check for a self._metrics member in the
success case."""
"""Single location for the check for a self._metrics member in the success case."""
if self._metrics is not None:
client_name = 'si' if self._use_si else 'data'
self._metrics.observe(
start,
StorageClientWrapper._current(),
value,
action,
client_name,
name,
)
self._metrics.observe(start, StorageClientWrapper._current(), value, action, 'si', name)

def get(self, working_directory, uri):
"""
Expand All @@ -156,15 +126,12 @@ def get(self, working_directory, uri):
:param uri: str this is an Artifact URI, representing the file to
be retrieved.
"""
self._logger.debug(f'Being get for {uri} in {working_directory}')
self._logger.debug(f'Begin get for {uri} in {working_directory}')
start = StorageClientWrapper._current()
try:
archive, f_name = self._decompose(uri)
fqn = path.join(working_directory, f_name)
if self._use_si:
self._cadc_client.cadcget(uri, dest=fqn)
else:
self._cadc_client.get_file(archive, f_name, destination=fqn)
self._cadc_client.cadcget(uri, dest=fqn)
except Exception as e:
self._add_fail_metric('get', uri)
self._logger.debug(traceback.format_exc())
Expand All @@ -186,11 +153,7 @@ def get_head(self, uri):
try:
b = BytesIO()
b.name = uri
if self._use_si:
self._cadc_client.cadcget(uri, b, fhead=True)
else:
archive, f_name = StorageClientWrapper._decompose(uri)
self._cadc_client.get_file(archive, f_name, b, fhead=True)
self._cadc_client.cadcget(uri, b, fhead=True)
fits_header = b.getvalue().decode('ascii')
b.close()
self._add_metric('get_head', uri, start, len(fits_header))
Expand All @@ -207,44 +170,28 @@ def get_head(self, uri):

def info(self, uri):
"""
Retrieve the descriptive metdata associated with a file.
Retrieve the descriptive metadata associated with a file.
:param uri: str that is an Artifact URI, representing the file for
which to retrieve metadata
:return: cadcdata.FileInfo instance, no scheme for md5sum
"""
self._logger.debug(f'Begin info for {uri}')
try:
if self._use_si:
result = self._cadc_client.cadcinfo(uri)
# make the result look like the other possible ways to
# obtain metadata
result.md5sum = result.md5sum.replace('md5:', '')
else:
archive, f_name = StorageClientWrapper._decompose(uri)
temp = self._cadc_client.get_file_info(archive, f_name)
result = FileInfo(
id=uri,
size=temp.get('size'),
file_type=temp.get('type'),
md5sum=temp.get('md5sum').replace('md5:', ''),
encoding=temp.get('encoding'),
)
result = self._cadc_client.cadcinfo(uri)
# make the result look like the other possible ways to
# obtain metadata
result.md5sum = result.md5sum.replace('md5:', '')
except exceptions.NotFoundException:
self._logger.info(f'cadcinfo:: {uri} not found')
result = None
self._logger.debug('End info')
return result

def put(self, working_directory, uri, stream='default'):
def put(self, working_directory, uri):
"""
Store a file at CADC.
:param working_directory: str fully-qualified name of where to find
the file on the local machine
:param uri: str that is an Artifact URI, representing the file to
be stored at CADC.
:param stream: str representing the namespace used by the
CadcDataClient. Not required if using the StorageInventoryClient.
'default' is default name for a lately-created ad archive.
:param working_directory: str fully-qualified name of where to find the file on the local machine
:param uri: str that is an Artifact URI, representing the file to be stored at CADC.
"""
self._logger.debug(f'Begin put for {uri} in {working_directory}')
start = self._current()
Expand All @@ -255,41 +202,22 @@ def put(self, working_directory, uri, stream='default'):
try:
local_meta = get_local_file_info(fqn)
encoding = get_file_encoding(fqn)
if self._use_si:
replace = True
cadc_meta = self.info(uri)
if cadc_meta is None:
replace = False
self._logger.debug(
f'uri {uri} src {fqn} replace {replace} file_type '
f'{local_meta.file_type} encoding {encoding} md5_checksum '
f'{local_meta.md5sum}'
)
self._cadc_client.cadcput(
uri,
src=fqn,
replace=replace,
file_type=local_meta.file_type,
file_encoding=encoding,
md5_checksum=local_meta.md5sum,
)
else:
archive, f_name = self._decompose(uri)
# libmagic does a worse job with guessing file types
# than ad for .fits.gz => it will say 'binary'
self._logger.debug(
f'archive {archive} f_name {f_name} archive_stream '
f'{stream} mime_type {local_meta.file_type} '
f'mime_encoding {encoding} md5_check True '
)
self._cadc_client.put_file(
archive,
f_name,
archive_stream=stream,
mime_type=local_meta.file_type,
mime_encoding=encoding,
md5_check=True,
)
replace = True
cadc_meta = self.info(uri)
if cadc_meta is None:
replace = False
self._logger.debug(
f'uri {uri} src {fqn} replace {replace} file_type {local_meta.file_type} encoding {encoding} '
f'md5_checksum {local_meta.md5sum}'
)
self._cadc_client.cadcput(
uri,
src=fqn,
replace=replace,
file_type=local_meta.file_type,
file_encoding=encoding,
md5_checksum=local_meta.md5sum,
)
self._logger.info(f'Stored {fqn} at CADC.')
except Exception as e:
self._add_fail_metric('put', uri)
Expand All @@ -311,19 +239,14 @@ def remove(self, uri):
"""
self._logger.debug(f'Begin remove for {uri}')
start = StorageClientWrapper._current()
if self._use_si:
try:
self._cadc_client.cadcremove(uri)
except Exception as e:
self._add_fail_metric('remove', uri)
self._logger.debug(traceback.format_exc())
self._logger.error(e)
raise exceptions.UnexpectedException(
f'Did not remove {uri} because {e}'
)
else:
raise NotImplementedError(
'No remove functionality for CadcDataClient'
try:
self._cadc_client.cadcremove(uri)
except Exception as e:
self._add_fail_metric('remove', uri)
self._logger.debug(traceback.format_exc())
self._logger.error(e)
raise exceptions.UnexpectedException(
f'Did not remove {uri} because {e}'
)
self._add_metric('remove', uri, start, value=None)
self._logger.debug('End remove')
Expand Down
82 changes: 2 additions & 80 deletions caom2utils/caom2utils/tests/test_data_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,77 +105,6 @@ def test_get_file_type():
), f'wrong type {data_util.get_file_type(key)} for {key}'


@patch('caom2utils.data_util.CadcDataClient', autospec=True)
def test_cadc_data_client(cadc_client_mock):
test_subject = Mock(autospec=True)
test_uri = 'ad:TEST/test_file.fits'
test_working_directory = Path(test_fits2caom2.TESTDATA_DIR)
test_fqn = test_working_directory / 'test_file.fits'
if test_fqn.exists():
test_fqn.unlink()

def info_mock(ignore1, ignore2):
return {
'type': 'application/fits',
'md5sum': 'abc',
'size': 42,
}

def get_mock(ignore1, ignore2, destination, **kwargs):
fhead = kwargs.get('fhead')
if fhead:
destination.write(TEST_HEADERS)
else:
test_fqn.write_text('CadcDataClient')

cadc_client_mock.return_value.get_file_info.side_effect = info_mock
cadc_client_mock.return_value.get_file.side_effect = get_mock
cadc_client_mock.return_value.put_file = Mock(autospec=True)

test_wrapper = data_util.StorageClientWrapper(
subject=test_subject,
using_storage_inventory=False,
)
assert test_wrapper is not None, 'ctor failure'

# info
test_result = test_wrapper.info(test_uri)
_check_info_result(test_result)

# get_head
test_result = test_wrapper.get_head(test_uri)
_check_header_result(test_result)

# get
test_wrapper.get(test_working_directory, test_uri)
_check_get_result(test_fqn)

# put
test_wrapper.put(test_working_directory, test_uri)
_check_put_result(cadc_client_mock.return_value.put_file)

# delete
with pytest.raises(NotImplementedError):
test_wrapper.remove(test_uri)

cadc_client_mock.return_value.get_file_info.side_effect = (
exceptions.UnexpectedException('get_file_info')
)
cadc_client_mock.return_value.get_file.side_effect = (
exceptions.UnexpectedException('get_file')
)
cadc_client_mock.return_value.put_file.side_effect = (
exceptions.UnexpectedException('put_file')
)
_fail_mock(test_wrapper, test_uri, test_working_directory)

cadc_client_mock.return_value.get_file_info.side_effect = (
exceptions.NotFoundException('cadcinfo')
)
test_result = test_wrapper.info(test_uri)
assert test_result is None, 'expected when not found'


@patch('caom2utils.data_util.StorageInventoryClient')
def test_storage_inventory_client(cadc_client_mock):
test_subject = Mock(autospec=True)
Expand All @@ -201,10 +130,7 @@ def get_si_mock(ignore2, dest, **kwargs):
cadc_client_mock.return_value.cadcput = Mock(autospec=True)
cadc_client_mock.return_value.cadcremove = Mock(autospec=True)

test_wrapper = data_util.StorageClientWrapper(
subject=test_subject,
using_storage_inventory=True,
)
test_wrapper = data_util.StorageClientWrapper(subject=test_subject)
assert test_wrapper is not None, 'ctor failure'

# info
Expand Down Expand Up @@ -264,11 +190,7 @@ def _get(working_directory, uri):
client_mock.return_value.cadcget.side_effect = _get
client_mock.return_value.cadcremove.side_effect = Mock()

test_wrapper = data_util.StorageClientWrapper(
subject=test_subject,
using_storage_inventory=True,
metrics=test_metrics,
)
test_wrapper = data_util.StorageClientWrapper(subject=test_subject, metrics=test_metrics)
assert test_wrapper is not None, 'ctor failure'

# test metrics failure
Expand Down

0 comments on commit 49e3d01

Please sign in to comment.