diff --git a/python/lsst/ctrl/ingestd/entries/dataFile.py b/python/lsst/ctrl/ingestd/entries/dataFile.py new file mode 100644 index 0000000..ac39187 --- /dev/null +++ b/python/lsst/ctrl/ingestd/entries/dataFile.py @@ -0,0 +1,75 @@ +# This file is part of ctrl_ingestd +# +# Developed for the LSST Data Management System. +# This product includes software developed by the LSST Project +# (https://www.lsst.org). +# See the COPYRIGHT file at the top-level directory of this distribution +# for details of code ownership. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . 
import logging
from lsst.ctrl.ingestd.entries.entry import Entry
from lsst.daf.butler import DatasetRef, FileDataset

LOGGER = logging.getLogger(__name__)


class DataFile(Entry):
    """Entry for a file, carrying the FileDataset built from its sidecar.

    The FileDataset is built once, at construction time.  If it cannot be
    built, the failure is logged at debug level and `get_data` returns
    `None`.

    Parameters
    ----------
    butler: Butler
        Butler associated with this entry
    message: Message
        Message representing data to ingest
    mapper: Mapper
        Mapping of RSE entry to Butler repo location
    """

    def __init__(self, butler, message, mapper):
        super().__init__(butler, message, mapper)
        self._populate()

    def _populate(self):
        """Build and cache the butler-ingestible object in ``self.fds``."""
        try:
            dataset = self._create_file_dataset(self.file_to_ingest, self.sidecar)
        except Exception as e:
            # Best effort: a failure here is only logged, and the entry is
            # left without a FileDataset.
            LOGGER.debug(e)
            dataset = None
        self.fds = dataset

    def _create_file_dataset(self, butler_file: str, sidecar: dict) -> FileDataset:
        """Create a FileDataset with sidecar information

        Parameters
        ----------
        butler_file: `str`
            full uri to butler file location
        sidecar: `dict`
            dictionary of the 'sidecar' metadata

        Returns
        -------
        fds: FileDataset
            FileDataset pairing ``butler_file`` with the DatasetRef
            described by ``sidecar``
        """
        # NOTE(review): ``sidecar`` is handed to DatasetRef.from_json as-is;
        # confirm that the dict form (rather than a JSON string) is accepted.
        return FileDataset(
            butler_file,
            DatasetRef.from_json(sidecar, registry=self.butler.registry),
        )

    def get_data(self):
        """Get data associated with this type of object

        Returns
        -------
        fds: FileDataset
            FileDataset built at construction, or `None` if it could not
            be created
        """
        return self.fds


# (patch metadata, kept verbatim)
# diff --git a/python/lsst/ctrl/ingestd/entries/dataProduct.py b/python/lsst/ctrl/ingestd/entries/dataProduct.py
# new file mode 100644
# index 0000000..a706e6a
# --- /dev/null
# +++ b/python/lsst/ctrl/ingestd/entries/dataProduct.py
# @@ -0,0 +1,42 @@
# This file is part of ctrl_ingestd
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

import logging
from lsst.ctrl.ingestd.entries.dataFile import DataFile

LOGGER = logging.getLogger(__name__)


class DataProduct(DataFile):
    """Entry representing a data product to put into the butler

    This class adds no behaviour of its own; construction is forwarded
    unchanged to `DataFile`.

    Parameters
    ----------
    butler: Butler
        Butler associated with this entry
    message: Message
        Message representing data to ingest
    mapper: Mapper
        Mapping of RSE entry to Butler repo location
    """

    def __init__(self, butler, message, mapper):
        # Straight pass-through to the DataFile constructor.
        super().__init__(butler, message, mapper)


# (patch metadata, kept verbatim)
# diff --git a/python/lsst/ctrl/ingestd/entries/dataType.py b/python/lsst/ctrl/ingestd/entries/dataType.py
# new file mode 100644
# index 0000000..7c31a99
# --- /dev/null
# +++ b/python/lsst/ctrl/ingestd/entries/dataType.py
# @@ -0,0 +1,24 @@
# This file is part of ctrl_ingestd
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.


class DataType:
    """String constants naming the kinds of data an entry can carry.

    The values are compared against the ``rubin_butler`` field of an
    incoming message.
    """

    # Data product entry.
    DATA_PRODUCT = "data_product"
    # Raw file entry.
    RAW_FILE = "raw_file"


