Skip to content

Commit dea04ac

Browse files
committed
refactor the shuffle status
1 parent 4eee9a8 commit dea04ac

File tree

10 files changed

+306
-104
lines changed

10 files changed

+306
-104
lines changed

client-spark/common/src/main/java/org/apache/spark/shuffle/RssStageResubmitManager.java

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,68 @@
1919

2020
import java.util.Map;
2121
import java.util.Set;
22+
import java.util.concurrent.ConcurrentHashMap;
2223

2324
import com.google.common.collect.Sets;
25+
import org.apache.spark.SparkConf;
26+
import org.apache.spark.shuffle.stage.RssShuffleStatus;
27+
import org.apache.spark.shuffle.stage.RssShuffleStatusForReader;
28+
import org.apache.spark.shuffle.stage.RssShuffleStatusForWriter;
2429
import org.slf4j.Logger;
2530
import org.slf4j.LoggerFactory;
2631

2732
import org.apache.uniffle.common.util.JavaUtils;
2833

2934
public class RssStageResubmitManager {
30-
3135
private static final Logger LOG = LoggerFactory.getLogger(RssStageResubmitManager.class);
36+
37+
private final SparkConf sparkConf = new SparkConf();
38+
private final Map<Integer, RssShuffleStatusForReader> shuffleStatusForReader =
39+
new ConcurrentHashMap<>();
40+
private final Map<Integer, RssShuffleStatusForWriter> shuffleStatusForWriter =
41+
new ConcurrentHashMap<>();
42+
43+
public void clear(int shuffleId) {
44+
shuffleStatusForReader.remove(shuffleId);
45+
shuffleStatusForWriter.remove(shuffleId);
46+
}
47+
48+
public RssShuffleStatus getShuffleStatusForReader(int shuffleId, int stageId, int stageAttempt) {
49+
RssShuffleStatus shuffleStatus =
50+
shuffleStatusForReader.computeIfAbsent(
51+
shuffleId, x -> new RssShuffleStatusForReader(stageId, shuffleId));
52+
if (shuffleStatus.updateStageAttemptIfNecessary(stageAttempt)) {
53+
return shuffleStatus;
54+
}
55+
return null;
56+
}
57+
58+
public RssShuffleStatus getShuffleStatusForWriter(int shuffleId, int stageId, int stageAttempt) {
59+
RssShuffleStatus shuffleStatus =
60+
shuffleStatusForWriter.computeIfAbsent(
61+
shuffleId, x -> new RssShuffleStatusForWriter(stageId, shuffleId));
62+
if (shuffleStatus.updateStageAttemptIfNecessary(stageAttempt)) {
63+
return shuffleStatus;
64+
}
65+
return null;
66+
}
67+
68+
public boolean triggerStageRetry(RssShuffleStatus shuffleStatus) {
69+
final String TASK_MAX_FAILURE = "spark.task.maxFailures";
70+
int sparkTaskMaxFailures = sparkConf.getInt(TASK_MAX_FAILURE, 4);
71+
if (shuffleStatus instanceof RssShuffleStatusForReader) {
72+
if (shuffleStatus.getStageRetriedNumber() > 1) {
73+
LOG.warn("The shuffleId:{}, stageId:{} has been retried. Ignore it.");
74+
return false;
75+
}
76+
if (shuffleStatus.getTaskFailureAttemptCount() >= sparkTaskMaxFailures) {
77+
shuffleStatus.markStageAttemptRetried();
78+
return true;
79+
}
80+
}
81+
return false;
82+
}
83+
3284
/** Blacklist of the Shuffle Server when the write fails. */
3385
private Set<String> serverIdBlackList;
3486
/**
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.shuffle.stage;
19+
20+
import java.util.HashSet;
21+
import java.util.Set;
22+
import java.util.concurrent.locks.ReentrantReadWriteLock;
23+
import java.util.function.Supplier;
24+
25+
/**
 * Tracks the stage attempt status of one shuffle so that callers can decide whether to trigger a
 * Spark stage retry.
 *
 * <p>All mutable state is read under the read lock and modified under the write lock of a single
 * {@link ReentrantReadWriteLock}.
 */
public class RssShuffleStatus {
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
  private final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
  private final int stageId;
  private final int shuffleId;
  // the stage attempt numbers that have been marked as retried
  private final Set<Integer> stageAttemptRetriedRecords = new HashSet<>();
  // the failed task attempt numbers. Attention: these are not task attempt ids!
  private final Set<Integer> taskAttemptFailureRecords = new HashSet<>();

  // the latest stage attempt number seen so far; starts at 0
  private int stageAttemptNumber;

