feat(connector): Migrate presto-hive PrestoS3FileSystem to AWS SDK v2 for better compatibility #25927
imjalpreet left a comment
@imsayari404, thank you. I'll be reviewing the current changes this week, but can you please look into the test failures?
Also, have you been able to run any manual integration tests with AWS S3?
imjalpreet left a comment
Thank you, @imsayari404! I have done a partial review and will add more in my second round.
    AWS_EXEC_READ(ObjectCannedACL.AWS_EXEC_READ),
    BUCKET_OWNER_FULL_CONTROL(ObjectCannedACL.BUCKET_OWNER_FULL_CONTROL),
    BUCKET_OWNER_READ(ObjectCannedACL.BUCKET_OWNER_READ),
    // LOG_DELIVERY_WRITE is not available in AWS SDK v2, removing it
Let's document this change
    synchronized S3Client getS3Client(Configuration config, HiveClientConfig clientConfig)
    {
        if (s3Client != null) {
            return s3Client;
        }

        s3Client = buildS3Client(config, clientConfig);
        return s3Client;
    }
PrestoS3SelectClient uses selectObjectContent(), which in SDK v2 is available only on S3AsyncClient. Since PrestoS3ClientFactory is used only by PrestoS3SelectClient, I don't think we need this method anymore.
        return s3AsyncClient;
    }

    private S3Client buildS3Client(Configuration config, HiveClientConfig clientConfig)
Same here
    .retryPolicy(retryPolicyBuilder -> retryPolicyBuilder
            .numRetries(maxErrorRetries))
Based on the AWS SDK transition docs, we should use ClientOverrideConfiguration.builder().retryStrategy(b -> b.maxAttempts(...)) when upgrading from ClientConfiguration().withMaxErrorRetry(...).
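One detail worth double-checking when making this switch: as far as I understand, the v1 max error retry setting counts retries after the first attempt, whereas maxAttempts in v2 counts every attempt including the first. A minimal sketch of the conversion under that assumption follows; RetryMapping and toMaxAttempts are hypothetical names used only for illustration and are not part of the PR.

```java
// Hypothetical helper for illustration; not part of the PR.
final class RetryMapping
{
    private RetryMapping() {}

    // Converts a v1-style "max error retries" value (retries after the first
    // attempt) into a v2-style "max attempts" value (first attempt included).
    static int toMaxAttempts(int maxErrorRetries)
    {
        if (maxErrorRetries < 0) {
            throw new IllegalArgumentException("maxErrorRetries must not be negative");
        }
        // Math.addExact throws instead of silently overflowing for Integer.MAX_VALUE.
        return Math.addExact(maxErrorRetries, 1);
    }
}
```

The result could then be passed along the lines of retryStrategy(b -> b.maxAttempts(RetryMapping.toMaxAttempts(maxErrorRetries))), so that the existing configuration value keeps its current meaning.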
    .overrideConfiguration(builder -> builder
            .retryPolicy(retryPolicyBuilder -> retryPolicyBuilder
                    .numRetries(maxErrorRetries))
            .apiCallTimeout(java.time.Duration.ofMillis(socketTimeout.toMillis()))
            .apiCallAttemptTimeout(java.time.Duration.ofMillis(connectTimeout.toMillis()))
            .putAdvancedOption(SdkAdvancedClientOption.USER_AGENT_PREFIX, userAgentPrefix)
            .putAdvancedOption(SdkAdvancedClientOption.USER_AGENT_SUFFIX, s3UserAgentSuffix))
    .httpClientBuilder(NettyNioAsyncHttpClient.builder()
            .maxConcurrency(maxConnections)
            .connectionTimeout(java.time.Duration.ofMillis(connectTimeout.toMillis()))
            .readTimeout(java.time.Duration.ofMillis(socketTimeout.toMillis())));
Let's build the ClientOverrideConfiguration in a separate variable for better readability.
    filesToMount.put("hive_s3_insert_overwrite/hadoop-core-site.xml", hadoopCoreSitePath);
    if (isSslEnabledTest) {
        try {
            // Copy dynamically generated keystore files into target/test-classes so that
Let's revert this?
    Path trustStoreTarget = targetDir.resolve("truststore.jks");

    synchronized (SSL_LOCK) {
        // Copy freshly generated keystores, replacing if they exist
Let's revert
    try {
        this.minIOContainer.start();
        log.info("MinIO started on port: " + minIOContainer.getMinioApiEndpoint().getPort());
Let's log this at debug level
    S3Client testClient = S3Client.builder()
            .endpointOverride(URI.create("http://localhost:" + minIOContainer.getMinioApiEndpoint().getPort()))
            .region(Region.US_EAST_1)
            .credentialsProvider(StaticCredentialsProvider.create(
                    AwsBasicCredentials.create(ACCESS_KEY, SECRET_KEY)))
            .forcePathStyle(true)
            .build();
Let's add a utility method for this, as the client is being created in multiple methods
    testClient.listBuckets();
    testClient.close();
    log.info("MinIO is ready");
Let's keep all these test logs as debug logs
imjalpreet left a comment
I did another round of review
    private static final MediaType OCTET_STREAM_MEDIA_TYPE = MediaType.create("application", "octet-stream");
    private static final Set<String> GLACIER_STORAGE_CLASSES = ImmutableSet.of(Glacier.toString(), DeepArchive.toString());
    private static final Set<String> GLACIER_STORAGE_CLASSES = ImmutableSet.of(
            StorageClass.GLACIER.toString(),
nit: static import
    private static final Set<String> GLACIER_STORAGE_CLASSES = ImmutableSet.of(Glacier.toString(), DeepArchive.toString());
    private static final Set<String> GLACIER_STORAGE_CLASSES = ImmutableSet.of(
            StorageClass.GLACIER.toString(),
            StorageClass.DEEP_ARCHIVE.toString());
nit: static import
            StorageClass.GLACIER.toString(),
            StorageClass.DEEP_ARCHIVE.toString());

    // Configuration fields
nit: I don't think this comment is required
    this.s3IamRole = conf.get(S3_IAM_ROLE, defaults.getS3IamRole());
    this.s3IamRoleSessionName = conf.get(S3_IAM_ROLE_SESSION_NAME, defaults.getS3IamRoleSessionName());

    // Validation
nit: not required
    // Initialize clients
    this.credentialsProvider = createAwsCredentialsProvider(uri, conf);
    this.s3 = createAmazonS3Client(conf, configuration);
    this.s3 = createS3Client(conf, connectTimeout, socketTimeout, maxConnections,
As we are already passing conf as a method argument, let's move the extraction of the other method arguments (connectTimeout and so on) into the createS3Client utility method itself. They are not used anywhere else.
    @Override
    public int read()
    {
        // This stream is wrapped with BufferedInputStream, so this method should never be called
nit: revert
    case 404: // NOT_FOUND
        throw new FileNotFoundException("File does not exist: " + path);
    case HTTP_FORBIDDEN:
    case HTTP_BAD_REQUEST:
    case 403: // FORBIDDEN
    case 400: // BAD_REQUEST
Why are we not using static constants anymore?
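For reference, the named constants in java.net.HttpURLConnection are compile-time int constants, so they work as switch labels regardless of which SDK version supplies the status code. A minimal sketch, assuming the status code is obtained as a plain int (for example from the exception's status code accessor); StatusCodes and isUnrecoverable are hypothetical names, not code from the PR:

```java
import static java.net.HttpURLConnection.HTTP_BAD_REQUEST;
import static java.net.HttpURLConnection.HTTP_FORBIDDEN;
import static java.net.HttpURLConnection.HTTP_NOT_FOUND;

// Hypothetical helper for illustration; not part of the PR.
final class StatusCodes
{
    private StatusCodes() {}

    // Returns true for status codes that retrying is not expected to fix.
    static boolean isUnrecoverable(int statusCode)
    {
        switch (statusCode) {
            case HTTP_NOT_FOUND:
            case HTTP_FORBIDDEN:
            case HTTP_BAD_REQUEST:
                return true;
            default:
                return false;
        }
    }
}
```

Keeping the named constants avoids the trailing comments that the numeric literals need in order to stay readable.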
    if (in instanceof S3ObjectInputStream) {
        ((S3ObjectInputStream) in).abort();
    }
    else {
        in.close();
    }
Is it not valid anymore? I think we can still use abort
    switch (storageClass) {
        case STANDARD:
            return StorageClass.STANDARD;
        case REDUCED_REDUNDANCY:
            return StorageClass.REDUCED_REDUNDANCY;
        case GLACIER:
            return StorageClass.GLACIER;
        case DEEP_ARCHIVE:
            return StorageClass.DEEP_ARCHIVE;
        case INTELLIGENT_TIERING:
            return StorageClass.INTELLIGENT_TIERING;
        default:
            return StorageClass.STANDARD;
Based on Presto's documentation, we only support STANDARD and INTELLIGENT_TIERING.
    /**
     * Helper function used to work around the fact that if you use an S3 bucket with an '_' that java.net.URI
     * behaves differently and sets the host value to null whereas S3 buckets without '_' have a properly
     * set host field. '_' is only allowed in S3 bucket names in us-east-1.
     *
     * @param uri The URI from which to extract a host value.
     * @return The host value where uri.getAuthority() is used when uri.getHost() returns null as long as no UserInfo is present.
     * @throws IllegalArgumentException If the bucket can not be determined from the URI.
     */
nit: please revert
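The behavior this Javadoc describes can be reproduced with java.net.URI alone. The following is a minimal sketch of the fallback it documents, not the method body from the PR; BucketNames is a hypothetical class name, and the null check on the authority is an extra guard added here for illustration.

```java
import java.net.URI;

// Hypothetical helper for illustration; not the code under review.
final class BucketNames
{
    private BucketNames() {}

    static String getBucketName(URI uri)
    {
        // Bucket names that are valid host names are parsed into the host field.
        if (uri.getHost() != null) {
            return uri.getHost();
        }
        // A name containing '_' is not a valid host name, so java.net.URI leaves
        // the host null and keeps the raw value in the authority field.
        if (uri.getUserInfo() == null && uri.getAuthority() != null) {
            return uri.getAuthority();
        }
        throw new IllegalArgumentException("Unable to determine S3 bucket from URI: " + uri);
    }
}
```

With this sketch, s3://my-bucket/key resolves through getHost(), while s3://my_bucket/key falls through to getAuthority().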
1. SdkClient Unable to marshall request to JSON: Key cannot be empty
2. IO unencrypted-content-length header is not set on an encrypted object
3. NoClassDefFound net/jpountz/lz4/LZ4Factory
4. org.apache.hadoop.fs.UnsupportedFileSystemException No FileSystem for scheme "s3"
Co-authored-by: Sayari Mukherjee <[email protected]>
Description
AWS SDK for Java v1 is in maintenance mode and reaches end of support on December 31, 2025. Migrating to AWS SDK for Java v2 ensures long-term support, improved performance, async client support, and better compatibility with future AWS features.
Motivation and Context
Impact
Test Plan
Contributor checklist
Release Notes
Please follow release notes guidelines and fill in the release notes below.
If release note is NOT required, use: