
[SPARK-48233][SS][TESTS] Tests for streaming on columns with non-default collations #46247

Closed
wants to merge 3 commits into from

Conversation


@dbatomic dbatomic commented Apr 26, 2024

What changes were proposed in this pull request?

This change adds tests for streaming operations over columns of string type collated with non-UTF8_BINARY collations. The PR introduces the following tests:

  1. Non-stateful streaming over non-binary collated columns. We use the non-binary collation UTF8_BINARY_LCASE as the input and assert that streaming propagates the collation and that filtering behaves under the rules of the given collation.
  2. Stateful streaming over binary collations. We use the UNICODE collation as the source and make sure that stateful operations (with deduplication taken as the example) work.
  3. Further tests that assert that stateful operations in combination with non-binary collations throw a proper exception.
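The first scenario can be sketched without Spark in plain Scala, modeling UTF8_BINARY_LCASE equality as case-insensitive comparison. This is an illustrative assumption only; Spark's real collation engine does not literally lowercase strings:

```scala
// Stand-in for UTF8_BINARY_LCASE equality: two strings are equal if they
// compare equal after lowercasing. Illustrative only, not Spark's engine.
def lcaseEquals(a: String, b: String): Boolean =
  a.toLowerCase == b.toLowerCase

// Filtering rows where str == "aaa" under UTF8_BINARY_LCASE must also keep
// "AAA", which is what the non-stateful streaming test asserts.
val rows = Seq("aaa" -> 1, "AAA" -> 2, "bbb" -> 3, "aa" -> 4)
val filtered = rows.filter { case (s, _) => lcaseEquals(s, "aaa") }
// filtered == Seq("aaa" -> 1, "AAA" -> 2)
```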

Why are the changes needed?

You can find more information about the collation effort in the document attached to the root JIRA ticket.

This PR adds tests for basic non-stateful streaming operations with collations (e.g. filtering).

Does this PR introduce any user-facing change?

No

How was this patch tested?

This PR is test-only.

Was this patch authored or co-authored using generative AI tooling?

No

@dbatomic dbatomic changed the title [Draft] - Testing of Streaming and Collations [SPARK-48233][SQL][STREAMING] Tests for non-stateful streaming on columns with collations May 10, 2024
@dbatomic dbatomic marked this pull request as ready for review May 10, 2024 13:22
val inputData = MemoryStream[(String)]
val result = inputData.toDF()
.select(col("value")
.try_cast(StringType("UTF8_BINARY_LCASE")).as("str"))
Contributor

I'm a bit confused by this - is the test name flipped or is UTF8_BINARY_LCASE considered a non-binary collation?

Contributor

AFAIK, LCASE means comparing as lowercase, so yes, it's bound to non-binary equality.

Contributor Author

That's right, UTF8_BINARY_LCASE is a non-binary collation.

In other words, "AAA" and "aaa" are considered equal, even though their binary representations clearly differ.
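This point is exactly what makes stateful deduplication interesting. A plain-Scala sketch, again using lowercase as an illustrative stand-in for the collation key (an assumption, not Spark's implementation), keeping the first occurrence per key the way streaming dropDuplicates keeps the first row seen:

```scala
val input = Seq("aaa", "AAA", "bbb", "aa")

// Deduplicate by the (stand-in) collation key, keeping first occurrences.
// "AAA" collides with "aaa" under non-binary equality and is dropped.
val deduped = input.foldLeft(Vector.empty[String]) { (acc, s) =>
  if (acc.exists(_.toLowerCase == s.toLowerCase)) acc else acc :+ s
}
// deduped == Vector("aaa", "bbb", "aa")
```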

testStream(filteredDf)(
StartStream(triggerClock = clock, trigger = Trigger.ProcessingTime(100)),
Execute { _ =>
spark.createDataFrame(Seq("aaa" -> 1, "AAA" -> 2, "bbb" -> 3, "aa" -> 4))
Contributor

I think it's important to also test a scenario where the incoming stream has a non-default collation itself.

Contributor

Not sure I understand. streamDf has a non-default collation, UTF8_BINARY_LCASE. What do you mean by the "incoming" stream?

Contributor Author

+1 to what @HeartSaVioR said. The idea was exactly to use UTF8_BINARY_LCASE in the source.

Contributor

Makes sense, I think I misread the test case the first time.

Contributor

@HeartSaVioR HeartSaVioR left a comment

Let's make the scope of the tests we are adding here clear. I see the PR title is about "stateless", but you are also aware that deduplication is "stateful". While I agree that we probably don't want to add collation tests for every stateful operator, let's make the scope clearer in the PR title.

val inputData = MemoryStream[(String, Int)]
val result = inputData.toDF()
.select(col("_1")
.try_cast(StringType("UNICODE")).as("str"),
Contributor

While we are here: I see UNICODE has binary equality but non-binary ordering. Does this still ensure that we can put this into RocksDB, whose keys are binary-sorted, and find the key group based on a key prefix that includes this column?

E.g., say we have two grouping-key columns, dept (String with UNICODE collation) and session start (timestamp), and we want to scan all grouping keys that have dept = 'dept1'. This is required for several operations, like session window aggregation.

My gut feeling is yes, but I would like to double-check.

Contributor Author

In theory it should work, but we need to test this as well. Binary ordering means you can use the binary representation to check which string comes first alphabetically. But if we only care about equality (which is what groupings and joins usually use), only binary equality matters.

I will follow up with additional testing.
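The ordering-vs-equality distinction can be illustrated with the JDK's java.text.Collator as a stand-in for a UNICODE-style collation. This is only illustrative; Spark's actual collations are backed by ICU:

```scala
import java.text.Collator
import java.util.Locale

val collator = Collator.getInstance(Locale.ROOT)

// Binary (codepoint) ordering: 'B' (0x42) sorts before 'a' (0x61).
val binaryOrder = Seq("a", "B").sorted
// Linguistic (collated) ordering puts "a" before "B" instead.
val collatedOrder = Seq("a", "B").sortWith(collator.compare(_, _) < 0)

// Equality of byte-identical strings holds either way, so equality-based
// operations (grouping, joins, dedup) are unaffected by the ordering gap.
val sameKey = collator.compare("dept1", "dept1") == 0
```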

Contributor

Thanks, using UNICODE in grouping key of session window aggregation with RocksDB state store provider should cover it.
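The prefix-scan concern above can be sketched with a sorted map standing in for a binary-sorted store like RocksDB. The caveat, which is the crux of the question, is that this only recovers the full group if values that are equal under the collation were first normalized to a single binary form:

```scala
import scala.collection.immutable.TreeMap

// TreeMap as a stand-in for a binary-sorted key-value store.
// Composite grouping key: (dept, sessionStart).
val store = TreeMap(
  ("dept1", 10L) -> "rowA",
  ("dept2", 15L) -> "rowC",
  ("dept1", 20L) -> "rowB"
)

// Keys sort first by dept, so all "dept1" entries are contiguous and a
// range scan over the dept prefix recovers the whole key group.
val dept1Group = store.range(("dept1", Long.MinValue), ("dept1", Long.MaxValue))
// dept1Group.values.toList == List("rowA", "rowB")
```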


@dbatomic dbatomic changed the title [SPARK-48233][SQL][STREAMING] Tests for non-stateful streaming on columns with collations [SPARK-48233][SQL][STREAMING] Tests for streaming on columns with non-default collations May 14, 2024
@dbatomic
Contributor Author

Let's make the scope of the tests we are adding here clear. I see the PR title is about "stateless", but you are also aware that deduplication is "stateful". While I agree that we probably don't want to add collation tests for every stateful operator, let's make the scope clearer in the PR title.

Right, I updated both the PR title and the PR description. And yes, tests for collations are still fairly ad-hoc/selective.

The goal of this PR is to assert that the basics work. As we create a more thorough plan for collations and streaming, we will start adding better-organized test strategies.

Let me know if you think now is a good time to start on this. I also considered creating a new test suite just for collations, but that seemed like overkill for this change.

Contributor

@jose-torres jose-torres left a comment

Looks good from my side.

@HeartSaVioR HeartSaVioR changed the title [SPARK-48233][SQL][STREAMING] Tests for streaming on columns with non-default collations [SPARK-48233][SS] Tests for streaming on columns with non-default collations May 15, 2024
@HeartSaVioR HeartSaVioR changed the title [SPARK-48233][SS] Tests for streaming on columns with non-default collations [SPARK-48233][SS][TESTS] Tests for streaming on columns with non-default collations May 15, 2024
Contributor

@HeartSaVioR HeartSaVioR left a comment

+1

@HeartSaVioR
Contributor

Thanks! Merging to master.
