diff --git a/corrai/sampling.py b/corrai/sampling.py index 678cfb1..791d665 100644 --- a/corrai/sampling.py +++ b/corrai/sampling.py @@ -1,5 +1,6 @@ from abc import ABC, abstractmethod from dataclasses import dataclass, field +from functools import wraps import numpy as np import pandas as pd @@ -19,6 +20,74 @@ from corrai.base.simulate import run_simulations +def plot_pcp( + parameter_values: np.ndarray, + parameter_names: list[str], + aggregated_results: pd.DataFrame, + *, + bounds: list[tuple[float, float]] | None = None, + color_by: str | None = None, + title: str | None = "Parallel Coordinates — Samples", + html_file_path: str | None = None, +) -> go.Figure: + """ + Creates a Parallel Coordinates Plot (PCP) for parameter samples and aggregated indicators. + Each vertical axis corresponds to a parameter or an aggregated indicator, + and each polyline represents one simulation. + """ + + if parameter_values.shape[0] != len(aggregated_results): + raise ValueError("Mismatch between number of samples and aggregated results.") + if len(parameter_names) != parameter_values.shape[1]: + raise ValueError( + "`parameter_names` length must match parameter_values.shape[1]." + ) + + df = pd.DataFrame( + parameter_values, columns=parameter_names, index=aggregated_results.index + ) + df = pd.concat([df, aggregated_results], axis=1) + + if color_by is None: + if not aggregated_results.empty: + color_by = aggregated_results.columns[0] + else: + color_by = parameter_names[0] + + dimensions = [] + for j, pname in enumerate(parameter_names): + dim = {"label": pname, "values": df[pname].to_numpy()} + if bounds is not None: + lb, ub = bounds[j] + dim["range"] = [lb, ub] + dimensions.append(dim) + + for col in aggregated_results.columns: + col_vals = df[col].to_numpy() + if np.all(np.isnan(col_vals)): + dim = {"label": col, "values": col_vals} + else: + vmin = float(np.nanmin(col_vals)) + vmax = float(np.nanmax(col_vals)) + if np.isfinite(vmin) and np.isfinite(vmax) and vmin != vmax: + dim = {"label": col, "values": col_vals, "range": [vmin, vmax]} + else: + dim = {"label": col, "values": col_vals} + dimensions.append(dim) + + line_kwargs = {} + if color_by is not None and color_by in df.columns: + line_kwargs = dict(color=df[color_by], colorscale="Viridis", showscale=True) + + fig = go.Figure(data=go.Parcoords(dimensions=dimensions, line=line_kwargs)) + fig.update_layout(title=title) + + if html_file_path: + fig.write_html(html_file_path) + + return fig + + @dataclass class Sample: """ @@ -191,7 +260,7 @@ def add_samples(self, values: np.ndarray, results: list[pd.DataFrame] = None): self.results = pd.concat([self.results, new_results], ignore_index=True) - def get_aggregate_time_series( + def get_aggregated_time_series( self, indicator: str, method: str = "mean", @@ -200,6 +269,109 @@ def get_aggregate_time_series( freq: str | pd.Timedelta | dt.timedelta = None, prefix: str = "aggregated", ) -> pd.DataFrame: + """ + Aggregate sample results using a specified statistical or error metric. + + This method extracts the specified `indicator` column, and aggregates + the time series across simulations using the given method. If a reference + time series is provided, metrics that require ground truth + (e.g., mean_absolute_error) are supported. + + If `freq` is provided, the aggregation is done over time bins, producing a + table of simulation runs versus time periods. + + Parameters + ---------- + indicator : str + The column name in each DataFrame to extract and aggregate. + + method : str, default="mean" + The aggregation method to use. Supported methods include: + - "mean" + - "sum" + - "nmbe" + - "cv_rmse" + - "mean_squared_error" + - "mean_absolute_error" + + agg_method_kwarg : dict, optional + Additional keyword arguments to pass to the aggregation function. + + reference_time_series : pandas.Series, optional + Reference series (`y_true`) to compare each simulation against. + Required for error-based methods such as "mean_absolute_error". + Must have the same datetime index and length as the individual simulation + results. + + freq : str or pandas.Timedelta or datetime.timedelta, optional + If provided, aggregate the time series within bins of this frequency + (e.g., "d" for daily, "h" for hourly). + The result will be a DataFrame where each row corresponds to a simulation and + each column to a time bin. + + prefix : str, default="aggregated" + Prefix to use for naming the output column when `freq` is not specified. + + Returns + ------- + pandas.DataFrame + If `freq` is not provided, returns a one-column DataFrame containing the + aggregated metric per simulation, indexed by the same index as `results`. + + If `freq` is provided, returns a DataFrame indexed by simulation IDs + (same as `results.index`), with columns representing each aggregated time bin. + + Raises + ------ + ValueError + If the shapes of `results` and `reference_time_series` are incompatible. + If the datetime index is not valid or missing. + + Examples + -------- + >>> import pandas as pd + >>> import numpy as np + + >>> from corrai.base.parameter import Parameter + >>> from corrai.sampling import Sample + + >>> sample = Sample( + ... parameters=[ + ... Parameter("a", interval=(1, 10)), + ... Parameter("b", interval=(1, 10)), + ... ] + ... ) + + >>> t = pd.date_range("2009-01-01", freq="h", periods=2) + >>> res_1 = pd.DataFrame({"a": [1, 2]}, index=t) + >>> res_2 = pd.DataFrame({"a": [3, 4]}, index=t) + + >>> sample.add_samples(np.array([[1, 2], [3, 4]]), [res_1, res_2]) + + >>> # No frequency aggregation: one aggregated value per simulation + >>> sample.get_aggregated_time_series("a") + aggregated_a + 0 1.5 + 1 3.5 + + >>> # With frequency aggregation: one value per time bin per simulation + >>> ref = pd.Series( + ... [1, 1], index=pd.date_range("2009-01-01", freq="h", periods=2) + ... ) + + >>> sample.get_aggregated_time_series( + ... indicator="a", + ... method="mean_absolute_error", + ... reference_time_series=ref, + ... freq="h", + ...) + + 2009-01-01 00:00:00 2009-01-01 01:00:00 + 0 0.0 1.0 + 1 2.0 3.0 + + """ + return aggregate_time_series( self.results, indicator, @@ -259,7 +431,7 @@ def plot_hist( go.Figure Plotly histogram figure. """ - res = self.get_aggregate_time_series( + res = self.get_aggregated_time_series( indicator, method, agg_method_kwarg, @@ -300,9 +472,9 @@ def plot_hist( ) return fig - def plot( + def plot_sample( self, - indicator: str | None = None, + indicator: str | None, reference_timeseries: pd.Series | None = None, title: str | None = None, y_label: str | None = None, @@ -310,23 +482,198 @@ def plot( alpha: float = 0.5, show_legends: bool = False, round_ndigits: int = 2, + quantile_band: float = 0.75, + type_graph: str = "area", ) -> go.Figure: - if self.results is None: - raise ValueError("No results available to plot. Run a simulation first.") + """ + Plot simulation results with different visualization modes. - return plot_sample( - results=self.results, - indicator=indicator, - reference_timeseries=reference_timeseries, + This function allows visualization of multiple simulation samples, + either as a scatter plot of all samples or as an aggregated area + with min–max envelope, median, and quantile bands. + + Parameters + ---------- + indicator : str, optional + Column name to extract if inner elements are DataFrames + with multiple columns. + reference_timeseries : pandas.Series, optional + A reference time series to plot alongside simulations + (e.g., measured data). + title : str, optional + Plot title. + y_label : str, optional + Label for the y-axis. + x_label : str, optional + Label for the x-axis. + alpha : float, default=0.5 + Opacity for scatter markers when ``type_graph="scatter"``. + show_legends : bool, default=False + Whether to display legends for each individual sample trace + when ``type_graph="scatter"``. + round_ndigits : int, default=2 + Number of digits for rounding parameter values in legend strings. + quantile_band : float, default=0.75 + Upper quantile to display when ``type_graph="area"``. + Both ``(1 - quantile_band)`` and ``quantile_band`` are drawn + as dotted lines, e.g. ``0.75`` → 25% and 75%. + type_graph : {"area", "scatter"}, default="area" + Visualization mode: + - ``"scatter"`` : plot all samples individually as scatter markers. + - ``"area"`` : plot aggregated area with min–max envelope, + median line, and quantile bands. + + Examples + -------- + >>> fig = plot_sample(results, reference_timeseries=ref) + >>> fig.show() + + >>> fig = plot_sample(results, reference_timeseries=ref, type_graph="scatter") + >>> fig.show() + """ + + if self.results.empty: + raise ValueError("`results` is empty. Simulate samples first.") + + def _legend_for(i: int) -> str: + if not show_legends: + return "Simulations" + parameter_names = [par.name for par in self.parameters] + vals = self.values[i, :] + return ", ".join( + f"{n}: {round(v, round_ndigits)}" for n, v in zip(parameter_names, vals) + ) + + series_list = [] + for sample in self.results: + if sample is None or sample.empty: + continue + series_list.append(sample[indicator]) + + if not series_list and reference_timeseries is None: + raise ValueError("No simulated data available to plot.") + + fig = go.Figure() + if type_graph == "scatter": + for i, s in enumerate(series_list): + fig.add_trace( + go.Scattergl( + name=_legend_for(i), + mode="markers", + x=s.index, + y=s.to_numpy(), + marker=dict(color=f"rgba(135,135,135,{alpha})"), + showlegend=show_legends, + ) + ) + if reference_timeseries is not None: + fig.add_trace( + go.Scattergl( + name="Reference", + mode="lines", + x=reference_timeseries.index, + y=reference_timeseries.to_numpy(), + line=dict(color="red", width=2), + showlegend=True, + ) + ) + + elif type_graph == "area": + df_all = pd.concat(series_list, axis=1) if series_list else None + + if df_all is not None: + lower = df_all.min(axis=1) + upper = df_all.max(axis=1) + + fig.add_trace( + go.Scatter( + x=upper.index, + y=upper.values, + line=dict(width=0), + mode="lines", + name="max", + showlegend=False, + ) + ) + fig.add_trace( + go.Scatter( + x=lower.index, + y=lower.values, + line=dict(width=0), + mode="lines", + fill="tonexty", + name="Area min - max", + fillcolor="rgba(255,165,0,0.4)", + showlegend=True, + ) + ) + + median = df_all.median(axis=1) + fig.add_trace( + go.Scatter( + x=median.index, + y=median.values, + mode="lines", + line=dict(color="black"), + name="Median", + showlegend=True, + ) + ) + + q_low = 1 - quantile_band + q_high = quantile_band + q1 = df_all.quantile(q_low, axis=1) + q2 = df_all.quantile(q_high, axis=1) + + fig.add_trace( + go.Scatter( + x=q1.index, + y=q1.values, + mode="lines", + line=dict(color="black", dash="dot"), + name="Quantiles", + showlegend=False, + ) + ) + fig.add_trace( + go.Scatter( + x=q2.index, + y=q2.values, + mode="lines", + line=dict(color="black", dash="dot"), + name="Quantiles", + showlegend=True, + ) + ) + + if reference_timeseries is not None: + fig.add_trace( + go.Scatter( + name="Reference", + mode="lines", + x=reference_timeseries.index, + y=reference_timeseries.to_numpy(), + line=dict(color="red"), + showlegend=True, + ) + ) + + else: + raise ValueError("`type_graph` must be either 'area' or 'scatter'.") + + if title is None: + title = f"Sample plot of {indicator} indicator" + else: + title = "Sample plot" + + fig.update_layout( title=title, - y_label=y_label, - x_label=x_label, - alpha=alpha, - show_legends=show_legends, - parameter_values=self.values, - parameter_names=[p.name for p in self.parameters], - round_ndigits=round_ndigits, + xaxis_title=x_label, + yaxis_title=y_label, + showlegend=True, + legend_traceorder="normal", ) + return fig class Sampler(ABC): @@ -437,28 +784,21 @@ def simulate_pending(self, n_cpu: int = 1, simulation_kwargs: dict = None): unsimulated_idx = self.sample.get_pending_index() self.simulate_at(unsimulated_idx, n_cpu, simulation_kwargs) + @wraps(Sample.plot_sample) def plot_sample( self, - indicator: str | None = None, + indicator: str | None, reference_timeseries: pd.Series | None = None, title: str | None = None, y_label: str | None = None, x_label: str | None = None, alpha: float = 0.5, show_legends: bool = False, - parameter_values: np.ndarray | None = None, - parameter_names: list[str] | None = None, round_ndigits: int = 2, quantile_band: float = 0.75, type_graph: str = "area", ) -> go.Figure: - if parameter_values is None: - parameter_values = self.values - if parameter_names is None: - parameter_names = [p.name for p in self.parameters] - - return plot_sample( - self.results, + return self.sample.plot_sample( indicator=indicator, reference_timeseries=reference_timeseries, title=title, @@ -466,13 +806,25 @@ def plot_sample( x_label=x_label, alpha=alpha, show_legends=show_legends, - parameter_values=parameter_values, - parameter_names=parameter_names, round_ndigits=round_ndigits, quantile_band=quantile_band, type_graph=type_graph, ) + @wraps(Sample.get_aggregated_time_series) + def get_sample_aggregated_time_series( + self, + indicator: str, + method: str = "mean", + agg_method_kwarg: dict = None, + reference_time_series: pd.Series = None, + freq: str | pd.Timedelta | dt.timedelta = None, + prefix: str = "aggregated", + ): + return self.sample.get_aggregated_time_series( + indicator, method, agg_method_kwarg, reference_time_series, freq, prefix + ) + def plot_pcp( self, indicator: str | None = None, @@ -770,314 +1122,3 @@ def add_sample( **sobol_kwargs, ) self._post_draw_sample(new_sample, simulate, n_cpu, sample_is_dimless=False) - - -def plot_sample( - results: pd.Series, - indicator: str | None = None, - reference_timeseries: pd.Series | None = None, - title: str | None = None, - y_label: str | None = None, - x_label: str | None = None, - alpha: float = 0.5, - show_legends: bool = False, - parameter_values: np.ndarray | None = None, - parameter_names: list[str] | None = None, - round_ndigits: int = 2, - quantile_band: float = 0.75, - type_graph: str = "area", -) -> go.Figure: - """ - Plot simulation results with different visualization modes. - - This function allows visualization of multiple simulation samples, - either as a scatter plot of all samples or as an aggregated area - with min–max envelope, median, and quantile bands. - - Parameters - ---------- - results : pandas.Series - A Series where each element is either a pandas Series or - a pandas DataFrame containing simulation results. Empty - elements are ignored. - indicator : str, optional - Column name to extract if inner elements are DataFrames - with multiple columns. If ``None`` and the DataFrame has - exactly one column, that column is used. - reference_timeseries : pandas.Series, optional - A reference time series to plot alongside simulations - (e.g., measured data). - title : str, optional - Plot title. - y_label : str, optional - Label for the y-axis. - x_label : str, optional - Label for the x-axis. - alpha : float, default=0.5 - Opacity for scatter markers when ``type_graph="scatter"``. - show_legends : bool, default=False - Whether to display legends for each individual sample trace - when ``type_graph="scatter"``. - parameter_values : numpy.ndarray, optional - Array of shape (n_samples, n_params) with the parameter values - used per sample. Only used for legend strings when - ``show_legends=True``. - parameter_names : list of str, optional - Names of the parameters (same order as in ``parameter_values``). - round_ndigits : int, default=2 - Number of digits for rounding parameter values in legend strings. - quantile_band : float, default=0.75 - Upper quantile to display when ``type_graph="area"``. - Both ``(1 - quantile_band)`` and ``quantile_band`` are drawn - as dotted lines, e.g. ``0.75`` → 25% and 75%. - type_graph : {"area", "scatter"}, default="area" - Visualization mode: - - ``"scatter"`` : plot all samples individually as scatter markers. - - ``"area"`` : plot aggregated area with min–max envelope, - median line, and quantile bands. - - Examples - -------- - >>> fig = plot_sample(results, reference_timeseries=ref) - >>> fig.show() - - >>> fig = plot_sample(results, reference_timeseries=ref, type_graph="scatter") - >>> fig.show() - """ - if not isinstance(results, pd.Series): - raise ValueError("`results` must be a pandas Series.") - if results.empty: - raise ValueError("`results` is empty. Simulate samples first.") - - ref_name = getattr(reference_timeseries, "name", None) - - def _to_series(obj, indicator_, ref_name_): - if isinstance(obj, pd.Series): - return obj - if isinstance(obj, pd.DataFrame): - if obj.empty: - return None - if indicator_ is not None: - return obj[indicator_] - if ref_name_ is not None and ref_name_ in obj.columns: - return obj[ref_name_] - if obj.shape[1] == 1: - return obj.iloc[:, 0] - raise ValueError( - "Provide `indicator`: multiple columns in the sample DataFrame." - ) - return None - - def _legend_for(i: int) -> str: - if not show_legends: - return "Simulations" - if parameter_values is None or parameter_names is None: - return f"Sample {i}" - vals = parameter_values[i] - return ", ".join( - f"{n}: {round(v, round_ndigits)}" for n, v in zip(parameter_names, vals) - ) - - fig = go.Figure() - series_list = [] - df_all = None - - for sample in results: - s = _to_series(sample, indicator, ref_name) - if s is None or s.empty: - continue - series_list.append(s) - - if not series_list and reference_timeseries is None: - raise ValueError("No simulated data available to plot.") - - if type_graph == "scatter": - for i, s in enumerate(series_list): - fig.add_trace( - go.Scattergl( - name=_legend_for(i), - mode="markers", - x=s.index, - y=s.to_numpy(), - marker=dict(color=f"rgba(135,135,135,{alpha})"), - showlegend=show_legends, - ) - ) - if reference_timeseries is not None: - fig.add_trace( - go.Scatter( - name="Reference", - mode="lines", - x=reference_timeseries.index, - y=reference_timeseries.to_numpy(), - line=dict(color="red"), - showlegend=True, - ) - ) - - elif type_graph == "area": - df_all = pd.concat(series_list, axis=1) if series_list else None - - if df_all is not None: - lower = df_all.min(axis=1) - upper = df_all.max(axis=1) - - fig.add_trace( - go.Scatter( - x=upper.index, - y=upper.values, - line=dict(width=0), - mode="lines", - name="max", - showlegend=False, - ) - ) - fig.add_trace( - go.Scatter( - x=lower.index, - y=lower.values, - line=dict(width=0), - mode="lines", - fill="tonexty", - name="Area min - max", - fillcolor="rgba(255,165,0,0.4)", - showlegend=True, - ) - ) - - median = df_all.median(axis=1) - fig.add_trace( - go.Scatter( - x=median.index, - y=median.values, - mode="lines", - line=dict(color="black"), - name="Median", - showlegend=True, - ) - ) - - q_low = 1 - quantile_band - q_high = quantile_band - q1 = df_all.quantile(q_low, axis=1) - q2 = df_all.quantile(q_high, axis=1) - - fig.add_trace( - go.Scatter( - x=q1.index, - y=q1.values, - mode="lines", - line=dict(color="black", dash="dot"), - name="Quantiles", - showlegend=False, - ) - ) - fig.add_trace( - go.Scatter( - x=q2.index, - y=q2.values, - mode="lines", - line=dict(color="black", dash="dot"), - name="Quantiles", - showlegend=True, - ) - ) - - if reference_timeseries is not None: - fig.add_trace( - go.Scatter( - name="Reference", - mode="lines", - x=reference_timeseries.index, - y=reference_timeseries.to_numpy(), - line=dict(color="red"), - showlegend=True, - ) - ) - - else: - raise ValueError("`type_graph` must be either 'area' or 'scatter'.") - - # === titre auto si pas fourni === - if title is None: - if indicator is not None: - title = f"Sample plot of {indicator} indicator" - else: - title = "Sample plot" - - fig.update_layout( - title=title, - xaxis_title=x_label, - yaxis_title=y_label, - showlegend=True, - legend_traceorder="normal", - ) - return fig - - -def plot_pcp( - parameter_values: np.ndarray, - parameter_names: list[str], - aggregated_results: pd.DataFrame, - *, - bounds: list[tuple[float, float]] | None = None, - color_by: str | None = None, - title: str | None = "Parallel Coordinates — Samples", - html_file_path: str | None = None, -) -> go.Figure: - """ - Creates a Parallel Coordinates Plot (PCP) for parameter samples and aggregated indicators. - Each vertical axis corresponds to a parameter or an aggregated indicator, - and each polyline represents one simulation. - """ - - if parameter_values.shape[0] != len(aggregated_results): - raise ValueError("Mismatch between number of samples and aggregated results.") - if len(parameter_names) != parameter_values.shape[1]: - raise ValueError( - "`parameter_names` length must match parameter_values.shape[1]." - ) - - df = pd.DataFrame( - parameter_values, columns=parameter_names, index=aggregated_results.index - ) - df = pd.concat([df, aggregated_results], axis=1) - - if color_by is None: - if not aggregated_results.empty: - color_by = aggregated_results.columns[0] - else: - color_by = parameter_names[0] - - dimensions = [] - for j, pname in enumerate(parameter_names): - dim = {"label": pname, "values": df[pname].to_numpy()} - if bounds is not None: - lb, ub = bounds[j] - dim["range"] = [lb, ub] - dimensions.append(dim) - - for col in aggregated_results.columns: - col_vals = df[col].to_numpy() - if np.all(np.isnan(col_vals)): - dim = {"label": col, "values": col_vals} - else: - vmin = float(np.nanmin(col_vals)) - vmax = float(np.nanmax(col_vals)) - if np.isfinite(vmin) and np.isfinite(vmax) and vmin != vmax: - dim = {"label": col, "values": col_vals, "range": [vmin, vmax]} - else: - dim = {"label": col, "values": col_vals} - dimensions.append(dim) - - line_kwargs = {} - if color_by is not None and color_by in df.columns: - line_kwargs = dict(color=df[color_by], colorscale="Viridis", showscale=True) - - fig = go.Figure(data=go.Parcoords(dimensions=dimensions, line=line_kwargs)) - fig.update_layout(title=title) - - if html_file_path: - fig.write_html(html_file_path) - - return fig diff --git a/corrai/sensitivity.py b/corrai/sensitivity.py index c415762..1049dca 100644 --- a/corrai/sensitivity.py +++ b/corrai/sensitivity.py @@ -86,184 +86,6 @@ def values(self): def results(self): return self.sampler.results - @wraps(Sample.plot_hist) - def plot_sample_hist( - self, - indicator: str, - method: str = "mean", - unit: str = "", - agg_method_kwarg: dict = None, - reference_time_series: pd.Series = None, - bins: int = 30, - colors: str = "orange", - reference_value: int | float = None, - reference_label: str = "Reference", - show_rug: bool = False, - title: str = None, - ): - return self.sampler.sample.plot_hist( - indicator=indicator, - method=method, - unit=unit, - agg_method_kwarg=agg_method_kwarg, - reference_time_series=reference_time_series, - bins=bins, - colors=colors, - reference_value=reference_value, - reference_label=reference_label, - show_rug=show_rug, - title=title, - ) - - def plot_sample( - self, - indicator: str | None = None, - reference_timeseries: pd.Series | None = None, - title: str | None = None, - y_label: str | None = None, - x_label: str | None = None, - alpha: float = 0.5, - show_legends: bool = False, - parameter_values: np.ndarray | None = None, - parameter_names: list[str] | None = None, - round_ndigits: int = 2, - quantile_band: float = 0.75, - type_graph: str = "area", - ) -> go.Figure: - """ - Plot simulation runs against an optional reference time series. - - This method wraps :meth:`Sampler.plot_sample` and plots the simulations - associated with this sensitivity analysis instance. It supports both - scatter plots of all runs or aggregated area plots with envelopes, - quantiles, and median. - - Parameters - ---------- - indicator : str, optional - Column name to select if simulation outputs are DataFrames with multiple - columns. If None and a DataFrame has a single column, that column is used. - reference_timeseries : pandas.Series, optional - A time series to plot as ground truth or reference, shown as a red line. - title : str, optional - Plot title. - y_label : str, optional - Label for the y-axis. - x_label : str, optional - Label for the x-axis. - alpha : float, default=0.5 - Opacity for scatter markers when ``type_graph='scatter'``. - show_legends : bool, default=False - Whether to display a legend entry for each sample trace. - parameter_values : numpy.ndarray, optional - Custom parameter values for legend annotation. If None, values from - this analysis instance are used. - parameter_names : list of str, optional - Custom parameter names. If None, names from this analysis instance are used. - round_ndigits : int, default=2 - Number of decimal digits for rounding parameter values in legends. - quantile_band : float, default=0.75 - Upper quantile to display when ``type_graph='area'``. - Both ``(1 - quantile_band)`` and ``quantile_band`` are drawn - as dotted lines (e.g. 0.75 → 25% and 75%). - type_graph : {"area", "scatter"}, default="area" - Visualization mode: - - ``"scatter"`` : plot all runs individually as scatter markers. - - ``"area"`` : plot aggregated area with min–max envelope, - median line, and quantile bands. - - Returns - ------- - plotly.graph_objects.Figure - A Plotly Figure containing the simulation runs and optional reference. - """ - return self.sampler.plot_sample( - indicator=indicator, - reference_timeseries=reference_timeseries, - title=title, - y_label=y_label, - x_label=x_label, - alpha=alpha, - show_legends=show_legends, - parameter_values=parameter_values, - parameter_names=parameter_names, - round_ndigits=round_ndigits, - quantile_band=quantile_band, - type_graph=type_graph, - ) - - def plot_pcp( - self, - indicator: str | None = None, - method: str = "mean", - agg_method_kwarg: dict = None, - reference_time_series: pd.Series = None, - freq: str | pd.Timedelta | dt.timedelta = None, - prefix: str | None = None, - bounds: list[tuple[float, float]] | None = None, - color_by: str | None = None, - title: str | None = "Parallel Coordinates - Samples", - html_file_path: str | None = None, - ) -> go.Figure: - """ - Create a Parallel Coordinates Plot (PCP) of parameters and aggregated results. - - Each vertical axis corresponds to a parameter or an aggregated indicator, - and each polyline represents one simulation. Useful for visualizing the - relationship between sampled parameters and performance metrics. - - This method wraps :meth:`Sampler.plot_pcp`. - - Parameters - ---------- - indicator : str, optional - Indicator name to extract from simulation results before aggregation. - If None, only parameters are shown. - method : str, default="mean" - Aggregation method to apply. Supported values include: - - "mean" - - "sum" - - "nmbe" - - "cv_rmse" - - "mean_squared_error" - - "mean_absolute_error" - agg_method_kwarg : dict, optional - Extra keyword arguments passed to the aggregation function. - reference_time_series : pd.Series, optional - Required for error-based methods (e.g., "cv_rmse"). Must have the same - index and length as each simulation. - freq : str or pd.Timedelta or datetime.timedelta, optional - If provided, aggregation is performed per time bin. - prefix : str, optional - Custom prefix for naming aggregated columns. Defaults to the method name. - bounds : list of tuple(float, float), optional - Parameter bounds for each parameter axis. - color_by : str, optional - Column name (parameter or aggregate) used to color polylines. - title : str, optional - Figure title. - html_file_path : str, optional - If provided, saves the plot as an interactive HTML file. - - Returns - ------- - go.Figure - A Plotly Figure with the parallel coordinates visualization. - """ - - return self.sampler.plot_pcp( - indicator=indicator, - method=method, - agg_method_kwarg=agg_method_kwarg, - reference_time_series=reference_time_series, - freq=freq, - prefix=prefix, - bounds=bounds, - color_by=color_by, - title=title, - html_file_path=html_file_path, - ) - @abstractmethod def _set_sampler( self, parameters: list[Parameter], model: Model, simulation_options: dict = None @@ -340,7 +162,7 @@ def analyze( - If `x_needed=True`, the analyser will receive both `X` and `Y`. - The analyser is typically an object from SALib. """ - agg_result = self.sampler.sample.get_aggregate_time_series( + agg_result = self.sampler.sample.get_aggregated_time_series( indicator, method, agg_method_kwarg, @@ -484,46 +306,107 @@ def salib_plot_dynamic_metric( return plot_dynamic_metric(metrics, sensitivity_metric, unit, title, stacked) - # def plot_pcp( - # self, - # aggregations: dict | None = None, # <= optionnel - # *, - # bounds: list[tuple[float, float]] | None = None, - # color_by: str | None = None, - # title: str | None = "Parallel Coordinates — Samples", - # html_file_path: str | None = None, - # ): - # """ - # Parallel Coordinates Plot basé sur les échantillons et résultats présents dans l'analyse. - # - # Parameters - # ---------- - # aggregations : dict - # {indicator: callable | [callable] | {label: callable}} - # Ex. {"res": [np.sum, np.mean]} -> colonnes "res:sum", "res:mean". - # bounds : list[(float, float)] | None - # Bornes (min, max) par paramètre (même ordre que les paramètres). Si None, autoscale. - # color_by : str | None - # Nom d'une dimension (paramètre ou indicateur agrégé) pour colorer les lignes. - # title : str | None - # Titre. - # html_file_path : str | None - # Si fourni, export HTML. - # """ - # results = self.sampler.sample.results - # parameter_values = self.sampler.sample.values - # parameter_names = [p.name for p in self.sampler.sample.parameters] - # - # return _plot_pcp( - # results=results, - # parameter_values=parameter_values, - # parameter_names=parameter_names, - # aggregations=aggregations, - # bounds=bounds, - # color_by=color_by, - # title=title, - # html_file_path=html_file_path, - # ) + @wraps(Sample.get_aggregated_time_series) + def get_sample_aggregated_time_series( + self, + indicator: str, + method: str = "mean", + agg_method_kwarg: dict = None, + reference_time_series: pd.Series = None, + freq: str | pd.Timedelta | dt.timedelta = None, + prefix: str = "aggregated", + ) -> pd.DataFrame: + return self.sampler.sample.get_aggregated_time_series( + self.results, + indicator, + method, + agg_method_kwarg, + reference_time_series, + freq, + prefix, + ) + + @wraps(Sample.plot_hist) + def plot_sample_hist( + self, + indicator: str, + method: str = "mean", + unit: str = "", + agg_method_kwarg: dict = None, + reference_time_series: pd.Series = None, + bins: int = 30, + colors: str = "orange", + reference_value: int | float = None, + reference_label: str = "Reference", + show_rug: bool = False, + title: str = None, + ): + return self.sampler.sample.plot_hist( + indicator=indicator, + method=method, + unit=unit, + agg_method_kwarg=agg_method_kwarg, + reference_time_series=reference_time_series, + bins=bins, + colors=colors, + reference_value=reference_value, + reference_label=reference_label, + show_rug=show_rug, + title=title, + ) + + @wraps(Sample.plot_sample) + def plot_sample( + self, + indicator: str | None, + reference_timeseries: pd.Series | None = None, + title: str | None = None, + y_label: str | None = None, + x_label: str | None = None, + alpha: float = 0.5, + show_legends: bool = False, + round_ndigits: int = 2, + quantile_band: float = 0.75, + type_graph: str = "area", + ) -> go.Figure: + return self.sampler.sample.plot_sample( + indicator=indicator, + reference_timeseries=reference_timeseries, + title=title, + y_label=y_label, + x_label=x_label, + alpha=alpha, + show_legends=show_legends, + round_ndigits=round_ndigits, + quantile_band=quantile_band, + type_graph=type_graph, + ) + + def plot_pcp( + self, + indicator: str | None = None, + method: str = "mean", + agg_method_kwarg: dict = None, + reference_time_series: pd.Series = None, + freq: str | pd.Timedelta | dt.timedelta = None, + prefix: str | None = None, + bounds: list[tuple[float, float]] | None = None, + color_by: str | None = None, + title: str | None = "Parallel Coordinates - Samples", + html_file_path: str | None = None, + ) -> go.Figure: + return self.sampler.plot_pcp( + indicator=indicator, + method=method, + agg_method_kwarg=agg_method_kwarg, + reference_time_series=reference_time_series, + freq=freq, + prefix=prefix, + bounds=bounds, + color_by=color_by, + title=title, + html_file_path=html_file_path, + ) def salib_plot_matrix( self, diff --git a/tests/test_sampling.py b/tests/test_sampling.py index bb1fed8..1a4a095 100644 --- a/tests/test_sampling.py +++ b/tests/test_sampling.py @@ -4,7 +4,6 @@ from corrai.base.parameter import Parameter from corrai.sampling import ( - plot_sample, plot_pcp, LHSSampler, MorrisSampler, @@ -43,108 +42,101 @@ class TestSample: - def test_plot_hist(self): - sampler = LHSSampler( - parameters=REAL_PARAM, - model=Pymodel(), - simulation_options=SIMULATION_OPTIONS, - ) - sampler.add_sample(3, 42, simulate=True) + def test_sample_functions(self): + sample = Sample(REAL_PARAM) + assert sample.values.shape == (0, 3) + pd.testing.assert_series_equal(sample.results, pd.Series()) - fig = sampler.sample.plot_hist( - indicator="res", - method="mean", - unit="J", - bins=10, - colors="orange", - reference_value=70, - show_rug=True, + sample.add_samples( + np.array([[1, 0.9, 10], [3, 0.85, 20]]), + [ + pd.DataFrame(), + pd.DataFrame( + {"res": [1, 2]}, index=pd.date_range("2009", freq="h", periods=2) + ), + ], ) - assert isinstance(fig, go.Figure) - assert fig.layout.title.text == "Sample distribution of mean res" + assert sample.get_pending_index().tolist() == [True, False] + assert sample.values.tolist() == [[1.0, 0.9, 10.0], [3.0, 0.85, 20.0]] + assert sample.get_parameters_intervals().tolist() == [ + [0.0, 10.0], + [0.8, 1.2], + [0.0, 100.0], + ] + assert sample.get_list_parameter_value_pairs(sample.get_pending_index()) == [ + [(REAL_PARAM[0], 1.0), (REAL_PARAM[1], 0.9), (REAL_PARAM[2], 10.0)], + ] - hist_traces = [tr for tr in fig.data if tr.type == "histogram"] - assert len(hist_traces) == 1 - hist = hist_traces[0] - assert len(hist.x) == len(sampler.results) + assert len(sample) == 2 - def test_plot_pcp(self): - t = pd.date_range("2025-01-01 00:00:00", periods=2, freq="h") - df1 = pd.DataFrame({"res": [1.0, 2.0]}, index=t) - df2 = pd.DataFrame({"res": [3.0, 4.0]}, index=t) - df3 = pd.DataFrame({"res": [5.0, 6.0]}, index=t) - results = pd.Series([df1, df2, df3]) + item = sample[1] + assert isinstance(item, dict) + assert np.allclose(item["values"], [3.0, 0.85, 20.0]) + pd.testing.assert_frame_equal(item["results"], sample.results.iloc[1]) - param_names = ["p1", "p2"] - param_values = np.array( - [ - [1.1, 2.2], - [3.3, 4.4], - [5.5, 6.6], - ] - ) + new_result = pd.DataFrame({"res": [42]}, index=pd.date_range("2009", periods=1)) + sample[1] = {"results": new_result} + pd.testing.assert_frame_equal(sample.results.iloc[1], new_result) - agg_sum = aggregate_time_series( - results, indicator="res", method="sum", prefix="sum" - ) - agg_mean = aggregate_time_series( - results, indicator="res", method="mean", prefix="mean" + sample[0] = { + "values": np.array([9.9, 1.1, 88]), + "results": pd.DataFrame({"res": [123]}, index=[pd.Timestamp("2009-01-01")]), + } + np.testing.assert_allclose(sample.values[0], [9.9, 1.1, 88]) + assert not sample.results.iloc[0].empty + + dimless_val = sample.get_dimension_less_values() + np.testing.assert_allclose( + dimless_val, np.array([[0.99, 0.75, 0.88], [0.3, 0.125, 0.2]]) ) - aggregated = pd.concat([agg_sum, agg_mean], axis=1) - fig = plot_pcp( - parameter_values=param_values, - parameter_names=param_names, - aggregated_results=aggregated, - color_by="sum_res", - title="Parallel Coordinates — Samples", + pd.testing.assert_frame_equal( + sample.get_aggregated_time_series("res"), + pd.DataFrame([123.0, 42.0], [0, 1], columns=["aggregated_res"]), ) - assert isinstance(fig, go.Figure) - assert len(fig.data) == 1 - pc = fig.data[0] - np.testing.assert_allclose(pc.dimensions[0]["values"], [1.1, 3.3, 5.5]) # p1 + fig = sample.plot_hist("res") + assert fig.layout.title["text"] == "Sample distribution of mean res" + assert fig.layout.xaxis.title["text"] == "mean res " - def test_plot_pcp_in_sampler(self): - sampler = LHSSampler( - parameters=REAL_PARAM, - model=Pymodel(), - simulation_options=SIMULATION_OPTIONS, - ) - sampler.add_sample(3, 42, simulate=True) + fig = sample.plot_sample("res") + assert fig - fig = sampler.plot_pcp( - indicator="res", - ) - assert isinstance(fig, go.Figure) - assert len(fig.data) == 1 + sample._validate() def test_plot_sample(self): t = pd.date_range("2025-01-01 00:00:00", periods=2, freq="h") df1 = pd.DataFrame({"res": [1.0, 2.0]}, index=t) df2 = pd.DataFrame({"res": [3.0, 4.0]}, index=t) df3 = pd.DataFrame({"res": [5.0, 6.0]}, index=t) - results = pd.Series([df1, df2, df3]) + ref = pd.Series([2.0, 2.0], index=t) - param_names = ["p1", "p2"] - param_values = np.array([[1.1, 2.2], [3.3, 4.4], [5.5, 6.6]]) + sample = Sample( + parameters=[ + Parameter("p1", interval=(0, 10)), + Parameter("p2", interval=(0, 10)), + ] + ) - fig = plot_sample( - results=results, + sample.add_samples( + np.array([[1.1, 2.2], [3.3, 4.4], [5.5, 6.6]]), [df1, df2, df3] + ) + + fig = sample.plot_sample( + indicator="res", reference_timeseries=ref, title="test", x_label="time", y_label="value", alpha=0.3, show_legends=True, - parameter_values=param_values, - parameter_names=param_names, type_graph="scatter", ) assert isinstance(fig, go.Figure) assert len(fig.data) == 4 + np.testing.assert_allclose(fig.data[0]["y"], df1["res"].to_numpy()) np.testing.assert_allclose(fig.data[-1]["y"], ref.to_numpy()) @@ -152,39 +144,24 @@ def test_plot_sample(self): assert fig.data[1].name == "p1: 3.3, p2: 4.4" assert fig.data[2].name == "p1: 5.5, p2: 6.6" - df_multi = pd.concat( - [df1.rename(columns={"res": "a"}), df2.rename(columns={"res": "b"})], axis=1 - ) - results_multi = pd.Series([df_multi]) - with pytest.raises(ValueError, match="Provide `indicator`: multiple columns"): - plot_sample(results=results_multi, type_graph="scatter") - fig_a = plot_sample(results=results_multi, indicator="a") - y_values = np.array(fig_a.data[0].y) - np.testing.assert_allclose(y_values, df_multi["a"].to_numpy()) - - # empty Series - with pytest.raises(ValueError): - plot_sample(results=pd.Series(dtype=object), type_graph="scatter") - # Partial simulation empty_df = pd.DataFrame({"res": []}) - results_partial = pd.Series([empty_df, df1, empty_df]) - fig_partial = plot_sample( - results=results_partial, indicator="res", type_graph="scatter" - ) + sample[:] = {"results": [empty_df, df1, empty_df]} + + fig_partial = sample.plot_sample(indicator="res", type_graph="scatter") # Only 1 non-empty sample assert len(fig_partial.data) == 1 np.testing.assert_allclose(fig_partial.data[0]["y"], df1["res"].to_numpy()) # All results empty and no reference - results_all_empty = pd.Series([empty_df, empty_df]) + sample[:] = {"results": [empty_df] * 3} + with pytest.raises(ValueError, match="No simulated data available to plot."): - plot_sample(results_all_empty, indicator="res", type_graph="scatter") + sample.plot_sample(indicator="res", type_graph="scatter") # All results empty but with reference - fig_ref_only = plot_sample( - results_all_empty, + fig_ref_only = sample.plot_sample( indicator="res", reference_timeseries=ref, type_graph="scatter", @@ -192,19 +169,20 @@ def test_plot_sample(self): assert len(fig_ref_only.data) == 1 np.testing.assert_allclose(fig_ref_only.data[0]["y"], ref.to_numpy()) - def test_plot_sample_area(self): t = pd.date_range("2025-01-01 00:00:00", periods=3, freq="h") df1 = pd.DataFrame({"res": [1.0, 2.0, 3.0]}, index=t) df2 = pd.DataFrame({"res": [2.0, 3.0, 4.0]}, index=t) df3 = pd.DataFrame({"res": [3.0, 4.0, 5.0]}, index=t) - results = pd.Series([df1, df2, df3]) ref = pd.Series([2.0, 2.5, 3.0], index=t) - fig = plot_sample( - results=results, + sample[:] = {"results": [df1, df2, df3]} + + fig = sample.plot_sample( + indicator="res", reference_timeseries=ref, type_graph="area", ) + assert isinstance(fig, go.Figure) assert len(fig.data) == 6 np.testing.assert_allclose(fig.data[-1]["y"], ref.to_numpy()) @@ -214,7 +192,18 @@ def test_plot_sample_area(self): assert "Median" in names assert "Quantiles" in names - def test_plot_sample_in_sampler(self): + fig = sample.plot_sample( + indicator="res", + reference_timeseries=ref, + show_legends=False, + type_graph="scatter", + ) + assert len(fig.data) == 4 + assert fig.data[0].mode == "markers" + np.testing.assert_allclose(np.array(fig.data[-1].y), ref.to_numpy()) + assert fig.data[-1].mode == "lines" + + def test_plot_hist(self): sampler = LHSSampler( parameters=REAL_PARAM, model=Pymodel(), @@ -222,102 +211,74 @@ def test_plot_sample_in_sampler(self): ) sampler.add_sample(3, 42, simulate=True) - fig = sampler.plot_sample( + fig = sampler.sample.plot_hist( indicator="res", - reference_timeseries=None, - title="test", - x_label="time", - y_label="value", - alpha=0.3, - show_legends=True, - type_graph="scatter", + method="mean", + unit="J", + bins=10, + colors="orange", + reference_value=70, + show_rug=True, ) - assert isinstance(fig, go.Figure) - assert len(fig.data) == 3 - assert fig.layout.title.text == "test" - - def test_plot_sample_infer_indicator_from_reference_name(self): - t = pd.date_range("2025-01-01 00:00:00", periods=3, freq="h") - df = pd.DataFrame({"a": [1.0, 2.0, 3.0], "b": [10.0, 20.0, 30.0]}, index=t) - results = pd.Series([df]) - - ref = pd.Series([0.0, 0.0, 0.0], index=t, name="a") # <- nom = "a" - fig = plot_sample( - results=results, - reference_timeseries=ref, - show_legends=False, - type_graph="scatter", - ) - assert len(fig.data) == 2 + assert isinstance(fig, go.Figure) + assert fig.layout.title.text == "Sample distribution of mean res" - np.testing.assert_allclose(np.array(fig.data[0].y), df["a"].to_numpy()) - assert fig.data[0].mode == "markers" - np.testing.assert_allclose(np.array(fig.data[1].y), ref.to_numpy()) - assert fig.data[1].mode == "lines" + hist_traces = [tr for tr in fig.data if tr.type == "histogram"] + assert len(hist_traces) == 1 + hist = hist_traces[0] + assert len(hist.x) == len(sampler.results) - def test_sample(self): - sample = Sample(REAL_PARAM) - assert sample.values.shape == (0, 3) - pd.testing.assert_series_equal(sample.results, pd.Series()) + def test_plot_pcp(self): + t = pd.date_range("2025-01-01 00:00:00", periods=2, freq="h") + df1 = pd.DataFrame({"res": [1.0, 2.0]}, index=t) + df2 = pd.DataFrame({"res": [3.0, 4.0]}, index=t) + df3 = pd.DataFrame({"res": [5.0, 6.0]}, index=t) + results = pd.Series([df1, df2, df3]) - sample.add_samples( - np.array([[1, 0.9, 10], [3, 0.85, 20]]), + param_names = ["p1", "p2"] + param_values = np.array( [ - pd.DataFrame(), - pd.DataFrame( - {"res": [1, 2]}, index=pd.date_range("2009", freq="h", periods=2) - ), - ], + [1.1, 2.2], + [3.3, 4.4], + [5.5, 6.6], + ] ) - assert sample.get_pending_index().tolist() == [True, False] - assert sample.values.tolist() == [[1.0, 0.9, 10.0], [3.0, 0.85, 20.0]] - assert sample.get_parameters_intervals().tolist() == [ - [0.0, 10.0], - [0.8, 1.2], - [0.0, 100.0], - ] - assert sample.get_list_parameter_value_pairs(sample.get_pending_index()) == [ - [(REAL_PARAM[0], 1.0), (REAL_PARAM[1], 0.9), (REAL_PARAM[2], 10.0)], - ] - - assert len(sample) == 2 - - item = sample[1] - assert isinstance(item, dict) - assert np.allclose(item["values"], [3.0, 0.85, 20.0]) - pd.testing.assert_frame_equal(item["results"], sample.results.iloc[1]) - - new_result = pd.DataFrame({"res": [42]}, index=pd.date_range("2009", periods=1)) - sample[1] = {"results": new_result} - pd.testing.assert_frame_equal(sample.results.iloc[1], new_result) - - sample[0] = { - "values": np.array([9.9, 1.1, 88]), - "results": pd.DataFrame({"res": [123]}, index=[pd.Timestamp("2009-01-01")]), - } - np.testing.assert_allclose(sample.values[0], [9.9, 1.1, 88]) - assert not sample.results.iloc[0].empty - - dimless_val = sample.get_dimension_less_values() - np.testing.assert_allclose( - dimless_val, np.array([[0.99, 0.75, 0.88], [0.3, 0.125, 0.2]]) + agg_sum = aggregate_time_series( + results, indicator="res", method="sum", prefix="sum" + ) + agg_mean = aggregate_time_series( + results, indicator="res", method="mean", prefix="mean" ) + aggregated = pd.concat([agg_sum, agg_mean], axis=1) - pd.testing.assert_frame_equal( - sample.get_aggregate_time_series("res"), - pd.DataFrame([123.0, 42.0], [0, 1], columns=["aggregated_res"]), + fig = plot_pcp( + parameter_values=param_values, + parameter_names=param_names, + aggregated_results=aggregated, + color_by="sum_res", + title="Parallel Coordinates — Samples", ) - fig = sample.plot_hist("res") - assert fig.layout.title["text"] == "Sample distribution of mean res" - assert fig.layout.xaxis.title["text"] == "mean res " + assert isinstance(fig, go.Figure) + assert len(fig.data) == 1 + pc = fig.data[0] + np.testing.assert_allclose(pc.dimensions[0]["values"], [1.1, 3.3, 5.5]) # p1 - fig = sample.plot("res") - assert fig + def test_plot_pcp_in_sampler(self): + sampler = LHSSampler( + parameters=REAL_PARAM, + model=Pymodel(), + simulation_options=SIMULATION_OPTIONS, + ) + sampler.add_sample(3, 42, simulate=True) - sample._validate() + fig = sampler.plot_pcp( + indicator="res", + ) + assert isinstance(fig, go.Figure) + assert len(fig.data) == 1 def test_lhs_sampler(self): sampler = LHSSampler( @@ -369,13 +330,41 @@ def test_lhs_sampler(self): sampler.simulate_at(slice(4, 7)) assert [df.empty for df in sampler.results[-3:].values] == [False, True, True] - sampler.add_sample(3, simulate=False) + sampler.add_sample(3, rng=42, simulate=False) sampler.simulate_at(slice(10, None)) assert [df.empty for df in sampler.results[-3:].values] == [True, False, False] - sampler.add_sample(3, simulate=True) + sampler.add_sample(3, rng=42, simulate=True) assert all(not df.empty for df in sampler.results[-3:]) + sampler.simulate_pending() + to_test = sampler.get_sample_aggregated_time_series("res") + pd.testing.assert_frame_equal( + to_test, + pd.DataFrame( + { + "aggregated_res": { + 0: 85.75934698790918, + 1: 38.08478803524709, + 2: 61.67268698504139, + 3: 85.75934698790918, + 4: 38.08478803524709, + 5: 61.67268698504139, + 6: 85.75934698790918, + 7: 38.08478803524709, + 8: 61.67268698504139, + 9: 85.75934698790918, + 10: 38.08478803524709, + 11: 61.67268698504139, + 12: 85.75934698790918, + 13: 38.08478803524709, + 14: 61.67268698504139, + } + } + ), + check_exact=False, + ) + def test_morris_sampler(self): sampler = MorrisSampler( parameters=ISHIGAMI_PARAMETERS, diff --git a/tests/test_sensitivity.py b/tests/test_sensitivity.py index e50bd4e..7f0cf91 100644 --- a/tests/test_sensitivity.py +++ b/tests/test_sensitivity.py @@ -67,7 +67,7 @@ def test_sanalysis_morris(self): simulation_options=SIMULATION_OPTIONS, ) morris_analysis.add_sample(N=1000, n_cpu=1, seed=42) - agg_res = morris_analysis.sampler.sample.get_aggregate_time_series("res") + agg_res = morris_analysis.sampler.sample.get_aggregated_time_series("res") pd.testing.assert_frame_equal( agg_res.loc[0:7],