From 7565ee71749cb3eb6687431d0b2cb7ff73f740c3 Mon Sep 17 00:00:00 2001
From: Robert Bartel
Date: Thu, 6 Jun 2024 14:49:10 -0400
Subject: [PATCH 01/11] Alter worker entrypoints to optimize output write.

Adjusting to account for slowness of writes for output when output dataset
directory is actually a mounted object store bucket (the default at the
moment), such that we work in container-specific dirs and archive things
before writing to such a place; also adding TODO comments to come back and
add more general behavior later, when output datasets may be somewhere else.
---
 docker/main/ngen/funcs.sh               | 91 +++++++++++++++++++++++++
 docker/main/ngen/ngen_cal_entrypoint.sh | 61 +++++++++++------
 docker/main/ngen/ngen_entrypoint.sh     | 53 ++++++++++++--
 3 files changed, 179 insertions(+), 26 deletions(-)

diff --git a/docker/main/ngen/funcs.sh b/docker/main/ngen/funcs.sh
index 14bc4bd62..0944a51a0 100644
--- a/docker/main/ngen/funcs.sh
+++ b/docker/main/ngen/funcs.sh
@@ -212,4 +212,95 @@ run_secondary_mpi_ssh_worker_node()
     while [ -e ${RUN_SENTINEL} ] && kill -s 0 "${_SSH_D_PID}" 2>/dev/null ; do
         sleep 5
     done
