[SPARK-54392][SS] Optimize JVM-Python communication for TWS initial state #53122
Conversation
holdenk left a comment
Just started taking a look; it's been a hot minute since I looked at the Arrow serialization logic, so perhaps some silly questions.
# Check if the entire column is null
if data_column.null_count == len(data_column):
    return None
Given we've changed the implicit type signature of the function, let's maybe add a type annotation on generate_data_batches for readability.
Done
python/pyspark/worker.py (Outdated)
parsed_offsets = extract_key_value_indexes(arg_offsets)
import pandas as pd
Random place for an import
init_rows.append(init_row)

total_len = len(input_rows) + len(init_rows)
if total_len >= self.arrow_max_records_per_batch:
The SQLConf config param says that if it's set to zero or a negative number there is no limit; with this code, if it's set to zero or a negative number we will always output a fresh batch per row. Let's change the behaviour and add a test covering this.
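A minimal sketch of what the guarded check could look like (the helper name should_flush is hypothetical; only the cap semantics follow the SQLConf description above):

```python
# Sketch only: a positive cap enforces the per-batch record limit; zero or a
# negative value means "no limit", so the count-based flush never triggers.
def should_flush(num_input_rows: int, num_init_rows: int, max_records_per_batch: int) -> bool:
    total_len = num_input_rows + num_init_rows
    return max_records_per_batch > 0 and total_len >= max_records_per_batch

assert should_flush(3, 2, 5)      # cap of 5 reached
assert not should_flush(3, 2, 0)  # zero/negative cap: never flush on count
```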
oh, right;
copied that from non-init state handling; 🫠
nice catch!
Done
def row_stream():
    for batch in batches:
        if self.arrow_max_bytes_per_batch != 2**31 - 1 and batch.num_rows > 0:
            batch_bytes = sum(
                buf.size
                for col in batch.columns
                for buf in col.buffers()
                if buf is not None
            )
            self.total_bytes += batch_bytes
            self.total_rows += batch.num_rows
            self.average_arrow_row_size = self.total_bytes / self.total_rows
This logic seems to be duplicated from elsewhere in the file, maybe we can add it to a base class?
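As a rough illustration of the suggestion (ArrowBatchSizeMixin and _update_average_row_size are hypothetical names, not existing pyspark classes), the byte accounting could be pulled into a shared helper that both serializers call from their batch loops:

```python
import pyarrow as pa

class ArrowBatchSizeMixin:
    """Hypothetical shared base/mixin that tracks the running average Arrow row size."""

    total_bytes = 0
    total_rows = 0
    average_arrow_row_size = 0.0

    def _update_average_row_size(self, batch: pa.RecordBatch) -> None:
        if batch.num_rows == 0:
            return
        # Sum the sizes of all non-null buffers backing the batch's columns.
        batch_bytes = sum(
            buf.size
            for col in batch.columns
            for buf in col.buffers()
            if buf is not None
        )
        self.total_bytes += batch_bytes
        self.total_rows += batch.num_rows
        self.average_arrow_row_size = self.total_bytes / self.total_rows
```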
Done
What changes were proposed in this pull request?
Pack rows into Arrow batches up to the configured record/byte limits, so we generally end up with far fewer batches when the initial-state key cardinality is high.
Don't group init_data and input_data together in batch0: instead, serialize init_data first and then input_data. In the worst case we get one more chunk by not grouping them together, but we win by having much simpler logic on the Python side.
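For illustration only, a minimal sketch of the described ordering and flushing behaviour (emit_batches and its parameters are hypothetical, not the actual worker code; the zero/negative-cap semantics follow the review discussion above):

```python
# Sketch: serialize all init-state rows first, then the input rows, flushing a
# batch whenever a positive record cap is reached (zero/negative cap = no limit).
def emit_batches(init_rows, input_rows, max_records_per_batch):
    # init-state rows and input rows are serialized as separate runs of batches;
    # in the worst case this costs one extra, partially filled batch.
    for rows in (init_rows, input_rows):
        batch = []
        for row in rows:
            batch.append(row)
            if max_records_per_batch > 0 and len(batch) >= max_records_per_batch:
                yield batch
                batch = []
        if batch:
            yield batch

# With a cap of 3, four init rows and two input rows become three batches;
# [3] is the extra chunk paid for not mixing init rows and input rows.
assert list(emit_batches(range(4), ["a", "b"], 3)) == [[0, 1, 2], [3], ["a", "b"]]
```

Compared to packing init and input rows into the same batch, this can cost at most one extra batch, but the Python side no longer has to split mixed batches apart, which is the simpler-logic win described above.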
Why are the changes needed?
Benchmark results show that in high-cardinality scenarios, this optimization improves batch0 time by ~40%. No visible regressions in the low-cardinality case.
Does this PR introduce any user-facing change?
No
How was this patch tested?
Existing UTs and a benchmark.
10,000,000 distinct keys in the initial state (8x i3.4xlarge):
- Without optimization: 11,400 records/s
- With optimization: 30,000 records/s
Was this patch authored or co-authored using generative AI tooling?
No