Skip to content

Commit

Permalink
libdistributed version 0.0.10
Browse files Browse the repository at this point in the history
Major Changes:

+ Added support for serializing allocated pointers
+ Use std::move where appropriate to avoid copying large types
+ Exposed more functionality from the underlying library to better
  encapsulate the underlying parallel framework

Minor Changes:

+ Improved documentation completeness

Bug Fixes:

+  Fixed invalid displacements in std::pair's serializer
+  Include missing includes for some platforms
  • Loading branch information
robertu94 committed Aug 6, 2020
1 parent 6803221 commit 35f6d26
Show file tree
Hide file tree
Showing 6 changed files with 152 additions and 21 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
cmake_minimum_required(VERSION 3.12)
project(libdistributed VERSION "0.0.8" LANGUAGES CXX)
project(libdistributed VERSION "0.0.10" LANGUAGES CXX)

#correct way to set a default build type
# https://blog.kitware.com/cmake-and-the-default-build-type/
Expand Down
64 changes: 62 additions & 2 deletions include/libdistributed_comm.h
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
#ifndef LIBDISTRIBUTED_COMM_H
#define LIBDISTRIBUTED_COMM_H



#include <algorithm>
#include <array>
#include <cstddef>
Expand Down Expand Up @@ -131,6 +136,60 @@ define_basic_type(float, MPI_FLOAT);
define_basic_type(double, MPI_DOUBLE);
define_basic_type(char, MPI_CHAR);

/**
* serializer for arrays of initialized pointers
*/
template <class T>
struct serializer<T*>
{
/** is the type serializable using MPI_Datatypes for both the sender and
* receiver at compile time?*/
using mpi_type = typename serializer<T>::mpi_type;
/** \returns a MPI_Datatype to represent the type if mpi_type is true, else MPI_INT */
static MPI_Datatype dtype() {
return serializer<T>::dtype();
}
/** \returns a string representing the name of the type */
static std::string name() {
return serializer<T>::name() + "*";
}

/**
* Sends a data type from one location to another
* \param[in] t the data to send
* \param[in] dest the MPI rank to send to
* \param[in] tag the MPI tag to send to
* \param[in] comm the MPI_Comm to send to
* \returns an error code from the underlying send */
static int send(T* const& t, int dest, int tag, MPI_Comm comm) {
return serializer<T>::send(*t, dest, tag, comm);
}
/**
Recv a data type from another location
\param[in] t the data to recv from
\param[in] source the MPI rank to recv from
\param[in] tag the MPI tag to recv from
\param[in] comm the MPI_Comm to recv from
\param[in] status the MPI_Status to recv from
\returns an error code from the underlying recv */
static int recv(T*& t, int source, int tag, MPI_Comm comm,
MPI_Status* status) {
return serializer<T>::recv(*t, source, tag, comm, status);
}

/**
Broadcast a data type from another location
\param[in] t the data to broadcast from
\param[in] root the MPI rank to broadcast from
\param[in] comm the MPI_Comm to broadcast from
\returns an error code from the underlying MPI_Bcast(s) */
static int bcast(T*& t, int root, MPI_Comm comm) {
return serializer<T>::bcast(*t, root, comm);
}


};

/** specialization of serializion for pair */
template <class T, class V>
struct serializer<std::pair<T, V>>
Expand All @@ -151,8 +210,8 @@ struct serializer<std::pair<T, V>>
std::pair<T, V> exemplar;
int blocklen[] = { 1, 1 };
MPI_Aint displacements[2];
MPI_Get_address(exemplar.first, &displacements[0]);
MPI_Get_address(exemplar.second, &displacements[1]);
MPI_Get_address(&exemplar.first, &displacements[0]);
MPI_Get_address(&exemplar.second, &displacements[1]);
MPI_Aint min_address = std::min(displacements[0], displacements[1]);
displacements[0] -= min_address;
displacements[1] -= min_address;
Expand Down Expand Up @@ -964,3 +1023,4 @@ bcast(T& values, int root, MPI_Comm comm = MPI_COMM_WORLD)

} // namespace comm
} // namespace distributed
#endif /* end of include guard: LIBDISTRIBUTED_COMM_H */
6 changes: 6 additions & 0 deletions include/libdistributed_task_manager.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#ifndef LIBDISTRIBUTED_STOP_TOKEN_H
#define LIBDISTRIBUTED_STOP_TOKEN_H
#include <cstddef>

/**
* \file
Expand Down Expand Up @@ -47,6 +48,11 @@ class TaskManager: public StopToken {
* Request a sub-communicator for the current process group
*/
virtual CommunicatorType get_subcommunicator()=0;