# (patch metadata, kept verbatim)
# diff --git a/python/lsst/ctrl/ingestd/entries/entry.py b/python/lsst/ctrl/ingestd/entries/entry.py
# new file mode 100644
# index 0000000..fd46d7c
# --- /dev/null
# +++ b/python/lsst/ctrl/ingestd/entries/entry.py
# @@ -0,0 +1,63 @@
# This file is part of ctrl_ingestd
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import logging
LOGGER = logging.getLogger(__name__)


class Entry:
    """Generic representation of data to put into the Butler

    On construction the entry reads the data type and the sidecar from
    ``message`` and uses ``mapper`` to rewrite the message's destination
    URL into the location of the file to ingest (``file_to_ingest``).

    Parameters
    ----------
    butler: Butler
        Butler associated with this entry
    message: Message
        Message representing data to ingest
    mapper: Mapper
        Mapping of RSE entry to Butler repo location.  The ``None``
        default is kept for signature compatibility only; a mapper is
        required and construction fails without one.

    Raises
    ------
    RuntimeError
        If the message carries no data type, if no mapper was supplied,
        or if the mapper returns the destination URL unchanged.
    """

    def __init__(self, butler, message, mapper=None):
        self.butler = butler
        self.message = message
        self.mapper = mapper

        self.data_type = message.get_rubin_butler()
        self.sidecar = message.get_rubin_sidecar_dict()
        LOGGER.debug(f"{message=} {self.data_type=} {self.sidecar=}")
        if self.data_type is None:
            raise RuntimeError(f"shouldn't have gotten this: {message}")

        # Fail with an explicit message instead of an AttributeError on
        # ``None.rewrite`` when the optional-looking mapper was omitted.
        if self.mapper is None:
            raise RuntimeError(f"no mapper supplied; cannot map {message}")

        # Rewrite the Rucio URL to actual file location
        dst_url = self.message.get_dst_url()
        self.file_to_ingest = self.mapper.rewrite(self.message.get_dst_rse(), dst_url)

        # An unchanged URL is treated as "no mapping matched".
        if self.file_to_ingest == dst_url:
            # Avoid E501
            m = f"failed to map {self.file_to_ingest}; check config file for incorrect mapping"
            raise RuntimeError(m)

    def get_data_type(self):
        """Return the data type read from the message at construction."""
        return self.data_type

    def get_data(self):
        """Return the object to ingest; must be overridden by subclasses.

        Raises
        ------
        NotImplementedError
            Always.  `NotImplementedError` is a `RuntimeError` subclass,
            so callers catching `RuntimeError` are unaffected.
        """
        raise NotImplementedError("Shouldn't call Entry.get_data directly")


# (patch metadata, kept verbatim)
# diff --git a/python/lsst/ctrl/ingestd/entries/entryFactory.py b/python/lsst/ctrl/ingestd/entries/entryFactory.py
# new file mode 100644
# index 0000000..035f356
# --- /dev/null
# +++ b/python/lsst/ctrl/ingestd/entries/entryFactory.py
# @@ -0,0 +1,64 @@
# This file is part of ctrl_ingestd
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

from lsst.ctrl.ingestd.entries.dataType import DataType
from lsst.ctrl.ingestd.entries.entry import Entry
from lsst.ctrl.ingestd.entries.dataProduct import DataProduct
from lsst.ctrl.ingestd.entries.rawFile import RawFile


class EntryFactory:
    """Factory that builds the Entry subclass matching a message

    Parameters
    ----------
    rse_butler: RseButler
        Object representing a Butler for an RSE
    mapper: Mapper
        mapper between rse and prefix associated with it
    """

    def __init__(self, rse_butler, mapper):
        self.rse_butler = rse_butler
        # Entries are handed the underlying butler, not the RseButler wrapper.
        self.butler = rse_butler.butler
        self.mapper = mapper

    def create_entry(self, message) -> Entry:
        """Create an Entry object

        Parameters
        ----------
        message: Message
            Object representing a Kafka message

        Returns
        -------
        entry: Entry
            A `DataProduct` or `RawFile`, chosen by the message's
            ``rubin_butler`` value

        Raises
        ------
        ValueError
            If the ``rubin_butler`` value matches no known data type
        """
        kind = message.get_rubin_butler()

        if kind == DataType.DATA_PRODUCT:
            return DataProduct(self.butler, message, self.mapper)
        if kind == DataType.RAW_FILE:
            return RawFile(self.butler, message, self.mapper)
        raise ValueError(f"Unknown rubin_butler type: {kind}")


