Commit fea3e89

HADOOP-19569. output stream close.

Took the opportunity to move all of WriteOperationHelper over to calling the store rather than the filesystem, which then required all multipart IO to move there too. This is the next stage of the store design: multipart IO becomes its own service underneath the store, keeping the size of the Store interface and its implementation under control.

1 parent bd3dd6d commit fea3e89
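
The multipart IO service named in the commit message can be read off the call sites in the diff below (getStore().getMultipartIO()...). For orientation only, here is a rough sketch of that shape; the method names are taken from those call sites, while the signatures and exception lists are reconstructed assumptions, not a copy of the real interface:

// Sketch of the multipart IO service implied by the calls below.
// Method names match the call sites in this diff; signatures are
// reconstructed assumptions, not copied from the actual source.
import java.io.IOException;
import java.util.List;

import software.amazon.awssdk.services.s3.model.MultipartUpload;

import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.s3a.impl.StoreContext;

public interface MultipartIOSketch {

  /** Abort all uploads under a prefix older than the given age in seconds. */
  void abortOutstandingMultipartUploads(long seconds, String prefix, int maxKeys)
      throws IOException;

  /** Abort one listed upload. */
  void abortMultipartUpload(MultipartUpload upload) throws IOException;

  /** List uploads under a prefix, with retries. */
  List<MultipartUpload> listMultipartUploads(String prefix) throws IOException;

  /** Iterator form of the listing, used by listUploadsUnderPrefix(). */
  RemoteIterator<MultipartUpload> listMultipartUploads(
      StoreContext storeContext, String prefix, int maxKeys) throws IOException;
}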

18 files changed: +792, -357 lines
IORateLimiting.java (new file, package org.apache.hadoop.io)
45 additions, 0 deletions

@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io;
+
+import java.time.Duration;
+
+/**
+ * Interface for specific rate limiting of read and write operations.
+ * {@see org.apache.hadoop.util.RateLimiting}.
+ */
+public interface IORateLimiting {
+
+  /**
+   * Acquire write capacity for operations.
+   * This should be done within retry loops.
+   * @param capacity capacity to acquire.
+   * @return time spent waiting for output.
+   */
+  Duration acquireWriteCapacity(int capacity);
+
+  /**
+   * Acquire read capacity for operations.
+   * This should be done within retry loops.
+   * @param capacity capacity to acquire.
+   * @return time spent waiting for output.
+   */
+  Duration acquireReadCapacity(int capacity);
+
+}
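
The javadoc above asks callers to acquire capacity inside retry loops, so that retried attempts are rate limited as well as first attempts. A minimal hypothetical caller, in which everything apart from IORateLimiting itself is illustrative:

import java.io.IOException;
import java.time.Duration;

import org.apache.hadoop.io.IORateLimiting;

// Hypothetical caller; only the IORateLimiting interface comes from
// this commit, the rest is an illustrative sketch.
public final class RateLimitedWriter {

  private final IORateLimiting limiter;

  public RateLimitedWriter(IORateLimiting limiter) {
    this.limiter = limiter;
  }

  /**
   * Run a write operation with retries, acquiring capacity inside the
   * loop so that every attempt, not just the first, is rate limited.
   */
  public void writeWithRetries(int capacity, int maxAttempts, WriteOp op)
      throws IOException {
    Duration totalWait = Duration.ZERO;
    for (int attempt = 1; ; attempt++) {
      // blocks until the limiter grants capacity; returns time spent waiting
      totalWait = totalWait.plus(limiter.acquireWriteCapacity(capacity));
      try {
        op.execute();
        return;
      } catch (IOException e) {
        if (attempt == maxAttempts) {
          throw e;
        }
      }
    }
  }

  /** The operation being retried. */
  @FunctionalInterface
  public interface WriteOp {
    void execute() throws IOException;
  }
}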

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
17 additions, 210 deletions
@@ -25,13 +25,9 @@
 import java.io.UncheckedIOException;
 import java.net.URI;
 import java.nio.file.AccessDeniedException;
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
 import java.time.Duration;
-import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Date;
 import java.util.EnumSet;
 import java.util.Iterator;
 import java.util.List;
@@ -53,15 +49,10 @@

 import software.amazon.awssdk.core.exception.SdkException;
 import software.amazon.awssdk.services.s3.S3Client;
-import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
-import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse;
 import software.amazon.awssdk.services.s3.model.GetBucketLocationRequest;
 import software.amazon.awssdk.services.s3.model.HeadBucketRequest;
 import software.amazon.awssdk.services.s3.model.HeadBucketResponse;
 import software.amazon.awssdk.services.s3.model.MultipartUpload;
-import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
-import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse;
-import software.amazon.awssdk.services.s3.model.ListMultipartUploadsRequest;
 import software.amazon.awssdk.services.s3.model.ListObjectsRequest;
 import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
 import software.amazon.awssdk.awscore.exception.AwsServiceException;
@@ -146,7 +137,6 @@
 import org.apache.hadoop.fs.s3a.impl.StoreContext;
 import org.apache.hadoop.fs.s3a.impl.StoreContextBuilder;
 import org.apache.hadoop.fs.s3a.impl.StoreContextFactory;
-import org.apache.hadoop.fs.s3a.impl.UploadContentProviders;
 import org.apache.hadoop.fs.s3a.impl.CSEUtils;
 import org.apache.hadoop.fs.s3a.impl.streams.InputStreamType;
 import org.apache.hadoop.fs.s3a.impl.streams.ObjectReadParameters;
@@ -253,7 +243,6 @@
 import static org.apache.hadoop.fs.s3a.impl.CreateFileBuilder.OPTIONS_CREATE_FILE_OVERWRITE;
 import static org.apache.hadoop.fs.s3a.impl.CreateFileBuilder.OPTIONS_CREATE_FILE_PERFORMANCE;
 import static org.apache.hadoop.fs.s3a.impl.ErrorTranslation.isUnknownBucket;
-import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.CONTENT_TYPE_OCTET_STREAM;
 import static org.apache.hadoop.fs.s3a.impl.InternalConstants.AP_REQUIRED_EXCEPTION;
 import static org.apache.hadoop.fs.s3a.impl.InternalConstants.ARN_BUCKET_OPTION;
 import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DEFAULT_UPLOAD_PART_COUNT_LIMIT;
@@ -1364,9 +1353,13 @@ public FlagSet<PerformanceFlagEnum> getPerformanceFlags() {

   /**
    * Get the store for low-level operations.
+   * This is absolutely not for external access; it's a single method
+   * just to ease use throughout internal code.
    * @return the store the S3A FS is working through.
    */
-  private S3AStore getStore() {
+
+  @InterfaceAudience.Private
+  public S3AStore getStore() {
     return store;
   }

@@ -1431,7 +1424,10 @@ private void initMultipartUploads(Configuration conf) throws IOException {
           Duration.ofSeconds(DEFAULT_PURGE_EXISTING_MULTIPART_AGE),
           TimeUnit.SECONDS,
           Duration.ZERO);
-      abortOutstandingMultipartUploads(purgeDuration.getSeconds());
+      getStore().getMultipartIO().abortOutstandingMultipartUploads(
+          purgeDuration.getSeconds(),
+          "",
+          maxKeys);
     } catch (AccessDeniedException e) {
       instrumentation.errorIgnored();
       LOG.debug("Failed to purge multipart uploads against {}," +
@@ -1440,34 +1436,6 @@ private void initMultipartUploads(Configuration conf) throws IOException {
     }
   }

-  /**
-   * Abort all outstanding MPUs older than a given age.
-   * @param seconds time in seconds
-   * @throws IOException on any failure, other than 403 "permission denied"
-   */
-  @Retries.RetryTranslated
-  public void abortOutstandingMultipartUploads(long seconds)
-      throws IOException {
-    Preconditions.checkArgument(seconds >= 0);
-    Instant purgeBefore =
-        Instant.now().minusSeconds(seconds);
-    LOG.debug("Purging outstanding multipart uploads older than {}",
-        purgeBefore);
-    invoker.retry("Purging multipart uploads", bucket, true,
-        () -> {
-          RemoteIterator<MultipartUpload> uploadIterator =
-              MultipartUtils.listMultipartUploads(createStoreContext(),
-                  getS3Client(), null, maxKeys);
-
-          while (uploadIterator.hasNext()) {
-            MultipartUpload upload = uploadIterator.next();
-            if (upload.initiated().compareTo(purgeBefore) < 0) {
-              abortMultipartUpload(upload);
-            }
-          }
-        });
-  }
-
   /**
    * Return the protocol scheme for the FileSystem.
    *
@@ -1934,33 +1902,6 @@ private ObjectInputStreamCallbacks createInputStreamCallbacks(
   }


-  /**
-   * Callbacks for WriteOperationHelper.
-   */
-  private final class WriteOperationHelperCallbacksImpl
-      implements WriteOperationHelper.WriteOperationHelperCallbacks {
-
-    @Override
-    @Retries.OnceRaw
-    public CompleteMultipartUploadResponse completeMultipartUpload(
-        CompleteMultipartUploadRequest request) {
-      checkRunning();
-      return getStore().completeMultipartUpload(request);
-    }
-
-    @Override
-    @Retries.OnceRaw
-    public UploadPartResponse uploadPart(
-        final UploadPartRequest request,
-        final RequestBody body,
-        final DurationTrackerFactory durationTrackerFactory)
-        throws AwsServiceException, UncheckedIOException {
-      checkRunning();
-      return getStore().uploadPart(request, body, durationTrackerFactory);
-    }
-
-  }
-
   /**
    * Create the read context for reading from the referenced file,
    * using FS state as well as the status.
@@ -2250,12 +2191,11 @@ public WriteOperationHelper getWriteOperationHelper() {
    */
   @InterfaceAudience.Private
   public WriteOperationHelper createWriteOperationHelper(AuditSpan auditSpan) {
-    return new WriteOperationHelper(this,
-        getConf(),
-        statisticsContext,
+    return new WriteOperationHelper(
         getAuditSpanSource(),
         auditSpan,
-        new WriteOperationHelperCallbacksImpl());
+        getStore().createWriteOperationHelperCallbacks()
+    );
   }

   /**
@@ -2710,7 +2650,7 @@ private long abortMultipartUploadsUnderPrefix(StoreContext storeContext,
     span.activate();
     return foreach(uploads, upload ->
         invoker.retry("Aborting multipart commit", upload.key(), true, () ->
-            abortMultipartUpload(upload)));
+            getStore().getMultipartIO().abortMultipartUpload(upload)));
   }

   /**
@@ -2942,14 +2882,7 @@ protected void incrementGauge(Statistic statistic, long count) {
    * @param ex exception.
    */
   public void operationRetried(Exception ex) {
-    if (isThrottleException(ex)) {
-      LOG.debug("Request throttled");
-      incrementStatistic(STORE_IO_THROTTLED);
-      statisticsContext.addValueToQuantiles(STORE_IO_THROTTLE_RATE, 1);
-    } else {
-      incrementStatistic(STORE_IO_RETRY);
-      incrementStatistic(IGNORED_ERRORS);
-    }
+    getStore().operationRetried(ex);
   }

   /**
@@ -3294,64 +3227,6 @@ public UploadInfo putObject(PutObjectRequest putObjectRequest, File file,
     return getStore().putObject(putObjectRequest, file, listener);
   }

-  /**
-   * PUT an object directly (i.e. not via the transfer manager).
-   * Byte length is calculated from the file length, or, if there is no
-   * file, from the content length of the header.
-   *
-   * Retry Policy: none.
-   * Auditing: must be inside an audit span.
-   * <i>Important: this call will close any input stream in the request.</i>
-   * @param putObjectRequest the request
-   * @param putOptions put object options
-   * @param uploadData data to be uploaded
-   * @param durationTrackerFactory factory for duration tracking
-   * @return the upload initiated
-   * @throws SdkException on problems
-   */
-  @VisibleForTesting
-  @Retries.OnceRaw("For PUT; post-PUT actions are RetryExceptionsSwallowed")
-  PutObjectResponse putObjectDirect(PutObjectRequest putObjectRequest,
-      PutObjectOptions putOptions,
-      S3ADataBlocks.BlockUploadData uploadData,
-      DurationTrackerFactory durationTrackerFactory)
-      throws SdkException {
-    checkRunning();
-    long len = getPutRequestLength(putObjectRequest);
-    LOG.debug("PUT {} bytes to {}", len, putObjectRequest.key());
-    incrementPutStartStatistics(len);
-    final UploadContentProviders.BaseContentProvider provider =
-        uploadData.getContentProvider();
-    try {
-      PutObjectResponse response =
-          trackDurationOfSupplier(nonNullDurationTrackerFactory(durationTrackerFactory),
-              OBJECT_PUT_REQUESTS.getSymbol(),
-              () -> getS3Client().putObject(putObjectRequest,
-                  RequestBody.fromContentProvider(
-                      provider,
-                      provider.getSize(),
-                      CONTENT_TYPE_OCTET_STREAM)));
-      incrementPutCompletedStatistics(true, len);
-      return response;
-    } catch (SdkException e) {
-      incrementPutCompletedStatistics(false, len);
-      throw e;
-    }
-  }
-
-  /**
-   * Get the length of the PUT, verifying that the length is known.
-   * @param putObjectRequest a request bound to a file or a stream.
-   * @return the request length
-   * @throws IllegalArgumentException if the length is negative
-   */
-  private long getPutRequestLength(PutObjectRequest putObjectRequest) {
-    long len = putObjectRequest.contentLength();
-
-    Preconditions.checkState(len >= 0, "Cannot PUT object of unknown length");
-    return len;
-  }
-
   /**
    * Upload part of a multi-partition file.
    * Increments the write and put counters.
@@ -4653,23 +4528,6 @@ private CopyObjectResponse copyFile(String srcKey, String dstKey, long size,
     return response;
   }

-  /**
-   * Initiate a multipart upload from the preconfigured request.
-   * Retry policy: none + untranslated.
-   * @param request request to initiate
-   * @return the result of the call
-   * @throws SdkException on failures inside the AWS SDK
-   * @throws IOException Other IO problems
-   */
-  @Retries.OnceRaw
-  CreateMultipartUploadResponse initiateMultipartUpload(
-      CreateMultipartUploadRequest request) throws IOException {
-    LOG.debug("Initiate multipart upload to {}", request.key());
-    return trackDurationOfSupplier(getDurationTrackerFactory(),
-        OBJECT_MULTIPART_UPLOAD_INITIATED.getSymbol(),
-        () -> getS3Client().createMultipartUpload(request));
-  }
-
   /**
    * Perform post-write actions.
    * <p>
@@ -4719,7 +4577,7 @@ private void createEmptyObject(final String objectName, PutObjectOptions putOpti
         new byte[0], 0, 0, null);

     invoker.retry("PUT 0-byte object ", objectName, true,
-        () -> putObjectDirect(
+        () -> getStore().putObjectDirect(
             getRequestFactory().newDirectoryMarkerRequest(objectName).build(),
             putOptions,
             uploadData,
@@ -5299,13 +5157,7 @@ public RemoteIterator<MultipartUpload> listUploadsUnderPrefix(
       final StoreContext storeContext,
       final @Nullable String prefix)
       throws IOException {
-    // span is picked up retained in the listing.
-    String p = prefix;
-    if (prefix != null && !prefix.isEmpty() && !prefix.endsWith("/")) {
-      p = prefix + "/";
-    }
-    // duration tracking is done in iterator.
-    return MultipartUtils.listMultipartUploads(storeContext, getS3Client(), p, maxKeys);
+    return getStore().getMultipartIO().listMultipartUploads(storeContext, prefix, maxKeys);
   }

   /**
@@ -5321,54 +5173,9 @@ public RemoteIterator<MultipartUpload> listUploadsUnderPrefix(
   @Retries.RetryTranslated
   public List<MultipartUpload> listMultipartUploads(String prefix)
       throws IOException {
-    // add a trailing / if needed.
-    if (prefix != null && !prefix.isEmpty() && !prefix.endsWith("/")) {
-      prefix = prefix + "/";
-    }
-    String p = prefix;
-    return invoker.retry("listMultipartUploads", p, true, () -> {
-      final ListMultipartUploadsRequest request = getRequestFactory()
-          .newListMultipartUploadsRequestBuilder(p).build();
-      return trackDuration(getInstrumentation(), MULTIPART_UPLOAD_LIST.getSymbol(), () ->
-          getS3Client().listMultipartUploads(request).uploads());
-    });
-  }
-
-  /**
-   * Abort a multipart upload.
-   * Retry policy: none.
-   * @param destKey destination key
-   * @param uploadId Upload ID
-   * @throws IOException IO failure, including any uprated SdkException
-   */
-  @Retries.OnceTranslated
-  public void abortMultipartUpload(String destKey, String uploadId) throws IOException {
-    LOG.debug("Aborting multipart upload {} to {}", uploadId, destKey);
-    trackDuration(getInstrumentation(), OBJECT_MULTIPART_UPLOAD_ABORTED.getSymbol(), () ->
-        getS3Client().abortMultipartUpload(
-            getRequestFactory().newAbortMultipartUploadRequestBuilder(
-                destKey,
-                uploadId).build()));
+    return getStore().getMultipartIO().listMultipartUploads(prefix);
   }

-  /**
-   * Abort a multipart upload.
-   * Retry policy: none.
-   * @param upload the listed upload to abort.
-   * @throws IOException IO failure, including any uprated SdkException
-   */
-  @Retries.OnceTranslated
-  public void abortMultipartUpload(MultipartUpload upload) throws IOException {
-    String destKey = upload.key();
-    String uploadId = upload.uploadId();
-    if (LOG.isDebugEnabled()) {
-      DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
-      LOG.debug("Aborting multipart upload {} to {} initiated by {} on {}",
-          uploadId, destKey, upload.initiator(),
-          df.format(Date.from(upload.initiated())));
-    }
-    abortMultipartUpload(destKey, uploadId);
-  }

   /**
    * Create a new instance of the committer statistics.
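
Both public FileSystem-level methods keep their signatures; only their bodies become delegations. A hypothetical caller (the bucket and prefix here are made up) is therefore unaffected by this commit:

import java.util.List;

import software.amazon.awssdk.services.s3.model.MultipartUpload;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.S3AFileSystem;

// Illustrative caller of the unchanged public API; bucket and prefix
// are made up. Internally the call now routes through
// getStore().getMultipartIO() rather than S3AFileSystem itself.
public final class ListUploadsExample {
  public static void main(String[] args) throws Exception {
    Path bucket = new Path("s3a://example-bucket/");
    try (S3AFileSystem fs =
             (S3AFileSystem) bucket.getFileSystem(new Configuration())) {
      List<MultipartUpload> uploads = fs.listMultipartUploads("tmp/work");
      for (MultipartUpload u : uploads) {
        System.out.printf("upload %s -> %s%n", u.uploadId(), u.key());
      }
    }
  }
}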
