Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tickets/DM-46456 #7

Closed
wants to merge 10 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 75 additions & 0 deletions python/lsst/ctrl/ingestd/entries/dataFile.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
# This file is part of ctrl_ingestd
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

import logging
from lsst.ctrl.ingestd.entries.entry import Entry
from lsst.daf.butler import DatasetRef, FileDataset

LOGGER = logging.getLogger(__name__)


class DataFile(Entry):
    """Entry representing a file to ingest into the butler as a FileDataset

    After the base `Entry` initialization, the constructor attempts to build
    a `FileDataset` from the mapped file location and the sidecar metadata.

    Parameters
    ----------
    butler: Butler
        Butler associated with this entry
    message: Message
        Message representing data to ingest
    mapper: Mapper
        Mapping of RSE entry to Butler repo location
    """

    def __init__(self, butler, message, mapper):
        super().__init__(butler, message, mapper)
        self._populate()

    def _populate(self):
        """Build the FileDataset for this entry and store it in ``self.fds``

        Creation is best-effort: on failure ``self.fds`` stays `None` and the
        problem is logged instead of being raised.
        """
        # create an object that's ingestible by the butler
        self.fds = None
        try:
            self.fds = self._create_file_dataset(self.file_to_ingest, self.sidecar)
        except Exception:
            # Deliberately broad so one bad message can't abort processing,
            # but log loudly (with traceback) so the failure isn't invisible.
            LOGGER.warning("failed to create FileDataset for %s", self.file_to_ingest, exc_info=True)

    def _create_file_dataset(self, butler_file: str, sidecar: dict) -> FileDataset:
        """Create a FileDataset with sidecar information

        Parameters
        ----------
        butler_file: `str`
            full uri to butler file location
        sidecar: `dict`
            dictionary of the 'sidecar' metadata

        Returns
        -------
        fds: FileDataset
            FileDataset pairing ``butler_file`` with the DatasetRef
            deserialized from ``sidecar``
        """
        ref = DatasetRef.from_json(sidecar, registry=self.butler.registry)
        return FileDataset(butler_file, ref)

    def get_data(self):
        """Get data associated with this type of object

        Returns
        -------
        fds: FileDataset
            FileDataset representing this entry, or `None` if it could not
            be created
        """
        return self.fds
42 changes: 42 additions & 0 deletions python/lsst/ctrl/ingestd/entries/dataProduct.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# This file is part of ctrl_ingestd
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

import logging
from lsst.ctrl.ingestd.entries.dataFile import DataFile

LOGGER = logging.getLogger(__name__)


class DataProduct(DataFile):
    """Entry representing a data product to put into the butler

    Adds nothing to `DataFile` beyond a distinct type.

    Parameters
    ----------
    butler: Butler
        Butler associated with this entry
    message: Message
        Message representing data to ingest
    mapper: Mapper
        Mapping of RSE entry to Butler repo location
    """

    def __init__(self, butler, message, mapper):
        # All of the construction work is done by DataFile
        super().__init__(butler=butler, message=message, mapper=mapper)
24 changes: 24 additions & 0 deletions python/lsst/ctrl/ingestd/entries/dataType.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# This file is part of ctrl_ingestd
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

class DataType:
    """String constants naming the kinds of data an entry can represent"""

    # identifies a raw file
    RAW_FILE = "raw_file"
    # identifies a data product
    DATA_PRODUCT = "data_product"
63 changes: 63 additions & 0 deletions python/lsst/ctrl/ingestd/entries/entry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# This file is part of ctrl_ingestd
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

import logging
LOGGER = logging.getLogger(__name__)


class Entry:
    """Generic representation of data to put into the Butler

    Parameters
    ----------
    butler: Butler
        Butler associated with this entry
    message: Message
        Message representing data to ingest
    mapper: Mapper
        Mapping of RSE entry to Butler repo location; the default of
        `None` is accepted by the signature but is rejected at construction

    Raises
    ------
    RuntimeError
        Raised if the message carries no data type, if no mapper was given,
        or if the mapper returns the destination URL unchanged
    """

    def __init__(self, butler, message, mapper=None):
        self.butler = butler
        self.message = message
        self.mapper = mapper

        self.data_type = message.get_rubin_butler()
        self.sidecar = message.get_rubin_sidecar_dict()
        LOGGER.debug(f"{message=} {self.data_type=} {self.sidecar=}")
        if self.data_type is None:
            raise RuntimeError(f"shouldn't have gotten this: {message}")

        # Fail with a clear message instead of an AttributeError on None
        if mapper is None:
            raise RuntimeError("no mapper given; can't map the Rucio URL to a file location")

        # Rewrite the Rucio URL to actual file location
        dst_url = message.get_dst_url()
        self.file_to_ingest = mapper.rewrite(message.get_dst_rse(), dst_url)

        # An unchanged URL means the mapper had no rewrite for it
        if self.file_to_ingest == dst_url:
            # Avoid E501
            m = f"failed to map {self.file_to_ingest}; check config file for incorrect mapping"
            raise RuntimeError(m)

    def get_data_type(self):
        """Get the data type read from the message

        Returns
        -------
        data_type
            Value returned by the message's ``get_rubin_butler()``
        """
        return self.data_type

    def get_data(self):
        """Get data associated with this entry; subclasses must override

        Raises
        ------
        NotImplementedError
            Always raised by the base class (it is a `RuntimeError`
            subclass, so handlers for `RuntimeError` still catch it)
        """
        raise NotImplementedError("Shouldn't call Entry.get_data directly")
