Support Location Providers #1452
@@ -136,6 +136,10 @@ | |
visit, | ||
visit_with_partner, | ||
) | ||
from pyiceberg.table import ( | ||
LocationProvider, | ||
load_location_provider, | ||
) | ||
from pyiceberg.table.metadata import TableMetadata | ||
from pyiceberg.table.name_mapping import NameMapping, apply_name_mapping | ||
from pyiceberg.transforms import TruncateTransform | ||
|
@@ -2415,7 +2419,9 @@ def data_file_statistics_from_parquet_metadata( | |
) | ||
|
||
|
||
def write_file(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[WriteTask]) -> Iterator[DataFile]: | ||
def write_file( | ||
io: FileIO, location_provider: LocationProvider, table_metadata: TableMetadata, tasks: Iterator[WriteTask] | ||
) -> Iterator[DataFile]: | ||
from pyiceberg.table import DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE, TableProperties | ||
|
||
parquet_writer_kwargs = _get_parquet_writer_kwargs(table_metadata.properties) | ||
|
@@ -2446,7 +2452,10 @@ def write_parquet(task: WriteTask) -> DataFile: | |
for batch in task.record_batches | ||
] | ||
arrow_table = pa.Table.from_batches(batches) | ||
file_path = f"{table_metadata.location}/data/{task.generate_data_file_path('parquet')}" | ||
file_path = location_provider.new_data_location( | ||
data_file_name=task.generate_data_file_filename("parquet"), | ||
partition_key=task.partition_key, | ||
) | ||
fo = io.new_output(file_path) | ||
with fo.create(overwrite=True) as fos: | ||
with pq.ParquetWriter(fos, schema=arrow_table.schema, **parquet_writer_kwargs) as writer: | ||
|
@@ -2622,13 +2631,15 @@ def _dataframe_to_data_files( | |
property_name=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES, | ||
default=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT, | ||
) | ||
location_provider = load_location_provider(table_location=table_metadata.location, table_properties=table_metadata.properties) | ||
Review thread:
- Don't love this. I wanted to do something like this and cache on at least the …
- That's an interesting edge case. It seems like an anti-pattern to change the table property and write in the same transaction, although it's currently allowed.
- 3555932 (fyi, the Java tests don't have one)
||
name_mapping = table_metadata.schema().name_mapping | ||
downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False | ||
task_schema = pyarrow_to_schema(df.schema, name_mapping=name_mapping, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us) | ||
|
||
if table_metadata.spec().is_unpartitioned(): | ||
yield from write_file( | ||
io=io, | ||
location_provider=location_provider, | ||
table_metadata=table_metadata, | ||
tasks=iter([ | ||
WriteTask(write_uuid=write_uuid, task_id=next(counter), record_batches=batches, schema=task_schema) | ||
|
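The caching idea raised in the review thread above (reusing one provider rather than loading it on every `_dataframe_to_data_files` call) could look roughly like the sketch below. This is a hypothetical helper, not part of this PR; it only assumes the `load_location_provider` and `LocationProvider` names added in this diff and memoises on an immutable key built from the table location and properties.

```python
from functools import lru_cache
from typing import Mapping

from pyiceberg.table import LocationProvider, load_location_provider


@lru_cache(maxsize=32)
def _cached_provider(table_location: str, frozen_properties: frozenset) -> LocationProvider:
    # lru_cache needs hashable arguments, so the properties dict is frozen by the caller.
    return load_location_provider(table_location=table_location, table_properties=dict(frozen_properties))


def cached_location_provider(table_location: str, table_properties: Mapping[str, str]) -> LocationProvider:
    # Reuse one provider instance per (location, properties) combination.
    return _cached_provider(table_location, frozenset(table_properties.items()))
```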
@@ -2639,6 +2650,7 @@ def _dataframe_to_data_files( | |
partitions = _determine_partitions(spec=table_metadata.spec(), schema=table_metadata.schema(), arrow_table=df) | ||
yield from write_file( | ||
io=io, | ||
location_provider=location_provider, | ||
table_metadata=table_metadata, | ||
tasks=iter([ | ||
WriteTask( | ||
|
@@ -16,7 +16,9 @@ | |
# under the License. | ||
from __future__ import annotations | ||
|
||
import importlib | ||
import itertools | ||
import logging | ||
import uuid | ||
import warnings | ||
from abc import ABC, abstractmethod | ||
|
@@ -138,7 +140,6 @@ | |
from pyiceberg.utils.concurrent import ExecutorFactory | ||
from pyiceberg.utils.config import Config | ||
from pyiceberg.utils.deprecated import deprecated | ||
from pyiceberg.utils.deprecated import deprecation_message as deprecation_message | ||
from pyiceberg.utils.properties import property_as_bool | ||
|
||
if TYPE_CHECKING: | ||
|
@@ -150,6 +151,8 @@ | |
|
||
from pyiceberg.catalog import Catalog | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
ALWAYS_TRUE = AlwaysTrue() | ||
DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE = "downcast-ns-timestamp-to-us-on-write" | ||
|
||
|
@@ -192,6 +195,14 @@ class TableProperties: | |
WRITE_PARTITION_SUMMARY_LIMIT = "write.summary.partition-limit" | ||
WRITE_PARTITION_SUMMARY_LIMIT_DEFAULT = 0 | ||
|
||
WRITE_LOCATION_PROVIDER_IMPL = "write.location-provider.impl" | ||
Review comment: Though the docs say that the default is …
||
|
||
OBJECT_STORE_ENABLED = "write.object-storage.enabled" | ||
OBJECT_STORE_ENABLED_DEFAULT = False | ||
|
||
WRITE_OBJECT_STORE_PARTITIONED_PATHS = "write.object-storage.partitioned-paths" | ||
WRITE_OBJECT_STORE_PARTITIONED_PATHS_DEFAULT = True | ||
|
||
DELETE_MODE = "write.delete.mode" | ||
DELETE_MODE_COPY_ON_WRITE = "copy-on-write" | ||
DELETE_MODE_MERGE_ON_READ = "merge-on-read" | ||
|
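As a quick, hedged illustration of the new `write.object-storage.*` properties added above: they can be set like any other table property. The catalog name, table identifier, and schema below are hypothetical; only the property keys come from this diff.

```python
from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
from pyiceberg.types import NestedField, StringType

# Hypothetical catalog and table identifier; property keys are the ones added above.
catalog = load_catalog("default")
table = catalog.create_table(
    "default.events",
    schema=Schema(NestedField(field_id=1, name="field", field_type=StringType(), required=False)),
    properties={
        "write.object-storage.enabled": "true",
        "write.object-storage.partitioned-paths": "false",
    },
)
```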
@@ -1616,13 +1627,6 @@ def generate_data_file_filename(self, extension: str) -> str: | |
# https://github.com/apache/iceberg/blob/a582968975dd30ff4917fbbe999f1be903efac02/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java#L92-L101 | ||
return f"00000-{self.task_id}-{self.write_uuid}.{extension}" | ||
|
||
def generate_data_file_path(self, extension: str) -> str: | ||
if self.partition_key: | ||
file_path = f"{self.partition_key.to_path()}/{self.generate_data_file_filename(extension)}" | ||
return file_path | ||
else: | ||
return self.generate_data_file_filename(extension) | ||
|
||
|
||
@dataclass(frozen=True) | ||
class AddFileTask: | ||
|
@@ -1632,6 +1636,67 @@ class AddFileTask: | |
partition_field_value: Record | ||
|
||
|
||
class LocationProvider(ABC): | ||
Fokko marked this conversation as resolved.
|
||
"""A base class for location providers, that provide data file locations for write tasks.""" | ||
|
||
table_location: str | ||
table_properties: Properties | ||
|
||
def __init__(self, table_location: str, table_properties: Properties): | ||
self.table_location = table_location | ||
self.table_properties = table_properties | ||
|
||
@abstractmethod | ||
def new_data_location(self, data_file_name: str, partition_key: Optional[PartitionKey] = None) -> str: | ||
"""Return a fully-qualified data file location for the given filename. | ||
|
||
Args: | ||
data_file_name (str): The name of the data file. | ||
partition_key (Optional[PartitionKey]): The data file's partition key. If None, the data is not partitioned. | ||
|
||
Returns: | ||
str: A fully-qualified location URI for the data file. | ||
""" | ||
|
||
|
||
def _import_location_provider( | ||
location_provider_impl: str, table_location: str, table_properties: Properties | ||
) -> Optional[LocationProvider]: | ||
try: | ||
path_parts = location_provider_impl.split(".") | ||
if len(path_parts) < 2: | ||
raise ValueError( | ||
f"{TableProperties.WRITE_LOCATION_PROVIDER_IMPL} should be full path (module.CustomLocationProvider), got: {location_provider_impl}" | ||
) | ||
module_name, class_name = ".".join(path_parts[:-1]), path_parts[-1] | ||
module = importlib.import_module(module_name) | ||
class_ = getattr(module, class_name) | ||
Review comment: Hmm, wonder if we should reduce duplication between this and FileIO loading.
||
return class_(table_location, table_properties) | ||
except ModuleNotFoundError: | ||
logger.warning("Could not initialize LocationProvider: %s", location_provider_impl) | ||
return None | ||
|
||
|
||
def load_location_provider(table_location: str, table_properties: Properties) -> LocationProvider: | ||
table_location = table_location.rstrip("/") | ||
|
||
if location_provider_impl := table_properties.get(TableProperties.WRITE_LOCATION_PROVIDER_IMPL): | ||
if location_provider := _import_location_provider(location_provider_impl, table_location, table_properties): | ||
logger.info("Loaded LocationProvider: %s", location_provider_impl) | ||
return location_provider | ||
else: | ||
raise ValueError(f"Could not initialize LocationProvider: {location_provider_impl}") | ||
|
||
if property_as_bool(table_properties, TableProperties.OBJECT_STORE_ENABLED, TableProperties.OBJECT_STORE_ENABLED_DEFAULT): | ||
from pyiceberg.table.locations import ObjectStoreLocationProvider | ||
|
||
return ObjectStoreLocationProvider(table_location, table_properties) | ||
else: | ||
from pyiceberg.table.locations import DefaultLocationProvider | ||
|
||
return DefaultLocationProvider(table_location, table_properties) | ||
|
||
|
||
def _parquet_files_to_data_files(table_metadata: TableMetadata, file_paths: List[str], io: FileIO) -> Iterable[DataFile]: | ||
"""Convert a list files into DataFiles. | ||
|
||
|
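To make the new hook above concrete, here is a hedged usage sketch: a custom provider (a hypothetical name, modeled on the `CustomLocationProvider` in the tests below) wired up through the `write.location-provider.impl` property that `load_location_provider` reads. The bucket path and file name are illustrative only.

```python
from typing import Optional

from pyiceberg.partitioning import PartitionKey
from pyiceberg.table import LocationProvider, load_location_provider


class ArchiveLocationProvider(LocationProvider):
    """Hypothetical provider that nests every data file under an extra prefix."""

    def new_data_location(self, data_file_name: str, partition_key: Optional[PartitionKey] = None) -> str:
        return f"{self.table_location}/data/archive/{data_file_name}"


qualified_name = f"{ArchiveLocationProvider.__module__}.{ArchiveLocationProvider.__name__}"
provider = load_location_provider(
    table_location="s3://bucket/ns/table",
    table_properties={"write.location-provider.impl": qualified_name},
)
print(provider.new_data_location("my_file.parquet"))
# s3://bucket/ns/table/data/archive/my_file.parquet
```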
@@ -0,0 +1,82 @@ | ||
# Licensed to the Apache Software Foundation (ASF) under one | ||
# or more contributor license agreements. See the NOTICE file | ||
# distributed with this work for additional information | ||
# regarding copyright ownership. The ASF licenses this file | ||
# to you under the Apache License, Version 2.0 (the | ||
# "License"); you may not use this file except in compliance | ||
# with the License. You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, | ||
# software distributed under the License is distributed on an | ||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
# KIND, either express or implied. See the License for the | ||
# specific language governing permissions and limitations | ||
# under the License. | ||
|
||
from typing import Optional | ||
|
||
import mmh3 | ||
|
||
from pyiceberg.partitioning import PartitionKey | ||
from pyiceberg.table import LocationProvider, TableProperties | ||
from pyiceberg.typedef import Properties | ||
from pyiceberg.utils.properties import property_as_bool | ||
|
||
|
||
class DefaultLocationProvider(LocationProvider): | ||
Review thread:
- The biggest difference vs the Java implementations is that I've not supported …
- Thanks! Would be great to have …
- Opened an issue on supporting …
- Sorry guys, didn't notice this thread until now.
||
def __init__(self, table_location: str, table_properties: Properties): | ||
super().__init__(table_location, table_properties) | ||
|
||
def new_data_location(self, data_file_name: str, partition_key: Optional[PartitionKey] = None) -> str: | ||
prefix = f"{self.table_location}/data" | ||
return f"{prefix}/{partition_key.to_path()}/{data_file_name}" if partition_key else f"{prefix}/{data_file_name}" | ||
|
||
|
||
HASH_BINARY_STRING_BITS = 20 | ||
ENTROPY_DIR_LENGTH = 4 | ||
ENTROPY_DIR_DEPTH = 3 | ||
Review thread:
- Nit: move these into ObjectStoreLocationProvider.
- Makes sense, especially given the file has now grown. It's pretty unreadable to prefix all the constants here with …
- We had issues dealing with constants in the file itself: https://github.com/apache/iceberg-python/pull/1217/files#diff-942c2f54eac4f30f1a1e2fa18b719e17cc1cb03ad32908a402c4ba3abe9eca63L37-L38 if it's only used in … But also, this is a nit comment :P
- I fully agree that it should be within the class - will find a way to do it readably 👍
||
|
||
|
||
class ObjectStoreLocationProvider(LocationProvider): | ||
_include_partition_paths: bool | ||
|
||
def __init__(self, table_location: str, table_properties: Properties): | ||
super().__init__(table_location, table_properties) | ||
self._include_partition_paths = property_as_bool( | ||
table_properties, | ||
TableProperties.WRITE_OBJECT_STORE_PARTITIONED_PATHS, | ||
TableProperties.WRITE_OBJECT_STORE_PARTITIONED_PATHS_DEFAULT, | ||
) | ||
|
||
def new_data_location(self, data_file_name: str, partition_key: Optional[PartitionKey] = None) -> str: | ||
Review thread:
- Tried to make this as consistent with its Java counterpart so file locations are consistent too. This means hashing on both the partition key and the data file name below, and using the same hash function. Seemed reasonable to port over the object storage stuff in this PR, given that the original issue #861 mentions this.
- Since Iceberg is mainly focused on object stores, I'm leaning towards making the …
- Thanks for this great suggestion and context! I agree: …
- ^ cc @kevinjqliu, how does this sound to you? I realise the concerns you raised re things silently working differently with Java and PyIceberg seem a little contradicting with the above (but I think it's fine).
- Also, I've not yet changed … I'm very open to be swayed / discuss this. After reading through apache/iceberg#11112 it seems there was a strong case for still supporting partition values in paths, though I haven't been able to flesh it out fully. Perhaps it's backwards compatibility, for folks that inspect storage to see how their files are actually laid out; it does group them together nicely. I'd be happy to change the default if there's a reason for it. The readability of file paths will arguably decrease with these hashes anyway, so the above might be a non-issue.
- While I'm in favor of making …
- Makes sense! We can have the discussion regarding defaults there. I'd like to keep the …
- SGTM! 🚀
||
if self._include_partition_paths and partition_key: | ||
return self.new_data_location(f"{partition_key.to_path()}/{data_file_name}") | ||
|
||
prefix = f"{self.table_location}/data" | ||
hashed_path = self._compute_hash(data_file_name) | ||
|
||
return ( | ||
f"{prefix}/{hashed_path}/{data_file_name}" | ||
if self._include_partition_paths | ||
else f"{prefix}/{hashed_path}-{data_file_name}" | ||
Review thread:
- Interesting that disabling …
- This is an interesting case; do we have a test to show this behavior explicitly? I think it'll be valuable to refer to it at a later time.
||
) | ||
|
||
@staticmethod | ||
def _compute_hash(data_file_name: str) -> str: | ||
# Bitwise AND to combat sign-extension; bitwise OR to preserve leading zeroes that `bin` would otherwise strip. | ||
hash_code = mmh3.hash(data_file_name) & ((1 << HASH_BINARY_STRING_BITS) - 1) | (1 << HASH_BINARY_STRING_BITS) | ||
return ObjectStoreLocationProvider._dirs_from_hash(bin(hash_code)[-HASH_BINARY_STRING_BITS:]) | ||
|
||
@staticmethod | ||
def _dirs_from_hash(file_hash: str) -> str: | ||
"""Divides hash into directories for optimized orphan removal operation using ENTROPY_DIR_DEPTH and ENTROPY_DIR_LENGTH.""" | ||
hash_with_dirs = [] | ||
for i in range(0, ENTROPY_DIR_DEPTH * ENTROPY_DIR_LENGTH, ENTROPY_DIR_LENGTH): | ||
hash_with_dirs.append(file_hash[i : i + ENTROPY_DIR_LENGTH]) | ||
|
||
if len(file_hash) > ENTROPY_DIR_DEPTH * ENTROPY_DIR_LENGTH: | ||
hash_with_dirs.append(file_hash[ENTROPY_DIR_DEPTH * ENTROPY_DIR_LENGTH :]) | ||
|
||
return "/".join(hash_with_dirs) |
@@ -0,0 +1,130 @@ | ||
# Licensed to the Apache Software Foundation (ASF) under one | ||
# or more contributor license agreements. See the NOTICE file | ||
# distributed with this work for additional information | ||
# regarding copyright ownership. The ASF licenses this file | ||
# to you under the Apache License, Version 2.0 (the | ||
# "License"); you may not use this file except in compliance | ||
# with the License. You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, | ||
# software distributed under the License is distributed on an | ||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
# KIND, either express or implied. See the License for the | ||
# specific language governing permissions and limitations | ||
# under the License. | ||
from typing import Optional | ||
|
||
import pytest | ||
|
||
from pyiceberg.partitioning import PartitionField, PartitionFieldValue, PartitionKey, PartitionSpec | ||
from pyiceberg.schema import Schema | ||
from pyiceberg.table import ( | ||
LocationProvider, | ||
load_location_provider, | ||
) | ||
from pyiceberg.transforms import IdentityTransform | ||
from pyiceberg.typedef import EMPTY_DICT | ||
from pyiceberg.types import NestedField, StringType | ||
|
||
TABLE_SCHEMA = Schema(NestedField(field_id=2, name="field", field_type=StringType(), required=False)) | ||
PARTITION_FIELD = PartitionField(source_id=2, field_id=1002, transform=IdentityTransform(), name="part#field") | ||
PARTITION_SPEC = PartitionSpec(PARTITION_FIELD) | ||
PARTITION_KEY = PartitionKey( | ||
raw_partition_field_values=[PartitionFieldValue(PARTITION_FIELD, "example#val")], | ||
partition_spec=PARTITION_SPEC, | ||
schema=TABLE_SCHEMA, | ||
) | ||
|
||
|
||
class CustomLocationProvider(LocationProvider): | ||
def new_data_location(self, data_file_name: str, partition_key: Optional[PartitionKey] = None) -> str: | ||
return f"custom_location_provider/{data_file_name}" | ||
|
||
|
||
def test_default_location_provider() -> None: | ||
Review comment: The tests in this file are inspired by https://github.com/apache/iceberg/blob/main/core/src/test/java/org/apache/iceberg/TestLocationProvider.java. The hash functions are the same, so those constants are unchanged.
||
provider = load_location_provider(table_location="table_location", table_properties=EMPTY_DICT) | ||
|
||
assert provider.new_data_location("my_file") == "table_location/data/my_file" | ||
|
||
|
||
def test_custom_location_provider() -> None: | ||
qualified_name = CustomLocationProvider.__module__ + "." + CustomLocationProvider.__name__ | ||
provider = load_location_provider( | ||
table_location="table_location", table_properties={"write.location-provider.impl": qualified_name} | ||
) | ||
|
||
assert provider.new_data_location("my_file") == "custom_location_provider/my_file" | ||
|
||
|
||
def test_custom_location_provider_single_path() -> None: | ||
with pytest.raises(ValueError, match=r"write\.location-provider\.impl should be full path"): | ||
load_location_provider(table_location="table_location", table_properties={"write.location-provider.impl": "not_found"}) | ||
|
||
|
||
def test_custom_location_provider_not_found() -> None: | ||
with pytest.raises(ValueError, match=r"Could not initialize LocationProvider"): | ||
load_location_provider( | ||
table_location="table_location", table_properties={"write.location-provider.impl": "module.not_found"} | ||
) | ||
|
||
|
||
def test_object_storage_injects_entropy() -> None: | ||
provider = load_location_provider(table_location="table_location", table_properties={"write.object-storage.enabled": "true"}) | ||
|
||
location = provider.new_data_location("test.parquet") | ||
parts = location.split("/") | ||
|
||
assert len(parts) == 7 | ||
assert parts[0] == "table_location" | ||
assert parts[1] == "data" | ||
# Entropy directories in the middle | ||
Review thread:
- Since this test is called similar to …
- This test was inspired by https://github.com/apache/iceberg/blob/main/core/src/test/java/org/apache/iceberg/TestLocationProvider.java#L275. FYI, my reading of this was: this tests that there's some stuff in the middle. The later … (To me, it made sense for the integration test to have the balance of checking both entropy and that it's binary-hashed, but not the hash itself, because that feels unit-test-y.)
- I think it's fair for this test to check that it's binary too. That way, if e.g. the wrong hash method is used, this unit test still passes (the provider does indeed inject entropy) but the hash-injection is wrong (so that unit test fails). This sounds good to me, thanks!
||
assert parts[-1] == "test.parquet" | ||
|
||
|
||
@pytest.mark.parametrize("object_storage", [True, False]) | ||
def test_partition_value_in_path(object_storage: bool) -> None: | ||
provider = load_location_provider( | ||
table_location="table_location", | ||
table_properties={ | ||
"write.object-storage.enabled": str(object_storage), | ||
}, | ||
) | ||
|
||
location = provider.new_data_location("test.parquet", PARTITION_KEY) | ||
partition_segment = location.split("/")[-2] | ||
|
||
# Field name is not encoded but partition value is - this differs from the Java implementation | ||
# https://github.com/apache/iceberg/blob/cdf748e8e5537f13d861aa4c617a51f3e11dc97c/core/src/test/java/org/apache/iceberg/TestLocationProvider.java#L304 | ||
assert partition_segment == "part#field=example%23val" | ||
Review comment: Put up #1457 - I'll remove this special-character testing (that the Java test counterpart does) here, because it'll be tested in that PR.
||
|
||
|
||
def test_object_storage_exclude_partition_in_path() -> None: | ||
provider = load_location_provider( | ||
table_location="table_location", | ||
table_properties={ | ||
"write.object-storage.enabled": "true", | ||
"write.object-storage.partitioned-paths": "false", | ||
}, | ||
) | ||
|
||
location = provider.new_data_location("test.parquet", PARTITION_KEY) | ||
|
||
# No partition values included in the path and last part of entropy is separated with "-" | ||
assert location == "table_location/data/0110/1010/0011/11101000-test.parquet" | ||
|
||
|
||
@pytest.mark.parametrize( | ||
["data_file_name", "expected_hash"], | ||
[ | ||
("a", "0101/0110/1001/10110010"), | ||
("b", "1110/0111/1110/00000011"), | ||
("c", "0010/1101/0110/01011111"), | ||
("d", "1001/0001/0100/01110011"), | ||
], | ||
) | ||
def test_hash_injection(data_file_name: str, expected_hash: str) -> None: | ||
provider = load_location_provider(table_location="table_location", table_properties={"write.object-storage.enabled": "true"}) | ||
|
||
assert provider.new_data_location(data_file_name) == f"table_location/data/{expected_hash}/{data_file_name}" |
Review thread:
- We might want location_provider: LocationProvider last, for backwards compatibility.
- WDYT about leaving the signature as before and doing load_location_provider at the start of this function (above parquet_writer_kwargs = _get_parquet_writer_kwargs(table_metadata.properties)) instead of in _dataframe_to_data_files?
- That would mean we need to run load_location_provider per data file and can potentially get expensive.
- I don't think so? At the start of the function means not in write_parquet - the location_provider loaded would just be used within that, similar to parquet_writer_kwargs.
- Ah, makes sense. write_parquet is called once per _dataframe_to_data_files; we can do that to preserve backwards compatibility.
- Sounds good! (typo correction: write_file above 😄)
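For context, here is a minimal sketch of the option discussed in this thread: keep write_file's pre-existing signature and load the provider once at the top of the function, next to parquet_writer_kwargs. The body is elided, and this is not necessarily the code the PR ends up with.

```python
from typing import Iterator

from pyiceberg.io import FileIO
from pyiceberg.manifest import DataFile
from pyiceberg.table import WriteTask, load_location_provider
from pyiceberg.table.metadata import TableMetadata


def write_file(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[WriteTask]) -> Iterator[DataFile]:
    # Loaded once per write_file call (i.e. once per _dataframe_to_data_files),
    # so callers keep the old signature and no extra parameter is needed.
    location_provider = load_location_provider(
        table_location=table_metadata.location, table_properties=table_metadata.properties
    )
    ...  # the existing per-task write logic would call location_provider.new_data_location(...)
```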