  /**
   * Creates the status of a shuffle with no recorded failures or retries.
   *
   * @param stageId the stage id this status is associated with
   * @param shuffleId the shuffle id this status is associated with
   */
  public RssShuffleStatus(int stageId, int shuffleId) {
    this.shuffleId = shuffleId;
    this.stageId = stageId;
  }

  /** Runs {@code fn} while holding the read lock and returns its result. */
  private <T> T withReadLock(Supplier<T> fn) {
    readLock.lock();
    try {
      return fn.get();
    } finally {
      readLock.unlock();
    }
  }

  /** Runs {@code fn} while holding the write lock and returns its result. */
  private <T> T withWriteLock(Supplier<T> fn) {
    writeLock.lock();
    try {
      return fn.get();
    } finally {
      writeLock.unlock();
    }
  }

  /** Runs {@code action} while holding the write lock. */
  private void runWithWriteLock(Runnable action) {
    writeLock.lock();
    try {
      action.run();
    } finally {
      writeLock.unlock();
    }
  }

  /** Returns the stage id given at construction time. */
  public int getStageId() {
    return stageId;
  }

  /** Returns the shuffle id given at construction time. */
  public int getShuffleId() {
    return shuffleId;
  }

  /** Returns the number of distinct stage attempts that have been marked as retried. */
  public int getStageRetriedNumber() {
    return withReadLock(stageAttemptRetriedRecords::size);
  }

  /** Marks the current stage attempt as retried; marking the same attempt twice has no effect. */
  public void markStageAttemptRetried() {
    runWithWriteLock(() -> stageAttemptRetriedRecords.add(stageAttemptNumber));
  }

  /** Returns the latest stage attempt number recorded by this status. */
  public int getStageAttempt() {
    return withReadLock(() -> stageAttemptNumber);
  }

  /**
   * Aligns this status with the given stage attempt.
   *
   * <p>A newer attempt replaces the recorded one and resets the task failure records; an attempt
   * equal to the recorded one changes nothing.
   *
   * @param stageAttempt the stage attempt number of the caller
   * @return {@code false} if {@code stageAttempt} is older than the recorded attempt,
   *     {@code true} otherwise
   */
  public boolean updateStageAttemptIfNecessary(int stageAttempt) {
    return withWriteLock(
        () -> {
          if (stageAttempt < stageAttemptNumber) {
            // a stale stage attempt, ignore it.
            return false;
          }
          if (stageAttempt > stageAttemptNumber) {
            // a new stage attempt is issued.
            stageAttemptNumber = stageAttempt;
            taskAttemptFailureRecords.clear();
          }
          return true;
        });
  }

  /**
   * Records a failed task attempt of the current stage attempt.
   *
   * @param taskAttemptNumber the attempt number (not the attempt id) of the failed task
   */
  public void incTaskFailure(int taskAttemptNumber) {
    runWithWriteLock(() -> taskAttemptFailureRecords.add(taskAttemptNumber));
  }

  /** Returns the number of distinct failed task attempt numbers recorded so far. */
  public int getTaskFailureAttemptCount() {
    return withReadLock(taskAttemptFailureRecords::size);
  }
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.shuffle.stage;
19+
20+
public class RssShuffleStatusForReader extends RssShuffleStatus {
21+
public RssShuffleStatusForReader(int stageId, int shuffleId) {
22+
super(stageId, shuffleId);
23+
}
24+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.shuffle.stage;
19+
20+
public class RssShuffleStatusForWriter extends RssShuffleStatus {
21+
public RssShuffleStatusForWriter(int stageId, int shuffleId) {
22+
super(stageId, shuffleId);
23+
}
24+
}

client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -987,4 +987,8 @@ protected RemoteStorageInfo getRemoteStorageInfo() {
987987
public boolean isRssResubmitStage() {
988988
return rssResubmitStage;
989989
}
990+
991+
public RssStageResubmitManager getStageResubmitManager() {
992+
return rssStageResubmitManager;
993+
}
990994
}

client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerInterface.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.Map;
2222

2323
import org.apache.spark.SparkException;
24+
import org.apache.spark.shuffle.RssStageResubmitManager;
2425
import org.apache.spark.shuffle.handle.MutableShuffleHandleInfo;
2526
import org.apache.spark.shuffle.handle.ShuffleHandleInfo;
2627

@@ -84,4 +85,6 @@ public interface RssShuffleManagerInterface {
8485

8586
MutableShuffleHandleInfo reassignOnBlockSendFailure(
8687
int shuffleId, Map<Integer, List<ReceivingFailureServer>> partitionToFailureServers);
88+
89+
RssStageResubmitManager getStageResubmitManager();
8790
}

0 commit comments

Comments
 (0)