Skip to content

Commit f0f99ac

Browse files
committed
fix
1 parent 2ddac28 commit f0f99ac

File tree

5 files changed

+23
-9
lines changed

5 files changed

+23
-9
lines changed

client-spark/common/src/main/java/org/apache/spark/shuffle/writer/DataPusher.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ public Future<Long> send(AddBlockEvent event) {
118118
.filter(x -> succeedBlockIds.contains(x.getBlockId()))
119119
.map(x -> x.getFreeMemory())
120120
.reduce((a, b) -> a + b)
121-
.get();
121+
.orElseGet(() -> 0L);
122122
});
123123
event.doPrepare(future);
124124
executorService.submit(future);

client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -360,8 +360,7 @@ private void checkSentBlockCount() {
360360
*
361361
* @param shuffleBlockInfoList
362362
*/
363-
private List<Future<Long>> processShuffleBlockInfos(
364-
List<ShuffleBlockInfo> shuffleBlockInfoList) {
363+
private List<Future<Long>> processShuffleBlockInfos(List<ShuffleBlockInfo> shuffleBlockInfoList) {
365364
if (shuffleBlockInfoList != null && !shuffleBlockInfoList.isEmpty()) {
366365
shuffleBlockInfoList.stream()
367366
.forEach(
@@ -389,8 +388,7 @@ private List<Future<Long>> processShuffleBlockInfos(
389388

390389
// don't send huge block to shuffle server, or there will be OOM if shuffle server receives data
391390
// more than expected
392-
protected List<Future<Long>> postBlockEvent(
393-
List<ShuffleBlockInfo> shuffleBlockInfoList) {
391+
protected List<Future<Long>> postBlockEvent(List<ShuffleBlockInfo> shuffleBlockInfoList) {
394392
List<Future<Long>> futures = new ArrayList<>();
395393
for (AddBlockEvent event : bufferManager.buildBlockEvents(shuffleBlockInfoList)) {
396394
futures.add(shuffleManager.sendData(event));

client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,13 @@
2626
import java.util.List;
2727
import java.util.Map;
2828
import java.util.Set;
29+
import java.util.concurrent.CompletableFuture;
30+
import java.util.concurrent.ExecutionException;
2931
import java.util.concurrent.ExecutorService;
3032
import java.util.concurrent.Executors;
3133
import java.util.concurrent.Future;
3234
import java.util.concurrent.TimeUnit;
35+
import java.util.concurrent.TimeoutException;
3336
import java.util.function.Function;
3437
import java.util.function.Supplier;
3538
import java.util.stream.Collectors;
@@ -474,6 +477,7 @@ protected void checkBlockSendResult(Set<Long> blockIds) {
474477

475478
while (!interrupted) {
476479
try {
480+
LOG.warn("checkBlockSendResult," + blockIds.size() + "," + inFlightEvent.size());
477481
checkDataIfAnyFailure();
478482
Set<Long> successBlockIds = shuffleManager.getSuccessBlockIds(taskId);
479483
blockIds.removeAll(successBlockIds);
@@ -492,9 +496,10 @@ protected void checkBlockSendResult(Set<Long> blockIds) {
492496
inFlightEvent.values().stream()
493497
.filter(f -> !f.isDone())
494498
.findAny()
495-
.get()
499+
.orElseGet(() -> CompletableFuture.completedFuture(0L))
496500
.get(remainingMs, TimeUnit.MILLISECONDS);
497501
} else {
502+
LOG.warn("blockSize:" + blockIds.size() + ",inflightEvent:" + inFlightEvent.size());
498503
// this branch seems to be unreachable, since `blockIds.isEmpty()` will break the loop first
499504
break;
500505
}
@@ -503,8 +508,9 @@ protected void checkBlockSendResult(Set<Long> blockIds) {
503508
interrupted = true;
504509
inFlightEvent.values().stream().forEach(f -> f.cancel(true));
505510
Thread.currentThread().interrupt();
506-
} catch (Exception e) {
507-
LOG.error("Exception happened when check block send result", e);
511+
} catch (ExecutionException | TimeoutException e) {
512+
LOG.error("check err", e);
513+
break;
508514
}
509515
}
510516
if (!blockIds.isEmpty()) {

client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,7 @@ private boolean sendShuffleDataAsync(
249249
}
250250
return true;
251251
});
252+
dataTransferPool.submit(future);
252253
futures.add(future);
253254
}
254255

client/src/main/java/org/apache/uniffle/client/util/ClientUtils.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,19 @@
2424
import java.util.Set;
2525
import java.util.concurrent.Future;
2626
import java.util.concurrent.TimeUnit;
27+
import java.util.concurrent.TimeoutException;
2728
import java.util.stream.Collectors;
2829

30+
import org.slf4j.Logger;
31+
import org.slf4j.LoggerFactory;
32+
2933
import org.apache.uniffle.client.api.ShuffleWriteClient;
3034
import org.apache.uniffle.common.ClientType;
3135
import org.apache.uniffle.common.RemoteStorageInfo;
3236
import org.apache.uniffle.storage.util.StorageType;
3337

3438
public class ClientUtils {
39+
private static final Logger LOG = LoggerFactory.getLogger(ClientUtils.class);
3540

3641
public static RemoteStorageInfo fetchRemoteStorage(
3742
String appId,
@@ -65,11 +70,12 @@ public static boolean waitUntilDoneOrFail(List<Future<Boolean>> list, boolean al
6570
while (iterator.hasNext()) {
6671
Future<Boolean> future = iterator.next();
6772
if (future.isDone()) {
68-
finished++;
6973
iterator.remove();
7074
try {
7175
if (!future.get()) {
7276
failed++;
77+
} else {
78+
finished++;
7379
}
7480
} catch (Exception e) {
7581
// cancel or execution exception
@@ -92,7 +98,10 @@ public static boolean waitUntilDoneOrFail(List<Future<Boolean>> list, boolean al
9298
futures.forEach(x -> x.cancel(true));
9399
Thread.currentThread().interrupt();
94100
return false;
101+
} catch (TimeoutException e) {
102+
// ignore
95103
} catch (Exception e) {
104+
LOG.warn("Exception in waitUntilDoneOrFail", e);
96105
// ignore execution err after logging it (timeout is caught separately above)
97106
}
98107
}

0 commit comments

Comments
 (0)