# (patch metadata, kept verbatim)
# diff --git a/python/lsst/ctrl/ingestd/entries/rawFile.py b/python/lsst/ctrl/ingestd/entries/rawFile.py
# new file mode 100644
# index 0000000..e6f428e
# --- /dev/null
# +++ b/python/lsst/ctrl/ingestd/entries/rawFile.py
# @@ -0,0 +1,36 @@
# This file is part of ctrl_ingestd
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
+# See the COPYRIGHT file at the top-level directory of this distribution +# for details of code ownership. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import logging +from lsst.ctrl.ingestd.entries.dataFile import DataFile + +LOGGER = logging.getLogger(__name__) + + +class RawFile(DataFile): + """Entry representing a raw file to ingest via RawIngestTask + + Parameters + ---------- + """ + + def __init__(self, butler, message, mapper): + super().__init__(butler, message, mapper) diff --git a/python/lsst/ctrl/ingestd/ingestd.py b/python/lsst/ctrl/ingestd/ingestd.py index e8ad6df..42dac9c 100644 --- a/python/lsst/ctrl/ingestd/ingestd.py +++ b/python/lsst/ctrl/ingestd/ingestd.py @@ -28,6 +28,7 @@ from lsst.ctrl.ingestd.mapper import Mapper from lsst.ctrl.ingestd.message import Message from lsst.ctrl.ingestd.rseButler import RseButler +from lsst.ctrl.ingestd.entries.entryFactory import EntryFactory LOGGER = logging.getLogger(__name__) @@ -65,7 +66,8 @@ def __init__(self): self.consumer = Consumer(conf) self.consumer.subscribe(topics) - self.butler = RseButler(config.get_repo()) + self.rse_butler = RseButler(config.get_repo()) + self.entry_factory = EntryFactory(self.rse_butler, self.mapper) def run(self): """continually process messages""" @@ -78,7 +80,7 @@ def process(self): # read up to self.num_messages, with a timeout of self.timeout msgs = self.consumer.consume(num_messages=self.num_messages, 
timeout=self.timeout) # just return if there are no messages - if msgs is None: + if not msgs: return # cycle through all the messages, rewriting the Rucio URL @@ -92,35 +94,11 @@ def process(self): logging.info(msg.value()) logging.info(e) continue - rubin_butler = message.get_rubin_butler() - sidecar = message.get_rubin_sidecar_dict() - logging.debug(f"{message=} {rubin_butler=} {sidecar=}") - - if rubin_butler is None: - logging.warning("shouldn't have gotten this message: %s" % message) - continue - - # Rewrite the Rucio URL to actual file location - dst_url = message.get_dst_url() - file_to_ingest = self.mapper.rewrite(message.get_dst_rse(), dst_url) - - if file_to_ingest == dst_url: - logging.warn( - f"failed to map {file_to_ingest}; check {self.config_file} for incorrect mapping" - ) - continue - - # create an object that's ingestible by the butler - # and add it to the list - try: - entry = self.butler.create_entry(file_to_ingest, sidecar) - except Exception as e: - logging.info(e) - continue + entry = self.entry_factory.create_entry(message) entries.append(entry) # if we've got anything in the list, try and ingest it. 
if len(entries) > 0: - self.butler.ingest(entries) + self.rse_butler.ingest(entries) if __name__ == "__main__": diff --git a/python/lsst/ctrl/ingestd/rseButler.py b/python/lsst/ctrl/ingestd/rseButler.py index 85afa26..c14789b 100644 --- a/python/lsst/ctrl/ingestd/rseButler.py +++ b/python/lsst/ctrl/ingestd/rseButler.py @@ -21,8 +21,10 @@ import logging -from lsst.daf.butler import Butler, DatasetRef, FileDataset +from lsst.ctrl.ingestd.entries.dataType import DataType +from lsst.daf.butler import Butler from lsst.daf.butler.registry import DatasetTypeError, MissingCollectionError +from lsst.obs.base.ingest import RawIngestConfig, RawIngestTask LOGGER = logging.getLogger(__name__) @@ -39,38 +41,87 @@ class RseButler: def __init__(self, repo: str): self.butler = Butler(repo, writeable=True) + cfg = RawIngestConfig() + cfg.transfer = "direct" + self.task = RawIngestTask( + config=cfg, + butler=self.butler, + on_success=self.on_success, + on_ingest_failure=self.on_ingest_failure, + on_metadata_failure=self.on_metadata_failure, + ) - def create_entry(self, butler_file: str, sidecar: str) -> FileDataset: - """Create a FileDatset with sidecar information + def ingest(self, entries: list): + """ingest a list of datasets Parameters ---------- - butler_file: `str` - full uri to butler file location - sidecar: `dict` - dictionary of the 'sidecar' metadata + entries : `list` + List of Entry """ - ref = DatasetRef.from_json(sidecar, registry=self.butler.registry) - fds = FileDataset(butler_file, ref) - return fds - def ingest(self, datasets: list): - """Ingest a list of Datasets + # + # group entries by data type, so they can be run in batches + # + data_type_dict = {} + LOGGER.debug(f"{entries=}") + for entry in entries: + data_type = entry.get_data_type() + if data_type not in data_type_dict: + data_type_dict[data_type] = [] + LOGGER.debug(f"adding {data_type=}, {entry=}") + data_type_dict[data_type].append(entry) + + LOGGER.debug(f"{data_type_dict=}") + if DataType.RAW_FILE 
in data_type_dict: + LOGGER.debug(f"ingesting - {DataType.RAW_FILE}") + self._ingest(data_type_dict[DataType.RAW_FILE], "direct", True) + if DataType.DATA_PRODUCT in data_type_dict: + LOGGER.debug(f"ingesting - {DataType.DATA_PRODUCT}") + self._ingest(data_type_dict[DataType.DATA_PRODUCT], "auto", False) + LOGGER.debug("done processing") + + def _ingest_raw(self, entries: list): + """ingest using raw Task + + Parameters + ---------- + entries : `list` + List of Entry + """ + try: + files = [e.file_to_ingest for e in entries] + LOGGER.debug(f"{files=}") + self.task.run(files) + except Exception as e: + LOGGER.warning(e) + + def _ingest(self, entries: list, transfer, retry_as_raw): + """ingest data with dataset refs, optionally + retrying using RawIngestTask Parameters ---------- - datasets : `list` - List of Datasets + entries : `list` + List of Entry + transfer: `str` + Butler transfer type + retry_as_raw: `bool` + on ingest failure, retry using RawIngestTask """ + LOGGER.debug(f"{entries=}") completed = False + + datasets = [e.get_data() for e in entries] + while not completed: try: - self.butler.ingest(*datasets, transfer="auto") - LOGGER.debug("ingest succeeded") + self.butler.ingest(*datasets, transfer=transfer) for dataset in datasets: LOGGER.info(f"ingested: {dataset.path}") completed = True except DatasetTypeError: + LOGGER.debug("DatasetTypeError") dst_set = set() for dataset in datasets: for dst in {ref.datasetType for ref in dataset.refs}: @@ -78,6 +129,7 @@ def ingest(self, datasets: list): for dst in dst_set: self.butler.registry.registerDatasetType(dst) except MissingCollectionError: + LOGGER.debug("MissingCollectionError") run_set = set() for dataset in datasets: for run in {ref.run for ref in dataset.refs}: @@ -85,5 +137,72 @@ def ingest(self, datasets: list): for run in run_set: self.butler.registry.registerRun(run) except Exception as e: - LOGGER.warning(e) + if retry_as_raw: + self._ingest_raw(entries) + else: + LOGGER.warning(e) completed = True 
# NOTE: the four functions below are methods of RseButler (rseButler.py).
# The class header lies outside this span of the patch, so they are shown
# without the enclosing class indentation.


