Fix read from multiple s3 regions #1453

Merged — 26 commits, Jan 6, 2025
Changes from 16 commits

Commits (26)
7f52a1a
Take netloc into account for s3 filesystem when calling `_initialize_fs`
jiakai-li Dec 19, 2024
b53e89f
Fix unit test for s3 filesystem
jiakai-li Dec 20, 2024
7699a76
Merge branch 'main' into fix-read-from-multiple-s3-regions
jiakai-li Dec 21, 2024
eb5e491
Update ArrowScan to use different FileSystem per file
jiakai-li Dec 21, 2024
0c61ac8
Add unit test for `PyArrowFileIO.fs_by_scheme` cache behavior
jiakai-li Dec 21, 2024
327dbac
Add error handling
jiakai-li Dec 22, 2024
b4fccf2
Update tests/io/test_pyarrow.py
jiakai-li Dec 23, 2024
48bb811
Update `s3.region` document and a test case
jiakai-li Dec 23, 2024
8404e6b
Add test case for `PyArrowFileIO.new_input` multi region
jiakai-li Dec 24, 2024
53951f5
Shuffle code location for better maintainability
jiakai-li Dec 24, 2024
51fb6ff
Comment for future integration test
jiakai-li Dec 24, 2024
0cd06c4
Typo fix
jiakai-li Dec 24, 2024
64fbdab
Document wording
jiakai-li Dec 24, 2024
37d9ec2
Add warning when the bucket region for a file cannot be resolved (for…
jiakai-li Dec 29, 2024
74e78ae
Merge branch 'main' into fix-read-from-multiple-s3-regions
jiakai-li Dec 29, 2024
4ff4a7d
Fix code linting
jiakai-li Dec 29, 2024
b56f2ee
Update mkdocs/docs/configuration.md
jiakai-li Jan 3, 2025
2a4cee1
Merge branch 'main' into fix-read-from-multiple-s3-regions
jiakai-li Jan 4, 2025
9cc3a30
Code refactoring
jiakai-li Jan 4, 2025
ba5ef76
Unit test
jiakai-li Jan 4, 2025
8f06a15
Code refactoring
jiakai-li Jan 4, 2025
e5cac02
Test cases
jiakai-li Jan 4, 2025
9652baf
Code format
jiakai-li Jan 4, 2025
bc2adc7
Merge branch 'main' into fix-read-from-multiple-s3-regions
jiakai-li Jan 6, 2025
4b83fc0
Code tidy-up
jiakai-li Jan 6, 2025
7f207bf
Update pyiceberg/io/pyarrow.py
jiakai-li Jan 6, 2025
30 changes: 15 additions & 15 deletions mkdocs/docs/configuration.md
@@ -102,21 +102,21 @@ For the FileIO there are several configuration options available:

<!-- markdown-link-check-disable -->

| Key | Example | Description |
|----------------------|----------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| s3.endpoint | <https://10.0.19.25/> | Configure an alternative endpoint of the S3 service for the FileIO to access. This could be used to use S3FileIO with any s3-compatible object storage service that has a different endpoint, or access a private S3 endpoint in a virtual private cloud. |
| s3.access-key-id | admin | Configure the static access key id used to access the FileIO. |
| s3.secret-access-key | password | Configure the static secret access key used to access the FileIO. |
| s3.session-token | AQoDYXdzEJr... | Configure the static session token used to access the FileIO. |
| s3.role-session-name | session | An optional identifier for the assumed role session. |
| s3.role-arn | arn:aws:... | AWS Role ARN. If provided instead of access_key and secret_key, temporary credentials will be fetched by assuming this role. |
| s3.signer | bearer | Configure the signature version of the FileIO. |
| s3.signer.uri | <http://my.signer:8080/s3> | Configure the remote signing uri if it differs from the catalog uri. Remote signing is only implemented for `FsspecFileIO`. The final request is sent to `<s3.signer.uri>/<s3.signer.endpoint>`. |
| s3.signer.endpoint | v1/main/s3-sign | Configure the remote signing endpoint. Remote signing is only implemented for `FsspecFileIO`. The final request is sent to `<s3.signer.uri>/<s3.signer.endpoint>`. (default : v1/aws/s3/sign). |
| s3.region | us-west-2 | Sets the region of the bucket |
| s3.proxy-uri | <http://my.proxy.com:8080> | Configure the proxy server to be used by the FileIO. |
| s3.connect-timeout | 60.0 | Configure socket connection timeout, in seconds. |
| s3.force-virtual-addressing | False | Whether to use virtual addressing of buckets. If true, then virtual addressing is always enabled. If false, then virtual addressing is only enabled if endpoint_override is empty. This can be used for non-AWS backends that only support virtual hosted-style access. |
| Key | Example | Description |
|----------------------|----------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| s3.endpoint | <https://10.0.19.25/> | Configure an alternative endpoint of the S3 service for the FileIO to access. This could be used to use S3FileIO with any s3-compatible object storage service that has a different endpoint, or access a private S3 endpoint in a virtual private cloud. |
| s3.access-key-id | admin | Configure the static access key id used to access the FileIO. |
| s3.secret-access-key | password | Configure the static secret access key used to access the FileIO. |
| s3.session-token | AQoDYXdzEJr... | Configure the static session token used to access the FileIO. |
| s3.role-session-name | session | An optional identifier for the assumed role session. |
| s3.role-arn | arn:aws:... | AWS Role ARN. If provided instead of access_key and secret_key, temporary credentials will be fetched by assuming this role. |
| s3.signer | bearer | Configure the signature version of the FileIO. |
| s3.signer.uri | <http://my.signer:8080/s3> | Configure the remote signing uri if it differs from the catalog uri. Remote signing is only implemented for `FsspecFileIO`. The final request is sent to `<s3.signer.uri>/<s3.signer.endpoint>`. |
| s3.signer.endpoint | v1/main/s3-sign | Configure the remote signing endpoint. Remote signing is only implemented for `FsspecFileIO`. The final request is sent to `<s3.signer.uri>/<s3.signer.endpoint>`. (default : v1/aws/s3/sign). |
| s3.region | us-west-2 | Configure the default region used to initialize an S3FileSystem. This setting will be overwritten if the bucket actually used resolves to a different region. |
| s3.proxy-uri | <http://my.proxy.com:8080> | Configure the proxy server to be used by the FileIO. |
| s3.connect-timeout | 60.0 | Configure socket connection timeout, in seconds. |
| s3.force-virtual-addressing | False | Whether to use virtual addressing of buckets. If true, then virtual addressing is always enabled. If false, then virtual addressing is only enabled if endpoint_override is empty. This can be used for non-AWS backends that only support virtual hosted-style access. |

<!-- markdown-link-check-enable-->
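A minimal sketch of how the updated `s3.region` semantics behave from Python, constructing a `PyArrowFileIO` directly with properties from the table above (the bucket name, regions, and credentials are placeholders, not values from this PR):

```python
from pyiceberg.io.pyarrow import PyArrowFileIO

# `s3.region` is only a default; when the bucket a file lives in resolves to a
# different region, the resolved region is used instead (and a warning is logged).
io = PyArrowFileIO(
    properties={
        "s3.region": "us-west-2",            # configured default region
        "s3.access-key-id": "admin",         # placeholder credentials
        "s3.secret-access-key": "password",
    }
)

# If this bucket actually lives in eu-central-1, the S3FileSystem created under
# the hood is initialized with eu-central-1 rather than the configured us-west-2.
input_file = io.new_input("s3://some-eu-central-1-bucket/path/to/data.parquet")
```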

22 changes: 13 additions & 9 deletions pyiceberg/io/pyarrow.py
@@ -352,7 +352,7 @@ def parse_location(location: str) -> Tuple[str, str, str]:

def _initialize_fs(self, scheme: str, netloc: Optional[str] = None) -> FileSystem:
if scheme in {"s3", "s3a", "s3n", "oss"}:
from pyarrow.fs import S3FileSystem
from pyarrow.fs import S3FileSystem, resolve_s3_region
Contributor (kevinjqliu):
nit: since oss scheme uses this path, does it also support resolve_s3_region?

Contributor Author (jiakai-li):
Thank you @kevinjqliu, this is a really good catch. I didn't find much information regarding support of oss regions by pyarrow.fs.resolve_s3_region, but I tried it on my end and it doesn't seem to work: it throws an error complaining that the bucket cannot be found.

This could be a problem though, especially if the same bucket name is used by both Aliyun and AWS, in which case the user-provided bucket region for Aliyun could be wrongly overwritten (by the resolved AWS one).

I separated the oss path from s3 for now, as I'm not sure we want to tackle oss here (and I feel we probably want to treat the two protocols differently?). I also broke the _initialize_fs code chunk into smaller blocks to make it a bit easier to modify in the future.

Contributor (kevinjqliu):
> I didn't find much information regarding support of oss regions by pyarrow.fs.resolve_s3_region, but I tried it on my end and it doesn't seem to work: it throws an error complaining that the bucket cannot be found.

I don't think it's supported; the underlying call looks for x-amz-bucket-region, which I don't think Aliyun will set:
https://github.com/apache/arrow/blob/48d5151b87f1b8f977344c7ac20cb0810e46f733/cpp/src/arrow/filesystem/s3fs.cc#L660

> This could be a problem though, especially if the same bucket name is used by both Aliyun and AWS, in which case the user-provided bucket region for Aliyun could be wrongly overwritten (by the resolved AWS one).

Since we're using both scheme and bucket to cache the FS, this should be fine, right? For the case of oss://bucket and s3://bucket.

> I separated the oss path from s3 for now, as I'm not sure we want to tackle oss here (and I feel we probably want to treat the two protocols differently?). I also broke the _initialize_fs code chunk into smaller blocks to make it a bit easier to modify in the future.

Yeah, let's just deal with s3 for now. By the way, fsspec splits the construction per filesystem; I think it looks pretty clean.

Contributor Author (jiakai-li):
> I don't think it's supported; the underlying call looks for x-amz-bucket-region, which I don't think Aliyun will set.

Thank you for checking that, I should have looked at it :-)

> Since we're using both scheme and bucket to cache the FS, this should be fine, right? For the case of oss://bucket and s3://bucket.

Yes, there is no issue after the change now. What I was thinking of is the oss://bucket scenario (ignoring the caching behavior): if the bucket used by oss also exists in AWS, then the previous version (before your comment) would try to resolve the bucket and use the result to overwrite the default setting. That would be wrong, because an oss bucket region cannot be resolved using pyarrow.

I updated the test case to take this into account and also added an integration test for reads across multiple filesystems.


client_kwargs: Dict[str, Any] = {
"endpoint_override": self.properties.get(S3_ENDPOINT),
@@ -362,6 +362,12 @@ def _initialize_fs(self, scheme: str, netloc: Optional[str] = None) -> FileSystem:
"region": get_first_property_value(self.properties, S3_REGION, AWS_REGION),
}

# Override the default s3.region if the netloc (bucket) resolves to a different region
try:
client_kwargs["region"] = resolve_s3_region(netloc)
Contributor (Fokko):
What about doing this lookup only when the region is not provided explicitly? I think this will do another call to S3.

Contributor Author (@jiakai-li, Dec 29, 2024):
Thank you Fokko. My understanding is that the problem occurs when the provided region doesn't match the data file's bucket region, which makes the file read fail for pyarrow. By overwriting the provided region with the resolved bucket region (falling back to the provided region), we make sure the real region in which a data file is stored takes precedence. (This function is cached when called through fs_by_scheme, so it is only invoked for buckets that haven't been resolved before, to save calls to S3.)

Contributor:
I think there are these 3 cases we're worried about:

# region match
region=us-east-1
s3://foo-us-east-1/
s3://bar-us-east-1/

# region mismatch
region=us-west-2
s3://foo-us-east-1/
s3://bar-us-west-2/

# region not provided
region=None
s3://foo-us-east-1/
s3://bar-us-west-2/

We have 3 options here:

  1. use region when provided, fall back to resolve_s3_region
  2. always use resolve_s3_region
  3. resolve_s3_region, fall back to region

Option 1 is difficult since we don't know that the provided region is wrong until we try to use the FileIO.

The code above uses option 2, which will always make an extra call to S3 to get the correct bucket region. This extra call to S3 is cached though, so resolve_s3_region is only called once per bucket.
This is similar to the cache_regions option for s3fs.core.S3FileSystem.

I like option 3: we can resolve the bucket region and fall back to the provided region. It might be confusing to the end user when a region is specified but the FileIO uses a different region, so let's add a warning for that.

Something like this:

# Attempt to resolve the S3 region for the bucket, falling back to configured region if resolution fails
# Note, bucket resolution is cached and only called once per bucket
provided_region = get_first_property_value(self.properties, S3_REGION, AWS_REGION)
try:
    bucket_region = resolve_s3_region(bucket=netloc)
except (OSError, TypeError):
    bucket_region = None
    logger.warning(f"Unable to resolve region for bucket {netloc}, using default region {provided_region}")

if bucket_region and bucket_region != provided_region:
    logger.warning(
        f"PyArrow FileIO overriding S3 bucket region for bucket {netloc}: "
        f"provided region {provided_region}, actual region {bucket_region}"
    )
region = bucket_region or provided_region

client_kwargs: Dict[str, Any] = {
    "endpoint_override": self.properties.get(S3_ENDPOINT),
    "access_key": get_first_property_value(self.properties, S3_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID),
    "secret_key": get_first_property_value(self.properties, S3_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY),
    "session_token": get_first_property_value(self.properties, S3_SESSION_TOKEN, AWS_SESSION_TOKEN),
    "region": region,
}

Contributor:
Thanks for elaborating on this. I want to make sure that the user is aware of it, and I think we do that right with the warning.

For some additional context: for Java we don't have this issue, because when you try to query the wrong region, the AWS SDK returns an HTTP 301 pointing to the correct region. This introduces another 200 call, but that's okay. The PyArrow implementation (which I believe uses the AWS C++ SDK underneath) just throws an error that it got a 301. We saw that in the past, for example here: #515 (comment).

except (OSError, TypeError):
logger.warning(f"Unable to resolve region for bucket {netloc}, using default region {client_kwargs['region']}")

if proxy_uri := self.properties.get(S3_PROXY_URI):
client_kwargs["proxy_options"] = proxy_uri

@@ -1326,13 +1332,14 @@ def _task_to_table(
return None


def _read_all_delete_files(fs: FileSystem, tasks: Iterable[FileScanTask]) -> Dict[str, List[ChunkedArray]]:
def _read_all_delete_files(io: FileIO, tasks: Iterable[FileScanTask]) -> Dict[str, List[ChunkedArray]]:
deletes_per_file: Dict[str, List[ChunkedArray]] = {}
unique_deletes = set(itertools.chain.from_iterable([task.delete_files for task in tasks]))
if len(unique_deletes) > 0:
executor = ExecutorFactory.get_or_create()
deletes_per_files: Iterator[Dict[str, ChunkedArray]] = executor.map(
lambda args: _read_deletes(*args), [(fs, delete) for delete in unique_deletes]
lambda args: _read_deletes(*args),
[(_fs_from_file_path(delete_file.file_path, io), delete_file) for delete_file in unique_deletes],
)
for delete in deletes_per_files:
for file, arr in delete.items():
@@ -1366,7 +1373,6 @@ def _fs_from_file_path(file_path: str, io: FileIO) -> FileSystem:
class ArrowScan:
_table_metadata: TableMetadata
_io: FileIO
_fs: FileSystem
_projected_schema: Schema
_bound_row_filter: BooleanExpression
_case_sensitive: bool
@@ -1376,7 +1382,6 @@ class ArrowScan:
Attributes:
_table_metadata: Current table metadata of the Iceberg table
_io: PyIceberg FileIO implementation from which to fetch the io properties
_fs: PyArrow FileSystem to use to read the files
_projected_schema: Iceberg Schema to project onto the data files
_bound_row_filter: Schema bound row expression to filter the data with
_case_sensitive: Case sensitivity when looking up column names
@@ -1394,7 +1399,6 @@ def __init__(
) -> None:
self._table_metadata = table_metadata
self._io = io
self._fs = _fs_from_file_path(table_metadata.location, io) # TODO: use different FileSystem per file
self._projected_schema = projected_schema
self._bound_row_filter = bind(table_metadata.schema(), row_filter, case_sensitive=case_sensitive)
self._case_sensitive = case_sensitive
@@ -1434,7 +1438,7 @@ def to_table(self, tasks: Iterable[FileScanTask]) -> pa.Table:
ResolveError: When a required field cannot be found in the file
ValueError: When a field type in the file cannot be projected to the schema type
"""
deletes_per_file = _read_all_delete_files(self._fs, tasks)
deletes_per_file = _read_all_delete_files(self._io, tasks)
executor = ExecutorFactory.get_or_create()

def _table_from_scan_task(task: FileScanTask) -> pa.Table:
Expand Down Expand Up @@ -1497,7 +1501,7 @@ def to_record_batches(self, tasks: Iterable[FileScanTask]) -> Iterator[pa.Record
ResolveError: When a required field cannot be found in the file
ValueError: When a field type in the file cannot be projected to the schema type
"""
deletes_per_file = _read_all_delete_files(self._fs, tasks)
deletes_per_file = _read_all_delete_files(self._io, tasks)
return self._record_batches_from_scan_tasks_and_deletes(tasks, deletes_per_file)

def _record_batches_from_scan_tasks_and_deletes(
@@ -1508,7 +1512,7 @@ def _record_batches_from_scan_tasks_and_deletes(
if self._limit is not None and total_row_count >= self._limit:
break
batches = _task_to_record_batches(
self._fs,
_fs_from_file_path(task.file.file_path, self._io),
Contributor:
nice! I think this solves #1041 as well.

Contributor Author (jiakai-li):
Yep, it is :-)

Contributor (@Fokko, Dec 29, 2024):
Nit: I noticed that we first pass in the path here, and the IO as a second argument. For _read_all_delete_files it is the other way around. How about unifying this?

task,
self._bound_row_filter,
self._projected_schema,
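For readers following the `_fs_from_file_path(..., io)` calls added above, here is a rough sketch of what such a helper has to do: derive the scheme and bucket from each file's own location and ask the FileIO for the matching cached filesystem. This is an approximation for illustration, not the exact implementation in pyiceberg/io/pyarrow.py:

```python
from urllib.parse import urlparse

from pyarrow.fs import FileSystem

from pyiceberg.io import FileIO
from pyiceberg.io.pyarrow import PyArrowFileIO


def fs_for_location(location: str, io: FileIO) -> FileSystem:
    # Illustrative helper: each file gets a filesystem keyed by its own scheme
    # and bucket, so data files spread across regions each end up with a
    # correctly configured S3FileSystem.
    uri = urlparse(location)
    if isinstance(io, PyArrowFileIO):
        return io.fs_by_scheme(uri.scheme, uri.netloc)
    raise ValueError(f"Expected a PyArrowFileIO, got: {io}")
```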
69 changes: 67 additions & 2 deletions tests/io/test_pyarrow.py
@@ -360,10 +360,12 @@ def test_pyarrow_s3_session_properties() -> None:
**UNIFIED_AWS_SESSION_PROPERTIES,
}

with patch("pyarrow.fs.S3FileSystem") as mock_s3fs:
with patch("pyarrow.fs.S3FileSystem") as mock_s3fs, patch("pyarrow.fs.resolve_s3_region") as mock_s3_region_resolver:
s3_fileio = PyArrowFileIO(properties=session_properties)
filename = str(uuid.uuid4())

# Mock `resolve_s3_region` to prevent the location used from resolving to a different s3 region
mock_s3_region_resolver.side_effect = OSError("S3 bucket is not found")
s3_fileio.new_input(location=f"s3://warehouse/{filename}")

mock_s3fs.assert_called_with(
@@ -381,10 +383,11 @@ def test_pyarrow_unified_session_properties() -> None:
**UNIFIED_AWS_SESSION_PROPERTIES,
}

with patch("pyarrow.fs.S3FileSystem") as mock_s3fs:
with patch("pyarrow.fs.S3FileSystem") as mock_s3fs, patch("pyarrow.fs.resolve_s3_region") as mock_s3_region_resolver:
s3_fileio = PyArrowFileIO(properties=session_properties)
filename = str(uuid.uuid4())

mock_s3_region_resolver.return_value = "client.region"
s3_fileio.new_input(location=f"s3://warehouse/{filename}")

mock_s3fs.assert_called_with(
@@ -2074,3 +2077,65 @@ def test__to_requested_schema_timestamps_without_downcast_raises_exception(
_to_requested_schema(requested_schema, file_schema, batch, downcast_ns_timestamp_to_us=False, include_field_ids=False)

assert "Unsupported schema projection from timestamp[ns] to timestamp[us]" in str(exc_info.value)


def test_pyarrow_file_io_fs_by_scheme_cache() -> None:
# It's better to set up multi-region minio servers for an integration test once `endpoint_url` argument becomes available for `resolve_s3_region`
# Refer to: https://github.com/apache/arrow/issues/43713

pyarrow_file_io = PyArrowFileIO()
us_east_1_region = "us-east-1"
ap_southeast_2_region = "ap-southeast-2"

with patch("pyarrow.fs.resolve_s3_region") as mock_s3_region_resolver:
# Call with new argument resolves region automatically
mock_s3_region_resolver.return_value = us_east_1_region
filesystem_us = pyarrow_file_io.fs_by_scheme("s3", "us-east-1-bucket")
assert filesystem_us.region == us_east_1_region
assert pyarrow_file_io.fs_by_scheme.cache_info().misses == 1 # type: ignore
assert pyarrow_file_io.fs_by_scheme.cache_info().currsize == 1 # type: ignore

# Call with different argument also resolves region automatically
mock_s3_region_resolver.return_value = ap_southeast_2_region
filesystem_ap_southeast_2 = pyarrow_file_io.fs_by_scheme("s3", "ap-southeast-2-bucket")
assert filesystem_ap_southeast_2.region == ap_southeast_2_region
assert pyarrow_file_io.fs_by_scheme.cache_info().misses == 2 # type: ignore
assert pyarrow_file_io.fs_by_scheme.cache_info().currsize == 2 # type: ignore

# Call with same argument hits cache
filesystem_us_cached = pyarrow_file_io.fs_by_scheme("s3", "us-east-1-bucket")
assert filesystem_us_cached.region == us_east_1_region
assert pyarrow_file_io.fs_by_scheme.cache_info().hits == 1 # type: ignore

# Call with same argument hits cache
filesystem_ap_southeast_2_cached = pyarrow_file_io.fs_by_scheme("s3", "ap-southeast-2-bucket")
assert filesystem_ap_southeast_2_cached.region == ap_southeast_2_region
assert pyarrow_file_io.fs_by_scheme.cache_info().hits == 2 # type: ignore


def test_pyarrow_io_new_input_multi_region() -> None:
# It's better to set up multi-region minio servers for an integration test once `endpoint_url` argument becomes available for `resolve_s3_region`
# Refer to: https://github.com/apache/arrow/issues/43713

bucket_regions = [
("us-east-2-bucket", "us-east-2"),
("ap-southeast-2-bucket", "ap-southeast-2"),
]

def _s3_region_map(bucket: str) -> str:
for bucket_region in bucket_regions:
if bucket_region[0] == bucket:
return bucket_region[1]
raise OSError("Unknown bucket")

# For one single pyarrow io instance with configured default s3 region
pyarrow_file_io = PyArrowFileIO({"s3.region": "ap-southeast-1"})
with patch("pyarrow.fs.resolve_s3_region") as mock_s3_region_resolver:
mock_s3_region_resolver.side_effect = _s3_region_map

# The filesystem region falls back to the provided property (when the bucket region cannot be resolved)
assert pyarrow_file_io.new_input("s3://non-exist-bucket/path/to/file")._filesystem.region == "ap-southeast-1"

# The filesystem region is overwritten by the resolved bucket region (when it differs from the provided one)
for bucket_region in bucket_regions:
assert pyarrow_file_io.new_input(f"s3://{bucket_region[0]}/path/to/file")._filesystem.region == bucket_region[1]
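Taken together, the changes in this PR mean a table whose delete files and data files live in buckets from different regions can be scanned with only a default region configured. A hedged end-to-end sketch, assuming a catalog named `default` is already configured and `db.events` is an existing table:

```python
from pyiceberg.catalog import load_catalog

# Only a default region is supplied; buckets that resolve to other regions get
# their own S3FileSystem, resolved and cached per (scheme, bucket).
catalog = load_catalog("default", **{"s3.region": "us-west-2"})
table = catalog.load_table("db.events")

# Delete files and data files are each read with the filesystem matching the
# bucket they live in, instead of the single filesystem of the table location.
arrow_table = table.scan().to_arrow()
```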