
Optimize worker behavior for object store writes #650

Merged

Conversation

robertbartel (Contributor)

Optimizing workers for object store writes (i.e., the only current dataset backing type), essentially by having them archive large collections of files to be written (e.g., output CSVs) and write only these archives, minimizing the number of individual files placed in the object store.

Adjusting to account for slowness of writes for output when output
dataset directory is actually a mounted object store bucket (the default
at the moment), such that we work in container-specific dirs and archive
things before writing to such a place; also adding TODO comments to come
back and add more general behavior later, when output datasets may be
somewhere else.
Renaming NGEN_OUTPUT to NGEN_CSV_OUTPUT, introducing
ARCHIVED_NGEN_CSV_OUTPUT format for when CSVs are bundled into tar file
before being placed in the dataset, and proactively adding the
NGEN_NETCDF_OUTPUT format for eventual use; also, switching to
ARCHIVED_NGEN_CSV_OUTPUT as the current job default.
@robertbartel added the enhancement (New feature or request) and maas (MaaS Workstream) labels on Jun 6, 2024
@robertbartel (Contributor, Author)

Relates to #637.

@christophertubbs (Contributor) left a comment

tar_and_copy and its related functions need to be separated out into their own Python script. There's not much keeping the commands `tar_and_copy --dry-run --compress ${CONFIG_DATASET_DIR:?Config dataset directory not defined} config_dataset.tgz ${OUTPUT_DATASET_DIR:?}` and `./tar_and_copy.py --dry-run --compress ${CONFIG_DATASET_DIR:?Config dataset directory not defined} config_dataset.tgz ${OUTPUT_DATASET_DIR:?}` from yielding the same results.

@@ -212,4 +212,95 @@ run_secondary_mpi_ssh_worker_node()
while [ -e ${RUN_SENTINEL} ] && kill -s 0 "${_SSH_D_PID}" 2>/dev/null ; do
sleep 5
done
}

tar_and_copy()
Contributor:

This sort of logic needs to be implemented via python rather than bash to reduce the size of large and confusing bash scripts and to make the logic easy to debug, modify, and extend. As it stands, if something goes awry, it's incredibly difficult to debug and fix.

Contributor (Author):

So my initial thought was that it didn't make sense to try to do new things as separate pieces in Python. That it probably is better overall for these things to be done in Python, rather than shell scripts, but that really all the shell scripts (or at least groups that run together in any particular setting, like those involved in container entrypoints) should be ported all at once.

I'll rethink this a bit. I don't want to introduce something that's on the whole even more complicated to follow than this, and that was my worry. But maybe it won't be quite that bad.

Contributor:

This might do exactly what you want: https://docs.python.org/3.8/library/shutil.html#shutil.make_archive

Converting the scripts a bit at a time would make things easier. That way we can ensure that each bit works with as small a scope as possible, it'll be less of a burden, it won't take as long, and we'll be less likely to run into errors.
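For illustration, a minimal sketch of what a Python replacement built on shutil.make_archive could look like; the function and argument names here are hypothetical, not the actual py_funcs.py interface:

import shutil
from pathlib import Path


def tar_and_copy(src_dir: str, archive_name: str, dest_dir: str, compress: bool = False) -> Path:
    """Archive the contents of src_dir and place the archive in dest_dir (hypothetical sketch)."""
    dest = Path(dest_dir)
    dest.mkdir(parents=True, exist_ok=True)
    # make_archive appends the format's extension itself, so strip any suffix from the requested name
    base_name = dest / Path(archive_name).stem
    archive_format = "gztar" if compress else "tar"
    # Produces e.g. <dest_dir>/config_dataset.tar.gz when compress=True
    return Path(shutil.make_archive(str(base_name), archive_format, root_dir=src_dir))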

>&2 echo "$(print_date) Error: tar_and_copy copy destination directory '${_DEST_DIR}' exists but is not a directory!"
exit 1
elif [ -e "${_TAR_FILE}" ]; then
>&2 echo "$(print_date) Error: tar_and_copy archive file '${_TAR_FILE}' already exists!"
Contributor:

This sort of error makes rerunning commands impossible. Dumping data to the same location is a reasonable use case, even if it's not something we want in a prod-like setting.

Contributor (Author):

In a vacuum, yes, but this is specifically intended to be run inside a fresh container. So there really should never be anything there already. Chances are that if there is, something is going wrong.

@classmethod
def can_format_fulfill(cls, needed: DataFormat, alternate: DataFormat) -> bool:
"""
Test whether data in an alternate format is capable of satisfying requirements of some other format.
Test whether a dataset and contained data in some format can satisfy requirements of a different format.
Contributor:

Props to the attention to documentation

@@ -787,11 +787,18 @@ def _create_output_datasets(self, job: Job):
break

# TODO: (later) more intelligently determine type
mgr = self._managers.manager(DatasetType.OBJECT_STORE)
dataset_type = DatasetType.OBJECT_STORE
Contributor:

This is waaaay better than it was. Good job!


if [ $(ls ${1} | grep '.csv' | wc -l) -gt 0 ]; then
echo "$(print_date) Archiving and copying output CSVs to output dataset"
tar_and_copy ${1} job-${JOB_ID:?}-output.tar ${2}
Member:

Any reason not to use --compress here?

Contributor (Author):

I think because the goal was to reduce practical running time by reducing transferred file count specifically. The size of the data was not of tremendous importance. Compression would slow things, both now and later. In contrast, since the configs were effectively duplicated here, and not the primary content of the output dataset, it made more sense to compress that data.
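To make the trade-off concrete, here is a rough sketch using Python's standard tarfile module (function names and path handling are illustrative, not the worker's actual code): an uncompressed tar simply bundles the many small output CSVs to cut the transfer count, while "w:gz" spends CPU now (and again at extraction) to shrink the duplicated config data.

import tarfile
from pathlib import Path


def bundle_outputs(output_dir: str, tar_path: str) -> None:
    # Uncompressed: fast to create and extract; the only goal is fewer objects to transfer
    with tarfile.open(tar_path, "w") as tar:
        for csv_file in sorted(Path(output_dir).glob("*.csv")):
            tar.add(csv_file, arcname=csv_file.name)


def bundle_configs(config_dir: str, tgz_path: str) -> None:
    # Compressed: configs are duplicated, secondary content, so spend CPU to shrink them
    with tarfile.open(tgz_path, "w:gz") as tar:
        tar.add(config_dir, arcname=Path(config_dir).name)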

Member:

Similar to @christophertubbs's comment above about using Python, for example. While out of scope for this PR, we should really consider rewriting this entry point and related helper functions in something other than POSIX-compliant shell.

>&2 echo "Error: NGEN calibration yaml file not found"
exit 1
fi
cp -a ${_ORIG_CAL_CONFIG_FILE:?} ${OUTPUT_DATASET_DIR:?}/.
Member:

Marking this as related to #407. I am not sure where we should document the expectations / spec for a worker image's data requirements, but this feels like a prime candidate for that sort of documentation. Just want to track this because it will affect how we need to normalize paths.

tar_and_copy --dry-run --compress ${CONFIG_DATASET_DIR:?Config dataset directory not defined} config_dataset.tgz ${OUTPUT_DATASET_DIR:?}
# Then actually run the archive and copy function in the background
tar_and_copy --compress ${CONFIG_DATASET_DIR:?} config_dataset.tgz ${OUTPUT_DATASET_DIR:?} &
_CONFIG_COPY_PROC=$!
Member:

Shouldn't we wait on this after start_calibration?

Contributor (Author):

I am mostly kicking that can down the road, but practically I don't think we will have to. But this image is going to need extensive testing regardless, and we can sort out any issues related to this then.
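As a rough sketch of how a Python port of that entrypoint logic might sequence things (the paths and the direct use of the tar CLI here are assumptions, not the actual worker code): start the config archive/copy in the background, kick off the main work, and only then wait on the copy so it never delays the job start.

import subprocess


def start_calibration() -> None:
    """Placeholder for launching ngen-cal / the main job."""
    ...


# Start the config archive/copy in the background (illustrative paths)
copy_proc = subprocess.Popen(
    ["tar", "-czf", "/dmod/datasets/output/config_dataset.tgz",
     "-C", "/dmod/datasets/config", "."]
)

start_calibration()

# Only block on the copy once the main work is underway
if copy_proc.wait() != 0:
    raise RuntimeError("background config archive/copy failed")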

@@ -311,7 +311,7 @@ def output_formats(self) -> List[DataFormat]:
List[DataFormat]
List of the formats of each required output dataset for the requested job.
"""
return [DataFormat.NGEN_OUTPUT]
return [DataFormat.ARCHIVED_NGEN_CSV_OUTPUT]
Member:

Thinking about the output from NextGen: let's say that CSV output is what is configured; troute (and ngen-cal) output parquet files. How does that factor into the DataDomain of an output dataset?

Contributor (Author):

An interesting question. Probably something that deserves its own issue, as it isn't well defined at the moment.

Contributor (Author):

This may be simpler than I thought initially (although we can still consider giving it its own issue if more conversation is needed).

First, it is not possible to completely, explicitly define an ngen output data format that covers all possible output variables. I'm not going to work through it entirely right now, but my back-of-the-envelope math says ngen may actually support uncountably infinitely many possible combinations of outputs, in theory at least. Regardless, too many to cover.

And so, we don't try to. While some DataFormat values do define data_fields, the ngen output formats do not. The implication is that we cannot necessarily completely summarize what could be in there, so users of the data need to determine that for themselves.

A format does have to enable domains to elaborate on certain things, though, via the standard indices. So it is useful (perhaps necessary) to distinguish particulars of how the indices data are available within the dataset, and this is done implicitly by the format and its name. E.g., for ARCHIVED_NGEN_CSV_OUTPUT, there will be a bunch of CSVs inside an archive, with catchment ids in the names of the CSVs and times (and other data) inside the CSVs.

So, it is still useful to have ngen-related output formats that imply only part of how data are structured within such datasets, even if they don't (because they can't) tell us everything about what's in there.
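As an illustration of that implicit structure, a consumer of an ARCHIVED_NGEN_CSV_OUTPUT dataset might recover catchment ids from the archive's member names along these lines (the exact naming convention assumed here is hypothetical, not something the format spells out):

import tarfile


def list_catchment_csvs(archive_path: str) -> dict:
    """Map catchment id -> CSV member name for an archived ngen CSV output dataset.

    Assumes member names look like 'cat-123.csv'; the real naming convention
    may differ, so treat this purely as an illustration of the implicit format.
    """
    members = {}
    with tarfile.open(archive_path, "r:*") as tar:
        for member in tar.getmembers():
            if member.isfile() and member.name.endswith(".csv"):
                catchment_id = member.name.rsplit("/", 1)[-1][:-len(".csv")]
                members[catchment_id] = member.name
    return members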

Bumping versions of dmod-core and dmod-communication for direct changes,
and then bumping other library and service packages to use the updated
versions for their dependencies (along with bumping their own versions).
@christophertubbs (Contributor) left a comment

I'm still exploring some functionality that I'm not sure about with the python ArgumentParser. I may have further comments later

docker/main/ngen/py_funcs.py (resolved review thread)
docker/main/ngen/py_funcs.py (outdated review thread, resolved)
argparse.Namespace
The parsed arguments namespace object.
"""
parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter, prog='py_funcs')
Contributor:

Make sure to include a description.

Contributor (Author):

Sure, I've made another change and added one.
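For reference, the kind of change involved might look roughly like the following; the helper name and description text are illustrative, not the actual py_funcs.py code:

import argparse


def _handle_args() -> argparse.Namespace:
    # The description text is illustrative; the point is just that the parser
    # now explains itself in `py_funcs --help` output.
    parser = argparse.ArgumentParser(
        prog='py_funcs',
        description="Helper functions for ngen worker containers, callable as subcommands.",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    return parser.parse_args()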

@robertbartel (Contributor, Author)

The tests that are failing seem to be in the data service tests and related (somehow) to the recent move to REST:

ModuleNotFoundError: No module named 'fastapi'

I find this peculiar, since there have been many PRs since #575, so surely that module was present during GitHub Actions checks for the PRs between that and this. fastapi is included in install_requires within python/services/dataservice/setup.py, though it isn't in requirements.txt.

I'd feel like we could probably disregard it here. @aaraney and @christophertubbs, any thoughts on either the disregarding or root cause here?

@christophertubbs (Contributor)

It might be worth putting a reference to fastapi in the requirements as a CYA maneuver, but I'm not sure if that's a big enough reason to be a huge blocker as long as an "Urgent"/"Low Hanging Fruit" ticket is added for it.

Did that cause the other issues where programs didn't know what dmod was?

Comment on lines 305 to 306
else:
    raise ValueError("Unsupported command {}".format(args.command))
Contributor:

This condition is unnecessary - ArgumentParser will throw an error and print the usage statement if a command other than one for a registered subparser is given.

Contributor (Author):

Strictly speaking, yes, but I like this for future proofing. I'm one to occasionally forget the "connecting wiring" between things, like adding the lines in main between the arg parsing for a new function and the function implementation itself.

Would it be more appropriate if I changed this to NotImplementedError?

Contributor:

Yeah - NotImplementedError is more appropriate here. Adding a whole subparser and not handling it in main is a big no-no and should be caught in PR, but this is perhaps the only way to catch a mess-up of that magnitude.

That scenario is also why subparsers should be avoided, though.

Contributor (Author):

Eh, actually, I went and read the documentation for NotImplementedError after suggesting it. I'm not sure it's more appropriate here:

In user defined base classes, abstract methods should raise this exception when they require derived classes to override the method, or while the class is being developed to indicate that the real implementation still needs to be added.

Aside from outright forgetting, raising some kind of exception here also protects against typos in the compared-to literal strings in the above expressions; otherwise, things would just silently not run (but also not actually fail). I've switched this to a RuntimeError and modified the message a bit.
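A small, self-contained sketch of the resulting dispatch pattern (the subcommand names and handlers below are placeholders, not the real py_funcs.py contents); note that an unhandled or missing subcommand falls through to the RuntimeError rather than silently doing nothing:

import argparse


def main():
    parser = argparse.ArgumentParser(prog='py_funcs')
    subparsers = parser.add_subparsers(dest='command')
    subparsers.add_parser('tar_and_copy')
    subparsers.add_parser('gather_output')
    args = parser.parse_args()

    if args.command == 'tar_and_copy':
        print("would archive and copy")        # placeholder for the real handler
    elif args.command == 'gather_output':
        print("would gather remote output")    # placeholder for the real handler
    else:
        # Guards against a subparser registered without wiring here, and against
        # typos in the literals above that would otherwise silently do nothing
        raise RuntimeError(f"Command {args.command!r} parsed but not handled in main()")


if __name__ == '__main__':
    main()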

docker/main/ngen/py_funcs.py (two outdated review threads, resolved)
Comment on lines 141 to 144
for host in scp_subs:
    return_code = scp_subs[host].wait()
    if return_code != 0:
        raise RuntimeError(f"{get_date_str()} gather_output failed for {host} w/ return code {return_code!s}!")
Contributor:

Try the following instead:

for host, process in scp_subs.items():
    _, error_in_bytes = process.communicate()
    if process.returncode != 0:
        raise RuntimeError(f"{get_date_str()} gather_output failed for '{host}': {error_in_bytes.decode()} (code={process.returncode})!")

Contributor (Author):

I made a slight tweak to the error message for line formatting reasons but essentially adopted your suggestion.

docker/main/ngen/py_funcs.py (outdated review thread, resolved)
Switching to use RuntimeError instead of ValueError if the command arg
doesn't match something supported by main(), and making the message
reflect the situation a little better.
@robertbartel robertbartel mentioned this pull request Jun 20, 2024
@robertbartel (Contributor, Author)

@christophertubbs and @aaraney, are there still some requested changes outstanding for this?

@christophertubbs (Contributor)

Looks solid to me. I removed my flag for requested changes. If Austin is good, I'm good.

@robertbartel robertbartel merged commit 987042c into NOAA-OWP:master Jul 2, 2024
4 of 8 checks passed