64 changes: 64 additions & 0 deletions python/lsst/ctrl/ingestd/entries/entryFactory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# This file is part of ctrl_ingestd
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

from lsst.ctrl.ingestd.entries.dataType import DataType
from lsst.ctrl.ingestd.entries.entry import Entry
from lsst.ctrl.ingestd.entries.dataProduct import DataProduct
from lsst.ctrl.ingestd.entries.rawFile import RawFile


class EntryFactory:
    """Factory that builds the Entry subclass matching a message's data type

    Parameters
    ----------
    rse_butler: RseButler
        Object representing a Butler for an RSE
    mapper: Mapper
        mapper between rse and prefix associated with it
    """

    def __init__(self, rse_butler, mapper):
        self.rse_butler = rse_butler
        self.mapper = mapper
        # the underlying butler is what gets handed to each Entry
        self.butler = rse_butler.butler

    def create_entry(self, message) -> Entry:
        """Create the Entry object appropriate for a message

        Parameters
        ----------
        message: Message
            Object representing a Kafka message

        Returns
        -------
        entry: Entry
            A `DataProduct` or `RawFile`, depending on the value of the
            message's ``get_rubin_butler()``

        Raises
        ------
        ValueError
            Raised if the message's data type is not a known `DataType`
        """
        kind = message.get_rubin_butler()

        if kind == DataType.DATA_PRODUCT:
            entry_class = DataProduct
        elif kind == DataType.RAW_FILE:
            entry_class = RawFile
        else:
            raise ValueError(f"Unknown rubin_butler type: {kind}")

        return entry_class(self.butler, message, self.mapper)
36 changes: 36 additions & 0 deletions python/lsst/ctrl/ingestd/entries/rawFile.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# This file is part of ctrl_ingestd
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

import logging
from lsst.ctrl.ingestd.entries.dataFile import DataFile

LOGGER = logging.getLogger(__name__)


class RawFile(DataFile):
    """Entry representing a raw file to put into the butler

    Adds nothing to `DataFile` beyond a distinct type.

    Parameters
    ----------
    butler: Butler
        Butler associated with this entry
    message: Message
        Message representing data to ingest
    mapper: Mapper
        Mapping of RSE entry to Butler repo location
    """

    def __init__(self, butler, message, mapper):
        # All of the construction work is done by DataFile
        super().__init__(butler=butler, message=message, mapper=mapper)
34 changes: 6 additions & 28 deletions python/lsst/ctrl/ingestd/ingestd.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from lsst.ctrl.ingestd.mapper import Mapper
from lsst.ctrl.ingestd.message import Message
from lsst.ctrl.ingestd.rseButler import RseButler
from lsst.ctrl.ingestd.entries.entryFactory import EntryFactory

LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -65,7 +66,8 @@ def __init__(self):
self.consumer = Consumer(conf)
self.consumer.subscribe(topics)

self.butler = RseButler(config.get_repo())
self.rse_butler = RseButler(config.get_repo())
self.entry_factory = EntryFactory(self.rse_butler, self.mapper)

def run(self):
"""continually process messages"""
Expand All @@ -78,7 +80,7 @@ def process(self):
# read up to self.num_messages, with a timeout of self.timeout
msgs = self.consumer.consume(num_messages=self.num_messages, timeout=self.timeout)
# just return if there are no messages
if msgs is None:
if not msgs:
return

# cycle through all the messages, rewriting the Rucio URL
Expand All @@ -92,35 +94,11 @@ def process(self):
logging.info(msg.value())
logging.info(e)
continue
rubin_butler = message.get_rubin_butler()
sidecar = message.get_rubin_sidecar_dict()
logging.debug(f"{message=} {rubin_butler=} {sidecar=}")

if rubin_butler is None:
logging.warning("shouldn't have gotten this message: %s" % message)
continue

# Rewrite the Rucio URL to actual file location
dst_url = message.get_dst_url()
file_to_ingest = self.mapper.rewrite(message.get_dst_rse(), dst_url)

if file_to_ingest == dst_url:
logging.warn(
f"failed to map {file_to_ingest}; check {self.config_file} for incorrect mapping"
)
continue

# create an object that's ingestible by the butler
# and add it to the list
try:
entry = self.butler.create_entry(file_to_ingest, sidecar)
except Exception as e:
logging.info(e)
continue
entry = self.entry_factory.create_entry(message)
entries.append(entry)
# if we've got anything in the list, try and ingest it.
if len(entries) > 0:
self.butler.ingest(entries)
self.rse_butler.ingest(entries)


if __name__ == "__main__":
Expand Down
Loading
Loading