+}
+
+tar_and_copy()
+{
+    # If $1 is "--dry-run" then just do sanity checks without tarring or copying, then shift args
+    # If $1 is "--compress", then indicate that tar should be gzipped, then shift args
+
+    # $1 is source directory containing contents to archive
+    # $2 is the name of/path to the produced tar archive
+    # $3 is the location to copy to
+
+    if [ "${1:?No args given to tar_and_copy}" == "--dry-run" ]; then
+        local _DRY_RUN="true"
+        shift
+    fi
+
+    if [ "${1:?No contents directory given to tar_and_copy}" == "--compress" ]; then
+        local _TAR_EXTRA_ARGS="-z"
+        shift
+    fi
+
+    local _CONTENTS_DIR="${1:?No contents directory given to tar_and_copy}"
+    local _TAR_FILE="${2:?No archive file given to tar_and_copy}"
+    local _DEST_DIR="${3:?No copy destination directory given to tar_and_copy}"
+
+    if [ ! -e "${_CONTENTS_DIR}" ]; then
+        >&2 echo "$(print_date) Error: tar_and_copy contents directory '${_CONTENTS_DIR}' does not exist!"
+        exit 1
+    elif [ ! -d "${_CONTENTS_DIR}" ]; then
+        >&2 echo "$(print_date) Error: tar_and_copy contents directory '${_CONTENTS_DIR}' exists but is not a directory!"
+        exit 1
+    elif [ ! -e "${_DEST_DIR}" ]; then
+        >&2 echo "$(print_date) Error: tar_and_copy copy destination directory '${_DEST_DIR}' does not exist!"
+        exit 1
+    elif [ ! -e "${_DEST_DIR}" ]; then
+        >&2 echo "$(print_date) Error: tar_and_copy copy destination directory '${_DEST_DIR}' exist but is not a directory!"
+        exit 1
+    elif [ -e "${_TAR_FILE}" ]; then
+        >&2 echo "$(print_date) Error: tar_and_copy archive file '${_TAR_FILE}' already exists!"
+        exit 1
+    fi
+
+    if [ "${_DRY_RUN:-}" == "true" ]; then
+        return 0
+    fi
+
+    tar -c ${_TAR_EXTRA_ARGS:-} -f "${_DEST_DIR}/${_TAR_FILE}" -C "${_CONTENTS_DIR}" .
+    #cp -a "${_TAR_FILE}" "${_DEST_DIR}/."
+    #rm "${_TAR_FILE}"
+}
+
+gather_output() {
+    echo "$(print_date) Gather from remote worker host ${JOB_OUTPUT_WRITE_DIR:?Job temp output dir not defined} dirs"
+    for i in $(echo "${MPI_HOST_STRING}" | sed 's/,/ /g'); do
+        _HOST_NAME=$(echo "${i}" | awk -F: '{print $1}')
+        if [ "$(hostname)" == "${_HOST_NAME}" ]; then
+            continue
+        fi
+        scp -q -r ${_HOST_NAME}:${JOB_OUTPUT_WRITE_DIR}/ ${JOB_OUTPUT_WRITE_DIR}/. &
+    done
+    for p in $(jobs -p); do
+        wait ${p}
+        _R=$?
+ if [ ${_R} -ne 0 ]; then + echo "$(print_date) Error: remote copying of output exited with error ${_R}" + exit ${_R} + fi + done +} + +move_output_to_dataset() +{ + # $1 output directory + # $2 dataset directory + + if [ ! -d ${1:?No output directory given for copying to dataset} ]; then + >&2 echo "$(print_date) Error: cannot move output from non-directory path '${1}' to output dataset!" + exit 1 + elif [ ! -d ${2:?No output dataset directory given for copying} ]; then + >&2 echo "$(print_date) Error: cannot move output to non-directory path '${1}' for output dataset!" + exit 1 + fi + + if [ $(ls ${1} | grep '.csv' | wc -l) -gt 0 ]; then + echo "$(print_date) Archiving and copying output CSVs to output dataset" + tar_and_copy ${1} job-${JOB_ID:?}-output.tar ${2} + else + echo "$(print_date) Copying output file(s) to output dataset" + cp -a ${1}/. ${2}/. + fi + rm -rf ${1} } \ No newline at end of file diff --git a/docker/main/ngen/ngen_cal_entrypoint.sh b/docker/main/ngen/ngen_cal_entrypoint.sh index cac0c6ac1..14e4e60f4 100755 --- a/docker/main/ngen/ngen_cal_entrypoint.sh +++ b/docker/main/ngen/ngen_cal_entrypoint.sh @@ -4,45 +4,48 @@ while [ ${#} -gt 0 ]; do case "${1}" in --config-dataset) - CONFIG_DATASET_NAME="${2:?}" + declare -x CONFIG_DATASET_NAME="${2:?}" shift ;; --host-string) - MPI_HOST_STRING="${2:?}" + declare -x MPI_HOST_STRING="${2:?}" shift ;; --hydrofabric-dataset) - HYDROFABRIC_DATASET_NAME="${2:?}" + declare -x HYDROFABRIC_DATASET_NAME="${2:?}" shift ;; --job-id) - JOB_ID="${2:?}" + declare -x JOB_ID="${2:?}" shift ;; --node-count) - MPI_NODE_COUNT="${2:?}" + declare -x MPI_NODE_COUNT="${2:?}" shift ;; --output-dataset) - OUTPUT_DATASET_NAME="${2:?}" + declare -x OUTPUT_DATASET_NAME="${2:?}" shift ;; --partition-dataset) - PARTITION_DATASET_NAME="${2:?}" + declare -x PARTITION_DATASET_NAME="${2:?}" shift ;; --worker-index) - WORKER_INDEX="${2:?}" + declare -x WORKER_INDEX="${2:?}" shift ;; --calibration-config-file) - CALIBRATION_CONFIG_BASENAME="${2:?}" + declare -x CALIBRATION_CONFIG_BASENAME="${2:?}" shift ;; esac shift done +# TODO: (later) in both ngen and ngen-cal entrypoints, add controls for whether this is temp dir or output dataset dir +declare -x JOB_OUTPUT_WRITE_DIR="/tmp/job_output" + # Get some universally applicable functions and constants source ./funcs.sh @@ -50,8 +53,16 @@ ngen_sanity_checks_and_derived_init init_script_mpi_vars init_ngen_executable_paths -# Move to the output dataset mounted directory -cd ${OUTPUT_DATASET_DIR:?Output dataset directory not defined} +# Move to the output write directory +# TODO: (later) in both ngen and ngen-cal entrypoints, control whether this is needed, based on if write dir is output dataset dir +#cd ${OUTPUT_DATASET_DIR:?Output dataset directory not defined} +mkdir ${JOB_OUTPUT_WRITE_DIR:?} +chown ${MPI_USER}:${MPI_USER} ${JOB_OUTPUT_WRITE_DIR} +cd ${JOB_OUTPUT_WRITE_DIR} +#Needed for routing +if [ ! 
-e /dmod/datasets/linked_job_output ]; then + ln -s ${JOB_OUTPUT_WRITE_DIR} /dmod/datasets/linked_job_output +fi start_calibration() { # Start ngen calibration @@ -61,17 +72,21 @@ start_calibration() { echo "$(print_date) Starting ngen calibration with serial ngen execution" fi - # Find and use copy of config in output dataset + # Find calibration config, then copy to output dataset and use that if [ -n "${CALIBRATION_CONFIG_BASENAME:-}" ]; then - CALIBRATION_CONFIG_FILE=$(find ${OUTPUT_DATASET_DIR:?} -type f -name "${CALIBRATION_CONFIG_BASENAME}" -maxdepth 1 | head -1) + #CALIBRATION_CONFIG_FILE=$(find ${OUTPUT_DATASET_DIR:?} -type f -name "${CALIBRATION_CONFIG_BASENAME}" -maxdepth 1 | head -1) + _ORIG_CAL_CONFIG_FILE=$(find ${CONFIG_DATASET_DIR:?} -type f -name "${CALIBRATION_CONFIG_BASENAME}" -maxdepth 1 | head -1) else - CALIBRATION_CONFIG_FILE=$(find ${OUTPUT_DATASET_DIR:?} -type f -iname "*.yaml" -o -iname "*.yml" -maxdepth 1 | head -1) + #CALIBRATION_CONFIG_FILE=$(find ${OUTPUT_DATASET_DIR:?} -type f -iname "*.yaml" -o -iname "*.yml" -maxdepth 1 | head -1) + _ORIG_CAL_CONFIG_FILE=$(find ${CONFIG_DATASET_DIR:?} -type f -iname "*.yaml" -o -iname "*.yml" -maxdepth 1 | head -1) fi - - if [ -z "${CALIBRATION_CONFIG_FILE}" ]; then + if [ -z "${_ORIG_CAL_CONFIG_FILE}" ]; then echo "Error: NGEN calibration yaml file not found" 2>&1 exit 1 fi + cp -a ${_ORIG_CAL_CONFIG_FILE:?} ${OUTPUT_DATASET_DIR:?}/. + CALIBRATION_CONFIG_FILE="${OUTPUT_DATASET_DIR:?}/$(basename ${_ORIG_CAL_CONFIG_FILE})" + python3 -m ngen.cal "${CALIBRATION_CONFIG_FILE}" #Capture the return value to use as service exit code @@ -79,6 +94,9 @@ start_calibration() { echo "$(print_date) ngen calibration finished with return value: ${NGEN_RETURN}" + # TODO: (later) make sure outputs are handled properly, and that eventually we support toggling whether written to + # TODO: output dataset dir directly or somewhere else + # Exit with the model's exit code return ${NGEN_RETURN} } @@ -89,11 +107,16 @@ if [ "${WORKER_INDEX:-0}" = "0" ]; then # This will only have an effect when running with multiple MPI nodes, so its safe to have even in serial exec trap close_remote_workers EXIT # Have "main" (potentially only) worker copy config files to output dataset for record keeping - # TODO: perform copy of configs to output dataset outside of image (in service) for better performance - cp -a ${CONFIG_DATASET_DIR:?Config dataset directory not defined}/. ${OUTPUT_DATASET_DIR:?} + # TODO: (later) in ngen and ngen-cal entrypoints, consider adding controls for whether this is done or a simpler + # TODO: 'cp' call, based on whether we write directly to output dataset dir or some other output write dir + # Do a dry run first to sanity check directory and fail if needed before backgrounding process + tar_and_copy --dry-run --compress ${CONFIG_DATASET_DIR:?Config dataset directory not defined} config_dataset.tgz ${OUTPUT_DATASET_DIR:?} + # Then actually run the archive and copy function in the background + tar_and_copy --compress ${CONFIG_DATASET_DIR:?} config_dataset.tgz ${OUTPUT_DATASET_DIR:?} & + _CONFIG_COPY_PROC=$! + # If there is partitioning, which implies multi-processing job ... if [ -n "${PARTITION_DATASET_DIR:-}" ]; then # Include partition config dataset too if appropriate - # TODO: perform copy of configs to output dataset outside of image (in service) for better performance cp -a ${PARTITION_DATASET_DIR}/. 
${OUTPUT_DATASET_DIR:?} fi diff --git a/docker/main/ngen/ngen_entrypoint.sh b/docker/main/ngen/ngen_entrypoint.sh index 8aa2a32c6..f16707a7e 100755 --- a/docker/main/ngen/ngen_entrypoint.sh +++ b/docker/main/ngen/ngen_entrypoint.sh @@ -39,6 +39,9 @@ while [ ${#} -gt 0 ]; do shift done +# TODO: (later) in both ngen and ngen-cal entrypoints, add controls for whether this is temp dir or output dataset dir +declare -x JOB_OUTPUT_WRITE_DIR="/tmp/job_output" + # Get some universally applicable functions and constants source /ngen/funcs.sh @@ -46,11 +49,15 @@ ngen_sanity_checks_and_derived_init init_script_mpi_vars init_ngen_executable_paths -# Move to the output dataset mounted directory -cd ${OUTPUT_DATASET_DIR:?Output dataset directory not defined} +# Move to the output write directory +# TODO: (later) in both ngen and ngen-cal entrypoints, control whether this is needed, based on if write dir is output dataset dir +#cd ${OUTPUT_DATASET_DIR:?Output dataset directory not defined} +mkdir ${JOB_OUTPUT_WRITE_DIR:?} +chown ${MPI_USER}:${MPI_USER} ${JOB_OUTPUT_WRITE_DIR} +cd ${JOB_OUTPUT_WRITE_DIR} #Needed for routing if [ ! -e /dmod/datasets/linked_job_output ]; then - ln -s $(pwd) /dmod/datasets/linked_job_output + ln -s ${JOB_OUTPUT_WRITE_DIR} /dmod/datasets/linked_job_output fi # We can allow worker index to not be supplied when executing serially @@ -59,15 +66,47 @@ if [ "${WORKER_INDEX:-0}" = "0" ]; then # This will only have an effect when running with multiple MPI nodes, so its safe to have even in serial exec trap close_remote_workers EXIT # Have "main" (potentially only) worker copy config files to output dataset for record keeping - # TODO: perform copy of configs to output dataset outside of image (in service) for better performance - cp -a ${CONFIG_DATASET_DIR:?Config dataset directory not defined}/. ${OUTPUT_DATASET_DIR:?} + # TODO: (later) in ngen and ngen-cal entrypoints, consider adding controls for whether this is done or a simpler + # TODO: 'cp' call, based on whether we write directly to output dataset dir or some other output write dir + # Do a dry run first to sanity check directory and fail if needed before backgrounding process + tar_and_copy --dry-run --compress ${CONFIG_DATASET_DIR:?Config dataset directory not defined} config_dataset.tgz ${OUTPUT_DATASET_DIR:?} + # Then actually run the archive and copy function in the background + tar_and_copy --compress ${CONFIG_DATASET_DIR:?} config_dataset.tgz ${OUTPUT_DATASET_DIR:?} & + _CONFIG_COPY_PROC=$! + # If there is partitioning, which implies multi-processing job ... if [ -n "${PARTITION_DATASET_DIR:-}" ]; then - # Include partition config dataset too if appropriate - # TODO: perform copy of configs to output dataset outside of image (in service) for better performance + # Include partition config dataset too if appropriate, though for simplicity, just copy directly cp -a ${PARTITION_DATASET_DIR}/. 
${OUTPUT_DATASET_DIR:?}
+        # Then run execution
         exec_main_worker_ngen_run
+
+        # TODO: (later) in ngen and ngen-cal entrypoints, add controls for whether this is done base on whether we
+        # TODO: are writing directly to output dataset dir or some other output write dir; this will be
+        # TODO: important once netcdf output works
+        # Then gather output from all worker hosts
+        gather_output
+        # Then wait at this point (if necessary) for our background config copy to avoid taxing things
+        echo "$(print_date) Waiting for compression and copying of configuration files to output dataset"
+        wait ${_CONFIG_COPY_PROC}
+        echo "$(print_date) Compression/copying of config data to output dataset complete"
+        echo "$(print_date) Copying results to output dataset"
+        move_output_to_dataset ${JOB_OUTPUT_WRITE_DIR} ${OUTPUT_DATASET_DIR:?}
+        echo "$(print_date) Results copied to output dataset"
+    # Otherwise, we just have a serial job ...
     else
+        # Execute it first
         exec_serial_ngen_run
+
+        # TODO: (later) in ngen and ngen-cal entrypoints, add controls for whether this is done base on whether we
+        # TODO: are writing directly to output dataset dir or some other output write dir; this will be
+        # TODO: important once netcdf output works
+        echo "$(print_date) Waiting for compression and copying of configuration files to output dataset"
+        wait ${_CONFIG_COPY_PROC}
+        echo "$(print_date) Compression/copying of config data to output dataset complete"
+
+        echo "$(print_date) Copying results to output dataset"
+        move_output_to_dataset ${JOB_OUTPUT_WRITE_DIR} ${OUTPUT_DATASET_DIR:?}
+        echo "$(print_date) Results copied to output dataset"
     fi
 else
     # Start SSHD on the main worker if have an MPI job

From 9a05f1576bcd11fad95fb7677fd6de5eb2364b1a Mon Sep 17 00:00:00 2001
From: Robert Bartel
Date: Thu, 6 Jun 2024 14:52:07 -0400
Subject: [PATCH 02/11] Adjusting data formats for CSV/archive output.

Renaming NGEN_OUTPUT to NGEN_CSV_OUTPUT, introducing ARCHIVED_NGEN_CSV_OUTPUT
format for when CSVs are bundled into tar file before being placed in
dataset, and proactively add NGEN_NETCDF_OUTPUT format for eventual use;
also, switch to ARCHIVED_NGEN_CSV_OUTPUT as current job default.
---
 .../ngen/abstract_nextgen_request.py        |  2 +-
 python/lib/core/dmod/core/meta_data.py      | 40 ++++++++++++++-----
 .../dmod/test/scheduler_test_utils.py       |  2 +-
 .../dataservice/dmod/dataservice/service.py | 13 ++++--
 4 files changed, 41 insertions(+), 16 deletions(-)

diff --git a/python/lib/communication/dmod/communication/maas_request/ngen/abstract_nextgen_request.py b/python/lib/communication/dmod/communication/maas_request/ngen/abstract_nextgen_request.py
index d3b4221c7..fe344e9ac 100644
--- a/python/lib/communication/dmod/communication/maas_request/ngen/abstract_nextgen_request.py
+++ b/python/lib/communication/dmod/communication/maas_request/ngen/abstract_nextgen_request.py
@@ -311,7 +311,7 @@ def output_formats(self) -> List[DataFormat]:
         List[DataFormat]
             List of the formats of each required output dataset for the requested job.
         """
-        return [DataFormat.NGEN_OUTPUT]
+        return [DataFormat.ARCHIVED_NGEN_CSV_OUTPUT]

     @property
     def partition_cfg_data_id(self) -> Optional[str]:
diff --git a/python/lib/core/dmod/core/meta_data.py b/python/lib/core/dmod/core/meta_data.py
index e66d7199e..a4fe2a607 100644
--- a/python/lib/core/dmod/core/meta_data.py
+++ b/python/lib/core/dmod/core/meta_data.py
@@ -118,11 +118,11 @@ class DataFormat(PydanticEnum):
         True
     )
     """ The default format for "raw" AORC forcing data. """
-    NGEN_OUTPUT = (3,
-                   {StandardDatasetIndex.CATCHMENT_ID: None, StandardDatasetIndex.TIME: None, StandardDatasetIndex.DATA_ID: None},
-                   None,
-                   True)
-    """ Representation of the format for Nextgen output, with unknown/unspecified configuration of output fields. """
+    NGEN_CSV_OUTPUT = (3,
+                       {StandardDatasetIndex.CATCHMENT_ID: None, StandardDatasetIndex.TIME: None, StandardDatasetIndex.DATA_ID: None},
+                       None,
+                       True)
+    """ Format for output of ngen when written as CSV, with unknown/unspecified configuration of output fields. """
     NGEN_REALIZATION_CONFIG = (
         4, {StandardDatasetIndex.CATCHMENT_ID: None, StandardDatasetIndex.TIME: None, StandardDatasetIndex.DATA_ID: None}, None, True)
     """ Representation of the format of realization configs, which covers catchments (id) has a time period (time). """
@@ -221,15 +221,28 @@ class DataFormat(PydanticEnum):
     is removed).
     """

+    ARCHIVED_NGEN_CSV_OUTPUT = (17,
+                                {StandardDatasetIndex.CATCHMENT_ID: None, StandardDatasetIndex.TIME: None, StandardDatasetIndex.DATA_ID: None},
+                                None,
+                                True)
+    """ Format for output of ngen, similar to ``NGEN_CSV_OUTPUT``, but with all output archived to single tar file. """
+
+    NGEN_NETCDF_OUTPUT = (18,
+                          {StandardDatasetIndex.CATCHMENT_ID: None, StandardDatasetIndex.TIME: None,
+                           StandardDatasetIndex.DATA_ID: None},
+                          None,
+                          True)
+    """ Format for output of ngen when written to single NetCDF file, with dynamically configured output fields. """
+
     @classmethod
     def can_format_fulfill(cls, needed: DataFormat, alternate: DataFormat) -> bool:
         """
-        Test whether data in an alternate format is capable of satisfying requirements of some other format.
+        Test whether a dataset and contained data in some format can satisfy requirements of a different format.

-        This function indicates whether data in one format (the alternate format) is compatible with requirements
-        specified using a different format (the needed format). It is an indication of whether data is **potentially**
-        capable of satisfying a requirement - even if the data formats of the two are not the same - due to the two
-        formats being sufficiently similar.
+        This function indicates whether a hypothetical dataset and its data, having some particular format (the
+        alternate format) is compatible with hypothetical requirements specified using a different format (the needed
+        format). It is an indication of whether a dataset and its data are **potentially** capable of satisfying a
+        requirement, even with a different format, due to the two formats being sufficiently similar.

         For example, the NextGen framework can support forcings in either CSV or NetCDF formats, represented as
         ``AORC_CSV`` and ``NETCDF_FORCING_CANONICAL`` respectively.
A job to execute NextGen would include a forcing @@ -264,7 +277,12 @@ def can_format_fulfill(cls, needed: DataFormat, alternate: DataFormat) -> bool: compatible_forcing_formats = {cls.AORC_CSV, cls.NETCDF_FORCING_CANONICAL, cls.NETCDF_AORC_DEFAULT} if needed in compatible_forcing_formats and alternate in compatible_forcing_formats: return True - # Anything else, they are compatible + + ngen_csv_output_formats = {cls.ARCHIVED_NGEN_CSV_OUTPUT, cls.NGEN_CSV_OUTPUT} + if needed in ngen_csv_output_formats and alternate in ngen_csv_output_formats: + return True + + # Anything else, they are not compatible return False @classmethod diff --git a/python/lib/scheduler/dmod/test/scheduler_test_utils.py b/python/lib/scheduler/dmod/test/scheduler_test_utils.py index 707457df9..091bba9eb 100644 --- a/python/lib/scheduler/dmod/test/scheduler_test_utils.py +++ b/python/lib/scheduler/dmod/test/scheduler_test_utils.py @@ -83,7 +83,7 @@ def mock_job(model: str = 'nwm', cpus: int = 4, mem: int = 500000, strategy: str request_json['model'] = _ngen_model dataset_name = 'test_output_dataset_1' model_request = NGENRequest.factory_init_from_deserialized_json(request_json) - data_domain = DataDomain(data_format=DataFormat.NGEN_OUTPUT, + data_domain = DataDomain(data_format=DataFormat.NGEN_CSV_OUTPUT, discrete_restrictions=[DiscreteRestriction(variable='id', values=[])]) output_requirement = DataRequirement(domain=data_domain, is_input=False, category=DataCategory.OUTPUT, fulfilled_by=dataset_name) diff --git a/python/services/dataservice/dmod/dataservice/service.py b/python/services/dataservice/dmod/dataservice/service.py index b421ea3bc..08a68a5e0 100644 --- a/python/services/dataservice/dmod/dataservice/service.py +++ b/python/services/dataservice/dmod/dataservice/service.py @@ -7,7 +7,7 @@ from dmod.communication import DatasetManagementMessage, DatasetManagementResponse, ManagementAction, WebSocketInterface from dmod.communication.dataset_management_message import DatasetQuery, QueryType from dmod.communication.data_transmit_message import DataTransmitMessage, DataTransmitResponse -from dmod.core.meta_data import DataCategory, DataDomain, DataRequirement, DiscreteRestriction, \ +from dmod.core.meta_data import DataCategory, DataDomain, DataFormat, DataRequirement, DiscreteRestriction, \ StandardDatasetIndex from dmod.core.dataset import Dataset, DatasetManager, DatasetUser, DatasetType from dmod.core.serializable import BasicResultIndicator @@ -787,11 +787,18 @@ def _create_output_datasets(self, job: Job): break # TODO: (later) more intelligently determine type - mgr = self._managers.manager(DatasetType.OBJECT_STORE) + dataset_type = DatasetType.OBJECT_STORE + mgr = self._managers.manager(dataset_type) + + data_format = job.model_request.output_formats[i] + # If we are writing to an object store, lots of CSV files will kill us, so switch to archived variant + if dataset_type == DatasetType.OBJECT_STORE and data_format == DataFormat.NGEN_CSV_OUTPUT: + data_format = DataFormat.ARCHIVED_NGEN_CSV_OUTPUT + dataset = mgr.create(name='job-{}-output-{}'.format(job.job_id, i), is_read_only=False, category=DataCategory.OUTPUT, - domain=DataDomain(data_format=job.model_request.output_formats[i], + domain=DataDomain(data_format=data_format, continuous_restrictions=None if time_range is None else [time_range], discrete_restrictions=[id_restrict])) # TODO: (later) in the future, whether the job is running via Docker needs to be checked From e38df7a35b06adaf57d30d2d133ea1108fd74989 Mon Sep 17 00:00:00 2001 From: Robert Bartel 
Date: Thu, 6 Jun 2024 14:57:24 -0400 Subject: [PATCH 03/11] Fix unit tests for change to NGEN_CSV_OUTPUT. --- data/serialized_dataset_examples/ngen-output-1.json | 2 +- python/lib/core/dmod/test/test_meta_data.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/data/serialized_dataset_examples/ngen-output-1.json b/data/serialized_dataset_examples/ngen-output-1.json index e638c9d04..9d5660b9c 100644 --- a/data/serialized_dataset_examples/ngen-output-1.json +++ b/data/serialized_dataset_examples/ngen-output-1.json @@ -5,7 +5,7 @@ "type" : "OBJECT_STORE", "data_domain" : { "continuous" : [], - "data_format" : "NGEN_OUTPUT", + "data_format" : "NGEN_CSV_OUTPUT", "discrete" : [ { "values" : [ diff --git a/python/lib/core/dmod/test/test_meta_data.py b/python/lib/core/dmod/test/test_meta_data.py index 1c5b292dc..bc39df6db 100644 --- a/python/lib/core/dmod/test/test_meta_data.py +++ b/python/lib/core/dmod/test/test_meta_data.py @@ -191,7 +191,7 @@ def test_to_dict(self): expected_serialized_data_fields = {"a": "int", "b": "float", "c": "bool", "d": "str", "e": "Any"} data = { # NOTE: NGEN_OUTPUT data_fields = None. - "data_format": "NGEN_OUTPUT", + "data_format": "NGEN_CSV_OUTPUT", "continuous": {}, "discrete": {StandardDatasetIndex.DATA_ID: {"variable": "DATA_ID", "values": ["0"]}}, } From b667dd658662e20bb3551c79e8a89b862e0b4fc1 Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Wed, 12 Jun 2024 17:06:54 -0400 Subject: [PATCH 04/11] Move to Python for entrypoint IO optimizations. --- docker/main/ngen/Dockerfile | 1 + docker/main/ngen/funcs.sh | 91 -------- docker/main/ngen/ngen_entrypoint.sh | 10 +- docker/main/ngen/py_funcs.py | 309 ++++++++++++++++++++++++++++ 4 files changed, 315 insertions(+), 96 deletions(-) create mode 100644 docker/main/ngen/py_funcs.py diff --git a/docker/main/ngen/Dockerfile b/docker/main/ngen/Dockerfile index a94f9cb88..8fd605e13 100644 --- a/docker/main/ngen/Dockerfile +++ b/docker/main/ngen/Dockerfile @@ -899,6 +899,7 @@ USER ${USER} COPY --chown=${USER} noah_owp_parameters /dmod/bmi_module_data/noah_owp/parameters/ COPY --chown=${USER} ngen_entrypoint.sh ${WORKDIR}/entrypoint.sh COPY --chown=${USER} funcs.sh ${WORKDIR} +COPY --chown=${USER} --chmod=744 py_funcs.py /dmod/bin/py_funcs ENV HYDRA_PROXY_RETRY_COUNT=5 diff --git a/docker/main/ngen/funcs.sh b/docker/main/ngen/funcs.sh index 0944a51a0..0b8797111 100644 --- a/docker/main/ngen/funcs.sh +++ b/docker/main/ngen/funcs.sh @@ -213,94 +213,3 @@ run_secondary_mpi_ssh_worker_node() sleep 5 done } - -tar_and_copy() -{ - # If $1 is "--dry-run" then just do sanity checks without tarring or copying, then shift args - # If $1 is "--compress", then indicate that tar should be gzipped, then shift args - - # $1 is source directory containing contents to archive - # $2 is the name of/path to the produced tar archive - # $3 is the location to copy to - - if [ "${1:?No args given to tar_and_copy}" == "--dry-run" ]; then - local _DRY_RUN="true" - shift - fi - - if [ "${1:?No contents directory given to tar_and_copy}" == "--compress" ]; then - local _TAR_EXTRA_ARGS="-z" - shift - fi - - local _CONTENTS_DIR="${1:?No contents directory given to tar_and_copy}" - local _TAR_FILE="${2:?No archive file given to tar_and_copy}" - local _DEST_DIR="${3:?No copy destination directory given to tar_and_copy}" - - if [ ! -e "${_CONTENTS_DIR}" ]; then - >&2 echo "$(print_date) Error: tar_and_copy contents directory '${_CONTENTS_DIR}' does not exist!" - exit 1 - elif [ ! 
-d "${_CONTENTS_DIR}" ]; then - >&2 echo "$(print_date) Error: tar_and_copy contents directory '${_CONTENTS_DIR}' exists but is not a directory!" - exit 1 - elif [ ! -e "${_DEST_DIR}" ]; then - >&2 echo "$(print_date) Error: tar_and_copy copy destination directory '${_DEST_DIR}' does not exist!" - exit 1 - elif [ ! -e "${_DEST_DIR}" ]; then - >&2 echo "$(print_date) Error: tar_and_copy copy destination directory '${_DEST_DIR}' exist but is not a directory!" - exit 1 - elif [ -e "${_TAR_FILE}" ]; then - >&2 echo "$(print_date) Error: tar_and_copy archive file '${_TAR_FILE}' already exists!" - exit 1 - fi - - if [ "${_DRY_RUN:-}" == "true" ]; then - return 0 - fi - - tar -c ${_TAR_EXTRA_ARGS:-} -f "${_DEST_DIR}/${_TAR_FILE}" -C "${_CONTENTS_DIR}" . - #cp -a "${_TAR_FILE}" "${_DEST_DIR}/." - #rm "${_TAR_FILE}" -} - -gather_output() { - echo "$(print_date) Gather from remote worker host ${JOB_OUTPUT_WRITE_DIR:?Job temp output dir not defined} dirs" - for i in $(echo "${MPI_HOST_STRING}" | sed 's/,/ /g'); do - _HOST_NAME=$(echo "${i}" | awk -F: '{print $1}') - if [ "$(hostname)" == "${_HOST_NAME}" ]; then - continue - fi - scp -q -r ${_HOST_NAME}:${JOB_OUTPUT_WRITE_DIR}/ ${JOB_OUTPUT_WRITE_DIR}/. & - done - for p in $(jobs -p); do - wait ${p} - _R=$? - if [ ${_R} -ne 0 ]; then - echo "$(print_date) Error: remote copying of output exited with error ${_R}" - exit ${_R} - fi - done -} - -move_output_to_dataset() -{ - # $1 output directory - # $2 dataset directory - - if [ ! -d ${1:?No output directory given for copying to dataset} ]; then - >&2 echo "$(print_date) Error: cannot move output from non-directory path '${1}' to output dataset!" - exit 1 - elif [ ! -d ${2:?No output dataset directory given for copying} ]; then - >&2 echo "$(print_date) Error: cannot move output to non-directory path '${1}' for output dataset!" - exit 1 - fi - - if [ $(ls ${1} | grep '.csv' | wc -l) -gt 0 ]; then - echo "$(print_date) Archiving and copying output CSVs to output dataset" - tar_and_copy ${1} job-${JOB_ID:?}-output.tar ${2} - else - echo "$(print_date) Copying output file(s) to output dataset" - cp -a ${1}/. ${2}/. - fi - rm -rf ${1} -} \ No newline at end of file diff --git a/docker/main/ngen/ngen_entrypoint.sh b/docker/main/ngen/ngen_entrypoint.sh index f16707a7e..c22b86770 100755 --- a/docker/main/ngen/ngen_entrypoint.sh +++ b/docker/main/ngen/ngen_entrypoint.sh @@ -69,9 +69,9 @@ if [ "${WORKER_INDEX:-0}" = "0" ]; then # TODO: (later) in ngen and ngen-cal entrypoints, consider adding controls for whether this is done or a simpler # TODO: 'cp' call, based on whether we write directly to output dataset dir or some other output write dir # Do a dry run first to sanity check directory and fail if needed before backgrounding process - tar_and_copy --dry-run --compress ${CONFIG_DATASET_DIR:?Config dataset directory not defined} config_dataset.tgz ${OUTPUT_DATASET_DIR:?} + py_funcs tar_and_copy --dry-run --compress ${CONFIG_DATASET_DIR:?Config dataset directory not defined} config_dataset.tgz ${OUTPUT_DATASET_DIR:?} # Then actually run the archive and copy function in the background - tar_and_copy --compress ${CONFIG_DATASET_DIR:?} config_dataset.tgz ${OUTPUT_DATASET_DIR:?} & + py_funcs tar_and_copy --compress ${CONFIG_DATASET_DIR:?} config_dataset.tgz ${OUTPUT_DATASET_DIR:?} & _CONFIG_COPY_PROC=$! # If there is partitioning, which implies multi-processing job ... 
if [ -n "${PARTITION_DATASET_DIR:-}" ]; then @@ -84,13 +84,13 @@ if [ "${WORKER_INDEX:-0}" = "0" ]; then # TODO: are writing directly to output dataset dir or some other output write dir; this will be # TODO: important once netcdf output works # Then gather output from all worker hosts - gather_output + py_funcs gather_output ${MPI_HOST_STRING:?} ${JOB_OUTPUT_WRITE_DIR:?} # Then wait at this point (if necessary) for our background config copy to avoid taxing things echo "$(print_date) Waiting for compression and copying of configuration files to output dataset" wait ${_CONFIG_COPY_PROC} echo "$(print_date) Compression/copying of config data to output dataset complete" echo "$(print_date) Copying results to output dataset" - move_output_to_dataset ${JOB_OUTPUT_WRITE_DIR} ${OUTPUT_DATASET_DIR:?} + py_funcs move_job_output --job_id ${JOB_ID:?} ${JOB_OUTPUT_WRITE_DIR} to_directory ${OUTPUT_DATASET_DIR:?} echo "$(print_date) Results copied to output dataset" # Otherwise, we just have a serial job ... else @@ -105,7 +105,7 @@ if [ "${WORKER_INDEX:-0}" = "0" ]; then echo "$(print_date) Compression/copying of config data to output dataset complete" echo "$(print_date) Copying results to output dataset" - move_output_to_dataset ${JOB_OUTPUT_WRITE_DIR} ${OUTPUT_DATASET_DIR:?} + py_funcs move_job_output --job_id ${JOB_ID:?} ${JOB_OUTPUT_WRITE_DIR} to_directory ${OUTPUT_DATASET_DIR:?} echo "$(print_date) Results copied to output dataset" fi else diff --git a/docker/main/ngen/py_funcs.py b/docker/main/ngen/py_funcs.py new file mode 100644 index 000000000..c08de50e9 --- /dev/null +++ b/docker/main/ngen/py_funcs.py @@ -0,0 +1,309 @@ +#!/usr/bin/env python3 + +import argparse +import logging +import os +import shutil +import tarfile + +from collections import OrderedDict +from datetime import datetime +from pathlib import Path +from subprocess import Popen +from typing import Dict, List, Literal, Optional + + +def _apply_logging(log_level: Literal["CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG"]): + logging.basicConfig( + level=logging.getLevelName(log_level), + format="%(asctime)s,%(msecs)d %(levelname)s: %(message)s", + datefmt="%H:%M:%S" + ) + + +def _parse_for_tar_and_copy(parent_subparsers_container): + """ Run subparser for CLI command responsible for running `tar_and_copy` helper function. """ + # A parser for the 'tar_and_copy' param itself, underneath the parent 'command' subparsers container + helper_cmd_parser = parent_subparsers_container.add_parser( + 'tar_and_copy', description="Archive contents of a directory and copy archive to specified destination.") + helper_cmd_parser.add_argument('--dry-run', dest='do_dry_run', action='store_true', + help='Perform a dry run to check paths, with no archiving/moving/copying.') + helper_cmd_parser.add_argument('--compress', dest='do_compress', action='store_true', + help='Compress the created archive with gzip compression.') + helper_cmd_parser.add_argument('archive_name', help='Basename for the created archive file.') + helper_cmd_parser.add_argument('source', type=Path, help='Directory whose contents should be archived.') + helper_cmd_parser.add_argument('dest', type=Path, help='Destination directory in which to place the archive file.') + + +def _parse_for_gather_output(parent_subparsers_container): + # A parser for the 'tar_and_copy' param itself, underneath the parent 'command' subparsers container + desc = "Using subprocesses, gather output from remote MPI hosts and collect in analogous directory on this host." 
+ helper_cmd_parser = parent_subparsers_container.add_parser('gather_output', description=desc) + # TODO: (later) when we move to all Python, rework this to accept several individually + helper_cmd_parser.add_argument('mpi_hosts_str', help='Comma delimited MPI hosts string received by worker.') + helper_cmd_parser.add_argument('output_write_dir', type=Path, help='Directory where output was written on hosts.') + + +def _subparse_move_to_directory(parent_subparser_container): + sub_cmd_parser = parent_subparser_container.add_parser('to_directory', description="Move to a specified directory") + sub_cmd_parser.add_argument("dest_dir", type=Path, help="Destination directory to which to move the output") + + +def _parse_for_move_job_output(parent_subparsers_container): + # A parser for the 'tar_and_copy' param itself, underneath the parent 'command' subparsers container + desc = "Move output data files produced by a job to another location, typically to put them into a DMOD dataset." + helper_cmd_parser = parent_subparsers_container.add_parser('move_job_output', description=desc) + helper_cmd_parser.add_argument('--job_id', '--job-id', dest='job_id', help='Optionally specify job id.') + helper_cmd_parser.add_argument('--archive-files', dest='do_archiving', choices=["true", "false"], + type=lambda s: True if s.lower == "true" else False, default=None, + help='Force archiving before moving job output.') + helper_cmd_parser.add_argument('output_directory', type=Path, + help='Source directory containing output files to be placed within the dataset.') + + cmd_subparsers = helper_cmd_parser.add_subparsers(dest='move_action', help="Specify the type of move action.") + cmd_subparsers.required = True + _subparse_move_to_directory(cmd_subparsers) + + +def _parse_args() -> argparse.Namespace: + """ + Set up and run top-level arg parsing for module. + + Returns + ------- + argparse.Namespace + The parsed arguments namespace object. + """ + parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter, prog='py_funcs') + parser.add_argument('--log-level', '-L', dest='log_level', + default=os.environ.get("DEFAULT_LOG_LEVEL", "INFO").upper(), + choices=["CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG"], help='Optionally specify log level.') + + subparsers = parser.add_subparsers(dest='command', help='Specify the Python helper function command to run.') + subparsers.required = True + + _parse_for_tar_and_copy(parent_subparsers_container=subparsers) + _parse_for_gather_output(parent_subparsers_container=subparsers) + _parse_for_move_job_output(parent_subparsers_container=subparsers) + + return parser.parse_args() + + +def _move_to_directory(source_dir: Path, dest_dir: Path, archive_name: Optional[str] = None): + """ + Move source data files from their initial directory to a different directory, potentially combining into an archive. + + Parameters + ---------- + source_dir + The source directory containing output files to be placed within the dataset; note that this has already been + tested for existence as a directory. + dest_dir + The destination directory, to which the output data should be moved; note that this path has not yet been tested + for existence as a directory. + archive_name + When files should be archived as part of moving, the name of the archive; when files should not be archived, + ``None``. 
+ """ + if not dest_dir.is_dir(): + raise ValueError(f"{get_date_str()} Can't move job output to non-directory path {dest_dir!s}!") + + if archive_name: + logging.info("Archiving output files to output dataset") + tar_and_copy(source=source_dir, dest=dest_dir, archive_name=archive_name) + else: + logging.info("Moving output file(s) to output dataset") + for p in source_dir.glob("*"): + shutil.move(p, dest_dir) + + +def gather_output(mpi_host_names: List[str], output_write_dir: Path): + """ + Using subprocesses, gather output from remote MPI hosts and collect in the analogous directory on this host. + + Parameters + ---------- + mpi_host_names + List of MPI host names for the relevant job. + output_write_dir + Common job output directory across all hosts, from which data on remotes should be taken and in which data on + this host should be collected. + """ + from socket import gethostname + local_hostname = gethostname() + + scp_subs = OrderedDict() + + for host in (h for h in mpi_host_names if h != local_hostname): + scp_subs[host] = Popen(f"scp -q -r {host}:${output_write_dir!s}/ ${output_write_dir!s}/.") + + for host in scp_subs: + return_code = scp_subs[host].wait() + if return_code != 0: + raise RuntimeError(f"{get_date_str()} gather_output failed for {host} w/ return code {return_code!s}!") + + +def get_date_str() -> str: + """ + Get the current date and time as a string with format ``%Y-%m-%d,%H:%M:%S`` + + Returns + ------- + The current date and time as a string. + """ + return datetime.now().strftime('%Y-%m-%d,%H:%M:%S') + + +def move_job_output(output_directory: Path, move_action: str, do_archiving: Optional[bool] = None, + job_id: Optional[str] = None, **kwargs): + """ + Move output data files from a job from their initial directory to somewhere, depending on the CLI-given move action. + + If `do_archiving` is either ``True`` or ``False`` (by default, it is ``None``), have that control whether output + files should be archived before moving. If it is ``None``, re-set its value to whether the output directory contains + more than 100 individual files. + + Parameters + ---------- + output_directory + move_action + do_archiving + kwargs + Other keyword args from the CLI specific to the particular move action to be performed. 
+ + """ + if not output_directory.is_dir(): + raise ValueError( + f"{get_date_str()} Can't move job output from non-directory path {output_directory!s} to output dataset") + + + # If this was not set, dynamically determine what it should be + if do_archiving is None: + # For now, just do this if the output data contains more than 100 individual files + out_dir_files = [f for f in output_directory.glob("**/*")] + out_dir_file_count = len(out_dir_files) + logging.debug(f"Dir {output_directory!s} contains {out_dir_file_count!s} files") + max_displayed = 25 + displayed = "\n ".join((str(f) for f in (out_dir_files[:25] if len(out_dir_files) > 25 else out_dir_files))) + logging.debug(f"List of files in {output_directory!s} (max first {max_displayed!s}): \n {displayed}") + do_archiving = out_dir_file_count > 100 + else: + logging.debug(f"Archiving parameter was set to {do_archiving!s}") + + assert do_archiving is not None + + # Sub-commands will know if they should do archiving based on whether they actually receive and archive name to use + if do_archiving: + archive_name = f"job-{job_id}-output.tar" if job_id else "job-output.tar" + logging.debug(f"Archiving files with archive name {archive_name}") + else: + logging.debug("Set to not archive files before moving") + archive_name = None + + if move_action == "to_directory": + _move_to_directory(source_dir=output_directory, dest_dir=kwargs["dest_dir"], archive_name=archive_name) + else: + raise RuntimeError(f"{get_date_str()} Invalid CLI move action {move_action}") + + +def process_mpi_hosts_string(hosts_string: str, hosts_sep: str = ",", host_details_sep: str = ":") -> Dict[str, int]: + """ + Process the MPI hosts string received by worker entrypoint, splitting into a mapping of host names and processes. + + Parameters + ---------- + hosts_string + The raw host string received by the worker when the service container is started. + hosts_sep + The delimiter between individual host entries within the string; by default, ``,``. + host_details_sep + The delimiter, within a host entry substring, between the host name and the number of processes for that host. + + Returns + ------- + A dictionary, keyed by host name, mapped to the number of processes for each host. + """ + results = dict() + for host_entry in hosts_string.split(sep=hosts_sep): + split_host_details = host_entry.split(sep=host_details_sep) + if len(split_host_details) != 2: + raise ValueError(f"Unexpected format for MPI host string (using '{hosts_sep}' and '{host_details_sep}'): " + f"'{hosts_string}'") + results[split_host_details[0]] = int(split_host_details[1]) + return results + + +def tar_and_copy(source: Path, dest: Path, archive_name: str, do_dry_run: bool = False, do_compress: bool = False): + """ + Make a tar archive from the contents of a directory, and place this in a specified destination. + + Parameters + ---------- + source + Directory whose contents should be archived. + dest + Destination directory in which to place the archive file. + archive_name + Basename for the created archive file. + do_dry_run + Whether to only perform a dry run to check paths, with no archiving/moving/copying. + do_compress + Whether to compress the created archive with gzip compression. + + Raises + ------- + ValueError + If the source or dest directory either does not exist or is not a directory; also, if the archive file already + exists. 
+ """ + if not source.exists(): + raise ValueError(f"{get_date_str()} tar_and_copy source directory {source!s} does not exist!") + elif not source.is_dir(): + raise ValueError(f"{get_date_str()} Non-directory file at path for tar_and_copy source directory {source!s}!") + + if not dest.exists(): + raise ValueError(f"{get_date_str()} tar_and_copy destination directory {dest!s} does not exist!") + elif not dest.is_dir(): + raise ValueError(f"{get_date_str()} Non-directory file at path for tar_and_copy dest directory {dest!s}!") + + # We may change things in the future to write somewhere else, so do things in a little more of a round-about way + # This is where the archive initially gets created + archive_create_path = dest.joinpath(archive_name) + # This is where it must eventually go + final_archive_path = dest.joinpath(archive_name) + + if archive_create_path.exists(): + raise ValueError(f"{get_date_str()} File exists at tar_and_copy archive path {archive_create_path!s}!") + if final_archive_path != archive_create_path and final_archive_path.exists(): + raise ValueError(f"{get_date_str()} Archive for tar_and_copy already exists in dest at {final_archive_path!s}!") + + if do_dry_run: + return + + tar_mode_args = "w:gz" if do_compress else "w" + with tarfile.open(archive_create_path, tar_mode_args) as tar: + for p in source.glob("*"): + tar.add(p, arcname=p.name) + + if archive_create_path != final_archive_path: + shutil.move(archive_create_path, final_archive_path) + + +def main(): + args = _parse_args() + + _apply_logging(args.log_level) + + if args.command == 'tar_and_copy': + tar_and_copy(**(vars(args))) + elif args.command == 'gather_output': + mpi_host_to_nproc_map = process_mpi_hosts_string(args.mpi_hosts_str) + gather_output(mpi_host_names=[h for h in mpi_host_to_nproc_map], output_write_dir=args.output_write_dir) + elif args.command == 'move_job_output': + move_job_output(**(vars(args))) + else: + raise ValueError("Unsupported command {}".format(args.command)) + + +if __name__ == '__main__': + main() From f93fef935cd42c4c1500f930df3b981adca1f535 Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Thu, 13 Jun 2024 09:57:29 -0400 Subject: [PATCH 05/11] Also move cal image to initial Py functions. --- docker/main/ngen/ngen_cal_entrypoint.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docker/main/ngen/ngen_cal_entrypoint.sh b/docker/main/ngen/ngen_cal_entrypoint.sh index 14e4e60f4..2483f9855 100755 --- a/docker/main/ngen/ngen_cal_entrypoint.sh +++ b/docker/main/ngen/ngen_cal_entrypoint.sh @@ -110,9 +110,9 @@ if [ "${WORKER_INDEX:-0}" = "0" ]; then # TODO: (later) in ngen and ngen-cal entrypoints, consider adding controls for whether this is done or a simpler # TODO: 'cp' call, based on whether we write directly to output dataset dir or some other output write dir # Do a dry run first to sanity check directory and fail if needed before backgrounding process - tar_and_copy --dry-run --compress ${CONFIG_DATASET_DIR:?Config dataset directory not defined} config_dataset.tgz ${OUTPUT_DATASET_DIR:?} + py_funcs tar_and_copy --dry-run --compress ${CONFIG_DATASET_DIR:?Config dataset directory not defined} config_dataset.tgz ${OUTPUT_DATASET_DIR:?} # Then actually run the archive and copy function in the background - tar_and_copy --compress ${CONFIG_DATASET_DIR:?} config_dataset.tgz ${OUTPUT_DATASET_DIR:?} & + py_funcs tar_and_copy --compress ${CONFIG_DATASET_DIR:?} config_dataset.tgz ${OUTPUT_DATASET_DIR:?} & _CONFIG_COPY_PROC=$! 
# If there is partitioning, which implies multi-processing job ... if [ -n "${PARTITION_DATASET_DIR:-}" ]; then From 874589fba95e91351cb16095a9db901cd852240c Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Thu, 13 Jun 2024 10:09:16 -0400 Subject: [PATCH 06/11] Bump package versions and internal dependencies. Bumping versions of dmod-core and dmod-communication for direct changes, and then bumping other library and service packages to use the updated versions for their dependencies (along with bumping their own versions). --- python/lib/client/dmod/client/_version.py | 2 +- python/lib/client/setup.py | 2 +- python/lib/communication/dmod/communication/_version.py | 2 +- python/lib/communication/setup.py | 2 +- python/lib/core/dmod/core/_version.py | 2 +- python/lib/scheduler/dmod/scheduler/_version.py | 2 +- python/lib/scheduler/setup.py | 4 ++-- python/services/dataservice/dmod/dataservice/_version.py | 2 +- python/services/dataservice/setup.py | 2 +- .../services/requestservice/dmod/requestservice/_version.py | 2 +- python/services/requestservice/setup.py | 2 +- .../schedulerservice/dmod/schedulerservice/_version.py | 2 +- python/services/schedulerservice/setup.py | 2 +- 13 files changed, 14 insertions(+), 14 deletions(-) diff --git a/python/lib/client/dmod/client/_version.py b/python/lib/client/dmod/client/_version.py index b4e35405f..e4e49b3bb 100644 --- a/python/lib/client/dmod/client/_version.py +++ b/python/lib/client/dmod/client/_version.py @@ -1 +1 @@ -__version__ = '0.8.3' +__version__ = '0.9.0' diff --git a/python/lib/client/setup.py b/python/lib/client/setup.py index 8c0dd5b8a..714a88964 100644 --- a/python/lib/client/setup.py +++ b/python/lib/client/setup.py @@ -22,7 +22,7 @@ license='', include_package_data=True, #install_requires=['websockets', 'jsonschema'],vi - install_requires=['dmod-core>=0.16.0', 'websockets>=8.1', 'pydantic>=1.10.8,~=1.10', 'dmod-communication>=0.19.2', + install_requires=['dmod-core>=0.17.0', 'websockets>=8.1', 'pydantic>=1.10.8,~=1.10', 'dmod-communication>=0.20.0', 'dmod-externalrequests>=0.6.0', 'dmod-modeldata>=0.12.0'], packages=find_namespace_packages(include=['dmod.*'], exclude=['dmod.test']) ) diff --git a/python/lib/communication/dmod/communication/_version.py b/python/lib/communication/dmod/communication/_version.py index 5daae67f7..2f15b8cd3 100644 --- a/python/lib/communication/dmod/communication/_version.py +++ b/python/lib/communication/dmod/communication/_version.py @@ -1 +1 @@ -__version__ = '0.19.2' +__version__ = '0.20.0' diff --git a/python/lib/communication/setup.py b/python/lib/communication/setup.py index 741a3a314..ad30beca1 100644 --- a/python/lib/communication/setup.py +++ b/python/lib/communication/setup.py @@ -21,7 +21,7 @@ url='', license='', include_package_data=True, - install_requires=['dmod-core>=0.11.0', 'websockets>=10.1', 'jsonschema', 'redis', 'pydantic>=1.10.8,~=1.10', + install_requires=['dmod-core>=0.17.0', 'websockets>=10.1', 'jsonschema', 'redis', 'pydantic>=1.10.8,~=1.10', 'Deprecated', 'ngen-config@git+https://github.com/noaa-owp/ngen-cal@master#egg=ngen-config&subdirectory=python/ngen_conf'], packages=find_namespace_packages(include=['dmod.*'], exclude=['dmod.test']) diff --git a/python/lib/core/dmod/core/_version.py b/python/lib/core/dmod/core/_version.py index 8911e95ca..435d64bd6 100644 --- a/python/lib/core/dmod/core/_version.py +++ b/python/lib/core/dmod/core/_version.py @@ -1 +1 @@ -__version__ = '0.16.0' +__version__ = '0.17.0' diff --git a/python/lib/scheduler/dmod/scheduler/_version.py 
b/python/lib/scheduler/dmod/scheduler/_version.py index 92a60bdf2..2d7893e3d 100644 --- a/python/lib/scheduler/dmod/scheduler/_version.py +++ b/python/lib/scheduler/dmod/scheduler/_version.py @@ -1 +1 @@ -__version__ = '0.12.2' +__version__ = '0.13.0' diff --git a/python/lib/scheduler/setup.py b/python/lib/scheduler/setup.py index a533a9bc2..218926ced 100644 --- a/python/lib/scheduler/setup.py +++ b/python/lib/scheduler/setup.py @@ -20,8 +20,8 @@ author_email='', url='', license='', - install_requires=['docker>=7.1.0', 'Faker', 'dmod-communication>=0.17.0', 'dmod-modeldata>=0.7.1', 'dmod-redis>=0.1.0', - 'dmod-core>=0.15.0', 'cryptography', 'uri', 'pyyaml', 'pydantic>=1.10.8,~=1.10'], + install_requires=['docker>=7.1.0', 'Faker', 'dmod-communication>=0.20.0', 'dmod-modeldata>=0.7.1', 'dmod-redis>=0.1.0', + 'dmod-core>=0.17.0', 'cryptography', 'uri', 'pyyaml', 'pydantic>=1.10.8,~=1.10'], packages=find_namespace_packages(exclude=['dmod.test', 'src']) ) diff --git a/python/services/dataservice/dmod/dataservice/_version.py b/python/services/dataservice/dmod/dataservice/_version.py index e754a834e..f323a57be 100644 --- a/python/services/dataservice/dmod/dataservice/_version.py +++ b/python/services/dataservice/dmod/dataservice/_version.py @@ -1 +1 @@ -__version__ = '0.10.1' +__version__ = '0.11.0' diff --git a/python/services/dataservice/setup.py b/python/services/dataservice/setup.py index 78d256d21..ec42ea4a4 100644 --- a/python/services/dataservice/setup.py +++ b/python/services/dataservice/setup.py @@ -17,7 +17,7 @@ author_email='', url='', license='', - install_requires=['dmod-core>=0.16.0', 'dmod-communication>=0.14.0', 'dmod-scheduler>=0.12.2', + install_requires=['dmod-core>=0.17.0', 'dmod-communication>=0.20.0', 'dmod-scheduler>=0.12.2', 'dmod-modeldata>=0.12.0', 'redis', "pydantic[dotenv]>=1.10.8,~=1.10", "fastapi", "uvicorn[standard]", 'ngen-config@git+https://github.com/noaa-owp/ngen-cal@master#egg=ngen-config&subdirectory=python/ngen_conf', 'ngen-cal@git+https://github.com/noaa-owp/ngen-cal@master#egg=ngen-config&subdirectory=python/ngen_cal'], diff --git a/python/services/requestservice/dmod/requestservice/_version.py b/python/services/requestservice/dmod/requestservice/_version.py index 102b47c9c..fcca93be3 100644 --- a/python/services/requestservice/dmod/requestservice/_version.py +++ b/python/services/requestservice/dmod/requestservice/_version.py @@ -1 +1 @@ -__version__ = '0.9.1' \ No newline at end of file +__version__ = '0.10.0' \ No newline at end of file diff --git a/python/services/requestservice/setup.py b/python/services/requestservice/setup.py index 6eb45160e..694d136af 100644 --- a/python/services/requestservice/setup.py +++ b/python/services/requestservice/setup.py @@ -17,7 +17,7 @@ author_email='', url='', license='', - install_requires=['websockets', 'dmod-core>=0.16.0', 'dmod-communication>=0.19.1', 'dmod-access>=0.2.0', + install_requires=['websockets', 'dmod-core>=0.17.0', 'dmod-communication>=0.20.0', 'dmod-access>=0.2.0', 'dmod-externalrequests>=0.6.0'], packages=find_namespace_packages(exclude=['dmod.test', 'schemas', 'ssl', 'src']) ) diff --git a/python/services/schedulerservice/dmod/schedulerservice/_version.py b/python/services/schedulerservice/dmod/schedulerservice/_version.py index ff1e05dcb..a514f890f 100644 --- a/python/services/schedulerservice/dmod/schedulerservice/_version.py +++ b/python/services/schedulerservice/dmod/schedulerservice/_version.py @@ -1 +1 @@ -__version__ = '0.11.1' \ No newline at end of file +__version__ = '0.12.0' \ No newline 
at end of file diff --git a/python/services/schedulerservice/setup.py b/python/services/schedulerservice/setup.py index 9cb9eb60a..eddf3695c 100644 --- a/python/services/schedulerservice/setup.py +++ b/python/services/schedulerservice/setup.py @@ -17,6 +17,6 @@ author_email='', url='', license='', - install_requires=['dmod-core>=0.2.0', 'dmod-communication>=0.19.1', 'dmod-scheduler>=0.12.2'], + install_requires=['dmod-core>=0.17.0', 'dmod-communication>=0.20.1', 'dmod-scheduler>=0.13.0'], packages=find_namespace_packages(exclude=['dmod.test', 'deprecated', 'conf', 'schemas', 'ssl', 'src']) ) From af243c424073ccafbfcfc2bc3d2d0ca8196a8525 Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Thu, 13 Jun 2024 11:11:01 -0400 Subject: [PATCH 07/11] Add description to main py_funcs.py parser. --- docker/main/ngen/py_funcs.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/docker/main/ngen/py_funcs.py b/docker/main/ngen/py_funcs.py index c08de50e9..6183e4fd9 100644 --- a/docker/main/ngen/py_funcs.py +++ b/docker/main/ngen/py_funcs.py @@ -74,7 +74,8 @@ def _parse_args() -> argparse.Namespace: argparse.Namespace The parsed arguments namespace object. """ - parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter, prog='py_funcs') + parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter, prog='py_funcs', + description="Run one of several Docker image entrypoint Python helper functions.") parser.add_argument('--log-level', '-L', dest='log_level', default=os.environ.get("DEFAULT_LOG_LEVEL", "INFO").upper(), choices=["CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG"], help='Optionally specify log level.') From 6b37f28b761f30a9d2e4a3298cbee35bd3ddd93e Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Thu, 13 Jun 2024 17:18:35 -0400 Subject: [PATCH 08/11] Tweak py_funcs.gather_output SCP wait handling. --- docker/main/ngen/py_funcs.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/docker/main/ngen/py_funcs.py b/docker/main/ngen/py_funcs.py index 6183e4fd9..35ab90a60 100644 --- a/docker/main/ngen/py_funcs.py +++ b/docker/main/ngen/py_funcs.py @@ -138,10 +138,11 @@ def gather_output(mpi_host_names: List[str], output_write_dir: Path): for host in (h for h in mpi_host_names if h != local_hostname): scp_subs[host] = Popen(f"scp -q -r {host}:${output_write_dir!s}/ ${output_write_dir!s}/.") - for host in scp_subs: - return_code = scp_subs[host].wait() - if return_code != 0: - raise RuntimeError(f"{get_date_str()} gather_output failed for {host} w/ return code {return_code!s}!") + for host, process in scp_subs.items(): + _, error_in_bytes = process.communicate() + if process.returncode != 0: + raise RuntimeError(f"{get_date_str()} gather_output failed for '{host}' (code={process.returncode}): \n" + f"{error_in_bytes.decode()}") def get_date_str() -> str: From 6d3c685847ec51807423d19b6301a2414c5d626e Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Thu, 13 Jun 2024 17:25:51 -0400 Subject: [PATCH 09/11] Don't bother with OrderedDict in py_funcs.py. 
---
 docker/main/ngen/py_funcs.py | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/docker/main/ngen/py_funcs.py b/docker/main/ngen/py_funcs.py
index 35ab90a60..7f8d83419 100644
--- a/docker/main/ngen/py_funcs.py
+++ b/docker/main/ngen/py_funcs.py
@@ -6,7 +6,6 @@
 import shutil
 import tarfile

-from collections import OrderedDict
 from datetime import datetime
 from pathlib import Path
 from subprocess import Popen
@@ -133,7 +132,7 @@ def gather_output(mpi_host_names: List[str], output_write_dir: Path):
     from socket import gethostname
     local_hostname = gethostname()

-    scp_subs = OrderedDict()
+    scp_subs = dict()

     for host in (h for h in mpi_host_names if h != local_hostname):
         scp_subs[host] = Popen(f"scp -q -r {host}:${output_write_dir!s}/ ${output_write_dir!s}/.")

From e643a96a512ed03eadf98afa00c4280875f36203 Mon Sep 17 00:00:00 2001
From: Robert Bartel
Date: Fri, 14 Jun 2024 11:33:11 -0400
Subject: [PATCH 10/11] Use RuntimeError for bad command in py_funcs.py.

Switching to use this instead of ValueError if command arg doesn't match
something supported by main(), and making message reflect the situation a
little better.
---
 docker/main/ngen/py_funcs.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docker/main/ngen/py_funcs.py b/docker/main/ngen/py_funcs.py
index 7f8d83419..fcd300fae 100644
--- a/docker/main/ngen/py_funcs.py
+++ b/docker/main/ngen/py_funcs.py
@@ -303,7 +303,7 @@ def main():
     elif args.command == 'move_job_output':
         move_job_output(**(vars(args)))
     else:
-        raise ValueError("Unsupported command {}".format(args.command))
+        raise RuntimeError(f"Command arg '{args.command}' doesn't match a command supported by module's main function")

 if __name__ == '__main__':
     main()

From 4d30a05421bd68ee9af217802c19e0875f57ed6f Mon Sep 17 00:00:00 2001
From: Robert Bartel
Date: Fri, 14 Jun 2024 11:52:15 -0400
Subject: [PATCH 11/11] Use enum instead of optional bool for ternary.

---
 docker/main/ngen/py_funcs.py | 30 +++++++++++++++++++++---------
 1 file changed, 21 insertions(+), 9 deletions(-)

diff --git a/docker/main/ngen/py_funcs.py b/docker/main/ngen/py_funcs.py
index fcd300fae..676d7fbc0 100644
--- a/docker/main/ngen/py_funcs.py
+++ b/docker/main/ngen/py_funcs.py
@@ -7,11 +7,19 @@
 import tarfile
 from datetime import datetime
+from enum import Enum
 from pathlib import Path
 from subprocess import Popen
 from typing import Dict, List, Literal, Optional
+class ArchiveStrategy(Enum):
+    """ Settings for whether something that may/can archive files should, should not, or should decide for itself. """
+    FORCE = "force"
+    DISALLOW = "disallow"
+    DYNAMIC = "dynamic"
+
+
 def _apply_logging(log_level: Literal["CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG"]):
     logging.basicConfig(
         level=logging.getLevelName(log_level),
         format="%(asctime)s,%(msecs)d %(levelname)s: %(message)s",
         datefmt="%H:%M:%S"
     )
@@ -53,11 +61,10 @@ def _parse_for_move_job_output(parent_subparsers_container):
     desc = "Move output data files produced by a job to another location, typically to put them into a DMOD dataset."
     helper_cmd_parser = parent_subparsers_container.add_parser('move_job_output', description=desc)
     helper_cmd_parser.add_argument('--job_id', '--job-id', dest='job_id', help='Optionally specify job id.')
-    helper_cmd_parser.add_argument('--archive-files', dest='do_archiving', choices=["true", "false"],
-                                   type=lambda s: True if s.lower == "true" else False, default=None,
-                                   help='Force archiving before moving job output.')
+    helper_cmd_parser.add_argument("--archiving", dest="archiving", default=ArchiveStrategy.DYNAMIC,
+                                   type=ArchiveStrategy, help="Set whether job output should be archived before moving")
     helper_cmd_parser.add_argument('output_directory', type=Path,
-                                   help='Source directory containing output files to be placed within the dataset.')
+                                   help='Source directory containing output files to be moved.')

     cmd_subparsers = helper_cmd_parser.add_subparsers(dest='move_action', help="Specify the type of move action.")
     cmd_subparsers.required = True
