Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ settings.json
.mypy_cache/
.pytest_cache/

# Claude Code
CLAUDE.md

# Tests artifacts
reports/
coverage.xml
Expand Down
3 changes: 2 additions & 1 deletion ambrosia/preprocessing/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from .ml_var_reducer import MLVarianceReducer
from .preprocessor import Preprocessor
from .robust import IQRPreprocessor, RobustPreprocessor
from .transformers import BoxCoxTransformer, LogTransformer
from .transformers import BoxCoxTransformer, LinearizationTransformer, LogTransformer

__all__ = [
"AggregatePreprocessor",
Expand All @@ -32,5 +32,6 @@
"RobustPreprocessor",
"IQRPreprocessor",
"BoxCoxTransformer",
"LinearizationTransformer",
"LogTransformer",
]
46 changes: 45 additions & 1 deletion ambrosia/preprocessing/preprocessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
from ambrosia.preprocessing.aggregate import AggregatePreprocessor
from ambrosia.preprocessing.cuped import Cuped, MultiCuped
from ambrosia.preprocessing.robust import IQRPreprocessor, RobustPreprocessor
from ambrosia.preprocessing.transformers import BoxCoxTransformer, LogTransformer
from ambrosia.preprocessing.transformers import BoxCoxTransformer, LinearizationTransformer, LogTransformer


