[SPARK-48175][SQL][PYTHON] Store collation information in metadata and not in type for SER/DE #46280

Status: Closed. Wants to merge 46 commits.
Changes shown from 34 of the 46 commits.

Commits
8edc5ea
initial impl of new delta schema
stefankandic Apr 27, 2024
2f5a1b8
some improvements
stefankandic Apr 29, 2024
8f6856b
formatting improvements
stefankandic Apr 29, 2024
af41b50
minor changes
stefankandic Apr 29, 2024
001b9e9
small refactoring
stefankandic Apr 29, 2024
e65285b
remove bogus comment
stefankandic Apr 29, 2024
77e4103
remove bogus comment
stefankandic Apr 29, 2024
e4a205b
fix scalastyle
stefankandic Apr 29, 2024
bd600da
use string as path instead of list of strings
stefankandic Apr 29, 2024
19f5588
add tests for schema ser/de
stefankandic May 7, 2024
3a7e8f6
add python impl and tests
stefankandic May 7, 2024
79e5aaa
merge with latest master
stefankandic May 7, 2024
6ebaddf
reformat python
stefankandic May 7, 2024
b488346
update method calls
stefankandic May 7, 2024
7747947
tests passing
stefankandic May 7, 2024
0315421
fix equality for struct fields
stefankandic May 8, 2024
3b9ad23
rename variables
stefankandic May 8, 2024
913095c
fix python tests
stefankandic May 8, 2024
f48aa27
revert changes to collationsuite
stefankandic May 8, 2024
9283b76
fix pyspark-connect and streaming test
stefankandic May 8, 2024
72cde10
fix python linter
stefankandic May 8, 2024
eb9be6d
fix python linter
stefankandic May 9, 2024
b9021c9
add test for map field with .key suffix in the name
stefankandic May 9, 2024
ac0aa1d
fix mypy type annotation
stefankandic May 9, 2024
a8a497f
improve key in name test
stefankandic May 9, 2024
9089f9b
try to fix mypy errors
stefankandic May 9, 2024
c34ae9d
fix mypy
stefankandic May 9, 2024
9cf5b04
add provider to schema as well to follow delta protocol
stefankandic May 13, 2024
7cde276
Update python/pyspark/sql/types.py
stefankandic May 13, 2024
9e6afe0
resolving pr comments
stefankandic May 13, 2024
b8626fa
add tests for collation metadata on non string types
stefankandic May 13, 2024
10562d4
add additional assert for schema check
stefankandic May 13, 2024
97631c0
remove leftover test
stefankandic May 13, 2024
33ed245
add checks for collations on wrong type in python
stefankandic May 13, 2024
2b0a799
add missing docstring and fix mypy error
stefankandic May 14, 2024
aff7b58
fix condition for valid types
stefankandic May 14, 2024
c35d781
respond to pr comments
stefankandic May 15, 2024
6e379aa
add more tests
stefankandic May 15, 2024
0879841
make provider lowercase
stefankandic May 16, 2024
1ae29a8
fix failing test
stefankandic May 16, 2024
fdb0a7e
fix mypy
stefankandic May 16, 2024
c123a4b
fix mypy
stefankandic May 16, 2024
eb461b3
minor changes
stefankandic May 16, 2024
1fb404b
Merge branch 'master' into newDeltaSchema
stefankandic May 16, 2024
51a9bd6
fix streaming deduplication collation type
stefankandic May 17, 2024
379b55a
make version getter return optional
stefankandic May 17, 2024
CollationFactory.java
@@ -36,11 +36,42 @@
* Provides functionality to the UTF8String object which respects defined collation settings.
*/
public final class CollationFactory {

public static class CollationIdentifier {
public final String provider;
public final String name;
public final String version;

public CollationIdentifier(String provider, String collationName, String version) {
this.provider = provider;
this.name = collationName;
this.version = version;
}

public static CollationIdentifier fromString(String identifier) {
String[] parts = identifier.split("\\.", 3);
return new CollationIdentifier(parts[0], parts[1], parts[2]);
}

@Override
public String toString() {
return String.format("%s.%s.%s", provider, name, version);
}

/**
* Returns the collation identifier without the version, in the form "provider.name".
*/
public String versionLess() {
return String.format("%s.%s", provider, name);
}
}

/**
* Entry encapsulating all information about a collation.
*/
public static class Collation {
public final String collationName;
public final String provider;
public final Collator collator;
public final Comparator<UTF8String> comparator;

@@ -97,6 +128,7 @@ public Collation(
boolean supportsBinaryOrdering,
boolean supportsLowercaseEquality) {
this.collationName = collationName;
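// Built-in collations have no ICU Collator and report provider "spark"; ICU-backed collations report "icu".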
this.provider = collator == null ? "spark" : "icu";
this.collator = collator;
this.comparator = comparator;
this.version = version;
@@ -137,6 +169,11 @@ public Collation(
supportsBinaryOrdering,
supportsLowercaseEquality);
}

/** Returns the collation identifier. */
public CollationIdentifier identifier() {
return new CollationIdentifier(provider, collationName, version);
}
}

private static final Collation[] collationTable = new Collation[4];
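As a quick illustration of the identifier format introduced above, here is a small Python sketch that mirrors fromString, toString, and versionLess; the version string "75.1" is a hypothetical placeholder, not taken from the PR:

# The identifier is "provider.name.version". Splitting on at most two dots mirrors
# identifier.split("\\.", 3) in CollationIdentifier.fromString, so the version
# component may itself contain dots (e.g. an ICU library version).
def from_string(identifier: str):
    provider, name, version = identifier.split(".", 2)
    return provider, name, version

provider, name, version = from_string("icu.UNICODE.75.1")  # hypothetical version
assert (provider, name, version) == ("icu", "UNICODE", "75.1")

# versionLess() drops the version and keeps "provider.name".
print(f"{provider}.{name}")  # icu.UNICODE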
6 changes: 6 additions & 0 deletions common/utils/src/main/resources/error/error-conditions.json
@@ -2322,6 +2322,12 @@
],
"sqlState" : "2203G"
},
"INVALID_JSON_DATA_TYPE_FOR_COLLATIONS" : {
"message" : [
"Collations can only be applied to string types, but the JSON data type is <jsonType>."
],
"sqlState" : "2203G"
},
"INVALID_JSON_ROOT_FIELD" : {
"message" : [
"Cannot convert JSON root field to target Spark type."
5 changes: 5 additions & 0 deletions python/pyspark/errors/error-conditions.json
@@ -357,6 +357,11 @@
"All items in `<arg_name>` should be in <allowed_types>, got <item_type>."
]
},
"INVALID_JSON_DATA_TYPE_FOR_COLLATIONS" : {
"message" : [
"Collations can only be applied to string types, but the JSON data type is <jsonType>."
]
},
"INVALID_MULTIPLE_ARGUMENT_CONDITIONS": {
"message": [
"[{arg_names}] cannot be <condition>."
4 changes: 4 additions & 0 deletions python/pyspark/sql/tests/connect/test_parity_types.py
@@ -86,6 +86,10 @@ def test_rdd_with_udt(self):
def test_udt(self):
super().test_udt()

@unittest.skip("Requires JVM access.")
def test_schema_with_collations_json_ser_de(self):
super().test_schema_with_collations_json_ser_de()

@unittest.skip("Does not test anything related to Spark Connect")
def test_parse_datatype_string(self):
super().test_parse_datatype_string()
144 changes: 123 additions & 21 deletions python/pyspark/sql/tests/test_types.py
@@ -549,6 +549,129 @@ def test_convert_list_to_str(self):
self.assertEqual(df.count(), 1)
self.assertEqual(df.head(), Row(name="[123]", income=120))

def test_schema_with_collations_json_ser_de(self):
from pyspark.sql.types import _parse_datatype_json_string

unicode_collation = "UNICODE"

simple_struct = StructType([StructField("c1", StringType(unicode_collation))])

nested_struct = StructType([StructField("nested", simple_struct)])

array_in_schema = StructType(
[StructField("array", ArrayType(StringType(unicode_collation)))]
)

map_in_schema = StructType(
[
StructField(
"map", MapType(StringType(unicode_collation), StringType(unicode_collation))
)
]
)

array_in_map = StructType(
[
StructField(
"arrInMap",
MapType(
StringType(unicode_collation), ArrayType(StringType(unicode_collation))
),
)
]
)

nested_array_in_map = StructType(
[
StructField(
"nestedArrayInMap",
ArrayType(
MapType(
StringType(unicode_collation),
ArrayType(ArrayType(StringType(unicode_collation))),
)
),
)
]
)

schema_with_multiple_fields = StructType(
simple_struct.fields
+ nested_struct.fields
+ array_in_schema.fields
+ map_in_schema.fields
+ array_in_map.fields
+ nested_array_in_map.fields
)

schemas = [
simple_struct,
nested_struct,
array_in_schema,
map_in_schema,
nested_array_in_map,
array_in_map,
schema_with_multiple_fields,
]

for schema in schemas:
scala_datatype = self.spark._jsparkSession.parseDataType(schema.json())
python_datatype = _parse_datatype_json_string(scala_datatype.json())
assert schema == python_datatype
assert schema == _parse_datatype_json_string(schema.json())
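For reference, a minimal standalone sketch of the round trip this test exercises; the comment about where the collation lands in the serialized JSON is an inference from the samples in the next test, not verbatim output:

from pyspark.sql.types import (
    StructType,
    StructField,
    StringType,
    _parse_datatype_json_string,
)

# A schema with a single collated string column.
schema = StructType([StructField("c1", StringType("UNICODE"))])

# After this PR the column serializes as a plain "string" type; the collation is
# expected to be recorded in the field's metadata (e.g. {"c1": "icu.UNICODE"}
# under the collations metadata key) rather than in the type itself.
serialized = schema.json()

# Deserializing restores the collated StringType from that metadata.
assert _parse_datatype_json_string(serialized) == schema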

def test_schema_with_collations_on_non_string_types(self):
from pyspark.sql.types import _parse_datatype_json_string, _COLLATIONS_METADATA_KEY

collations_on_int_col_json = f"""
{{
"type": "struct",
"fields": [
{{
"name": "c1",
"type": "integer",
"nullable": true,
"metadata": {{
"{_COLLATIONS_METADATA_KEY}": {{
"c1": "icu.UNICODE"
}}
}}
}}
]
}}
"""

collations_in_map_json = f"""
{{
"type": "struct",
"fields": [
{{
"name": "mapField",
"type": {{
"type": "map",
"keyType": "string",
"valueType": "integer",
"valueContainsNull": true
}},
"nullable": true,
"metadata": {{
"{_COLLATIONS_METADATA_KEY}": {{
"mapField.value": "icu.UNICODE"
}}
}}
}}
]
}}
"""

Review comment (Contributor): what about duplicate keys in this json object (should be a protocol error)

Reply (stefankandic, Contributor Author): We talked about this one a bit offline, but I would rather tackle this as a separate issue than just a collation protocol error. Currently, both python and scala code will not fail when encountering duplicate keys; python will just pick one to put in the dictionary and scala will have both in the JObject. What do you think @cloud-fan ?
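A quick illustration of the Python behavior described in the reply above (not part of the PR's diff); json.loads keeps only the last value for a duplicated key:

import json

# Duplicate keys are not rejected by Python's json parser; the last value wins,
# so a duplicated field path in the collations metadata would be silently collapsed.
payload = '{"mapField.value": "icu.UNICODE", "mapField.value": "icu.UNICODE_CI"}'
print(json.loads(payload))  # {'mapField.value': 'icu.UNICODE_CI'}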

self.assertRaises(
PySparkTypeError, lambda: _parse_datatype_json_string(collations_on_int_col_json)
)

self.assertRaises(
PySparkTypeError, lambda: _parse_datatype_json_string(collations_in_map_json)
)

def test_udt(self):
from pyspark.sql.types import _parse_datatype_json_string, _infer_type, _make_type_verifier

@@ -864,27 +987,6 @@ def test_parse_datatype_string(self):
self.assertEqual(t(), _parse_datatype_string(k))
self.assertEqual(IntegerType(), _parse_datatype_string("int"))
self.assertEqual(StringType(), _parse_datatype_string("string"))
self.assertEqual(StringType(), _parse_datatype_string("string collate UTF8_BINARY"))
self.assertEqual(StringType(), _parse_datatype_string("string COLLATE UTF8_BINARY"))
self.assertEqual(
StringType.fromCollationId(0), _parse_datatype_string("string COLLATE UTF8_BINARY")
)
self.assertEqual(
StringType.fromCollationId(1),
_parse_datatype_string("string COLLATE UTF8_BINARY_LCASE"),
)
self.assertEqual(
StringType.fromCollationId(2), _parse_datatype_string("string COLLATE UNICODE")
)
self.assertEqual(
StringType.fromCollationId(2), _parse_datatype_string("string COLLATE `UNICODE`")
)
self.assertEqual(
StringType.fromCollationId(3), _parse_datatype_string("string COLLATE UNICODE_CI")
)
self.assertEqual(
StringType.fromCollationId(3), _parse_datatype_string("string COLLATE `UNICODE_CI`")
)
self.assertEqual(CharType(1), _parse_datatype_string("char(1)"))
self.assertEqual(CharType(10), _parse_datatype_string("char( 10 )"))
self.assertEqual(CharType(11), _parse_datatype_string("char( 11)"))