
Commit 53b5339

HADOOP-18568. magic committer optional cleanup
Signed-off-by: Hossein Torabi <[email protected]>
1 parent b04db82 commit 53b5339

File tree

5 files changed, +83 -10 lines changed


hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java

Lines changed: 12 additions & 0 deletions
@@ -258,6 +258,18 @@ private CommitConstants() {
   public static final boolean FS_S3A_COMMITTER_MAGIC_TRACK_COMMITS_IN_MEMORY_ENABLED_DEFAULT =
       false;
 
+  /**
+   * Should Magic committer cleanup all the staging dirs.
+   */
+  public static final String FS_S3A_COMMITTER_MAGIC_CLEANUP_ENABLED =
+      "fs.s3a.committer.magic.cleanup.enabled";
+
+  /**
+   * Default value for {@link #FS_S3A_COMMITTER_MAGIC_CLEANUP_ENABLED}: {@value}.
+   */
+  public static final boolean FS_S3A_COMMITTER_MAGIC_CLEANUP_ENABLED_DEFAULT =
+      true;
+
   /**
    * Path in the cluster filesystem for temporary data: {@value}.
    * This is for HDFS, not the local filesystem.

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTrackerUtils.java

Lines changed: 12 additions & 0 deletions
@@ -61,4 +61,16 @@ public static boolean isTrackMagicCommitsInMemoryEnabled(Configuration conf) {
         CommitConstants.FS_S3A_COMMITTER_MAGIC_TRACK_COMMITS_IN_MEMORY_ENABLED,
         CommitConstants.FS_S3A_COMMITTER_MAGIC_TRACK_COMMITS_IN_MEMORY_ENABLED_DEFAULT);
   }
+
+  /**
+   * Is cleanup of magic committer staging dirs enabled.
+   * @param conf Configuration
+   * @return true if cleanup of staging dir is enabled.
+   */
+  public static boolean isCleanupMagicCommitterEnabled(
+      Configuration conf) {
+    return conf.getBoolean(
+        CommitConstants.FS_S3A_COMMITTER_MAGIC_CLEANUP_ENABLED,
+        CommitConstants.FS_S3A_COMMITTER_MAGIC_CLEANUP_ENABLED_DEFAULT);
+  }
 }

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java

Lines changed: 13 additions & 10 deletions
@@ -50,6 +50,7 @@
 import static org.apache.hadoop.fs.s3a.commit.CommitConstants.TEMP_DATA;
 import static org.apache.hadoop.fs.s3a.commit.CommitUtils.*;
 import static org.apache.hadoop.fs.s3a.commit.impl.CommitUtilsWithMR.*;
+import static org.apache.hadoop.fs.s3a.commit.magic.MagicCommitTrackerUtils.isCleanupMagicCommitterEnabled;
 import static org.apache.hadoop.fs.s3a.commit.magic.MagicCommitTrackerUtils.isTrackMagicCommitsInMemoryEnabled;
 import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.demandStringifyIOStatistics;
 

@@ -131,16 +132,18 @@ protected ActiveCommit listPendingUploadsToCommit(
    * Delete the magic directory.
    */
   public void cleanupStagingDirs() {
-    final Path out = getOutputPath();
-    Path path = getMagicJobPath(getUUID(), out);
-    try(DurationInfo ignored = new DurationInfo(LOG, true,
-        "Deleting magic directory %s", path)) {
-      Invoker.ignoreIOExceptions(LOG, "cleanup magic directory", path.toString(),
-          () -> deleteWithWarning(getDestFS(), path, true));
-      // and the job temp directory with manifests
-      Invoker.ignoreIOExceptions(LOG, "cleanup job directory", path.toString(),
-          () -> deleteWithWarning(getDestFS(),
-              new Path(out, TEMP_DATA), true));
+    if (isCleanupMagicCommitterEnabled(getConf())) {
+      final Path out = getOutputPath();
+      Path path = getMagicJobPath(getUUID(), out);
+      try(DurationInfo ignored = new DurationInfo(LOG, true,
+          "Deleting magic directory %s", path)) {
+        Invoker.ignoreIOExceptions(LOG, "cleanup magic directory", path.toString(),
+            () -> deleteWithWarning(getDestFS(), path, true));
+        // and the job temp directory with manifests
+        Invoker.ignoreIOExceptions(LOG, "cleanup job directory", path.toString(),
+            () -> deleteWithWarning(getDestFS(),
+                new Path(out, TEMP_DATA), true));
+      }
     }
   }
 
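With this change, `cleanupStagingDirs()` only deletes the magic and job temp directories when the new flag is enabled, so a job that prefers to leave those paths for out-of-band cleanup (for example, a bucket lifecycle rule, as the documentation change below describes) can turn the flag off in its configuration. A minimal sketch, assuming a plain MapReduce `Job`; the job setup here is illustrative and not part of this commit:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.commit.CommitConstants;
import org.apache.hadoop.mapreduce.Job;

public class DisableMagicCleanup {
  public static void main(String[] args) throws Exception {
    // Illustrative job setup; in a real application this is the existing job or Spark configuration.
    Job job = Job.getInstance(new Configuration(), "magic-committer-example");
    Configuration conf = job.getConfiguration();

    // Leave the __magic and job temp directories in place after the job commits;
    // cleanupStagingDirs() becomes a no-op when this flag is false.
    conf.setBoolean(CommitConstants.FS_S3A_COMMITTER_MAGIC_CLEANUP_ENABLED, false);
  }
}
```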

hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committers.md

Lines changed: 18 additions & 0 deletions
@@ -558,6 +558,7 @@ The table below provides a summary of each option.
 | `fs.s3a.committer.threads` | Number of threads in committers for parallel operations on files.| -4 |
 | `fs.s3a.committer.generate.uuid` | Generate a Job UUID if none is passed down from Spark | `false` |
 | `fs.s3a.committer.require.uuid` |Require the Job UUID to be passed down from Spark | `false` |
+| `fs.s3a.committer.magic.cleanup.enabled` | Cleanup the magic path after the job is committed. | `true` |
 
 The examples below shows how these options can be configured in XML.
 

@@ -1058,3 +1059,20 @@ one of the following conditions are met
 1. The committer is being used in spark, and the version of spark being used does not
 set the `spark.sql.sources.writeJobUUID` property.
 Either upgrade to a new spark release, or set `fs.s3a.committer.generate.uuid` to true.
+
+### Long Job Completion Time Due to Magic Committer Cleanup
+When using the S3A Magic Committer in large Spark or MapReduce jobs, job completion can be significantly delayed
+due to the cleanup of temporary files (such as those under the `__magic` directory).
+This happens because deleting many small files in S3 is a slow and expensive operation, especially at scale.
+In some cases, the cleanup phase alone can take several minutes or more — even after all data has already been written.
+
+To reduce this overhead, Hadoop 3.4.1+ introduced a configuration option in
+[HADOOP-18568](https://issues.apache.org/jira/browse/HADOOP-18568) that allows users to disable this automatic cleanup
+and use lifecycle policies instead to clean up the temporary files.
+#### Configuration
+```xml
+<property>
+  <name>fs.s3a.committer.magic.cleanup.enabled</name>
+  <value>false</value>
+</property>
+```
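
Since the new documentation points at lifecycle policies as the replacement for the committer's own cleanup, a bucket lifecycle rule can expire whatever is left under the magic path once the flag above is disabled. Below is a minimal sketch of an S3 lifecycle configuration; the `tables/` prefix, rule ID, and seven-day retention are illustrative assumptions and should be matched to the actual output layout (a similar rule may be wanted for the job temp-data directory, which is also no longer deleted):

```xml
<LifecycleConfiguration>
  <Rule>
    <ID>expire-magic-committer-staging</ID>
    <Filter>
      <Prefix>tables/__magic/</Prefix>
    </Filter>
    <Status>Enabled</Status>
    <!-- Remove leftover staging objects a week after they were written -->
    <Expiration>
      <Days>7</Days>
    </Expiration>
  </Rule>
</LifecycleConfiguration>
```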

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java

Lines changed: 28 additions & 0 deletions
@@ -213,6 +213,34 @@ public void testCommittersPathsHaveUUID() throws Throwable {
         .contains(ta0);
   }
 
+  /**
+   * Verify that the magic committer cleanup
+   */
+  @Test
+  public void testCommitterCleanup() throws Throwable {
+    describe("Committer cleanup enabled. hence it should delete the task attempt path after commit");
+    JobData jobData = startJob(true);
+    JobContext jContext = jobData.getJContext();
+    TaskAttemptContext tContext = jobData.getTContext();
+    AbstractS3ACommitter committer = jobData.getCommitter();
+
+    commit(committer, jContext, tContext);
+    assertJobAttemptPathDoesNotExist(committer, jContext);
+
+    describe("Committer cleanup is disabled. hence it should not delete the task attempt path after commit");
+    JobData jobData2 = startJob(true);
+    JobContext jContext2 = jobData2.getJContext();
+    TaskAttemptContext tContext2 = jobData2.getTContext();
+    AbstractS3ACommitter committer2 = jobData2.getCommitter();
+
+    committer2.getConf().setBoolean(FS_S3A_COMMITTER_MAGIC_CLEANUP_ENABLED, false);
+
+    commit(committer2, jContext2, tContext2);
+    assertJobAttemptPathExists(committer2, jContext2);
+  }
+
   /**
    * The class provides a overridden implementation of commitJobInternal which
    * causes the commit failed for the first time then succeed.