def on_success(self, datasets):
    """Callback used on successful ingest. Used to transmit
    successful data ingestion status

    Parameters
    ----------
    datasets: `list`
        ingested datasets; each item must expose a ``path`` attribute
        (NOTE(review): presumably FileDataset objects rather than bare
        DatasetRefs - confirm against the RawIngestTask callback contract)
    """
    for dataset in datasets:
        LOGGER.info(f"ingested: {dataset.path}")


def on_ingest_failure(self, exposures, exc):
    """Callback used on ingest failure. Used to transmit
    unsuccessful data ingestion status

    Parameters
    ----------
    exposures: `RawExposureData`
        exposures that failed in ingest
    exc: `Exception`
        Exception which explains what happened
    """
    # The cause depends only on ``exc``, so format it once for all files.
    cause = self.extract_cause(exc)
    for f in exposures.files:
        filename = f.filename
        LOGGER.info(f"{filename}: ingest failure: {cause}")


def on_metadata_failure(self, filename, exc):
    """Callback used on metadata extraction failure. Used to transmit
    unsuccessful data ingestion status

    Parameters
    ----------
    filename: `ButlerURI`
        ButlerURI that failed in ingest
    exc: `Exception`
        Exception which explains what happened
    """
    cause = self.extract_cause(exc)
    LOGGER.info(f"{filename}: metadata failure: {cause}")


