diff --git a/caom2utils/caom2utils/caom2blueprint.py b/caom2utils/caom2utils/caom2blueprint.py index 2a151f0a..136c726f 100755 --- a/caom2utils/caom2utils/caom2blueprint.py +++ b/caom2utils/caom2utils/caom2blueprint.py @@ -5271,15 +5271,7 @@ def proc(args, obs_blueprints): raise RuntimeError(msg) subject = net.Subject.from_cmd_line_args(args) - if args.resource_id == 'ivo://cadc.nrc.ca/fits2caom2': - # if the resource_id is the default value, using CadcDataClient - client = data_util.StorageClientWrapper( - subject, using_storage_inventory=False) - else: - # using the new Storage Inventory system, since it's the one that - # depends on a resource_id - client = data_util.StorageClientWrapper( - subject, resource_id=args.resource_id) + client = data_util.StorageClientWrapper(subject, resource_id=args.resource_id) validate_wcs = True if args.no_validate: validate_wcs = False diff --git a/caom2utils/caom2utils/data_util.py b/caom2utils/caom2utils/data_util.py index b73a862a..171fb504 100644 --- a/caom2utils/caom2utils/data_util.py +++ b/caom2utils/caom2utils/data_util.py @@ -75,7 +75,7 @@ from astropy.io import fits from urllib.parse import urlparse -from cadcdata import FileInfo, CadcDataClient, StorageInventoryClient +from cadcdata import FileInfo, StorageInventoryClient from cadcutils import exceptions @@ -92,61 +92,31 @@ class StorageClientWrapper: """ - Wrap the choice between CadcDataClient and StorageInventoryClient. + Wrap the metrics collection with StorageInventoryClient. """ - def __init__( - self, - subject, - using_storage_inventory=True, - resource_id='ivo://cadc.nrc.ca/uvic/minoc', - metrics=None, - ): + def __init__(self, subject, resource_id='ivo://cadc.nrc.ca/uvic/minoc', metrics=None): """ - :param subject: net.Subject instance for authentication and - authorization - :param using_storage_inventory: if True will use - StorageInventoryClient for file operations at CADC. If False will - use CadcDataClient. 
- :param resource_id: str identifies the StorageInventoryClient - endpoint. If using_storage_inventory is set to False, it's - un-necessary. - :param metrics: caom2pipe.manaage_composable.Metrics instance. If set, - will track execution times, by action, from the beginning of - the method invocation to the end of the method invocation, - success or failure. Defaults to None, because fits2caom2 is - a stand-alone application. + :param subject: net.Subject instance for authentication and authorization + :param resource_id: str identifies the StorageInventoryClient endpoint. Defaults to the installation closest to + most of the current invocations. + :param metrics: caom2pipe.manage_composable.Metrics instance. If set, will track execution times, by action, + from the beginning of the method invocation to the end of the method invocation, success or failure. + Defaults to None, because fits2caom2 is a stand-alone application. """ - if using_storage_inventory: - self._cadc_client = StorageInventoryClient( - subject=subject, resource_id=resource_id - ) - else: - self._cadc_client = CadcDataClient(subject=subject) - self._use_si = using_storage_inventory + self._cadc_client = StorageInventoryClient(subject=subject, resource_id=resource_id) self._metrics = metrics self._logger = logging.getLogger(self.__class__.__name__) def _add_fail_metric(self, action, name): - """Single location for the check for a self._metrics member in the - failure case.""" + """Single location for the check for a self._metrics member in the failure case.""" if self._metrics is not None: - client_name = 'si' if self._use_si else 'data' - self._metrics.observe_failure(action, client_name, name) + self._metrics.observe_failure(action, 'si', name) def _add_metric(self, action, name, start, value): - """Single location for the check for a self._metrics member in the - success case.""" + """Single location for the check for a self._metrics member in the success case.""" if self._metrics is not None: - 
client_name = 'si' if self._use_si else 'data' - self._metrics.observe( - start, - StorageClientWrapper._current(), - value, - action, - client_name, - name, - ) + self._metrics.observe(start, StorageClientWrapper._current(), value, action, 'si', name) def get(self, working_directory, uri): """ @@ -156,15 +126,12 @@ def get(self, working_directory, uri): :param uri: str this is an Artifact URI, representing the file to be retrieved. """ - self._logger.debug(f'Being get for {uri} in {working_directory}') + self._logger.debug(f'Begin get for {uri} in {working_directory}') start = StorageClientWrapper._current() try: archive, f_name = self._decompose(uri) fqn = path.join(working_directory, f_name) - if self._use_si: - self._cadc_client.cadcget(uri, dest=fqn) - else: - self._cadc_client.get_file(archive, f_name, destination=fqn) + self._cadc_client.cadcget(uri, dest=fqn) except Exception as e: self._add_fail_metric('get', uri) self._logger.debug(traceback.format_exc()) @@ -186,11 +153,7 @@ def get_head(self, uri): try: b = BytesIO() b.name = uri - if self._use_si: - self._cadc_client.cadcget(uri, b, fhead=True) - else: - archive, f_name = StorageClientWrapper._decompose(uri) - self._cadc_client.get_file(archive, f_name, b, fhead=True) + self._cadc_client.cadcget(uri, b, fhead=True) fits_header = b.getvalue().decode('ascii') b.close() self._add_metric('get_head', uri, start, len(fits_header)) @@ -207,44 +170,28 @@ def get_head(self, uri): def info(self, uri): """ - Retrieve the descriptive metdata associated with a file. + Retrieve the descriptive metadata associated with a file. 
:param uri: str that is an Artifact URI, representing the file for which to retrieve metadata :return: cadcdata.FileInfo instance, no scheme for md5sum """ self._logger.debug(f'Begin info for {uri}') try: - if self._use_si: - result = self._cadc_client.cadcinfo(uri) - # make the result look like the other possible ways to - # obtain metadata - result.md5sum = result.md5sum.replace('md5:', '') - else: - archive, f_name = StorageClientWrapper._decompose(uri) - temp = self._cadc_client.get_file_info(archive, f_name) - result = FileInfo( - id=uri, - size=temp.get('size'), - file_type=temp.get('type'), - md5sum=temp.get('md5sum').replace('md5:', ''), - encoding=temp.get('encoding'), - ) + result = self._cadc_client.cadcinfo(uri) + # make the result look like the other possible ways to + # obtain metadata + result.md5sum = result.md5sum.replace('md5:', '') except exceptions.NotFoundException: self._logger.info(f'cadcinfo:: {uri} not found') result = None self._logger.debug('End info') return result - def put(self, working_directory, uri, stream='default'): + def put(self, working_directory, uri): """ Store a file at CADC. - :param working_directory: str fully-qualified name of where to find - the file on the local machine - :param uri: str that is an Artifact URI, representing the file to - be stored at CADC. - :param stream: str representing the namespace used by the - CadcDataClient. Not required if using the StorageInventoryClient. - 'default' is default name for a lately-created ad archive. + :param working_directory: str fully-qualified name of where to find the file on the local machine + :param uri: str that is an Artifact URI, representing the file to be stored at CADC. 
""" self._logger.debug(f'Begin put for {uri} in {working_directory}') start = self._current() @@ -255,41 +202,22 @@ def put(self, working_directory, uri, stream='default'): try: local_meta = get_local_file_info(fqn) encoding = get_file_encoding(fqn) - if self._use_si: - replace = True - cadc_meta = self.info(uri) - if cadc_meta is None: - replace = False - self._logger.debug( - f'uri {uri} src {fqn} replace {replace} file_type ' - f'{local_meta.file_type} encoding {encoding} md5_checksum ' - f'{local_meta.md5sum}' - ) - self._cadc_client.cadcput( - uri, - src=fqn, - replace=replace, - file_type=local_meta.file_type, - file_encoding=encoding, - md5_checksum=local_meta.md5sum, - ) - else: - archive, f_name = self._decompose(uri) - # libmagic does a worse job with guessing file types - # than ad for .fits.gz => it will say 'binary' - self._logger.debug( - f'archive {archive} f_name {f_name} archive_stream ' - f'{stream} mime_type {local_meta.file_type} ' - f'mime_encoding {encoding} md5_check True ' - ) - self._cadc_client.put_file( - archive, - f_name, - archive_stream=stream, - mime_type=local_meta.file_type, - mime_encoding=encoding, - md5_check=True, - ) + replace = True + cadc_meta = self.info(uri) + if cadc_meta is None: + replace = False + self._logger.debug( + f'uri {uri} src {fqn} replace {replace} file_type {local_meta.file_type} encoding {encoding} ' + f'md5_checksum {local_meta.md5sum}' + ) + self._cadc_client.cadcput( + uri, + src=fqn, + replace=replace, + file_type=local_meta.file_type, + file_encoding=encoding, + md5_checksum=local_meta.md5sum, + ) self._logger.info(f'Stored {fqn} at CADC.') except Exception as e: self._add_fail_metric('put', uri) @@ -311,19 +239,14 @@ def remove(self, uri): """ self._logger.debug(f'Begin remove for {uri}') start = StorageClientWrapper._current() - if self._use_si: - try: - self._cadc_client.cadcremove(uri) - except Exception as e: - self._add_fail_metric('remove', uri) - self._logger.debug(traceback.format_exc()) - 
self._logger.error(e) - raise exceptions.UnexpectedException( - f'Did not remove {uri} because {e}' - ) - else: - raise NotImplementedError( - 'No remove functionality for CadcDataClient' + try: + self._cadc_client.cadcremove(uri) + except Exception as e: + self._add_fail_metric('remove', uri) + self._logger.debug(traceback.format_exc()) + self._logger.error(e) + raise exceptions.UnexpectedException( + f'Did not remove {uri} because {e}' ) self._add_metric('remove', uri, start, value=None) self._logger.debug('End remove') diff --git a/caom2utils/caom2utils/tests/test_data_util.py b/caom2utils/caom2utils/tests/test_data_util.py index 886fcbab..14533d95 100644 --- a/caom2utils/caom2utils/tests/test_data_util.py +++ b/caom2utils/caom2utils/tests/test_data_util.py @@ -105,77 +105,6 @@ def test_get_file_type(): ), f'wrong type {data_util.get_file_type(key)} for {key}' -@patch('caom2utils.data_util.CadcDataClient', autospec=True) -def test_cadc_data_client(cadc_client_mock): - test_subject = Mock(autospec=True) - test_uri = 'ad:TEST/test_file.fits' - test_working_directory = Path(test_fits2caom2.TESTDATA_DIR) - test_fqn = test_working_directory / 'test_file.fits' - if test_fqn.exists(): - test_fqn.unlink() - - def info_mock(ignore1, ignore2): - return { - 'type': 'application/fits', - 'md5sum': 'abc', - 'size': 42, - } - - def get_mock(ignore1, ignore2, destination, **kwargs): - fhead = kwargs.get('fhead') - if fhead: - destination.write(TEST_HEADERS) - else: - test_fqn.write_text('CadcDataClient') - - cadc_client_mock.return_value.get_file_info.side_effect = info_mock - cadc_client_mock.return_value.get_file.side_effect = get_mock - cadc_client_mock.return_value.put_file = Mock(autospec=True) - - test_wrapper = data_util.StorageClientWrapper( - subject=test_subject, - using_storage_inventory=False, - ) - assert test_wrapper is not None, 'ctor failure' - - # info - test_result = test_wrapper.info(test_uri) - _check_info_result(test_result) - - # get_head - test_result = 
test_wrapper.get_head(test_uri) - _check_header_result(test_result) - - # get - test_wrapper.get(test_working_directory, test_uri) - _check_get_result(test_fqn) - - # put - test_wrapper.put(test_working_directory, test_uri) - _check_put_result(cadc_client_mock.return_value.put_file) - - # delete - with pytest.raises(NotImplementedError): - test_wrapper.remove(test_uri) - - cadc_client_mock.return_value.get_file_info.side_effect = ( - exceptions.UnexpectedException('get_file_info') - ) - cadc_client_mock.return_value.get_file.side_effect = ( - exceptions.UnexpectedException('get_file') - ) - cadc_client_mock.return_value.put_file.side_effect = ( - exceptions.UnexpectedException('put_file') - ) - _fail_mock(test_wrapper, test_uri, test_working_directory) - - cadc_client_mock.return_value.get_file_info.side_effect = ( - exceptions.NotFoundException('cadcinfo') - ) - test_result = test_wrapper.info(test_uri) - assert test_result is None, 'expected when not found' - - @patch('caom2utils.data_util.StorageInventoryClient') def test_storage_inventory_client(cadc_client_mock): test_subject = Mock(autospec=True) @@ -201,10 +130,7 @@ def get_si_mock(ignore2, dest, **kwargs): cadc_client_mock.return_value.cadcput = Mock(autospec=True) cadc_client_mock.return_value.cadcremove = Mock(autospec=True) - test_wrapper = data_util.StorageClientWrapper( - subject=test_subject, - using_storage_inventory=True, - ) + test_wrapper = data_util.StorageClientWrapper(subject=test_subject) assert test_wrapper is not None, 'ctor failure' # info @@ -264,11 +190,7 @@ def _get(working_directory, uri): client_mock.return_value.cadcget.side_effect = _get client_mock.return_value.cadcremove.side_effect = Mock() - test_wrapper = data_util.StorageClientWrapper( - subject=test_subject, - using_storage_inventory=True, - metrics=test_metrics, - ) + test_wrapper = data_util.StorageClientWrapper(subject=test_subject, metrics=test_metrics) assert test_wrapper is not None, 'ctor failure' # test metrics failure