Skip to content

Commit 00f2cd2

Browse files
committed
refactor helper method and put in-mem store in HandleTaskReturn
Signed-off-by: Sagar Sumit <[email protected]>
1 parent 5f10a92 commit 00f2cd2

File tree

1 file changed

+15
-4
lines changed

1 file changed

+15
-4
lines changed

src/ray/core_worker/task_manager.cc

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,16 @@ constexpr int64_t kTaskFailureThrottlingThreshold = 50;
3939
// Throttle task failure logs to once this interval.
4040
constexpr int64_t kTaskFailureLoggingFrequencyMillis = 5000;
4141

42-
static inline rpc::ErrorType MapStatusToErrorType(const Status &status) {
42+
namespace {
43+
44+
rpc::ErrorType MapPlasmaPutStatusToErrorType(const Status &status) {
45+
// Only the following should be returned from plasma put paths today.
46+
RAY_DCHECK(status.IsObjectStoreFull() || status.IsTransientObjectStoreFull() ||
47+
status.IsOutOfDisk() || status.IsIOError())
48+
<< "Unexpected status from plasma put: " << status;
49+
4350
if (status.IsObjectStoreFull() || status.IsTransientObjectStoreFull()) {
51+
// TODO(codope): introduce a dedicated OBJECT_STORE_FULL error type and map to it here.
4452
return rpc::ErrorType::OUT_OF_MEMORY;
4553
}
4654
if (status.IsOutOfDisk()) {
@@ -53,6 +61,8 @@ static inline rpc::ErrorType MapStatusToErrorType(const Status &status) {
5361
return rpc::ErrorType::WORKER_DIED;
5462
}
5563

64+
} // namespace
65+
5666
absl::flat_hash_set<ObjectID> ObjectRefStream::GetItemsUnconsumed() const {
5767
absl::flat_hash_set<ObjectID> result;
5868
for (int64_t index = 0; index <= max_index_seen_; index++) {
@@ -603,6 +613,7 @@ StatusOr<bool> TaskManager::HandleTaskReturn(const ObjectID &object_id,
603613
if (!s.ok()) {
604614
return s;
605615
}
616+
in_memory_store_.Put(RayObject(rpc::ErrorType::OBJECT_IN_PLASMA), object_id);
606617
} else {
607618
in_memory_store_.Put(object, object_id);
608619
direct_return = true;
@@ -937,7 +948,7 @@ void TaskManager::CompletePendingTask(const TaskID &task_id,
937948
RAY_LOG(WARNING).WithField(object_id)
938949
<< "Failed to handle dynamic task return: " << direct_or.status();
939950
Status st = direct_or.status();
940-
rpc::ErrorType err_type = MapStatusToErrorType(st);
951+
rpc::ErrorType err_type = MapPlasmaPutStatusToErrorType(st);
941952
rpc::RayErrorInfo err_info;
942953
err_info.set_error_message(st.ToString());
943954
FailOrRetryPendingTask(task_id,
@@ -965,7 +976,7 @@ void TaskManager::CompletePendingTask(const TaskID &task_id,
965976
// If storing return in plasma failed, treat as system failure for this attempt.
966977
// Do not proceed with normal completion. Mark task failed immediately.
967978
Status st = direct_or.status();
968-
rpc::ErrorType err_type = MapStatusToErrorType(st);
979+
rpc::ErrorType err_type = MapPlasmaPutStatusToErrorType(st);
969980
rpc::RayErrorInfo err_info;
970981
err_info.set_error_message(st.ToString());
971982
FailOrRetryPendingTask(task_id,
@@ -1109,7 +1120,7 @@ void TaskManager::CompletePendingTask(const TaskID &task_id,
11091120
<< "Failed to handle generator return during app error propagation: "
11101121
<< res.status();
11111122
Status st = res.status();
1112-
rpc::ErrorType err_type = MapStatusToErrorType(st);
1123+
rpc::ErrorType err_type = MapPlasmaPutStatusToErrorType(st);
11131124
rpc::RayErrorInfo err_info;
11141125
err_info.set_error_message(st.ToString());
11151126
FailOrRetryPendingTask(spec.TaskId(),

0 commit comments

Comments
 (0)