Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rebased LOFAR parallelism #372

Merged
merged 5 commits into from
Mar 21, 2025
Merged

Conversation

sstansill
Copy link
Collaborator

@sstansill sstansill commented Mar 13, 2025

Adds a new method read_col_conversion_dask that allows larger than memory columns to be converted. It also provides speed-ups for columns that comfortably fit in memory. Various changes:

  1. TableManager class has been added so that multi-thread/process conversion can happen without having to serialize casacore table objects. This replaces open_table_ro and open_query in convert_and_write_partition.
  2. read_col_conversion_dask uses dask's map_blocks to create tasks for each chunk of a DataArray which reads data from a MSv2 column and reshapes it

This has been used to convert 9TB of lofar data in ~4.5 hours using a compute node with ~100GB of system memory. Previously this was impossible unless a compute node with >9TB of memory could be procured

Notes:

  • This implementation is completely backwards compatible with the previous implementation and well contained--the issues with encoding have been avoided here.
  • Outputs with lofar=False and lofar=True have been validated against one another. They are identical.
  • If both lofar=True and parallel=True, it defaults to parallel to adhere to dask best practices.
  • I'd prefer to move exclusively to the dask version of read_col_conversion to improve maintainability.

@sstansill sstansill force-pushed the add-lofar-parallelism branch from 3d4b060 to 95e6e31 Compare March 13, 2025 13:35
@v-morello
Copy link

As mentioned in the last xradio meeting, we can probably come up with a better name than lofar for the option in convert_msv2_to_processing_set, which is an important top-level function.

Something that would either suggest that:

  • The parallelism is along the time dimension
  • This is an option created to deal with single scans/partitions larger than memory, as suggested by @scpmw

One thing to note is that there is already an option called parallel and this new option effectively signals that parallelism should be along the time dimension, while being incompatible with the existing parallel option:

    if lofar == True and parallel == True:
        logger.warn(
            "Unsupported config. `lofar` and `parallel` both true isn't supported.\nSwitching off `lofar` mode"
        )
        lofar = False

This strongly suggests that parallel should be made an option of type string which could be either partition or time, but I am not familiar enough with the codebase to know if that would be a practical choice.

@v-morello
Copy link

v-morello commented Mar 13, 2025

If changing the meaning/type of parallel is not possible because the backwards compatibility is too important, then maybe:

  • Keep parallel
  • Rename lofar to parallelism_mode which would be partition by default, but could also be time (corresponding to lofar=True at the moment)

@sstansill
Copy link
Collaborator Author

sstansill commented Mar 13, 2025

I think Peter's suggestion of large_scan_mode is sensible if we leave it as a boolean option.

I was about to write a suggestion similar to parallel and parallelism_mode. This would be my preferred option but it depends on how widely adopted the time-based parallelism will become.

@Jan-Willem would it be worth deprecating the use_table_iter option in this branch? Time-based parallelism can be used to circumvent the casacore issue casacore/python-casacore#130. Another point to think about is that read_col_conversion_numpy() and read_col_conversion_dask() retrieve the extra_dimensions tuple differently—I prefer the method in read_col_conversion_dask() but this isn't a strong preference.

@sstansill
Copy link
Collaborator Author

@Jan-Willem I've just realised that most of the usual checks haven't run because this is a PR from a fork. You might want to change the other checks to "on: pull_request" too

@sstansill sstansill self-assigned this Mar 13, 2025
@Jan-Willem
Copy link
Member

I have added on: pull_request to the GitHub workflows on main. @sstansill, can you please merge main into your pull request?

I have no objections to changing parallel to parallelism_mode.

A check will need to be added for when parallelism_mode='time' to ensure that there is a single DDI and observation present in the MSv2 being converted.

The dask documentation recommends not calling delayed within delayed functions (https://docs.dask.org/en/latest/delayed-best-practices.html). If we go with just read_col_conversion_dask() and parallel_mode is not 'time' then we end up with the dask.delayed(convert_and_write_partition) in measurement_set/convert_msv2_to_processing_set.py containing a map_blocks (a delayed object). In the best practices, they say, "While this may actually work, it’s usually slow and results in hard-to-understand solutions." so, we might be able to get away with this. I am planning on doing some testing. @sjperkins, do you have any additional insight?

@sjperkins
Copy link

Within my own code-bases, I've exclusively used dask.array.blockwise to generate I/O reads, rather than combining it with dask.delayed. Combined with my lack of familiarity with xradio's general dask handling strategies, I unfortunately don't have any advice to offer here.

@v-morello
Copy link

I have no objections to changing parallel to parallelism_mode.

@Jan-Willem Just to 100% clarify, this would mean the updated interface would only have a parallelism_mode parameter that can be partition, time, or None (latter being equivalent to parallel=False). We remove both parameters parallel and lofar, and default value for parallelism_mode remains None? Sounds good to me, just looking for unambiguous agreement on the interface 😄

@Jan-Willem
Copy link
Member

@v-morello yes, only one parameter called parallelism_mode.

@sstansill sstansill force-pushed the add-lofar-parallelism branch from cccc7d1 to 807f688 Compare March 20, 2025 10:31
The `if is_single_dish` clause was only being called when `overwrite=False`. This is undesired behaviour
@sstansill sstansill force-pushed the add-lofar-parallelism branch from a611380 to 4640868 Compare March 21, 2025 11:11
@sstansill
Copy link
Collaborator Author

Ready for second review

Commit c361da2 contains a bug fix that's independent of the parallelism changes

@Jan-Willem
Copy link
Member

It looks good to me. If there are no objections I will merge it today.

@Jan-Willem Jan-Willem merged commit 439026b into casangi:main Mar 21, 2025
13 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants