From 19db820f7360b445d9853be2fc4191c9f7577b73 Mon Sep 17 00:00:00 2001 From: Michael Schubert Date: Fri, 10 Jan 2025 10:25:43 +0000 Subject: [PATCH] split out adding env obj to multipart msg --- src/CMQMaster.h | 25 ++++++++++++------------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/src/CMQMaster.h b/src/CMQMaster.h index e858f3f..210812a 100644 --- a/src/CMQMaster.h +++ b/src/CMQMaster.h @@ -120,25 +120,17 @@ class CMQMaster { mp.push_back(r2msg(cmd)); if (w.via.empty()) { - for (auto &str : new_env) { - w.env.insert(str); - mp.push_back(zmq::message_t(str)); - mp.push_back(zmq::message_t(env[str].data(), env[str].size(), [](void*, void*){})); - } + for (auto &str : new_env) + multipart_add_obj(mp, str, w.env); } else { std::vector proxy_add_env; auto &via_env = peers[w.via].env; for (auto &str : new_env) { w.env.insert(str); - if (via_env.find(str) == via_env.end()) { -// std::cout << "+from_master " << str << "\n"; - via_env.insert(str); - mp.push_back(zmq::message_t(str)); - mp.push_back(zmq::message_t(env[str].data(), env[str].size(), [](void*, void*){})); - } else { -// std::cout << "+from_proxy " << str << "\n"; + if (via_env.find(str) == via_env.end()) + multipart_add_obj(mp, str, via_env); + else proxy_add_env.push_back(str); - } } mp.push_back(r2msg(Rcpp::wrap(proxy_add_env))); } @@ -287,6 +279,13 @@ class CMQMaster { return mp; } + void multipart_add_obj(zmq::multipart_t &mp, std::string str, std::set &tracker) { + auto &obj = env[str]; + tracker.insert(str); + mp.push_back(zmq::message_t(str)); + mp.push_back(zmq::message_t(obj.data(), obj.size(), [](void*, void*){})); + } + int poll(int timeout=-1) { auto pitems = std::vector(1); pitems[0].socket = sock;