Skip to content

Commit 700015d

Browse files
authored
[18/N][VirtualCluster] Handle job finished event in gcs virtual cluster manager (#436)
* [18/N][VirtualCluster] Handle job finished event in gcs virtual cluster manager
  Signed-off-by: sule <[email protected]>
* Better log style
  Signed-off-by: sule <[email protected]>
---------
Signed-off-by: sule <[email protected]>
1 parent 18f085f commit 700015d

File tree

9 files changed

+77
-8
lines changed

9 files changed

+77
-8
lines changed

src/ray/core_worker/core_worker.cc

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -924,8 +924,13 @@ void CoreWorker::ConnectToRayletInternal() {
924924
// NOTE: This also marks the worker as available in Raylet. We do this at the
925925
// very end in case there is a problem during construction.
926926
if (options_.worker_type == WorkerType::DRIVER) {
927+
// Get virtual cluster id from worker env to put it into job table data
928+
std::string virtual_cluster_id = std::getenv(kEnvVarKeyVirtualClusterID)
929+
? std::getenv(kEnvVarKeyVirtualClusterID)
930+
: "";
931+
927932
Status status = local_raylet_client_->AnnounceWorkerPortForDriver(
928-
core_worker_server_->GetPort(), options_.entrypoint);
933+
core_worker_server_->GetPort(), options_.entrypoint, virtual_cluster_id);
929934
RAY_CHECK(status.ok()) << "Failed to announce driver's port to raylet and GCS: "
930935
<< status;
931936
} else {

src/ray/gcs/gcs_server/gcs_server.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -774,6 +774,7 @@ void GcsServer::InstallEventListeners() {
774774
const auto job_id = JobID::FromBinary(job_data.job_id());
775775
gcs_task_manager_->OnJobFinished(job_id, job_data.end_time());
776776
gcs_placement_group_manager_->CleanPlacementGroupIfNeededWhenJobDead(job_id);
777+
gcs_virtual_cluster_manager_->OnJobFinished(job_data);
777778
});
778779

779780
// Install scheduling event listeners.

src/ray/gcs/gcs_server/gcs_virtual_cluster_manager.cc

Lines changed: 51 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,56 @@ void GcsVirtualClusterManager::OnNodeDead(const rpc::GcsNodeInfo &node) {
3131
primary_cluster_->OnNodeDead(node);
3232
}
3333

34+
/// Handle a job finished event by removing the job cluster recorded on the job.
///
/// Nothing is done when the job carries no virtual cluster id, or when that id is
/// not a job cluster id. Every failure on the way is logged as a warning; nothing
/// is returned or propagated to the caller.
///
/// \param job_data The job table entry of the job that is finished.
void GcsVirtualClusterManager::OnJobFinished(const rpc::JobTableData &job_data) {
  // Exit early when the job has no virtual cluster id.
  const auto &virtual_cluster_id = job_data.virtual_cluster_id();
  if (virtual_cluster_id.empty()) {
    return;
  }

  auto job_cluster_id = VirtualClusterID::FromBinary(virtual_cluster_id);
  if (!job_cluster_id.IsJobClusterID()) {
    // Exit early when this job is submitted in a mixed cluster.
    return;
  }

  // The job cluster is removed through its parent cluster, so look the parent up.
  const std::string exclusive_cluster_id = job_cluster_id.ParentID().Binary();
  auto virtual_cluster = GetVirtualCluster(exclusive_cluster_id);
  if (virtual_cluster == nullptr) {
    RAY_LOG(WARNING) << "Failed to remove job cluster " << job_cluster_id.Binary()
                     << " when handling job finished event, parent cluster not exists.";
    return;
  }

  if (virtual_cluster->GetMode() != rpc::AllocationMode::EXCLUSIVE) {
    // This should not happen: the parent of a job cluster is expected to be
    // exclusive. Log it instead of returning silently so that a job cluster which
    // is left behind can be traced.
    RAY_LOG(WARNING) << "Failed to remove job cluster " << job_cluster_id.Binary()
                     << " when handling job finished event, parent cluster "
                     << exclusive_cluster_id << " is not exclusive.";
    return;
  }

  // dynamic_cast yields nullptr when the object is not an ExclusiveCluster; guard
  // the dereference below rather than trusting the mode check alone.
  auto *exclusive_cluster = dynamic_cast<ExclusiveCluster *>(virtual_cluster.get());
  if (exclusive_cluster == nullptr) {
    RAY_LOG(WARNING) << "Failed to remove job cluster " << job_cluster_id.Binary()
                     << " when handling job finished event, parent cluster "
                     << exclusive_cluster_id << " is not an exclusive cluster.";
    return;
  }

  // The callback only logs, so it captures the id by value and nothing else.
  const auto status = exclusive_cluster->RemoveJobCluster(
      virtual_cluster_id,
      [job_cluster_id](const Status &remove_status,
                       std::shared_ptr<rpc::VirtualClusterTableData> data) {
        // `data` is checked before use so a missing payload is reported as a
        // failure instead of being dereferenced.
        if (!remove_status.ok() || data == nullptr || !data->is_removed()) {
          RAY_LOG(WARNING) << "Failed to remove job cluster " << job_cluster_id.Binary()
                           << " when handling job finished event. status: "
                           << remove_status.message();
        } else {
          RAY_LOG(INFO) << "Successfully removed job cluster " << job_cluster_id.Binary()
                        << " after handling job finished event.";
        }
      });
  if (!status.ok()) {
    RAY_LOG(WARNING) << "Failed to remove job cluster " << job_cluster_id.Binary()
                     << " when handling job finished event. status: " << status.message();
  }
}
83+
3484
std::shared_ptr<VirtualCluster> GcsVirtualClusterManager::GetVirtualCluster(
3585
const std::string &virtual_cluster_id) {
3686
if (virtual_cluster_id.empty()) {
@@ -150,8 +200,7 @@ void GcsVirtualClusterManager::HandleCreateJobCluster(
150200
ReplicaSets replica_sets(request.replica_sets().begin(), request.replica_sets().end());
151201

152202
auto exclusive_cluster = dynamic_cast<ExclusiveCluster *>(virtual_cluster.get());
153-
const std::string &job_cluster_id =
154-
exclusive_cluster->BuildJobClusterID(request.job_id());
203+
std::string job_cluster_id = exclusive_cluster->BuildJobClusterID(request.job_id());
155204

156205
exclusive_cluster->CreateJobCluster(
157206
job_cluster_id,

src/ray/gcs/gcs_server/gcs_virtual_cluster_manager.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,11 @@ class GcsVirtualClusterManager : public rpc::VirtualClusterInfoHandler {
5151
/// \param node The node that is dead.
5252
void OnNodeDead(const rpc::GcsNodeInfo &node);
5353

54+
/// Handle the job finished event.
55+
///
56+
/// \param job_data The job that is finished.
57+
void OnJobFinished(const rpc::JobTableData &job_data);
58+
5459
/// Get virtual cluster by virtual cluster id
5560
///
5661
/// \param virtual_cluster_id The id of virtual cluster

src/ray/protobuf/gcs.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -706,6 +706,8 @@ message JobTableData {
706706
optional bool is_running_tasks = 11;
707707
// Address of the driver that started this job.
708708
Address driver_address = 12;
709+
// The virtual cluster this job belongs to.
710+
string virtual_cluster_id = 13;
709711
}
710712
///////////////////////////////////////////////////////////////////////////////
711713

src/ray/raylet/format/node_manager.fbs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,8 @@ table AnnounceWorkerPort {
149149
port: int;
150150
// The entrypoint of the job. Only populated if the worker is a driver.
151151
entrypoint: string;
152+
// The virtual cluster this job belongs to.
153+
virtual_cluster_id: string;
152154
}
153155

154156
table AnnounceWorkerPortReply {

src/ray/raylet/node_manager.cc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1451,6 +1451,9 @@ void NodeManager::ProcessAnnounceWorkerPortMessage(
14511451
string_from_flatbuf(*message->entrypoint()),
14521452
*job_config);
14531453

1454+
job_data_ptr->set_virtual_cluster_id(
1455+
string_from_flatbuf(*message->virtual_cluster_id()));
1456+
14541457
RAY_CHECK_OK(
14551458
gcs_client_->Jobs().AsyncAdd(job_data_ptr, [this, client](Status status) {
14561459
if (!status.ok()) {

src/ray/raylet_client/raylet_client.cc

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -205,11 +205,11 @@ Status raylet::RayletClient::AnnounceWorkerPortForWorker(int port) {
205205
return conn_->WriteMessage(MessageType::AnnounceWorkerPort, &fbb);
206206
}
207207

208-
Status raylet::RayletClient::AnnounceWorkerPortForDriver(int port,
209-
const std::string &entrypoint) {
208+
Status raylet::RayletClient::AnnounceWorkerPortForDriver(
209+
int port, const std::string &entrypoint, const std::string &virtual_cluster_id) {
210210
flatbuffers::FlatBufferBuilder fbb;
211-
auto message =
212-
protocol::CreateAnnounceWorkerPort(fbb, port, fbb.CreateString(entrypoint));
211+
auto message = protocol::CreateAnnounceWorkerPort(
212+
fbb, port, fbb.CreateString(entrypoint), fbb.CreateString(virtual_cluster_id));
213213
fbb.Finish(message);
214214
std::vector<uint8_t> reply;
215215
RAY_RETURN_NOT_OK(conn_->AtomicRequestReply(MessageType::AnnounceWorkerPort,

src/ray/raylet_client/raylet_client.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -358,7 +358,9 @@ class RayletClient : public RayletClientInterface {
358358
/// \param port The port.
359359
/// \param entrypoint The entrypoint of the driver's job.
360360
/// \return ray::Status.
361-
Status AnnounceWorkerPortForDriver(int port, const std::string &entrypoint);
361+
Status AnnounceWorkerPortForDriver(int port,
362+
const std::string &entrypoint,
363+
const std::string &virtual_cluster_id);
362364

363365
/// Tell the raylet that the client has finished executing a task.
364366
///

0 commit comments

Comments
 (0)