Optimize worker behavior for object store writes #650
Conversation
Adjusting to account for the slowness of output writes when the output dataset directory is actually a mounted object store bucket (the default at the moment): workers now work in container-specific dirs and archive things before writing to such a place. Also adding TODO comments to come back and add more general behavior later, when output datasets may live somewhere else.
Renaming NGEN_OUTPUT to NGEN_CSV_OUTPUT, introducing the ARCHIVED_NGEN_CSV_OUTPUT format for when CSVs are bundled into a tar file before being placed in the dataset, and proactively adding an NGEN_NETCDF_OUTPUT format for eventual use; also, switching to ARCHIVED_NGEN_CSV_OUTPUT as the current job default.
Relates to #637.
tar_and_copy and its related functions need to be separated out into their own python script. There's not much keeping the commands `tar_and_copy --dry-run --compress ${CONFIG_DATASET_DIR:?Config dataset directory not defined} config_dataset.tgz ${OUTPUT_DATASET_DIR:?}` and `./tar_and_copy.py --dry-run --compress ${CONFIG_DATASET_DIR:?Config dataset directory not defined} config_dataset.tgz ${OUTPUT_DATASET_DIR:?}` from yielding the same results.
docker/main/ngen/funcs.sh
Outdated
@@ -212,4 +212,95 @@ run_secondary_mpi_ssh_worker_node()
    while [ -e ${RUN_SENTINEL} ] && kill -s 0 "${_SSH_D_PID}" 2>/dev/null ; do
        sleep 5
    done
}

tar_and_copy()
This sort of logic needs to be implemented via python rather than bash to reduce the size of large and confusing bash scripts and to make the logic easy to debug, modify, and extend. As it stands, if something goes awry, it's incredibly difficult to debug and fix.
So my initial thought was that it didn't make sense to try to do new things as separate pieces in Python; that it probably is better overall for these things to be done in Python rather than shell scripts, but that really all the shell scripts (or at least groups that run together in any particular setting, like those involved in container entrypoints) should be ported all at once.
I'll rethink this a bit. I don't want to introduce something that's on the whole even more complicated to follow than this, and that was my worry. But maybe it won't be quite that bad.
This might do exactly what you want: https://docs.python.org/3.8/library/shutil.html#shutil.make_archive
Converting the scripts a bit at a time would make things easier. That way we can ensure that each bit works with as small a scope as possible, it'll be less of a burden, it won't take as long, and we'll be less likely to run into errors.
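For context, a minimal sketch of what an incremental, Python-based replacement for the archiving half of tar_and_copy could look like using shutil.make_archive; the function name, arguments, and behavior here are assumptions for illustration, not the script actually added in this PR:

import shutil
from pathlib import Path


def archive_and_copy(src_dir: str, archive_name: str, dest_dir: str, compress: bool = False) -> Path:
    """Archive the contents of src_dir and place the resulting archive in dest_dir."""
    # 'gztar' yields a compressed .tar.gz; plain 'tar' skips compression entirely
    archive_format = "gztar" if compress else "tar"
    # make_archive expects a base name without the extension; build it inside dest_dir
    base_name = str(Path(dest_dir) / archive_name)
    created = shutil.make_archive(base_name=base_name, format=archive_format, root_dir=src_dir)
    return Path(created)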
docker/main/ngen/funcs.sh
Outdated
    >&2 echo "$(print_date) Error: tar_and_copy copy destination directory '${_DEST_DIR}' exists but is not a directory!"
    exit 1
elif [ -e "${_TAR_FILE}" ]; then
    >&2 echo "$(print_date) Error: tar_and_copy archive file '${_TAR_FILE}' already exists!"
This sort of error makes rerunning commands impossible. Dumping data to the same location is a reasonable use case, even if it's not something we want in a prod-like setting.
In a vacuum, yes, but this is specifically intended to be run inside a fresh container. So there really should never be anything there already. Chances are that if there is, something is going wrong.
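If rerunning in place ever did need to be supported outside the fresh-container case, one possible middle ground (purely a sketch, not something in this PR) is an explicit opt-in before clobbering an existing archive:

from pathlib import Path


def check_archive_target(tar_file: str, overwrite: bool = False) -> None:
    """Fail fast on an existing archive unless the caller explicitly allows overwriting."""
    target = Path(tar_file)
    if target.exists():
        if not overwrite:
            raise FileExistsError(f"archive file '{target}' already exists")
        # Explicit opt-in: remove the stale archive so the rerun starts clean
        target.unlink()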
@classmethod
def can_format_fulfill(cls, needed: DataFormat, alternate: DataFormat) -> bool:
    """
    Test whether data in an alternate format is capable of satisfying requirements of some other format.
    Test whether a dataset and contained data in some format can satisfy requirements of a different format.
Props to the attention to documentation
@@ -787,11 +787,18 @@ def _create_output_datasets(self, job: Job):
                break

        # TODO: (later) more intelligently determine type
        mgr = self._managers.manager(DatasetType.OBJECT_STORE)
        dataset_type = DatasetType.OBJECT_STORE
This is waaaay better than it was. Good job!
docker/main/ngen/funcs.sh
Outdated
if [ $(ls ${1} | grep '.csv' | wc -l) -gt 0 ]; then
    echo "$(print_date) Archiving and copying output CSVs to output dataset"
    tar_and_copy ${1} job-${JOB_ID:?}-output.tar ${2}
Any reason not to use `--compress` here?
I think because the goal was to reduce practical running time by reducing transferred file count specifically. The size of the data was not of tremendous importance. Compression would slow things, both now and later. In contrast, since the configs were effectively duplicated here, and not the primary content of the output dataset, it made more sense to compress that data.
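In shutil terms (the paths below are placeholders, not actual worker paths), that trade-off is just a per-call choice of archive format:

import shutil

# Output CSVs: plain tar, no compression, so the bulk transfer stays fast
shutil.make_archive("/local/tmp/job-output", format="tar", root_dir="/local/ngen_output")

# Duplicated config data: small and off the critical path, so compress it
shutil.make_archive("/local/tmp/config_dataset", format="gztar", root_dir="/dmod/datasets/config/example")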
Similar to @christophertubbs's comment above about using python. While out of scope for this PR, we should really consider rewriting this entrypoint and its related helper functions in something other than POSIX-compliant shell.
echo "Error: NGEN calibration yaml file not found" 2>&1 | ||
exit 1 | ||
fi | ||
cp -a ${_ORIG_CAL_CONFIG_FILE:?} ${OUTPUT_DATASET_DIR:?}/. |
Marking this as related to #407. I am not sure where we should document the expectations/spec for a worker image's data requirements, but this feels like a prime candidate for that sort of documentation. Just want to track this because it will affect how we need to normalize paths.
tar_and_copy --dry-run --compress ${CONFIG_DATASET_DIR:?Config dataset directory not defined} config_dataset.tgz ${OUTPUT_DATASET_DIR:?}
# Then actually run the archive and copy function in the background
tar_and_copy --compress ${CONFIG_DATASET_DIR:?} config_dataset.tgz ${OUTPUT_DATASET_DIR:?} &
_CONFIG_COPY_PROC=$!
Shouldn't we `wait` this after `start_calibration`?
I am mostly kicking that can down the road, but practically I don't think we will have to. This image is going to need extensive testing regardless, and we can sort out any issues related to this then.
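For what it's worth, the ordering being discussed looks roughly like this in Python (start_calibration and the archive details are placeholders; the actual worker does this with shell functions):

import shutil
import threading


def start_calibration() -> None:
    """Placeholder for the real calibration step; here only to make the ordering explicit."""
    ...


def run_job(config_dataset_dir: str, output_dataset_dir: str) -> None:
    # Kick off the config archive/copy concurrently (the shell version backgrounds tar_and_copy with '&')
    copier = threading.Thread(
        target=shutil.make_archive,
        kwargs={"base_name": f"{output_dataset_dir}/config_dataset",
                "format": "gztar",
                "root_dir": config_dataset_dir},
    )
    copier.start()
    start_calibration()
    # Only block on the copy once the calibration work has been started (the shell 'wait')
    copier.join()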
@@ -311,7 +311,7 @@ def output_formats(self) -> List[DataFormat]:
        List[DataFormat]
            List of the formats of each required output dataset for the requested job.
        """
        return [DataFormat.NGEN_OUTPUT]
        return [DataFormat.ARCHIVED_NGEN_CSV_OUTPUT]
Thinking about the output from NextGen: let's say that csv output is what is configured, but troute (and ngen-cal) output parquet files. How does that factor into the DataDomain of an output dataset?
An interesting question. Probably something that deserves its own issue, as it isn't well defined at the moment.
This may be simpler than I thought initially (although we can still consider giving it its own issue if more conversation is needed).
First, it is not possible to completely, explicitly define an ngen output data format that covers all possible output variables. I'm not going to work through it entirely right now, but my back-of-the-envelope math says ngen may actually support uncountably infinitely many possible combinations of outputs, in theory at least. Regardless, there are too many to cover.
And so, we don't try to. While some DataFormat values do define data_fields, the ngen output formats do not. The implication is that we cannot necessarily completely summarize what could be in there, so users of the data need to determine that for themselves.
A format does have to enable domains to elaborate on certain things, though, via the standard indices. So it is useful (perhaps necessary) to distinguish particulars of how the indices data are available within the dataset, and this is done implicitly by the format and its name. E.g., for ARCHIVED_NGEN_CSV_OUTPUT, there will be a bunch of CSVs inside an archive, with catchment ids in the names of the CSVs and times (and other data) inside the CSVs.
So, it is still useful to have ngen-related output formats that imply only part of how data are structured within such datasets, even if a format doesn't (because it can't) tell us everything about what's in there.
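A loose sketch of that idea (the class and values below are illustrative stand-ins modeled on DataFormat, not the real definitions): the archived-CSV format pins down the index structure while leaving the concrete data fields open.

from dataclasses import dataclass
from typing import FrozenSet, Optional


@dataclass(frozen=True)
class OutputFormatSpec:
    """Toy stand-in for a DataFormat entry, for illustration only."""
    name: str
    indices: FrozenSet[str]
    # None means "cannot be enumerated up front"; consumers must inspect the data themselves
    data_fields: Optional[FrozenSet[str]] = None


# Catchment id lives in the CSV file names inside the archive; time lives inside each CSV
ARCHIVED_NGEN_CSV_OUTPUT = OutputFormatSpec(
    name="ARCHIVED_NGEN_CSV_OUTPUT",
    indices=frozenset({"catchment_id", "time"}),
    data_fields=None,
)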
Bumping versions of dmod-core and dmod-communication for direct changes, and then bumping other library and service packages to use the updated versions for their dependencies (along with bumping their own versions).
I'm still exploring some functionality that I'm not sure about with the python ArgumentParser. I may have further comments later
docker/main/ngen/py_funcs.py
Outdated
    argparse.Namespace
        The parsed arguments namespace object.
    """
    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter, prog='py_funcs')
Make sure to include a description.
Sure, I've made another change and added one.
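For reference, the description is just another keyword argument on the parser (the wording below is illustrative, not the text actually committed):

import argparse

parser = argparse.ArgumentParser(
    prog='py_funcs',
    description="Helper commands used by the ngen worker entrypoint, e.g., archiving and copying outputs.",
    formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)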
The tests that are failing seem to be in the data service tests and related (somehow) to the recent move to REST.
I find this peculiar, since there have been many PRs since #575, so surely that module was present during GitHub Action checks for the PRs between that one and this. I feel like we could probably disregard it here. @aaraney and @christophertubbs, any thoughts on either the disregarding or the root cause here?
It might be worth putting a reference to fastapi in the requirements as a CYA maneuver, but I'm not sure if that's a big enough reason to be a huge blocker as long as an "Urgent"/"Low Hanging Fruit" ticket is added for it. Did that cause the other issues where programs didn't know what
docker/main/ngen/py_funcs.py
Outdated
else:
    raise ValueError("Unsupported command {}".format(args.command))
This condition is unnecessary - ArgumentParser will throw an error and print the usage statement if a command other than one for a registered subparser is given.
Strictly speaking, yes, but I like this for future-proofing. I'm one to occasionally forget the "connecting wiring" between things, like adding the lines in main between the arg parsing for a new function and the function implementation itself.
Would it be more appropriate if I changed this to NotImplementedError?
Yeah - NotImplementedError is more appropriate here. Adding a whole subparser and not handling it in main is a big no-no and should be caught in PR, but this is perhaps the only way to catch a mess-up of that magnitude.
That scenario is also why subparsers should be avoided, though.
Eh, actually, I went and read the documentation for NotImplementedError after suggesting it. I'm not sure it's more appropriate here:
In user defined base classes, abstract methods should raise this exception when they require derived classes to override the method, or while the class is being developed to indicate that the real implementation still needs to be added.
Aside from outright forgetting, raising some kind of exception here also protects against typos in the compared-to literal strings in the above expressions; otherwise, things would just silently not run (but also not actually fail). I've switched this to a RuntimeError and modified the message a bit.
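The resulting dispatch presumably ends up looking something like this (subcommand names and message wording are illustrative, not the committed code):

import argparse


def main():
    parser = argparse.ArgumentParser(prog='py_funcs')
    subparsers = parser.add_subparsers(dest='command')
    subparsers.add_parser('tar_and_copy')
    subparsers.add_parser('gather_output')
    args = parser.parse_args()

    if args.command == 'tar_and_copy':
        ...  # call the tar-and-copy implementation
    elif args.command == 'gather_output':
        ...  # call the output-gathering implementation
    else:
        # Guards against a registered subparser never being wired up here,
        # or a typo in one of the compared-to literals above
        raise RuntimeError(f"Command '{args.command}' not handled in main")


if __name__ == '__main__':
    main()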
docker/main/ngen/py_funcs.py
Outdated
for host in scp_subs:
    return_code = scp_subs[host].wait()
    if return_code != 0:
        raise RuntimeError(f"{get_date_str()} gather_output failed for {host} w/ return code {return_code!s}!")
Try the following instead:
for host, process in scp_subs.items():
    _, error_in_bytes = process.communicate()
    if process.returncode != 0:
        raise RuntimeError(f"{get_date_str()} gather_output failed for '{host}': {error_in_bytes.decode()} (code={process.returncode})!")
I made a slight tweak to the error message for line formatting reasons but essentially adopted your suggestion.
Switching to use this instead of ValueError when the command arg doesn't match something supported by main(), and making the message reflect the situation a little better.
@christophertubbs and @aaraney, are there still some requested changes outstanding for this?
Looks solid to me. I removed my flag for requested changes. If Austin is good, I'm good.
Optimizing workers for object store writes (i.e., the only current dataset backing type), essentially by having them archive large collections of files to be written (e.g., output CSVs) and write only those archives, so that minimal individual files go to the object store.