def extract_cause(self, e):
    """extract the cause of an exception

    Parameters
    ----------
    e : `BaseException`
        exception to extract cause from

    Returns
    -------
    s : `str`
        The message of ``e`` itself when it has no ``__cause__``.
        Otherwise the messages of every exception in the ``__cause__``
        chain, outermost cause first, joined with "; ".  Each cause
        appears exactly once.
    """
    if e.__cause__ is None:
        return f"{e}"

    messages = []
    visited = set()
    link = e.__cause__
    # Walk the chain iteratively; ``visited`` guards against a cyclic
    # ``__cause__`` chain looping forever.
    while link is not None and id(link) not in visited:
        visited.add(id(link))
        messages.append(str(link))
        link = link.__cause__
    return "; ".join(messages)


# (patch metadata and the start of a removed JSON fixture line, kept verbatim)
# diff --git a/tests/data/message.json b/tests/data/message.json
# index 3234897..17eba50 100644
# --- a/tests/data/message.json
# +++ b/tests/data/message.json
# @@ -1 +1 @@
# -{"event_type": "transfer-done", "payload": {"activity": "User Subscriptions", "request-id": "217d5d321a394d349b82829cbb410b3f", "duration": 3, "checksum-adler": "3210cb3f", "checksum-md5": "7018315389da34f8e4d1d2d3c57b014a", "file-size": 1365120, "bytes": 1365120, "guid": "None",
"previous-request-id": "None", "protocol": "root", "scope": "test", "name": "something/visitSummary_HSC_y_HSC-Y_328_HSC_runs_RC2_w_2023_32_DM-40356_20230814T170253Z.fits", "dataset": "rubin_dataset", "datasetScope": "test", "src-type": "DISK", "src-rse": "XRD1", "src-url": "root://xrd1:1094//rucio/test/something/visitSummary_HSC_y_HSC-Y_328_HSC_runs_RC2_w_2023_32_DM-40356_20230814T170253Z.fits", "dst-type": "DISK", "dst-rse": "XRD3", "dst-url": "root://xrd3:1096//rucio/test/something/visitSummary_HSC_y_HSC-Y_328_HSC_runs_RC2_w_2023_32_DM-40356_20230814T170253Z.fits", "reason": "", "transfer-endpoint": "https://fts:8446", "transfer-id": "602975dc-d445-11ef-b2f8-0242ac130013", "transfer-link": "None", "created_at": "2025-01-16 20:05:52.814896", "submitted_at": "2025-01-16 20:06:30.300257", "started_at": "2025-01-16 20:06:41", "transferred_at": "2025-01-16 20:06:44", "tool-id": "rucio-conveyor", "account": "root", "datatype": "None", "transfer_link": "https://fts:8449/fts3/ftsmon/#/job/602975dc-d445-11ef-b2f8-0242ac130013", "rubin_butler": 1, "rubin_sidecar": {"id":"0ef08762-b0dd-4a02-8b1c-e09b1544992d","datasetType":{"name":"visitSummary","storageClass":"ExposureCatalog","dimensions":["instrument","visit"]},"dataId":{"dataId":{"instrument":"HSC","visit":328,"band":"y","physical_filter":"HSC-Y"}},"run":"HSC/runs/RC2/w_2023_32/DM-40356/20230814T170253Z"}}, "created_at": "2025-01-16 20:07:16.564403"} +{"id": "cdcb2ee0cebe471aa36196e28de33db4", "created_at": "datetime.datetime(2023, 11, 29, 20, 19, 3, 275833)", "event_type": "transfer-done", "services": "kafka", "payload": {"activity": "User Subscriptions", "request-id": "a50f0e3ad52a4b1ea7f072a719b5141f", "duration": 8, "checksum-adler": "95935a72", "checksum-md5": "None", "file-size": 57980160, "bytes": 57980160, "guid": "None", "previous-request-id": "None", "protocol": "root", "scope": "test", "name": "srp/data/calexp_HSC_y_HSC-Y_330_1_54_HSC_runs_RC2_w_2023_32_DM-40356_20230812T080035Z.fits", "dataset": "None", 
"datasetScope": "None", "src-type": "DISK", "src-rse": "XRD1", "src-url": "root://xrd1:1094//rucio/test/srp/data/calexp_HSC_y_HSC-Y_330_1_54_HSC_runs_RC2_w_2023_32_DM-40356_20230812T080035Z.fits", "dst-type": "DISK", "dst-rse": "XRD2", "dst-url": "root://xrd2:1095//rucio/test/srp/data/calexp_HSC_y_HSC-Y_330_1_54_HSC_runs_RC2_w_2023_32_DM-40356_20230812T080035Z.fits", "reason": "", "transfer-endpoint": "https://fts:8446", "transfer-id": "6dfb5aaa-8ef4-11ee-9688-0242ac16000b", "transfer-link": "None", "created_at": "2023-11-29 20:18:04.425276", "submitted_at": "2023-11-29 20:18:16.351754", "started_at": "2023-11-29 20:18:31", "transferred_at": "2023-11-29 20:18:39", "tool-id": "rucio-conveyor", "account": "root", "datatype": "None", "transfer_link": "https://fts:8449/fts3/ftsmon/#/job/6dfb5aaa-8ef4-11ee-9688-0242ac16000b", "rubin_butler": "data_product", "rubin_sidecar": {"id": "00a86e99-7661-4f14-ae0d-93d3d4162e26", "datasetType": {"name": "calexp", "storageClass": "ExposureF", "dimensions": {"names": ["band", "instrument", "detector", "physical_filter", "visit"]}}, "dataId": {"dataId": {"band": "y", "instrument": "HSC", "detector": 1, "physical_filter": "HSC-Y", "visit": 330}}, "run": "HSC/runs/RC2/w_2023_32/DM-40356/20230812T080035Z"}}} diff --git a/tests/data/raw_message.json b/tests/data/raw_message.json new file mode 100644 index 0000000..ad3282d --- /dev/null +++ b/tests/data/raw_message.json @@ -0,0 +1 @@ +{"event_type": "transfer-done", "payload": {"activity": "User Subscriptions", "request-id": "d690018f01b44b21a2ef38608faba652", "duration": 4, "checksum-adler": "c61efd01", "checksum-md5": "ce78c8bc805d0e69792d71073a2f5ccf", "file-size": 15073920, "bytes": 15073920, "guid": null, "previous-request-id": null, "protocol": "root", "scope": "test", "name": "something/AT_O_20250113_000001_R00_S00.fits", "dataset": "rubin_dataset", "datasetScope": "test", "src-type": "DISK", "src-rse": "XRD1", "src-url": 
"root://xrd1:1094//rucio/test/something/AT_O_20250113_000001_R00_S00.fits", "dst-type": "DISK", "dst-rse": "XRD3", "dst-url": "root://xrd3:1096//rucio/test/something/AT_O_20250113_000001_R00_S00.fits", "reason": "", "transfer-endpoint": "https://fts:8446", "transfer-id": "42e545c6-e581-11ef-9ff9-0242ac130005", "transfer-link": null, "created_at": "2025-02-07 18:27:32.947290", "submitted_at": "2025-02-07 18:28:00.769745", "started_at": "2025-02-07 18:28:03", "transferred_at": "2025-02-07 18:28:07", "tool-id": "rucio-conveyor", "account": "root", "datatype": null, "transfer_link": "https://fts:8449/fts3/ftsmon/#/job/42e545c6-e581-11ef-9ff9-0242ac130005", "rubin_butler": "raw_file", "rubin_sidecar": {"id": "9db1123b-b6c9-5678-90a4-6762d31d5712","datasetType":{"name":"raw","storageClass":"Exposure","dimensions":["instrument","detector","exposure"]},"dataId":{"dataId":{"instrument":"LATISS","detector":0,"exposure":2025011300001,"band":"white","day_obs":20250113,"group":"2025-01-13T18:08:19.460","physical_filter":"empty~empty"}},"run":"LATISS/raw/all"}}, "created_at": "2025-02-07 18:28:12.330612"} diff --git a/tests/etc/ingestd.yml b/tests/etc/ingestd.yml new file mode 100644 index 0000000..d2a5c82 --- /dev/null +++ b/tests/etc/ingestd.yml @@ -0,0 +1,22 @@ +brokers: kafka:9092 + +group_id: "my_test_group" +num_messages: 50 + +butler: + repo: /tmp/repo + +rses: + XRD1: + rucio_prefix: root://xrd1:1094//rucio + fs_prefix: file:///rucio/disks/xrd1/rucio + XRD2: + rucio_prefix: root://xrd2:1095//rucio + fs_prefix: file:///rucio/disks/xrd2/rucio + XRD3: + rucio_prefix: root://xrd3:1096//rucio + fs_prefix: file:///rucio/disks/xrd3/rucio + XRD4: + rucio_prefix: root://xrd4:1097//rucio + fs_prefix: file:///rucio/disks/xrd4/rucio + diff --git a/tests/test_message.py b/tests/test_message.py index a3b2bcf..338c7d0 100644 --- a/tests/test_message.py +++ b/tests/test_message.py @@ -22,6 +22,7 @@ import os.path import lsst.utils.tests +from lsst.ctrl.ingestd.entries.dataType import 
DataType from lsst.ctrl.ingestd.message import Message @@ -55,7 +56,7 @@ def testAttributes(self): "RC2_w_2023_32_DM-40356_20230814T170253Z.fits" ), ) - self.assertEqual(self.msg.get_rubin_butler(), 1) + self.assertEqual(self.msg.get_rubin_butler(), DataType.DATA_PRODUCT) sidecar_str = self.msg.get_rubin_sidecar_str() print(f"{sidecar_str=}") self.assertEqual( diff --git a/tests/test_rseButler.py b/tests/test_rseButler.py index 79ab8f7..7fae2a5 100644 --- a/tests/test_rseButler.py +++ b/tests/test_rseButler.py @@ -23,6 +23,9 @@ import tempfile import lsst.utils.tests +from lsst.ctrl.ingestd.config import Config +from lsst.ctrl.ingestd.entries.entryFactory import EntryFactory +from lsst.ctrl.ingestd.mapper import Mapper from lsst.ctrl.ingestd.message import Message from lsst.ctrl.ingestd.rseButler import RseButler from lsst.daf.butler import Butler @@ -38,6 +41,7 @@ def value(self) -> str: class RseButlerTestCase(lsst.utils.tests.TestCase): + def testRseButler(self): json_name = "message.json" testdir = os.path.abspath(os.path.dirname(__file__)) @@ -61,10 +65,41 @@ def testRseButler(self): instr.register(butler.butler.registry) butler.butler.import_(filename=prep_file) - with tempfile.NamedTemporaryFile() as temp_file: - sidecar_str = self.msg.get_rubin_sidecar_str() - fds = butler.create_entry(temp_file.name, sidecar_str) - butler.ingest([fds]) + config_file = os.path.join(testdir, "etc", "ingestd.yml") + config = Config(config_file) + mapper = Mapper(config.get_rses()) + + event_factory = EntryFactory(butler, mapper) + entry = event_factory.create_entry(self.msg) + butler.ingest([entry]) + + def testRseButler2(self): + + json_name = "raw_message.json" + testdir = os.path.abspath(os.path.dirname(__file__)) + json_file = os.path.join(testdir, "data", json_name) + + with open(json_file) as f: + fake_data = f.read() + + fake_msg = FakeKafkaMessage(fake_data) + self.msg = Message(fake_msg) + + self.repo_dir = tempfile.mkdtemp() + Butler.makeRepo(self.repo_dir) + + 
butler = RseButler(self.repo_dir) + instr = Instrument.from_string("lsst.obs.lsst.Latiss") + + instr.register(butler.butler.registry) + + config_file = os.path.join(testdir, "etc", "ingestd.yml") + config = Config(config_file) + mapper = Mapper(config.get_rses()) + + event_factory = EntryFactory(butler, mapper) + entry = event_factory.create_entry(self.msg) + butler.ingest([entry]) class MemoryTester(lsst.utils.tests.MemoryTestCase):