From 043cd19f304b276027f324a36063d4a355b01a05 Mon Sep 17 00:00:00 2001 From: Ian Todd Date: Wed, 11 Mar 2026 15:21:51 -0400 Subject: [PATCH 01/41] Map NHF hydrofabric to old field names --- src/geopackage/read.cpp | 195 ++++++++++++++++++++-------------------- 1 file changed, 100 insertions(+), 95 deletions(-) diff --git a/src/geopackage/read.cpp b/src/geopackage/read.cpp index 50bc062526..e1fc6172d6 100644 --- a/src/geopackage/read.cpp +++ b/src/geopackage/read.cpp @@ -32,12 +32,6 @@ std::shared_ptr ngen::geopackage::read( // Check for malicious/invalid layer input check_table_name(layer); std::vector features; - if (ids.size() > 0) - features.reserve(ids.size()); - double min_x = std::numeric_limits::infinity(); - double min_y = std::numeric_limits::infinity(); - double max_x = -std::numeric_limits::infinity(); - double max_y = -std::numeric_limits::infinity(); LOG(LogLevel::DEBUG, "Establishing connection to geopackage %s.", gpkg_path.c_str()); ngen::sqlite::database db{gpkg_path}; @@ -62,103 +56,114 @@ std::shared_ptr ngen::geopackage::read( throw std::runtime_error(errmsg); } - // Introspect if the layer is divides to see which ID field is in use - std::string id_column = "id"; - if(layer == "divides"){ - try { - //TODO: A bit primitive. Actually introspect the schema somehow? https://www.sqlite.org/c3ref/funclist.html - auto query_get_first_row = db.query("SELECT divide_id FROM " + layer + " LIMIT 1"); - id_column = "divide_id"; - } - catch (const std::exception& e){ - #ifndef NGEN_QUIET - // output debug info on what is read exactly - read_ss << "WARN: Using legacy ID column \"id\" in layer " << layer << " is DEPRECATED and may stop working at any time." 
<< std::endl; - LOG(read_ss.str(), LogLevel::WARNING); read_ss.str(""); - #endif - } + std::string id_column; + std::string feature_query; + if (layer == "divides") { + id_column = "div_id"; + feature_query = + "SELECT " + "('cat-' || divides.div_id) AS id, " + "('nex-' || flowpaths.dn_nex_id) AS toid, " + "flowpaths.slope AS So, " + "divides.geom AS geom " + "FROM divides " + "LEFT JOIN flowpaths " + "ON divides.div_id = flowpaths.div_id"; + } else if (layer == "nexus") { + id_column = "nex_id"; + feature_query = + "SELECT " + "('nex-' || nexus.nex_id) AS id, " + "IIF(flowpaths.div_id, ('cat-' || flowpaths.div_id), 'terminal') AS toid, " + "flowpaths.slope AS So, " + "nexus.geom AS geom " + "FROM nexus " + "LEFT JOIN flowpaths " + "ON nexus.dn_fp_id = flowpaths.fp_id"; + } else { + Logger::logMsgAndThrowError("Geopackage read only accepts layers `divides` and `nexus`. The layer entered was " + layer); } - // execute sub-queries if the number of IDs gets too long or once if ids.size() == 0 - int bind_limit = 900; - boost::span id_span(ids); - for (int i = 0; i < ids.size() || (i == 0 && ids.size() == 0); i += bind_limit) { - int span_size = (i + bind_limit >= ids.size()) ? (ids.size() - i) : bind_limit; - boost::span sub_ids = id_span.subspan(i, span_size); - - // Layer exists, getting statement for it - // - // this creates a string in the form: - // WHERE id IN (?, ?, ?, ...) - // so that it can be bound by SQLite. - // This is safer than trying to concatenate - // the IDs together. - std::string joined_ids = ""; - if (!sub_ids.empty()) { - joined_ids = " WHERE "+id_column+" IN (?"; - for (size_t i = 1; i < sub_ids.size(); i++) { - joined_ids += ", ?"; + std::string joined_ids = ""; + if (!ids.empty()) { + std::stringstream filter; + filter << " WHERE " << layer << '.' 
<< id_column << " IN ("; + for (size_t i = 0; i < ids.size(); ++i) { + if (i != 0) + filter << ','; + auto &filter_id = ids[i]; + size_t sep_index = filter_id.find('-'); + if (sep_index == std::string::npos) { + sep_index = 0; + } else { + sep_index++; } - joined_ids += ")"; + int id_num = std::atoi(filter_id.c_str() + sep_index); + if (id_num <= 0) + Logger::logMsgAndThrowError("Could not convert input " + layer + " ID into a number: " + filter_id); + filter << id_num; } + filter << ')'; + joined_ids = filter.str(); + } - // Get number of features - auto query_get_layer_count = db.query("SELECT COUNT(*) FROM " + layer + joined_ids, sub_ids); - query_get_layer_count.next(); - const int layer_feature_count = query_get_layer_count.get(0); - - #ifndef NGEN_QUIET - // output debug info on what is read exactly - read_ss << "Reading " << layer_feature_count << " features from layer " << layer << " using ID column `"<< id_column << "`"; - if (!sub_ids.empty()) { - read_ss << " (id subset:"; - for (auto& id : sub_ids) { - read_ss << " " << id; - } - read_ss << ")"; - } - read_ss << std::endl; - LOG(read_ss.str(), LogLevel::DEBUG); read_ss.str(""); - #endif - - // Get layer feature metadata (geometry column name + type) - auto query_get_layer_geom_meta = db.query("SELECT column_name FROM gpkg_geometry_columns WHERE table_name = ?", layer); - query_get_layer_geom_meta.next(); - const std::string layer_geometry_column = query_get_layer_geom_meta.get(0); - - // Get layer - LOG(LogLevel::DEBUG, "Reading %d features from layer %s.", layer_feature_count, layer.c_str()); - auto query_get_layer = db.query("SELECT * FROM " + layer + joined_ids, sub_ids); - query_get_layer.next(); - - // build features out of layer query - if (ids.size() == 0) - features.reserve(layer_feature_count); - while(!query_get_layer.done()) { - geojson::Feature feature = build_feature( - query_get_layer, - id_column, - layer_geometry_column - ); - - features.push_back(feature); - query_get_layer.next(); - } 
+ // Get number of features + auto query_get_layer_count = db.query("SELECT COUNT(*) FROM " + layer + joined_ids); + query_get_layer_count.next(); + const int layer_feature_count = query_get_layer_count.get(0); + features.reserve(layer_feature_count); + if (!ids.empty() && ids.size() != layer_feature_count) { + LOG(LogLevel::WARNING, "The number of input IDs (%d) does not equal the number of features with those IDs in the geopackage (%d) for layer %s.", + ids.size(), layer_feature_count, layer.c_str()); + } - // get layer bounding box from features - // - // GeoPackage contains a bounding box in the SQLite DB, - // however, it is in the SRS of the GPKG. By creating - // the bbox after the features are built, the projection - // is already done. This also should be fairly cheap to do. - for (const auto& feature : features) { - const auto& bbox = feature->get_bounding_box(); - min_x = bbox[0] < min_x ? bbox[0] : min_x; - min_y = bbox[1] < min_y ? bbox[1] : min_y; - max_x = bbox[2] > max_x ? bbox[2] : max_x; - max_y = bbox[3] > max_y ? 
bbox[3] : max_y; + #ifndef NGEN_QUIET + // output debug info on what is read exactly + read_ss << "Reading " << layer_feature_count << " features from layer " << layer << " using ID column `"<< id_column << "`"; + if (!ids.empty()) { + read_ss << " (id subset:"; + for (auto& id : ids) { + read_ss << " " << id; } + read_ss << ")"; + } + read_ss << std::endl; + LOG(read_ss.str(), LogLevel::DEBUG); read_ss.str(""); + #endif + + // Get layer + LOG(LogLevel::DEBUG, "Reading %d features from layer %s.", layer_feature_count, layer.c_str()); + auto query_get_layer = db.query(feature_query + joined_ids); + query_get_layer.next(); + + // build features out of layer query + while(!query_get_layer.done()) { + geojson::Feature feature = build_feature( + query_get_layer, + "id", + "geom" + ); + + features.push_back(feature); + query_get_layer.next(); + } + // get layer bounding box from features + // + // GeoPackage contains a bounding box in the SQLite DB, + // however, it is in the SRS of the GPKG. By creating + // the bbox after the features are built, the projection + // is already done. This also should be fairly cheap to do. + double min_x = std::numeric_limits::infinity(); + double min_y = std::numeric_limits::infinity(); + double max_x = -std::numeric_limits::infinity(); + double max_y = -std::numeric_limits::infinity(); + for (const auto& feature : features) { + const auto& bbox = feature->get_bounding_box(); + min_x = bbox[0] < min_x ? bbox[0] : min_x; + min_y = bbox[1] < min_y ? bbox[1] : min_y; + max_x = bbox[2] > max_x ? bbox[2] : max_x; + max_y = bbox[3] > max_y ? 
bbox[3] : max_y; } auto fc = std::make_shared( From 6bed3867d96110b4fd1e693eed94b75bca986464 Mon Sep 17 00:00:00 2001 From: Ian Todd Date: Thu, 12 Mar 2026 11:10:12 -0400 Subject: [PATCH 02/41] Improved messaging for geopackage error handling --- src/NGen.cpp | 10 ++++++---- src/partitionGenerator.cpp | 7 ++++--- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/src/NGen.cpp b/src/NGen.cpp index b4b8d14978..dd4ab6b58e 100644 --- a/src/NGen.cpp +++ b/src/NGen.cpp @@ -448,11 +448,12 @@ int run_ngen(int argc, char* argv[], int mpi_num_procs, int mpi_rank) { #if NGEN_WITH_SQLITE3 try { nexus_collection = ngen::geopackage::read(nexusDataFile, "nexus", nexus_subset_ids); - } catch (...) { + } catch (std::exception &e) { // Handle all exceptions std::string msg = "Geopackage error occurred reading nexuses: " + nexusDataFile; LOG(msg,LogLevel::FATAL); - throw std::runtime_error(msg); + LOG(LogLevel::FATAL, e.what()); + throw; } #else LOG(LogLevel::FATAL, "SQLite3 support required to read GeoPackage files."); @@ -480,11 +481,12 @@ int run_ngen(int argc, char* argv[], int mpi_num_procs, int mpi_rank) { try { catchment_collection = ngen::geopackage::read(catchmentDataFile, "divides", catchment_subset_ids); - } catch (...) { + } catch (std::exception &e) { // Handle all exceptions std::string msg = "Geopackage error occurred reading divides: " + catchmentDataFile; LOG(msg,LogLevel::FATAL); - throw std::runtime_error(msg); + LOG(LogLevel::FATAL, e.what()); + throw; } #else diff --git a/src/partitionGenerator.cpp b/src/partitionGenerator.cpp index 16c6b438b7..6b370f3443 100644 --- a/src/partitionGenerator.cpp +++ b/src/partitionGenerator.cpp @@ -434,7 +434,7 @@ int main(int argc, char* argv[]) #if NGEN_WITH_SQLITE3 try { catchment_collection = ngen::geopackage::read(catchmentDataFile, "divides", catchment_subset_ids); - } catch (...) 
{ + } catch (std::exception &e) { // Handle all exceptions std::string msg = "Geopackage error occurred reading divides: " + catchmentDataFile; LOG(msg,LogLevel::FATAL, msg); @@ -474,11 +474,12 @@ int main(int argc, char* argv[]) #if NGEN_WITH_SQLITE3 try { global_nexus_collection = ngen::geopackage::read(nexusDataFile, "nexus", nexus_subset_ids); - } catch (...) { + } catch (std::exception &e) { // Handle all exceptions std::string msg = "Geopackage error occurred reading nexuses: " + nexusDataFile; LOG(msg,LogLevel::FATAL); - throw std::runtime_error(msg); + LOG(LogLevel::FATAL, e.what()); + throw; } #else LOG(msg,LogLevel::FATAL, "SQLite3 support required to read GeoPackage files."); From 66b0169d41f6ea567cbabf7f3b57eed6485ce9c8 Mon Sep 17 00:00:00 2001 From: Ian Todd Date: Wed, 18 Mar 2026 14:18:20 -0400 Subject: [PATCH 03/41] Account for realization config json not including feature type prefix --- .../realizations/catchment/Bmi_Multi_Formulation.hpp | 2 +- .../realizations/catchment/Catchment_Formulation.hpp | 6 ++++++ .../realizations/catchment/Formulation_Manager.hpp | 8 +++++--- src/geopackage/read.cpp | 1 + src/realizations/catchment/Catchment_Formulation.cpp | 11 +++++++++++ 5 files changed, 24 insertions(+), 4 deletions(-) diff --git a/include/realizations/catchment/Bmi_Multi_Formulation.hpp b/include/realizations/catchment/Bmi_Multi_Formulation.hpp index 1eb2487917..3387f9fa1e 100644 --- a/include/realizations/catchment/Bmi_Multi_Formulation.hpp +++ b/include/realizations/catchment/Bmi_Multi_Formulation.hpp @@ -627,7 +627,7 @@ namespace realization { // Since this is a nested formulation, support usage of the '{{id}}' syntax for init config file paths. Catchment_Formulation::config_pattern_substitution(properties, BMI_REALIZATION_CFG_PARAM_REQ__INIT_CONFIG, - "{{id}}", id); + "{{id}}", Catchment_Formulation::config_pattern_id_replacement(id)); // Call create_formulation to perform the rest of the typical initialization steps for the formulation. 
mod->create_formulation(properties); diff --git a/include/realizations/catchment/Catchment_Formulation.hpp b/include/realizations/catchment/Catchment_Formulation.hpp index 31522e953c..b9ababa3b2 100644 --- a/include/realizations/catchment/Catchment_Formulation.hpp +++ b/include/realizations/catchment/Catchment_Formulation.hpp @@ -32,6 +32,12 @@ namespace realization { static void config_pattern_substitution(geojson::PropertyMap &properties, const std::string &key, const std::string &pattern, const std::string &replacement); + /**Remove leading non-numeric characters from the ID string. + * + * This may be needed to correct NGEN adding an identifying prefix to the ID with system file names without the prefix. + */ + static std::string config_pattern_id_replacement(const std::string &id); + /** * Get a header line appropriate for a file made up of entries from this type's implementation of * ``get_output_line_for_timestep``. diff --git a/include/realizations/catchment/Formulation_Manager.hpp b/include/realizations/catchment/Formulation_Manager.hpp index 89383e792d..5584b03115 100644 --- a/include/realizations/catchment/Formulation_Manager.hpp +++ b/include/realizations/catchment/Formulation_Manager.hpp @@ -200,7 +200,7 @@ namespace realization { this->add_formulation( this->construct_formulation_from_config( simulation_time_config, - catchment_config.first, + "cat-" + catchment_config.first, catchment_formulation, output_stream ) @@ -553,7 +553,7 @@ namespace realization { global_copy.formulation.parameters, BMI_REALIZATION_CFG_PARAM_REQ__INIT_CONFIG, "{{id}}", - identifier + Catchment_Formulation::config_pattern_id_replacement(identifier) ); } else { ss.str(""); ss << "init_config is present but empty for identifier: " << identifier << std::endl; @@ -665,7 +665,9 @@ namespace realization { // Replace {{id}} if present if (id_index != std::string::npos) { - filepattern = filepattern.replace(id_index, sizeof("{{id}}") - 1, identifier); + // account generate the regex 
to search for the ID with or without a prefix + std::string pattern_id = Catchment_Formulation::config_pattern_id_replacement(identifier); + filepattern = filepattern.replace(id_index, sizeof("{{id}}") - 1, pattern_id); } // Compile the file pattern as a regex diff --git a/src/geopackage/read.cpp b/src/geopackage/read.cpp index e1fc6172d6..c2b4dc89e0 100644 --- a/src/geopackage/read.cpp +++ b/src/geopackage/read.cpp @@ -65,6 +65,7 @@ std::shared_ptr ngen::geopackage::read( "('cat-' || divides.div_id) AS id, " "('nex-' || flowpaths.dn_nex_id) AS toid, " "flowpaths.slope AS So, " + "divides.area_sqkm AS areasqkm, " // faster for later code to rename the field here "divides.geom AS geom " "FROM divides " "LEFT JOIN flowpaths " diff --git a/src/realizations/catchment/Catchment_Formulation.cpp b/src/realizations/catchment/Catchment_Formulation.cpp index 00b22e0cf0..71c2972d0d 100644 --- a/src/realizations/catchment/Catchment_Formulation.cpp +++ b/src/realizations/catchment/Catchment_Formulation.cpp @@ -59,6 +59,17 @@ namespace realization { // LOG(ss.str(), LogLevel::DEBUG); } + std::string Catchment_Formulation::config_pattern_id_replacement(const std::string &id) { + size_t index = id.find_last_of('-'); + if (index != std::string::npos && ++index < id.length()) { + // check if first character after the last hyphen is a number + if (static_cast(id[index]) - static_cast('0') <= 9) { + return id.substr(index); + } + } + return id; + } + std::string Catchment_Formulation::get_output_header_line(std::string delimiter) const { return "Total Discharge"; } From 3976d08009fe424523f06f0093a9f010fee328bf Mon Sep 17 00:00:00 2001 From: Ian Todd Date: Wed, 25 Mar 2026 11:31:54 -0400 Subject: [PATCH 04/41] Change IIF to CASE for sqlite 3.26 support --- src/geopackage/read.cpp | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/src/geopackage/read.cpp b/src/geopackage/read.cpp index c2b4dc89e0..c967560ea4 100644 --- a/src/geopackage/read.cpp +++ 
b/src/geopackage/read.cpp @@ -75,8 +75,14 @@ std::shared_ptr ngen::geopackage::read( feature_query = "SELECT " "('nex-' || nexus.nex_id) AS id, " - "IIF(flowpaths.div_id, ('cat-' || flowpaths.div_id), 'terminal') AS toid, " - "flowpaths.slope AS So, " + "CASE " + "WHEN flowpaths.div_id IS NULL THEN 'terminal' " + "ELSE ('cat-' || flowpaths.div_id) " + "END AS toid, " + "CASE " + "WHEN flowpaths.slope IS NULL THEN 0.0 " + "ELSE flowpaths.slope " + "END AS So, " "nexus.geom AS geom " "FROM nexus " "LEFT JOIN flowpaths " From a83fbb9a8a23ddb1c10f043b74aa247aa61a2245 Mon Sep 17 00:00:00 2001 From: Ian Todd Date: Fri, 16 Jan 2026 14:06:13 -0500 Subject: [PATCH 05/41] State saving for multi-BMI --- .../catchment/Bmi_Module_Formulation.hpp | 13 +++++++++++++ src/NGen.cpp | 10 ++++++++++ 2 files changed, 23 insertions(+) diff --git a/include/realizations/catchment/Bmi_Module_Formulation.hpp b/include/realizations/catchment/Bmi_Module_Formulation.hpp index 150bd2ac38..bcc1f251e1 100644 --- a/include/realizations/catchment/Bmi_Module_Formulation.hpp +++ b/include/realizations/catchment/Bmi_Module_Formulation.hpp @@ -57,6 +57,19 @@ namespace realization { void load_hot_start(std::shared_ptr loader) override; + /** + * Requests the BMI to copy its current state into memory. The state will remain in memory until either a new state is made or `free_save_state` is called. + * + * @param size A `uint64_t` pointer that will have its value set to the size of the serialized data. + * @return Pointer to the beginning of the serialized data. + */ + virtual const char* create_save_state(uint64_t *size) const; + + /** + * Clears any serialized data stored by the BMI from memory. + */ + virtual void free_save_state() const; + /** * Get the collection of forcing output property names this instance can provide. 
* diff --git a/src/NGen.cpp b/src/NGen.cpp index dd4ab6b58e..2e71f2c4a1 100644 --- a/src/NGen.cpp +++ b/src/NGen.cpp @@ -714,6 +714,16 @@ int run_ngen(int argc, char* argv[], int mpi_num_procs, int mpi_rank) { simulation->run_catchments(); + if (state_saving_config.has_end_of_run()) { + LOG("Saving end-of-run state.", LogLevel::INFO); + std::shared_ptr saver = state_saving_config.end_of_run_saver(); + std::shared_ptr snapshot = saver->initialize_snapshot( + State_Saver::snapshot_time_now(), + State_Saver::State_Durability::strict + ); + simulation->save_state_snapshot(snapshot); + } + #if NGEN_WITH_MPI MPI_Barrier(MPI_COMM_WORLD); #endif From 3fab311889a7960e175f54e55c48a02ac1e012a4 Mon Sep 17 00:00:00 2001 From: Ian Todd Date: Tue, 20 Jan 2026 11:22:00 -0500 Subject: [PATCH 06/41] Cold start loading --- include/realizations/catchment/Bmi_Formulation.hpp | 7 +++++++ .../catchment/Bmi_Module_Formulation.hpp | 13 +------------ 2 files changed, 8 insertions(+), 12 deletions(-) diff --git a/include/realizations/catchment/Bmi_Formulation.hpp b/include/realizations/catchment/Bmi_Formulation.hpp index 91236fea42..02aab75fb7 100644 --- a/include/realizations/catchment/Bmi_Formulation.hpp +++ b/include/realizations/catchment/Bmi_Formulation.hpp @@ -95,6 +95,13 @@ namespace realization { */ virtual void load_hot_start(std::shared_ptr loader) = 0; + /** + * Passes a serialized representation of the model's state to ``loader`` + * + * Asks saver to find data for the BMI and passes that data to the BMI for loading. + */ + virtual void load_state(std::shared_ptr loader) const = 0; + /** * Convert a time value from the model to an epoch time in seconds. 
* diff --git a/include/realizations/catchment/Bmi_Module_Formulation.hpp b/include/realizations/catchment/Bmi_Module_Formulation.hpp index bcc1f251e1..c16e5269e7 100644 --- a/include/realizations/catchment/Bmi_Module_Formulation.hpp +++ b/include/realizations/catchment/Bmi_Module_Formulation.hpp @@ -57,18 +57,7 @@ namespace realization { void load_hot_start(std::shared_ptr loader) override; - /** - * Requests the BMI to copy its current state into memory. The state will remain in memory until either a new state is made or `free_save_state` is called. - * - * @param size A `uint64_t` pointer that will have its value set to the size of the serialized data. - * @return Pointer to the beginning of the serialized data. - */ - virtual const char* create_save_state(uint64_t *size) const; - - /** - * Clears any serialized data stored by the BMI from memory. - */ - virtual void free_save_state() const; + void load_state(std::shared_ptr loader) const override; /** * Get the collection of forcing output property names this instance can provide. 
From 19fd33de525bb413ddaade0b42ab5e2ebea7d987 Mon Sep 17 00:00:00 2001 From: "Carolyn.Maynard" Date: Thu, 8 Jan 2026 09:56:18 -0800 Subject: [PATCH 07/41] Docker updates for python ewts packages --- extern/t-route | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extern/t-route b/extern/t-route index e2f7d5dcb7..346db58b67 160000 --- a/extern/t-route +++ b/extern/t-route @@ -1 +1 @@ -Subproject commit e2f7d5dcb7efcc684523f759b438ad250543ccdd +Subproject commit 346db58b6779089bfde7c9519783ecfb39d95a95 From 7c7cad192bbbe50293ee493a1470cf4c3ca37cc0 Mon Sep 17 00:00:00 2001 From: Ian Todd Date: Fri, 6 Feb 2026 10:26:57 -0500 Subject: [PATCH 08/41] Use Boost for serializing Multi-BMI --- include/realizations/catchment/Bmi_Module_Formulation.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/realizations/catchment/Bmi_Module_Formulation.hpp b/include/realizations/catchment/Bmi_Module_Formulation.hpp index c16e5269e7..fbd73c61f0 100644 --- a/include/realizations/catchment/Bmi_Module_Formulation.hpp +++ b/include/realizations/catchment/Bmi_Module_Formulation.hpp @@ -57,7 +57,7 @@ namespace realization { void load_hot_start(std::shared_ptr loader) override; - void load_state(std::shared_ptr loader) const override; + void load_state(std::shared_ptr loader) override; /** * Get the collection of forcing output property names this instance can provide. 
From 0928f655f717476109d53556f80850ecd47e6ec2 Mon Sep 17 00:00:00 2001 From: Miguel Pena Date: Wed, 4 Feb 2026 20:25:43 -0800 Subject: [PATCH 09/41] updates to cicd and dockerfile --- .github/workflows/ngwpc-cicd.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/ngwpc-cicd.yml b/.github/workflows/ngwpc-cicd.yml index 87c8471898..19187ddbb5 100644 --- a/.github/workflows/ngwpc-cicd.yml +++ b/.github/workflows/ngwpc-cicd.yml @@ -274,6 +274,8 @@ jobs: - name: Build & push image id: build_image uses: docker/build-push-action@v6 + env: + BUILD_DATE: ${{ env.BUILD_DATE }} with: context: . # file: Dockerfile.test # comment out when done testing From c7a9da53de46aeba3fa85d6bfbdd506ff340eb21 Mon Sep 17 00:00:00 2001 From: Miguel Pena Date: Tue, 10 Feb 2026 20:56:44 -0800 Subject: [PATCH 10/41] updated cicd file --- .github/workflows/ngwpc-cicd.yml | 2 -- 1 file changed, 2 deletions(-) diff --git a/.github/workflows/ngwpc-cicd.yml b/.github/workflows/ngwpc-cicd.yml index 19187ddbb5..87c8471898 100644 --- a/.github/workflows/ngwpc-cicd.yml +++ b/.github/workflows/ngwpc-cicd.yml @@ -274,8 +274,6 @@ jobs: - name: Build & push image id: build_image uses: docker/build-push-action@v6 - env: - BUILD_DATE: ${{ env.BUILD_DATE }} with: context: . 
# file: Dockerfile.test # comment out when done testing From 84ad6315f2d3a1cd1d16fcb8c51e9ff9ffeb72a0 Mon Sep 17 00:00:00 2001 From: Ian Todd Date: Thu, 12 Feb 2026 09:47:23 -0500 Subject: [PATCH 11/41] Merge remote-tracking branch 'NOAA-OWP/master' into development --- .../catchment/Bmi_Multi_Formulation.hpp | 9 ++ src/core/nexus/HY_PointHydroNexusRemote.cpp | 126 ++++++++++++------ 2 files changed, 92 insertions(+), 43 deletions(-) diff --git a/include/realizations/catchment/Bmi_Multi_Formulation.hpp b/include/realizations/catchment/Bmi_Multi_Formulation.hpp index 3387f9fa1e..f9968695de 100644 --- a/include/realizations/catchment/Bmi_Multi_Formulation.hpp +++ b/include/realizations/catchment/Bmi_Multi_Formulation.hpp @@ -62,6 +62,15 @@ namespace realization { void load_state(std::shared_ptr loader) override; void load_hot_start(std::shared_ptr loader) override; + + virtual void check_mass_balance(const int& iteration, const int& total_steps, const std::string& timestamp) const final { + for( const auto &module : modules ) { + // TODO may need to check on outputs form each module indepdently??? + // Right now, the assumption is that if each component is mass balanced + // then the entire formulation is mass balanced + module->check_mass_balance(iteration, total_steps, timestamp); + } + }; /** * Convert a time value from the model to an epoch time in seconds. 
diff --git a/src/core/nexus/HY_PointHydroNexusRemote.cpp b/src/core/nexus/HY_PointHydroNexusRemote.cpp index 3dd9323949..f195d16e56 100644 --- a/src/core/nexus/HY_PointHydroNexusRemote.cpp +++ b/src/core/nexus/HY_PointHydroNexusRemote.cpp @@ -19,7 +19,7 @@ void MPI_Handle_Error(int status) } else { - MPI_Abort(MPI_COMM_WORLD,1); + MPI_Abort(MPI_COMM_WORLD, status); } } @@ -93,9 +93,9 @@ HY_PointHydroNexusRemote::HY_PointHydroNexusRemote(std::string nexus_id, Catchme HY_PointHydroNexusRemote::~HY_PointHydroNexusRemote() { - long wait_time = 0; - - // This destructore might be called after MPI_Finalize so do not attempt communication if + const unsigned int timeout = 120000; // timeout threshold in milliseconds + unsigned int wait_time = 0; + // This destructor might be called after MPI_Finalize so do not attempt communication if // this has occured int mpi_finalized; MPI_Finalized(&mpi_finalized); @@ -106,14 +106,46 @@ HY_PointHydroNexusRemote::~HY_PointHydroNexusRemote() process_communications(); - std::this_thread::sleep_for(std::chrono::milliseconds(1)); - + if( wait_time < timeout && wait_time > 0 ){ // don't sleep if first call clears comms! + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + else + { + std::cerr << "HY_PointHydroNexusRemote: "<< id + << " destructor timed out after " << timeout/1000 + << " seconds waiting on pending MPI communications\n"; + // The return is is probably best, logging the error. + // There is no good way to recover from this. + // Throwing an exception from destructors is generally not a good idea + // as it can lead to undefined behavior. + // and using std::exit forces the program to terminate immediately, + // even if this situation is recoverable/acceptable in some cases. 
+ return; + } wait_time += 1; + MPI_Finalized(&mpi_finalized); + } +} - if ( wait_time > 120000 ) +void HY_PointHydroNexusRemote::post_receives() +{ + // Post receives if not already posted (for pure receiver nexuses) + if (stored_receives.empty()) + { + for (int rank : upstream_ranks) { - // TODO log warning message that some comunications could not complete - + stored_receives.push_back({}); + stored_receives.back().buffer = std::make_shared(); + int tag = extract(id); + + MPI_Handle_Error(MPI_Irecv( + stored_receives.back().buffer.get(), + 1, + time_step_and_flow_type, + rank, + tag, + MPI_COMM_WORLD, + &stored_receives.back().mpi_request)); } } } @@ -132,31 +164,15 @@ double HY_PointHydroNexusRemote::get_downstream_flow(std::string catchment_id, t } else if ( type == receiver || type == sender_receiver ) { - for ( int rank : upstream_ranks ) - { - int status; - - stored_receives.resize(stored_receives.size() + 1); - stored_receives.back().buffer = std::make_shared(); - - int tag = extract(id); - - //Receive downstream_flow from Upstream Remote Nexus to this Downstream Remote Nexus - status = MPI_Irecv( - stored_receives.back().buffer.get(), - 1, - time_step_and_flow_type, - rank, - tag, - MPI_COMM_WORLD, - &stored_receives.back().mpi_request); - - MPI_Handle_Error(status); - - //std::cerr << "Creating receive with target_rank=" << rank << " on tag=" << tag << "\n"; - } - - //std::cerr << "Waiting on receives\n"; + post_receives(); + // Wait for receives to complete + // This ensures all upstream flows are received before returning + // and that we have matched all sends with receives for a given time step. + // As long as the functions are called appropriately, e.g. one call to + // `add_upstream_flow` per upstream catchment per time step, followed + // by a call to `get_downstream_flow` for each downstream catchment per time step, + // this loop will terminate and ensures the synchronization of flows between + // ranks. 
while ( stored_receives.size() > 0 ) { process_communications(); @@ -169,6 +185,28 @@ double HY_PointHydroNexusRemote::get_downstream_flow(std::string catchment_id, t void HY_PointHydroNexusRemote::add_upstream_flow(double val, std::string catchment_id, time_step_t t) { + // Process any completed communications to free resources + // If no communications are pending, this call will do nothing. + process_communications(); + // NOTE: It is possible for a partition to get "too far" ahead since the sends are now + // truely asynchronous. For pure receivers and sender_receivers, this isn't a problem + // because the get_downstream_flow function will block until all receives are processed. + // However, for pure senders, this could be a problem. + // We can use this spinlock here to limit how far ahead a partition can get. + // in this case, approximately 100 time steps per downstream catchment... + while( stored_sends.size() > downstream_ranks.size()*100 ) + { + process_communications(); + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + + // Post receives before sending to prevent deadlock + // When stored_receives is empty, we need to post for incoming messages + if ((type == receiver || type == sender_receiver) && stored_receives.empty()) + { + post_receives(); + } + // first add flow to local copy HY_PointHydroNexus::add_upstream_flow(val, catchment_id, t); @@ -207,23 +245,25 @@ void HY_PointHydroNexusRemote::add_upstream_flow(double val, std::string catchme int tag = extract(id); //Send downstream_flow from this Upstream Remote Nexus to the Downstream Remote Nexus - MPI_Isend( + MPI_Handle_Error( + MPI_Isend( stored_sends.back().buffer.get(), 1, time_step_and_flow_type, *downstream_ranks.begin(), //TODO currently only support a SINGLE downstream message pairing tag, MPI_COMM_WORLD, - &stored_sends.back().mpi_request); - - //std::cerr << "Creating send with target_rank=" << *downstream_ranks.begin() << " on tag=" << tag << "\n"; + 
&stored_sends.back().mpi_request) + ); + //std::cerr << "Creating send with target_rank=" << *downstream_ranks.begin() << " on tag=" << tag << "\n"; - while ( stored_sends.size() > 0 ) - { - process_communications(); - std::this_thread::sleep_for(std::chrono::milliseconds(1)); - } + // Send is async, the next call to add_upstream_flow will test and ensure the send has completed + // and free the memory associated with the send. + // This prevents a potential deadlock situation where a send isn't able to complete + // because the remote receiver is also trying to send and the underlying mpi buffers/protocol + // are forced into a rendevous protocol. So we ensure that we always post receives before sends. + // and that we always test for completed sends before freeing the memory associated with the send. } } } From 5bc48d43298d924f8bf7b3e80b993ebdc38f86ac Mon Sep 17 00:00:00 2001 From: Ian Todd Date: Tue, 24 Feb 2026 14:10:00 -0500 Subject: [PATCH 12/41] Align Fortran state size with sizeof int --- src/realizations/catchment/Bmi_Fortran_Formulation.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/src/realizations/catchment/Bmi_Fortran_Formulation.cpp b/src/realizations/catchment/Bmi_Fortran_Formulation.cpp index 04b7e7e51a..04a3f916b0 100644 --- a/src/realizations/catchment/Bmi_Fortran_Formulation.cpp +++ b/src/realizations/catchment/Bmi_Fortran_Formulation.cpp @@ -104,6 +104,7 @@ const boost::span Bmi_Fortran_Formulation::get_serialization_state() { // create the serialized state on the Fortran BMI int size_int = 0; model->SetValue(StateSaveNames::CREATE, &size_int); + // the size coming in should be the number of int elements in the Fortran backing array, not the byte size of the array model->GetValue(StateSaveNames::SIZE, &size_int); // resize the state to the array to the size of the Fortran's backing array this->serialized_state.resize(size_int); From 7fdc6d85bdad1d3484da31b70145cf6e1c3f89be Mon Sep 17 00:00:00 2001 From: Ian Todd Date: Mon, 2 Mar 2026 
10:27:53 -0500 Subject: [PATCH 13/41] Fortran reports size in bytes --- src/realizations/catchment/Bmi_Fortran_Formulation.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/src/realizations/catchment/Bmi_Fortran_Formulation.cpp b/src/realizations/catchment/Bmi_Fortran_Formulation.cpp index 04a3f916b0..04b7e7e51a 100644 --- a/src/realizations/catchment/Bmi_Fortran_Formulation.cpp +++ b/src/realizations/catchment/Bmi_Fortran_Formulation.cpp @@ -104,7 +104,6 @@ const boost::span Bmi_Fortran_Formulation::get_serialization_state() { // create the serialized state on the Fortran BMI int size_int = 0; model->SetValue(StateSaveNames::CREATE, &size_int); - // the size coming in should be the number of int elements in the Fortran backing array, not the byte size of the array model->GetValue(StateSaveNames::SIZE, &size_int); // resize the state to the array to the size of the Fortran's backing array this->serialized_state.resize(size_int); From 9994c747745fe6cab3b120056233642ff426b9d7 Mon Sep 17 00:00:00 2001 From: hellkite500 Date: Tue, 16 Sep 2025 18:07:30 -0600 Subject: [PATCH 14/41] feat(ngen): add mass balance check for all bmi modules during runtime --- include/realizations/catchment/Bmi_Module_Formulation.hpp | 1 + 1 file changed, 1 insertion(+) diff --git a/include/realizations/catchment/Bmi_Module_Formulation.hpp b/include/realizations/catchment/Bmi_Module_Formulation.hpp index fbd73c61f0..2e61e2e216 100644 --- a/include/realizations/catchment/Bmi_Module_Formulation.hpp +++ b/include/realizations/catchment/Bmi_Module_Formulation.hpp @@ -7,6 +7,7 @@ #include "Bmi_Adapter.hpp" #include #include "bmi_utilities.hpp" +#include "bmi/protocols.hpp" #include #include "bmi/protocols.hpp" From 28047e2c796dbcbcf8a5e36f513a1e58b928236f Mon Sep 17 00:00:00 2001 From: Ian Todd Date: Wed, 25 Feb 2026 10:12:57 -0500 Subject: [PATCH 15/41] Revert changes from OWP --- .../core/nexus/HY_PointHydroNexusRemote.hpp | 1 - src/core/nexus/HY_PointHydroNexusRemote.cpp | 126 
++++++------------ 2 files changed, 43 insertions(+), 84 deletions(-) diff --git a/include/core/nexus/HY_PointHydroNexusRemote.hpp b/include/core/nexus/HY_PointHydroNexusRemote.hpp index aa3857d67b..c9cfafd613 100644 --- a/include/core/nexus/HY_PointHydroNexusRemote.hpp +++ b/include/core/nexus/HY_PointHydroNexusRemote.hpp @@ -72,7 +72,6 @@ class HY_PointHydroNexusRemote : public HY_PointHydroNexus communication_type get_communicator_type() { return type; } private: - void post_receives(); void process_communications(); int world_rank; diff --git a/src/core/nexus/HY_PointHydroNexusRemote.cpp b/src/core/nexus/HY_PointHydroNexusRemote.cpp index f195d16e56..3dd9323949 100644 --- a/src/core/nexus/HY_PointHydroNexusRemote.cpp +++ b/src/core/nexus/HY_PointHydroNexusRemote.cpp @@ -19,7 +19,7 @@ void MPI_Handle_Error(int status) } else { - MPI_Abort(MPI_COMM_WORLD, status); + MPI_Abort(MPI_COMM_WORLD,1); } } @@ -93,9 +93,9 @@ HY_PointHydroNexusRemote::HY_PointHydroNexusRemote(std::string nexus_id, Catchme HY_PointHydroNexusRemote::~HY_PointHydroNexusRemote() { - const unsigned int timeout = 120000; // timeout threshold in milliseconds - unsigned int wait_time = 0; - // This destructor might be called after MPI_Finalize so do not attempt communication if + long wait_time = 0; + + // This destructore might be called after MPI_Finalize so do not attempt communication if // this has occured int mpi_finalized; MPI_Finalized(&mpi_finalized); @@ -106,46 +106,14 @@ HY_PointHydroNexusRemote::~HY_PointHydroNexusRemote() process_communications(); - if( wait_time < timeout && wait_time > 0 ){ // don't sleep if first call clears comms! - std::this_thread::sleep_for(std::chrono::milliseconds(1)); - } - else - { - std::cerr << "HY_PointHydroNexusRemote: "<< id - << " destructor timed out after " << timeout/1000 - << " seconds waiting on pending MPI communications\n"; - // The return is is probably best, logging the error. - // There is no good way to recover from this. 
- // Throwing an exception from destructors is generally not a good idea - // as it can lead to undefined behavior. - // and using std::exit forces the program to terminate immediately, - // even if this situation is recoverable/acceptable in some cases. - return; - } + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + wait_time += 1; - MPI_Finalized(&mpi_finalized); - } -} -void HY_PointHydroNexusRemote::post_receives() -{ - // Post receives if not already posted (for pure receiver nexuses) - if (stored_receives.empty()) - { - for (int rank : upstream_ranks) + if ( wait_time > 120000 ) { - stored_receives.push_back({}); - stored_receives.back().buffer = std::make_shared(); - int tag = extract(id); - - MPI_Handle_Error(MPI_Irecv( - stored_receives.back().buffer.get(), - 1, - time_step_and_flow_type, - rank, - tag, - MPI_COMM_WORLD, - &stored_receives.back().mpi_request)); + // TODO log warning message that some comunications could not complete + } } } @@ -164,15 +132,31 @@ double HY_PointHydroNexusRemote::get_downstream_flow(std::string catchment_id, t } else if ( type == receiver || type == sender_receiver ) { - post_receives(); - // Wait for receives to complete - // This ensures all upstream flows are received before returning - // and that we have matched all sends with receives for a given time step. - // As long as the functions are called appropriately, e.g. one call to - // `add_upstream_flow` per upstream catchment per time step, followed - // by a call to `get_downstream_flow` for each downstream catchment per time step, - // this loop will terminate and ensures the synchronization of flows between - // ranks. 
+ for ( int rank : upstream_ranks ) + { + int status; + + stored_receives.resize(stored_receives.size() + 1); + stored_receives.back().buffer = std::make_shared(); + + int tag = extract(id); + + //Receive downstream_flow from Upstream Remote Nexus to this Downstream Remote Nexus + status = MPI_Irecv( + stored_receives.back().buffer.get(), + 1, + time_step_and_flow_type, + rank, + tag, + MPI_COMM_WORLD, + &stored_receives.back().mpi_request); + + MPI_Handle_Error(status); + + //std::cerr << "Creating receive with target_rank=" << rank << " on tag=" << tag << "\n"; + } + + //std::cerr << "Waiting on receives\n"; while ( stored_receives.size() > 0 ) { process_communications(); @@ -185,28 +169,6 @@ double HY_PointHydroNexusRemote::get_downstream_flow(std::string catchment_id, t void HY_PointHydroNexusRemote::add_upstream_flow(double val, std::string catchment_id, time_step_t t) { - // Process any completed communications to free resources - // If no communications are pending, this call will do nothing. - process_communications(); - // NOTE: It is possible for a partition to get "too far" ahead since the sends are now - // truely asynchronous. For pure receivers and sender_receivers, this isn't a problem - // because the get_downstream_flow function will block until all receives are processed. - // However, for pure senders, this could be a problem. - // We can use this spinlock here to limit how far ahead a partition can get. - // in this case, approximately 100 time steps per downstream catchment... 
- while( stored_sends.size() > downstream_ranks.size()*100 ) - { - process_communications(); - std::this_thread::sleep_for(std::chrono::milliseconds(1)); - } - - // Post receives before sending to prevent deadlock - // When stored_receives is empty, we need to post for incoming messages - if ((type == receiver || type == sender_receiver) && stored_receives.empty()) - { - post_receives(); - } - // first add flow to local copy HY_PointHydroNexus::add_upstream_flow(val, catchment_id, t); @@ -245,25 +207,23 @@ void HY_PointHydroNexusRemote::add_upstream_flow(double val, std::string catchme int tag = extract(id); //Send downstream_flow from this Upstream Remote Nexus to the Downstream Remote Nexus - MPI_Handle_Error( - MPI_Isend( + MPI_Isend( stored_sends.back().buffer.get(), 1, time_step_and_flow_type, *downstream_ranks.begin(), //TODO currently only support a SINGLE downstream message pairing tag, MPI_COMM_WORLD, - &stored_sends.back().mpi_request) - ); + &stored_sends.back().mpi_request); + + //std::cerr << "Creating send with target_rank=" << *downstream_ranks.begin() << " on tag=" << tag << "\n"; - //std::cerr << "Creating send with target_rank=" << *downstream_ranks.begin() << " on tag=" << tag << "\n"; - // Send is async, the next call to add_upstream_flow will test and ensure the send has completed - // and free the memory associated with the send. - // This prevents a potential deadlock situation where a send isn't able to complete - // because the remote receiver is also trying to send and the underlying mpi buffers/protocol - // are forced into a rendevous protocol. So we ensure that we always post receives before sends. - // and that we always test for completed sends before freeing the memory associated with the send. 
+ while ( stored_sends.size() > 0 ) + { + process_communications(); + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } } } } From 9a9920aab5b211474d2fb0624820e0c44bda78a2 Mon Sep 17 00:00:00 2001 From: "Carolyn.Maynard" Date: Mon, 9 Mar 2026 18:11:46 -0700 Subject: [PATCH 16/41] Use new nwm-ewts libraries and python package EWTS versions to build summary output to console Updates to CMakeLists.txt for integrating the nwm-ewts libraries Initial conversion to ewts_ngen_bridge library. Updated CMakeLists. Replaced logMsgAndThrowError with separate calls to LOG and throw. --- Dockerfile | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Dockerfile b/Dockerfile index 8b721f514a..0151ff5070 100644 --- a/Dockerfile +++ b/Dockerfile @@ -7,10 +7,10 @@ ARG ORG=ngwpc ARG NGEN_FORCING_IMAGE_TAG=latest ARG NGEN_FORCING_IMAGE=ghcr.io/${ORG}/ngen-bmi-forcing:${NGEN_FORCING_IMAGE_TAG} -FROM ${NGEN_FORCING_IMAGE} AS base +#FROM ${NGEN_FORCING_IMAGE} AS base # Uncomment when building locally -#FROM ngen-bmi-forcing AS base +FROM ngen-bmi-forcing AS base # OCI Metadata Arguments ARG NGEN_FORCING_IMAGE From 28d992af8e0714e04571349b4dd73b44c9b978c1 Mon Sep 17 00:00:00 2001 From: Ian Todd Date: Thu, 26 Mar 2026 14:49:47 -0400 Subject: [PATCH 17/41] EWST function name change --- src/geopackage/read.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/geopackage/read.cpp b/src/geopackage/read.cpp index c967560ea4..8f9efa8283 100644 --- a/src/geopackage/read.cpp +++ b/src/geopackage/read.cpp @@ -88,7 +88,7 @@ std::shared_ptr ngen::geopackage::read( "LEFT JOIN flowpaths " "ON nexus.dn_fp_id = flowpaths.fp_id"; } else { - Logger::logMsgAndThrowError("Geopackage read only accepts layers `divides` and `nexus`. The layer entered was " + layer); + Logger::LogAndThrow("Geopackage read only accepts layers `divides` and `nexus`. 
The layer entered was " + layer); } std::string joined_ids = ""; @@ -107,7 +107,7 @@ std::shared_ptr ngen::geopackage::read( } int id_num = std::atoi(filter_id.c_str() + sep_index); if (id_num <= 0) - Logger::logMsgAndThrowError("Could not convert input " + layer + " ID into a number: " + filter_id); + Logger::LogAndThrow("Could not convert input " + layer + " ID into a number: " + filter_id); filter << id_num; } filter << ')'; From cb44c878845f6c59f02b2df1c8a7b5b38d6917ab Mon Sep 17 00:00:00 2001 From: Ian Todd Date: Thu, 26 Mar 2026 15:49:40 -0400 Subject: [PATCH 18/41] Fix rebase problems --- src/partitionGenerator.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/partitionGenerator.cpp b/src/partitionGenerator.cpp index 6b370f3443..b52754bf00 100644 --- a/src/partitionGenerator.cpp +++ b/src/partitionGenerator.cpp @@ -433,7 +433,7 @@ int main(int argc, char* argv[]) { #if NGEN_WITH_SQLITE3 try { - catchment_collection = ngen::geopackage::read(catchmentDataFile, "divides", catchment_subset_ids); + catchment_collection = ngen::geopackage::read(catchmentDataFile, "divides", catchment_subset_ids); } catch (std::exception &e) { // Handle all exceptions std::string msg = "Geopackage error occurred reading divides: " + catchmentDataFile; @@ -482,7 +482,7 @@ int main(int argc, char* argv[]) throw; } #else - LOG(msg,LogLevel::FATAL, "SQLite3 support required to read GeoPackage files."); + LOG(LogLevel::FATAL, "SQLite3 support required to read GeoPackage files."); throw std::runtime_error("SQLite3 support required to read GeoPackage files."); #endif } From 07e989f9f4d77365f3a6367f19b26ae98dfac090 Mon Sep 17 00:00:00 2001 From: Ian Todd Date: Thu, 2 Apr 2026 08:24:04 -0400 Subject: [PATCH 19/41] Ensure realization catchment IDs start with "cat-" when searching the geometry fabric --- .../realizations/catchment/Formulation_Manager.hpp | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git 
a/include/realizations/catchment/Formulation_Manager.hpp b/include/realizations/catchment/Formulation_Manager.hpp index 5584b03115..746129b90e 100644 --- a/include/realizations/catchment/Formulation_Manager.hpp +++ b/include/realizations/catchment/Formulation_Manager.hpp @@ -170,13 +170,18 @@ namespace realization { for (std::pair catchment_config : *possible_catchment_configs) { ss.str(""); ss << "Processing catchment: " << catchment_config.first << std::endl; LOG(ss.str(), LogLevel::DEBUG); + // ensure catchment's ID starts with "cat-" so it can be found in the fabric + std::string catchment_id = catchment_config.first; + if (strncmp(catchment_id.c_str(), "cat-", 4) != 0) { + catchment_id = "cat-" + catchment_id; + } - int catchment_index = fabric->find(catchment_config.first); + int catchment_index = fabric->find(catchment_id); if (catchment_index == -1) { #ifndef NGEN_QUIET ss.str(""); ss << "Formulation_Manager::read: Cannot create formulation for catchment " - << catchment_config.first + << catchment_id << " that isn't identified in the hydrofabric or requested subset" << std::endl; LOG(ss.str(), LogLevel::WARNING); #endif @@ -200,7 +205,7 @@ namespace realization { this->add_formulation( this->construct_formulation_from_config( simulation_time_config, - "cat-" + catchment_config.first, + catchment_id, catchment_formulation, output_stream ) From 27f7fbe4dbe133ba33bd6be91ca59044d44c0ed5 Mon Sep 17 00:00:00 2001 From: Phil Miller Date: Mon, 8 Dec 2025 17:53:19 -0800 Subject: [PATCH 20/41] Add logic and structures for parsing state saving configuration from realization config --- src/state_save_restore/CMakeLists.txt | 1 - 1 file changed, 1 deletion(-) diff --git a/src/state_save_restore/CMakeLists.txt b/src/state_save_restore/CMakeLists.txt index b068d6d4ab..099e46124d 100644 --- a/src/state_save_restore/CMakeLists.txt +++ b/src/state_save_restore/CMakeLists.txt @@ -14,4 +14,3 @@ target_include_directories(state_save_restore PUBLIC ${PROJECT_SOURCE_DIR}/include 
) - From a542066ebffb1b76638932fbb78aacd1d34b8298 Mon Sep 17 00:00:00 2001 From: Ian Todd Date: Fri, 16 Jan 2026 14:06:13 -0500 Subject: [PATCH 21/41] State saving for multi-BMI --- .../catchment/Bmi_Module_Formulation.hpp | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/include/realizations/catchment/Bmi_Module_Formulation.hpp b/include/realizations/catchment/Bmi_Module_Formulation.hpp index 2e61e2e216..8f6facf558 100644 --- a/include/realizations/catchment/Bmi_Module_Formulation.hpp +++ b/include/realizations/catchment/Bmi_Module_Formulation.hpp @@ -60,6 +60,19 @@ namespace realization { void load_state(std::shared_ptr loader) override; + /** + * Requests the BMI to copy its current state into memory. The state will remain in memory until either a new state is made or `free_save_state` is called. + * + * @param size A `uint64_t` pointer that will have its value set to the size of the serialized data. + * @return Pointer to the beginning of the serialized data. + */ + virtual const char* create_save_state(uint64_t *size) const; + + /** + * Clears any serialized data stored by the BMI from memory. + */ + virtual void free_save_state() const; + /** * Get the collection of forcing output property names this instance can provide. * From 105cddf9c019bb9c2add6c4bd23ff92bf5954f9c Mon Sep 17 00:00:00 2001 From: Ian Todd Date: Tue, 20 Jan 2026 11:22:00 -0500 Subject: [PATCH 22/41] Cold start loading --- include/realizations/catchment/Bmi_Multi_Formulation.hpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/include/realizations/catchment/Bmi_Multi_Formulation.hpp b/include/realizations/catchment/Bmi_Multi_Formulation.hpp index f9968695de..8dd56f027d 100644 --- a/include/realizations/catchment/Bmi_Multi_Formulation.hpp +++ b/include/realizations/catchment/Bmi_Multi_Formulation.hpp @@ -72,6 +72,8 @@ namespace realization { } }; + void load_state(std::shared_ptr loader) const override; + /** * Convert a time value from the model to an epoch time in seconds. 
* From 8524e1517288dc612143743503622d3c362c89ef Mon Sep 17 00:00:00 2001 From: Ian Todd Date: Fri, 6 Feb 2026 10:26:57 -0500 Subject: [PATCH 23/41] Use Boost for serializing Multi-BMI --- include/realizations/catchment/Bmi_Formulation.hpp | 7 ------- include/realizations/catchment/Bmi_Multi_Formulation.hpp | 2 -- 2 files changed, 9 deletions(-) diff --git a/include/realizations/catchment/Bmi_Formulation.hpp b/include/realizations/catchment/Bmi_Formulation.hpp index 02aab75fb7..91236fea42 100644 --- a/include/realizations/catchment/Bmi_Formulation.hpp +++ b/include/realizations/catchment/Bmi_Formulation.hpp @@ -95,13 +95,6 @@ namespace realization { */ virtual void load_hot_start(std::shared_ptr loader) = 0; - /** - * Passes a serialized representation of the model's state to ``loader`` - * - * Asks saver to find data for the BMI and passes that data to the BMI for loading. - */ - virtual void load_state(std::shared_ptr loader) const = 0; - /** * Convert a time value from the model to an epoch time in seconds. * diff --git a/include/realizations/catchment/Bmi_Multi_Formulation.hpp b/include/realizations/catchment/Bmi_Multi_Formulation.hpp index 8dd56f027d..f9968695de 100644 --- a/include/realizations/catchment/Bmi_Multi_Formulation.hpp +++ b/include/realizations/catchment/Bmi_Multi_Formulation.hpp @@ -72,8 +72,6 @@ namespace realization { } }; - void load_state(std::shared_ptr loader) const override; - /** * Convert a time value from the model to an epoch time in seconds. 
* From 2b679c2282bdc29c1fe328092c8b1d0ce0e65da8 Mon Sep 17 00:00:00 2001 From: Miguel Pena Date: Wed, 4 Feb 2026 20:25:43 -0800 Subject: [PATCH 24/41] updates to cicd and dockerfile --- .github/workflows/ngwpc-cicd.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/ngwpc-cicd.yml b/.github/workflows/ngwpc-cicd.yml index 87c8471898..19187ddbb5 100644 --- a/.github/workflows/ngwpc-cicd.yml +++ b/.github/workflows/ngwpc-cicd.yml @@ -274,6 +274,8 @@ jobs: - name: Build & push image id: build_image uses: docker/build-push-action@v6 + env: + BUILD_DATE: ${{ env.BUILD_DATE }} with: context: . # file: Dockerfile.test # comment out when done testing From 66ed20b224e3a45e302ac8d4a70116fbf990d5c1 Mon Sep 17 00:00:00 2001 From: Miguel Pena Date: Tue, 10 Feb 2026 20:56:44 -0800 Subject: [PATCH 25/41] updated cicd file --- .github/workflows/ngwpc-cicd.yml | 2 -- 1 file changed, 2 deletions(-) diff --git a/.github/workflows/ngwpc-cicd.yml b/.github/workflows/ngwpc-cicd.yml index 19187ddbb5..87c8471898 100644 --- a/.github/workflows/ngwpc-cicd.yml +++ b/.github/workflows/ngwpc-cicd.yml @@ -274,8 +274,6 @@ jobs: - name: Build & push image id: build_image uses: docker/build-push-action@v6 - env: - BUILD_DATE: ${{ env.BUILD_DATE }} with: context: . 
# file: Dockerfile.test # comment out when done testing From 41c9aaf882e76a751d4d93b305e666afaca9796d Mon Sep 17 00:00:00 2001 From: Ian Todd Date: Thu, 12 Feb 2026 09:47:23 -0500 Subject: [PATCH 26/41] Merge remote-tracking branch 'NOAA-OWP/master' into development --- CMakeLists.txt | 1 - .../catchment/Bmi_Module_Formulation.hpp | 3 - src/core/nexus/HY_PointHydroNexusRemote.cpp | 126 ++++++++++++------ 3 files changed, 83 insertions(+), 47 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index a320eab3a0..28d71f09b0 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -372,7 +372,6 @@ target_link_libraries(ngen NGen::parallel NGen::state_save_restore NGen::bmi_protocols - NGen::state_save_restore ) if(NGEN_WITH_SQLITE) diff --git a/include/realizations/catchment/Bmi_Module_Formulation.hpp b/include/realizations/catchment/Bmi_Module_Formulation.hpp index 8f6facf558..f7fc898f11 100644 --- a/include/realizations/catchment/Bmi_Module_Formulation.hpp +++ b/include/realizations/catchment/Bmi_Module_Formulation.hpp @@ -9,9 +9,6 @@ #include "bmi_utilities.hpp" #include "bmi/protocols.hpp" -#include -#include "bmi/protocols.hpp" - using data_access::MEAN; using data_access::SUM; diff --git a/src/core/nexus/HY_PointHydroNexusRemote.cpp b/src/core/nexus/HY_PointHydroNexusRemote.cpp index 3dd9323949..f195d16e56 100644 --- a/src/core/nexus/HY_PointHydroNexusRemote.cpp +++ b/src/core/nexus/HY_PointHydroNexusRemote.cpp @@ -19,7 +19,7 @@ void MPI_Handle_Error(int status) } else { - MPI_Abort(MPI_COMM_WORLD,1); + MPI_Abort(MPI_COMM_WORLD, status); } } @@ -93,9 +93,9 @@ HY_PointHydroNexusRemote::HY_PointHydroNexusRemote(std::string nexus_id, Catchme HY_PointHydroNexusRemote::~HY_PointHydroNexusRemote() { - long wait_time = 0; - - // This destructore might be called after MPI_Finalize so do not attempt communication if + const unsigned int timeout = 120000; // timeout threshold in milliseconds + unsigned int wait_time = 0; + // This destructor might be called after 
MPI_Finalize so do not attempt communication if // this has occured int mpi_finalized; MPI_Finalized(&mpi_finalized); @@ -106,14 +106,46 @@ HY_PointHydroNexusRemote::~HY_PointHydroNexusRemote() process_communications(); - std::this_thread::sleep_for(std::chrono::milliseconds(1)); - + if( wait_time < timeout && wait_time > 0 ){ // don't sleep if first call clears comms! + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + else + { + std::cerr << "HY_PointHydroNexusRemote: "<< id + << " destructor timed out after " << timeout/1000 + << " seconds waiting on pending MPI communications\n"; + // The return is is probably best, logging the error. + // There is no good way to recover from this. + // Throwing an exception from destructors is generally not a good idea + // as it can lead to undefined behavior. + // and using std::exit forces the program to terminate immediately, + // even if this situation is recoverable/acceptable in some cases. + return; + } wait_time += 1; + MPI_Finalized(&mpi_finalized); + } +} - if ( wait_time > 120000 ) +void HY_PointHydroNexusRemote::post_receives() +{ + // Post receives if not already posted (for pure receiver nexuses) + if (stored_receives.empty()) + { + for (int rank : upstream_ranks) { - // TODO log warning message that some comunications could not complete - + stored_receives.push_back({}); + stored_receives.back().buffer = std::make_shared(); + int tag = extract(id); + + MPI_Handle_Error(MPI_Irecv( + stored_receives.back().buffer.get(), + 1, + time_step_and_flow_type, + rank, + tag, + MPI_COMM_WORLD, + &stored_receives.back().mpi_request)); } } } @@ -132,31 +164,15 @@ double HY_PointHydroNexusRemote::get_downstream_flow(std::string catchment_id, t } else if ( type == receiver || type == sender_receiver ) { - for ( int rank : upstream_ranks ) - { - int status; - - stored_receives.resize(stored_receives.size() + 1); - stored_receives.back().buffer = std::make_shared(); - - int tag = extract(id); - - //Receive 
downstream_flow from Upstream Remote Nexus to this Downstream Remote Nexus - status = MPI_Irecv( - stored_receives.back().buffer.get(), - 1, - time_step_and_flow_type, - rank, - tag, - MPI_COMM_WORLD, - &stored_receives.back().mpi_request); - - MPI_Handle_Error(status); - - //std::cerr << "Creating receive with target_rank=" << rank << " on tag=" << tag << "\n"; - } - - //std::cerr << "Waiting on receives\n"; + post_receives(); + // Wait for receives to complete + // This ensures all upstream flows are received before returning + // and that we have matched all sends with receives for a given time step. + // As long as the functions are called appropriately, e.g. one call to + // `add_upstream_flow` per upstream catchment per time step, followed + // by a call to `get_downstream_flow` for each downstream catchment per time step, + // this loop will terminate and ensures the synchronization of flows between + // ranks. while ( stored_receives.size() > 0 ) { process_communications(); @@ -169,6 +185,28 @@ double HY_PointHydroNexusRemote::get_downstream_flow(std::string catchment_id, t void HY_PointHydroNexusRemote::add_upstream_flow(double val, std::string catchment_id, time_step_t t) { + // Process any completed communications to free resources + // If no communications are pending, this call will do nothing. + process_communications(); + // NOTE: It is possible for a partition to get "too far" ahead since the sends are now + // truely asynchronous. For pure receivers and sender_receivers, this isn't a problem + // because the get_downstream_flow function will block until all receives are processed. + // However, for pure senders, this could be a problem. + // We can use this spinlock here to limit how far ahead a partition can get. + // in this case, approximately 100 time steps per downstream catchment... 
+ while( stored_sends.size() > downstream_ranks.size()*100 ) + { + process_communications(); + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + + // Post receives before sending to prevent deadlock + // When stored_receives is empty, we need to post for incoming messages + if ((type == receiver || type == sender_receiver) && stored_receives.empty()) + { + post_receives(); + } + // first add flow to local copy HY_PointHydroNexus::add_upstream_flow(val, catchment_id, t); @@ -207,23 +245,25 @@ void HY_PointHydroNexusRemote::add_upstream_flow(double val, std::string catchme int tag = extract(id); //Send downstream_flow from this Upstream Remote Nexus to the Downstream Remote Nexus - MPI_Isend( + MPI_Handle_Error( + MPI_Isend( stored_sends.back().buffer.get(), 1, time_step_and_flow_type, *downstream_ranks.begin(), //TODO currently only support a SINGLE downstream message pairing tag, MPI_COMM_WORLD, - &stored_sends.back().mpi_request); - - //std::cerr << "Creating send with target_rank=" << *downstream_ranks.begin() << " on tag=" << tag << "\n"; + &stored_sends.back().mpi_request) + ); + //std::cerr << "Creating send with target_rank=" << *downstream_ranks.begin() << " on tag=" << tag << "\n"; - while ( stored_sends.size() > 0 ) - { - process_communications(); - std::this_thread::sleep_for(std::chrono::milliseconds(1)); - } + // Send is async, the next call to add_upstream_flow will test and ensure the send has completed + // and free the memory associated with the send. + // This prevents a potential deadlock situation where a send isn't able to complete + // because the remote receiver is also trying to send and the underlying mpi buffers/protocol + // are forced into a rendevous protocol. So we ensure that we always post receives before sends. + // and that we always test for completed sends before freeing the memory associated with the send. 
} } } From 6511d0f4f3c742458a3b5051bce11012ea9f3293 Mon Sep 17 00:00:00 2001 From: "jeff.wade" Date: Mon, 16 Feb 2026 13:20:22 -0500 Subject: [PATCH 27/41] Add topoflow-glacier submodule --- Dockerfile | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/Dockerfile b/Dockerfile index 0151ff5070..f7b574b896 100644 --- a/Dockerfile +++ b/Dockerfile @@ -471,6 +471,11 @@ RUN --mount=type=cache,target=/root/.cache/pip,id=pip-cache \ cd extern/topoflow-glacier; \ pip install . +RUN --mount=type=cache,target=/root/.cache/pip,id=pip-cache \ + set -eux; \ + cd extern/topoflow-glacier; \ + pip install . + RUN set -eux && \ mkdir --parents /ngencerf/data/ngen-run-logs/ && \ mkdir --parents /ngen-app/bin/ && \ From 190fb5904551f0db16ecaba5ee6203b635f8753b Mon Sep 17 00:00:00 2001 From: Ian Todd Date: Wed, 25 Feb 2026 10:12:57 -0500 Subject: [PATCH 28/41] Revert changes from OWP --- src/core/nexus/HY_PointHydroNexusRemote.cpp | 128 +++++++------------- 1 file changed, 44 insertions(+), 84 deletions(-) diff --git a/src/core/nexus/HY_PointHydroNexusRemote.cpp b/src/core/nexus/HY_PointHydroNexusRemote.cpp index f195d16e56..41811bb2fb 100644 --- a/src/core/nexus/HY_PointHydroNexusRemote.cpp +++ b/src/core/nexus/HY_PointHydroNexusRemote.cpp @@ -19,7 +19,7 @@ void MPI_Handle_Error(int status) } else { - MPI_Abort(MPI_COMM_WORLD, status); + MPI_Abort(MPI_COMM_WORLD,1); } } @@ -93,9 +93,9 @@ HY_PointHydroNexusRemote::HY_PointHydroNexusRemote(std::string nexus_id, Catchme HY_PointHydroNexusRemote::~HY_PointHydroNexusRemote() { - const unsigned int timeout = 120000; // timeout threshold in milliseconds - unsigned int wait_time = 0; - // This destructor might be called after MPI_Finalize so do not attempt communication if + long wait_time = 0; + + // This destructore might be called after MPI_Finalize so do not attempt communication if // this has occured int mpi_finalized; MPI_Finalized(&mpi_finalized); @@ -106,46 +106,14 @@ HY_PointHydroNexusRemote::~HY_PointHydroNexusRemote() 
process_communications(); - if( wait_time < timeout && wait_time > 0 ){ // don't sleep if first call clears comms! - std::this_thread::sleep_for(std::chrono::milliseconds(1)); - } - else - { - std::cerr << "HY_PointHydroNexusRemote: "<< id - << " destructor timed out after " << timeout/1000 - << " seconds waiting on pending MPI communications\n"; - // The return is is probably best, logging the error. - // There is no good way to recover from this. - // Throwing an exception from destructors is generally not a good idea - // as it can lead to undefined behavior. - // and using std::exit forces the program to terminate immediately, - // even if this situation is recoverable/acceptable in some cases. - return; - } + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + wait_time += 1; - MPI_Finalized(&mpi_finalized); - } -} -void HY_PointHydroNexusRemote::post_receives() -{ - // Post receives if not already posted (for pure receiver nexuses) - if (stored_receives.empty()) - { - for (int rank : upstream_ranks) + if ( wait_time > 120000 ) { - stored_receives.push_back({}); - stored_receives.back().buffer = std::make_shared(); - int tag = extract(id); - - MPI_Handle_Error(MPI_Irecv( - stored_receives.back().buffer.get(), - 1, - time_step_and_flow_type, - rank, - tag, - MPI_COMM_WORLD, - &stored_receives.back().mpi_request)); + // TODO log warning message that some comunications could not complete + } } } @@ -164,15 +132,31 @@ double HY_PointHydroNexusRemote::get_downstream_flow(std::string catchment_id, t } else if ( type == receiver || type == sender_receiver ) { - post_receives(); - // Wait for receives to complete - // This ensures all upstream flows are received before returning - // and that we have matched all sends with receives for a given time step. - // As long as the functions are called appropriately, e.g. 
one call to - // `add_upstream_flow` per upstream catchment per time step, followed - // by a call to `get_downstream_flow` for each downstream catchment per time step, - // this loop will terminate and ensures the synchronization of flows between - // ranks. + for ( int rank : upstream_ranks ) + { + int status; + + stored_receives.resize(stored_receives.size() + 1); + stored_receives.back().buffer = std::make_shared(); + + int tag = extract(id); + + //Receive downstream_flow from Upstream Remote Nexus to this Downstream Remote Nexus + status = MPI_Irecv( + stored_receives.back().buffer.get(), + 1, + time_step_and_flow_type, + rank, + tag, + MPI_COMM_WORLD, + &stored_receives.back().mpi_request); + + MPI_Handle_Error(status); + + //std::cerr << "Creating receive with target_rank=" << rank << " on tag=" << tag << "\n"; + } + + //std::cerr << "Waiting on receives\n"; while ( stored_receives.size() > 0 ) { process_communications(); @@ -185,28 +169,6 @@ double HY_PointHydroNexusRemote::get_downstream_flow(std::string catchment_id, t void HY_PointHydroNexusRemote::add_upstream_flow(double val, std::string catchment_id, time_step_t t) { - // Process any completed communications to free resources - // If no communications are pending, this call will do nothing. - process_communications(); - // NOTE: It is possible for a partition to get "too far" ahead since the sends are now - // truely asynchronous. For pure receivers and sender_receivers, this isn't a problem - // because the get_downstream_flow function will block until all receives are processed. - // However, for pure senders, this could be a problem. - // We can use this spinlock here to limit how far ahead a partition can get. - // in this case, approximately 100 time steps per downstream catchment... 
- while( stored_sends.size() > downstream_ranks.size()*100 ) - { - process_communications(); - std::this_thread::sleep_for(std::chrono::milliseconds(1)); - } - - // Post receives before sending to prevent deadlock - // When stored_receives is empty, we need to post for incoming messages - if ((type == receiver || type == sender_receiver) && stored_receives.empty()) - { - post_receives(); - } - // first add flow to local copy HY_PointHydroNexus::add_upstream_flow(val, catchment_id, t); @@ -245,25 +207,23 @@ void HY_PointHydroNexusRemote::add_upstream_flow(double val, std::string catchme int tag = extract(id); //Send downstream_flow from this Upstream Remote Nexus to the Downstream Remote Nexus - MPI_Handle_Error( - MPI_Isend( + MPI_Isend( stored_sends.back().buffer.get(), 1, time_step_and_flow_type, *downstream_ranks.begin(), //TODO currently only support a SINGLE downstream message pairing tag, MPI_COMM_WORLD, - &stored_sends.back().mpi_request) - ); + &stored_sends.back().mpi_request); + + //std::cerr << "Creating send with target_rank=" << *downstream_ranks.begin() << " on tag=" << tag << "\n"; - //std::cerr << "Creating send with target_rank=" << *downstream_ranks.begin() << " on tag=" << tag << "\n"; - // Send is async, the next call to add_upstream_flow will test and ensure the send has completed - // and free the memory associated with the send. - // This prevents a potential deadlock situation where a send isn't able to complete - // because the remote receiver is also trying to send and the underlying mpi buffers/protocol - // are forced into a rendevous protocol. So we ensure that we always post receives before sends. - // and that we always test for completed sends before freeing the memory associated with the send. 
+ while ( stored_sends.size() > 0 ) + { + process_communications(); + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } } } } @@ -323,4 +283,4 @@ int HY_PointHydroNexusRemote::get_world_rank() return world_rank; } -#endif // NGEN_WITH_MPI +#endif // NGEN_WITH_MPI \ No newline at end of file From 149467524c63de3e195c2def3e330b2da4fd741b Mon Sep 17 00:00:00 2001 From: "Carolyn.Maynard" Date: Mon, 9 Mar 2026 18:11:46 -0700 Subject: [PATCH 29/41] Use new nwm-ewts libraries and python package EWTS versions to build summary output to console Updates to CMakeLists.txt for integrating the nwm-ewts libraries Initial conversion to ewts_ngen_bridge library. Updated CMakeLists. Replaced logMsgAndThrowError with separate calls to LOG and throw. --- src/core/nexus/HY_PointHydroNexusRemote.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/core/nexus/HY_PointHydroNexusRemote.cpp b/src/core/nexus/HY_PointHydroNexusRemote.cpp index 41811bb2fb..3dd9323949 100644 --- a/src/core/nexus/HY_PointHydroNexusRemote.cpp +++ b/src/core/nexus/HY_PointHydroNexusRemote.cpp @@ -283,4 +283,4 @@ int HY_PointHydroNexusRemote::get_world_rank() return world_rank; } -#endif // NGEN_WITH_MPI \ No newline at end of file +#endif // NGEN_WITH_MPI From f25f1a5a662b08f3a1aab0df797dc2a033496f9b Mon Sep 17 00:00:00 2001 From: Ian Todd Date: Fri, 6 Feb 2026 10:26:57 -0500 Subject: [PATCH 30/41] Use Boost for serializing Multi-BMI --- include/realizations/catchment/Bmi_Module_Formulation.hpp | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/include/realizations/catchment/Bmi_Module_Formulation.hpp b/include/realizations/catchment/Bmi_Module_Formulation.hpp index f7fc898f11..286555cf73 100644 --- a/include/realizations/catchment/Bmi_Module_Formulation.hpp +++ b/include/realizations/catchment/Bmi_Module_Formulation.hpp @@ -72,11 +72,6 @@ namespace realization { /** * Get the collection of forcing output property names this instance can provide. 
- * - * This is part of the @ref ForcingProvider interface. This interface must be implemented for items of this - * type to be usable as "forcing" providers for situations when some other object needs to receive as an input - * (i.e., one of its forcings) a data property output from this object. - * * For this type, this is the collection of BMI output variables, plus any aliases included in the formulation * config's output variable mapping. * @@ -319,6 +314,7 @@ namespace realization { * Existing state pointers should not be used as the stored data may be freed depending on implementation. */ virtual void free_serialization_state(); + void set_realization_file_format(bool is_legacy_format); virtual void check_mass_balance(const int& iteration, const int& total_steps, const std::string& timestamp) const override { From 0e2648f9f67b2040d80f972727139687ff0345c2 Mon Sep 17 00:00:00 2001 From: Ian Todd Date: Tue, 17 Feb 2026 09:21:39 -0500 Subject: [PATCH 31/41] Fortran state hackery --- include/realizations/catchment/Bmi_Module_Formulation.hpp | 1 - 1 file changed, 1 deletion(-) diff --git a/include/realizations/catchment/Bmi_Module_Formulation.hpp b/include/realizations/catchment/Bmi_Module_Formulation.hpp index 286555cf73..040e3eddff 100644 --- a/include/realizations/catchment/Bmi_Module_Formulation.hpp +++ b/include/realizations/catchment/Bmi_Module_Formulation.hpp @@ -314,7 +314,6 @@ namespace realization { * Existing state pointers should not be used as the stored data may be freed depending on implementation. 
*/ virtual void free_serialization_state(); - void set_realization_file_format(bool is_legacy_format); virtual void check_mass_balance(const int& iteration, const int& total_steps, const std::string& timestamp) const override { From 0d689512aa15eddb7522f45439955c712312877a Mon Sep 17 00:00:00 2001 From: Ian Todd Date: Thu, 12 Feb 2026 09:47:23 -0500 Subject: [PATCH 32/41] Merge remote-tracking branch 'NOAA-OWP/master' into development --- src/core/nexus/HY_PointHydroNexusRemote.cpp | 126 +++++++++++++------- 1 file changed, 83 insertions(+), 43 deletions(-) diff --git a/src/core/nexus/HY_PointHydroNexusRemote.cpp b/src/core/nexus/HY_PointHydroNexusRemote.cpp index 3dd9323949..f195d16e56 100644 --- a/src/core/nexus/HY_PointHydroNexusRemote.cpp +++ b/src/core/nexus/HY_PointHydroNexusRemote.cpp @@ -19,7 +19,7 @@ void MPI_Handle_Error(int status) } else { - MPI_Abort(MPI_COMM_WORLD,1); + MPI_Abort(MPI_COMM_WORLD, status); } } @@ -93,9 +93,9 @@ HY_PointHydroNexusRemote::HY_PointHydroNexusRemote(std::string nexus_id, Catchme HY_PointHydroNexusRemote::~HY_PointHydroNexusRemote() { - long wait_time = 0; - - // This destructore might be called after MPI_Finalize so do not attempt communication if + const unsigned int timeout = 120000; // timeout threshold in milliseconds + unsigned int wait_time = 0; + // This destructor might be called after MPI_Finalize so do not attempt communication if // this has occured int mpi_finalized; MPI_Finalized(&mpi_finalized); @@ -106,14 +106,46 @@ HY_PointHydroNexusRemote::~HY_PointHydroNexusRemote() process_communications(); - std::this_thread::sleep_for(std::chrono::milliseconds(1)); - + if( wait_time < timeout && wait_time > 0 ){ // don't sleep if first call clears comms! 
+ std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + else + { + std::cerr << "HY_PointHydroNexusRemote: "<< id + << " destructor timed out after " << timeout/1000 + << " seconds waiting on pending MPI communications\n"; + // The return is is probably best, logging the error. + // There is no good way to recover from this. + // Throwing an exception from destructors is generally not a good idea + // as it can lead to undefined behavior. + // and using std::exit forces the program to terminate immediately, + // even if this situation is recoverable/acceptable in some cases. + return; + } wait_time += 1; + MPI_Finalized(&mpi_finalized); + } +} - if ( wait_time > 120000 ) +void HY_PointHydroNexusRemote::post_receives() +{ + // Post receives if not already posted (for pure receiver nexuses) + if (stored_receives.empty()) + { + for (int rank : upstream_ranks) { - // TODO log warning message that some comunications could not complete - + stored_receives.push_back({}); + stored_receives.back().buffer = std::make_shared(); + int tag = extract(id); + + MPI_Handle_Error(MPI_Irecv( + stored_receives.back().buffer.get(), + 1, + time_step_and_flow_type, + rank, + tag, + MPI_COMM_WORLD, + &stored_receives.back().mpi_request)); } } } @@ -132,31 +164,15 @@ double HY_PointHydroNexusRemote::get_downstream_flow(std::string catchment_id, t } else if ( type == receiver || type == sender_receiver ) { - for ( int rank : upstream_ranks ) - { - int status; - - stored_receives.resize(stored_receives.size() + 1); - stored_receives.back().buffer = std::make_shared(); - - int tag = extract(id); - - //Receive downstream_flow from Upstream Remote Nexus to this Downstream Remote Nexus - status = MPI_Irecv( - stored_receives.back().buffer.get(), - 1, - time_step_and_flow_type, - rank, - tag, - MPI_COMM_WORLD, - &stored_receives.back().mpi_request); - - MPI_Handle_Error(status); - - //std::cerr << "Creating receive with target_rank=" << rank << " on tag=" << tag << "\n"; - } - - 
//std::cerr << "Waiting on receives\n"; + post_receives(); + // Wait for receives to complete + // This ensures all upstream flows are received before returning + // and that we have matched all sends with receives for a given time step. + // As long as the functions are called appropriately, e.g. one call to + // `add_upstream_flow` per upstream catchment per time step, followed + // by a call to `get_downstream_flow` for each downstream catchment per time step, + // this loop will terminate and ensures the synchronization of flows between + // ranks. while ( stored_receives.size() > 0 ) { process_communications(); @@ -169,6 +185,28 @@ double HY_PointHydroNexusRemote::get_downstream_flow(std::string catchment_id, t void HY_PointHydroNexusRemote::add_upstream_flow(double val, std::string catchment_id, time_step_t t) { + // Process any completed communications to free resources + // If no communications are pending, this call will do nothing. + process_communications(); + // NOTE: It is possible for a partition to get "too far" ahead since the sends are now + // truely asynchronous. For pure receivers and sender_receivers, this isn't a problem + // because the get_downstream_flow function will block until all receives are processed. + // However, for pure senders, this could be a problem. + // We can use this spinlock here to limit how far ahead a partition can get. + // in this case, approximately 100 time steps per downstream catchment... 
+ while( stored_sends.size() > downstream_ranks.size()*100 ) + { + process_communications(); + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + + // Post receives before sending to prevent deadlock + // When stored_receives is empty, we need to post for incoming messages + if ((type == receiver || type == sender_receiver) && stored_receives.empty()) + { + post_receives(); + } + // first add flow to local copy HY_PointHydroNexus::add_upstream_flow(val, catchment_id, t); @@ -207,23 +245,25 @@ void HY_PointHydroNexusRemote::add_upstream_flow(double val, std::string catchme int tag = extract(id); //Send downstream_flow from this Upstream Remote Nexus to the Downstream Remote Nexus - MPI_Isend( + MPI_Handle_Error( + MPI_Isend( stored_sends.back().buffer.get(), 1, time_step_and_flow_type, *downstream_ranks.begin(), //TODO currently only support a SINGLE downstream message pairing tag, MPI_COMM_WORLD, - &stored_sends.back().mpi_request); - - //std::cerr << "Creating send with target_rank=" << *downstream_ranks.begin() << " on tag=" << tag << "\n"; + &stored_sends.back().mpi_request) + ); + //std::cerr << "Creating send with target_rank=" << *downstream_ranks.begin() << " on tag=" << tag << "\n"; - while ( stored_sends.size() > 0 ) - { - process_communications(); - std::this_thread::sleep_for(std::chrono::milliseconds(1)); - } + // Send is async, the next call to add_upstream_flow will test and ensure the send has completed + // and free the memory associated with the send. + // This prevents a potential deadlock situation where a send isn't able to complete + // because the remote receiver is also trying to send and the underlying mpi buffers/protocol + // are forced into a rendevous protocol. So we ensure that we always post receives before sends. + // and that we always test for completed sends before freeing the memory associated with the send. 
} } } From 1c632262561bdd48eed61891a6e434850d436bd4 Mon Sep 17 00:00:00 2001 From: Ian Todd Date: Wed, 25 Feb 2026 10:12:57 -0500 Subject: [PATCH 33/41] Revert changes from OWP --- src/core/nexus/HY_PointHydroNexusRemote.cpp | 126 +++++++------------- 1 file changed, 43 insertions(+), 83 deletions(-) diff --git a/src/core/nexus/HY_PointHydroNexusRemote.cpp b/src/core/nexus/HY_PointHydroNexusRemote.cpp index f195d16e56..3dd9323949 100644 --- a/src/core/nexus/HY_PointHydroNexusRemote.cpp +++ b/src/core/nexus/HY_PointHydroNexusRemote.cpp @@ -19,7 +19,7 @@ void MPI_Handle_Error(int status) } else { - MPI_Abort(MPI_COMM_WORLD, status); + MPI_Abort(MPI_COMM_WORLD,1); } } @@ -93,9 +93,9 @@ HY_PointHydroNexusRemote::HY_PointHydroNexusRemote(std::string nexus_id, Catchme HY_PointHydroNexusRemote::~HY_PointHydroNexusRemote() { - const unsigned int timeout = 120000; // timeout threshold in milliseconds - unsigned int wait_time = 0; - // This destructor might be called after MPI_Finalize so do not attempt communication if + long wait_time = 0; + + // This destructore might be called after MPI_Finalize so do not attempt communication if // this has occured int mpi_finalized; MPI_Finalized(&mpi_finalized); @@ -106,46 +106,14 @@ HY_PointHydroNexusRemote::~HY_PointHydroNexusRemote() process_communications(); - if( wait_time < timeout && wait_time > 0 ){ // don't sleep if first call clears comms! - std::this_thread::sleep_for(std::chrono::milliseconds(1)); - } - else - { - std::cerr << "HY_PointHydroNexusRemote: "<< id - << " destructor timed out after " << timeout/1000 - << " seconds waiting on pending MPI communications\n"; - // The return is is probably best, logging the error. - // There is no good way to recover from this. - // Throwing an exception from destructors is generally not a good idea - // as it can lead to undefined behavior. - // and using std::exit forces the program to terminate immediately, - // even if this situation is recoverable/acceptable in some cases. 
- return; - } + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + wait_time += 1; - MPI_Finalized(&mpi_finalized); - } -} -void HY_PointHydroNexusRemote::post_receives() -{ - // Post receives if not already posted (for pure receiver nexuses) - if (stored_receives.empty()) - { - for (int rank : upstream_ranks) + if ( wait_time > 120000 ) { - stored_receives.push_back({}); - stored_receives.back().buffer = std::make_shared(); - int tag = extract(id); - - MPI_Handle_Error(MPI_Irecv( - stored_receives.back().buffer.get(), - 1, - time_step_and_flow_type, - rank, - tag, - MPI_COMM_WORLD, - &stored_receives.back().mpi_request)); + // TODO log warning message that some comunications could not complete + } } } @@ -164,15 +132,31 @@ double HY_PointHydroNexusRemote::get_downstream_flow(std::string catchment_id, t } else if ( type == receiver || type == sender_receiver ) { - post_receives(); - // Wait for receives to complete - // This ensures all upstream flows are received before returning - // and that we have matched all sends with receives for a given time step. - // As long as the functions are called appropriately, e.g. one call to - // `add_upstream_flow` per upstream catchment per time step, followed - // by a call to `get_downstream_flow` for each downstream catchment per time step, - // this loop will terminate and ensures the synchronization of flows between - // ranks. 
+ for ( int rank : upstream_ranks ) + { + int status; + + stored_receives.resize(stored_receives.size() + 1); + stored_receives.back().buffer = std::make_shared(); + + int tag = extract(id); + + //Receive downstream_flow from Upstream Remote Nexus to this Downstream Remote Nexus + status = MPI_Irecv( + stored_receives.back().buffer.get(), + 1, + time_step_and_flow_type, + rank, + tag, + MPI_COMM_WORLD, + &stored_receives.back().mpi_request); + + MPI_Handle_Error(status); + + //std::cerr << "Creating receive with target_rank=" << rank << " on tag=" << tag << "\n"; + } + + //std::cerr << "Waiting on receives\n"; while ( stored_receives.size() > 0 ) { process_communications(); @@ -185,28 +169,6 @@ double HY_PointHydroNexusRemote::get_downstream_flow(std::string catchment_id, t void HY_PointHydroNexusRemote::add_upstream_flow(double val, std::string catchment_id, time_step_t t) { - // Process any completed communications to free resources - // If no communications are pending, this call will do nothing. - process_communications(); - // NOTE: It is possible for a partition to get "too far" ahead since the sends are now - // truely asynchronous. For pure receivers and sender_receivers, this isn't a problem - // because the get_downstream_flow function will block until all receives are processed. - // However, for pure senders, this could be a problem. - // We can use this spinlock here to limit how far ahead a partition can get. - // in this case, approximately 100 time steps per downstream catchment... 
- while( stored_sends.size() > downstream_ranks.size()*100 ) - { - process_communications(); - std::this_thread::sleep_for(std::chrono::milliseconds(1)); - } - - // Post receives before sending to prevent deadlock - // When stored_receives is empty, we need to post for incoming messages - if ((type == receiver || type == sender_receiver) && stored_receives.empty()) - { - post_receives(); - } - // first add flow to local copy HY_PointHydroNexus::add_upstream_flow(val, catchment_id, t); @@ -245,25 +207,23 @@ void HY_PointHydroNexusRemote::add_upstream_flow(double val, std::string catchme int tag = extract(id); //Send downstream_flow from this Upstream Remote Nexus to the Downstream Remote Nexus - MPI_Handle_Error( - MPI_Isend( + MPI_Isend( stored_sends.back().buffer.get(), 1, time_step_and_flow_type, *downstream_ranks.begin(), //TODO currently only support a SINGLE downstream message pairing tag, MPI_COMM_WORLD, - &stored_sends.back().mpi_request) - ); + &stored_sends.back().mpi_request); + + //std::cerr << "Creating send with target_rank=" << *downstream_ranks.begin() << " on tag=" << tag << "\n"; - //std::cerr << "Creating send with target_rank=" << *downstream_ranks.begin() << " on tag=" << tag << "\n"; - // Send is async, the next call to add_upstream_flow will test and ensure the send has completed - // and free the memory associated with the send. - // This prevents a potential deadlock situation where a send isn't able to complete - // because the remote receiver is also trying to send and the underlying mpi buffers/protocol - // are forced into a rendevous protocol. So we ensure that we always post receives before sends. - // and that we always test for completed sends before freeing the memory associated with the send. 
+ while ( stored_sends.size() > 0 ) + { + process_communications(); + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } } } } From b4a122354f45ef5ea720bb2d0206cee035336635 Mon Sep 17 00:00:00 2001 From: Ian Todd Date: Thu, 26 Mar 2026 15:49:40 -0400 Subject: [PATCH 34/41] Fix rebase problems --- include/realizations/catchment/Bmi_Multi_Formulation.hpp | 9 --------- 1 file changed, 9 deletions(-) diff --git a/include/realizations/catchment/Bmi_Multi_Formulation.hpp b/include/realizations/catchment/Bmi_Multi_Formulation.hpp index f9968695de..3387f9fa1e 100644 --- a/include/realizations/catchment/Bmi_Multi_Formulation.hpp +++ b/include/realizations/catchment/Bmi_Multi_Formulation.hpp @@ -62,15 +62,6 @@ namespace realization { void load_state(std::shared_ptr loader) override; void load_hot_start(std::shared_ptr loader) override; - - virtual void check_mass_balance(const int& iteration, const int& total_steps, const std::string& timestamp) const final { - for( const auto &module : modules ) { - // TODO may need to check on outputs form each module indepdently??? - // Right now, the assumption is that if each component is mass balanced - // then the entire formulation is mass balanced - module->check_mass_balance(iteration, total_steps, timestamp); - } - }; /** * Convert a time value from the model to an epoch time in seconds. 
From 9921aa20fbcc752133e9e81cf77b893bb6dcedbe Mon Sep 17 00:00:00 2001 From: Ian Todd Date: Mon, 6 Apr 2026 11:57:24 -0400 Subject: [PATCH 35/41] Remove duplicate function declaration --- .../catchment/Bmi_Module_Formulation.hpp | 15 --------------- src/NGen.cpp | 10 ---------- src/partitionGenerator.cpp | 3 ++- 3 files changed, 2 insertions(+), 26 deletions(-) diff --git a/include/realizations/catchment/Bmi_Module_Formulation.hpp b/include/realizations/catchment/Bmi_Module_Formulation.hpp index 040e3eddff..318fafe064 100644 --- a/include/realizations/catchment/Bmi_Module_Formulation.hpp +++ b/include/realizations/catchment/Bmi_Module_Formulation.hpp @@ -55,21 +55,6 @@ namespace realization { void load_hot_start(std::shared_ptr loader) override; - void load_state(std::shared_ptr loader) override; - - /** - * Requests the BMI to copy its current state into memory. The state will remain in memory until either a new state is made or `free_save_state` is called. - * - * @param size A `uint64_t` pointer that will have its value set to the size of the serialized data. - * @return Pointer to the beginning of the serialized data. - */ - virtual const char* create_save_state(uint64_t *size) const; - - /** - * Clears any serialized data stored by the BMI from memory. - */ - virtual void free_save_state() const; - /** * Get the collection of forcing output property names this instance can provide. 
* For this type, this is the collection of BMI output variables, plus any aliases included in the formulation diff --git a/src/NGen.cpp b/src/NGen.cpp index 2e71f2c4a1..dd4ab6b58e 100644 --- a/src/NGen.cpp +++ b/src/NGen.cpp @@ -714,16 +714,6 @@ int run_ngen(int argc, char* argv[], int mpi_num_procs, int mpi_rank) { simulation->run_catchments(); - if (state_saving_config.has_end_of_run()) { - LOG("Saving end-of-run state.", LogLevel::INFO); - std::shared_ptr saver = state_saving_config.end_of_run_saver(); - std::shared_ptr snapshot = saver->initialize_snapshot( - State_Saver::snapshot_time_now(), - State_Saver::State_Durability::strict - ); - simulation->save_state_snapshot(snapshot); - } - #if NGEN_WITH_MPI MPI_Barrier(MPI_COMM_WORLD); #endif diff --git a/src/partitionGenerator.cpp b/src/partitionGenerator.cpp index b52754bf00..d58f0d6d50 100644 --- a/src/partitionGenerator.cpp +++ b/src/partitionGenerator.cpp @@ -437,7 +437,8 @@ int main(int argc, char* argv[]) } catch (std::exception &e) { // Handle all exceptions std::string msg = "Geopackage error occurred reading divides: " + catchmentDataFile; - LOG(msg,LogLevel::FATAL, msg); + LOG(LogLevel::FATAL, msg); + LOG(LogLevel::FATAL, e.what()); throw std::runtime_error(msg); } #else From 614dddb793b47fb0ae35301750f20f5e06a952c6 Mon Sep 17 00:00:00 2001 From: Ian Todd Date: Tue, 7 Apr 2026 09:05:42 -0400 Subject: [PATCH 36/41] Ignore sentinels made up by partitionGenerator --- src/geopackage/read.cpp | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/src/geopackage/read.cpp b/src/geopackage/read.cpp index 8f9efa8283..8213e9c040 100644 --- a/src/geopackage/read.cpp +++ b/src/geopackage/read.cpp @@ -106,9 +106,15 @@ std::shared_ptr ngen::geopackage::read( sep_index++; } int id_num = std::atoi(filter_id.c_str() + sep_index); - if (id_num <= 0) - Logger::LogAndThrow("Could not convert input " + layer + " ID into a number: " + filter_id); - filter << id_num; + if (id_num <= 0) { + // check if 
the failed item is a fake terminal and igore if it is + std::string terminal = "wb-TERMINAL_SENTINEL-"; + if (strncmp(filter_id.c_str(), terminal.c_str(), terminal.length()) != 0) { + Logger::LogAndThrow("Could not convert input " + layer + " ID into a number: " + filter_id); + } + } else { + filter << id_num; + } } filter << ')'; joined_ids = filter.str(); From 2c7eb8007b16bd46281c9fc021985492b87d1c4f Mon Sep 17 00:00:00 2001 From: Ian Todd Date: Tue, 7 Apr 2026 11:10:47 -0400 Subject: [PATCH 37/41] Fix whereclause builder when sentinels are included --- src/geopackage/read.cpp | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/src/geopackage/read.cpp b/src/geopackage/read.cpp index 8213e9c040..1a17a0050b 100644 --- a/src/geopackage/read.cpp +++ b/src/geopackage/read.cpp @@ -93,12 +93,10 @@ std::shared_ptr ngen::geopackage::read( std::string joined_ids = ""; if (!ids.empty()) { + bool non_sentinel_found = false; std::stringstream filter; filter << " WHERE " << layer << '.' 
<< id_column << " IN ("; - for (size_t i = 0; i < ids.size(); ++i) { - if (i != 0) - filter << ','; - auto &filter_id = ids[i]; + for (const auto &filter_id : ids) { size_t sep_index = filter_id.find('-'); if (sep_index == std::string::npos) { sep_index = 0; @@ -113,11 +111,19 @@ std::shared_ptr ngen::geopackage::read( Logger::LogAndThrow("Could not convert input " + layer + " ID into a number: " + filter_id); } } else { + if (non_sentinel_found) // only add comma after finding at least one non-sentinel + filter << ','; + non_sentinel_found = true; filter << id_num; } } - filter << ')'; - joined_ids = filter.str(); + if (non_sentinel_found) { + filter << ')'; + joined_ids = filter.str(); + } else { + // if all IDs were sentinels, just make the query return nothing + joined_ids = " WHERE 1=0"; + } } // Get number of features From 44b7b0a05a2503b3aa5fd0343598c894728b52ba Mon Sep 17 00:00:00 2001 From: Ian Todd Date: Wed, 8 Apr 2026 15:52:46 -0400 Subject: [PATCH 38/41] Pass catchment and nexus results to T-Route --- include/core/NgenSimulation.hpp | 9 ++ src/core/Layer.cpp | 15 ++- src/core/NgenSimulation.cpp | 157 ++++++++++++++++++-------------- 3 files changed, 110 insertions(+), 71 deletions(-) diff --git a/include/core/NgenSimulation.hpp b/include/core/NgenSimulation.hpp index 3189045edc..1de4ecd8ed 100644 --- a/include/core/NgenSimulation.hpp +++ b/include/core/NgenSimulation.hpp @@ -83,6 +83,15 @@ class NgenSimulation private: void advance_models_one_output_step(); + // Set T-route input that may require merging results from other MPI processes + std::pair*, std::unordered_map*> set_troute_inputs( + const NgenSimulation::hy_features_t &features, + const std::vector *simulation_values, + const std::unordered_map *feature_indexes, + const std::string id_var_name, + const std::string value_var_name + ); + int simulation_step_; std::shared_ptr sim_time_; diff --git a/src/core/Layer.cpp b/src/core/Layer.cpp index 432b918aa3..537e9548f7 100644 --- 
a/src/core/Layer.cpp +++ b/src/core/Layer.cpp @@ -46,10 +46,6 @@ void ngen::Layer::update_models(boost::span catchment_outflows, +" at feature id "+id; throw std::runtime_error(msg); } -#if NGEN_WITH_ROUTING - int results_index = catchment_indexes[id]; - catchment_outflows[results_index] += response; -#endif // NGEN_WITH_ROUTING if (r_c->get_output_header_count() > 0) { // only write output if config specifies output values std::string output = std::to_string(output_time_index)+","+current_timestamp+","+ @@ -64,6 +60,17 @@ void ngen::Layer::update_models(boost::span catchment_outflows, catch(std::invalid_argument &e) { area = catchment_data->get_feature(id)->get_property("area_sqkm").as_real_number(); } +#if NGEN_WITH_ROUTING + // t-route NHF takes in catchment results in m^3/s + int results_index = catchment_indexes[id]; + catchment_outflows[results_index] += + // response is meters per timestep + response + // divide by timestamp seconds to get to (m/s) + / simulation_time.get_output_interval_seconds() + // multiply by (m^2) area to get to (m^3/s) + * (area * 1'000'000); +#endif // NGEN_WITH_ROUTING double response_m_s = response * (area * 1000000); //TODO put this somewhere else as well, for now, an implicit assumption is that a module's get_response returns //m/timestep diff --git a/src/core/NgenSimulation.cpp b/src/core/NgenSimulation.cpp index 9addca80d4..bf54b5f206 100644 --- a/src/core/NgenSimulation.cpp +++ b/src/core/NgenSimulation.cpp @@ -218,98 +218,79 @@ double NgenSimulation::get_nexus_outflow(int nexus_index, int timestep_index) co return nexus_downstream_flows_[timestep_index * nexus_indexes_.size() + nexus_index]; } -void NgenSimulation::run_routing(NgenSimulation::hy_features_t &features, std::string const& t_route_config_file_with_path) -{ -#if NGEN_WITH_ROUTING - std::vector *routing_nexus_downflows = &nexus_downstream_flows_; - std::unordered_map *routing_nexus_indexes = &nexus_indexes_; - - size_t number_of_timesteps = 
sim_time_->get_total_output_times(); - if (nexus_downstream_flows_.size() != number_of_timesteps * nexus_indexes_.size()) { - std::string msg = "Routing input data in NgenSimulation::nexus_downstream_flows_ does not reflect a full-duration run"; - LOG(msg, LogLevel::FATAL); - throw std::runtime_error(msg); - } - +std::pair*, std::unordered_map*> NgenSimulation::set_troute_inputs( + const NgenSimulation::hy_features_t &features, + const std::vector *simulation_values, + const std::unordered_map *feature_indexes, + const std::string id_var_name, + const std::string value_var_name +) { #if NGEN_WITH_MPI - std::vector all_nexus_downflows; - std::unordered_map all_nexus_indexes; - - if (mpi_num_procs_ > 1) { - std::vector local_nexus_ids; - for (const auto& nexus : nexus_indexes_) { - local_nexus_ids.push_back(nexus.first); + std::vector all_values; + std::unordered_map all_indexes; + if (this->mpi_num_procs_ > 1) { + size_t number_of_timesteps = sim_time_->get_total_output_times(); + // create a list of local IDs + std::vector local_ids; + for (const auto& id_pair : *feature_indexes) { + local_ids.push_back(id_pair.first); } - // MPI_Gather all nexus IDs into a single vector - std::vector all_nexus_ids = parallel::gather_strings(local_nexus_ids, mpi_rank_, mpi_num_procs_); + // MPI_Gather all IDs into a single vector + std::vector all_ids = parallel::gather_strings(local_ids, mpi_rank_, mpi_num_procs_); if (mpi_rank_ == 0) { // filter to only the unique IDs - std::sort(all_nexus_ids.begin(), all_nexus_ids.end()); - all_nexus_ids.erase( - std::unique(all_nexus_ids.begin(), all_nexus_ids.end()), - all_nexus_ids.end() + std::sort(all_ids.begin(), all_ids.end()); + all_ids.erase( + std::unique(all_ids.begin(), all_ids.end()), + all_ids.end() ); } - // MPI_Broadcast so all processes share the nexus IDs - all_nexus_ids = std::move(parallel::broadcast_strings(all_nexus_ids, mpi_rank_, mpi_num_procs_)); + // MPI_Broadcast so all processes share the IDs + all_ids = 
std::move(parallel::broadcast_strings(all_ids, mpi_rank_, mpi_num_procs_)); // MPI_Reduce to collect the results from processes - if (mpi_rank_ == 0) { - all_nexus_downflows.resize(number_of_timesteps * all_nexus_ids.size(), 0.0); + if (this->mpi_rank_ == 0) { + all_values.resize(number_of_timesteps * all_ids.size(), 0.0); } std::vector local_buffer(number_of_timesteps); std::vector receive_buffer(number_of_timesteps, 0.0); - for (int i = 0; i < all_nexus_ids.size(); ++i) { - std::string nexus_id = all_nexus_ids[i]; - if (nexus_indexes_.find(nexus_id) != nexus_indexes_.end() && !features.is_remote_sender_nexus(nexus_id)) { + for (int i = 0; i < all_ids.size(); ++i) { + std::string current_id = all_ids[i]; + if (feature_indexes->find(current_id) != feature_indexes->end() && !features.is_remote_sender_nexus(current_id)) { // if this process has the id and receives/records data, copy the values to the buffer - int nexus_index = nexus_indexes_[nexus_id]; + int id_index = feature_indexes->at(current_id); for (int step = 0; step < number_of_timesteps; ++step) { - int offset = step * nexus_indexes_.size() + nexus_index; - local_buffer[step] = nexus_downstream_flows_[offset]; + int offset = step * feature_indexes->size() + id_index; + local_buffer[step] = simulation_values->at(offset); } } else { // if this process does not have the id, fill with 0 to make sure it doesn't affect reduce sum std::fill(local_buffer.begin(), local_buffer.end(), 0.0); } MPI_Reduce(local_buffer.data(), receive_buffer.data(), number_of_timesteps, MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD); - if (mpi_rank_ == 0) { + if (this->mpi_rank_ == 0) { // copy reduce values to a combined downflows vector - all_nexus_indexes[nexus_id] = i; + all_indexes[current_id] = i; for (int step = 0; step < number_of_timesteps; ++step) { - int offset = step * all_nexus_ids.size() + i; - all_nexus_downflows[offset] = receive_buffer[step]; + int offset = step * all_ids.size() + i; + all_values[offset] = 
receive_buffer[step]; receive_buffer[step] = 0.0; } } } - if (mpi_rank_ == 0) { + if (this->mpi_rank_ == 0) { // update root's local data for running t-route below - routing_nexus_indexes = &all_nexus_indexes; - routing_nexus_downflows = &all_nexus_downflows; + simulation_values = &all_values; + feature_indexes = &all_indexes; } } #endif // NGEN_WITH_MPI - - if (mpi_rank_ == 0) { // Run t-route from single process - LOG(LogLevel::INFO, "Running T-Route on nexus outflows."); - - // Note: Currently, delta_time is set in the t-route yaml configuration file, and the - // number_of_timesteps is determined from the total number of nexus outputs in t-route. - // It is recommended to still pass these values to the routing_py_adapter object in - // case a future implementation needs these two values from the ngen framework. - int delta_time = sim_time_->get_output_interval_seconds(); - - // model for routing - if (this->py_troute_ == NULL) { - this->make_troute(t_route_config_file_with_path); - } - this->py_troute_->set_value_unchecked("ngen_dt", &delta_time, 1); - +#if NGEN_WITH_ROUTING + if (this->mpi_rank_ == 0) { // set up nexus id indexes - std::vector nexus_df_index(routing_nexus_indexes->size()); - for (const auto& key_value : *routing_nexus_indexes) { + std::vector df_index(feature_indexes->size()); + for (const auto& key_value : *feature_indexes) { int id_index = key_value.second; // Convert string ID into numbers for T-route index @@ -320,21 +301,63 @@ void NgenSimulation::run_routing(NgenSimulation::hy_features_t &features, std::s id_as_int = std::stoi(numbers); } if (id_as_int == -1) { - std::string error_msg = "Cannot convert the nexus ID to an integer: " + key_value.first; + std::string error_msg = "Cannot convert the ID to an integer: " + key_value.first; LOG(LogLevel::FATAL, error_msg); throw std::runtime_error(error_msg); } - nexus_df_index[id_index] = id_as_int; + df_index[id_index] = id_as_int; } // use unchecked messaging to allow the BMI to change its 
container size - py_troute_->set_value_unchecked("land_surface_water_source__id", nexus_df_index.data(), nexus_df_index.size()); - py_troute_->set_value_unchecked("land_surface_water_source__volume_flow_rate", routing_nexus_downflows->data(), routing_nexus_downflows->size()); - // run the T-Route model and create outputs through Update - py_troute_->Update(); + py_troute_->set_value_unchecked(id_var_name, df_index.data(), df_index.size()); + py_troute_->set_value_unchecked(value_var_name, simulation_values->data(), simulation_values->size()); } #endif // NGEN_WITH_ROUTING } +void NgenSimulation::run_routing(NgenSimulation::hy_features_t &features, std::string const& t_route_config_file_with_path) +{ +#if NGEN_WITH_ROUTING + size_t number_of_timesteps = sim_time_->get_total_output_times(); + if (nexus_downstream_flows_.size() != number_of_timesteps * nexus_indexes_.size()) { + std::string msg = "Routing input data in NgenSimulation::nexus_downstream_flows_ does not reflect a full-duration run"; + LOG(msg, LogLevel::FATAL); + throw std::runtime_error(msg); + } + if (mpi_rank_ == 0) { // Run t-route from single process + LOG(LogLevel::INFO, "Running T-Route on simulation outputs."); + + // Note: Currently, delta_time is set in the t-route yaml configuration file, and the + // number_of_timesteps is determined from the total number of nexus outputs in t-route. + // It is recommended to still pass these values to the routing_py_adapter object in + // case a future implementation needs these two values from the ngen framework. 
+ int delta_time = sim_time_->get_output_interval_seconds(); + + // model for routing + if (this->py_troute_ == NULL) { + this->make_troute(t_route_config_file_with_path); + } + this->py_troute_->set_value_unchecked("ngen_dt", &delta_time, 1); + } + // set the inputs from catchment and nexus results + this->set_troute_inputs( + features, + &this->nexus_downstream_flows_, + &this->nexus_indexes_, + "land_surface_water_source__id", + "land_surface_water_source__volume_flow_rate" + ); + this->set_troute_inputs( + features, + &this->catchment_outflows_, + &this->catchment_indexes_, + "catchment_water_source__id", + "catchment_water_source__volume_flow_rate" + ); + if (this->mpi_rank_ == 0) + this->py_troute_->Update(); +#endif // NGEN_WITH_ROUTING +} + size_t NgenSimulation::get_num_output_times() const { return sim_time_->get_total_output_times(); From bf59b96444417f304ae6f2e42229b5fab454a485 Mon Sep 17 00:00:00 2001 From: Ian Todd Date: Thu, 9 Apr 2026 09:25:04 -0400 Subject: [PATCH 39/41] Add documentation --- include/core/NgenSimulation.hpp | 16 ++++++++++++---- src/core/Layer.cpp | 26 +++++++++++++++----------- src/core/NgenSimulation.cpp | 18 +++++++++--------- 3 files changed, 36 insertions(+), 24 deletions(-) diff --git a/include/core/NgenSimulation.hpp b/include/core/NgenSimulation.hpp index 1de4ecd8ed..62744e4588 100644 --- a/include/core/NgenSimulation.hpp +++ b/include/core/NgenSimulation.hpp @@ -83,13 +83,21 @@ class NgenSimulation private: void advance_models_one_output_step(); - // Set T-route input that may require merging results from other MPI processes - std::pair*, std::unordered_map*> set_troute_inputs( - const NgenSimulation::hy_features_t &features, + /** Set T-route input that may require merging results from other MPI processes. The T-route values will only be set for the MPI rank 0 process. + * + * If MPI is running with multiple processes, blocking MPI calls will be made to merge the results. 
+ * @param simulation_values Pointer to vector of simulation results + * @param feature_indexes Pointer to the map between feature IDs and the relative timestep index + * @param id_var_name T-route BMI var name for the feature IDs. `feature_indexes` will be converted to a list and passed to this variable + * @param value_var_name T-route BMI var name for the simulation results + * @param features Features collection used to filter out remote sender nexuses when merging values + */ + void set_troute_inputs( const std::vector *simulation_values, const std::unordered_map *feature_indexes, const std::string id_var_name, - const std::string value_var_name + const std::string value_var_name, + const NgenSimulation::hy_features_t &features ); int simulation_step_; diff --git a/src/core/Layer.cpp b/src/core/Layer.cpp index 537e9548f7..5de09354c5 100644 --- a/src/core/Layer.cpp +++ b/src/core/Layer.cpp @@ -53,27 +53,31 @@ void ngen::Layer::update_models(boost::span catchment_outflows, r_c->write_output(output); } //TODO put this somewhere else. 
For now, just trying to ensure we get m^3/s into nexus output - double area; + double area_sq_km; try { - area = catchment_data->get_feature(id)->get_property("areasqkm").as_real_number(); + area_sq_km = catchment_data->get_feature(id)->get_property("areasqkm").as_real_number(); } catch(std::invalid_argument &e) { - area = catchment_data->get_feature(id)->get_property("area_sqkm").as_real_number(); + area_sq_km = catchment_data->get_feature(id)->get_property("area_sqkm").as_real_number(); } + double area_sq_m = area_sq_km * 1'000'000; + //TODO put this somewhere else as well, for now, an implicit assumption is that a module's get_response returns + //m/timestep #if NGEN_WITH_ROUTING - // t-route NHF takes in catchment results in m^3/s + // t-route NHF takes in catchment results in (m^3/s) + // depth (m) x area (m^2) / dt (seconds) int results_index = catchment_indexes[id]; catchment_outflows[results_index] += - // response is meters per timestep + // response is meters per timestep (m/t) response - // divide by timestamp seconds to get to (m/s) + // divide by timestep seconds to get to meters per second: (m/t) * (t/s) = (m/s) / simulation_time.get_output_interval_seconds() - // multiply by (m^2) area to get to (m^3/s) - * (area * 1'000'000); + // multiply by square meters: (m/s) * (m^2) = (m^3/s) + * area_sq_m; #endif // NGEN_WITH_ROUTING - double response_m_s = response * (area * 1000000); - //TODO put this somewhere else as well, for now, an implicit assumption is that a module's get_response returns - //m/timestep + // NOTE: the conversion below looks like it's missing a conversion from per timestep to per second + // Maintaining the current code in case there's a step later that accounts for this + double response_m_s = response * area_sq_m; //since we are operating on a 1 hour (3600s) dt, we need to scale the output appropriately //so no response is m^2/hr...m^2/hr * 1hr/3600s = m^3/hr double response_m_h = response_m_s / 3600.0; diff --git 
a/src/core/NgenSimulation.cpp b/src/core/NgenSimulation.cpp index bf54b5f206..f2caad5831 100644 --- a/src/core/NgenSimulation.cpp +++ b/src/core/NgenSimulation.cpp @@ -218,12 +218,12 @@ double NgenSimulation::get_nexus_outflow(int nexus_index, int timestep_index) co return nexus_downstream_flows_[timestep_index * nexus_indexes_.size() + nexus_index]; } -std::pair*, std::unordered_map*> NgenSimulation::set_troute_inputs( - const NgenSimulation::hy_features_t &features, +void NgenSimulation::set_troute_inputs( const std::vector *simulation_values, const std::unordered_map *feature_indexes, const std::string id_var_name, - const std::string value_var_name + const std::string value_var_name, + const NgenSimulation::hy_features_t &features ) { #if NGEN_WITH_MPI std::vector all_values; @@ -269,7 +269,7 @@ std::pair*, std::unordered_map*> NgenSimul } MPI_Reduce(local_buffer.data(), receive_buffer.data(), number_of_timesteps, MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD); if (this->mpi_rank_ == 0) { - // copy reduce values to a combined downflows vector + // copy reduce values to a combined values vector all_indexes[current_id] = i; for (int step = 0; step < number_of_timesteps; ++step) { int offset = step * all_ids.size() + i; @@ -280,7 +280,7 @@ std::pair*, std::unordered_map*> NgenSimul } if (this->mpi_rank_ == 0) { - // update root's local data for running t-route below + // change rank 0's indexes and values to the MPI merged results for setting below simulation_values = &all_values; feature_indexes = &all_indexes; } @@ -340,18 +340,18 @@ void NgenSimulation::run_routing(NgenSimulation::hy_features_t &features, std::s } // set the inputs from catchment and nexus results this->set_troute_inputs( - features, &this->nexus_downstream_flows_, &this->nexus_indexes_, "land_surface_water_source__id", - "land_surface_water_source__volume_flow_rate" + "land_surface_water_source__volume_flow_rate", + features ); this->set_troute_inputs( - features, &this->catchment_outflows_, 
&this->catchment_indexes_, "catchment_water_source__id", - "catchment_water_source__volume_flow_rate" + "catchment_water_source__volume_flow_rate", + features ); if (this->mpi_rank_ == 0) this->py_troute_->Update(); From 1b17bd4f04192f7cafe8c9e480c27036a21dfd53 Mon Sep 17 00:00:00 2001 From: Ian Todd Date: Thu, 9 Apr 2026 11:24:39 -0400 Subject: [PATCH 40/41] Update t-route submod reference for its BMI updates --- extern/t-route | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extern/t-route b/extern/t-route index 346db58b67..13536b5c9b 160000 --- a/extern/t-route +++ b/extern/t-route @@ -1 +1 @@ -Subproject commit 346db58b6779089bfde7c9519783ecfb39d95a95 +Subproject commit 13536b5c9bbe4906c798344d150499d56d09f368 From 57af853ee74882160887fb9fc367041fdc454ddc Mon Sep 17 00:00:00 2001 From: "jeff.wade" Date: Wed, 15 Apr 2026 09:29:33 -0400 Subject: [PATCH 41/41] Update dockerfile to build from NGEN_FORCING_IMAGE --- Dockerfile | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Dockerfile b/Dockerfile index f7b574b896..54560a6737 100644 --- a/Dockerfile +++ b/Dockerfile @@ -7,10 +7,10 @@ ARG ORG=ngwpc ARG NGEN_FORCING_IMAGE_TAG=latest ARG NGEN_FORCING_IMAGE=ghcr.io/${ORG}/ngen-bmi-forcing:${NGEN_FORCING_IMAGE_TAG} -#FROM ${NGEN_FORCING_IMAGE} AS base +FROM ${NGEN_FORCING_IMAGE} AS base # Uncomment when building locally -FROM ngen-bmi-forcing AS base +# FROM ngen-bmi-forcing AS base # OCI Metadata Arguments ARG NGEN_FORCING_IMAGE