Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MPI Scaling fix #894

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions include/forcing/AorcForcing.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,12 @@ struct forcing_params
std::string provider;
time_t simulation_start_t;
time_t simulation_end_t;
bool enable_cache = true;
/*
Constructor for forcing_params
*/
forcing_params(std::string path, std::string provider, std::string start_time, std::string end_time):
path(path), provider(provider), start_time(start_time), end_time(end_time)
forcing_params(std::string path, std::string provider, std::string start_time, std::string end_time, bool enable_cache) :
path(path), provider(provider), start_time(start_time), end_time(end_time), enable_cache(enable_cache)
{
/// \todo converting to UTC can be tricky, especially if thread safety is a concern
/* https://stackoverflow.com/questions/530519/stdmktime-and-timezone-info */
Expand Down
5 changes: 3 additions & 2 deletions include/forcing/NetCDFPerFeatureDataProvider.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,14 @@ namespace data_access
* @param log_s An output log stream for messages from the underlying library. If a provider object for
* the given path already exists, this argument will be ignored.
*/
static std::shared_ptr<NetCDFPerFeatureDataProvider> get_shared_provider(std::string input_path, time_t sim_start, time_t sim_end, utils::StreamHandler log_s);
static std::shared_ptr<NetCDFPerFeatureDataProvider> get_shared_provider(std::string input_path, time_t sim_start, time_t sim_end, utils::StreamHandler log_s, bool enable_cache);

/**
* @brief Cleanup the shared providers cache, ensuring that the files get closed.
*/
static void cleanup_shared_providers();

NetCDFPerFeatureDataProvider(std::string input_path, time_t sim_start, time_t sim_end, utils::StreamHandler log_s);
NetCDFPerFeatureDataProvider(std::string input_path, time_t sim_start, time_t sim_end, utils::StreamHandler log_s, bool enable_cache);

// Default implementation defined in the .cpp file so that
// client code doesn't need to have the full definition of
Expand Down Expand Up @@ -135,6 +135,7 @@ namespace data_access
std::map<std::string,netCDF::NcVar> ncvar_cache;
std::map<std::string,std::string> units_cache;
boost::compute::detail::lru_cache<std::string, std::shared_ptr<std::vector<double>>> value_cache;
bool enable_cache;
size_t cache_slice_t_size = 1;
size_t cache_slice_c_size = 1;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ namespace realization {
}
#if NGEN_WITH_NETCDF
else if (forcing_config.provider == "NetCDF"){
fp = data_access::NetCDFPerFeatureDataProvider::get_shared_provider(forcing_config.path, forcing_config.simulation_start_t, forcing_config.simulation_end_t, output_stream);
fp = data_access::NetCDFPerFeatureDataProvider::get_shared_provider(forcing_config.path, forcing_config.simulation_start_t, forcing_config.simulation_end_t, output_stream, forcing_config.enable_cache);
}
#endif
else if (forcing_config.provider == "NullForcingProvider"){
Expand Down
80 changes: 77 additions & 3 deletions include/realizations/catchment/Formulation_Manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,59 @@ namespace realization {

//for case where there is no output_root in the realization file
return "./";

}

/**
 * @brief Check if the formulation has catchment output writing enabled
 *
 * @code{.cpp}
 * // Example config:
 * // ...
 * // "write_catchment_output": false
 * // ...
 * const auto manager = Formulation_Manager(CONFIG);
 * manager.write_catchment_output();
 * //> false
 * @endcode
 *
 * @return false if the config value is any of "false", "no", or "0";
 *         true otherwise (including when the key is absent or empty)
 */
bool write_catchment_output() const {
    // Read the optional key as a string (rather than bool) so that the
    // non-standard spellings "no" and "0" are also honored.
    const auto config_value = this->tree.get_optional<std::string>("write_catchment_output");
    if (config_value != boost::none && !config_value->empty()) {
        // Only an explicit disable value turns output off; anything else
        // (including malformed values) keeps the default of enabled.
        if (*config_value == "false" || *config_value == "no" || *config_value == "0") {
            return false;
        }
    }
    return true;
}

/**
 * @brief Check if the formulation uses remote nexus communication for MPI partitions
 *
 * @code{.cpp}
 * // Example config:
 * // ...
 * // "remotes_enabled": false
 * // ...
 * const auto manager = Formulation_Manager(CONFIG);
 * manager.remotes_enabled();
 * //> false
 * @endcode
 *
 * @return false if the config value is any of "false", "no", or "0";
 *         true otherwise (including when the key is absent or empty)
 */
bool remotes_enabled() const {
    // Read the optional key as a string (rather than bool) so that the
    // non-standard spellings "no" and "0" are also honored.
    const auto config_value = this->tree.get_optional<std::string>("remotes_enabled");
    if (config_value != boost::none && !config_value->empty()) {
        // Only an explicit disable value turns remotes off; anything else
        // (including malformed values) keeps the default of enabled.
        if (*config_value == "false" || *config_value == "no" || *config_value == "0") {
            return false;
        }
    }
    return true;
}

/**
Expand Down Expand Up @@ -395,9 +448,27 @@ namespace realization {
}

forcing_params get_forcing_params(const geojson::PropertyMap &forcing_prop_map, std::string identifier, simulation_time_params &simulation_time_config) {
int rank = 0;
bool enable_cache = true;
#if NGEN_WITH_MPI
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
#endif

if (forcing_prop_map.count("enable_cache") != 0) {
enable_cache = forcing_prop_map.at("enable_cache").as_boolean();
}

std::string path = "";
if(forcing_prop_map.count("path") != 0){
path = forcing_prop_map.at("path").as_string();
int id_index = path.find("{{id}}");
int partition_id_index = path.find("{{partition_id}}");
if (id_index != std::string::npos) {
path = path.replace(id_index, sizeof("{{id}}") - 1, identifier);
}
if (partition_id_index != std::string::npos) {
path = path.replace(partition_id_index, sizeof("{{partition_id}}") - 1, std::to_string(rank));
}
}
std::string provider;
if(forcing_prop_map.count("provider") != 0){
Expand All @@ -408,7 +479,8 @@ namespace realization {
path,
provider,
simulation_time_config.start_time,
simulation_time_config.end_time
simulation_time_config.end_time,
enable_cache
);
}

Expand Down Expand Up @@ -497,7 +569,8 @@ namespace realization {
path + entry->d_name,
provider,
simulation_time_config.start_time,
simulation_time_config.end_time
simulation_time_config.end_time,
enable_cache
);
}
else if ( entry->d_type == DT_UNKNOWN )
Expand All @@ -516,7 +589,8 @@ namespace realization {
path + entry->d_name,
provider,
simulation_time_config.start_time,
simulation_time_config.end_time
simulation_time_config.end_time,
enable_cache
);
}
throw std::runtime_error("Forcing data is path "+path+entry->d_name+" is not a file");
Expand Down
107 changes: 66 additions & 41 deletions src/NGen.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -439,10 +439,16 @@ int main(int argc, char *argv[]) {
for(const auto& id : features.nexuses()) {
#if NGEN_WITH_MPI
if (mpi_num_procs > 1) {
if (manager->remotes_enabled() == true) {
if (!features.is_remote_sender_nexus(id)) {
nexus_outfiles[id].open(manager->get_output_root() + id + "_output.csv", std::ios::trunc);
}
} else {
}
else {
nexus_outfiles[id].open(manager->get_output_root() + id + "_rank_" + std::to_string(mpi_rank) + "_output.csv", std::ios::trunc);
}
}
else {
nexus_outfiles[id].open(manager->get_output_root() + id + "_output.csv", std::ios::trunc);
}
#else
Expand Down Expand Up @@ -558,57 +564,76 @@ int main(int argc, char *argv[]) {
} //done time

#if NGEN_WITH_MPI
MPI_Barrier(MPI_COMM_WORLD);
#endif

if (mpi_rank == 0)
{
std::cout << "Finished " << manager->Simulation_Time_Object->get_total_output_times() << " timesteps." << std::endl;
}
MPI_Request barrier_request;
MPI_Ibarrier(MPI_COMM_WORLD, &barrier_request);

auto time_done_simulation = std::chrono::steady_clock::now();
std::chrono::duration<double> time_elapsed_simulation = time_done_simulation - time_done_init;
int flag = 0;
const int sleep_microseconds = 100000; // 100 millisecond sleep

#if NGEN_WITH_MPI
MPI_Barrier(MPI_COMM_WORLD);
#endif

#if NGEN_WITH_ROUTING
if (mpi_rank == 0)
{ // Run t-route from single process
if(manager->get_using_routing()) {
//Note: Currently, delta_time is set in the t-route yaml configuration file, and the
//number_of_timesteps is determined from the total number of nexus outputs in t-route.
//It is recommended to still pass these values to the routing_py_adapter object in
//case a future implmentation needs these two values from the ngen framework.
int number_of_timesteps = manager->Simulation_Time_Object->get_total_output_times();

int delta_time = manager->Simulation_Time_Object->get_output_interval_seconds();

router->route(number_of_timesteps, delta_time);
// Wait for all ranks to reach the barrier
while (!flag) {
MPI_Test(&barrier_request, &flag, MPI_STATUS_IGNORE);
if (!flag) {
usleep(sleep_microseconds);
}
}
#endif
if (mpi_rank == 0) {
std::cout << "Finished " << manager->Simulation_Time_Object->get_total_output_times() << " timesteps." << std::endl;

auto time_done_simulation = std::chrono::steady_clock::now();
std::chrono::duration<double> time_elapsed_simulation = time_done_simulation - time_done_init;


auto time_done_routing = std::chrono::steady_clock::now();
std::chrono::duration<double> time_elapsed_routing = time_done_routing - time_done_simulation;

if (mpi_rank == 0)
{
std::cout << "NGen top-level timings:"
<< "\n\tNGen::init: " << time_elapsed_init.count()
<< "\n\tNGen::simulation: " << time_elapsed_simulation.count()
#if NGEN_WITH_ROUTING
<< "\n\tNGen::routing: " << time_elapsed_routing.count()
if(manager->get_using_routing()) {
//Note: Currently, delta_time is set in the t-route yaml configuration file, and the
//number_of_timesteps is determined from the total number of nexus outputs in t-route.
//It is recommended to still pass these values to the routing_py_adapter object in
//case a future implmentation needs these two values from the ngen framework.
int number_of_timesteps = manager->Simulation_Time_Object->get_total_output_times();

int delta_time = manager->Simulation_Time_Object->get_output_interval_seconds();

router->route(number_of_timesteps, delta_time);
}
#endif
<< std::endl;
}

manager->finalize();

#if NGEN_WITH_MPI
auto time_done_routing = std::chrono::steady_clock::now();
std::chrono::duration<double> time_elapsed_routing = time_done_routing - time_done_simulation;

std::cout << "NGen top-level timings:"
<< "\n\tNGen::init: " << time_elapsed_init.count()
<< "\n\tNGen::simulation: " << time_elapsed_simulation.count()
#if NGEN_WITH_ROUTING
<< "\n\tNGen::routing: " << time_elapsed_routing.count()
#endif
<< std::endl;
#if NGEN_WITH_MPI
for (int i = 1; i < mpi_num_procs; ++i) {
MPI_Send(NULL, 0, MPI_INT, i, 0, MPI_COMM_WORLD);
}
}
else {
// Non-root processes
MPI_Request recv_request;
MPI_Irecv(NULL, 0, MPI_INT, 0, 0, MPI_COMM_WORLD, &recv_request);

int recv_flag = 0;
while (!recv_flag) {
MPI_Test(&recv_request, &recv_flag, MPI_STATUS_IGNORE);
if (!recv_flag) {
usleep(sleep_microseconds);
}
}
#endif
}

manager->finalize();
#if NGEN_WITH_MPI
MPI_Finalize();
#endif
#endif

return 0;
}
9 changes: 8 additions & 1 deletion src/core/HY_Features.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,14 @@ HY_Features::HY_Features(network::Network network, std::shared_ptr<Formulation_M
{
//Find and prepare formulation
auto formulation = formulations->get_formulation(feat_id);
formulation->set_output_stream(formulations->get_output_root() + feat_id + ".csv");
if (formulations->write_catchment_output() == true)
{
formulation->set_output_stream(formulations->get_output_root() + feat_id + ".csv");
}
else
{
formulation->set_output_stream("/dev/null");
}
// TODO: add command line or config option to have this be omitted
//FIXME why isn't default param working here??? get_output_header_line() fails.
formulation->write_output("Time Step,""Time,"+formulation->get_output_header_line(",")+"\n");
Expand Down
9 changes: 8 additions & 1 deletion src/core/HY_Features_MPI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,14 @@ HY_Features_MPI::HY_Features_MPI( PartitionData partition_data, geojson::GeoJSON
{
//Find and prepare formulation
auto formulation = formulations->get_formulation(feat_id);
formulation->set_output_stream(formulations->get_output_root() + feat_id + ".csv");
if (formulations->write_catchment_output() == true)
{
formulation->set_output_stream(formulations->get_output_root() + feat_id + ".csv");
}
else
{
formulation->set_output_stream("/dev/null");
};
// TODO: add command line or config option to have this be omitted
//FIXME why isn't default param working here??? get_output_header_line() fails.
formulation->write_output("Time Step,""Time,"+formulation->get_output_header_line(",")+"\n");
Expand Down
Loading