/**
* \returns the number of groups of worker processes available
*/
virtual size_t num_workers() const=0;
};

}
Expand Down
14 changes: 13 additions & 1 deletion include/libdistributed_work_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,18 @@
namespace distributed {
namespace queue {

/**
* type trait that maps an iterator type to the task (request) type it
* yields, by delegating to impl::iterator_to_value_type
*/
template <class Type>
struct iterator_to_request_type {
/**
* the value type contained within the iterator
*/
using type = typename impl::iterator_to_value_type<Type>::type;
};

/**
* \param[in] comm the communicator to duplicate to use for communication
* \param[in] tasks_begin an iterator to the beginning of the task list
Expand Down Expand Up @@ -57,7 +69,7 @@ void work_queue (

using RequestType = typename impl::iterator_to_value_type<TaskRandomIt>::type;
using ResponseType = decltype( impl::maybe_stop_token( worker_fn,
std::declval<RequestType>(),
std::move(std::declval<RequestType>()),
std::declval<TaskManager<RequestType, MPI_Comm>&>()
)
);
Expand Down
66 changes: 50 additions & 16 deletions include/libdistributed_work_queue_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <set>
#include <type_traits>
#include <algorithm>
#include <unordered_set>
#include <mpi.h>

#include "libdistributed_task_manager.h"
Expand All @@ -12,6 +13,17 @@
namespace distributed {
namespace queue {
namespace impl {

/**
* \param[in] c a container of hashable values
* \returns the number of distinct elements in c
*/
template <class Container>
size_t count_unique(Container const& c) {
  // collect the distinct elements; the set's size is the unique count
  std::unordered_set<typename Container::value_type> distinct(
      std::begin(c), std::end(c));
  return distinct.size();
}

template <class T>
struct iterator_to_value_type {
Expand Down Expand Up @@ -49,6 +61,7 @@ class WorkerTaskManager : public TaskManager<RequestType, MPI_Comm>
, stop_request()
, flag(0)
, ROOT(options.get_root())
, num_workers_v(count_unique(options.get_groups()) - 1)
{
MPI_Ibcast(&done, 1, MPI_INT, ROOT, queue_comm, &stop_request);
}
Expand Down Expand Up @@ -86,25 +99,31 @@ class WorkerTaskManager : public TaskManager<RequestType, MPI_Comm>
return subcomm;
}

size_t num_workers() const override {
return num_workers_v;
}

private:
MPI_Comm queue_comm, subcomm;
MPI_Request stop_request;
int flag;
int done;//used by MPI_Ibcast for syncronization, do not read unless flag==true
const int ROOT;
size_t num_workers_v;
};

template <class RequestType>
class MasterTaskManager : public TaskManager<RequestType, MPI_Comm>
{
public:
template <class TaskIt>
MasterTaskManager(MPI_Comm comm, MPI_Comm subcomm, TaskIt begin, TaskIt end, work_queue_options<RequestType> const& options)
MasterTaskManager(MPI_Comm comm, MPI_Comm subcomm, TaskIt begin, TaskIt end, work_queue_options<RequestType> const& options, size_t num_workers_v)
: TaskManager<RequestType, MPI_Comm>(),
comm(comm),
subcomm(subcomm),
is_stop_requested(0),
ROOT(options.get_root())
ROOT(options.get_root()),
num_workers_v(count_unique(options.get_groups())-1)
{
while(begin != end) {
requests.emplace(*begin);
Expand Down Expand Up @@ -150,6 +169,10 @@ class MasterTaskManager : public TaskManager<RequestType, MPI_Comm>
return subcomm;
}

size_t num_workers() const override{
return num_workers_v;
}

void recv_tasks() {
int num_has_tasks = 0;
int stop_requested_flag = 0;
Expand All @@ -173,6 +196,7 @@ class MasterTaskManager : public TaskManager<RequestType, MPI_Comm>
int is_stop_requested;
std::queue<RequestType> requests;
const int ROOT;
size_t num_workers_v;
};

template <class RequestType, class ResponseType, class TaskForwardIt, class Function>
Expand All @@ -194,7 +218,7 @@ void master_main(MPI_Comm subcom, TaskForwardIt tasks_begin, TaskForwardIt tasks

//create task queue

MasterTaskManager<RequestType> task_manager(comm, subcom, tasks_begin, tasks_end, options);
MasterTaskManager<RequestType> task_manager(comm, subcom, tasks_begin, tasks_end, options, workers.size());

int outstanding = 0;
while((!task_manager.empty() and !task_manager.stop_requested()) or outstanding > 0) {
Expand Down Expand Up @@ -224,7 +248,7 @@ void master_main(MPI_Comm subcom, TaskForwardIt tasks_begin, TaskForwardIt tasks
int not_done = false;
comm::bcast(not_done, 0, subcom);
comm::bcast(response, 0, subcom);
maybe_stop_token(master_fn, response, task_manager);
maybe_stop_token(master_fn, std::move(response), task_manager);
task_manager.recv_tasks();
workers.push(response_status.MPI_SOURCE);
outstanding--;
Expand Down Expand Up @@ -267,7 +291,8 @@ class MasterAuxTaskManager : public TaskManager<RequestType, MPI_Comm>
stop_request(),
flag(0),
request_done(0),
ROOT(options.get_root())
ROOT(options.get_root()),
num_workers_v(count_unique(options.get_groups()) -1)
{
MPI_Ibcast(&done, 1, MPI_INT, ROOT, queue_comm, &stop_request);
}
Expand Down Expand Up @@ -301,25 +326,31 @@ class MasterAuxTaskManager : public TaskManager<RequestType, MPI_Comm>
MPI_Comm get_subcommunicator() override {
return subcomm;
}

size_t num_workers() const override {
return num_workers_v;
}

private:
MPI_Comm queue_comm, subcomm;
MPI_Request stop_request;
std::vector<RequestType> requests;
int flag, done, request_done;
const int ROOT;
size_t num_workers_v;
};

template <class RequestType, class ResponseType, class Function>
void master_aux(MPI_Comm subcomm, Function master_fn, work_queue_options<RequestType> const& options) {
MasterAuxTaskManager<RequestType, ResponseType> task_manager(subcomm, options);
ResponseType response;

int master_done = false;
while(!master_done) {
ResponseType response;
comm::bcast(master_done, 0, subcomm);
if(!master_done) {
comm::bcast(response, 0, subcomm);
maybe_stop_token(master_fn, response, task_manager);
maybe_stop_token(master_fn, std::move(response), task_manager);
task_manager.send_tasks();
}
}
Expand All @@ -334,7 +365,7 @@ template <typename Function, class Message, class RequestType>
struct takes_stop_token<
Function, Message, RequestType,
std::void_t<decltype(std::declval<Function>()(
std::declval<Message>(), std::declval<TaskManager<RequestType, MPI_Comm>&>()))>> : std::true_type
std::move(std::declval<Message>()), std::declval<TaskManager<RequestType, MPI_Comm>&>()))>> : std::true_type
{};

template <class Function, class Message, class RequestType, class Enable = void>
Expand All @@ -354,9 +385,9 @@ struct maybe_stop_token_impl<Function, Message, RequestType,
};

template <class Function, class Message, class RequestType>
auto maybe_stop_token(Function f, Message m, TaskManager<RequestType, MPI_Comm>& s)
auto maybe_stop_token(Function f, Message&& m, TaskManager<RequestType, MPI_Comm>& s)
{
return maybe_stop_token_impl<Function, Message, RequestType>::call(f, m, s);
return maybe_stop_token_impl<Function, Message, RequestType>::call(f, std::forward<Message>(m), s);
}

template <class RequestType, class ResponseType, class Function>
Expand All @@ -380,7 +411,7 @@ void worker_main(MPI_Comm subcomm, Function worker_fn, work_queue_options<Reques
{
comm::bcast(worker_done, 0, subcomm);
comm::bcast(request, 0, subcomm);
auto response = maybe_stop_token(worker_fn, request, stop_token);
auto response = maybe_stop_token(worker_fn, std::move(request), stop_token);
comm::send(response, options.get_root(), (int)worker_status::done, queue_comm);
}
break;
Expand All @@ -396,15 +427,14 @@ void worker_main(MPI_Comm subcomm, Function worker_fn, work_queue_options<Reques

template <class RequestType, class ResponseType, class Function>
void worker_aux(MPI_Comm subcomm, Function worker_fn, work_queue_options<RequestType> const& options) {
RequestType request;
ResponseType response;
WorkerTaskManager<RequestType, ResponseType> task_manager(subcomm, options);
int worker_done = false;
while(!worker_done) {
comm::bcast(worker_done, 0, subcomm);
if(!worker_done) {
RequestType request;
comm::bcast(request, 0, subcomm);
maybe_stop_token(worker_fn, request, task_manager);
maybe_stop_token(worker_fn, std::move(request), task_manager);
}
}

Expand Down Expand Up @@ -455,6 +485,10 @@ class NoWorkersTaskManager: public TaskManager<RequestType, MPI_Comm> {
return MPI_COMM_SELF;
}

size_t num_workers() const override {
return 1;
}

private:
bool is_stop_requested = false;
std::queue<RequestType> requests{};
Expand All @@ -465,10 +499,10 @@ void no_workers(TaskForwardIt tasks_begin, TaskForwardIt tasks_end, MasterFn mas
NoWorkersTaskManager<RequestType> task_manager(tasks_begin, tasks_end);

while(!task_manager.empty() && !task_manager.stop_requested()) {
RequestType task = task_manager.front();
RequestType task = std::move(task_manager.front());
task_manager.pop();

auto response = maybe_stop_token(worker_fn, task, task_manager);
auto response = maybe_stop_token(worker_fn, std::move(task), task_manager);
maybe_stop_token(master_fn, response, task_manager);
}
}
Expand Down
Loading

0 comments on commit 35f6d26

Please sign in to comment.