@@ -155,7 +162,7 @@ def get_date_str() -> str:
     return datetime.now().strftime('%Y-%m-%d,%H:%M:%S')


-def move_job_output(output_directory: Path, move_action: str, do_archiving: Optional[bool] = None,
+def move_job_output(output_directory: Path, move_action: str, archiving: ArchiveStrategy = ArchiveStrategy.DYNAMIC,
                     job_id: Optional[str] = None, **kwargs):
     """
     Move output data files from a job from their initial directory to somewhere, depending on the CLI-given move action.
@@ -167,8 +174,13 @@ def move_job_output(output_directory: Path, move_action: str, do_archiving: Opti
     Parameters
     ----------
     output_directory
+        Source directory containing output files to be moved.
     move_action
-    do_archiving
+        The type of move action to be performed.
+    archiving
+        Strategy controlling whether job output should be archived before moving.
+    job_id
+        Optional job id, used as part of archive name when applicable.
     kwargs
         Other keyword args from the CLI specific to the particular move action to be performed.

@@ -177,9 +189,8 @@ def move_job_output(output_directory: Path, move_action: str, do_archiving: Opti
         raise ValueError(
             f"{get_date_str()} Can't move job output from non-directory path {output_directory!s} to output dataset")


-    # If this was not set, dynamically determine what it should be
-    if do_archiving is None:
+    if archiving == ArchiveStrategy.DYNAMIC:
         # For now, just do this if the output data contains more than 100 individual files
         out_dir_files = [f for f in output_directory.glob("**/*")]
         out_dir_file_count = len(out_dir_files)
@@ -189,7 +200,8 @@ def move_job_output(output_directory: Path, move_action: str, do_archiving: Opti
         logging.debug(f"List of files in {output_directory!s} (max first {max_displayed!s}): \n {displayed}")
         do_archiving = out_dir_file_count > 100
     else:
-        logging.debug(f"Archiving parameter was set to {do_archiving!s}")
+        logging.debug(f"Archiving parameter was set to {archiving.name}")
+        do_archiving = archiving == ArchiveStrategy.FORCE


     assert do_archiving is not None