From fd16f52ca1706733c146e969509daf4a25473cc5 Mon Sep 17 00:00:00 2001 From: Josh Cunningham Date: Tue, 22 Oct 2024 12:16:24 -0500 Subject: [PATCH 1/9] Add option to disable catchment csv output --- .../catchment/Formulation_Manager.hpp | 27 +++++++++++++++++++ src/core/HY_Features.cpp | 9 ++++++- src/core/HY_Features_MPI.cpp | 9 ++++++- 3 files changed, 43 insertions(+), 2 deletions(-) diff --git a/include/realizations/catchment/Formulation_Manager.hpp b/include/realizations/catchment/Formulation_Manager.hpp index 4140a97309..9ddea7bc4b 100644 --- a/include/realizations/catchment/Formulation_Manager.hpp +++ b/include/realizations/catchment/Formulation_Manager.hpp @@ -318,6 +318,33 @@ namespace realization { //for case where there is no output_root in the realization file return "./"; + + } + + /** + * @brief Check if the formulation has catchment output writing enabled + * + * @code{.cpp} + * // Example config: + * // ... + * // "write_catchment_output": false + * // ... + * const auto manager = Formulation_Manger(CONFIG); + * manager.get_output_root(); + * //> false + * @endcode + * + * @return bool + */ + bool write_catchment_output() const { + const auto write_catchment_output = this->tree.get_optional("write_catchment_output"); + if (write_catchment_output != boost::none && *write_catchment_output != "") { + // if any variation of "false" or "no" or 0 is found, return false + if (write_catchment_output->compare("false") == 0 || write_catchment_output->compare("no") == 0 || write_catchment_output->compare("0") == 0) { + return false; + } + } + return true; } /** diff --git a/src/core/HY_Features.cpp b/src/core/HY_Features.cpp index 91e59cb487..8ef2fdb24c 100644 --- a/src/core/HY_Features.cpp +++ b/src/core/HY_Features.cpp @@ -32,7 +32,14 @@ HY_Features::HY_Features(network::Network network, std::shared_ptrget_formulation(feat_id); - formulation->set_output_stream(formulations->get_output_root() + feat_id + ".csv"); + if (formulations->write_catchment_output() == true) + { + formulation->set_output_stream(formulations->get_output_root() + feat_id + ".csv"); + } + else + { + formulation->set_output_stream("/dev/null"); + } // TODO: add command line or config option to have this be omitted //FIXME why isn't default param working here??? get_output_header_line() fails. formulation->write_output("Time Step,""Time,"+formulation->get_output_header_line(",")+"\n"); diff --git a/src/core/HY_Features_MPI.cpp b/src/core/HY_Features_MPI.cpp index 4eec4e39a3..60fcec075e 100644 --- a/src/core/HY_Features_MPI.cpp +++ b/src/core/HY_Features_MPI.cpp @@ -38,7 +38,14 @@ HY_Features_MPI::HY_Features_MPI( PartitionData partition_data, geojson::GeoJSON { //Find and prepare formulation auto formulation = formulations->get_formulation(feat_id); - formulation->set_output_stream(formulations->get_output_root() + feat_id + ".csv"); + if (formulations->write_catchment_output() == true) + { + formulation->set_output_stream(formulations->get_output_root() + feat_id + ".csv"); + } + else + { + formulation->set_output_stream("/dev/null"); + }; // TODO: add command line or config option to have this be omitted //FIXME why isn't default param working here??? get_output_header_line() fails. formulation->write_output("Time Step,""Time,"+formulation->get_output_header_line(",")+"\n"); From 5396bb769caa53a2355d70a95615f9cf44fb005b Mon Sep 17 00:00:00 2001 From: Josh Cunningham Date: Tue, 22 Oct 2024 14:24:24 -0500 Subject: [PATCH 2/9] perf - reduce mpi barrier poll rate --- src/NGen.cpp | 47 +++++++++++++++++++++++++++++++++-------------- 1 file changed, 33 insertions(+), 14 deletions(-) diff --git a/src/NGen.cpp b/src/NGen.cpp index 1c0cf2bf28..9c26d2fc19 100644 --- a/src/NGen.cpp +++ b/src/NGen.cpp @@ -558,24 +558,29 @@ int main(int argc, char *argv[]) { } //done time #if NGEN_WITH_MPI - MPI_Barrier(MPI_COMM_WORLD); -#endif + MPI_Request barrier_request; + MPI_Ibarrier(MPI_COMM_WORLD, &barrier_request); - if (mpi_rank == 0) - { - std::cout << "Finished " << manager->Simulation_Time_Object->get_total_output_times() << " timesteps." << std::endl; + int flag = 0; + const int sleep_microseconds = 100000; // 100 millisecond sleep + + // Wait for all ranks to reach the barrier + while (!flag) { + MPI_Test(&barrier_request, &flag, MPI_STATUS_IGNORE); + if (!flag) { + usleep(sleep_microseconds); + } } +#endif + if (mpi_rank == 0) { + std::cout << "Finished " << manager->Simulation_Time_Object->get_total_output_times() << " timesteps." << std::endl; auto time_done_simulation = std::chrono::steady_clock::now(); std::chrono::duration time_elapsed_simulation = time_done_simulation - time_done_init; -#if NGEN_WITH_MPI - MPI_Barrier(MPI_COMM_WORLD); -#endif + #if NGEN_WITH_ROUTING - if (mpi_rank == 0) - { // Run t-route from single process if(manager->get_using_routing()) { //Note: Currently, delta_time is set in the t-route yaml configuration file, and the //number_of_timesteps is determined from the total number of nexus outputs in t-route. @@ -586,15 +591,12 @@ int main(int argc, char *argv[]) { int delta_time = manager->Simulation_Time_Object->get_output_interval_seconds(); router->route(number_of_timesteps, delta_time); - } } #endif auto time_done_routing = std::chrono::steady_clock::now(); std::chrono::duration time_elapsed_routing = time_done_routing - time_done_simulation; - if (mpi_rank == 0) - { std::cout << "NGen top-level timings:" << "\n\tNGen::init: " << time_elapsed_init.count() << "\n\tNGen::simulation: " << time_elapsed_simulation.count() @@ -602,10 +604,27 @@ int main(int argc, char *argv[]) { << "\n\tNGen::routing: " << time_elapsed_routing.count() #endif << std::endl; + #if NGEN_WITH_MPI + for (int i = 1; i < mpi_num_procs; ++i) { + MPI_Send(NULL, 0, MPI_INT, i, 0, MPI_COMM_WORLD); + } + } + else { + // Non-root processes + MPI_Request recv_request; + MPI_Irecv(NULL, 0, MPI_INT, 0, 0, MPI_COMM_WORLD, &recv_request); + + int recv_flag = 0; + while (!recv_flag) { + MPI_Test(&recv_request, &recv_flag, MPI_STATUS_IGNORE); + if (!recv_flag) { + usleep(sleep_microseconds); + } + } + #endif } manager->finalize(); - #if NGEN_WITH_MPI MPI_Finalize(); #endif From 4db67e9ae5a086097b14fb758c1b919e5b6a173d Mon Sep 17 00:00:00 2001 From: Josh Cunningham Date: Wed, 23 Oct 2024 18:30:52 -0500 Subject: [PATCH 3/9] add option to disable remote partitioning --- .../catchment/Formulation_Manager.hpp | 26 ++++++++ src/NGen.cpp | 62 ++++++++++--------- 2 files changed, 60 insertions(+), 28 deletions(-) diff --git a/include/realizations/catchment/Formulation_Manager.hpp b/include/realizations/catchment/Formulation_Manager.hpp index 9ddea7bc4b..de8d5267b8 100644 --- a/include/realizations/catchment/Formulation_Manager.hpp +++ b/include/realizations/catchment/Formulation_Manager.hpp @@ -347,6 +347,32 @@ namespace realization { return true; } + /** + * @brief Check if the formulation uses remote partitioning for mpi partitions + * + * @code{.cpp} + * // Example config: + * // ... + * // "remotes_enabled": false + * // ... + * const auto manager = Formulation_Manger(CONFIG); + * manager.get_output_root(); + * //> false + * @endcode + * + * @return bool + */ + bool remotes_enabled() const { + const auto remotes_enabled = this->tree.get_optional("remotes_enabled"); + if (remotes_enabled != boost::none && *remotes_enabled != "") { + // if any variation of "false" or "no" or 0 is found, return false + if (remotes_enabled->compare("false") == 0 || remotes_enabled->compare("no") == 0 || remotes_enabled->compare("0") == 0) { + return false; + } + } + return true; + } + /** * @brief return the layer storage used for formulations * @return a reference to the LayerStorageObject diff --git a/src/NGen.cpp b/src/NGen.cpp index 9c26d2fc19..95bf9628aa 100644 --- a/src/NGen.cpp +++ b/src/NGen.cpp @@ -439,10 +439,16 @@ int main(int argc, char *argv[]) { for(const auto& id : features.nexuses()) { #if NGEN_WITH_MPI if (mpi_num_procs > 1) { + if (manager->remotes_enabled() == true) { if (!features.is_remote_sender_nexus(id)) { nexus_outfiles[id].open(manager->get_output_root() + id + "_output.csv", std::ios::trunc); } - } else { + } + else { + nexus_outfiles[id].open(manager->get_output_root() + id + "_rank_" + std::to_string(mpi_rank) + "_output.csv", std::ios::trunc); + } + } + else { nexus_outfiles[id].open(manager->get_output_root() + id + "_output.csv", std::ios::trunc); } #else @@ -573,37 +579,37 @@ int main(int argc, char *argv[]) { } #endif if (mpi_rank == 0) { - std::cout << "Finished " << manager->Simulation_Time_Object->get_total_output_times() << " timesteps." << std::endl; + std::cout << "Finished " << manager->Simulation_Time_Object->get_total_output_times() << " timesteps." << std::endl; - auto time_done_simulation = std::chrono::steady_clock::now(); - std::chrono::duration time_elapsed_simulation = time_done_simulation - time_done_init; + auto time_done_simulation = std::chrono::steady_clock::now(); + std::chrono::duration time_elapsed_simulation = time_done_simulation - time_done_init; #if NGEN_WITH_ROUTING - if(manager->get_using_routing()) { - //Note: Currently, delta_time is set in the t-route yaml configuration file, and the - //number_of_timesteps is determined from the total number of nexus outputs in t-route. - //It is recommended to still pass these values to the routing_py_adapter object in - //case a future implmentation needs these two values from the ngen framework. - int number_of_timesteps = manager->Simulation_Time_Object->get_total_output_times(); - - int delta_time = manager->Simulation_Time_Object->get_output_interval_seconds(); - - router->route(number_of_timesteps, delta_time); - } + if(manager->get_using_routing()) { + //Note: Currently, delta_time is set in the t-route yaml configuration file, and the + //number_of_timesteps is determined from the total number of nexus outputs in t-route. + //It is recommended to still pass these values to the routing_py_adapter object in + //case a future implmentation needs these two values from the ngen framework. + int number_of_timesteps = manager->Simulation_Time_Object->get_total_output_times(); + + int delta_time = manager->Simulation_Time_Object->get_output_interval_seconds(); + + router->route(number_of_timesteps, delta_time); + } #endif - auto time_done_routing = std::chrono::steady_clock::now(); - std::chrono::duration time_elapsed_routing = time_done_routing - time_done_simulation; + auto time_done_routing = std::chrono::steady_clock::now(); + std::chrono::duration time_elapsed_routing = time_done_routing - time_done_simulation; - std::cout << "NGen top-level timings:" - << "\n\tNGen::init: " << time_elapsed_init.count() - << "\n\tNGen::simulation: " << time_elapsed_simulation.count() -#if NGEN_WITH_ROUTING - << "\n\tNGen::routing: " << time_elapsed_routing.count() -#endif - << std::endl; + std::cout << "NGen top-level timings:" + << "\n\tNGen::init: " << time_elapsed_init.count() + << "\n\tNGen::simulation: " << time_elapsed_simulation.count() + #if NGEN_WITH_ROUTING + << "\n\tNGen::routing: " << time_elapsed_routing.count() + #endif + << std::endl; #if NGEN_WITH_MPI for (int i = 1; i < mpi_num_procs; ++i) { MPI_Send(NULL, 0, MPI_INT, i, 0, MPI_COMM_WORLD); @@ -623,11 +629,11 @@ int main(int argc, char *argv[]) { } #endif } - - manager->finalize(); -#if NGEN_WITH_MPI + + manager->finalize(); + #if NGEN_WITH_MPI MPI_Finalize(); -#endif + #endif return 0; } From 532ece8952f3a8acc84c6ea19a911aa332fe007f Mon Sep 17 00:00:00 2001 From: Josh Cunningham Date: Wed, 23 Oct 2024 18:34:36 -0500 Subject: [PATCH 4/9] forcing_para --- include/realizations/catchment/Formulation_Manager.hpp | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/include/realizations/catchment/Formulation_Manager.hpp b/include/realizations/catchment/Formulation_Manager.hpp index de8d5267b8..dd8cd1c1c1 100644 --- a/include/realizations/catchment/Formulation_Manager.hpp +++ b/include/realizations/catchment/Formulation_Manager.hpp @@ -448,9 +448,19 @@ namespace realization { } forcing_params get_forcing_params(const geojson::PropertyMap &forcing_prop_map, std::string identifier, simulation_time_params &simulation_time_config) { + int rank; + MPI_Comm_rank(MPI_COMM_WORLD, &rank); std::string path = ""; if(forcing_prop_map.count("path") != 0){ path = forcing_prop_map.at("path").as_string(); + int id_index = path.find("{{id}}"); + int partition_id_index = path.find("{{partition_id}}"); + if (id_index != std::string::npos) { + path = path.replace(id_index, sizeof("{{id}}") - 1, identifier); + } + if (partition_id_index != std::string::npos) { + path = path.replace(partition_id_index, sizeof("{{partition_id}}") - 1, std::to_string(rank)); + } } std::string provider; if(forcing_prop_map.count("provider") != 0){ From 9db90e94c7f57fc30e091a0b6df9484e4416e6a7 Mon Sep 17 00:00:00 2001 From: Josh Cunningham Date: Thu, 24 Oct 2024 15:09:36 -0500 Subject: [PATCH 5/9] add option to disable netcdf forcing cache --- include/forcing/AorcForcing.hpp | 5 +- .../forcing/NetCDFPerFeatureDataProvider.hpp | 5 +- .../catchment/Formulation_Constructors.hpp | 2 +- .../catchment/Formulation_Manager.hpp | 15 +- src/forcing/NetCDFPerFeatureDataProvider.cpp | 187 +++++++++--------- 5 files changed, 116 insertions(+), 98 deletions(-) diff --git a/include/forcing/AorcForcing.hpp b/include/forcing/AorcForcing.hpp index 58ea21753d..136ef10792 100644 --- a/include/forcing/AorcForcing.hpp +++ b/include/forcing/AorcForcing.hpp @@ -40,11 +40,12 @@ struct forcing_params std::string provider; time_t simulation_start_t; time_t simulation_end_t; + bool enable_cache = true; /* Constructor for forcing_params */ - forcing_params(std::string path, std::string provider, std::string start_time, std::string end_time): - path(path), provider(provider), start_time(start_time), end_time(end_time) + forcing_params(std::string path, std::string provider, std::string start_time, std::string end_time, bool enable_cache) : + path(path), provider(provider), start_time(start_time), end_time(end_time), enable_cache(enable_cache) { /// \todo converting to UTC can be tricky, especially if thread safety is a concern /* https://stackoverflow.com/questions/530519/stdmktime-and-timezone-info */ diff --git a/include/forcing/NetCDFPerFeatureDataProvider.hpp b/include/forcing/NetCDFPerFeatureDataProvider.hpp index 92ef771fff..85373ecb53 100644 --- a/include/forcing/NetCDFPerFeatureDataProvider.hpp +++ b/include/forcing/NetCDFPerFeatureDataProvider.hpp @@ -53,14 +53,14 @@ namespace data_access * @param log_s An output log stream for messages from the underlying library. If a provider object for * the given path already exists, this argument will be ignored. */ - static std::shared_ptr get_shared_provider(std::string input_path, time_t sim_start, time_t sim_end, utils::StreamHandler log_s); + static std::shared_ptr get_shared_provider(std::string input_path, time_t sim_start, time_t sim_end, utils::StreamHandler log_s, bool enable_cache); /** * @brief Cleanup the shared providers cache, ensuring that the files get closed. */ static void cleanup_shared_providers(); - NetCDFPerFeatureDataProvider(std::string input_path, time_t sim_start, time_t sim_end, utils::StreamHandler log_s); + NetCDFPerFeatureDataProvider(std::string input_path, time_t sim_start, time_t sim_end, utils::StreamHandler log_s, bool enable_cache); // Default implementation defined in the .cpp file so that // client code doesn't need to have the full definition of @@ -135,6 +135,7 @@ namespace data_access std::map ncvar_cache; std::map units_cache; boost::compute::detail::lru_cache>> value_cache; + bool enable_cache; size_t cache_slice_t_size = 1; size_t cache_slice_c_size = 1; diff --git a/include/realizations/catchment/Formulation_Constructors.hpp b/include/realizations/catchment/Formulation_Constructors.hpp index 8f1fa5aeea..24b0c2b38c 100644 --- a/include/realizations/catchment/Formulation_Constructors.hpp +++ b/include/realizations/catchment/Formulation_Constructors.hpp @@ -49,7 +49,7 @@ namespace realization { } #if NGEN_WITH_NETCDF else if (forcing_config.provider == "NetCDF"){ - fp = data_access::NetCDFPerFeatureDataProvider::get_shared_provider(forcing_config.path, forcing_config.simulation_start_t, forcing_config.simulation_end_t, output_stream); + fp = data_access::NetCDFPerFeatureDataProvider::get_shared_provider(forcing_config.path, forcing_config.simulation_start_t, forcing_config.simulation_end_t, output_stream, forcing_config.enable_cache); } #endif else if (forcing_config.provider == "NullForcingProvider"){ diff --git a/include/realizations/catchment/Formulation_Manager.hpp b/include/realizations/catchment/Formulation_Manager.hpp index dd8cd1c1c1..3a954547a6 100644 --- a/include/realizations/catchment/Formulation_Manager.hpp +++ b/include/realizations/catchment/Formulation_Manager.hpp @@ -450,6 +450,12 @@ namespace realization { forcing_params get_forcing_params(const geojson::PropertyMap &forcing_prop_map, std::string identifier, simulation_time_params &simulation_time_config) { int rank; MPI_Comm_rank(MPI_COMM_WORLD, &rank); + bool enable_cache = true; + + if (forcing_prop_map.count("enable_cache") != 0) { + enable_cache = forcing_prop_map.at("enable_cache").as_boolean(); + } + std::string path = ""; if(forcing_prop_map.count("path") != 0){ path = forcing_prop_map.at("path").as_string(); @@ -471,7 +477,8 @@ namespace realization { path, provider, simulation_time_config.start_time, - simulation_time_config.end_time + simulation_time_config.end_time, + enable_cache ); } @@ -560,7 +567,8 @@ namespace realization { path + entry->d_name, provider, simulation_time_config.start_time, - simulation_time_config.end_time + simulation_time_config.end_time, + enable_cache ); } else if ( entry->d_type == DT_UNKNOWN ) @@ -579,7 +587,8 @@ namespace realization { path + entry->d_name, provider, simulation_time_config.start_time, - simulation_time_config.end_time + simulation_time_config.end_time, + enable_cache ); } throw std::runtime_error("Forcing data is path "+path+entry->d_name+" is not a file"); diff --git a/src/forcing/NetCDFPerFeatureDataProvider.cpp b/src/forcing/NetCDFPerFeatureDataProvider.cpp index 2ddb9ab578..a6ba03b840 100644 --- a/src/forcing/NetCDFPerFeatureDataProvider.cpp +++ b/src/forcing/NetCDFPerFeatureDataProvider.cpp @@ -10,14 +10,14 @@ std::map namespace data_access { -std::shared_ptr NetCDFPerFeatureDataProvider::get_shared_provider(std::string input_path, time_t sim_start, time_t sim_end, utils::StreamHandler log_s) +std::shared_ptr NetCDFPerFeatureDataProvider::get_shared_provider(std::string input_path, time_t sim_start, time_t sim_end, utils::StreamHandler log_s, bool enable_cache) { const std::lock_guard lock(shared_providers_mutex); std::shared_ptr p; if(shared_providers.count(input_path) > 0){ p = shared_providers[input_path]; } else { - p = std::make_shared(input_path, sim_start, sim_end, log_s); + p = std::make_shared(input_path, sim_start, sim_end, log_s, enable_cache); shared_providers[input_path] = p; } return p; @@ -30,9 +30,10 @@ void NetCDFPerFeatureDataProvider::cleanup_shared_providers() shared_providers.clear(); } -NetCDFPerFeatureDataProvider::NetCDFPerFeatureDataProvider(std::string input_path, time_t sim_start, time_t sim_end, utils::StreamHandler log_s) : log_stream(log_s), value_cache(20), +NetCDFPerFeatureDataProvider::NetCDFPerFeatureDataProvider(std::string input_path, time_t sim_start, time_t sim_end, utils::StreamHandler log_s, bool enable_cache) : log_stream(log_s), value_cache(20), sim_start_date_time_epoch(sim_start), - sim_end_date_time_epoch(sim_end) + sim_end_date_time_epoch(sim_end), + enable_cache(enable_cache) { //size_t sizep = 1073741824, nelemsp = 202481; //float preemptionp = 0.75; @@ -322,105 +323,112 @@ size_t NetCDFPerFeatureDataProvider::get_ts_index_for_time(const time_t &epoch_t double NetCDFPerFeatureDataProvider::get_value(const CatchmentAggrDataSelector& selector, ReSampleMethod m) { auto init_time = selector.get_init_time(); - auto stop_time = init_time + selector.get_duration_secs(); // scope hiding! BAD JUJU! - size_t idx1 = get_ts_index_for_time(init_time); - size_t idx2; - try { - idx2 = get_ts_index_for_time(stop_time-1); // Don't include next timestep when duration % timestep = 0 - } - catch(const std::out_of_range &e){ - idx2 = get_ts_index_for_time(this->stop_time-1); //to the edge - } - - auto stride = idx2 - idx1; - - std::vector start, count; - auto cat_pos = id_pos[selector.get_id()]; - - - - double t1 = time_vals[idx1]; - double t2 = time_vals[idx2]; - + std::vector start, count; double rvalue = 0.0; - - auto ncvar = get_ncvar(selector.get_variable_name()); - std::string native_units = get_ncvar_units(selector.get_variable_name()); + auto ncvar = get_ncvar(selector.get_variable_name()); - auto read_len = idx2 - idx1 + 1; - - std::vector raw_values; - raw_values.resize(read_len); - - //TODO: Currently assuming a whole variable cache slice across all catchments for a single timestep...but some stuff here to support otherwise. - size_t cache_slices_t_n = read_len / cache_slice_t_size; // Integer division! - // For reference: https://stackoverflow.com/a/72030286 - for( size_t i = 0; i < cache_slices_t_n; i++ ) { - std::shared_ptr> cached; - int cache_t_idx = (idx1 - (idx1 % cache_slice_t_size) + i); - std::string key = ncvar.getName() + "|" + std::to_string(cache_t_idx); - if(value_cache.contains(key)){ - cached = value_cache.get(key).get(); - } else { - cached = std::make_shared>(cache_slice_c_size * cache_slice_t_size); - start.clear(); - start.push_back(0); // only always 0 when cache_slice_c_size = numids! - start.push_back(cache_t_idx * cache_slice_t_size); - count.clear(); - count.push_back(cache_slice_c_size); - count.push_back(cache_slice_t_size); // Must be 1 for now!...probably... - ncvar.getVar(start,count,&(*cached)[0]); - value_cache.insert(key, cached); + if (enable_cache == false) + { + // vector start is the catchment index and the time index + // vector count is how much to read in both directions + start.clear(); + start.push_back(cat_pos); + start.push_back(idx1); + count.clear(); + count.push_back(1); + count.push_back(1); + ncvar.getVar(start, count, &rvalue); + } + else + { + auto stop_time = init_time + selector.get_duration_secs(); // scope hiding! BAD JUJU! + + size_t idx2; + try { + idx2 = get_ts_index_for_time(stop_time-1); // Don't include next timestep when duration % timestep = 0 } - for( size_t j = 0; j < cache_slice_t_size; j++){ - raw_values[i+j] = cached->at((j*cache_slice_t_size) + cat_pos); + catch(const std::out_of_range &e){ + idx2 = get_ts_index_for_time(this->stop_time-1); //to the edge } - } - - rvalue = 0.0; - - double a , b = 0.0; - - a = 1.0 - ( (t1 - init_time) / time_stride ); - rvalue += (a * raw_values[0]); + auto stride = idx2 - idx1; + + double t1 = time_vals[idx1]; + double t2 = time_vals[idx2]; + + auto read_len = idx2 - idx1 + 1; + + std::vector raw_values; + raw_values.resize(read_len); + + + + //TODO: Currently assuming a whole variable cache slice across all catchments for a single timestep...but some stuff here to support otherwise. + size_t cache_slices_t_n = read_len / cache_slice_t_size; // Integer division! + // For reference: https://stackoverflow.com/a/72030286 + for( size_t i = 0; i < cache_slices_t_n; i++ ) { + std::shared_ptr> cached; + int cache_t_idx = (idx1 - (idx1 % cache_slice_t_size) + i); + std::string key = ncvar.getName() + "|" + std::to_string(cache_t_idx); + if(value_cache.contains(key)){ + cached = value_cache.get(key).get(); + } else { + cached = std::make_shared>(cache_slice_c_size * cache_slice_t_size); + start.clear(); + start.push_back(0); // only always 0 when cache_slice_c_size = numids! + start.push_back(cache_t_idx * cache_slice_t_size); + count.clear(); + count.push_back(cache_slice_c_size); + count.push_back(cache_slice_t_size); // Must be 1 for now!...probably... + ncvar.getVar(start,count,&(*cached)[0]); + value_cache.insert(key, cached); + } + for( size_t j = 0; j < cache_slice_t_size; j++){ + raw_values[i+j] = cached->at((j*cache_slice_t_size) + cat_pos); + } + } + rvalue = 0.0; - for( size_t i = 1; i < raw_values.size() -1; ++i ) - { - rvalue += raw_values[i]; - } + double a , b = 0.0; + + a = 1.0 - ( (t1 - init_time) / time_stride ); + rvalue += (a * raw_values[0]); - if ( raw_values.size() > 1) // likewise the last data value may not be fully in the window - { - b = (stop_time - t2) / time_stride; - rvalue += (b * raw_values.back() ); - } + for( size_t i = 1; i < raw_values.size() -1; ++i ) + { + rvalue += raw_values[i]; + } - // account for the resampling methods - switch(m) - { - case SUM: // we allready have the sum so do nothing - ; - break; - - case MEAN: - { - // This is getting a length weighted mean - // the data values where allready scaled for where there was only partial use of a data value - // so we just need to do a final scale to account for the differnce between time_stride and duration_s - - double scale_factor = (selector.get_duration_secs() > time_stride ) ? (time_stride / selector.get_duration_secs()) : (1.0 / (a + b)); - rvalue *= scale_factor; + if ( raw_values.size() > 1) // likewise the last data value may not be fully in the window + { + b = (stop_time - t2) / time_stride; + rvalue += (b * raw_values.back() ); } - break; + // account for the resampling methods + switch(m) + { + case SUM: // we allready have the sum so do nothing + ; + break; + + case MEAN: + { + // This is getting a length weighted mean + // the data values where allready scaled for where there was only partial use of a data value + // so we just need to do a final scale to account for the differnce between time_stride and duration_s + + double scale_factor = (selector.get_duration_secs() > time_stride ) ? (time_stride / selector.get_duration_secs()) : (1.0 / (a + b)); + rvalue *= scale_factor; + } + break; - default: - ; + default: + ; + } } - try { return UnitsHelper::get_converted_value(native_units, rvalue, selector.get_output_units()); @@ -433,7 +441,6 @@ double NetCDFPerFeatureDataProvider::get_value(const CatchmentAggrDataSelector& return rvalue; } - return rvalue; } std::vector NetCDFPerFeatureDataProvider::get_values(const CatchmentAggrDataSelector& selector, data_access::ReSampleMethod m) From 1a16970ab1f1b93b9f61f2675e0187f15f05636a Mon Sep 17 00:00:00 2001 From: Josh Cunningham Date: Fri, 25 Oct 2024 15:26:58 -0500 Subject: [PATCH 6/9] add helper scripts for round robin partitioning --- .../data_conversion/no_remote_merging.py | 57 +++++++++++++++++ .../partitioning/round_robin_partioning.py | 62 +++++++++++++++++++ 2 files changed, 119 insertions(+) create mode 100644 utilities/data_conversion/no_remote_merging.py create mode 100644 utilities/partitioning/round_robin_partioning.py diff --git a/utilities/data_conversion/no_remote_merging.py b/utilities/data_conversion/no_remote_merging.py new file mode 100644 index 0000000000..debe7f986f --- /dev/null +++ b/utilities/data_conversion/no_remote_merging.py @@ -0,0 +1,57 @@ +from pathlib import Path +import argparse +import os +import polars as pl +from collections import defaultdict + + +def sum_csv_files(file_pattern: str): + # Scan CSV files and assign column names + df = pl.scan_csv(file_pattern, has_header=False, new_columns=["index", "timestamp", "value"]) + # Group by timestamp and sum the values + df = df.group_by("index").agg( + pl.col("timestamp").first().str.strip_chars().alias("timestamp"), + pl.col("value").str.strip_chars().cast(pl.Float32).sum().alias("total_value"), + ) + + return df.sort("index") + + +def merge_outputs(output_path: Path) -> None: + # get all the file names in the folder + output_files = list(output_path.glob("*_rank_*.csv")) + # print(output_files) + # parse out nexus id from the file names + total_files = len(output_files) + # sort the files + + nexuse_counts = defaultdict(int) + + for file in output_files: + nexus_id = file.stem.split("_")[0] + nexuse_counts[nexus_id] += 1 + + for file in output_files: + nexus_id = file.stem.split("_")[0] + if nexuse_counts[nexus_id] == 1: + os.rename(file, output_path / f"{nexus_id}.csv") + + for nexus, count in nexuse_counts.items(): + if count > 1: + df = sum_csv_files(f"{output_path}/{nexus}_*.csv") + df.collect().write_csv(f"{output_path}/{nexus}.csv", include_header=False) + for file in output_path.glob(f"{nexus}_*.csv"): + file.unlink() + + # delete the files that were merged + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="Merge the per rank outputs created by running ngen with round robin partitioning." + ) + parser.add_argument("output_path", type=Path, help="ngen output folder") + + args = parser.parse_args() + + merge_outputs(args.output_path) diff --git a/utilities/partitioning/round_robin_partioning.py b/utilities/partitioning/round_robin_partioning.py new file mode 100644 index 0000000000..a785b96847 --- /dev/null +++ b/utilities/partitioning/round_robin_partioning.py @@ -0,0 +1,62 @@ +import json +from pathlib import Path +import sqlite3 +import argparse +import multiprocessing + + +def get_cat_to_nex_flowpairs(hydrofabric: Path) -> list: + with sqlite3.connect(f"{hydrofabric}") as conn: + cursor = conn.cursor() + cursor.execute("SELECT divide_id, toid FROM divides") + cat_to_nex_pairs = cursor.fetchall() + return cat_to_nex_pairs + + +def create_partitions(hydrofabric: Path, output_path: Path, num_partitions: int = None) -> None: + if num_partitions is None: + num_partitions = multiprocessing.cpu_count() + + cat_to_nex_pairs = get_cat_to_nex_flowpairs(hydrofabric) + + # sort the cat nex tuples by the nex id + cat_to_nex_pairs = sorted(cat_to_nex_pairs, key=lambda x: x[1]) + + num_partitions = min(num_partitions, len(cat_to_nex_pairs)) + + cats = set([cat for cat, _ in cat_to_nex_pairs]) + nexs = set([nex for _, nex in cat_to_nex_pairs]) + print(f"Number of partitions: {num_partitions}") + print(f"Number of cats: {len(cats)}") + print(f"Number of nexus: {len(nexs)}") + + partitions = [] + for i in range(num_partitions): + part = {} + part["id"] = i + part["cat-ids"] = [] + part["nex-ids"] = [] + part["remote-connections"] = [] + partitions.append(part) + + for i, (cat_id, nex_id) in enumerate(cat_to_nex_pairs): + print(i) + part_id = i % num_partitions + partitions[part_id]["cat-ids"].append(cat_id) + partitions[part_id]["nex-ids"].append(nex_id) + + with open(output_path, "w") as f: + f.write(json.dumps({"partitions": partitions}, indent=4)) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Create partitions from hydrofabric data.") + parser.add_argument("hydrofabric", type=Path, help="Path to the hydrofabric SQLite file.") + parser.add_argument("output_path", type=Path, help="Path to the output JSON file.") + parser.add_argument( + "-n", "--num_partitions", type=int, default=None, help="Number of partitions to create." + ) + + args = parser.parse_args() + + create_partitions(args.hydrofabric, args.output_path, args.num_partitions) From 2f6053fd41235fac2b9e478bbce018166f2f8677 Mon Sep 17 00:00:00 2001 From: Josh Cunningham Date: Fri, 25 Oct 2024 15:49:38 -0500 Subject: [PATCH 7/9] fix no_mpi build --- include/realizations/catchment/Formulation_Manager.hpp | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/include/realizations/catchment/Formulation_Manager.hpp b/include/realizations/catchment/Formulation_Manager.hpp index 3a954547a6..340e0df8bf 100644 --- a/include/realizations/catchment/Formulation_Manager.hpp +++ b/include/realizations/catchment/Formulation_Manager.hpp @@ -448,9 +448,11 @@ namespace realization { } forcing_params get_forcing_params(const geojson::PropertyMap &forcing_prop_map, std::string identifier, simulation_time_params &simulation_time_config) { - int rank; - MPI_Comm_rank(MPI_COMM_WORLD, &rank); + int rank = 0; bool enable_cache = true; + #if NGEN_WITH_MPI + MPI_Comm_rank(MPI_COMM_WORLD, &rank); + #endif if (forcing_prop_map.count("enable_cache") != 0) { enable_cache = forcing_prop_map.at("enable_cache").as_boolean(); From 4018ebf708f6cf7c520db8c156f0f90d0b62d2db Mon Sep 17 00:00:00 2001 From: Josh Cunningham Date: Mon, 28 Oct 2024 14:37:25 -0500 Subject: [PATCH 8/9] fix filenames in output merging script --- utilities/data_conversion/no_remote_merging.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/utilities/data_conversion/no_remote_merging.py b/utilities/data_conversion/no_remote_merging.py index debe7f986f..36ffdd5a99 100644 --- a/utilities/data_conversion/no_remote_merging.py +++ b/utilities/data_conversion/no_remote_merging.py @@ -11,7 +11,7 @@ def sum_csv_files(file_pattern: str): # Group by timestamp and sum the values df = df.group_by("index").agg( pl.col("timestamp").first().str.strip_chars().alias("timestamp"), - pl.col("value").str.strip_chars().cast(pl.Float32).sum().alias("total_value"), + pl.col("value").str.strip_chars().cast(pl.Float64).sum().alias("total_value"), ) return df.sort("index") @@ -34,14 +34,16 @@ def merge_outputs(output_path: Path) -> None: for file in output_files: nexus_id = file.stem.split("_")[0] if nexuse_counts[nexus_id] == 1: - os.rename(file, output_path / f"{nexus_id}.csv") + os.rename(file, output_path / f"{nexus_id}_output.csv") for nexus, count in nexuse_counts.items(): if count > 1: - df = sum_csv_files(f"{output_path}/{nexus}_*.csv") - df.collect().write_csv(f"{output_path}/{nexus}.csv", include_header=False) - for file in output_path.glob(f"{nexus}_*.csv"): + df = sum_csv_files(f"{output_path}/{nexus}_rank_*.csv") + df.collect().write_csv(f"{output_path}/{nexus}_output.csv", include_header=False) + for file in output_path.glob(f"{nexus}_rank_*.csv"): file.unlink() + # use sed to add the spaces in the csv files + os.system(f"sed -i 's/,/, /g' {output_path}/{nexus}_output.csv") # delete the files that were merged From 9f3a314973bb15bf0a89abf3eb94c80d09bae01a Mon Sep 17 00:00:00 2001 From: Josh Cunningham Date: Mon, 4 Nov 2024 13:59:04 -0600 Subject: [PATCH 9/9] parallelise no_remote_merging.py --- .../data_conversion/no_remote_merging.py | 32 +++++++++++-------- 1 file changed, 19 insertions(+), 13 deletions(-) diff --git a/utilities/data_conversion/no_remote_merging.py b/utilities/data_conversion/no_remote_merging.py index 36ffdd5a99..4422ef9b90 100644 --- a/utilities/data_conversion/no_remote_merging.py +++ b/utilities/data_conversion/no_remote_merging.py @@ -3,7 +3,8 @@ import os import polars as pl from collections import defaultdict - +from functools import partial +import multiprocessing as mp def sum_csv_files(file_pattern: str): # Scan CSV files and assign column names @@ -16,6 +17,17 @@ def sum_csv_files(file_pattern: str): return df.sort("index") +def process_pair(output_path:Path, tup): + nexus, count = tup + if count > 1: + df = sum_csv_files(output_path / f"{nexus}_rank_*.csv") + df.collect().write_csv(output_path / f"{nexus}_output.csv", include_header=False) + for file in output_path.glob(f"{nexus}_rank_*.csv"): + file.unlink() + # use sed to add the spaces in the csv files + output_file = output_path / f"{nexus}_output.csv" + os.system(f"sed -i 's/,/, /g' {output_file.absolute()}") + def merge_outputs(output_path: Path) -> None: # get all the file names in the folder @@ -25,27 +37,21 @@ def merge_outputs(output_path: Path) -> None: total_files = len(output_files) # sort the files - nexuse_counts = defaultdict(int) + nexus_counts = defaultdict(int) for file in output_files: nexus_id = file.stem.split("_")[0] - nexuse_counts[nexus_id] += 1 + nexus_counts[nexus_id] += 1 for file in output_files: nexus_id = file.stem.split("_")[0] - if nexuse_counts[nexus_id] == 1: + if nexus_counts[nexus_id] == 1: os.rename(file, output_path / f"{nexus_id}_output.csv") - for nexus, count in nexuse_counts.items(): - if count > 1: - df = sum_csv_files(f"{output_path}/{nexus}_rank_*.csv") - df.collect().write_csv(f"{output_path}/{nexus}_output.csv", include_header=False) - for file in output_path.glob(f"{nexus}_rank_*.csv"): - file.unlink() - # use sed to add the spaces in the csv files - os.system(f"sed -i 's/,/, /g' {output_path}/{nexus}_output.csv") + partial_process = partial(process_pair, output_path) + with mp.Pool() as p: + p.map(partial_process,nexus_counts.items()) - # delete the files that were merged if __name__ == "__main__":