diff --git a/.gitignore b/.gitignore index 436c53b..d21b0ff 100644 --- a/.gitignore +++ b/.gitignore @@ -3,4 +3,4 @@ settings.json local/ localcache/ *pyc -build/ \ No newline at end of file +build/ diff --git a/dirconfig b/dirconfig index 040dd31..da48dbb 100644 --- a/dirconfig +++ b/dirconfig @@ -1,4 +1,6 @@ # Directory containing new flights to be pushed to Elasticsearch -/home/badc/software/datasets/flight-finder/add_records/ +/home/users/icircu/flight-pipeline/add/ # Directory for moving written flights - write DELETE to remove pushed flight records from the local system after pushing. -/home/badc/software/datasets/flight-finder/stac-flightfinder-items/ +/home/users/icircu/test-flights/ +# Logging File +/home/users/icircu/test-flights/logging diff --git a/flight_update.py b/flight_update.py index e3e26c1..2a5e868 100644 --- a/flight_update.py +++ b/flight_update.py @@ -8,6 +8,7 @@ import importlib import logging +from flightpipe.logger import logger import argparse @@ -18,7 +19,6 @@ settings_file = 'settings.json' - def openConfig(): """ Function to open configuration file and initialise paths to relevant directories. 
@@ -31,14 +31,15 @@ def openConfig(): if VERB: print('> (1/6) Opening Config File') - logging.info('> (1/6) Opening Config File') + logger.info('> (1/6) Opening Config File') f = open('dirconfig','r') content = f.readlines() f.close() try: - return content[1].replace('\n',''), content[3].replace('\n',''), content[5].replace('\n','') + return content[1].replace('\n',''), content[3].replace('\n','') except IndexError: + logger.error('One or both paths missing from the dirconfig file') print('Error: One or both paths missing from the dirconfig file - please fill these in') return '','' @@ -73,7 +74,7 @@ def addFlights(rootdir, archive, repush=False): # ES client to determine array of ids if VERB: print('> (2/6) Setting up ES Flight Client') - logging.info('> (2/6) Setting up ES Flight Client') + logger.info('> (2/6) Setting up ES Flight Client') if repush: files_list = os.listdir(archive) fclient = ESFlightClient(archive, settings_file) @@ -87,21 +88,21 @@ def addFlights(rootdir, archive, repush=False): # Push new flights to index if VERB: print('> (4/6) Identified {} flights'.format(len(checked_list))) - logging.info('> (4/6) Identified {} flights'.format(len(checked_list))) + logger.info('> (4/6) Identified {} flights'.format(len(checked_list))) if len(checked_list) > 0: fclient.push_flights(checked_list) if VERB: print('> (5/6) Pushed flights to ES Index') - logging.info('> (5/6) Pushed flights to ES Index') + logger.info('> (5/6) Pushed flights to ES Index') if not repush: moveOldFiles(rootdir, archive, checked_list) if VERB: print('> (6/6) Removed local files from push directory') - logging.info('> (6/6) Removed local files from push directory') + logger.info('> (6/6) Removed local files from push directory') else: if VERB: print('> Exiting flight pipeline') - logging.info('> Exiting flight pipeline') + logger.info('> Exiting flight pipeline') # Move old records into an archive directory @@ -136,27 +137,19 @@ def main(): REPUSH = False if args.mode == 'add': - 
root, archive, log_file = openConfig() - - if log_file == '': - print("Error: Please fill in the third directory in dirconfig file") - - # Set up logging config - logging.basicConfig( - level=logging.DEBUG, # Capture all levels - format='%(asctime)s - %(levelname)s - %(message)s', # timestamp, level, message - handlers=[ - logging.FileHandler(log_file), # Write output to file - logging.StreamHandler() # If logging to console - ] - ) + logger.debug("Mode set to add") + root, archive = openConfig() + + logger.debug("Root directory set to %s", root) + logger.debug("Archive set to %s", archive) + if archive == '': print('Error: Please fill in second directory in dirconfig file') - logging.error("Error: Second directory in dirconfig file missing") + logger.error("Second directory in dirconfig file missing") sys.exit() elif root == '': print('Error: Please fill in first directory in dirconfig file') - logging.error("Error: First directory in dirconfig file missing") + logger.error("First directory in dirconfig file missing") sys.exit() else: addFlights(root, archive, repush=REPUSH) @@ -171,14 +164,18 @@ def main(): """ elif args.mode == 'update': + logger.debug("Mode set to update") updateFlights(args.update) elif args.mode == 'add_moles': + logger.debug("Mode set to add moles") updateFlights('moles') elif args.mode == 'reindex': + logger.debug("Mode set to reindex") reindex(args.new_index) else: + logger.error("Mode unrecognised - %s", args.mode) print('Error: Mode unrecognised - ', args.mode) sys.exit() diff --git a/flightpipe/flight_client.py b/flightpipe/flight_client.py index 40b3510..ab5f701 100644 --- a/flightpipe/flight_client.py +++ b/flightpipe/flight_client.py @@ -11,7 +11,10 @@ from elasticsearch import Elasticsearch from elasticsearch.helpers import bulk -from ceda_elastic_py import SimpleClient, gen_id +from flightpipe.logger import logger + +from flightpipe.simple_client import SimpleClient, gen_id +from flightpipe.logger import setup_logging from datetime 
import datetime @@ -22,6 +25,7 @@ urllib3.disable_warnings() def resolve_link(path, ): + logger.debug("Debug: Resolving link for path %s", path) mpath = str(path) uuid = None @@ -32,11 +36,13 @@ def resolve_link(path, ): r = json.loads(resp) if r['results']: uuid = r['results'][0]['uuid'] + logger.debug("Debug: Resolving link, found uuid %s", str(uuid)) except: print(f'Unsuccessful link retrieval for {path} - proceeding without') path = '/'.join(path.split('/')[:-1]) if not uuid: + logger.error("Error: Recursive path search failed for %s", mpath) print(f'Recursive path search failed for: {mpath}') return uuid @@ -44,14 +50,17 @@ def resolve_link(path, ): class ESFlightClient(SimpleClient): """ Connects to an elasticsearch instance and exports the - documents to elasticsearch.""" + documents to elasticsearch. + """ def __init__(self, rootdir, es_config='settings.json'): self.rootdir = rootdir + logger.info("Info: Initialising ES Flight Client") - super().__init__("stac-flightfinder-items", es_config=es_config) + super().__init__("stac-flightfinder-items-test", es_config=es_config) with open('stac_template.json') as f: + logger.info("Info: Reading stac template JSON file") self.required_keys = json.load(f).keys() def push_flights(self, file_list): @@ -63,10 +72,13 @@ def push_flights(self, file_list): elif isinstance(file_list[0], dict): flight_list = file_list else: + logger.error("Error: Flight file not found %s", str(file_list[0])) raise FileNotFoundError(file_list[0]) + logger.info("Info: Flights to be pushed %s", str(flight_list)) self.push_records(flight_list) def preprocess_records(self, file_list): + logger.debug("Debug: Processing following records - %s", file_list) def set_defaults(refs): collection = refs['collection'] @@ -116,6 +128,7 @@ def set_defaults(refs): if rq not in source: missing.append(rq) if len(missing) > 0: + logger.error("Error: File is missing entries - %s", str(missing)) raise TypeError(f"File {file} is missing entries:{missing}") 
source['last_update'] = datetime.strftime(datetime.now(),'%Y-%m-%d %H:%M:%S') @@ -126,6 +139,7 @@ def set_defaults(refs): return records def obtain_field(self, id, fieldnames): + logger.info("Info: Performing query to obtain the following fields: %s", str(fieldnames)) search = { "_source": fieldnames, "query": { @@ -140,11 +154,14 @@ def obtain_field(self, id, fieldnames): body=search) try: + logger.info("Info: Found following fields: %s", str(resp['hits']['hits'][0])) return resp['hits']['hits'][0] except IndexError: # No entry found + logger.error("Error: No entry found.") return None def add_field(self, id, data, fieldname): + logger.debug("Debug: Update mapping for id - %s", str(id)) # Update mapping self.es.update(index=self.index, doc_type='_doc', id=id, body={'doc':{fieldname:data}}) @@ -227,6 +244,7 @@ def check_ptcode(self, ptcode): return 100 def reindex(self, new_index): + logger.debug("Debug: Reindex for source %s and destination %s", self.index, new_index) self.es.reindex({ "source":{ diff --git a/flightpipe/logger.py b/flightpipe/logger.py new file mode 100644 index 0000000..9edade5 --- /dev/null +++ b/flightpipe/logger.py @@ -0,0 +1,50 @@ +import logging + +def setup_logging(enable_logging=True, console_logging=True): + """ + Sets up logging configuration. If `enable_logging` is False, no logging will occur. + + :param enable_logging: Flag to enable/disable logging. + """ + file = "dirconfig" + + with open(file) as f: # 'r' is default if not specified. 
+ content = [r.strip() for r in f.readlines()] # Removes the '\n' from all lines + + log_file = content[5].replace('\n','') + + if log_file == '': + print("Error: Please fill in the third directory in dirconfig file") + + handlers = [ + logging.FileHandler(log_file), # Write output to file + ] + + if console_logging: + handlers.append(logging.StreamHandler()) # Logs to the console if enabled + + + if enable_logging: + logging.basicConfig( + level=logging.DEBUG, # Capture all levels + format='%(asctime)s - %(levelname)s - %(message)s', + handlers=handlers + ) + else: + # Disable logging by setting a null handler + logging.basicConfig(level=logging.CRITICAL) + #NOTSET for no alerts at all + + +enable_logging = True + +# Set up logging with a flag (True to enable logging, False to disable logging) +setup_logging(enable_logging) # Change to False to disable logging + +logger = logging.getLogger(__name__) + + + +__author__ = "Ioana Circu" +__contact__ = "ioana.circu@stfc.ac.uk" +__copyright__ = "Copyright 2025 United Kingdom Research and Innovation" diff --git a/flightpipe/simple_client.py b/flightpipe/simple_client.py new file mode 100644 index 0000000..17f63a2 --- /dev/null +++ b/flightpipe/simple_client.py @@ -0,0 +1,132 @@ +import json +import os + +from elasticsearch import Elasticsearch +from elasticsearch.helpers import bulk + +def gen_id(): + import random + chars = [*'0123456789abcdefghijklmnopqrstuvwxyz'] + xid = '' + for i in range(39): + j = random.randint(0,len(chars)-1) + xid += chars[j] + return xid + # Probability of id reuse is negligible. 
+ +default_rec = { + "_index": None, + "_type": "_doc", + "_id":None, + "_score": 1.0, + "_source":None +} + +settings_default = { + "hosts": [ + "https://es10.ceda.ac.uk:9200" + ], + "headers": { + "x-api-key": "" + }, + "verify_certs": 0, + "ssl_show_warn": 0 +} + +def create_settings(es_config='es_settings.json'): + with open(es_config,'w') as f: + f.write(json.dumps(settings_default)) + + +class ResponseException(Exception): + """ + Response Exception - Elasticsearch did not return an object with any 'hits'. This indicates an error, + as even with 0 hits returned there should still be an empty 'hits' list here. + """ + def __init__(self, keys=None): + self.message = f'Received ({keys}) keys from response - No "hits" received' + super().__init__(self.message) + +class SimpleClient: + """ + Simple Elasticsearch-Python client for bulk operations + """ + def __init__(self,index, es_config=None): + """ + Initialise client, pull credentials from a configuration file if present + and create an underlying client within this class. + """ + self.index = index + + if isinstance(es_config,str): + if not os.path.isfile(es_config): + raise FileNotFoundError(f'File {es_config} not present, no settings loaded.') + with open(es_config) as f: + connection_kwargs = json.load(f) + else: + connection_kwargs = es_config + + self.es = Elasticsearch(**connection_kwargs) + + def preprocess_records(self, records): + """ + Blank preprocessor method - override with preprocessing steps for specific + client use-cases. + """ + return records + + def process_records(self, records): + """ + Final processing step to add ids to new records if they are not already present. + """ + for r in records: + rec = self.establish_id(r) + yield rec + + def get_size(self): + md = self.es.cat.count(index=self.index, params={"format": "json"})[0] + return md['count'] + + def establish_id(self, record): + """ + Add _id and other required fields before pushing new entries. 
+ """ + if '_id' not in record: + record = { + "_index": self.index, + "_type": "_doc", + "_id":gen_id(), + "_score": 1.0, + "_source":dict(record) + } + return record + + + def push_records(self, records): + """ + Push records using bulk helper tool. + """ + records = self.preprocess_records(records) + bulk(self.es, self.process_records(records)) + + def pull_records(self): + """ + Pull all records up to 10,000 and return all hits. + """ + search = { + "size":10000, + "query": { + "match_all":{} + } + } + + resp = self.es.search( + index=self.index, + body=search) + + try: + return resp['hits']['hits'] + except IndexError: + raise ResponseException(keys=resp.keys()) + + \ No newline at end of file