diff --git a/log/log.tex b/log/log.tex index 3731128..759a3de 100644 --- a/log/log.tex +++ b/log/log.tex @@ -110,6 +110,13 @@ \subsection{Testing this functionality} \begin{itemize} \item NMPC and open-loop have been tested. + \item KH's MHE code. + \begin{itemize} + \item Tests pass. + \item Do the new data structure buy me anything here? + \item They make the code for sending weights to the disturbance objective + slightly easier to understand. + \end{itemize} \item KH's MHE has been tested. \item Now I need to test the code in \texttt{gas\_distribution} \end{itemize} @@ -163,6 +170,45 @@ \section{June 1, 2022} \item Maybe some functions for adding noise to data. \end{itemize} +\section{June 1, 2022} +\begin{itemize} + \item I can use a small, simple \texttt{get\_error\_cost} function in place + of KH's \texttt{get\_error\_disturbance\_cost}. Now I should delete that + old function. + \item It seems like my setpoint tracking cost function holds up to this + challenge. + \item What if we wanted a time-varying setpoint between measurements + and estimated measurable variables? I should probably test this? + -- {\bf Done.} Using a time-varying tracking cost between measurements + and estimated measurable variables seems to work. + \item Regardless, \texttt{get\_error\_disturbance\_cost} can go. +\end{itemize} + +\section{May 31, 2022} + +Modifying KH's branch to make sure it makes use of my existing tools whenever +possible. +\begin{itemize} + \item The test runs quickly. I can now make changes and run it to make sure I + didn't horribly break anything. + \item Remove json file for control inputs. -- {\bf Done.} + \item Make sure code works with both model linker and model helper for data + transfer between models -- {\bf Done.} The problem was that I wasn't + initializing the scalar variables. + \item Now I need to replace \texttt{get\_error\_disturbance\_cost} with + the time-varying setpoint tracking cost. 
+\end{itemize} + +Recall that this all falls into the category of making sure my data structures +hold up and can be used in KH's example. I need to make sure they can be used +everywhere I think they can be used. +These modifications would be sufficient for a PR into KH's branch. Then another +PR could incorporate whatever new functionality I decide to add. +Functionality to add: +\begin{itemize} + \item Data structure for scalar data for time-indexed variable. +\end{itemize} + \section{Mar 17, 2022} Where did the code in the \texttt{simple\_pipeline} directory go? diff --git a/nmpc_examples/mhe/mhe_constructor.py b/nmpc_examples/mhe/mhe_constructor.py new file mode 100644 index 0000000..c345c19 --- /dev/null +++ b/nmpc_examples/mhe/mhe_constructor.py @@ -0,0 +1,232 @@ +################################################################################# +# The Institute for the Design of Advanced Energy Systems Integrated Platform +# Framework (IDAES IP) was produced under the DOE Institute for the +# Design of Advanced Energy Systems (IDAES), and is copyright (c) 2018-2021 +# by the software owners: The Regents of the University of California, through +# Lawrence Berkeley National Laboratory, National Technology & Engineering +# Solutions of Sandia, LLC, Carnegie Mellon University, West Virginia University +# Research Corporation, et al. All rights reserved. +# +# Please see the files COPYRIGHT.md and LICENSE.md for full copyright and +# license information. 
+#################################################################################
+from pyomo.core.base.set import Set
+from pyomo.core.base.var import Var
+from pyomo.core.base.constraint import Constraint
+from pyomo.core.base.componentuid import ComponentUID
+from pyomo.core.base.expression import Expression
+
+from nmpc_examples.nmpc.dynamic_data.series_data import get_time_indexed_cuid
+
+
+def curr_sample_point(tp, sample_points):
+    """
+    This function returns the sample point to which the given time element
+    belongs.
+
+    Arguments
+    ---------
+    tp: float
+        Time element
+    sample_points: iterable
+        Set of sample points
+
+    Returns
+    -------
+    A sample point to which the given time element belongs.
+
+    """
+    for spt in sample_points:
+        if spt >= tp:
+            return spt
+
+
+def construct_measurement_variables_constraints(
+    sample_points,
+    variables,
+):
+    """
+    This function constructs components for measurements, including:
+    1. Index set for measurements
+    2. Variables for measurements and measurement errors
+    3. Constraints for measured variables, measurements, and measurement
+    errors.
+ + Arguments + --------- + sample_points: iterable + Set of sample points + variables: list + List of time-indexed variables that are measured + + Returns + ------- + meas_set: Pyomo Set + Index set for measurements + meas_var: Pyomo Var + Measurement variable, indexed by meas_set and sample_points + meas_error_var: Pyomo Var + Measurement error variable, indexed by meas_set and sample_points + meas_con: Pyomo Constraint + measurement constraint, indexed by meas_set and sample_points, + **measurement == measured_var + measurement_error** + + """ + meas_set = Set(initialize=range(len(variables))) + meas_var = Var(meas_set, sample_points) + meas_error_var = Var(meas_set, sample_points, initialize=0.0) + + def meas_con_rule(mod, index, sp): + return meas_var[index, sp] == \ + variables[index][sp] + meas_error_var[index, sp] + meas_con = Constraint( + meas_set, sample_points, rule=meas_con_rule, + ) + + return meas_set, meas_var, meas_error_var, meas_con + + +def construct_disturbed_model_constraints( + time, + sample_points, + mod_constraints, +): + """ + This function constructs components for model disturbances, including: + 1. Index set for model disturbances + 2. Variables for model disturbances + 3. Disturbed model constraints, consisting of original model constraints + and model disturbances. 
+
+    Arguments
+    ----------
+    time: iterable
+        Set by which to index model constraints
+    sample_points: iterable
+        Set of sample points
+    mod_constraints : list
+        List of model constraints to add model disturbances
+
+    Returns
+    -------
+    disturbance_set: Pyomo Set
+        Index set of model disturbances
+    disturbance_var: Pyomo Var
+        Model disturbance variable, indexed by disturbance_set and time
+    disturbed_con: Pyomo Constraint
+        Model constraints with model disturbances, indexed by disturbance_set
+        and time,
+        ** original_constraint + disturbance == 0.0 **
+
+    """
+    disturbance_set = Set(initialize=range(len(mod_constraints)))
+    disturbance_var = Var(disturbance_set, sample_points, initialize=0.0)
+
+    def disturbed_con_rule(m, index, i):
+        # try:
+        con_ele = mod_constraints[index][i]
+        # except KeyError:
+        #     if i == 0:
+        #         # Assume we're using an implicit time discretization.
+        #         return Constraint.Skip
+        #     else:
+        #         raise KeyError(i)
+
+        if not con_ele.equality:
+            raise RuntimeError(con_ele.name, " is not an equality constraint.")
+        if con_ele.lower.value != con_ele.upper.value:
+            raise RuntimeError(con_ele.name, " has different lower and upper "
+                "bounds.")
+        if con_ele.upper.value != 0.0:
+            raise RuntimeError(con_ele.name, " is an equality but its bound "
+                "is not zero.")
+
+        spt = curr_sample_point(i, sample_points)
+
+        return con_ele.body + disturbance_var[index, spt] == \
+            con_ele.upper.value
+    disturbed_con = Constraint(
+        disturbance_set, time, rule=disturbed_con_rule,
+    )
+
+    return disturbance_set, disturbance_var, disturbed_con
+
+
+def activate_disturbed_constraints_based_on_original_constraints(
+    time,
+    sample_points,
+    disturbance_var,
+    mod_constraints,
+    disturbed_con,
+):
+    """
+    This function activates the disturbed constraint and deactivates the
+    original constraint, if the original constraint is active. Also, if all
+    model constraints within a specific sample period are not active, fix the
+    disturbance variable at zero.
+ + Parameters + ---------- + time: iterable + Time set which indexes model constraints + sample_points: iterable + Set of sample points + disturbance_var: Pyomo Var + Model disturbances + mod_constraints: list + List of original model constraints + disturbed_con: Pyomo Constraint + Model constraints with model disturbances + + """ + # Deactivate original equalities and activate disturbed equalities + for index, con in enumerate(mod_constraints): + for tp in time: + con_ele = con[tp] + if con_ele.active: + con_ele.deactivate() + disturbed_con[index, tp].activate() + else: + # If the original equality is not active, deactivate the + # disturbed one. + disturbed_con[index, tp].deactivate() + + # If all constraints in a specific sample time are not active, fix that + # model disturbance at zero. + spt_saw_tp = {spt:[] for spt in sample_points} + for tp in time: + spt = curr_sample_point(tp, sample_points) + spt_saw_tp[spt].append(tp) + + for index in range(len(mod_constraints)): + for spt in sample_points: + con_active_list = [ + disturbed_con[index, tp].active for tp in spt_saw_tp[spt] + ] + if not any(con_active_list): + disturbance_var[index, spt].fix(0.0) + + +def get_error_cost(variables, time, weight_data=None): + """ + """ + # I know that these variables are indexed by time (aren't VarData), so I + # don't need to send the time set to get_time_indexed_cuid + # + # NOTE: This name is slightly confusing. Something better might be + # get_cost_from_error_variables + # error_cost seems to imply an "error" between two quantities... 
+ cuids = [get_time_indexed_cuid(var) for var in variables] + if weight_data is None: + weight_data = {cuid: 1.0 for cuid in cuids} + setpoint_data = {cuid: 0.0 for cuid in cuids} + from nmpc_examples.nmpc.cost_expressions import ( + get_tracking_cost_from_constant_setpoint, + ) + error_cost = get_tracking_cost_from_constant_setpoint( + variables, + time, + setpoint_data, + weight_data=weight_data, + ) + return error_cost diff --git a/nmpc_examples/pipeline/run_pipeline_mhe.py b/nmpc_examples/pipeline/run_pipeline_mhe.py new file mode 100644 index 0000000..dc74416 --- /dev/null +++ b/nmpc_examples/pipeline/run_pipeline_mhe.py @@ -0,0 +1,458 @@ +import pyomo.environ as pyo +from pyomo.dae import ContinuousSet +from pyomo.dae.flatten import flatten_dae_components +from pyomo.common.collections import ComponentMap + +from nmpc_examples.pipeline.pipeline_model import make_model + +from nmpc_examples.mhe.mhe_constructor import ( + construct_measurement_variables_constraints, + construct_disturbed_model_constraints, + get_error_cost, + activate_disturbed_constraints_based_on_original_constraints, +) +from nmpc_examples.nmpc.cost_expressions import ( + get_tracking_cost_from_time_varying_setpoint, +) + +from nmpc_examples.nmpc.model_linker import DynamicVarLinker +from nmpc_examples.nmpc.model_helper import DynamicModelHelper +from nmpc_examples.nmpc.dynamic_data.series_data import TimeSeriesData + +import json +import os +import matplotlib.pyplot as plt + +""" +Script to run an MHE simulation with a single natural gas pipeline model. 
+""" + + +def get_control_inputs(sample_period=2.0): + n_samples = 2 + control_input_time = [i*sample_period for i in range(n_samples)] + control_input_data = { + 'fs.pipeline.control_volume.flow_mass[*,1.0]': [ + 553003.1393240632, 535472.4292275416 + ], + 'fs.pipeline.control_volume.pressure[*,0]': [ + 58.905880452751845, 58.92036532907178 + ], + } + series = TimeSeriesData(control_input_data, control_input_time) + # Note that if we want a json representation of this data, we + # can always call json.dump(fp, series.to_serializable()). + return series + + +def run_mhe( + simulation_horizon=20.0, + estimator_horizon=20.0, + sample_period=2.0, + ): + """ + Run a simple MHE problem on a pipeline model. + """ + # + # Make dynamic model + # + nxfe = 4 + ntfe_per_sample = 4 + n_cycles = round(simulation_horizon/sample_period) + m_plant = make_model( + horizon=sample_period, + ntfe=ntfe_per_sample, + nxfe=nxfe, + ) + time = m_plant.fs.time + t0 = time.first() + tf = time.last() + + ipopt = pyo.SolverFactory("ipopt") + + # Fix "inputs" in dynamic model: inlet pressure and outlet flow rate + space = m_plant.fs.pipeline.control_volume.length_domain + x0 = space.first() + xf = space.last() + m_plant.fs.pipeline.control_volume.flow_mass[:, xf].fix() + m_plant.fs.pipeline.control_volume.pressure[:, x0].fix() + + # + # Make steady model for plant's and estimator's initializations + # + m_steady = make_model(dynamic=False, nxfe=nxfe) + m_steady.fs.pipeline.control_volume.flow_mass[:, x0].fix( + 3.0e5*pyo.units.kg/pyo.units.hr + ) + m_steady.fs.pipeline.control_volume.pressure[:, x0].fix( + 57.0*pyo.units.bar + ) + + ipopt.solve(m_steady, tee=True) + + # + # Extract data from steady state model + # + m_steady_helper = DynamicModelHelper(m_steady, m_steady.fs.time) + initial_data = m_steady_helper.get_data_at_time(t0) + scalar_data = m_steady_helper.get_scalar_variable_data() + + # + # Load data into dynamic model + # + use_linker = False + m_plant_helper = 
DynamicModelHelper(m_plant, m_plant.fs.time) + m_plant_helper.load_scalar_data(scalar_data) + if use_linker: + # If I want to use DynamicVarLinker: + # (E.g. if we didn't know that names in the steady model would + # be valid for the dynamic model.) + steady_scalar_vars, steady_dae_vars = flatten_dae_components( + m_steady, m_steady.fs.time, pyo.Var + ) + steady_dae_vars_in_plant = [ + m_plant.find_component(var.referent) for var in steady_dae_vars + ] + plant_steady_linker = DynamicVarLinker( + steady_dae_vars, + steady_dae_vars_in_plant, + m_steady.fs.time, + m_plant.fs.time, + ) + plant_steady_linker.transfer() + + else: + # If I want to use DynamicModelHelper: + m_plant_helper.load_data_at_time(initial_data) + + # Solve as a sanity check -- should be square with zero infeasibility + res = ipopt.solve(m_plant, tee=True) + pyo.assert_optimal_termination(res) + + # + # Initialize data structure for simulation data + # + sim_data = m_plant_helper.get_data_at_time([t0]) + + # + # Construct dynamic model for estimator + # + samples_per_estimator = round(estimator_horizon/sample_period) + ntfe_per_estimator = ntfe_per_sample * samples_per_estimator + m_estimator = make_model( + horizon=estimator_horizon, + ntfe=ntfe_per_estimator, + nxfe=nxfe, + ) + # Fix inputs at all time points + m_estimator.fs.pipeline.control_volume.pressure[:, x0].fix() + m_estimator.fs.pipeline.control_volume.flow_mass[:, xf].fix() + + # Make helper object w.r.t. 
time + m_estimator_time_helper = DynamicModelHelper( + m_estimator, m_estimator.fs.time + ) + + # + # Make sample-point set for measurements and model disturbances + # + sample_points = [ + t0 + sample_period*i for i in range(samples_per_estimator+1) + ] + m_estimator.fs.sample_points = ContinuousSet(initialize=sample_points) + + # + # Construct components for measurements and measurement errors + # + m_estimator.estimation_block = pyo.Block() + esti_blo = m_estimator.estimation_block + + cv = m_estimator.fs.pipeline.control_volume + measured_variables = [ + pyo.Reference(cv.flow_mass[:, x]) for x in list(space)[:-1] + ] + measurement_info = construct_measurement_variables_constraints( + m_estimator.fs.sample_points, + measured_variables, + ) + esti_blo.measurement_set = measurement_info[0] + esti_blo.measurement_variables = measurement_info[1] + # Measurement variables should be fixed all the time + esti_blo.measurement_variables.fix() + esti_blo.measurement_error_variables = measurement_info[2] + esti_blo.measurement_constraints = measurement_info[3] + + # + # Construct disturbed model constraints + # + flatten_momentum_balance = [ + pyo.Reference(cv.momentum_balance[:,x]) for x in space + ] + flatten_material_balance = [ + pyo.Reference(cv.material_balances[:,x,"Vap","natural_gas"]) + for x in list(space)[:-1] + ] + model_constraints_to_be_disturbed = \ + flatten_momentum_balance + flatten_material_balance + + model_disturbance_info = construct_disturbed_model_constraints( + m_estimator.fs.time, + m_estimator.fs.sample_points, + model_constraints_to_be_disturbed, + ) + esti_blo.disturbance_set = model_disturbance_info[0] + esti_blo.disturbance_variables = model_disturbance_info[1] + esti_blo.disturbed_constraints = model_disturbance_info[2] + + activate_disturbed_constraints_based_on_original_constraints( + m_estimator.fs.time, + m_estimator.fs.sample_points, + esti_blo.disturbance_variables, + model_constraints_to_be_disturbed, + esti_blo.disturbed_constraints, 
+ ) + + # Make helper object w.r.t. sample points + m_estimator_spt_helper = DynamicModelHelper( + m_estimator, m_estimator.fs.sample_points + ) + + # + # Construct least square objective to minimize measurement errors + # and model disturbances + # + # This flag toggles between two different objective formulations. + # I included it just to test that we can support both. + error_var_objective = True + if error_var_objective: + error_vars = [ + pyo.Reference(esti_blo.measurement_error_variables[idx, :]) + for idx in esti_blo.measurement_set + ] + # This cost function penalizes the square of the "error variables" + m_estimator.measurement_error_cost = get_error_cost( + error_vars, m_estimator.fs.sample_points + ) + else: + measurement_map = ComponentMap( + (var, [ + esti_blo.measurement_variables[i, t] + for t in m_estimator.fs.sample_points + ]) + for i, var in enumerate(measured_variables) + ) + setpoint_data = TimeSeriesData( + measurement_map, m_estimator.fs.sample_points + ) + # This cost function penalizes the difference between measurable + # estimates and their corresponding measurements. 
+ error_cost = get_tracking_cost_from_time_varying_setpoint( + measured_variables, m_estimator.fs.sample_points, setpoint_data + ) + m_estimator.measurement_error_cost = error_cost + + # + # Construct disturbance cost expression + # + disturbance_vars = [ + pyo.Reference(esti_blo.disturbance_variables[idx, :]) + for idx in esti_blo.disturbance_set + ] + + # We know what order we sent constraints to the disturbance constraint + # function, so we know which indices correspond to which balance equations + momentum_bal_indices = set(range(len(flatten_momentum_balance))) + material_bal_indices = set(range( + len(flatten_momentum_balance), + len(flatten_momentum_balance) + len(flatten_material_balance), + )) + weights = {} + for idx in esti_blo.disturbance_set: + if idx in momentum_bal_indices: + weight = 10.0 + elif idx in material_bal_indices: + weight = 20.0 + else: + weight = 1.0 + weights[esti_blo.disturbance_variables[idx, :]] = weight + m_estimator.model_disturbance_cost = get_error_cost( + disturbance_vars, m_estimator.fs.sample_points, weight_data=weights + ) + ### + + m_estimator.squred_error_disturbance_objective = pyo.Objective( + expr=(sum(m_estimator.measurement_error_cost.values()) + + sum(m_estimator.model_disturbance_cost.values()) + ) + ) + + # + # Initialize dynamic model to initial steady state + # + # TODO: load a time-series measurements & controls as initialization + m_estimator_time_helper.load_scalar_data(scalar_data) + m_estimator_time_helper.load_data_at_time(initial_data, m_estimator.fs.time) + + for index, var in enumerate(measured_variables): + for spt in m_estimator.fs.sample_points: + esti_blo.measurement_variables[index, spt].set_value(var[spt].value) + + # + # Initialize data sturcture for estimates + # + estimate_data = m_estimator_time_helper.get_data_at_time([t0]) + + # + # Set up a model linker to send measurements to estimator to update + # measurement variables + # + measured_variables_in_plant = 
[m_plant.find_component(var.referent) + for var in measured_variables + ] + flatten_measurements = [ + pyo.Reference(esti_blo.measurement_variables[index, :]) + for index in esti_blo.measurement_set + ] + measurement_linker = DynamicVarLinker( + measured_variables_in_plant, + flatten_measurements, + ) + + # Set up a model linker to send measurements to estimator to initialize + # measured variables + # + estimate_linker = DynamicVarLinker( + measured_variables_in_plant, + measured_variables, + ) + + # + # Load control input data for simulation + # + control_inputs = get_control_inputs() + + for i in range(n_cycles): + # time.first() in the model corresponds to sim_t0 in "simulation time" + # time.last() in the model corresponds to sim_tf in "simulation time" + sim_t0 = i*sample_period + sim_tf = (i + 1)*sample_period + + # + # Load inputs into plant + # + current_control = control_inputs.get_data_at_time(time=sim_t0) + non_initial_plant_time = list(m_plant.fs.time)[1:] + m_plant_helper.load_data_at_time( + current_control, non_initial_plant_time + ) + + res = ipopt.solve(m_plant, tee=True) + pyo.assert_optimal_termination(res) + + # + # Extract time series data from solved model + # + # Initial conditions have already been accounted for. + # Note that this is only correct because we're using an implicit + # time discretization. 
+ non_initial_time = list(m_plant.fs.time)[1:] + model_data = m_plant_helper.get_data_at_time(non_initial_time) + + # + # Apply offset to data from model + # + model_data.shift_time_points(sim_t0) + + # + # Extend simulation data with result of new simulation + # + sim_data.concatenate(model_data) + + # + # Load measurements from plant to estimator + # + plant_tf = m_plant.fs.time.last() + estimator_tf = m_estimator.fs.time.last() + measurement_linker.transfer(plant_tf, estimator_tf) + + # + # Initialize measured variables within the last sample period to + # current measurements + # + last_sample_period = list(m_estimator.fs.time)[-ntfe_per_sample:] + estimate_linker.transfer(tf, last_sample_period) + + # Load inputs to estimator + m_estimator_time_helper.load_data_at_time( + current_control, last_sample_period + ) + + res = ipopt.solve(m_estimator, tee=True) + pyo.assert_optimal_termination(res) + + # + # Extract estimate data from estimator + # + estimator_data = m_estimator_time_helper.get_data_at_time([estimator_tf]) + # Shift time points from "estimator time" to "simulation time" + estimator_data.shift_time_points(sim_tf-estimator_tf) + + # + # Extend data structure of estimates + # + estimate_data.concatenate(estimator_data) + + # + # Re-initialize estimator model + # + m_estimator_time_helper.shift_values_by_time(sample_period) + m_estimator_spt_helper.shift_values_by_time(sample_period) + + # + # Re-initialize model to final values. + # This sets new initial conditions, including inputs. 
+ # + m_plant_helper.copy_values_at_time(source_time=plant_tf) + + return sim_data, estimate_data + + +def plot_states_estimates_from_data( + state_data, + estimate_data, + names, + show=False + ): + state_time = state_data.get_time_points() + states = state_data.get_data() + estimate_time = estimate_data.get_time_points() + estimates = estimate_data.get_data() + for i, name in enumerate(names): + fig, ax = plt.subplots() + state_values = states[name] + estimate_values = estimates[name] + ax.plot(state_time, state_values, label="Plant states") + ax.plot(estimate_time, estimate_values, "o", label="Estimates") + ax.set_title(name) + ax.set_xlabel("Time (hr)") + ax.legend() + + if show: + fig.show() + else: + fname = "state_estimate%s.png" % i + fig.savefig(fname, transparent=False) + + +if __name__ == "__main__": + simulation_data, estimate_data = run_mhe( + simulation_horizon=4.0, + ) + plot_states_estimates_from_data( + simulation_data, + estimate_data, + [ + pyo.ComponentUID("fs.pipeline.control_volume.flow_mass[*,0.0]"), + pyo.ComponentUID("fs.pipeline.control_volume.pressure[*,1.0]"), + ], + ) diff --git a/nmpc_examples/pipeline/tests/test_pipeline_mhe.py b/nmpc_examples/pipeline/tests/test_pipeline_mhe.py new file mode 100644 index 0000000..949c5bd --- /dev/null +++ b/nmpc_examples/pipeline/tests/test_pipeline_mhe.py @@ -0,0 +1,63 @@ +import pyomo.common.unittest as unittest +import pyomo.environ as pyo + +from nmpc_examples.pipeline.run_pipeline_mhe import run_mhe + +class TestPipelineMHE(unittest.TestCase): + + def test_small_mhe_simulation(self): + simulation_data, estimate_data = run_mhe( + simulation_horizon=4.0, + sample_period=2.0, + # NOTE: When I reduce the estimator horizon to 4.0, the test + # runs much faster, but the results here don't change. + # Is this what I expect? + # It seems plausible, as the estimated values are the same + # as those simulated... 
+ estimator_horizon=4.0, + ) + + # These values were not reproduced with another model, just taken + # from this MHE simulation. They are here to make sure the result + # doesn't change. + pred_inlet_flow = [ + 3.000e5, 4.927e5, 4.446e5, 4.460e5, 4.557e5, 4.653e5, + 4.722e5, 4.785e5, 4.842e5, + ] + pred_outlet_pressure = [ + 50.91, 47.82, 46.19, 45.06, 43.58, 43.08, 42.62, 42.21, 41.83, + ] + + pred_estimate_inlet_flow = [3.00000e5, 4.557e5, 4.842e5] + pred_estimate_outlet_pressure = [50.91, 43.58, 41.83] + + actual_inlet_flow = simulation_data.get_data()[ + pyo.ComponentUID("fs.pipeline.control_volume.flow_mass[*,0.0]") + ] + actual_outlet_pressure = simulation_data.get_data()[ + pyo.ComponentUID("fs.pipeline.control_volume.pressure[*,1.0]") + ] + + estimate_inlet_flow = estimate_data.get_data()[ + pyo.ComponentUID("fs.pipeline.control_volume.flow_mass[*,0.0]") + ] + estimate_outlet_pressure = estimate_data.get_data()[ + pyo.ComponentUID("fs.pipeline.control_volume.pressure[*,1.0]") + ] + + self.assertStructuredAlmostEqual( + pred_inlet_flow, actual_inlet_flow, reltol=1e-3 + ) + self.assertStructuredAlmostEqual( + pred_outlet_pressure, actual_outlet_pressure, reltol=1e-3 + ) + self.assertStructuredAlmostEqual( + pred_estimate_inlet_flow, estimate_inlet_flow, reltol=1e-3 + ) + self.assertStructuredAlmostEqual( + pred_estimate_outlet_pressure, estimate_outlet_pressure, reltol=1e-3 + ) + + +if __name__ == "__main__": + unittest.main()