
Commit cf32b54

HADOOP-19569. S3A: stream write/close fails badly once FS is closed
Executors in hadoop-common:
- Pick up shutdown of the inner executor and shut themselves down.
- Semaphored executor decrements its counters in this process so that queue state is updated.

This stops callers being able to submit work when the inner executor has shut down.

S3A code:
- S3AStoreImpl raises IllegalStateException on method invocation when the service isn't running. Some methods are kept open as they do seem needed.
- WriteOperationHelper callbacks raise IllegalStateException when invoked after the FS is closed. This is complex.

TODO:
- WriteOperationHelper MUST make all calls to the FS through its callback interface, rather than being given a ref to S3AFS. This makes it easy to identify and lock down the methods.
- What is the correct exception to raise in write/close() failures? IOE or IllegalStateException?
1 parent d491f0b commit cf32b54
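To make the intended caller-visible behaviour concrete, here is a minimal, self-contained sketch using only standard JDK executors (not code from this patch): once an executor has been shut down, further submissions should fail fast with RejectedExecutionException rather than hang or be silently dropped, which is the contract the hadoop-common changes below restore for the wrapping executors.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

public class RejectAfterShutdownDemo {
  public static void main(String[] args) {
    // Stand-in for the semaphored/blocking executors in hadoop-common.
    ExecutorService pool = Executors.newFixedThreadPool(2);
    pool.shutdown();
    try {
      // After shutdown, submission is expected to fail fast.
      pool.submit(() -> System.out.println("never runs"));
    } catch (RejectedExecutionException expected) {
      System.out.println("submission rejected after shutdown: " + expected);
    }
  }
}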

File tree: 5 files changed, +173 -14 lines

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/BlockingThreadPoolExecutorService.java

Lines changed: 34 additions & 11 deletions
@@ -130,21 +130,20 @@ public static BlockingThreadPoolExecutorService newInstance(
        slower than enqueueing. */
     final BlockingQueue<Runnable> workQueue =
         new LinkedBlockingQueue<>(waitingTasks + activeTasks);
+    final InnerExecutorRejection rejection = new InnerExecutorRejection();
     ThreadPoolExecutor eventProcessingExecutor =
         new ThreadPoolExecutor(activeTasks, activeTasks, keepAliveTime, unit,
             workQueue, newDaemonThreadFactory(prefixName),
-            new RejectedExecutionHandler() {
-              @Override
-              public void rejectedExecution(Runnable r,
-                  ThreadPoolExecutor executor) {
-                // This is not expected to happen.
-                LOG.error("Could not submit task to executor {}",
-                    executor.toString());
-              }
-            });
+            rejection);
     eventProcessingExecutor.allowCoreThreadTimeOut(true);
-    return new BlockingThreadPoolExecutorService(waitingTasks + activeTasks,
-        eventProcessingExecutor);
+    final BlockingThreadPoolExecutorService service =
+        new BlockingThreadPoolExecutorService(waitingTasks + activeTasks,
+            eventProcessingExecutor);
+    rejection.setDelegate((r, executor) -> {
+      service.shutdown();
+    });
+
+    return service;
   }
 
   /**
@@ -164,4 +163,28 @@ public String toString() {
         .append('}');
     return sb.toString();
   }
+
+  private static class InnerExecutorRejection implements RejectedExecutionHandler {
+
+    private RejectedExecutionHandler delegate;
+
+    private RejectedExecutionHandler getDelegate() {
+      return delegate;
+    }
+
+    private void setDelegate(final RejectedExecutionHandler delegate) {
+      this.delegate = delegate;
+    }
+
+    @Override
+    public void rejectedExecution(Runnable r,
+        ThreadPoolExecutor executor) {
+      // This is not expected to happen.
+      LOG.error("Could not submit task to executor {}",
+          executor.toString());
+      if (getDelegate() != null) {
+        delegate.rejectedExecution(r, executor);
+      }
+    }
+  }
 }
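The change above swaps the anonymous rejection handler for an InnerExecutorRejection whose behaviour is bound only after the wrapping service exists, so a rejection from the inner pool can shut the outer service down. Below is a standalone sketch of that delegation pattern; the class and variable names are illustrative, not the Hadoop ones.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class DelegatingRejectionDemo {

  /** Handler whose real behaviour is supplied after construction. */
  static class DelegatingHandler implements RejectedExecutionHandler {
    private volatile RejectedExecutionHandler delegate;

    void setDelegate(RejectedExecutionHandler delegate) {
      this.delegate = delegate;
    }

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
      RejectedExecutionHandler d = delegate;
      if (d != null) {
        d.rejectedExecution(r, executor);
      }
    }
  }

  public static void main(String[] args) {
    DelegatingHandler rejection = new DelegatingHandler();
    ThreadPoolExecutor inner = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.SECONDS,
        new LinkedBlockingQueue<>(1), rejection);
    // The outer service is created after the inner pool, so the handler's
    // behaviour is bound afterwards: on rejection, shut the outer service down.
    ExecutorService outer = Executors.newSingleThreadExecutor();
    rejection.setDelegate((r, executor) -> outer.shutdown());

    inner.shutdown();
    inner.execute(() -> { });   // rejected by the inner pool: the delegate shuts the outer service down
    System.out.println("outer shut down: " + outer.isShutdown());   // prints true
  }
}

Binding the delegate after construction avoids the chicken-and-egg problem of a handler that needs a reference to a service which is itself built from the pool the handler guards.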

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SemaphoredDelegatingExecutor.java

Lines changed: 11 additions & 0 deletions
@@ -31,6 +31,7 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -151,6 +152,7 @@ public <T> Future<T> submit(Runnable task, T result) {
 
   @Override
   public Future<?> submit(Runnable task) {
+    rejectWhenShutdown();
     try (DurationTracker ignored =
         trackerFactory.trackDuration(ACTION_EXECUTOR_ACQUIRED)) {
       queueingPermits.acquire();
@@ -163,6 +165,7 @@ public Future<?> submit(Runnable task) {
 
   @Override
   public void execute(Runnable command) {
+    rejectWhenShutdown();
     try (DurationTracker ignored =
         trackerFactory.trackDuration(ACTION_EXECUTOR_ACQUIRED)) {
       queueingPermits.acquire();
@@ -208,6 +211,12 @@ public String toString() {
     return sb.toString();
   }
 
+  private void rejectWhenShutdown() {
+    if (isShutdown()) {
+      throw new RejectedExecutionException("ExecutorService is shutdown");
+    }
+  }
+
   /**
    * Releases a permit after the task is executed.
    */
@@ -222,6 +231,7 @@ class RunnableWithPermitRelease implements Runnable {
     @Override
     public void run() {
       try {
+        rejectWhenShutdown();
         delegatee.run();
       } finally {
         queueingPermits.release();
@@ -244,6 +254,7 @@ class CallableWithPermitRelease<T> implements Callable<T> {
     @Override
     public T call() throws Exception {
      try {
+        rejectWhenShutdown();
        return delegatee.call();
      } finally {
        queueingPermits.release();
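The new rejectWhenShutdown() guard is called both at submission time and again inside the permit-releasing wrappers, so work queued just before shutdown is also refused instead of running against a closed store. A minimal sketch of the submit-time check in a hypothetical delegating wrapper (not the Hadoop class) follows.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;

/** Minimal delegating wrapper that refuses work once the inner pool is shut down. */
class GuardedExecutor {
  private final ExecutorService delegate;

  GuardedExecutor(ExecutorService delegate) {
    this.delegate = delegate;
  }

  private void rejectWhenShutdown() {
    if (delegate.isShutdown()) {
      throw new RejectedExecutionException("ExecutorService is shutdown");
    }
  }

  Future<?> submit(Runnable task) {
    rejectWhenShutdown();          // fail fast before acquiring any permits
    return delegate.submit(task);
  }
}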

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

Lines changed: 12 additions & 1 deletion
@@ -1944,6 +1944,7 @@ private final class WriteOperationHelperCallbacksImpl
     @Retries.OnceRaw
     public CompleteMultipartUploadResponse completeMultipartUpload(
         CompleteMultipartUploadRequest request) {
+      checkRunning();
       return getStore().completeMultipartUpload(request);
     }
 
@@ -1954,6 +1955,7 @@ public UploadPartResponse uploadPart(
         final RequestBody body,
         final DurationTrackerFactory durationTrackerFactory)
         throws AwsServiceException, UncheckedIOException {
+      checkRunning();
       return getStore().uploadPart(request, body, durationTrackerFactory);
     }
 
@@ -3211,6 +3213,7 @@ void deleteObjectAtPath(Path f,
       String key,
       boolean isFile)
       throws SdkException, IOException {
+    checkRunning();
     if (isFile) {
       instrumentation.fileDeleted(1);
     } else {
@@ -3313,7 +3316,7 @@ PutObjectResponse putObjectDirect(PutObjectRequest putObjectRequest,
       S3ADataBlocks.BlockUploadData uploadData,
       DurationTrackerFactory durationTrackerFactory)
       throws SdkException {
-
+    checkRunning();
     long len = getPutRequestLength(putObjectRequest);
     LOG.debug("PUT {} bytes to {}", len, putObjectRequest.key());
     incrementPutStartStatistics(len);
@@ -4422,6 +4425,14 @@ private void checkNotClosed() throws PathIOException {
     }
   }
 
+  /**
+   * Check the FS is running.
+   * @throws IllegalStateException if closed
+   */
+  protected void checkRunning() throws IllegalStateException {
+    Preconditions.checkState(!isClosed, "FileSystem is closed");
+  }
+
   /**
    * Get the delegation token support for this filesystem;
    * not null iff delegation support is enabled.
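S3AFileSystem.checkRunning() converts use-after-close on the write path into an IllegalStateException via a precondition on the isClosed flag. Here is a self-contained sketch of the same idiom, with hypothetical names and without the Hadoop Preconditions helper.

/** Sketch of the fail-fast guard: every write-path entry point calls this first. */
class ClosableStore {
  private volatile boolean closed;

  void checkRunning() {
    if (closed) {
      throw new IllegalStateException("FileSystem is closed");
    }
  }

  void putObject(byte[] data) {
    checkRunning();               // reject the call before any S3 traffic
    // ... actual upload would happen here ...
  }

  void close() {
    closed = true;
  }
}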

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java

Lines changed: 34 additions & 2 deletions
@@ -252,6 +252,15 @@ protected void serviceStart() throws Exception {
     initLocalDirAllocator();
   }
 
+  /**
+   * Check the service is running.
+   * @throws IllegalStateException if not in STARTED.
+   */
+  protected void checkRunning() throws IllegalStateException {
+    Preconditions.checkState(isInState(STATE.STARTED),
+        "Store is in state %s", getServiceState());
+  }
+
   /**
    * Return the store path capabilities.
    * If the object stream factory is non-null, hands off the
@@ -262,6 +271,7 @@ protected void serviceStart() throws Exception {
    */
   @Override
   public boolean hasPathCapability(final Path path, final String capability) {
+    checkRunning();
     switch (toLowerCase(capability)) {
     case StreamCapabilities.IOSTATISTICS:
       return true;
@@ -278,6 +288,7 @@ public boolean hasPathCapability(final Path path, final String capability) {
    */
   @Override
   public boolean inputStreamHasCapability(final String capability) {
+    checkRunning();
     if (objectInputStreamFactory != null) {
       return objectInputStreamFactory.hasCapability(capability);
     }
@@ -298,26 +309,29 @@ private void initLocalDirAllocator() {
   /** Acquire write capacity for rate limiting {@inheritDoc}. */
   @Override
   public Duration acquireWriteCapacity(final int capacity) {
+    checkRunning();
     return writeRateLimiter.acquire(capacity);
   }
 
   /** Acquire read capacity for rate limiting {@inheritDoc}. */
   @Override
   public Duration acquireReadCapacity(final int capacity) {
+    checkRunning();
     return readRateLimiter.acquire(capacity);
-
   }
 
   /**
    * Create a new store context.
    * @return a new store context.
    */
   private StoreContext createStoreContext() {
+    checkRunning();
     return storeContextFactory.createStoreContext();
   }
 
   @Override
   public StoreContext getStoreContext() {
+    checkRunning();
     return storeContext;
   }
 
@@ -327,41 +341,49 @@ public StoreContext getStoreContext() {
    * @throws UncheckedIOException on any failure to create the client.
    */
   private S3Client getS3Client() throws UncheckedIOException {
+    checkRunning();
     return clientManager.getOrCreateS3ClientUnchecked();
   }
 
   @Override
   public S3TransferManager getOrCreateTransferManager() throws IOException {
+    checkRunning();
     return clientManager.getOrCreateTransferManager();
   }
 
   @Override
   public S3Client getOrCreateS3Client() throws IOException {
+    checkRunning();
     return clientManager.getOrCreateS3Client();
   }
 
   @Override
   public S3AsyncClient getOrCreateAsyncClient() throws IOException {
+    checkRunning();
     return clientManager.getOrCreateAsyncClient();
   }
 
   @Override
   public S3Client getOrCreateS3ClientUnchecked() throws UncheckedIOException {
+    checkRunning();
     return clientManager.getOrCreateS3ClientUnchecked();
   }
 
   @Override
   public S3Client getOrCreateAsyncS3ClientUnchecked() throws UncheckedIOException {
+    checkRunning();
     return clientManager.getOrCreateAsyncS3ClientUnchecked();
   }
 
   @Override
   public S3Client getOrCreateUnencryptedS3Client() throws IOException {
+    checkRunning();
     return clientManager.getOrCreateUnencryptedS3Client();
   }
 
   @Override
   public DurationTrackerFactory getDurationTrackerFactory() {
+    checkRunning();
     return durationTrackerFactory;
   }
 
@@ -380,6 +402,7 @@ private S3AStorageStatistics getStorageStatistics() {
 
   @Override
   public RequestFactory getRequestFactory() {
+    checkRunning();
     return requestFactory;
   }
 
@@ -389,6 +412,7 @@ public RequestFactory getRequestFactory() {
    */
   @Override
   public ClientManager clientManager() {
+    checkRunning();
     return clientManager;
   }
 
@@ -507,6 +531,7 @@ private void incrementBytesWritten(final long bytes) {
    */
   @Override
   public void incrementPutStartStatistics(long bytes) {
+    checkRunning();
     LOG.debug("PUT start {} bytes", bytes);
     incrementWriteOperations();
     incrementGauge(OBJECT_PUT_REQUESTS_ACTIVE, 1);
@@ -599,6 +624,7 @@ public Map.Entry<Duration, DeleteObjectsResponse> deleteObjects(
       final DeleteObjectsRequest deleteRequest)
       throws SdkException {
 
+    checkRunning();
     DeleteObjectsResponse response;
     BulkDeleteRetryHandler retryHandler = new BulkDeleteRetryHandler(createStoreContext());
 
@@ -671,6 +697,7 @@ public HeadObjectResponse headObject(String key,
       Invoker changeInvoker,
       S3AFileSystemOperations fsHandler,
       String operation) throws IOException {
+    checkRunning();
     HeadObjectResponse response = getStoreContext().getInvoker()
         .retryUntranslated("HEAD " + key, true,
             () -> {
@@ -729,6 +756,7 @@ public HeadObjectResponse headObject(String key,
   public ResponseInputStream<GetObjectResponse> getRangedS3Object(String key,
       long start,
       long end) throws IOException {
+    checkRunning();
     final GetObjectRequest request = getRequestFactory().newGetObjectRequestBuilder(key)
         .range(S3AUtils.formatRange(start, end))
         .build();
@@ -757,7 +785,7 @@ public ResponseInputStream<GetObjectResponse> getRangedS3Object(String key,
   public Map.Entry<Duration, Optional<DeleteObjectResponse>> deleteObject(
       final DeleteObjectRequest request)
       throws SdkException {
-
+    checkRunning();
     String key = request.key();
     blockRootDelete(key);
     DurationInfo d = new DurationInfo(LOG, false, "deleting %s", key);
@@ -810,6 +838,7 @@ public UploadPartResponse uploadPart(
       final RequestBody body,
       @Nullable final DurationTrackerFactory trackerFactory)
       throws AwsServiceException, UncheckedIOException {
+    checkRunning();
     long len = request.contentLength();
     incrementPutStartStatistics(len);
     try {
@@ -852,6 +881,7 @@ public UploadInfo putObject(
       PutObjectRequest putObjectRequest,
       File file,
       ProgressableProgressListener listener) throws IOException {
+    checkRunning();
     long len = getPutRequestLength(putObjectRequest);
     LOG.debug("PUT {} bytes to {} via transfer manager ", len, putObjectRequest.key());
     incrementPutStartStatistics(len);
@@ -882,6 +912,7 @@ public UploadInfo putObject(
   @Retries.OnceTranslated
   public CompletedFileUpload waitForUploadCompletion(String key, UploadInfo uploadInfo)
       throws IOException {
+    checkRunning();
     FileUpload upload = uploadInfo.getFileUpload();
     try {
       CompletedFileUpload result = upload.completionFuture().join();
@@ -931,6 +962,7 @@ public LocalDirAllocator getDirectoryAllocator() {
   public File createTemporaryFileForWriting(String pathStr,
       long size,
       Configuration conf) throws IOException {
+    checkRunning();
     requireNonNull(directoryAllocator, "directory allocator not initialized");
     Path path = directoryAllocator.getLocalPathForWrite(pathStr,
         size, conf);