Skip to content

Commit 5bf6010

Browse files
committed
avoid removing cost too much times
1 parent 5c4c9e9 commit 5bf6010

File tree

2 files changed

+16
-15
lines changed

2 files changed

+16
-15
lines changed

storage/src/main/java/org/apache/uniffle/storage/common/AbstractStorage.java

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -91,21 +91,21 @@ public void removeHandlers(String appId) {
9191
@Override
9292
public void removeHandlers(String appId, Set<Integer> shuffleIds) {
9393
long start = System.currentTimeMillis();
94-
for (int shuffleId : shuffleIds) {
95-
String shuffleKeyPrefix = RssUtils.generateShuffleKeyWithSplitKey(appId, shuffleId);
96-
Map<String, ShuffleWriteHandler> writeHandlers = writerHandlers.get(appId);
97-
if (writeHandlers != null) {
98-
writeHandlers.keySet().stream().filter(x -> x.startsWith(shuffleKeyPrefix)).forEach(x -> writeHandlers.remove(x));
99-
}
100-
Map<String, ServerReadHandler> readHandlers = readerHandlers.get(appId);
101-
if (readHandlers != null) {
102-
readHandlers.keySet().stream().filter(x -> x.startsWith(shuffleKeyPrefix)).forEach(x -> writeHandlers.remove(x));
103-
}
104-
Map<String, CreateShuffleWriteHandlerRequest> requests = this.requests.get(appId);
105-
if (requests != null) {
106-
requests.keySet().stream().filter(x -> x.startsWith(shuffleKeyPrefix)).forEach(x -> writeHandlers.remove(x));
107-
}
108-
}
94+
// for (int shuffleId : shuffleIds) {
95+
// String shuffleKeyPrefix = RssUtils.generateShuffleKeyWithSplitKey(appId, shuffleId);
96+
// Map<String, ShuffleWriteHandler> writeHandlers = writerHandlers.get(appId);
97+
// if (writeHandlers != null) {
98+
// writeHandlers.keySet().stream().filter(x -> x.startsWith(shuffleKeyPrefix)).forEach(x -> writeHandlers.remove(x));
99+
// }
100+
// Map<String, ServerReadHandler> readHandlers = readerHandlers.get(appId);
101+
// if (readHandlers != null) {
102+
// readHandlers.keySet().stream().filter(x -> x.startsWith(shuffleKeyPrefix)).forEach(x -> readHandlers.remove(x));
103+
// }
104+
// Map<String, CreateShuffleWriteHandlerRequest> requests = this.requests.get(appId);
105+
// if (requests != null) {
106+
// requests.keySet().stream().filter(x -> x.startsWith(shuffleKeyPrefix)).forEach(x -> requests.remove(x));
107+
// }
108+
// }
109109
LOGGER.info("Removed the handlers for appId:{}, shuffleId:{} costs {} ms", appId, shuffleIds, System.currentTimeMillis() - start);
110110
}
111111

storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileWriteHandler.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ public synchronized void write(List<ShufflePartitionedBlock> shuffleBlocks) thro
8888
File baseFolder = new File(basePath);
8989
if (!baseFolder.exists()) {
9090
LOG.warn("{} don't exist, the app or shuffle may be deleted", baseFolder.getAbsolutePath());
91+
createBasePath();
9192
return;
9293
}
9394

0 commit comments

Comments
 (0)