class Preprocessor:
Expand Down Expand Up @@ -378,6 +378,50 @@ def multicuped(
self.transformers.append(transformer)
return self

def linearize(
    self,
    numerator: types.ColumnNameType,
    denominator: types.ColumnNameType,
    transformed_name: Optional[types.ColumnNameType] = None,
    load_path: Optional[Path] = None,
) -> Preprocessor:
    """
    Add a linearized version of a ratio metric to the dataframe.

    A ratio metric (numerator / denominator) is turned into a per-unit
    value that is approximately normally distributed, so a t-test can be
    applied correctly:

        linearized_i = numerator_i - ratio * denominator_i

    The coefficient ``ratio = mean(numerator) / mean(denominator)`` is
    estimated on the data held by this ``Preprocessor`` instance
    (reference / control data), unless pre-fitted parameters are loaded
    from ``load_path``.

    Parameters
    ----------
    numerator : ColumnNameType
        Column name of the ratio numerator (e.g. ``"revenue"``).
    denominator : ColumnNameType
        Column name of the ratio denominator (e.g. ``"orders"``).
    transformed_name : ColumnNameType, optional
        Name for the new linearized column. Defaults to
        ``"{numerator}_lin"``.
    load_path : Path, optional
        Path to a json file with pre-fitted parameters.

    Returns
    -------
    self : Preprocessor
        Instance object.
    """
    transformer = LinearizationTransformer()
    if load_path is not None:
        # Reuse parameters fitted earlier (e.g. on control data).
        transformer.load_params(load_path)
        transformer.transform(self.dataframe, inplace=True)
    else:
        # Fit the ratio on this instance's data and transform in place.
        transformer.fit_transform(self.dataframe, numerator, denominator, transformed_name, inplace=True)
    self.transformers.append(transformer)
    return self

def transformations(self) -> List:
"""
List of all transformations which were called.
Expand Down
133 changes: 132 additions & 1 deletion ambrosia/preprocessing/transformers.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
Module contains tools for metrics transformations during a
preprocessing task.
"""
from typing import Dict, Union
from typing import Dict, Optional, Union

import numpy as np
import pandas as pd
Expand Down Expand Up @@ -386,3 +386,134 @@ def inverse_transform(self, dataframe: pd.DataFrame, inplace: bool = False) -> U
transformed: pd.DataFrame = dataframe if inplace else dataframe.copy()
transformed[self.column_names] = np.exp(transformed[self.column_names].values)
return None if inplace else transformed


class LinearizationTransformer(AbstractFittableTransformer):
    """
    Transformer that linearizes ratio metrics.

    A ratio metric (numerator / denominator) is replaced by a per-unit
    value:

        linearized_i = numerator_i - ratio * denominator_i

    where ``ratio = mean(numerator) / mean(denominator)`` is estimated on
    the reference data passed to :meth:`fit` (control group / historical
    data). The per-unit metric is approximately normally distributed,
    which makes the standard t-test applicable to ratio metrics.

    Parameters
    ----------
    numerator : str
        Column name of the ratio numerator (e.g. "revenue").
    denominator : str
        Column name of the ratio denominator (e.g. "orders").
    transformed_name : str, optional
        Name for the new column. Defaults to ``"{numerator}_lin"``.

    Examples
    --------
    >>> transformer = LinearizationTransformer()
    >>> transformer.fit(control_df, "revenue", "orders", "arpu_lin")
    >>> transformer.transform(experiment_df, inplace=True)
    """

    def __str__(self) -> str:
        return "Linearization transformation"

    def __init__(self) -> None:
        # All parameters stay unset until fit() or load_params_dict() runs.
        self.numerator: Optional[str] = None
        self.denominator: Optional[str] = None
        self.transformed_name: Optional[str] = None
        self.ratio: Optional[float] = None
        super().__init__()

    def get_params_dict(self) -> Dict:
        # Exporting parameters only makes sense for a fitted transformer.
        self._check_fitted()
        return {
            "numerator": self.numerator,
            "denominator": self.denominator,
            "transformed_name": self.transformed_name,
            "ratio": self.ratio,
        }

    def load_params_dict(self, params: Dict) -> None:
        # Restore state attribute by attribute; a missing key aborts the load.
        for attribute in ("numerator", "denominator", "transformed_name", "ratio"):
            if attribute not in params:
                raise TypeError(f"params argument must contain: {attribute}")
            setattr(self, attribute, params[attribute])
        self.fitted = True

    def fit(
        self,
        dataframe: pd.DataFrame,
        numerator: str,
        denominator: str,
        transformed_name: Optional[str] = None,
    ):
        """
        Estimate ratio = mean(numerator) / mean(denominator) on reference data.

        Parameters
        ----------
        dataframe : pd.DataFrame
            Reference dataframe (typically control group or historical data).
        numerator : str
            Column name of the ratio numerator.
        denominator : str
            Column name of the ratio denominator.
        transformed_name : str, optional
            Name for the linearized column. Defaults to ``"{numerator}_lin"``.
        """
        self._check_cols(dataframe, [numerator, denominator])
        mean_of_denominator = dataframe[denominator].mean()
        if mean_of_denominator == 0:
            raise ValueError(f"Mean of denominator column '{denominator}' is zero; cannot compute ratio.")
        # Mutate state only after the input has been validated.
        self.numerator = numerator
        self.denominator = denominator
        if transformed_name is None:
            transformed_name = f"{numerator}_lin"
        self.transformed_name = transformed_name
        self.ratio = dataframe[numerator].mean() / mean_of_denominator
        self.fitted = True
        return self

    def transform(self, dataframe: pd.DataFrame, inplace: bool = False) -> Union[pd.DataFrame, None]:
        """
        Apply linearization: transformed = numerator - ratio * denominator.

        Parameters
        ----------
        dataframe : pd.DataFrame
            Dataframe to transform.
        inplace : bool, default: ``False``
            If ``True`` modifies dataframe in place, otherwise returns a copy.
        """
        self._check_fitted()
        self._check_cols(dataframe, [self.numerator, self.denominator])
        result = dataframe if inplace else dataframe.copy()
        result[self.transformed_name] = result[self.numerator] - self.ratio * result[self.denominator]
        return None if inplace else result

    def fit_transform(
        self,
        dataframe: pd.DataFrame,
        numerator: str,
        denominator: str,
        transformed_name: Optional[str] = None,
        inplace: bool = False,
    ) -> Union[pd.DataFrame, None]:
        """
        Fit and transform in one step.

        Parameters
        ----------
        dataframe : pd.DataFrame
            Reference dataframe for fitting and transformation.
        numerator : str
            Column name of the ratio numerator.
        denominator : str
            Column name of the ratio denominator.
        transformed_name : str, optional
            Name for the linearized column.
        inplace : bool, default: ``False``
            If ``True`` modifies dataframe in place.
        """
        self.fit(dataframe, numerator, denominator, transformed_name)
        return self.transform(dataframe, inplace)
19 changes: 16 additions & 3 deletions ambrosia/tester/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,23 @@ class SparkCriteria(enum.Enum):

class TheoreticalTesterHandler:
def __init__(
    self,
    group_a,
    group_b,
    column: str,
    alpha: np.ndarray,
    effect_type: str,
    criterion: StatCriterion,
    metric_func=None,
    **kwargs,
):
    """Store the two groups and the test configuration for later solving.

    ``metric_func``, when given, is a callable that derives the metric
    values from a whole group dataframe instead of a plain column lookup.
    Remaining keyword arguments are kept verbatim for the criterion call.
    """
    # Raw group data (pandas or Spark) — unpacked later by _set_kwargs.
    self.group_a = group_a
    self.group_b = group_b
    # Test configuration.
    self.column = column
    self.alpha = alpha
    self.effect_type = effect_type
    self.criterion = criterion
    self.metric_func = metric_func
    self.kwargs = kwargs

def _correct_criterion(self, criterion: tp.Any) -> bool:
Expand All @@ -79,8 +88,12 @@ def get_criterion(self, criterion: str, data_example: types.SparkOrPandas):

def _set_kwargs(self):
if isinstance(self.group_a, pd.DataFrame):
self.group_a = self.group_a[self.column].values
self.group_b = self.group_b[self.column].values
if self.metric_func is not None:
self.group_a = np.asarray(self.metric_func(self.group_a))
self.group_b = np.asarray(self.metric_func(self.group_b))
else:
self.group_a = self.group_a[self.column].values
self.group_b = self.group_b[self.column].values
elif isinstance(self.group_a, types.SparkDataFrame):
self.kwargs["column"] = self.column
self.kwargs["alpha"] = self.alpha
Expand Down
35 changes: 32 additions & 3 deletions ambrosia/tester/tester.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
"""
import itertools
from copy import deepcopy
from typing import Dict, List, Optional, Union
from typing import Callable, Dict, List, Optional, Union
from warnings import warn

import numpy as np
Expand Down Expand Up @@ -88,6 +88,12 @@ class Tester(ABToolAbstract):
metrics : MetricNameType, optional
Metrics (columns of dataframe) which is used to calculate
experiment result.
metric_funcs : Dict[str, Callable], optional
Dictionary mapping metric names to callable functions.
Each function receives a ``pd.DataFrame`` (group data) and must
return an array-like of numeric values. When provided, the
function is used instead of column lookup for the corresponding
metric name. Only supported for pandas DataFrames.

Attributes
----------
Expand Down Expand Up @@ -241,6 +247,7 @@ def __init__(
id_column: Optional[types.ColumnNameType] = None,
first_type_errors: types.StatErrorType = 0.05,
metrics: Optional[types.MetricNamesType] = None,
metric_funcs: Optional[Dict[str, Callable]] = None,
):
"""
Tester class constructor to initialize the object.
Expand All @@ -257,6 +264,7 @@ def __init__(
self.set_experiment_results(experiment_results=experiment_results)
self.set_errors(first_type_errors)
self.set_metrics(metrics)
self.__metric_funcs = metric_funcs or {}

@staticmethod
def __filter_data(
Expand Down Expand Up @@ -372,9 +380,15 @@ def __pre_run(method: str, args: types._UsageArgumentsType, **kwargs) -> types.T
if method not in accepted_methods:
raise ValueError(f'Choose method from {", ".join(accepted_methods)}')
result: types.TesterResult = {}
metric_funcs: Dict = args.get("metric_funcs", {})
for metric in args["metrics"]:
a_values: np.ndarray = args["data_a_group"][metric].values
b_values: np.ndarray = args["data_b_group"][metric].values
metric_func = metric_funcs.get(metric)
if metric_func is not None:
a_values: np.ndarray = np.asarray(metric_func(args["data_a_group"]))
b_values: np.ndarray = np.asarray(metric_func(args["data_b_group"]))
else:
a_values = args["data_a_group"][metric].values
b_values = args["data_b_group"][metric].values
if method == "theory":
# TODO: Make it SolverClass ~ method
# solver = SolverClass(...)
Expand All @@ -386,6 +400,7 @@ def __pre_run(method: str, args: types._UsageArgumentsType, **kwargs) -> types.T
alpha=np.array(args["alpha"]),
effect_type=args["effect_type"],
criterion=args["criterion"],
metric_func=metric_func,
**kwargs,
)
sub_result = solver.solve()
Expand Down Expand Up @@ -473,6 +488,7 @@ def run(
criterion: Optional[ABStatCriterion] = None,
correction_method: Union[str, None] = "bonferroni",
as_table: bool = True,
metric_funcs: Optional[Dict[str, Callable]] = None,
**kwargs,
) -> types.TesterResult:
"""
Expand Down Expand Up @@ -515,6 +531,11 @@ def run(
as_table : bool, default: ``True``
Return the test results as a pandas dataframe.
If ``False``, a list of dicts with results will be returned.
metric_funcs : Dict[str, Callable], optional
Dictionary mapping metric names to callable functions.
Each function receives a group ``pd.DataFrame`` and returns
array-like values. Overrides functions set in constructor
for matching metric names. Only pandas DataFrames supported.
**kwargs : Dict
Other keyword arguments.

Expand Down Expand Up @@ -556,6 +577,8 @@ def run(
chosen_args: types._UsageArgumentsType = Tester._prepare_arguments(arguments_choice)
chosen_args["effect_type"] = effect_type
chosen_args["criterion"] = criterion
effective_metric_funcs = {**self.__metric_funcs, **(metric_funcs or {})}
chosen_args["metric_funcs"] = effective_metric_funcs

hypothesis_num: int = len(list(itertools.combinations(chosen_args["experiment_results"], 2))) * len(
chosen_args["metrics"]
Expand Down Expand Up @@ -602,6 +625,7 @@ def test(
criterion: Optional[ABStatCriterion] = None,
correction_method: Union[str, None] = "bonferroni",
as_table: bool = True,
metric_funcs: Optional[Dict[str, Callable]] = None,
**kwargs,
) -> types.TesterResult:
"""
Expand Down Expand Up @@ -649,6 +673,10 @@ def test(
as_table : bool, default: ``True``
Return the test results as a pandas dataframe.
If ``False``, a list of dicts with results will be returned.
metric_funcs : Dict[str, Callable], optional
Dictionary mapping metric names to callable functions.
Each function receives a group ``pd.DataFrame`` and returns
array-like values. Only pandas DataFrames supported.
**kwargs : Dict
Other keyword arguments.

Expand All @@ -673,5 +701,6 @@ def test(
criterion=criterion,
correction_method=correction_method,
as_table=as_table,
metric_funcs=metric_funcs,
**kwargs,
)
2 changes: 1 addition & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading