Optimize worker behavior for object store writes #650
@@ -212,4 +212,95 @@ run_secondary_mpi_ssh_worker_node()
    while [ -e ${RUN_SENTINEL} ] && kill -s 0 "${_SSH_D_PID}" 2>/dev/null ; do
        sleep 5
    done
}

tar_and_copy()
{
    # If $1 is "--dry-run" then just do sanity checks without tarring or copying, then shift args
    # If $1 is "--compress", then indicate that tar should be gzipped, then shift args

    # $1 is source directory containing contents to archive
    # $2 is the name of/path to the produced tar archive
    # $3 is the location to copy to

    if [ "${1:?No args given to tar_and_copy}" == "--dry-run" ]; then
        local _DRY_RUN="true"
        shift
    fi

    if [ "${1:?No contents directory given to tar_and_copy}" == "--compress" ]; then
        local _TAR_EXTRA_ARGS="-z"
        shift
    fi

    local _CONTENTS_DIR="${1:?No contents directory given to tar_and_copy}"
    local _TAR_FILE="${2:?No archive file given to tar_and_copy}"
    local _DEST_DIR="${3:?No copy destination directory given to tar_and_copy}"

    if [ ! -e "${_CONTENTS_DIR}" ]; then
        >&2 echo "$(print_date) Error: tar_and_copy contents directory '${_CONTENTS_DIR}' does not exist!"
        exit 1
    elif [ ! -d "${_CONTENTS_DIR}" ]; then
        >&2 echo "$(print_date) Error: tar_and_copy contents directory '${_CONTENTS_DIR}' exists but is not a directory!"
        exit 1
    elif [ ! -e "${_DEST_DIR}" ]; then
        >&2 echo "$(print_date) Error: tar_and_copy copy destination directory '${_DEST_DIR}' does not exist!"
        exit 1
    elif [ ! -d "${_DEST_DIR}" ]; then
        >&2 echo "$(print_date) Error: tar_and_copy copy destination directory '${_DEST_DIR}' exists but is not a directory!"
        exit 1
    elif [ -e "${_TAR_FILE}" ]; then
        >&2 echo "$(print_date) Error: tar_and_copy archive file '${_TAR_FILE}' already exists!"

Review comment: This sort of error makes rerunning commands impossible. Dumping data to the same location is a reasonable use case, even if it's not something we want in a prod-like setting.

Reply: In a vacuum, yes, but this is specifically intended to be run inside a fresh container. So there really should never be anything there already. Chances are that if there is, something is going wrong.

        exit 1
    fi

    if [ "${_DRY_RUN:-}" == "true" ]; then
        return 0
    fi

    tar -c ${_TAR_EXTRA_ARGS:-} -f "${_DEST_DIR}/${_TAR_FILE}" -C "${_CONTENTS_DIR}" .
    #cp -a "${_TAR_FILE}" "${_DEST_DIR}/."
    #rm "${_TAR_FILE}"
}

gather_output() {
    echo "$(print_date) Gather from remote worker host ${JOB_OUTPUT_WRITE_DIR:?Job temp output dir not defined} dirs"
    for i in $(echo "${MPI_HOST_STRING}" | sed 's/,/ /g'); do
        _HOST_NAME=$(echo "${i}" | awk -F: '{print $1}')
        if [ "$(hostname)" == "${_HOST_NAME}" ]; then
            continue
        fi
        scp -q -r ${_HOST_NAME}:${JOB_OUTPUT_WRITE_DIR}/ ${JOB_OUTPUT_WRITE_DIR}/. &
    done
    for p in $(jobs -p); do
        wait ${p}
        _R=$?
        if [ ${_R} -ne 0 ]; then
            echo "$(print_date) Error: remote copying of output exited with error ${_R}"
            exit ${_R}
        fi
    done
}

move_output_to_dataset()
{
    # $1 output directory
    # $2 dataset directory

    if [ ! -d ${1:?No output directory given for copying to dataset} ]; then
        >&2 echo "$(print_date) Error: cannot move output from non-directory path '${1}' to output dataset!"
        exit 1
    elif [ ! -d ${2:?No output dataset directory given for copying} ]; then
        >&2 echo "$(print_date) Error: cannot move output to non-directory path '${2}' for output dataset!"
        exit 1
    fi

    if [ $(ls ${1} | grep '.csv' | wc -l) -gt 0 ]; then
        echo "$(print_date) Archiving and copying output CSVs to output dataset"
        tar_and_copy ${1} job-${JOB_ID:?}-output.tar ${2}

Review comment: Any reason not to use compression here?

Reply: I think because the goal was to reduce practical running time by reducing transferred file count specifically. The size of the data was not of tremendous importance, and compression would slow things, both now and later. In contrast, since the configs were effectively duplicated here, and not the primary content of the output dataset, it made more sense to compress that data.

    else
        echo "$(print_date) Copying output file(s) to output dataset"
        cp -a ${1}/. ${2}/.
    fi
    rm -rf ${1}
}

Review comment: Similar to @christophertubbs's above comment about using Python, for example. While out of scope for this PR, we should really consider rewriting this entry point and related helper functions in something other than POSIX-compliant shell.
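
As a rough illustration of that suggestion, here is a hypothetical sketch of the tar_and_copy helper above in Python, using only the standard library; the signature, names, and error handling are invented for the example rather than taken from this PR:

    # Hypothetical Python counterpart to tar_and_copy(), sketched only to illustrate
    # the suggested rewrite; the names and interface are invented for the example.
    import tarfile
    from pathlib import Path

    def tar_and_copy(contents_dir: str, tar_file: str, dest_dir: str,
                     compress: bool = False, dry_run: bool = False) -> None:
        contents, dest = Path(contents_dir), Path(dest_dir)
        archive = dest / tar_file
        # Same sanity checks as the shell version, but raising exceptions instead of exiting
        if not contents.is_dir():
            raise ValueError(f"tar_and_copy contents directory '{contents}' is not an existing directory")
        if not dest.is_dir():
            raise ValueError(f"tar_and_copy destination directory '{dest}' is not an existing directory")
        if archive.exists():
            raise FileExistsError(f"tar_and_copy archive file '{archive}' already exists")
        if dry_run:
            return
        # Write the archive directly into the destination, gzipped when requested
        with tarfile.open(archive, mode="w:gz" if compress else "w") as tar:
            tar.add(contents, arcname=".")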
@@ -4,54 +4,65 @@
while [ ${#} -gt 0 ]; do
    case "${1}" in
        --config-dataset)
            CONFIG_DATASET_NAME="${2:?}"
            declare -x CONFIG_DATASET_NAME="${2:?}"
            shift
            ;;
        --host-string)
            MPI_HOST_STRING="${2:?}"
            declare -x MPI_HOST_STRING="${2:?}"
            shift
            ;;
        --hydrofabric-dataset)
            HYDROFABRIC_DATASET_NAME="${2:?}"
            declare -x HYDROFABRIC_DATASET_NAME="${2:?}"
            shift
            ;;
        --job-id)
            JOB_ID="${2:?}"
            declare -x JOB_ID="${2:?}"
            shift
            ;;
        --node-count)
            MPI_NODE_COUNT="${2:?}"
            declare -x MPI_NODE_COUNT="${2:?}"
            shift
            ;;
        --output-dataset)
            OUTPUT_DATASET_NAME="${2:?}"
            declare -x OUTPUT_DATASET_NAME="${2:?}"
            shift
            ;;
        --partition-dataset)
            PARTITION_DATASET_NAME="${2:?}"
            declare -x PARTITION_DATASET_NAME="${2:?}"
            shift
            ;;
        --worker-index)
            WORKER_INDEX="${2:?}"
            declare -x WORKER_INDEX="${2:?}"
            shift
            ;;
        --calibration-config-file)
            CALIBRATION_CONFIG_BASENAME="${2:?}"
            declare -x CALIBRATION_CONFIG_BASENAME="${2:?}"
            shift
            ;;
    esac
    shift
done

# TODO: (later) in both ngen and ngen-cal entrypoints, add controls for whether this is temp dir or output dataset dir
declare -x JOB_OUTPUT_WRITE_DIR="/tmp/job_output"

# Get some universally applicable functions and constants
source ./funcs.sh

ngen_sanity_checks_and_derived_init
init_script_mpi_vars
init_ngen_executable_paths

# Move to the output dataset mounted directory
cd ${OUTPUT_DATASET_DIR:?Output dataset directory not defined}
# Move to the output write directory
# TODO: (later) in both ngen and ngen-cal entrypoints, control whether this is needed, based on if write dir is output dataset dir
#cd ${OUTPUT_DATASET_DIR:?Output dataset directory not defined}
mkdir ${JOB_OUTPUT_WRITE_DIR:?}
chown ${MPI_USER}:${MPI_USER} ${JOB_OUTPUT_WRITE_DIR}
cd ${JOB_OUTPUT_WRITE_DIR}
# Needed for routing
if [ ! -e /dmod/datasets/linked_job_output ]; then
    ln -s ${JOB_OUTPUT_WRITE_DIR} /dmod/datasets/linked_job_output
fi

start_calibration() {
    # Start ngen calibration

@@ -61,24 +72,31 @@ start_calibration() {
        echo "$(print_date) Starting ngen calibration with serial ngen execution"
    fi

    # Find and use copy of config in output dataset
    # Find calibration config, then copy to output dataset and use that
    if [ -n "${CALIBRATION_CONFIG_BASENAME:-}" ]; then
        CALIBRATION_CONFIG_FILE=$(find ${OUTPUT_DATASET_DIR:?} -type f -name "${CALIBRATION_CONFIG_BASENAME}" -maxdepth 1 | head -1)
        #CALIBRATION_CONFIG_FILE=$(find ${OUTPUT_DATASET_DIR:?} -type f -name "${CALIBRATION_CONFIG_BASENAME}" -maxdepth 1 | head -1)
        _ORIG_CAL_CONFIG_FILE=$(find ${CONFIG_DATASET_DIR:?} -type f -name "${CALIBRATION_CONFIG_BASENAME}" -maxdepth 1 | head -1)
    else
        CALIBRATION_CONFIG_FILE=$(find ${OUTPUT_DATASET_DIR:?} -type f -iname "*.yaml" -o -iname "*.yml" -maxdepth 1 | head -1)
        #CALIBRATION_CONFIG_FILE=$(find ${OUTPUT_DATASET_DIR:?} -type f -iname "*.yaml" -o -iname "*.yml" -maxdepth 1 | head -1)
        _ORIG_CAL_CONFIG_FILE=$(find ${CONFIG_DATASET_DIR:?} -type f -iname "*.yaml" -o -iname "*.yml" -maxdepth 1 | head -1)
    fi

    if [ -z "${CALIBRATION_CONFIG_FILE}" ]; then
    if [ -z "${_ORIG_CAL_CONFIG_FILE}" ]; then
        echo "Error: NGEN calibration yaml file not found" 2>&1
        exit 1
    fi
    cp -a ${_ORIG_CAL_CONFIG_FILE:?} ${OUTPUT_DATASET_DIR:?}/.

Review comment: Marking this as related to #407. I am not sure where we should document the expectations/spec for a worker image's data requirements, but this feels like a prime candidate for that sort of documentation. Just want to track this because it will affect how we need to normalize paths.

    CALIBRATION_CONFIG_FILE="${OUTPUT_DATASET_DIR:?}/$(basename ${_ORIG_CAL_CONFIG_FILE})"

    python3 -m ngen.cal "${CALIBRATION_CONFIG_FILE}"

    # Capture the return value to use as service exit code
    NGEN_RETURN=$?

    echo "$(print_date) ngen calibration finished with return value: ${NGEN_RETURN}"

    # TODO: (later) make sure outputs are handled properly, and that eventually we support toggling whether written to
    # TODO: output dataset dir directly or somewhere else

    # Exit with the model's exit code
    return ${NGEN_RETURN}
}

@@ -89,11 +107,16 @@ if [ "${WORKER_INDEX:-0}" = "0" ]; then
    # This will only have an effect when running with multiple MPI nodes, so its safe to have even in serial exec
    trap close_remote_workers EXIT
    # Have "main" (potentially only) worker copy config files to output dataset for record keeping
    # TODO: perform copy of configs to output dataset outside of image (in service) for better performance
    cp -a ${CONFIG_DATASET_DIR:?Config dataset directory not defined}/. ${OUTPUT_DATASET_DIR:?}
    # TODO: (later) in ngen and ngen-cal entrypoints, consider adding controls for whether this is done or a simpler
    # TODO: 'cp' call, based on whether we write directly to output dataset dir or some other output write dir
    # Do a dry run first to sanity check directory and fail if needed before backgrounding process
    tar_and_copy --dry-run --compress ${CONFIG_DATASET_DIR:?Config dataset directory not defined} config_dataset.tgz ${OUTPUT_DATASET_DIR:?}
    # Then actually run the archive and copy function in the background
    tar_and_copy --compress ${CONFIG_DATASET_DIR:?} config_dataset.tgz ${OUTPUT_DATASET_DIR:?} &
    _CONFIG_COPY_PROC=$!

Review comment: Shouldn't we wait on this background process at some point?

Reply: I am mostly kicking that can down the road, but practically I don't think we will have to. This image is going to need extensive testing regardless, and we can sort out any issues related to this then.

    # If there is partitioning, which implies multi-processing job ...
    if [ -n "${PARTITION_DATASET_DIR:-}" ]; then
        # Include partition config dataset too if appropriate
        # TODO: perform copy of configs to output dataset outside of image (in service) for better performance
        cp -a ${PARTITION_DATASET_DIR}/. ${OUTPUT_DATASET_DIR:?}
    fi
@@ -311,7 +311,7 @@ def output_formats(self) -> List[DataFormat]:
        List[DataFormat]
            List of the formats of each required output dataset for the requested job.
        """
        return [DataFormat.NGEN_OUTPUT]
        return [DataFormat.ARCHIVED_NGEN_CSV_OUTPUT]

Review comment: Thinking about the output from NextGen, let's say that ...

Reply: An interesting question. Probably something that deserves its own issue, as it isn't well defined at the moment.

Reply: This may be simpler than I thought initially (although we can still consider giving it its own issue if more conversation is needed). First, it is not possible to completely, explicitly define an ngen output data format that covers all possible output variables. I'm not going to work through it entirely right now, but my back-of-the-envelope math says ngen may actually support uncountably infinitely many possible combinations of outputs, in theory at least. Regardless, too many to cover. And so, we don't try to. While some DataFormat values do define ... A format does have to enable domains to elaborate on certain things, though, via the standard indices. So it is useful (perhaps necessary) to distinguish particulars of how the indices data are available within the dataset, and this is done implicitly by the format and its name. E.g., for ... So, it is still useful to have ngen-related output formats that imply only part of how data are structured within such datasets, even if it doesn't (because it can't) tell us everything about what's in there.


    @property
    def partition_cfg_data_id(self) -> Optional[str]:
@@ -118,11 +118,11 @@ class DataFormat(PydanticEnum):
        True
    )
    """ The default format for "raw" AORC forcing data. """
    NGEN_OUTPUT = (3,
                   {StandardDatasetIndex.CATCHMENT_ID: None, StandardDatasetIndex.TIME: None, StandardDatasetIndex.DATA_ID: None},
                   None,
                   True)
    """ Representation of the format for Nextgen output, with unknown/unspecified configuration of output fields. """
    NGEN_CSV_OUTPUT = (3,
                       {StandardDatasetIndex.CATCHMENT_ID: None, StandardDatasetIndex.TIME: None, StandardDatasetIndex.DATA_ID: None},
                       None,
                       True)
    """ Format for output of ngen when written as CSV, with unknown/unspecified configuration of output fields. """
    NGEN_REALIZATION_CONFIG = (
        4, {StandardDatasetIndex.CATCHMENT_ID: None, StandardDatasetIndex.TIME: None, StandardDatasetIndex.DATA_ID: None}, None, True)
    """ Representation of the format of realization configs, which covers catchments (id) and has a time period (time). """
@@ -221,15 +221,28 @@ class DataFormat(PydanticEnum):
    is removed).
    """

    ARCHIVED_NGEN_CSV_OUTPUT = (17,
                                {StandardDatasetIndex.CATCHMENT_ID: None, StandardDatasetIndex.TIME: None, StandardDatasetIndex.DATA_ID: None},
                                None,
                                True)
    """ Format for output of ngen, similar to ``NGEN_CSV_OUTPUT``, but with all output archived to single tar file. """

    NGEN_NETCDF_OUTPUT = (18,
                          {StandardDatasetIndex.CATCHMENT_ID: None, StandardDatasetIndex.TIME: None,
                           StandardDatasetIndex.DATA_ID: None},
                          None,
                          True)
    """ Format for output of ngen when written to single NetCDF file, with dynamically configured output fields. """

    @classmethod
    def can_format_fulfill(cls, needed: DataFormat, alternate: DataFormat) -> bool:
        """
        Test whether data in an alternate format is capable of satisfying requirements of some other format.
        Test whether a dataset and contained data in some format can satisfy requirements of a different format.

Review comment: Props for the attention to documentation.

        This function indicates whether data in one format (the alternate format) is compatible with requirements
        specified using a different format (the needed format). It is an indication of whether data is **potentially**
        capable of satisfying a requirement - even if the data formats of the two are not the same - due to the two
        formats being sufficiently similar.
        This function indicates whether a hypothetical dataset and its data, having some particular format (the
        alternate format), is compatible with hypothetical requirements specified using a different format (the needed
        format). It is an indication of whether a dataset and its data are **potentially** capable of satisfying a
        requirement, even with a different format, due to the two formats being sufficiently similar.

        For example, the NextGen framework can support forcings in either CSV or NetCDF formats, represented as
        ``AORC_CSV`` and ``NETCDF_FORCING_CANONICAL`` respectively. A job to execute NextGen would include a forcing
@@ -264,7 +277,12 @@ def can_format_fulfill(cls, needed: DataFormat, alternate: DataFormat) -> bool:
        compatible_forcing_formats = {cls.AORC_CSV, cls.NETCDF_FORCING_CANONICAL, cls.NETCDF_AORC_DEFAULT}
        if needed in compatible_forcing_formats and alternate in compatible_forcing_formats:
            return True
        # Anything else, they are compatible

        ngen_csv_output_formats = {cls.ARCHIVED_NGEN_CSV_OUTPUT, cls.NGEN_CSV_OUTPUT}
        if needed in ngen_csv_output_formats and alternate in ngen_csv_output_formats:
            return True

        # Anything else, they are not compatible
        return False

    @classmethod
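
To illustrate how the compatibility sets in this change behave, here is a short hypothetical usage sketch; the import path is an assumption and may not match the repository layout, and the assertions reflect only the logic shown in this hunk:

    # Hypothetical usage of the compatibility rules shown above; the module path
    # below is assumed for illustration and may differ in the actual codebase.
    from dmod.core.meta_data import DataFormat

    # A requirement expressed against plain CSV output can be fulfilled by the
    # archived CSV variant, and vice versa, per the new ngen_csv_output_formats set.
    assert DataFormat.can_format_fulfill(needed=DataFormat.NGEN_CSV_OUTPUT,
                                         alternate=DataFormat.ARCHIVED_NGEN_CSV_OUTPUT)
    assert DataFormat.can_format_fulfill(needed=DataFormat.ARCHIVED_NGEN_CSV_OUTPUT,
                                         alternate=DataFormat.NGEN_CSV_OUTPUT)

    # The NetCDF output format is not part of that set, so per the logic shown here
    # it cannot stand in for a CSV output requirement.
    assert not DataFormat.can_format_fulfill(needed=DataFormat.NGEN_CSV_OUTPUT,
                                             alternate=DataFormat.NGEN_NETCDF_OUTPUT)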
Review comment: This sort of logic needs to be implemented via Python rather than bash to reduce the size of large and confusing bash scripts and to make the logic easy to debug, modify, and extend. As it stands, if something goes awry, it's incredibly difficult to debug and fix.

Reply: So my initial thought was that it didn't make sense to try to do new things as separate pieces in Python. That it probably is better overall for these things to be done in Python, rather than shell scripts, but that really all the shell scripts (or at least groups that run together in any particular setting, like those involved in container entrypoints) should be ported all at once.

I'll rethink this a bit. I don't want to introduce something that's on the whole even more complicated to follow than this, and that was my worry. But maybe it won't be quite that bad.

Reply: This might do exactly what you want: https://docs.python.org/3.8/library/shutil.html#shutil.make_archive

Converting the scripts a bit at a time would make things easier. That way we can ensure that each bit works with as small a scope as possible, it'll be less of a burden, it won't take as long, and we'll be less likely to run into errors.
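
For reference, the call from those docs would roughly replace the gzipped tar invocation in tar_and_copy; a minimal sketch, with placeholder paths:

    import shutil

    # Roughly equivalent in spirit to:
    #   tar_and_copy --compress ${CONFIG_DATASET_DIR} config_dataset.tgz ${OUTPUT_DATASET_DIR}
    # Note that make_archive appends the format's own extension, producing .../config_dataset.tar.gz;
    # the paths below are placeholders, not values from this PR.
    shutil.make_archive("/path/to/output_dataset/config_dataset", "gztar",
                        root_dir="/path/to/config_dataset")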