From 35f6d263a5aace30cf265e17d2b3139bd96e40ea Mon Sep 17 00:00:00 2001 From: Robert Underwood Date: Thu, 6 Aug 2020 10:44:51 -0400 Subject: [PATCH] libdistributed version 0.0.10 Major Changes: + Added support for serializing allocated pointers + Use std::move where appropriate to avoid copying large types + Exposed more functionality from the underlying library to better encapsulate the underlying parallel framework Minor Changes + Improved documentation completeness Bug Fixes: + Fixed invalid displacements in std::pair's serializer + Include missing includes for some platforms --- CMakeLists.txt | 2 +- include/libdistributed_comm.h | 64 +++++++++++++++++++- include/libdistributed_task_manager.h | 6 ++ include/libdistributed_work_queue.h | 14 ++++- include/libdistributed_work_queue_impl.h | 66 ++++++++++++++++----- include/libdistributed_work_queue_options.h | 21 ++++++- 6 files changed, 152 insertions(+), 21 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 09e3d83..6862b09 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,5 +1,5 @@ cmake_minimum_required(VERSION 3.12) -project(libdistributed VERSION "0.0.8" LANGUAGES CXX) +project(libdistributed VERSION "0.0.10" LANGUAGES CXX) #correct was to set a default build type # https://blog.kitware.com/cmake-and-the-default-build-type/ diff --git a/include/libdistributed_comm.h b/include/libdistributed_comm.h index 252ca29..6dfd98b 100644 --- a/include/libdistributed_comm.h +++ b/include/libdistributed_comm.h @@ -1,3 +1,8 @@ +#ifndef LIBDISTRIBUTED_COMM_H +#define LIBDISTRIBUTED_COMM_H + + + #include #include #include @@ -131,6 +136,60 @@ define_basic_type(float, MPI_FLOAT); define_basic_type(double, MPI_DOUBLE); define_basic_type(char, MPI_CHAR); +/** + * serializer for arrays of initialized pointers + */ +template +struct serializer +{ + /** is the type serializable using MPI_Datatypes for both the sender and + * receiver at compile time?*/ + using mpi_type = typename serializer::mpi_type; + /** \returns a MPI_Datatype to represent the type if mpi_type is true, else MPI_INT */ + static MPI_Datatype dtype() { + return serializer::dtype(); + } + /** \returns a string representing the name of the type */ + static std::string name() { + return serializer::name() + "*"; + } + + /** + * Sends a data type from one location to another + * \param[in] t the data to send + * \param[in] dest the MPI rank to send to + * \param[in] tag the MPI tag to send to + * \param[in] comm the MPI_Comm to send to + * \returns an error code from the underlying send */ + static int send(T* const& t, int dest, int tag, MPI_Comm comm) { + return serializer::send(*t, dest, tag, comm); + } + /** + Recv a data type from another location + \param[in] t the data to recv from + \param[in] source the MPI rank to recv from + \param[in] tag the MPI tag to recv from + \param[in] comm the MPI_Comm to recv from + \param[in] status the MPI_Status to recv from + \returns an error code from the underlying recv */ + static int recv(T*& t, int source, int tag, MPI_Comm comm, + MPI_Status* status) { + return serializer::recv(*t, source, tag, comm, status); + } + + /** + Broadcast a data type from another location + \param[in] t the data to broadcast from + \param[in] root the MPI rank to broadcast from + \param[in] comm the MPI_Comm to broadcast from + \returns an error code from the underlying MPI_Bcast(s) */ + static int bcast(T*& t, int root, MPI_Comm comm) { + return serializer::bcast(*t, root, comm); + } + + +}; + /** specialization of serializion for pair */ template struct serializer> @@ -151,8 +210,8 @@ struct serializer> std::pair exemplar; int blocklen[] = { 1, 1 }; MPI_Aint displacements[2]; - MPI_Get_address(exemplar.first, &displacements[0]); - MPI_Get_address(exemplar.second, &displacements[1]); + MPI_Get_address(&exemplar.first, &displacements[0]); + MPI_Get_address(&exemplar.second, &displacements[1]); MPI_Aint min_address = std::min(displacements[0], displacements[1]); displacements[0] -= min_address; displacements[1] -= min_address; @@ -964,3 +1023,4 @@ bcast(T& values, int root, MPI_Comm comm = MPI_COMM_WORLD) } // namespace comm } // namespace distributed +#endif /* end of include guard: LIBDISTRIBUTED_COMM_H */ diff --git a/include/libdistributed_task_manager.h b/include/libdistributed_task_manager.h index 4b9efb2..6db494d 100644 --- a/include/libdistributed_task_manager.h +++ b/include/libdistributed_task_manager.h @@ -1,5 +1,6 @@ #ifndef LIBDISTRIBUTED_STOP_TOKEN_H #define LIBDISTRIBUTED_STOP_TOKEN_H +#include /** * \file @@ -47,6 +48,11 @@ class TaskManager: public StopToken { * Request a sub-communicator for the current process group */ virtual CommunicatorType get_subcommunicator()=0; + + /** + * \returns the number of groups of worker processes available + */ + virtual size_t num_workers() const=0; }; } diff --git a/include/libdistributed_work_queue.h b/include/libdistributed_work_queue.h index c13b062..b8ae652 100644 --- a/include/libdistributed_work_queue.h +++ b/include/libdistributed_work_queue.h @@ -15,6 +15,18 @@ namespace distributed { namespace queue { + /** + * type trait to determine task type from an iterator type + * + */ + template + struct iterator_to_request_type { + /** + * the contained within the iterator + */ + using type = typename impl::iterator_to_value_type::type; + }; + /** * \param[in] comm the communicator to duplicate to use for communication * \param[in] tasks_begin an iterator to the beginning of the task list @@ -57,7 +69,7 @@ void work_queue ( using RequestType = typename impl::iterator_to_value_type::type; using ResponseType = decltype( impl::maybe_stop_token( worker_fn, - std::declval(), + std::move(std::declval()), std::declval&>() ) ); diff --git a/include/libdistributed_work_queue_impl.h b/include/libdistributed_work_queue_impl.h index 44a8856..99f2020 100644 --- a/include/libdistributed_work_queue_impl.h +++ b/include/libdistributed_work_queue_impl.h @@ -3,6 +3,7 @@ #include #include #include +#include #include #include "libdistributed_task_manager.h" @@ -12,6 +13,17 @@ namespace distributed { namespace queue { namespace impl { + +template +size_t count_unique(Container const& c) { + std::unordered_set seen; + return std::count_if( + std::begin(c), + std::end(c), + [&seen](typename Container::const_reference v) { + return seen.insert(v).second; + }); +} template struct iterator_to_value_type { @@ -49,6 +61,7 @@ class WorkerTaskManager : public TaskManager , stop_request() , flag(0) , ROOT(options.get_root()) + , num_workers_v(count_unique(options.get_groups()) - 1) { MPI_Ibcast(&done, 1, MPI_INT, ROOT, queue_comm, &stop_request); } @@ -86,12 +99,17 @@ class WorkerTaskManager : public TaskManager return subcomm; } + size_t num_workers() const override { + return num_workers_v; + } + private: MPI_Comm queue_comm, subcomm; MPI_Request stop_request; int flag; int done;//used by MPI_Ibcast for syncronization, do not read unless flag==true const int ROOT; + size_t num_workers_v; }; template @@ -99,12 +117,13 @@ class MasterTaskManager : public TaskManager { public: template - MasterTaskManager(MPI_Comm comm, MPI_Comm subcomm, TaskIt begin, TaskIt end, work_queue_options const& options) + MasterTaskManager(MPI_Comm comm, MPI_Comm subcomm, TaskIt begin, TaskIt end, work_queue_options const& options, size_t num_workers_v) : TaskManager(), comm(comm), subcomm(subcomm), is_stop_requested(0), - ROOT(options.get_root()) + ROOT(options.get_root()), + num_workers_v(count_unique(options.get_groups())-1) { while(begin != end) { requests.emplace(*begin); @@ -150,6 +169,10 @@ class MasterTaskManager : public TaskManager return subcomm; } + size_t num_workers() const override{ + return num_workers_v; + } + void recv_tasks() { int num_has_tasks = 0; int stop_requested_flag = 0; @@ -173,6 +196,7 @@ class MasterTaskManager : public TaskManager int is_stop_requested; std::queue requests; const int ROOT; + size_t num_workers_v; }; template @@ -194,7 +218,7 @@ void master_main(MPI_Comm subcom, TaskForwardIt tasks_begin, TaskForwardIt tasks //create task queue - MasterTaskManager task_manager(comm, subcom, tasks_begin, tasks_end, options); + MasterTaskManager task_manager(comm, subcom, tasks_begin, tasks_end, options, workers.size()); int outstanding = 0; while((!task_manager.empty() and !task_manager.stop_requested()) or outstanding > 0) { @@ -224,7 +248,7 @@ void master_main(MPI_Comm subcom, TaskForwardIt tasks_begin, TaskForwardIt tasks int not_done = false; comm::bcast(not_done, 0, subcom); comm::bcast(response, 0, subcom); - maybe_stop_token(master_fn, response, task_manager); + maybe_stop_token(master_fn, std::move(response), task_manager); task_manager.recv_tasks(); workers.push(response_status.MPI_SOURCE); outstanding--; @@ -267,7 +291,8 @@ class MasterAuxTaskManager : public TaskManager stop_request(), flag(0), request_done(0), - ROOT(options.get_root()) + ROOT(options.get_root()), + num_workers_v(count_unique(options.get_groups()) -1) { MPI_Ibcast(&done, 1, MPI_INT, ROOT, queue_comm, &stop_request); } @@ -301,25 +326,31 @@ class MasterAuxTaskManager : public TaskManager MPI_Comm get_subcommunicator() override { return subcomm; } + + size_t num_workers() const override { + return num_workers_v; + } + private: MPI_Comm queue_comm, subcomm; MPI_Request stop_request; std::vector requests; int flag, done, request_done; const int ROOT; + size_t num_workers_v; }; template void master_aux(MPI_Comm subcomm, Function master_fn, work_queue_options const& options) { MasterAuxTaskManager task_manager(subcomm, options); - ResponseType response; int master_done = false; while(!master_done) { + ResponseType response; comm::bcast(master_done, 0, subcomm); if(!master_done) { comm::bcast(response, 0, subcomm); - maybe_stop_token(master_fn, response, task_manager); + maybe_stop_token(master_fn, std::move(response), task_manager); task_manager.send_tasks(); } } @@ -334,7 +365,7 @@ template struct takes_stop_token< Function, Message, RequestType, std::void_t()( - std::declval(), std::declval&>()))>> : std::true_type + std::move(std::declval()), std::declval&>()))>> : std::true_type {}; template @@ -354,9 +385,9 @@ struct maybe_stop_token_impl -auto maybe_stop_token(Function f, Message m, TaskManager& s) +auto maybe_stop_token(Function f, Message&& m, TaskManager& s) { - return maybe_stop_token_impl::call(f, m, s); + return maybe_stop_token_impl::call(f, std::forward(m), s); } template @@ -380,7 +411,7 @@ void worker_main(MPI_Comm subcomm, Function worker_fn, work_queue_options void worker_aux(MPI_Comm subcomm, Function worker_fn, work_queue_options const& options) { - RequestType request; - ResponseType response; WorkerTaskManager task_manager(subcomm, options); int worker_done = false; while(!worker_done) { comm::bcast(worker_done, 0, subcomm); if(!worker_done) { + RequestType request; comm::bcast(request, 0, subcomm); - maybe_stop_token(worker_fn, request, task_manager); + maybe_stop_token(worker_fn, std::move(request), task_manager); } } @@ -455,6 +485,10 @@ class NoWorkersTaskManager: public TaskManager { return MPI_COMM_SELF; } + size_t num_workers() const override { + return 1; + } + private: bool is_stop_requested = false; std::queue requests{}; @@ -465,10 +499,10 @@ void no_workers(TaskForwardIt tasks_begin, TaskForwardIt tasks_end, MasterFn mas NoWorkersTaskManager task_manager(tasks_begin, tasks_end); while(!task_manager.empty() && !task_manager.stop_requested()) { - RequestType task = task_manager.front(); + RequestType task = std::move(task_manager.front()); task_manager.pop(); - auto response = maybe_stop_token(worker_fn, task, task_manager); + auto response = maybe_stop_token(worker_fn, std::move(task), task_manager); maybe_stop_token(master_fn, response, task_manager); } } diff --git a/include/libdistributed_work_queue_options.h b/include/libdistributed_work_queue_options.h index 8d9707d..bbd0f6e 100644 --- a/include/libdistributed_work_queue_options.h +++ b/include/libdistributed_work_queue_options.h @@ -14,6 +14,10 @@ namespace distributed { namespace queue { + +/** + * Options for the work_queue + */ template class work_queue_options { public: @@ -28,13 +32,16 @@ class work_queue_options { /** * Construct a work queue options for a given communicator * - * \param[in] queue_comm the communicator to use for the queue + * \param[in] comm the communicator to use for the queue */ work_queue_options(MPI_Comm comm) { MPI_Comm_dup(comm, &queue_comm); } + /** + * deallocate used the communicator used for the queue on destruction + */ ~work_queue_options() { if(queue_comm != MPI_COMM_NULL) { MPI_Comm_free(&queue_comm); @@ -43,11 +50,20 @@ class work_queue_options { work_queue_options(work_queue_options&)=delete; work_queue_options& operator=(work_queue_options&)=delete; + + /** + * allow move construction + * \param rhs the object to move from + */ work_queue_options(work_queue_options&& rhs) noexcept: queue_comm(rhs.queue_comm) { rhs.queue_comm = MPI_COMM_NULL; } + /** + * allow move assignment + * \param rhs the object to move from + */ work_queue_options& operator=(work_queue_options&& rhs) noexcept { queue_comm = rhs.queue_comm; rhs.queue_comm = MPI_COMM_NULL; @@ -71,6 +87,9 @@ class work_queue_options { return size; } + /** + * \returns if the rank is a master rank + */ bool is_master() const { const auto groups = get_groups(); const auto rank = get_queue_rank();