[SPARK-48233][SS][TESTS] Tests for streaming on columns with non-default collations #46247
Conversation
val inputData = MemoryStream[(String)]
val result = inputData.toDF()
  .select(col("value")
    .try_cast(StringType("UTF8_BINARY_LCASE")).as("str"))
I'm a bit confused by this - is the test name flipped or is UTF8_BINARY_LCASE considered a non-binary collation?
AFAIK, LCASE means comparing as lowercase, so yes it's bound to non-binary equality.
That's right, UTF8_BINARY_LCASE is a non-binary collation.
In other words, "AAA" and "aaa" are considered equal, even though binary representations are clearly different.
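A tiny plain-Python model (not Spark code; `lcase_equal` is an illustrative name, and `str.lower()` only approximates Spark's Unicode-aware lowercasing) of what non-binary equality means here: equality is decided on the lowercased forms, so two strings with different binary representations can compare equal.

```python
def lcase_equal(a: str, b: str) -> bool:
    # Rough stand-in for UTF8_BINARY_LCASE equality; Spark's actual
    # implementation performs proper Unicode lowercasing, so this is
    # only an approximation for illustration.
    return a.lower() == b.lower()

# Equal under the collation, even though the binary forms differ.
print(lcase_equal("AAA", "aaa"))                       # True
print("AAA".encode("utf-8") == "aaa".encode("utf-8"))  # False
```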
testStream(filteredDf)(
  StartStream(triggerClock = clock, trigger = Trigger.ProcessingTime(100)),
  Execute { _ =>
    spark.createDataFrame(Seq("aaa" -> 1, "AAA" -> 2, "bbb" -> 3, "aa" -> 4))
I think it's important to also test a scenario where the incoming stream has a non-default collation itself.
Not sure I understand. streamDf has non-default collation, UTF8_BINARY_LCASE. What do you mean by "incoming" stream?
+1 to what @HeartSaVioR said. The idea was exactly to use UTF8_BINARY_LCASE in the source.
Makes sense, I think I misread the test case the first time.
Let's make the scope of the tests we are adding here clear. I see the PR title says "stateless", but you are also aware that deduplication is "stateful". While I agree that we probably don't want to add collation tests for every stateful operator, let's make the scope clearer in the PR title.
val inputData = MemoryStream[(String, Int)]
val result = inputData.toDF()
  .select(col("_1")
    .try_cast(StringType("UNICODE")).as("str"),
While we are here: I see UNICODE has binary equality but non-binary ordering. Does this still ensure that we can put this into RocksDB, whose keys are binary-sorted, and find the key group based on a prefix of the key that includes this column?
E.g., say we have two columns in the grouping key, dept (String with UNICODE collation) and session start (timestamp), and want to scan all grouping keys whose dept is 'dept1'. This is required for several operations, like session window aggregation.
My gut feeling is yes, but I would like to double-confirm.
In theory it should work, but we need to test this as well.
Binary ordering means you can use the binary representation to check which string comes first alphabetically. But if we only care about equality (which is what groupings and joins usually use), only binary equality matters.
I will follow up with additional testing.
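A rough, Spark-free sketch of why this should hold (all names here are illustrative, not Spark or RocksDB APIs): if each string has one canonical binary encoding (binary equality), then in a binary-sorted store every entry sharing a grouping-key prefix is contiguous, so a prefix scan finds them all, regardless of how the collation orders distinct strings.

```python
import bisect

def encode_key(dept: str, session_start: int) -> bytes:
    # Hypothetical key encoding: dept bytes, a separator, then a
    # big-endian timestamp, mimicking a binary-sorted state store key.
    return dept.encode("utf-8") + b"\x00" + session_start.to_bytes(8, "big")

# Binary-sorted "store", as RocksDB would keep its keys.
store = sorted([
    encode_key("dept1", 100),
    encode_key("dept2", 50),
    encode_key("dept1", 200),
    encode_key("dept3", 10),
])

def prefix_scan(prefix: bytes):
    # Seek to the first key >= prefix, then take keys while they match.
    lo = bisect.bisect_left(store, prefix)
    return [k for k in store[lo:] if k.startswith(prefix)]

# Both session windows for dept1 are found by a single prefix scan.
print(prefix_scan(b"dept1\x00") == [encode_key("dept1", 100),
                                    encode_key("dept1", 200)])  # True
```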
Thanks, using UNICODE in the grouping key of a session window aggregation with the RocksDB state store provider should cover it.
Right, I updated both the PR title and PR description. And yes, tests for collations are still pretty ad hoc/selective. The goal of this PR is to assert that the basics work. As we create a more thorough plan for collations and streaming, we will start adding better-organized test strategies. Let me know if you think now is a good time to start with this. I also thought about creating a new test suite only for collations, but that seemed like overkill for this change.
Looks good from my side
+1
Thanks! Merging to master.
What changes were proposed in this pull request?
This change covers tests for streaming operations over string-typed columns collated with non-UTF8-binary collations. The PR introduces the following tests:
- Use the UTF8_BINARY_LCASE non-binary collation as the input and assert that streaming propagates the collation and that filtering behaves under the rules of the given collation.
- Use the UNICODE collation as the source and make sure that stateful operations (deduplication is taken as the example) work.
Why are the changes needed?
You can find more information about the collation effort in the document attached to the root Jira ticket.
This PR adds tests for basic non-stateful streaming operations with collations (e.g. filtering).
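The filtering behavior these tests assert can be modeled in plain Python (this is not the actual Spark test; `filter_eq_lcase` is an illustrative name, and `str.lower()` only approximates UTF8_BINARY_LCASE equality). An equality filter under a lowercase collation keeps both casings of the matched value:

```python
def filter_eq_lcase(rows, needle):
    # Stand-in for an equality filter under a case-insensitive
    # collation: compare lowercased forms instead of raw bytes.
    return [(s, n) for (s, n) in rows if s.lower() == needle.lower()]

# Same sample data as the test stream in the review discussion.
rows = [("aaa", 1), ("AAA", 2), ("bbb", 3), ("aa", 4)]
print(filter_eq_lcase(rows, "aaa"))  # [('aaa', 1), ('AAA', 2)]
```

Note that "aa" is excluded: the collation changes how strings compare, not which strings are prefixes of others.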
Does this PR introduce any user-facing change?
No
How was this patch tested?
This PR is test-only.
Was this patch authored or co-authored using generative AI tooling?
No