Skip to content

Commit f732fc2

Browse files
authored
[#493][FOLLOWUP] improvement: replace putIfAbsent to avoid performance loss (#2444)
### What changes were proposed in this pull request? Replace putIfAbsent to avoid performance loss ### Why are the changes needed? Fix: #493 ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? CI
1 parent 6843c06 commit f732fc2

File tree

8 files changed

+51
-39
lines changed

8 files changed

+51
-39
lines changed

client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,8 @@ public <K, V, C> ShuffleHandle registerShuffle(
115115

116116
if (dependency.partitioner().numPartitions() == 0) {
117117
shuffleIdToPartitionNum.putIfAbsent(shuffleId, 0);
118-
shuffleIdToNumMapTasks.putIfAbsent(shuffleId, dependency.rdd().partitions().length);
118+
shuffleIdToNumMapTasks.computeIfAbsent(
119+
shuffleId, key -> dependency.rdd().partitions().length);
119120
LOG.info(
120121
"RegisterShuffle with ShuffleId["
121122
+ shuffleId
@@ -158,8 +159,9 @@ public <K, V, C> ShuffleHandle registerShuffle(
158159

159160
startHeartbeat();
160161

161-
shuffleIdToPartitionNum.putIfAbsent(shuffleId, dependency.partitioner().numPartitions());
162-
shuffleIdToNumMapTasks.putIfAbsent(shuffleId, dependency.rdd().partitions().length);
162+
shuffleIdToPartitionNum.computeIfAbsent(
163+
shuffleId, key -> dependency.partitioner().numPartitions());
164+
shuffleIdToNumMapTasks.computeIfAbsent(shuffleId, key -> dependency.rdd().partitions().length);
163165
if (shuffleManagerRpcServiceEnabled && rssStageRetryForWriteFailureEnabled) {
164166
ShuffleHandleInfo handleInfo =
165167
new MutableShuffleHandleInfo(shuffleId, partitionToServers, remoteStorage);

client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,8 @@ public <K, V, C> ShuffleHandle registerShuffle(
135135

136136
if (dependency.partitioner().numPartitions() == 0) {
137137
shuffleIdToPartitionNum.putIfAbsent(shuffleId, 0);
138-
shuffleIdToNumMapTasks.putIfAbsent(shuffleId, dependency.rdd().partitions().length);
138+
shuffleIdToNumMapTasks.computeIfAbsent(
139+
shuffleId, key -> dependency.rdd().partitions().length);
139140
LOG.info(
140141
"RegisterShuffle with ShuffleId["
141142
+ shuffleId
@@ -175,8 +176,9 @@ public <K, V, C> ShuffleHandle registerShuffle(
175176
rssStageResubmitManager.getServerIdBlackList(),
176177
0);
177178
startHeartbeat();
178-
shuffleIdToPartitionNum.putIfAbsent(shuffleId, dependency.partitioner().numPartitions());
179-
shuffleIdToNumMapTasks.putIfAbsent(shuffleId, dependency.rdd().partitions().length);
179+
shuffleIdToPartitionNum.computeIfAbsent(
180+
shuffleId, key -> dependency.partitioner().numPartitions());
181+
shuffleIdToNumMapTasks.computeIfAbsent(shuffleId, key -> dependency.rdd().partitions().length);
180182
if (shuffleManagerRpcServiceEnabled && rssStageRetryForWriteFailureEnabled) {
181183
ShuffleHandleInfo shuffleHandleInfo =
182184
new MutableShuffleHandleInfo(

client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/RssShuffleManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -870,7 +870,7 @@ public void addKnownInput(
870870
pendingPartition.add(p);
871871
}
872872
allRssPartition.add(p);
873-
partitionToInput.putIfAbsent(p, new ArrayList<>());
873+
partitionToInput.computeIfAbsent(p, key -> new ArrayList<>());
874874
partitionToInput.get(p).add(srcAttemptIdentifier);
875875
LOG.info("Add partition:{}, after add, now partition:{}", p, allRssPartition);
876876
}

client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssShuffleScheduler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1299,7 +1299,7 @@ public synchronized void addKnownMapOutput(
12991299
partitionIdToSuccessMapTaskAttempts.get(partitionId).add(srcAttempt);
13001300
String pathComponent = srcAttempt.getPathComponent();
13011301
TezTaskAttemptID tezTaskAttemptId = IdUtils.convertTezTaskAttemptID(pathComponent);
1302-
partitionIdToSuccessTezTasks.putIfAbsent(partitionId, new HashSet<>());
1302+
partitionIdToSuccessTezTasks.computeIfAbsent(partitionId, key -> new HashSet<>());
13031303
partitionIdToSuccessTezTasks.get(partitionId).add(tezTaskAttemptId.getTaskID());
13041304

13051305
uniqueHosts.add(new HostPort(inputHostName, port));

coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,7 @@ public synchronized void decRemoteStorageCounter(String storagePath) {
276276
}
277277
} else {
278278
LOG.warn("Can't find counter for remote storage: {}", storagePath);
279-
remoteStoragePathRankValue.putIfAbsent(storagePath, new RankValue(0));
279+
remoteStoragePathRankValue.computeIfAbsent(storagePath, key -> new RankValue(0));
280280
}
281281
if (remoteStoragePathRankValue.get(storagePath).getAppNum().get() == 0
282282
&& !availableRemoteStorageInfo.containsKey(storagePath)) {

server/src/main/java/org/apache/uniffle/server/merge/Shuffle.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.io.IOException;
2121
import java.util.Comparator;
2222
import java.util.Map;
23+
import java.util.concurrent.atomic.AtomicReference;
2324

2425
import com.google.common.annotations.VisibleForTesting;
2526
import org.roaringbitmap.longlong.Roaring64NavigableMap;
@@ -69,8 +70,23 @@ public Shuffle(
6970

7071
public void startSortMerge(int partitionId, Roaring64NavigableMap expectedBlockIdMap)
7172
throws IOException {
72-
this.partitions.putIfAbsent(partitionId, new Partition<K, V>(this, partitionId));
73-
this.partitions.get(partitionId).startSortMerge(expectedBlockIdMap);
73+
AtomicReference<IOException> exception = new AtomicReference<>();
74+
Partition<K, V> partition =
75+
this.partitions.computeIfAbsent(
76+
partitionId,
77+
key -> {
78+
try {
79+
return new Partition<K, V>(this, partitionId);
80+
} catch (IOException e) {
81+
exception.set(e);
82+
}
83+
return null;
84+
});
85+
if (exception.get() != null) {
86+
throw exception.get();
87+
}
88+
assert partition != null;
89+
partition.startSortMerge(expectedBlockIdMap);
7490
}
7591

7692
void cleanup() {

server/src/main/java/org/apache/uniffle/server/merge/ShuffleMergeManager.java

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -163,22 +163,23 @@ public StatusCode registerShuffle(String appId, int shuffleId, MergeContext merg
163163
} else {
164164
comparator = defaultComparator;
165165
}
166-
this.shuffles.putIfAbsent(appId, JavaUtils.newConcurrentMap());
166+
this.shuffles.computeIfAbsent(appId, key -> JavaUtils.newConcurrentMap());
167167
this.shuffles
168168
.get(appId)
169-
.putIfAbsent(
169+
.computeIfAbsent(
170170
shuffleId,
171-
new Shuffle(
172-
serverConf,
173-
eventHandler,
174-
shuffleServer,
175-
appId,
176-
shuffleId,
177-
kClass,
178-
vClass,
179-
comparator,
180-
mergeContext.getMergedBlockSize(),
181-
classLoader));
171+
key ->
172+
new Shuffle(
173+
serverConf,
174+
eventHandler,
175+
shuffleServer,
176+
appId,
177+
shuffleId,
178+
kClass,
179+
vClass,
180+
comparator,
181+
mergeContext.getMergedBlockSize(),
182+
classLoader));
182183
} catch (ClassNotFoundException
183184
| InstantiationException
184185
| IllegalAccessException

storage/src/main/java/org/apache/uniffle/storage/common/LocalStorageMeta.java

Lines changed: 6 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -93,22 +93,13 @@ public void setSize(long diskSize) {
9393
this.size.set(diskSize);
9494
}
9595

96-
/**
97-
* If the method is implemented as below:
98-
*
99-
* <p>if (shuffleMetaMap.contains(shuffleId)) { // `Time A` return shuffleMetaMap.get(shuffleId) }
100-
* else { shuffleMetaMap.putIfAbsent(shuffleId, newMeta) return newMeta }
101-
*
102-
* <p>Because if shuffleMetaMap remove shuffleId at `Time A` in another thread,
103-
* shuffleMetaMap.get(shuffleId) will return null. We need to guarantee that this method is thread
104-
* safe, and won't return null.
105-
*/
10696
public void createMetadataIfNotExist(String shuffleKey) {
107-
ShuffleMeta meta = new ShuffleMeta();
108-
ShuffleMeta oldMeta = shuffleMetaMap.putIfAbsent(shuffleKey, meta);
109-
if (oldMeta == null) {
110-
LOG.info("Create metadata of shuffle {}.", shuffleKey);
111-
}
97+
shuffleMetaMap.computeIfAbsent(
98+
shuffleKey,
99+
key -> {
100+
LOG.info("Create metadata of shuffle {}.", key);
101+
return new ShuffleMeta();
102+
});
112103
}
113104

114105
private ShuffleMeta getShuffleMeta(String shuffleKey) {

0 commit comments

Comments
 (0)