diff --git a/python/lsst/ctrl/ingestd/entries/dataFile.py b/python/lsst/ctrl/ingestd/entries/dataFile.py
new file mode 100644
index 0000000..ac39187
--- /dev/null
+++ b/python/lsst/ctrl/ingestd/entries/dataFile.py
@@ -0,0 +1,75 @@
+# This file is part of ctrl_ingestd
+#
+# Developed for the LSST Data Management System.
+# This product includes software developed by the LSST Project
+# (https://www.lsst.org).
+# See the COPYRIGHT file at the top-level directory of this distribution
+# for details of code ownership.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+import logging
+from lsst.ctrl.ingestd.entries.entry import Entry
+from lsst.daf.butler import DatasetRef, FileDataset
+
+LOGGER = logging.getLogger(__name__)
+
+
+class DataFile(Entry):
+    """Entry whose payload is a single file described by sidecar metadata
+
+    Base class shared by `DataProduct` and `RawFile`. On construction it
+    tries to turn the message's sidecar into a `FileDataset`; if that
+    fails the entry is still created and `get_data` returns `None`.
+
+    Parameters
+    ----------
+    butler: Butler
+        Butler associated with this entry
+    message: Message
+        Message representing data to ingest
+    mapper: Mapper
+        Mapping of RSE entry to Butler repo location
+    """
+
+    def __init__(self, butler, message, mapper):
+        super().__init__(butler, message, mapper)
+        self._populate()
+
+    def _populate(self):
+        """Set ``self.fds`` to an ingestible object, or `None` on failure"""
+        # create an object that's ingestible by the butler
+        self.fds = None
+        try:
+            self.fds = self._create_file_dataset(self.file_to_ingest, self.sidecar)
+        except Exception as e:
+            # Deliberately not fatal: the entry keeps file_to_ingest even
+            # without a FileDataset. Name the file so the failure can be
+            # traced back to a specific message.
+            LOGGER.debug("could not create FileDataset for %s: %s", self.file_to_ingest, e)
+
+    def _create_file_dataset(self, butler_file: str, sidecar: dict) -> FileDataset:
+        """Create a FileDataset with sidecar information
+
+        Parameters
+        ----------
+        butler_file: `str`
+            full uri to butler file location
+        sidecar: `dict`
+            dictionary of the 'sidecar' metadata
+
+        Returns
+        -------
+        fds: FileDataset
+            FileDataset pairing ``butler_file`` with the DatasetRef built
+            from ``sidecar``
+        """
+        # NOTE(review): the sidecar is handed to DatasetRef.from_json exactly
+        # as the message supplied it -- confirm it is in the form from_json
+        # expects.
+        ref = DatasetRef.from_json(sidecar, registry=self.butler.registry)
+        fds = FileDataset(butler_file, ref)
+        return fds
+
+    def get_data(self):
+        """Get data associated with this type of object
+
+        Returns
+        -------
+        fds: FileDataset
+            FileDataset representing this entry, or `None` if it could not
+            be created from the sidecar
+        """
+        return self.fds
diff --git a/python/lsst/ctrl/ingestd/entries/dataProduct.py b/python/lsst/ctrl/ingestd/entries/dataProduct.py
new file mode 100644
index 0000000..a706e6a
--- /dev/null
+++ b/python/lsst/ctrl/ingestd/entries/dataProduct.py
@@ -0,0 +1,42 @@
+# This file is part of ctrl_ingestd
+#
+# Developed for the LSST Data Management System.
+# This product includes software developed by the LSST Project
+# (https://www.lsst.org).
+# See the COPYRIGHT file at the top-level directory of this distribution
+# for details of code ownership.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+import logging
+from lsst.ctrl.ingestd.entries.dataFile import DataFile
+
+LOGGER = logging.getLogger(__name__)
+
+
+class DataProduct(DataFile):
+    """Entry representing a data product to put into the butler
+
+    Adds no behavior of its own; everything is inherited from `DataFile`.
+
+    Parameters
+    ----------
+    butler: Butler
+        Butler associated with this entry
+    message: Message
+        Message representing data to ingest
+    mapper: Mapper
+        Mapping of RSE entry to Butler repo location
+    """
+
+    def __init__(self, butler, message, mapper):
+        # Pass-through: all set-up happens in the DataFile constructor.
+        super().__init__(butler, message, mapper)
diff --git a/python/lsst/ctrl/ingestd/entries/dataType.py b/python/lsst/ctrl/ingestd/entries/dataType.py
new file mode 100644
index 0000000..7c31a99
--- /dev/null
+++ b/python/lsst/ctrl/ingestd/entries/dataType.py
@@ -0,0 +1,24 @@
+# This file is part of ctrl_ingestd
+#
+# Developed for the LSST Data Management System.
+# This product includes software developed by the LSST Project
+# (https://www.lsst.org).
+# See the COPYRIGHT file at the top-level directory of this distribution
+# for details of code ownership.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+class DataType:
+    """String constants naming the kinds of data an entry can carry"""
+
+    # Plain strings so they compare equal to the value returned by a
+    # message's get_rubin_butler().
+    DATA_PRODUCT = "data_product"
+    RAW_FILE = "raw_file"
diff --git a/python/lsst/ctrl/ingestd/entries/entry.py b/python/lsst/ctrl/ingestd/entries/entry.py
new file mode 100644
index 0000000..fd46d7c
--- /dev/null
+++ b/python/lsst/ctrl/ingestd/entries/entry.py
@@ -0,0 +1,63 @@
+# This file is part of ctrl_ingestd
+#
+# Developed for the LSST Data Management System.
+# This product includes software developed by the LSST Project
+# (https://www.lsst.org).
+# See the COPYRIGHT file at the top-level directory of this distribution
+# for details of code ownership.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+import logging
+LOGGER = logging.getLogger(__name__)
+
+
+class Entry:
+    """Generic representation of data to put into the Butler
+
+    Parameters
+    ----------
+    butler: Butler
+        Butler associated with this entry
+    message: Message
+        Message representing data to ingest
+    mapper: Mapper
+        Mapping of RSE entry to Butler repo location
+
+    Raises
+    ------
+    RuntimeError
+        Raised if the message has no butler data type, if no mapper was
+        supplied, or if the mapper leaves the destination URL unchanged.
+    """
+
+    def __init__(self, butler, message, mapper=None):
+        self.butler = butler
+        self.message = message
+        self.mapper = mapper
+
+        self.data_type = message.get_rubin_butler()
+        self.sidecar = message.get_rubin_sidecar_dict()
+        LOGGER.debug(f"{message=} {self.data_type=} {self.sidecar=}")
+        if self.data_type is None:
+            raise RuntimeError(f"shouldn't have gotten this: {message}")
+
+        # The signature allows mapper=None, but the URL rewrite below needs
+        # one; fail with a clear message instead of an AttributeError.
+        if self.mapper is None:
+            raise RuntimeError("no mapper given; cannot map the Rucio URL to a file location")
+
+        # Rewrite the Rucio URL to actual file location
+        dst_url = self.message.get_dst_url()
+        self.file_to_ingest = self.mapper.rewrite(self.message.get_dst_rse(), dst_url)
+
+        # An unchanged URL is treated as a failed mapping.
+        if self.file_to_ingest == dst_url:
+            # Avoid E501
+            m = f"failed to map {self.file_to_ingest}; check config file for incorrect mapping"
+            raise RuntimeError(m)
+
+    def get_data_type(self):
+        """Return the butler data type taken from the message"""
+        return self.data_type
+
+    def get_data(self):
+        """Return the ingestible data for this entry; subclasses override
+
+        Raises
+        ------
+        NotImplementedError
+            Always, in this base class. It is a subclass of `RuntimeError`,
+            so callers catching `RuntimeError` still see it.
+        """
+        raise NotImplementedError("Shouldn't call Entry.get_data directly")
diff --git a/python/lsst/ctrl/ingestd/entries/entryFactory.py b/python/lsst/ctrl/ingestd/entries/entryFactory.py
new file mode 100644
index 0000000..035f356
--- /dev/null
+++ b/python/lsst/ctrl/ingestd/entries/entryFactory.py
@@ -0,0 +1,64 @@
+# This file is part of ctrl_ingestd
+#
+# Developed for the LSST Data Management System.
+# This product includes software developed by the LSST Project
+# (https://www.lsst.org).
+# See the COPYRIGHT file at the top-level directory of this distribution
+# for details of code ownership.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+from lsst.ctrl.ingestd.entries.dataType import DataType
+from lsst.ctrl.ingestd.entries.entry import Entry
+from lsst.ctrl.ingestd.entries.dataProduct import DataProduct
+from lsst.ctrl.ingestd.entries.rawFile import RawFile
+
+
+class EntryFactory:
+    """Factory producing the Entry subclass that matches a message
+
+    Parameters
+    ----------
+    rse_butler: RseButler
+        Object representing a Butler for an RSE
+    mapper: Mapper
+        mapper between rse and prefix associated with it
+    """
+    def __init__(self, rse_butler, mapper):
+        self.rse_butler = rse_butler
+        self.mapper = mapper
+        # Entries are handed the underlying Butler, not the wrapper.
+        self.butler = rse_butler.butler
+
+    def create_entry(self, message) -> Entry:
+        """Build the Entry that corresponds to the message's data type
+
+        Parameters
+        ----------
+        message: Message
+            Object representing a Kafka message
+
+        Returns
+        -------
+        entry: Entry
+            `DataProduct` or `RawFile`, chosen by the message's
+            rubin_butler value
+
+        Raises
+        ------
+        ValueError
+            Raised if the rubin_butler value is not a known data type
+        """
+        kind = message.get_rubin_butler()
+
+        if kind == DataType.DATA_PRODUCT:
+            return DataProduct(self.butler, message, self.mapper)
+        if kind == DataType.RAW_FILE:
+            return RawFile(self.butler, message, self.mapper)
+        raise ValueError(f"Unknown rubin_butler type: {kind}")
diff --git a/python/lsst/ctrl/ingestd/entries/rawFile.py b/python/lsst/ctrl/ingestd/entries/rawFile.py
new file mode 100644
index 0000000..e6f428e
--- /dev/null
+++ b/python/lsst/ctrl/ingestd/entries/rawFile.py
@@ -0,0 +1,36 @@
+# This file is part of ctrl_ingestd
+#
+# Developed for the LSST Data Management System.
+# This product includes software developed by the LSST Project
+# (https://www.lsst.org).
+# See the COPYRIGHT file at the top-level directory of this distribution
+# for details of code ownership.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+import logging
+from lsst.ctrl.ingestd.entries.dataFile import DataFile
+
+LOGGER = logging.getLogger(__name__)
+
+
+class RawFile(DataFile):
+    """Entry representing a raw file to ingest via RawIngestTask
+
+    Adds no behavior of its own; everything is inherited from `DataFile`.
+
+    Parameters
+    ----------
+    butler: Butler
+        Butler associated with this entry
+    message: Message
+        Message representing data to ingest
+    mapper: Mapper
+        Mapping of RSE entry to Butler repo location
+    """
+
+    def __init__(self, butler, message, mapper):
+        # Pass-through: all set-up happens in the DataFile constructor.
+        super().__init__(butler, message, mapper)
diff --git a/python/lsst/ctrl/ingestd/ingestd.py b/python/lsst/ctrl/ingestd/ingestd.py
index e8ad6df..42dac9c 100644
--- a/python/lsst/ctrl/ingestd/ingestd.py
+++ b/python/lsst/ctrl/ingestd/ingestd.py
@@ -28,6 +28,7 @@
from lsst.ctrl.ingestd.mapper import Mapper
from lsst.ctrl.ingestd.message import Message
from lsst.ctrl.ingestd.rseButler import RseButler
+from lsst.ctrl.ingestd.entries.entryFactory import EntryFactory
LOGGER = logging.getLogger(__name__)
@@ -65,7 +66,8 @@ def __init__(self):
self.consumer = Consumer(conf)
self.consumer.subscribe(topics)
- self.butler = RseButler(config.get_repo())
+ self.rse_butler = RseButler(config.get_repo())
+ self.entry_factory = EntryFactory(self.rse_butler, self.mapper)
def run(self):
"""continually process messages"""
@@ -78,7 +80,7 @@ def process(self):
# read up to self.num_messages, with a timeout of self.timeout
msgs = self.consumer.consume(num_messages=self.num_messages, timeout=self.timeout)
# just return if there are no messages
- if msgs is None:
+ if not msgs:
return
# cycle through all the messages, rewriting the Rucio URL
@@ -92,35 +94,11 @@ def process(self):
logging.info(msg.value())
logging.info(e)
continue
- rubin_butler = message.get_rubin_butler()
- sidecar = message.get_rubin_sidecar_dict()
- logging.debug(f"{message=} {rubin_butler=} {sidecar=}")
-
- if rubin_butler is None:
- logging.warning("shouldn't have gotten this message: %s" % message)
- continue
-
- # Rewrite the Rucio URL to actual file location
- dst_url = message.get_dst_url()
- file_to_ingest = self.mapper.rewrite(message.get_dst_rse(), dst_url)
-
- if file_to_ingest == dst_url:
- logging.warn(
- f"failed to map {file_to_ingest}; check {self.config_file} for incorrect mapping"
- )
- continue
-
- # create an object that's ingestible by the butler
- # and add it to the list
- try:
- entry = self.butler.create_entry(file_to_ingest, sidecar)
- except Exception as e:
- logging.info(e)
- continue
+ entry = self.entry_factory.create_entry(message)
entries.append(entry)
# if we've got anything in the list, try and ingest it.
if len(entries) > 0:
- self.butler.ingest(entries)
+ self.rse_butler.ingest(entries)
if __name__ == "__main__":
diff --git a/python/lsst/ctrl/ingestd/rseButler.py b/python/lsst/ctrl/ingestd/rseButler.py
index 85afa26..c14789b 100644
--- a/python/lsst/ctrl/ingestd/rseButler.py
+++ b/python/lsst/ctrl/ingestd/rseButler.py
@@ -21,8 +21,10 @@
import logging
-from lsst.daf.butler import Butler, DatasetRef, FileDataset
+from lsst.ctrl.ingestd.entries.dataType import DataType
+from lsst.daf.butler import Butler
from lsst.daf.butler.registry import DatasetTypeError, MissingCollectionError
+from lsst.obs.base.ingest import RawIngestConfig, RawIngestTask
LOGGER = logging.getLogger(__name__)
@@ -39,38 +41,87 @@ class RseButler:
def __init__(self, repo: str):
self.butler = Butler(repo, writeable=True)
+ cfg = RawIngestConfig()
+ cfg.transfer = "direct"
+ self.task = RawIngestTask(
+ config=cfg,
+ butler=self.butler,
+ on_success=self.on_success,
+ on_ingest_failure=self.on_ingest_failure,
+ on_metadata_failure=self.on_metadata_failure,
+ )
- def create_entry(self, butler_file: str, sidecar: str) -> FileDataset:
- """Create a FileDatset with sidecar information
+ def ingest(self, entries: list):
+ """ingest a list of datasets
Parameters
----------
- butler_file: `str`
- full uri to butler file location
- sidecar: `dict`
- dictionary of the 'sidecar' metadata
+ entries : `list`
+ List of Entry
"""
- ref = DatasetRef.from_json(sidecar, registry=self.butler.registry)
- fds = FileDataset(butler_file, ref)
- return fds
- def ingest(self, datasets: list):
- """Ingest a list of Datasets
+ #
+ # group entries by data type, so they can be run in batches
+ #
+ data_type_dict = {}
+ LOGGER.debug(f"{entries=}")
+ for entry in entries:
+ data_type = entry.get_data_type()
+ if data_type not in data_type_dict:
+ data_type_dict[data_type] = []
+ LOGGER.debug(f"adding {data_type=}, {entry=}")
+ data_type_dict[data_type].append(entry)
+
+ LOGGER.debug(f"{data_type_dict=}")
+ if DataType.RAW_FILE in data_type_dict:
+ LOGGER.debug(f"ingesting - {DataType.RAW_FILE}")
+ self._ingest(data_type_dict[DataType.RAW_FILE], "direct", True)
+ if DataType.DATA_PRODUCT in data_type_dict:
+ LOGGER.debug(f"ingesting - {DataType.DATA_PRODUCT}")
+ self._ingest(data_type_dict[DataType.DATA_PRODUCT], "auto", False)
+ LOGGER.debug("done processing")
+
+    def _ingest_raw(self, entries: list):
+        """Run the RawIngestTask over the files named by the entries
+
+        Any exception raised while collecting the files or running the
+        task is logged as a warning and not propagated.
+
+        Parameters
+        ----------
+        entries : `list`
+            List of Entry
+        """
+        try:
+            files = [entry.file_to_ingest for entry in entries]
+            LOGGER.debug(f"{files=}")
+            self.task.run(files)
+        except Exception as exc:
+            LOGGER.warning(exc)
+
+ def _ingest(self, entries: list, transfer, retry_as_raw):
+ """ingest data with dataset refs, optionally
+ retrying using RawIngestTask
Parameters
----------
- datasets : `list`
- List of Datasets
+ entries : `list`
+ List of Entry
+ transfer: `str`
+ Butler transfer type
+ retry_as_raw: `bool`
+ on ingest failure, retry using RawIngestTask
"""
+ LOGGER.debug(f"{entries=}")
completed = False
+
+ datasets = [e.get_data() for e in entries]
+
while not completed:
try:
- self.butler.ingest(*datasets, transfer="auto")
- LOGGER.debug("ingest succeeded")
+ self.butler.ingest(*datasets, transfer=transfer)
for dataset in datasets:
LOGGER.info(f"ingested: {dataset.path}")
completed = True
except DatasetTypeError:
+ LOGGER.debug("DatasetTypeError")
dst_set = set()
for dataset in datasets:
for dst in {ref.datasetType for ref in dataset.refs}:
@@ -78,6 +129,7 @@ def ingest(self, datasets: list):
for dst in dst_set:
self.butler.registry.registerDatasetType(dst)
except MissingCollectionError:
+ LOGGER.debug("MissingCollectionError")
run_set = set()
for dataset in datasets:
for run in {ref.run for ref in dataset.refs}:
@@ -85,5 +137,72 @@ def ingest(self, datasets: list):
for run in run_set:
self.butler.registry.registerRun(run)
except Exception as e:
- LOGGER.warning(e)
+ if retry_as_raw:
+ self._ingest_raw(entries)
+ else:
+ LOGGER.warning(e)
completed = True
+
+    def on_success(self, datasets):
+        """Callback for a successful ingest; logs the path of each dataset
+
+        Parameters
+        ----------
+        datasets: `list`
+            ingested datasets; each item must expose a ``path`` attribute
+            (NOTE(review): presumably FileDataset objects -- confirm
+            against RawIngestTask)
+        """
+        for ingested in datasets:
+            LOGGER.info(f"ingested: {ingested.path}")
+
+    def on_ingest_failure(self, exposures, exc):
+        """Callback used on ingest failure. Used to transmit
+        unsuccessful data ingestion status
+
+        Logs one line per file in ``exposures.files``.
+
+        Parameters
+        ----------
+        exposures: `RawExposureData`
+            exposures that failed in ingest
+        exc: `Exception`
+            Exception which explains what happened
+        """
+        # The cause is the same for every file, so work it out once.
+        cause = self.extract_cause(exc)
+        for f in exposures.files:
+            LOGGER.info(f"{f.filename}: ingest failure: {cause}")
+
+    def on_metadata_failure(self, filename, exc):
+        """Callback for a metadata extraction failure; logs file and cause
+
+        Parameters
+        ----------
+        filename: `ButlerURI`
+            ButlerURI that failed in ingest
+        exc: `Exception`
+            Exception which explains what happened
+        """
+        reason = self.extract_cause(exc)
+        LOGGER.info(f"{filename}: metadata failure: {reason}")
+
+    def extract_cause(self, e):
+        """extract the cause of an exception
+
+        Follows the ``__cause__`` chain of ``e`` and joins the message of
+        each cause, nearest first, with "; ". Each cause appears once.
+        If ``e`` has no explicit cause, the message of ``e`` is returned.
+
+        Parameters
+        ----------
+        e : `BaseException`
+            exception to extract cause from
+
+        Returns
+        -------
+        s : `str`
+            A string containing the cause of an exception
+        """
+        if e.__cause__ is None:
+            return f"{e}"
+
+        messages = []
+        # Track visited exceptions so a chain that loops back on itself
+        # cannot spin forever.
+        visited = set()
+        cause = e.__cause__
+        while cause is not None and id(cause) not in visited:
+            visited.add(id(cause))
+            messages.append(str(cause))
+            cause = cause.__cause__
+        return "; ".join(messages)
diff --git a/tests/data/message.json b/tests/data/message.json
index 3234897..17eba50 100644
--- a/tests/data/message.json
+++ b/tests/data/message.json
@@ -1 +1 @@
-{"event_type": "transfer-done", "payload": {"activity": "User Subscriptions", "request-id": "217d5d321a394d349b82829cbb410b3f", "duration": 3, "checksum-adler": "3210cb3f", "checksum-md5": "7018315389da34f8e4d1d2d3c57b014a", "file-size": 1365120, "bytes": 1365120, "guid": "None", "previous-request-id": "None", "protocol": "root", "scope": "test", "name": "something/visitSummary_HSC_y_HSC-Y_328_HSC_runs_RC2_w_2023_32_DM-40356_20230814T170253Z.fits", "dataset": "rubin_dataset", "datasetScope": "test", "src-type": "DISK", "src-rse": "XRD1", "src-url": "root://xrd1:1094//rucio/test/something/visitSummary_HSC_y_HSC-Y_328_HSC_runs_RC2_w_2023_32_DM-40356_20230814T170253Z.fits", "dst-type": "DISK", "dst-rse": "XRD3", "dst-url": "root://xrd3:1096//rucio/test/something/visitSummary_HSC_y_HSC-Y_328_HSC_runs_RC2_w_2023_32_DM-40356_20230814T170253Z.fits", "reason": "", "transfer-endpoint": "https://fts:8446", "transfer-id": "602975dc-d445-11ef-b2f8-0242ac130013", "transfer-link": "None", "created_at": "2025-01-16 20:05:52.814896", "submitted_at": "2025-01-16 20:06:30.300257", "started_at": "2025-01-16 20:06:41", "transferred_at": "2025-01-16 20:06:44", "tool-id": "rucio-conveyor", "account": "root", "datatype": "None", "transfer_link": "https://fts:8449/fts3/ftsmon/#/job/602975dc-d445-11ef-b2f8-0242ac130013", "rubin_butler": 1, "rubin_sidecar": {"id":"0ef08762-b0dd-4a02-8b1c-e09b1544992d","datasetType":{"name":"visitSummary","storageClass":"ExposureCatalog","dimensions":["instrument","visit"]},"dataId":{"dataId":{"instrument":"HSC","visit":328,"band":"y","physical_filter":"HSC-Y"}},"run":"HSC/runs/RC2/w_2023_32/DM-40356/20230814T170253Z"}}, "created_at": "2025-01-16 20:07:16.564403"}
+{"id": "cdcb2ee0cebe471aa36196e28de33db4", "created_at": "datetime.datetime(2023, 11, 29, 20, 19, 3, 275833)", "event_type": "transfer-done", "services": "kafka", "payload": {"activity": "User Subscriptions", "request-id": "a50f0e3ad52a4b1ea7f072a719b5141f", "duration": 8, "checksum-adler": "95935a72", "checksum-md5": "None", "file-size": 57980160, "bytes": 57980160, "guid": "None", "previous-request-id": "None", "protocol": "root", "scope": "test", "name": "srp/data/calexp_HSC_y_HSC-Y_330_1_54_HSC_runs_RC2_w_2023_32_DM-40356_20230812T080035Z.fits", "dataset": "None", "datasetScope": "None", "src-type": "DISK", "src-rse": "XRD1", "src-url": "root://xrd1:1094//rucio/test/srp/data/calexp_HSC_y_HSC-Y_330_1_54_HSC_runs_RC2_w_2023_32_DM-40356_20230812T080035Z.fits", "dst-type": "DISK", "dst-rse": "XRD2", "dst-url": "root://xrd2:1095//rucio/test/srp/data/calexp_HSC_y_HSC-Y_330_1_54_HSC_runs_RC2_w_2023_32_DM-40356_20230812T080035Z.fits", "reason": "", "transfer-endpoint": "https://fts:8446", "transfer-id": "6dfb5aaa-8ef4-11ee-9688-0242ac16000b", "transfer-link": "None", "created_at": "2023-11-29 20:18:04.425276", "submitted_at": "2023-11-29 20:18:16.351754", "started_at": "2023-11-29 20:18:31", "transferred_at": "2023-11-29 20:18:39", "tool-id": "rucio-conveyor", "account": "root", "datatype": "None", "transfer_link": "https://fts:8449/fts3/ftsmon/#/job/6dfb5aaa-8ef4-11ee-9688-0242ac16000b", "rubin_butler": "data_product", "rubin_sidecar": {"id": "00a86e99-7661-4f14-ae0d-93d3d4162e26", "datasetType": {"name": "calexp", "storageClass": "ExposureF", "dimensions": {"names": ["band", "instrument", "detector", "physical_filter", "visit"]}}, "dataId": {"dataId": {"band": "y", "instrument": "HSC", "detector": 1, "physical_filter": "HSC-Y", "visit": 330}}, "run": "HSC/runs/RC2/w_2023_32/DM-40356/20230812T080035Z"}}}
diff --git a/tests/data/raw_message.json b/tests/data/raw_message.json
new file mode 100644
index 0000000..ad3282d
--- /dev/null
+++ b/tests/data/raw_message.json
@@ -0,0 +1 @@
+{"event_type": "transfer-done", "payload": {"activity": "User Subscriptions", "request-id": "d690018f01b44b21a2ef38608faba652", "duration": 4, "checksum-adler": "c61efd01", "checksum-md5": "ce78c8bc805d0e69792d71073a2f5ccf", "file-size": 15073920, "bytes": 15073920, "guid": null, "previous-request-id": null, "protocol": "root", "scope": "test", "name": "something/AT_O_20250113_000001_R00_S00.fits", "dataset": "rubin_dataset", "datasetScope": "test", "src-type": "DISK", "src-rse": "XRD1", "src-url": "root://xrd1:1094//rucio/test/something/AT_O_20250113_000001_R00_S00.fits", "dst-type": "DISK", "dst-rse": "XRD3", "dst-url": "root://xrd3:1096//rucio/test/something/AT_O_20250113_000001_R00_S00.fits", "reason": "", "transfer-endpoint": "https://fts:8446", "transfer-id": "42e545c6-e581-11ef-9ff9-0242ac130005", "transfer-link": null, "created_at": "2025-02-07 18:27:32.947290", "submitted_at": "2025-02-07 18:28:00.769745", "started_at": "2025-02-07 18:28:03", "transferred_at": "2025-02-07 18:28:07", "tool-id": "rucio-conveyor", "account": "root", "datatype": null, "transfer_link": "https://fts:8449/fts3/ftsmon/#/job/42e545c6-e581-11ef-9ff9-0242ac130005", "rubin_butler": "raw_file", "rubin_sidecar": {"id": "9db1123b-b6c9-5678-90a4-6762d31d5712","datasetType":{"name":"raw","storageClass":"Exposure","dimensions":["instrument","detector","exposure"]},"dataId":{"dataId":{"instrument":"LATISS","detector":0,"exposure":2025011300001,"band":"white","day_obs":20250113,"group":"2025-01-13T18:08:19.460","physical_filter":"empty~empty"}},"run":"LATISS/raw/all"}}, "created_at": "2025-02-07 18:28:12.330612"}
diff --git a/tests/etc/ingestd.yml b/tests/etc/ingestd.yml
new file mode 100644
index 0000000..d2a5c82
--- /dev/null
+++ b/tests/etc/ingestd.yml
@@ -0,0 +1,22 @@
+brokers: kafka:9092
+
+group_id: "my_test_group"
+num_messages: 50
+
+butler:
+ repo: /tmp/repo
+
+rses:
+ XRD1:
+ rucio_prefix: root://xrd1:1094//rucio
+ fs_prefix: file:///rucio/disks/xrd1/rucio
+ XRD2:
+ rucio_prefix: root://xrd2:1095//rucio
+ fs_prefix: file:///rucio/disks/xrd2/rucio
+ XRD3:
+ rucio_prefix: root://xrd3:1096//rucio
+ fs_prefix: file:///rucio/disks/xrd3/rucio
+ XRD4:
+ rucio_prefix: root://xrd4:1097//rucio
+ fs_prefix: file:///rucio/disks/xrd4/rucio
+
diff --git a/tests/test_message.py b/tests/test_message.py
index a3b2bcf..338c7d0 100644
--- a/tests/test_message.py
+++ b/tests/test_message.py
@@ -22,6 +22,7 @@
import os.path
import lsst.utils.tests
+from lsst.ctrl.ingestd.entries.dataType import DataType
from lsst.ctrl.ingestd.message import Message
@@ -55,7 +56,7 @@ def testAttributes(self):
"RC2_w_2023_32_DM-40356_20230814T170253Z.fits"
),
)
- self.assertEqual(self.msg.get_rubin_butler(), 1)
+ self.assertEqual(self.msg.get_rubin_butler(), DataType.DATA_PRODUCT)
sidecar_str = self.msg.get_rubin_sidecar_str()
print(f"{sidecar_str=}")
self.assertEqual(
diff --git a/tests/test_rseButler.py b/tests/test_rseButler.py
index 79ab8f7..7fae2a5 100644
--- a/tests/test_rseButler.py
+++ b/tests/test_rseButler.py
@@ -23,6 +23,9 @@
import tempfile
import lsst.utils.tests
+from lsst.ctrl.ingestd.config import Config
+from lsst.ctrl.ingestd.entries.entryFactory import EntryFactory
+from lsst.ctrl.ingestd.mapper import Mapper
from lsst.ctrl.ingestd.message import Message
from lsst.ctrl.ingestd.rseButler import RseButler
from lsst.daf.butler import Butler
@@ -38,6 +41,7 @@ def value(self) -> str:
class RseButlerTestCase(lsst.utils.tests.TestCase):
+
def testRseButler(self):
json_name = "message.json"
testdir = os.path.abspath(os.path.dirname(__file__))
@@ -61,10 +65,41 @@ def testRseButler(self):
instr.register(butler.butler.registry)
butler.butler.import_(filename=prep_file)
- with tempfile.NamedTemporaryFile() as temp_file:
- sidecar_str = self.msg.get_rubin_sidecar_str()
- fds = butler.create_entry(temp_file.name, sidecar_str)
- butler.ingest([fds])
+ config_file = os.path.join(testdir, "etc", "ingestd.yml")
+ config = Config(config_file)
+ mapper = Mapper(config.get_rses())
+
+ event_factory = EntryFactory(butler, mapper)
+ entry = event_factory.create_entry(self.msg)
+ butler.ingest([entry])
+
+    def testRseButler2(self):
+        """Ingest the raw-file message through EntryFactory and RseButler"""
+        json_name = "raw_message.json"
+        testdir = os.path.abspath(os.path.dirname(__file__))
+        json_file = os.path.join(testdir, "data", json_name)
+
+        with open(json_file) as f:
+            fake_data = f.read()
+
+        fake_msg = FakeKafkaMessage(fake_data)
+        self.msg = Message(fake_msg)
+
+        # Use a self-removing directory so repeated runs do not leave
+        # throwaway repos behind.
+        repo_tmp = tempfile.TemporaryDirectory()
+        self.addCleanup(repo_tmp.cleanup)
+        self.repo_dir = repo_tmp.name
+        Butler.makeRepo(self.repo_dir)
+
+        butler = RseButler(self.repo_dir)
+        instr = Instrument.from_string("lsst.obs.lsst.Latiss")
+
+        instr.register(butler.butler.registry)
+
+        config_file = os.path.join(testdir, "etc", "ingestd.yml")
+        config = Config(config_file)
+        mapper = Mapper(config.get_rses())
+
+        entry_factory = EntryFactory(butler, mapper)
+        entry = entry_factory.create_entry(self.msg)
+        butler.ingest([entry])
class MemoryTester(lsst.utils.tests.MemoryTestCase):