Skip to content

Commit

Permalink
split out adding env obj to multipart msg
Browse files Browse the repository at this point in the history
  • Loading branch information
mschubert committed Jan 10, 2025
1 parent 355906d commit 19db820
Showing 1 changed file with 12 additions and 13 deletions.
25 changes: 12 additions & 13 deletions src/CMQMaster.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,25 +120,17 @@ class CMQMaster {
mp.push_back(r2msg(cmd));

if (w.via.empty()) {
for (auto &str : new_env) {
w.env.insert(str);
mp.push_back(zmq::message_t(str));
mp.push_back(zmq::message_t(env[str].data(), env[str].size(), [](void*, void*){}));
}
for (auto &str : new_env)
multipart_add_obj(mp, str, w.env);
} else {
std::vector<std::string> proxy_add_env;
auto &via_env = peers[w.via].env;
for (auto &str : new_env) {
w.env.insert(str);
if (via_env.find(str) == via_env.end()) {
// std::cout << "+from_master " << str << "\n";
via_env.insert(str);
mp.push_back(zmq::message_t(str));
mp.push_back(zmq::message_t(env[str].data(), env[str].size(), [](void*, void*){}));
} else {
// std::cout << "+from_proxy " << str << "\n";
if (via_env.find(str) == via_env.end())
multipart_add_obj(mp, str, via_env);
else
proxy_add_env.push_back(str);
}
}
mp.push_back(r2msg(Rcpp::wrap(proxy_add_env)));
}
Expand Down Expand Up @@ -287,6 +279,13 @@ class CMQMaster {
return mp;
}

void multipart_add_obj(zmq::multipart_t &mp, std::string str, std::set<std::string> &tracker) {
auto &obj = env[str];
tracker.insert(str);
mp.push_back(zmq::message_t(str));
mp.push_back(zmq::message_t(obj.data(), obj.size(), [](void*, void*){}));
}

int poll(int timeout=-1) {
auto pitems = std::vector<zmq::pollitem_t>(1);
pitems[0].socket = sock;
Expand Down

0 comments on commit 19db820

Please sign in to comment.