Skip to content

Analyze Module

The xpyrment.analyze module contains submodules and components for analyze.

analyze

Experiment analysis, variance reduction, multiple testing corrections, and statistical inference.

This package houses the core statistical analysis engine of xpyrment. It coordinates the calculation of treatment effects, computes variance-reduced adjusted statistics, corrects for multiple simultaneous comparisons, and provides a modular statistical inference suite (frequentist, bayesian, sequential, and bootstrap).

Fluent API Integration

To preserve elegant, object-oriented fluent API chaining, this package dynamically registers the run_analysis method on the main Experiment orchestrator class when imported:

# Equivalent to:
result = setup(data, "variant").run_analysis()

MODULE DESCRIPTION
confounding

Multi-Factor Fractional ANOVA Confounding Resolvers for sparse experimental designs.

copula

Non-Gaussian Copula-Based Multi-Metric Inference (Block 24).

corrections

Multiple testing correction (MTC) statistical engines.

extreme

Extreme Value Theory (EVT) for Heavy-Tailed Conversions (Block 32).

inference

Statistical inference engines, frameworks, and decision-making systems.

its

Interrupted Time Series (ITS) with HAC Standard Errors (Block 35).

markov

Multi-State Markov Transition Journey Modeling (Block 30).

meta_regression

Meta-Regression with Knapp-Hartung and Newey-West HAC Standard Errors (Block 38).

orchestrator

Experiment analysis orchestrator, results compiler, and setup entrypoints.

outliers

Winsorization & Outlier Stabilization (Block 43).

ratio

Ratio Metric Delta Method Delta-Variance Estimation (Block 45).

registry

Metric Registry & Directed Acyclic Graph (DAG) Evaluator (Block 49).

sequential

Group Sequential Lan-DeMets Alpha Spending Functions (Block 36).

srm

Sample Ratio Mismatch (SRM) Detection & Sequential Guardrails (Block 41).

streaming

Low-latency streaming OLS/Ridge regression via recursive least squares (RLS) and Woodbury updates.

variance_reduction

Variance reduction algorithms, focusing on continuous and ratio-level CUPED.

CLASS DESCRIPTION
AnalysisResult

Holds results from an experiment analysis and provides summary formatting and plotting interfaces.

StreamingOLS

Streaming Ordinary Least Squares and Ridge Regression.

AliasResolver

Computes and resolves the alias structure in fractional factorial designs.

CopulaMultiMetricInference

Joint inference engine for correlated non-Gaussian metrics using empirical Gaussian Copulas.

MarkovJourneyAnalyzer

Analyzes chronological user state transitions in experimentation datasets.

ExtremeValueTailEstimator

Estimates tail indices and expected extreme lift using Generalized Pareto Distributions.

InterruptedTimeSeries

Models segmented regression over system-wide updates with Newey-West HAC standard errors.

GroupSequentialMonitor

Computes O'Brien-Fleming and Pocock sequential boundaries via Lan-DeMets spending functions.

MetaRegressor

Solves random-effects meta-regression models with Knapp-Hartung or Newey-West HAC standard errors.

SampleRatioMismatchDetector

Detects Sample Ratio Mismatch (SRM) using frequentist retrospective and online sequential tests.

WinsorizationEngine

Provides symmetric or asymmetric percentile-based capping boundaries.

RatioMetricDeltaMethod

Estimates ratio values, delta-variance bounds, and Wald treatment comparisons.

MetricRegistry

Manages a registry of metrics and evaluates them using topological sorting of a DAG.

FUNCTION DESCRIPTION
run_analysis

Executes the statistical analysis across all registered metrics in an Experiment container.

setup

Initializes the experimental setup container, serving as the library's primary entrypoint.

apply_cuped

Applies Controlled-experiments Using Pre-Experiment Data (CUPED) on a series.

apply_multiple_testing_correction

Applies multiple testing corrections on p-values using statsmodels.

AnalysisResult

AnalysisResult(
    raw_results: List[dict],
    alpha: float = 0.05,
    balance_checker: Optional[Any] = None,
)

Holds results from an experiment analysis and provides summary formatting and plotting interfaces.

This container aggregates the individual metric dictionaries calculated across control and treatment groups. It provides high-level APIs to compile clean summary tables and forward coordinates to the visualization engine.

ATTRIBUTE DESCRIPTION
raw_results

A list of metric calculation result dictionaries (keys: mean, lift, p_value, etc.).

TYPE: List[dict]

alpha

Nominal significance level (Type I error rate) used in the analysis. Defaults to 0.05.

TYPE: float

df_raw

The raw, unformatted results compiled into a pandas DataFrame.

TYPE: DataFrame

balance_checker

Fitted balance checker object if covariates were present.

TYPE: Optional[Any]

PARAMETER DESCRIPTION
raw_results

Raw list of metric results.

TYPE: List[dict]

alpha

Nominal significance level used.

TYPE: float DEFAULT: 0.05

balance_checker

Fitted CovariateBalanceChecker if covariates were specified.

TYPE: Optional[Any] DEFAULT: None

METHOD DESCRIPTION
love_plot

Returns the ASCII Love Plot visualization for baseline covariate balance.

to_dict

Converts the complete analysis results and metadata to a robust, serializable dictionary.

to_json

Converts the analysis results into a standardized, portable JSON string.

summary

Returns a summarized, human-readable DataFrame of the analysis.

plot

Generates and returns a forest plot of the relative metric lifts and confidence intervals.

Source code in src\xpyrment\analyze\orchestrator.py
def __init__(
    self,
    raw_results: List[dict],
    alpha: float = 0.05,
    balance_checker: Optional[Any] = None,
):
    """Initializes an AnalysisResult.

    Args:
        raw_results (List[dict]): Raw list of metric results.
        alpha (float): Nominal significance level used.
        balance_checker (Optional[Any]): Fitted CovariateBalanceChecker if covariates were specified.
    """
    self.raw_results = raw_results
    self.alpha = alpha
    self.df_raw = pd.DataFrame(raw_results)
    self.balance_checker = balance_checker

love_plot

love_plot() -> str

Returns the ASCII Love Plot visualization for baseline covariate balance.

RETURNS DESCRIPTION
str

An ASCII text-based representation or message.

TYPE: str

Source code in src\xpyrment\analyze\orchestrator.py
def love_plot(self) -> str:
    """Returns the ASCII Love Plot visualization for baseline covariate balance.

    Returns:
        str: An ASCII text-based representation or message.
    """
    if self.balance_checker is None:
        return "No covariate balance diagnostics were compiled (no covariates provided)."
    return self.balance_checker.generate_love_plot()

to_dict

to_dict() -> dict

Converts the complete analysis results and metadata to a robust, serializable dictionary.

Includes significance thresholds (alpha), raw metric outcomes, and covariate balance diagnostics if present.

RETURNS DESCRIPTION
dict

A nested dictionary with native Python types, guaranteed to be JSON serializable.

TYPE: dict

Source code in src\xpyrment\analyze\orchestrator.py
def to_dict(self) -> dict:
    """Converts the complete analysis results and metadata to a robust, serializable dictionary.

    Includes significance thresholds (alpha), raw metric outcomes, and covariate balance diagnostics if present.

    Returns:
        dict: A nested dictionary with native Python types, guaranteed to be JSON serializable.
    """
    from xpyrment.core.serialization import make_serializable

    state = {
        "alpha": self.alpha,
        "metrics": self.raw_results,
        "covariate_balance": (
            self.balance_checker.diagnostics_
            if (self.balance_checker is not None)
            else None
        ),
    }
    return make_serializable(state)

to_json

to_json(indent: Optional[int] = None) -> str

Converts the analysis results into a standardized, portable JSON string.

PARAMETER DESCRIPTION
indent

If provided, formats the JSON string with this indentation level.

TYPE: Optional[int] DEFAULT: None

RETURNS DESCRIPTION
str

Standardized JSON representation of the analysis results.

TYPE: str

Source code in src\xpyrment\analyze\orchestrator.py
def to_json(self, indent: Optional[int] = None) -> str:
    """Converts the analysis results into a standardized, portable JSON string.

    Args:
        indent (Optional[int]): If provided, formats the JSON string with this indentation level.

    Returns:
        str: Standardized JSON representation of the analysis results.
    """
    from xpyrment.core.serialization import serialize_to_json

    return serialize_to_json(self.to_dict(), indent=indent)

summary

summary(formatted: bool = True) -> DataFrame

Returns a summarized, human-readable DataFrame of the analysis.

Formats raw numeric statistics (standard errors, differences, variances) into readable percentage lifts, relative confidence intervals, power indicators, and significance star symbols.

Significance Star Mapping
  • *** : \(p < 0.001\) (Highly significant)
  • ** : \(p < 0.01\) (Significant)
  • * : \(p < 0.05\) (Significant)
  • No star : \(p \ge 0.05\) (Not statistically significant at the nominal level \(\alpha=0.05\))
PARAMETER DESCRIPTION
formatted

If True, returns nicely formatted strings for display (with percentage symbols, stars, and bracketed intervals). If False, returns the raw numeric values. Defaults to True.

TYPE: bool DEFAULT: True

RETURNS DESCRIPTION
DataFrame

pd.DataFrame: A pandas DataFrame containing binned summaries of each analyzed metric.

Source code in src\xpyrment\analyze\orchestrator.py
def summary(self, formatted: bool = True) -> pd.DataFrame:
    r"""Returns a summarized, human-readable DataFrame of the analysis.

    Formats raw numeric statistics (standard errors, differences, variances) into readable
    percentage lifts, relative confidence intervals, power indicators, and significance star symbols.

    Significance Star Mapping:
        - `***` : $p < 0.001$ (Highly significant)
        - `**`  : $p < 0.01$ (Significant)
        - `*`   : $p < 0.05$ (Significant)
        - No star : $p \ge 0.05$ (Not statistically significant at the nominal level $\alpha=0.05$)

    Args:
        formatted (bool): If True, returns nicely formatted strings for display (with percentage symbols,
            stars, and bracketed intervals). If False, returns the raw numeric values. Defaults to True.

    Returns:
        pd.DataFrame: A pandas DataFrame containing binned summaries of each analyzed metric.
    """
    df = self.df_raw.copy()

    if not formatted:
        return df

    summary_df = pd.DataFrame()
    summary_df["Metric"] = df["metric_name"]
    summary_df["Type"] = df["metric_type"]

    summary_df["Control Mean"] = df["control_mean"].map("{:.4f}".format)
    summary_df["Treatment Mean"] = df["treatment_mean"].map("{:.4f}".format)

    summary_df["Relative Lift"] = df["relative_lift"].map("{:+.2%}".format, na_action="ignore").fillna("N/A")

    ci_mask = df["rel_ci_lower"].isna() | df["rel_ci_upper"].isna()
    lower_str = df["rel_ci_lower"].map("{:+.2%}".format, na_action="ignore")
    upper_str = df["rel_ci_upper"].map("{:+.2%}".format, na_action="ignore")
    summary_df["95% CI (Rel)"] = np.where(ci_mask, "N/A", "[" + lower_str.astype(str) + ", " + upper_str.astype(str) + "]")

    p_val = df["p_value"]
    p_str = p_val.map("{:.4f}".format, na_action="ignore")
    sig_symbol = np.select([p_val < 0.001, p_val < 0.01, p_val < 0.05], ["***", "**", "*"], default="")
    summary_df["p-value"] = np.where(p_val.isna(), "N/A", p_str.astype(str) + sig_symbol)

    summary_df["Post-hoc Power"] = df["power"].map("{:.1%}".format, na_action="ignore").fillna("N/A")

    summary_df["CUPED"] = df["cuped_applied"].map({True: "Yes", False: "No"})

    # Previously, the logic was: `if not cuped: return "-"`.
    # In python, `bool(np.nan)` is True, while `bool(None)` or `bool(False)` is False.
    # Using `.astype(bool)` exactly replicates this prior behavior.
    cuped_applied = df["cuped_applied"].astype(bool)
    var_red_str = df["variance_reduction"].map("{:.1%}".format, na_action="ignore").fillna("-")
    summary_df["Var Reduction"] = np.where(cuped_applied, var_red_str, "-")

    # Automatically raise an alert / print warning if covariate imbalance is detected
    if self.balance_checker is not None and self.balance_checker.diagnostics_:
        imbalanced = []
        for name, stats in self.balance_checker.diagnostics_.items():
            if abs(stats["smd"]) > 0.1:
                imbalanced.append(f"'{name}' (SMD={stats['smd']:+.4f})")
        if imbalanced:
            warnings.warn(
                f"COVARIATE IMBALANCE DETECTED: The following baseline covariates have standardized mean "
                f"differences (SMD) exceeding the standard 0.1 threshold: {', '.join(imbalanced)}. "
                f"Consider running CUPED adjustments, matching, or checking your randomization procedure."
            )

    return summary_df

plot

plot(**kwargs: Any) -> Any

Generates and returns a forest plot of the relative metric lifts and confidence intervals.

Forwards coordinates to the visualization module.

PARAMETER DESCRIPTION
**kwargs

Plot customization arguments forwarded to plot_forest (e.g., figure size, colors).

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
Any

matplotlib.axes.Axes or plotly.graph_objects.Figure: The generated relative lift forest plot.

Source code in src\xpyrment\analyze\orchestrator.py
def plot(self, **kwargs: Any) -> Any:
    """Generates and returns a forest plot of the relative metric lifts and confidence intervals.

    Forwards coordinates to the visualization module.

    Args:
        **kwargs: Plot customization arguments forwarded to `plot_forest` (e.g., figure size, colors).

    Returns:
        matplotlib.axes.Axes or plotly.graph_objects.Figure: The generated relative lift forest plot.
    """
    # Re-routed to the reporting/export layer dynamically
    from xpyrment.report.export import plot_forest

    return plot_forest(self.df_raw, alpha=self.alpha, **kwargs)

StreamingOLS

StreamingOLS(
    n_features: int,
    l2_penalty: float = 1.0,
    fit_intercept: bool = True,
)

Streaming Ordinary Least Squares and Ridge Regression.

Maintains the running coefficients (beta) and the inverse of the covariance matrix using the Woodbury matrix identity (Sherman-Morrison formula). Updates take O(P^2) time complexity per sample rather than O(P^3) offline inversion time.

PARAMETER DESCRIPTION
n_features

Number of independent features (excluding bias).

TYPE: int

l2_penalty

Ridge L2 regularization multiplier (lambda). Defaults to 1.0.

TYPE: float DEFAULT: 1.0

fit_intercept

If True, automatically appends a bias term. Defaults to True.

TYPE: bool DEFAULT: True

METHOD DESCRIPTION
update

Performs a single-sample recursive least squares update.

update_batch

Updates the running model with a batch of observations.

predict

Predicts the target value using the active coefficient weights.

ATTRIBUTE DESCRIPTION
coefficients

Returns the current estimated regression coefficients.

TYPE: ndarray

Source code in src\xpyrment\analyze\streaming.py
def __init__(self, n_features: int, l2_penalty: float = 1.0, fit_intercept: bool = True) -> None:
    """Initializes the streaming OLS regression model.

    Args:
        n_features (int): Number of independent features (excluding bias).
        l2_penalty (float): Ridge L2 regularization multiplier (lambda). Defaults to 1.0.
        fit_intercept (bool): If True, automatically appends a bias term. Defaults to True.
    """
    self.n_features = n_features
    self.l2_penalty = l2_penalty
    self.fit_intercept = fit_intercept

    # Total model parameters (including bias if fit_intercept is True)
    self.p_dims = n_features + 1 if fit_intercept else n_features

    # Initialize the coefficient vector to zero
    self.beta = np.zeros(self.p_dims)

    # Initialize the inverse covariance matrix: P = (1 / l2_penalty) * I
    # Using a small positive regularization constant prevents singular matrix issues on init
    self.P = (1.0 / max(l2_penalty, 1e-9)) * np.eye(self.p_dims)
    if fit_intercept:
        # Set the intercept's diagonal element to a very large prior variance (1e9)
        # which corresponds to a prior precision (L2 penalty) of 1e-9 (effectively 0.0)
        self.P[0, 0] = 1e9
    self.n_samples = 0

coefficients property

coefficients: ndarray

Returns the current estimated regression coefficients.

update

update(x: ndarray, y: float) -> None

Performs a single-sample recursive least squares update.

Updates the running coefficient vector (beta) and covariance inverse (P) in O(P^2) time.

PARAMETER DESCRIPTION
x

1D array of features of shape (n_features,).

TYPE: ndarray

y

Numeric target value.

TYPE: float

Source code in src\xpyrment\analyze\streaming.py
def update(self, x: np.ndarray, y: float) -> None:
    """Performs a single-sample recursive least squares update.

    Updates the running coefficient vector (beta) and covariance inverse (P) in O(P^2) time.

    Args:
        x (np.ndarray): 1D array of features of shape (n_features,).
        y (float): Numeric target value.
    """
    x_vec = self._prepare_vector(x)
    self.n_samples += 1

    # Compute intermediate gain components: P * x
    Px = np.dot(self.P, x_vec)

    # Denominator scalar: 1 + x^T * P * x
    denom = 1.0 + np.dot(x_vec, Px)

    # Update gain vector g
    gain = Px / denom

    # Update the coefficients: beta_new = beta_old + gain * (y - x^T * beta_old)
    error = y - np.dot(x_vec, self.beta)
    self.beta += gain * error

    # Update covariance inverse via Sherman-Morrison: P = P - gain * (x^T * P)
    self.P -= np.outer(gain, Px)

update_batch

update_batch(X: ndarray, y: ndarray) -> None

Updates the running model with a batch of observations.

Loops through each row to perform recursive rank-1 updates.

PARAMETER DESCRIPTION
X

Feature matrix of shape (M, n_features).

TYPE: ndarray

y

Target outcomes of shape (M,).

TYPE: ndarray

Source code in src\xpyrment\analyze\streaming.py
def update_batch(self, X: np.ndarray, y: np.ndarray) -> None:
    """Updates the running model with a batch of observations.

    Loops through each row to perform recursive rank-1 updates.

    Args:
        X (np.ndarray): Feature matrix of shape (M, n_features).
        y (np.ndarray): Target outcomes of shape (M,).
    """
    X = np.atleast_2d(X)
    y = y.ravel()
    if X.shape[0] != y.shape[0]:
        raise ValueError("Input features and targets must have matching sample dimensions.")

    for i in range(X.shape[0]):
        self.update(X[i], float(y[i]))

predict

predict(X: ndarray) -> ndarray

Predicts the target value using the active coefficient weights.

PARAMETER DESCRIPTION
X

Feature matrix of shape (M, n_features).

TYPE: ndarray

RETURNS DESCRIPTION
ndarray

np.ndarray: Vector of predictions of shape (M,).

Source code in src\xpyrment\analyze\streaming.py
def predict(self, X: np.ndarray) -> np.ndarray:
    """Predicts the target value using the active coefficient weights.

    Args:
        X (np.ndarray): Feature matrix of shape (M, n_features).

    Returns:
        np.ndarray: Vector of predictions of shape (M,).
    """
    X = np.atleast_2d(X)
    if self.fit_intercept:
        X_bias = np.hstack([np.ones((X.shape[0], 1)), X])
        return np.dot(X_bias, self.beta)
    return np.dot(X, self.beta)

AliasResolver

AliasResolver(
    primary_cols: List[str],
    potential_confounding_cols: List[str],
)

Computes and resolves the alias structure in fractional factorial designs.

TODO: Implement sequential D-optimal design updates that minimize the trace of the alias matrix projection error

covariance, Cov(beta_1_true) = sigma^2 (X_1^T X_1)^{-1} + A Cov(beta_2) A^T.

PARAMETER DESCRIPTION
primary_cols

Column names representing primary effects of interest (e.g., main effects).

TYPE: List[str]

potential_confounding_cols

Column names representing potential higher-order confounding effects (e.g., 2-way or 3-way interactions).

TYPE: List[str]

METHOD DESCRIPTION
compute_alias_matrix

Computes the alias matrix: A = (X1^T X1 + lambda * I)^{-1} X1^T X2.

get_alias_report

Generates a structured report detailing which primary effects are aliased with which confounding effects.

resolve_coefficients

Resolves/decouples true primary coefficients given biased estimates and prior confounding parameters.

Source code in src\xpyrment\analyze\confounding.py
def __init__(self, primary_cols: List[str], potential_confounding_cols: List[str]) -> None:
    """Initializes the AliasResolver.

    Args:
        primary_cols (List[str]): Column names representing primary effects of interest (e.g., main effects).
        potential_confounding_cols (List[str]): Column names representing potential higher-order confounding
            effects (e.g., 2-way or 3-way interactions).
    """
    self.primary_cols = primary_cols
    self.potential_confounding_cols = potential_confounding_cols
    self.alias_matrix_ = None

compute_alias_matrix

compute_alias_matrix(
    df: DataFrame, l2_penalty: float = 1e-06
) -> ndarray

Computes the alias matrix: A = (X1^T X1 + lambda * I)^{-1} X1^T X2.

PARAMETER DESCRIPTION
df

Input design or experimental data containing all specified columns.

TYPE: DataFrame

l2_penalty

Regularization parameter for the inverse computation to ensure numerical stability. Defaults to 1e-6.

TYPE: float DEFAULT: 1e-06

RETURNS DESCRIPTION
ndarray

np.ndarray: The alias matrix A of shape (len(primary_cols) + 1, len(potential_confounding_cols)). The first row corresponds to the unpenalized intercept/bias term.

Source code in src\xpyrment\analyze\confounding.py
def compute_alias_matrix(self, df: pd.DataFrame, l2_penalty: float = 1e-6) -> np.ndarray:
    """Computes the alias matrix: A = (X1^T X1 + lambda * I)^{-1} X1^T X2.

    Args:
        df (pd.DataFrame): Input design or experimental data containing all specified columns.
        l2_penalty (float): Regularization parameter for the inverse computation to ensure numerical stability.
            Defaults to 1e-6.

    Returns:
        np.ndarray: The alias matrix A of shape (len(primary_cols) + 1, len(potential_confounding_cols)).
            The first row corresponds to the unpenalized intercept/bias term.
    """
    N = len(df)

    # Build X1 with bias term (intercept)
    X1 = np.hstack([np.ones((N, 1)), df[self.primary_cols].to_numpy()])
    X2 = df[self.potential_confounding_cols].to_numpy()

    # Regularized normal equations solver: (X1^T X1 + lambda * I) A = X1^T X2
    XTX = np.dot(X1.T, X1)
    XTX_reg = XTX + l2_penalty * np.eye(X1.shape[1])
    # Do not regularize the intercept term
    XTX_reg[0, 0] = XTX[0, 0]

    X1TX2 = np.dot(X1.T, X2)
    self.alias_matrix_ = np.linalg.solve(XTX_reg, X1TX2)
    return self.alias_matrix_

get_alias_report

get_alias_report(
    df: DataFrame, threshold: float = 0.001
) -> Dict[str, List[Tuple[str, float]]]

Generates a structured report detailing which primary effects are aliased with which confounding effects.

PARAMETER DESCRIPTION
df

Input design or experimental data.

TYPE: DataFrame

threshold

Only reports alias coefficients whose absolute value exceeds this threshold. Defaults to 1e-3.

TYPE: float DEFAULT: 0.001

RETURNS DESCRIPTION
Dict[str, List[Tuple[str, float]]]

Dict[str, List[Tuple[str, float]]]: A dictionary mapping primary effect names (and 'intercept') to lists of (confounding_col_name, alias_coefficient) tuples.

Source code in src\xpyrment\analyze\confounding.py
def get_alias_report(self, df: pd.DataFrame, threshold: float = 1e-3) -> Dict[str, List[Tuple[str, float]]]:
    """Generates a structured report detailing which primary effects are aliased with which confounding effects.

    Args:
        df (pd.DataFrame): Input design or experimental data.
        threshold (float): Only reports alias coefficients whose absolute value exceeds this threshold.
            Defaults to 1e-3.

    Returns:
        Dict[str, List[Tuple[str, float]]]: A dictionary mapping primary effect names (and 'intercept')
            to lists of (confounding_col_name, alias_coefficient) tuples.
    """
    A = self.compute_alias_matrix(df)
    report = {}

    labels = ["intercept"] + self.primary_cols

    for i, primary_label in enumerate(labels):
        row_aliases = []
        for j, confounding_col in enumerate(self.potential_confounding_cols):
            coef = float(A[i, j])
            if abs(coef) > threshold:
                row_aliases.append((confounding_col, coef))
        if row_aliases:
            report[primary_label] = row_aliases

    return report

resolve_coefficients

resolve_coefficients(
    beta1_biased: ndarray, beta2_prior: ndarray
) -> ndarray

Resolves/decouples true primary coefficients given biased estimates and prior confounding parameters.

Uses the relationship: beta1_true = beta1_biased - A * beta2_prior

PARAMETER DESCRIPTION
beta1_biased

Biased primary coefficients (including intercept) of shape (len(primary_cols) + 1,).

TYPE: ndarray

beta2_prior

Prior known or estimated higher-order confounding coefficients of shape (len(potential_confounding_cols),).

TYPE: ndarray

RETURNS DESCRIPTION
ndarray

np.ndarray: Cleaned/debiased primary coefficients of shape (len(primary_cols) + 1,).

Source code in src\xpyrment\analyze\confounding.py
def resolve_coefficients(self, beta1_biased: np.ndarray, beta2_prior: np.ndarray) -> np.ndarray:
    """Resolves/decouples true primary coefficients given biased estimates and prior confounding parameters.

    Uses the relationship: beta1_true = beta1_biased - A * beta2_prior

    Args:
        beta1_biased (np.ndarray): Biased primary coefficients (including intercept) of shape (len(primary_cols) + 1,).
        beta2_prior (np.ndarray): Prior known or estimated higher-order confounding coefficients
            of shape (len(potential_confounding_cols),).

    Returns:
        np.ndarray: Cleaned/debiased primary coefficients of shape (len(primary_cols) + 1,).
    """
    if self.alias_matrix_ is None:
        raise ValueError("Alias matrix must be computed using compute_alias_matrix before resolving coefficients.")

    return beta1_biased - np.dot(self.alias_matrix_, beta2_prior)

CopulaMultiMetricInference

CopulaMultiMetricInference(l2_penalty: float = 1e-06)

Joint inference engine for correlated non-Gaussian metrics using empirical Gaussian Copulas.

Estimates univariate empirical cumulative distribution functions (CDFs) to project marginal observations onto uniform space [0, 1], maps to standard normal space, estimates the latent correlation structure, and performs a joint Wald test.

TODO: Support parametric copula families (such as Clayton or Gumbel) to capture asymmetric tail dependencies.

TODO: Add multivariate p-value corrections (e.g. step-down procedures) to control Family-Wise Error Rate (FWER) under copula.

PARAMETER DESCRIPTION
l2_penalty

Regularization for covariance matrix inversion. Defaults to 1e-6.

TYPE: float DEFAULT: 1e-06

METHOD DESCRIPTION
fit_copula

Fits empirical marginals and constructs the latent copula correlation matrix.

test_joint_shift

Performs a joint Wald test of treatment effects across all correlated metrics.

Source code in src\xpyrment\analyze\copula.py
def __init__(self, l2_penalty: float = 1e-6) -> None:
    """Initializes the copula inference engine.

    Args:
        l2_penalty (float): Regularization for covariance matrix inversion. Defaults to 1e-6.
    """
    self.l2_penalty = l2_penalty
    self.copula_correlation_: np.ndarray = None
    self.marginal_cdfs_: List[Callable[[np.ndarray], np.ndarray]] = []

fit_copula

fit_copula(
    df: DataFrame, metric_cols: List[str]
) -> CopulaMultiMetricInference

Fits empirical marginals and constructs the latent copula correlation matrix.

PARAMETER DESCRIPTION
df

DataFrame containing experimental metric columns.

TYPE: DataFrame

metric_cols

List of metric column names to include in the joint copula.

TYPE: List[str]

RETURNS DESCRIPTION
CopulaMultiMetricInference

Fitted engine.

TYPE: CopulaMultiMetricInference

Source code in src\xpyrment\analyze\copula.py
def fit_copula(self, df: pd.DataFrame, metric_cols: List[str]) -> "CopulaMultiMetricInference":
    """Fits empirical marginals and constructs the latent copula correlation matrix.

    Args:
        df (pd.DataFrame): DataFrame containing experimental metric columns.
        metric_cols (List[str]): List of metric column names to include in the joint copula.

    Returns:
        CopulaMultiMetricInference: Fitted engine.
    """
    D = len(metric_cols)
    N = len(df)

    Z = np.zeros((N, D))

    for j, col in enumerate(metric_cols):
        val = df[col].to_numpy().astype(float)
        # Map values to uniform empirical ranks
        u = self._compute_empirical_ranks(val)
        # Map uniform ranks to standard normal scores
        Z[:, j] = norm.ppf(u)

    # Compute latent Pearson correlation matrix of standard normal scores
    self.copula_correlation_ = np.corrcoef(Z, rowvar=False)
    if D == 1:
        self.copula_correlation_ = np.array([[1.0]])

    return self

test_joint_shift

test_joint_shift(
    df: DataFrame,
    treatment_col: str,
    metric_cols: List[str],
) -> Dict[str, Union[float, ndarray]]

Performs a joint Wald test of treatment effects across all correlated metrics.

H0: delta_1 = delta_2 = ... = delta_D = 0

PARAMETER DESCRIPTION
df

Experimental dataset.

TYPE: DataFrame

treatment_col

Column indicating binary treatment assignment (0 = control, 1 = treated).

TYPE: str

metric_cols

Correlated metric column names.

TYPE: List[str]

RETURNS DESCRIPTION
Dict

Dictionary containing joint Wald statistic, joint p-value, individual lifts, and covariance.

TYPE: Dict[str, Union[float, ndarray]]

Source code in src\xpyrment\analyze\copula.py
def test_joint_shift(
    self, df: pd.DataFrame, treatment_col: str, metric_cols: List[str]
) -> Dict[str, Union[float, np.ndarray]]:
    """Performs a joint Wald test of treatment effects across all correlated metrics.

    H0: delta_1 = delta_2 = ... = delta_D = 0

    Args:
        df (pd.DataFrame): Experimental dataset.
        treatment_col (str): Column indicating binary treatment assignment (0 = control, 1 = treated).
        metric_cols (List[str]): Correlated metric column names.

    Returns:
        Dict: Dictionary containing joint Wald statistic, joint p-value, individual lifts, and covariance.
    """
    D = len(metric_cols)
    if self.copula_correlation_ is None or self.copula_correlation_.shape[0] != D:
        self.fit_copula(df, metric_cols)

    # Split into control and treatment groups
    control_df = df[df[treatment_col] == 0]
    treated_df = df[df[treatment_col] == 1]

    N_c = len(control_df)
    N_t = len(treated_df)

    if N_c < 2 or N_t < 2:
        raise ValueError("Control and treated groups must contain at least 2 observations.")

    # 1. Compute individual treatment effects (mean differences) and standard errors
    delta = np.zeros(D)
    se = np.zeros(D)

    for j, col in enumerate(metric_cols):
        y_c = control_df[col].to_numpy().astype(float)
        y_t = treated_df[col].to_numpy().astype(float)

        delta[j] = np.mean(y_t) - np.mean(y_c)
        var_c = np.var(y_c, ddof=1)
        var_t = np.var(y_t, ddof=1)
        se[j] = np.sqrt(var_c / N_c + var_t / N_t)

    # 2. Reconstruct the joint covariance matrix of the treatment effect estimates
    # Cov(delta_j, delta_l) = R_jl * se_j * se_l
    Sigma_delta = np.zeros((D, D))
    for j in range(D):
        for l in range(D):
            Sigma_delta[j, l] = self.copula_correlation_[j, l] * se[j] * se[l]

    # Regularize to prevent singular matrix errors
    Sigma_delta_reg = Sigma_delta + self.l2_penalty * np.eye(D)

    # 3. Compute joint Wald statistic: W = delta^T * Sigma^-1 * delta
    try:
        inv_sigma = np.linalg.inv(Sigma_delta_reg)
        wald_stat = float(np.dot(delta, np.dot(inv_sigma, delta)))
    except np.linalg.LinAlgError:
        # Fallback using pseudoinverse if inversion fails
        inv_sigma = np.linalg.pinv(Sigma_delta_reg)
        wald_stat = float(np.dot(delta, np.dot(inv_sigma, delta)))

    # 4. Compute joint p-value under asymptotic Chi-squared distribution
    p_val = float(1.0 - chi2.cdf(wald_stat, df=D))

    return {
        "wald_statistic": wald_stat,
        "p_value": p_val,
        "deltas": delta,
        "standard_errors": se,
        "copula_correlation": self.copula_correlation_,
        "covariance_matrix": Sigma_delta,
    }

MarkovJourneyAnalyzer

MarkovJourneyAnalyzer(states: List[str])

Analyzes chronological user state transitions in experimentation datasets.

Extracts transition probability matrices, stationary distributions (long-term states), and executes Chi-squared transition homogeneity tests.

TODO: Add continuous-time Markov intensity matrix (Q) estimations to model exact duration stay times within states.

TODO: Implement bootstrap confidence interval approximations for stationary distribution probability shifts.

PARAMETER DESCRIPTION
states

Unique ordered labels of possible states in the funnel.

TYPE: List[str]

METHOD DESCRIPTION
extract_transitions

Helper to extract chronological pairwise transitions from a DataFrame grouped by user.

compute_transition_matrix

Computes transition count and probability matrices.

compute_stationary_distribution

Computes the stationary distribution pi of the Markov transition matrix via power iteration.

test_transition_homogeneity

Runs a Chi-squared homogeneity test of transition matrices between Control and Treatment.

Source code in src\xpyrment\analyze\markov.py
def __init__(self, states: List[str]) -> None:
    """Initializes the Markov journey analyzer.

    Args:
        states (List[str]): Unique ordered labels of possible states in the funnel.
    """
    self.states = sorted(list(set(states)))
    self.state_to_idx = {state: idx for idx, state in enumerate(self.states)}
    self.S = len(self.states)

extract_transitions

extract_transitions(
    df: DataFrame, user_col: str, state_col: str
) -> List[Tuple[str, str]]

Helper to extract chronological pairwise transitions from a DataFrame grouped by user.

PARAMETER DESCRIPTION
df

DataFrame containing chronological sequence of states per user.

TYPE: DataFrame

user_col

Column indicating user ID.

TYPE: str

state_col

Column indicating user state.

TYPE: str

RETURNS DESCRIPTION
List[Tuple[str, str]]

List[Tuple[str, str]]: Chronological transitions.

Source code in src\xpyrment\analyze\markov.py
def extract_transitions(self, df: pd.DataFrame, user_col: str, state_col: str) -> List[Tuple[str, str]]:
    """Helper to extract chronological pairwise transitions from a DataFrame grouped by user.

    Args:
        df (pd.DataFrame): DataFrame containing chronological sequence of states per user.
        user_col (str): Column indicating user ID.
        state_col (str): Column indicating user state.

    Returns:
        List[Tuple[str, str]]: Chronological transitions.
    """
    # Factorize the user column to get an integer representation that preserves the original appearance order
    user_codes, _ = pd.factorize(df[user_col])

    # Stably sort by these integer codes to mimic groupby(sort=False) ordering exactly
    sort_idx = np.argsort(user_codes, kind="stable")

    users = df[user_col].to_numpy()[sort_idx]
    states = df[state_col].to_numpy()[sort_idx]

    # Mask where the user is the same in the next row
    mask = users[:-1] == users[1:]

    from_states = states[:-1][mask]
    to_states = states[1:][mask]

    # Filter to only valid transitions using vectorized isin
    valid_states = np.array(list(self.state_to_idx.keys()))
    valid_mask = np.isin(from_states, valid_states) & np.isin(to_states, valid_states)

    final_from = from_states[valid_mask]
    final_to = to_states[valid_mask]

    transitions = list(zip(final_from, final_to))

    return transitions

compute_transition_matrix

compute_transition_matrix(
    transitions: List[Tuple[str, str]],
) -> Tuple[ndarray, ndarray]

Computes transition count and probability matrices.

PARAMETER DESCRIPTION
transitions

List of state-to-state transitions.

TYPE: List[Tuple[str, str]]

RETURNS DESCRIPTION
Tuple[ndarray, ndarray]

Tuple[np.ndarray, np.ndarray]: Transition count matrix (S, S) and probability matrix (S, S).

Source code in src\xpyrment\analyze\markov.py
def compute_transition_matrix(self, transitions: List[Tuple[str, str]]) -> Tuple[np.ndarray, np.ndarray]:
    """Computes transition count and probability matrices.

    Args:
        transitions (List[Tuple[str, str]]): List of state-to-state transitions.

    Returns:
        Tuple[np.ndarray, np.ndarray]: Transition count matrix (S, S) and probability matrix (S, S).
    """
    counts = np.zeros((self.S, self.S))
    for s_from, s_to in transitions:
        i = self.state_to_idx[s_from]
        j = self.state_to_idx[s_to]
        counts[i, j] += 1.0

    probs = np.zeros((self.S, self.S))
    for i in range(self.S):
        row_sum = np.sum(counts[i])
        if row_sum > 0:
            probs[i] = counts[i] / row_sum
        else:
            # Absorbing state behavior (transition to itself with probability 1.0)
            probs[i, i] = 1.0

    return counts, probs

compute_stationary_distribution

compute_stationary_distribution(
    probs: ndarray, max_iters: int = 150, tol: float = 1e-07
) -> ndarray

Computes the stationary distribution pi of the Markov transition matrix via power iteration.

Solves pi * P = pi subject to sum(pi) = 1.0.

Source code in src\xpyrment\analyze\markov.py
def compute_stationary_distribution(self, probs: np.ndarray, max_iters: int = 150, tol: float = 1e-7) -> np.ndarray:
    """Computes the stationary distribution pi of the Markov transition matrix via power iteration.

    Solves pi * P = pi subject to sum(pi) = 1.0.
    """
    # Start with a uniform distribution
    pi = np.full(self.S, 1.0 / self.S)

    for _ in range(max_iters):
        pi_new = np.dot(pi, probs)
        diff = np.linalg.norm(pi_new - pi, 1)
        pi = pi_new
        if diff < tol:
            break

    return pi

test_transition_homogeneity

test_transition_homogeneity(
    control_transitions: List[Tuple[str, str]],
    treatment_transitions: List[Tuple[str, str]],
) -> Dict[str, Union[float, ndarray]]

Runs a Chi-squared homogeneity test of transition matrices between Control and Treatment.

H0: P^(C) = P^(T) (transition probabilities are identical across groups)

Source code in src\xpyrment\analyze\markov.py
def test_transition_homogeneity(
    self, control_transitions: List[Tuple[str, str]], treatment_transitions: List[Tuple[str, str]]
) -> Dict[str, Union[float, np.ndarray]]:
    """Runs a Chi-squared homogeneity test of transition matrices between Control and Treatment.

    H0: P^(C) = P^(T) (transition probabilities are identical across groups)
    """
    c_counts, c_probs = self.compute_transition_matrix(control_transitions)
    t_counts, t_probs = self.compute_transition_matrix(treatment_transitions)

    total_chi_stat = 0.0
    total_df = 0

    # Perform test row by row (state by state)
    for i in range(self.S):
        # For state i, look at the counts of transitions to all j states
        c_row = c_counts[i]
        t_row = t_counts[i]

        sum_c = np.sum(c_row)
        sum_t = np.sum(t_row)
        sum_total = sum_c + sum_t

        if sum_c > 0 and sum_t > 0:
            # Expected probabilities under homogeneous transitions
            pooled_row = (c_row + t_row) / sum_total

            # Only test over non-zero expected bins to avoid division by zero
            valid_mask = (pooled_row > 0.0)
            n_valid_bins = np.sum(valid_mask)

            if n_valid_bins > 1:
                exp_c = sum_c * pooled_row[valid_mask]
                exp_t = sum_t * pooled_row[valid_mask]

                obs_c = c_row[valid_mask]
                obs_t = t_row[valid_mask]

                chi_stat = np.sum(((obs_c - exp_c) ** 2) / exp_c) + np.sum(((obs_t - exp_t) ** 2) / exp_t)
                total_chi_stat += chi_stat
                # df = (rows - 1) * (cols - 1) = (2 - 1) * (n_valid_bins - 1)
                total_df += (n_valid_bins - 1)

    p_value = 1.0 - chi2.cdf(total_chi_stat, df=total_df) if total_df > 0 else 1.0

    return {
        "chi2_statistic": total_chi_stat,
        "degrees_of_freedom": float(total_df),
        "p_value": float(p_value),
        "control_transition_probabilities": c_probs,
        "treatment_transition_probabilities": t_probs,
    }

ExtremeValueTailEstimator

ExtremeValueTailEstimator(percentile: float = 0.95)

Estimates tail indices and expected extreme lift using Generalized Pareto Distributions.

TODO: Add a profile likelihood fallback optimizer to support shape parameter estimation when xi is outside the MOM boundary [0, 0.5].

TODO: Implement automated threshold choice heuristics (e.g., using Hill plots or Gertensgarbe's sequential tests).

PARAMETER DESCRIPTION
percentile

Percentile threshold to define the tail cutoff. Defaults to 0.95.

TYPE: float DEFAULT: 0.95

METHOD DESCRIPTION
fit

Fits the GPD parameters on outcomes exceeding the pre-specified percentile.

expected_shortfall

Calculates Expected Shortfall (conditional tail expectation) for fitted GPD.

ATTRIBUTE DESCRIPTION
metrics

Returns GPD parameters and tail indices.

TYPE: Dict[str, float]

Source code in src\xpyrment\analyze\extreme.py
def __init__(self, percentile: float = 0.95) -> None:
    """Initializes the extreme value estimator.

    Args:
        percentile (float): Percentile threshold to define the tail cutoff. Defaults to 0.95.
    """
    self.percentile = percentile
    self.threshold_: float = 0.0
    self.scale_: float = 0.0  # sigma parameter
    self.shape_: float = 0.0  # xi tail index parameter

metrics property

metrics: Dict[str, float]

Returns GPD parameters and tail indices.

fit

fit(outcomes: ndarray) -> ExtremeValueTailEstimator

Fits the GPD parameters on outcomes exceeding the pre-specified percentile.

Uses the Method of Moments (MOM) for GPD estimation (valid for shape xi < 0.5): sigma_MOM = 1/2 * mean_exceed * (1 + mean_exceed^2 / var_exceed) xi_MOM = 1/2 * (1 - mean_exceed^2 / var_exceed)

Source code in src\xpyrment\analyze\extreme.py
def fit(self, outcomes: np.ndarray) -> "ExtremeValueTailEstimator":
    """Fits the GPD parameters on outcomes exceeding the pre-specified percentile.

    Uses the Method of Moments (MOM) for GPD estimation (valid for shape xi < 0.5):
        sigma_MOM = 1/2 * mean_exceed * (1 + mean_exceed^2 / var_exceed)
        xi_MOM = 1/2 * (1 - mean_exceed^2 / var_exceed)
    """
    y = outcomes.astype(float)
    self.threshold_ = float(np.percentile(y, self.percentile * 100))

    exceedances = y[y > self.threshold_] - self.threshold_
    if len(exceedances) < 5:
        # Fallback if too few tail samples
        self.scale_ = 1.0
        self.shape_ = 0.1
        return self

    mean_ex = np.mean(exceedances)
    var_ex = np.var(exceedances, ddof=1)
    if var_ex <= 1e-9:
        var_ex = 1e-9

    # Method of Moments formulas
    ratio = (mean_ex ** 2) / var_ex
    self.scale_ = float(0.5 * mean_ex * (1.0 + ratio))
    self.shape_ = float(0.5 * (1.0 - ratio))

    # Enforce physical constraints for heavy tails
    if self.scale_ <= 1e-5:
        self.scale_ = 1e-5
    self._clip_shape()

    return self

expected_shortfall

expected_shortfall() -> float

Calculates Expected Shortfall (conditional tail expectation) for fitted GPD.

ES_u = u + (sigma + xi * (VaR - u)) / (1 - xi) At the threshold itself (VaR = u), ES is simply: ES = u + sigma / (1 - xi)

Source code in src\xpyrment\analyze\extreme.py
def expected_shortfall(self) -> float:
    """Calculates Expected Shortfall (conditional tail expectation) for fitted GPD.

    ES_u = u + (sigma + xi * (VaR - u)) / (1 - xi)
    At the threshold itself (VaR = u), ES is simply:
        ES = u + sigma / (1 - xi)
    """
    if self.shape_ >= 1.0:
        return float("inf")  # Tail variance is infinite
    return self.threshold_ + self.scale_ / (1.0 - self.shape_)

InterruptedTimeSeries

InterruptedTimeSeries(treatment_index: int)

Models segmented regression over system-wide updates with Newey-West HAC standard errors.

TODO: Support autoregressive integrated moving average (ARIMA) error components integrated with the segmented OLS model.

TODO: Add dynamic lag order selection using AIC/BIC information criteria to optimize the Newey-West HAC spectral bandwidth.

PARAMETER DESCRIPTION
treatment_index

Step/index where the policy intervention was activated.

TYPE: int

METHOD DESCRIPTION
fit

Fits the interrupted time series model.

ATTRIBUTE DESCRIPTION
results

Returns coefficient summaries, standard errors, and p-values.

TYPE: Dict[str, Dict[str, float]]

Source code in src\xpyrment\analyze\its.py
def __init__(self, treatment_index: int) -> None:
    """Initializes the ITS model.

    Args:
        treatment_index (int): Step/index where the policy intervention was activated.
    """
    self.treatment_index = treatment_index
    self.beta_: np.ndarray = np.array([])
    self.standard_errors_: np.ndarray = np.array([])
    self.t_statistics_: np.ndarray = np.array([])
    self.p_values_: np.ndarray = np.array([])

results property

results: Dict[str, Dict[str, float]]

Returns coefficient summaries, standard errors, and p-values.

fit

fit(outcomes: ndarray) -> InterruptedTimeSeries

Fits the interrupted time series model.

PARAMETER DESCRIPTION
outcomes

Chronological series of outcomes.

TYPE: ndarray

Source code in src\xpyrment\analyze\its.py
def fit(self, outcomes: np.ndarray) -> "InterruptedTimeSeries":
    """Fits the interrupted time series model.

    Args:
        outcomes (np.ndarray): Chronological series of outcomes.
    """
    T = len(outcomes)
    if T <= 4:
        raise ValueError("Interrupted time series requires more than 4 data points.")

    # Construct segmented regressors
    t_arr = np.arange(T, dtype=float)
    D_arr = (t_arr >= self.treatment_index).astype(float)
    P_arr = np.maximum(0.0, t_arr - self.treatment_index)

    # Design matrix X: [intercept, time, level_shift, slope_shift]
    X = np.vstack([np.ones(T), t_arr, D_arr, P_arr]).T
    Y = outcomes.astype(float)

    # Solve OLS: beta = (X^T X)^-1 X^T Y
    XTX = np.dot(X.T, X)
    try:
        self.beta_ = np.linalg.solve(XTX, np.dot(X.T, Y))
    except np.linalg.LinAlgError:
        self.beta_ = np.linalg.pinv(XTX).dot(np.dot(X.T, Y))

    # Compute residuals
    residuals = Y - np.dot(X, self.beta_)

    # --- Compute Newey-West HAC Covariance Matrix ---
    # Automatically choose lag threshold L = floor(4 * (T / 100)^(2/9))
    L = int(np.floor(4.0 * (T / 100.0) ** (2.0 / 9.0)))
    if L >= T:
        L = T - 1
    if L < 1:
        L = 1

    # Gamma_0 (standard White heteroskedasticity matrix)
    Gamma_0 = np.zeros((4, 4))
    for t in range(T):
        x_t = X[t].reshape(-1, 1)
        Gamma_0 += (residuals[t] ** 2) * np.dot(x_t, x_t.T)

    # Adding weighted auto-covariance terms
    Gamma_lag_sum = np.zeros((4, 4))
    for l in range(1, L + 1):
        # Bartlett weight: 1 - l / (L + 1)
        w_l = 1.0 - (l / (L + 1.0))
        Gamma_l = np.zeros((4, 4))
        for t in range(l, T):
            x_t = X[t].reshape(-1, 1)
            x_t_lag = X[t - l].reshape(-1, 1)
            Gamma_l += residuals[t] * residuals[t - l] * np.dot(x_t, x_t_lag.T)
        Gamma_lag_sum += w_l * (Gamma_l + Gamma_l.T)

    # Omega_HAC pooled spectral matrix
    Omega_HAC = Gamma_0 + Gamma_lag_sum

    # Covariance matrix: (X^T X)^-1 * Omega_HAC * (X^T X)^-1
    XTX_inv = np.linalg.pinv(XTX)
    cov_matrix = np.dot(np.dot(XTX_inv, Omega_HAC), XTX_inv)

    # Standard errors & statistics
    df = T - 4
    from scipy.stats import t as t_dist

    self.standard_errors_ = np.sqrt(np.maximum(0.0, np.diag(cov_matrix)))
    self.t_statistics_ = np.zeros(4)
    self.p_values_ = np.zeros(4)

    for i in range(4):
        se = self.standard_errors_[i]
        if se > 0:
            t_stat = self.beta_[i] / se
            self.t_statistics_[i] = t_stat
            self.p_values_[i] = float(2 * (1 - t_dist.cdf(abs(t_stat), df))) if df > 0 else 1.0
        else:
            self.t_statistics_[i] = 0.0
            self.p_values_[i] = 1.0

    return self

GroupSequentialMonitor

GroupSequentialMonitor(
    alpha: float = 0.05,
    spending_type: str = "obrien_fleming",
)

Computes O'Brien-Fleming and Pocock sequential boundaries via Lan-DeMets spending functions.

TODO: Implement exact multivariate normal integration (e.g. using Genz-Bretz algorithms) to solve multi-look joint covariance critical bounds.

TODO: Support binding and non-binding futility boundaries using beta-spending formulations to support early stopping for futility.

PARAMETER DESCRIPTION
alpha

Total Type I error budget. Defaults to 0.05.

TYPE: float DEFAULT: 0.05

spending_type

'obrien_fleming' or 'pocock' type spending.

TYPE: str DEFAULT: 'obrien_fleming'

METHOD DESCRIPTION
alpha_spent

Computes cumulative alpha spent at information fraction t in [0, 1].

compute_boundaries

Computes sequential critical boundary values Z_crit and p-value boundaries for each look.

Source code in src\xpyrment\analyze\sequential.py
def __init__(self, alpha: float = 0.05, spending_type: str = "obrien_fleming") -> None:
    """Initializes the group sequential monitor.

    Args:
        alpha (float): Total Type I error budget. Defaults to 0.05.
        spending_type (str): 'obrien_fleming' or 'pocock' type spending.
    """
    self.alpha = alpha
    if spending_type not in ("obrien_fleming", "pocock"):
        raise ValueError("spending_type must be either 'obrien_fleming' or 'pocock'.")
    self.spending_type = spending_type

alpha_spent

alpha_spent(t: float) -> float

Computes cumulative alpha spent at information fraction t in [0, 1].

Source code in src\xpyrment\analyze\sequential.py
def alpha_spent(self, t: float) -> float:
    """Computes cumulative alpha spent at information fraction t in [0, 1]."""
    if t <= 0.0:
        return 0.0
    if t >= 1.0:
        return self.alpha

    if self.spending_type == "obrien_fleming":
        # Lan-DeMets (1983) O'Brien-Fleming approximation:
        # alpha(t) = 2 * (1 - Phi(Z_{alpha/2} / sqrt(t)))
        z = norm.ppf(1.0 - self.alpha / 2.0)
        return float(2.0 * (1.0 - norm.cdf(z / np.sqrt(t))))
    else:
        # Lan-DeMets (1983) Pocock approximation:
        # alpha(t) = alpha * ln(1 + (e - 1) * t)
        return float(self.alpha * np.log(1.0 + (np.e - 1.0) * t))

compute_boundaries

compute_boundaries(
    information_fractions: List[float],
) -> Dict[str, Union[List[float], ndarray]]

Computes sequential critical boundary values Z_crit and p-value boundaries for each look.

Using sequential spent alpha increments, the critical z-score at look k satisfies: P(Reject at look k | No previous rejections) = alpha(t_k) - alpha(t_{k-1})

Using a standard sequential spending boundary approximation

Z_crit(k) = Phi^-1(1 - (alpha(t_k) - alpha(t_{k-1})) / 2)

Source code in src\xpyrment\analyze\sequential.py
def compute_boundaries(self, information_fractions: List[float]) -> Dict[str, Union[List[float], np.ndarray]]:
    """Computes sequential critical boundary values Z_crit and p-value boundaries for each look.

    Using sequential spent alpha increments, the critical z-score at look k satisfies:
        P(Reject at look k | No previous rejections) = alpha(t_k) - alpha(t_{k-1})

    Using a standard sequential spending boundary approximation:
        Z_crit(k) = Phi^-1(1 - (alpha(t_k) - alpha(t_{k-1})) / 2)
    """
    t_list = sorted(information_fractions)
    K = len(t_list)

    cumulative_alpha = []
    incremental_alpha = []
    z_boundaries = []
    p_boundaries = []

    last_spent = 0.0
    for t in t_list:
        if not (0.0 < t <= 1.0):
            raise ValueError("Information fractions must be strictly in the range (0, 1].")

        spent = self.alpha_spent(t)
        inc = spent - last_spent

        # Enforce non-negativity and small floor values
        if inc < 1e-9:
            inc = 1e-9

        cumulative_alpha.append(spent)
        incremental_alpha.append(inc)

        # Two-sided critical boundary approximation
        z_crit = float(norm.ppf(1.0 - inc / 2.0))
        p_crit = float(2.0 * (1.0 - norm.cdf(z_crit)))

        z_boundaries.append(z_crit)
        p_boundaries.append(p_crit)

        last_spent = spent

    return {
        "information_fractions": t_list,
        "cumulative_alpha_spent": cumulative_alpha,
        "incremental_alpha_spent": incremental_alpha,
        "critical_z_boundaries": z_boundaries,
        "critical_p_boundaries": p_boundaries,
    }

MetaRegressor

MetaRegressor(l2_penalty: float = 1e-05)

Solves random-effects meta-regression models with Knapp-Hartung or Newey-West HAC standard errors.

Mathematical Specifications of Meta-Regression

Meta-regression aggregates study-level treatment effect estimates across various trials and cohorts.

Let \(y_j\) be the estimated treatment effect of study \(j \in \{1, \dots, J\}\), let \(v_j\) be the within-study variance, and let \(X_j\) be a \(P\)-dimensional vector of study-level covariates. The random-effects meta-regression model is: $$ y_j = X_j \beta + u_j + \epsilon_j $$ where \(u_j \sim N(0, \tau^2)\) is the between-study random effect, and \(\epsilon_j \sim N(0, v_j)\) is the within-study error.

  1. DerSimonian-Laird Heterogeneity Variance Estimation: The between-study variance parameter \(\tau^2\) is estimated using a closed-form method of moments: $$ \tau^2 = \max\left(0, \frac{Q - (J - P)}{\sum_{j=1}^J w_{FE, j} - \text{tr}((X^T W_{FE} X)^{-1} X^T W_{FE}^2 X)}\right) $$ where \(Q = \sum_{j=1}^J (y_j - X_j \hat{\beta}_{FE})^2 / v_j\) is Cochran's Q statistic, and \(W_{FE} = \text{diag}(1/v_j)\) represents fixed-effect weights.

  2. Coefficients WLS Solver: With the estimated \(\tau^2\), the final random-effects weights are \(w_j = 1 / (v_j + \tau^2)\). The coefficients are solved via: $$ \hat{\beta} = (X^T W X)^{-1} X^T W y $$ where \(W = \text{diag}(w_j)\).

  3. Knapp-Hartung (KH) Covariance Adjustment: Standard random-effects models can underestimate standard errors when \(J\) is small. KH adjusts the variance-covariance matrix: $$ \text{Cov}(\hat{\beta}){KH} = q \cdot (X^T W X)^{-1} $$ where \(q\) is a scaling multiplier based on weighted residual sum of squares: $$ q = \max\left(1.0, \frac{\sum{j=1}^J w_j (y_j - X_j \hat{\beta})^2}{J - P}\right) $$

  4. Newey-West HAC Covariance Correction: If the studies are ordered chronologically and exhibit serial correlation, Newey-West HAC standard errors can be computed. The scores (estimating function contributions) for each study \(j\) are: $$ g_j = w_j e_j X_j^T $$ where \(e_j = y_j - X_j \hat{\beta}\) is the study-level residual. The Heteroskedasticity and Autocorrelation Consistent (HAC) long-run covariance of these scores is estimated with the Bartlett kernel: $$ \Omega_{HAC} = \Gamma_0 + \sum_{l=1}^L \left(1 - \frac{l}{L+1}\right) (\Gamma_l + \Gamma_l^T) $$ where \(\Gamma_l = \sum_{j=l+1}^J g_j g_{j-l}^T\). The final sandwich covariance estimator is: $$ \text{Cov}(\hat{\beta}){HAC} = (X^T W X)^{-1} \Omega{HAC} (X^T W X)^{-1} $$

PARAMETER DESCRIPTION
l2_penalty

Ridge penalty for numerical stability. Defaults to 1e-5.

TYPE: float DEFAULT: 1e-05

METHOD DESCRIPTION
fit

Fits the random-effects meta-regression model.

ATTRIBUTE DESCRIPTION
results

Returns fitted parameters, tau^2, and coefficient stats.

TYPE: Dict[str, Union[float, ndarray, str]]

Source code in src\xpyrment\analyze\meta_regression.py
def __init__(self, l2_penalty: float = 1e-5) -> None:
    """Initializes the meta-regressor.

    Args:
        l2_penalty (float): Ridge penalty for numerical stability. Defaults to 1e-5.
    """
    self.l2_penalty = l2_penalty
    self.beta_: np.ndarray = np.array([])
    self.se_: np.ndarray = np.array([])
    self.p_values_: np.ndarray = np.array([])
    self.tau_sq_: float = 0.0  # Between-study variance parameter
    self.cov_type_: str = "classic"

results property

results: Dict[str, Union[float, ndarray, str]]

Returns fitted parameters, tau^2, and coefficient stats.

fit

fit(
    outcomes: ndarray,
    variances: ndarray,
    covariates: ndarray,
    cov_type: str = "classic",
    hac_lag: Optional[int] = None,
) -> MetaRegressor

Fits the random-effects meta-regression model.

PARAMETER DESCRIPTION
outcomes

Study-level estimated treatment effects of shape (J,).

TYPE: ndarray

variances

Study-level within-study variances (SE^2) of shape (J,).

TYPE: ndarray

covariates

Covariate matrix of shape (J, P).

TYPE: ndarray

cov_type

Covariance/standard error adjustment type ("classic" for Knapp-Hartung, "hac" for Newey-West). Defaults to "classic".

TYPE: str DEFAULT: 'classic'

hac_lag

Spectral lag bandwidth parameter (\(L\)). If None, computed automatically. Defaults to None.

TYPE: Optional[int] DEFAULT: None

Source code in src\xpyrment\analyze\meta_regression.py
def fit(
    self,
    outcomes: np.ndarray,
    variances: np.ndarray,
    covariates: np.ndarray,
    cov_type: str = "classic",
    hac_lag: Optional[int] = None,
) -> "MetaRegressor":
    """Fits the random-effects meta-regression model.

    Args:
        outcomes (np.ndarray): Study-level estimated treatment effects of shape (J,).
        variances (np.ndarray): Study-level within-study variances (SE^2) of shape (J,).
        covariates (np.ndarray): Covariate matrix of shape (J, P).
        cov_type (str): Covariance/standard error adjustment type (`"classic"` for Knapp-Hartung, `"hac"` for Newey-West). Defaults to `"classic"`.
        hac_lag (Optional[int]): Spectral lag bandwidth parameter ($L$). If None, computed automatically. Defaults to None.
    """
    J = len(outcomes)
    P = covariates.shape[1] if len(covariates.shape) > 1 else 1

    if J == 0:
        raise ValueError("Cannot fit meta-regression on empty dataset.")

    y = outcomes.astype(float)
    v = variances.astype(float)
    X = covariates.reshape(J, P).astype(float)

    if np.any(v <= 0.0):
        raise ValueError("Study within-study variances must be positive.")

    self.cov_type_ = cov_type

    # 1. Estimate initial fixed-effects coefficients to get residuals
    W_fe = np.diag(1.0 / v)
    XTWX_fe = np.dot(np.dot(X.T, W_fe), X) + self.l2_penalty * np.eye(P)
    beta_fe = np.linalg.solve(XTWX_fe, np.dot(np.dot(X.T, W_fe), y))

    # Heterogeneity statistic Q
    residuals_fe = y - np.dot(X, beta_fe)
    Q = np.sum((residuals_fe ** 2) / v)

    # 2. Compute between-study variance tau^2 using DerSimonian-Laird (DL) formula
    # Trace parameter: trace( W - W * X * (X^T * W * X)^-1 * X^T * W )
    inv_XTWX_fe = np.linalg.pinv(XTWX_fe)
    term1 = np.sum(1.0 / v)

    W_X = X / v.reshape(-1, 1)
    term2 = np.trace(np.dot(np.dot(W_X.T, W_X), inv_XTWX_fe))

    denom = term1 - term2
    if denom <= 0.0:
        denom = 1e-9

    self.tau_sq_ = max(0.0, (Q - (J - P)) / denom)

    # 3. Compute final random-effects coefficients using weights w_j = 1 / (v_j + tau^2)
    w_re = 1.0 / (v + self.tau_sq_)
    W_re = np.diag(w_re)

    XTWX_re = np.dot(np.dot(X.T, W_re), X) + self.l2_penalty * np.eye(P)
    self.beta_ = np.linalg.solve(XTWX_re, np.dot(np.dot(X.T, W_re), y))

    # 4. Compute standard error adjustments
    inv_XTWX_re = np.linalg.pinv(XTWX_re)
    residuals_re = y - np.dot(X, self.beta_)
    df = J - P

    if cov_type == "classic":
        # Knapp-Hartung (KH) scale factor q
        if df > 0:
            q = np.sum(w_re * (residuals_re ** 2)) / df
        else:
            q = 1.0

        # Enforce positive scale factor
        if q < 1.0:
            q = 1.0

        cov_matrix = q * inv_XTWX_re
    elif cov_type == "hac":
        # Newey-West HAC covariance calculations
        # Incorporate weights into scores g_j = w_j * e_j * X_j
        G = (w_re * residuals_re)[:, np.newaxis] * X  # shape (J, P)

        # Determine lag bandwidth L
        if hac_lag is not None:
            L = int(hac_lag)
        else:
            L = int(np.floor(4.0 * (J / 100.0) ** (2.0 / 9.0)))

        if L >= J:
            L = J - 1
        if L < 0:
            L = 0

        # Gamma_0 (standard White heteroskedasticity matrix)
        Gamma_0 = np.dot(G.T, G)

        # Bartlett kernel auto-covariance lagged sum
        Gamma_lag_sum = np.zeros((P, P))
        for l in range(1, L + 1):
            w_l = 1.0 - (l / (L + 1.0))
            Gamma_l = np.dot(G[l:].T, G[:-l])
            Gamma_lag_sum += w_l * (Gamma_l + Gamma_l.T)

        Omega_HAC = Gamma_0 + Gamma_lag_sum

        # Sandwich estimator: (X^T W X)^-1 * Omega_HAC * (X^T W X)^-1
        cov_matrix = np.dot(np.dot(inv_XTWX_re, Omega_HAC), inv_XTWX_re)
        # Small sample degrees-of-freedom correction
        df_corr = J / (J - P) if J > P else 1.0
        cov_matrix = cov_matrix * df_corr
    else:
        raise ValueError(f"Unknown cov_type: {cov_type}. Choose 'classic' or 'hac'.")

    self.se_ = np.sqrt(np.maximum(0.0, np.diag(cov_matrix)))

    # 5. Two-tailed p-values using Student's t-distribution with (J - P) df
    from scipy.stats import t as t_dist
    self.p_values_ = np.zeros(P)
    for i in range(P):
        se = self.se_[i]
        if se > 0 and df > 0:
            t_stat = self.beta_[i] / se
            self.p_values_[i] = float(2 * (1 - t_dist.cdf(abs(t_stat), df)))
        else:
            self.p_values_[i] = 1.0

    return self

SampleRatioMismatchDetector

SampleRatioMismatchDetector(
    target_treatment_ratio: float = 0.5,
)

Detects Sample Ratio Mismatch (SRM) using frequentist retrospective and online sequential tests.

TODO: Implement sequential binomial SPRT variance boundaries to handle dynamic sample-rate drift under multi-arm scenarios. TODO: Add Monte Carlo power estimation diagnostics for retrospective sample size mismatch sensitivity.

PARAMETER DESCRIPTION
target_treatment_ratio

Targeted assignment fraction for the treatment group. Defaults to 0.5.

TYPE: float DEFAULT: 0.5

METHOD DESCRIPTION
test_retrospective

Performs a retrospective Pearson Chi-Squared goodness-of-fit test.

test_sequential

Performs Wald's Sequential Probability Ratio Test (SPRT) over binomial allocations.

Source code in src\xpyrment\analyze\srm.py
def __init__(self, target_treatment_ratio: float = 0.5) -> None:
    """Initializes the SRM detector.

    Args:
        target_treatment_ratio (float): Targeted assignment fraction for the treatment group. Defaults to 0.5.
    """
    if not (0.0 < target_treatment_ratio < 1.0):
        raise ValueError("Target treatment ratio must be strictly in the range (0, 1).")
    self.target_trt = target_treatment_ratio
    self.target_ctrl = 1.0 - target_treatment_ratio

test_retrospective

test_retrospective(
    observed_counts: Tuple[int, int],
) -> Dict[str, Union[float, bool]]

Performs a retrospective Pearson Chi-Squared goodness-of-fit test.

PARAMETER DESCRIPTION
observed_counts

A tuple of (observed_control, observed_treatment).

TYPE: Tuple[int, int]

RETURNS DESCRIPTION
Dict[str, Union[float, bool]]

Dict[str, Union[float, bool]]: SRM test statistics, p-value, and flag indicating mismatch.

Source code in src\xpyrment\analyze\srm.py
def test_retrospective(self, observed_counts: Tuple[int, int]) -> Dict[str, Union[float, bool]]:
    """Performs a retrospective Pearson Chi-Squared goodness-of-fit test.

    Args:
        observed_counts (Tuple[int, int]): A tuple of (observed_control, observed_treatment).

    Returns:
        Dict[str, Union[float, bool]]: SRM test statistics, p-value, and flag indicating mismatch.
    """
    obs_ctrl, obs_trt = observed_counts
    if obs_ctrl < 0 or obs_trt < 0:
        raise ValueError("Observed counts must be non-negative integers.")

    N = obs_ctrl + obs_trt
    if N == 0:
        logger.warning("SRM check bypassed: total observed counts is 0.")
        return {
            "chi_squared_statistic": 0.0,
            "p_value": 1.0,
            "srm_detected": False,
        }

    # Expected counts
    exp_ctrl = N * self.target_ctrl
    exp_trt = N * self.target_trt

    if exp_ctrl < 5 or exp_trt < 5:
        logger.warning("SRM chi-square approximation may be invalid because expected counts are < 5.")

    # Pearson Chi-Squared formula
    chi_sq = ((obs_ctrl - exp_ctrl) ** 2) / exp_ctrl + ((obs_trt - exp_trt) ** 2) / exp_trt

    # 1 Degree of Freedom for a 2-class goodness-of-fit test
    p_val = float(1.0 - chi2.cdf(chi_sq, df=1))

    # SRM is conventionally flagged at highly conservative significance levels (e.g. alpha = 0.001)
    srm_detected = p_val < 0.001

    return {
        "chi_squared_statistic": float(chi_sq),
        "p_value": p_val,
        "srm_detected": bool(srm_detected),
    }

test_sequential

test_sequential(
    assignments: ndarray,
    delta: float = 0.02,
    alpha: float = 0.01,
) -> Dict[str, Union[ndarray, bool]]

Performs Wald's Sequential Probability Ratio Test (SPRT) over binomial allocations.

Determines if the running allocation ratio departs significantly from target ratio. Null Hypothesis (H_0): p = target_treatment_ratio Alternative Hypotheses (H_1): p = target_treatment_ratio + delta OR target_treatment_ratio - delta

RETURNS DESCRIPTION
Dict[str, Union[ndarray, bool]]

Dict[str, Union[np.ndarray, bool]]: Running likelihood ratios and stopping decisions.

Source code in src\xpyrment\analyze\srm.py
def test_sequential(self, assignments: np.ndarray, delta: float = 0.02, alpha: float = 0.01) -> Dict[str, Union[np.ndarray, bool]]:
    """Performs Wald's Sequential Probability Ratio Test (SPRT) over binomial allocations.

    Determines if the running allocation ratio departs significantly from target ratio.
    Null Hypothesis (H_0): p = target_treatment_ratio
    Alternative Hypotheses (H_1): p = target_treatment_ratio + delta OR target_treatment_ratio - delta

    Returns:
        Dict[str, Union[np.ndarray, bool]]: Running likelihood ratios and stopping decisions.
    """
    if delta <= 0.0 or self.target_trt + delta >= 1.0 or self.target_trt - delta <= 0.0:
        raise ValueError("Delta must satisfy 0 < target_treatment_ratio +/- delta < 1.")

    # Convert assignments to binary array where 1 = treatment, 0 = control
    arr = np.array(assignments, dtype=int)
    N = len(arr)
    if N == 0:
        return {
            "running_likelihood_ratios": np.array([]),
            "srm_detected": False,
            "stopped_index": -1,
        }

    # Define bounds for Wald's test
    # Accept H_0 if LR <= beta / (1 - alpha) [not stopping for SRM acceptance]
    # Reject H_0 if LR >= (1 - beta) / alpha ~ 1 / alpha (frequentist Type I bound)
    upper_threshold = 1.0 / alpha

    p_0 = self.target_trt
    p_alt_high = p_0 + delta
    p_alt_low = p_0 - delta

    # Precompute log-likelihood ratios for an assignment:
    # log_lr = x * ln(p_alt / p_0) + (1 - x) * ln((1 - p_alt) / (1 - p_0))
    log_ratio_high_1 = np.log(p_alt_high / p_0)
    log_ratio_high_0 = np.log((1.0 - p_alt_high) / (1.0 - p_0))

    log_ratio_low_1 = np.log(p_alt_low / p_0)
    log_ratio_low_0 = np.log((1.0 - p_alt_low) / (1.0 - p_0))

    running_log_lr_high = 0.0
    running_log_lr_low = 0.0

    lr_series = []
    srm_detected = False
    stopped_idx = -1

    for idx, val in enumerate(arr):
        if val == 1:
            running_log_lr_high += log_ratio_high_1
            running_log_lr_low += log_ratio_low_1
        else:
            running_log_lr_high += log_ratio_high_0
            running_log_lr_low += log_ratio_low_0

        # Two-sided mixture of likelihood ratios
        lr_mixed = 0.5 * np.exp(running_log_lr_high) + 0.5 * np.exp(running_log_lr_low)
        lr_series.append(lr_mixed)

        if lr_mixed >= upper_threshold and not srm_detected:
            srm_detected = True
            stopped_idx = idx

    return {
        "running_likelihood_ratios": np.array(lr_series),
        "srm_detected": srm_detected,
        "stopped_index": stopped_idx,
    }

WinsorizationEngine

WinsorizationEngine(
    bounds: Tuple[float, float] = (0.01, 0.99)
)

Provides symmetric or asymmetric percentile-based capping boundaries.

TODO: Support adaptive Hampel filters for time-series rolling outlier identification and correction. TODO: Implement Huber-loss robust M-estimation scaling as an alternative to hard Winsorized trimming.

PARAMETER DESCRIPTION
bounds

Percentile thresholds [lower, upper] in [0, 1]. Defaults to (0.01, 0.99).

TYPE: Tuple[float, float] DEFAULT: (0.01, 0.99)

METHOD DESCRIPTION
fit

Calculates value thresholds at the configured percentiles.

transform

Applies winsorization capping boundaries on the target data.

fit_transform

Fits thresholds and applies winsorization transformation.

Source code in src\xpyrment\analyze\outliers.py
def __init__(self, bounds: Tuple[float, float] = (0.01, 0.99)) -> None:
    """Initializes the Winsorization Engine.

    Args:
        bounds (Tuple[float, float]): Percentile thresholds [lower, upper] in [0, 1].
            Defaults to (0.01, 0.99).
    """
    lower, upper = bounds
    if not (0.0 <= lower < upper <= 1.0):
        raise ValueError("Winsorization bounds must satisfy 0.0 <= lower < upper <= 1.0.")
    self.lower = lower
    self.upper = upper
    self.lower_val_: float = 0.0
    self.upper_val_: float = 0.0

fit

fit(data: ndarray) -> WinsorizationEngine

Calculates value thresholds at the configured percentiles.

PARAMETER DESCRIPTION
data

Data array to extract percentiles from.

TYPE: ndarray

Source code in src\xpyrment\analyze\outliers.py
def fit(self, data: np.ndarray) -> "WinsorizationEngine":
    """Calculates value thresholds at the configured percentiles.

    Args:
        data (np.ndarray): Data array to extract percentiles from.
    """
    arr = np.asarray(data)
    if len(arr) == 0:
        self.lower_val_ = 0.0
        self.upper_val_ = 0.0
        return self

    self.lower_val_ = float(np.percentile(arr, self.lower * 100.0))
    self.upper_val_ = float(np.percentile(arr, self.upper * 100.0))
    return self

transform

transform(data: ndarray) -> ndarray

Applies winsorization capping boundaries on the target data.

PARAMETER DESCRIPTION
data

Target data to transform.

TYPE: ndarray

Source code in src\xpyrment\analyze\outliers.py
def transform(self, data: np.ndarray) -> np.ndarray:
    """Applies winsorization capping boundaries on the target data.

    Args:
        data (np.ndarray): Target data to transform.
    """
    arr = np.asarray(data).copy()
    if len(arr) == 0:
        return arr

    return np.clip(arr, self.lower_val_, self.upper_val_)

fit_transform

fit_transform(data: ndarray) -> ndarray

Fits thresholds and applies winsorization transformation.

Source code in src\xpyrment\analyze\outliers.py
def fit_transform(self, data: np.ndarray) -> np.ndarray:
    """Fits thresholds and applies winsorization transformation."""
    return self.fit(data).transform(data)

RatioMetricDeltaMethod

RatioMetricDeltaMethod()

Estimates ratio values, delta-variance bounds, and Wald treatment comparisons.

TODO: Integrate Fieller's Theorem confidence intervals to complement standard Delta Method Wald standard errors. TODO: Implement robust Huber-White sandwich estimator overrides for ratio cluster-correlated observations.

METHOD DESCRIPTION
compute_ratio_variance

Applies the Delta Method Taylor expansion to calculate variance of ratio of means:

fit

Fits the ratio delta method over control and treatment groups.

ATTRIBUTE DESCRIPTION
results

Returns point estimates, delta variances, standard errors, and Wald p-values.

TYPE: Dict[str, float]

Source code in src\xpyrment\analyze\ratio.py
def __init__(self) -> None:
    """Initializes the ratio delta-method estimator."""
    self.control_ratio_: float = 0.0
    self.treatment_ratio_: float = 0.0
    self.treatment_effect_: float = 0.0

    self.control_var_: float = 0.0
    self.treatment_var_: float = 0.0
    self.pooled_se_: float = 0.0

    self.z_statistic_: float = 0.0
    self.p_value_: float = 1.0

results property

results: Dict[str, float]

Returns point estimates, delta variances, standard errors, and Wald p-values.

compute_ratio_variance

compute_ratio_variance(
    num: ndarray, den: ndarray
) -> Tuple[float, float]

Applies the Delta Method Taylor expansion to calculate variance of ratio of means:

Var(Y_bar / X_bar) approx (1 / mu_x^2) * Var(Y_bar) + (mu_y^2 / mu_x^4) * Var(X_bar) - 2 * (mu_y / mu_x^3) * Cov(Y_bar, X_bar)

Source code in src\xpyrment\analyze\ratio.py
def compute_ratio_variance(self, num: np.ndarray, den: np.ndarray) -> Tuple[float, float]:
    """Applies the Delta Method Taylor expansion to calculate variance of ratio of means:

        Var(Y_bar / X_bar) approx (1 / mu_x^2) * Var(Y_bar) + (mu_y^2 / mu_x^4) * Var(X_bar)
                                 - 2 * (mu_y / mu_x^3) * Cov(Y_bar, X_bar)
    """
    N = len(num)
    if N < 2:
        return 0.0, 0.0

    y_mean = float(np.mean(num))
    x_mean = float(np.mean(den))

    if x_mean == 0.0:
        raise ZeroDivisionError("Mean of denominator (X) cannot be zero for ratio estimation.")

    ratio = y_mean / x_mean

    # Variance of sample means (sample variance divided by N)
    var_y_bar = float(np.var(num, ddof=1)) / N
    var_x_bar = float(np.var(den, ddof=1)) / N

    # Covariance of sample means: Cov(Y_bar, X_bar) = Cov(Y, X) / N
    cov_yx = float(np.cov(num, den)[0, 1])
    cov_yx_bar = cov_yx / N

    # Delta Method expansion terms
    term_num = var_y_bar / (x_mean ** 2)
    term_den = (var_x_bar * (y_mean ** 2)) / (x_mean ** 4)
    term_cov = 2.0 * y_mean * cov_yx_bar / (x_mean ** 3)

    ratio_variance = term_num + term_den - term_cov
    return ratio, float(ratio_variance)

fit

fit(
    num_ctrl: ndarray,
    den_ctrl: ndarray,
    num_trt: ndarray,
    den_trt: ndarray,
) -> RatioMetricDeltaMethod

Fits the ratio delta method over control and treatment groups.

Source code in src\xpyrment\analyze\ratio.py
def fit(self, num_ctrl: np.ndarray, den_ctrl: np.ndarray, num_trt: np.ndarray, den_trt: np.ndarray) -> "RatioMetricDeltaMethod":
    """Fits the ratio delta method over control and treatment groups."""
    if len(num_ctrl) != len(den_ctrl) or len(num_trt) != len(den_trt):
        raise ValueError("Numerator and denominator arrays must have identical length dimensions.")

    self.control_ratio_, self.control_var_ = self.compute_ratio_variance(num_ctrl, den_ctrl)
    self.treatment_ratio_, self.treatment_var_ = self.compute_ratio_variance(num_trt, den_trt)

    # Causal treatment shift: R_T - R_C
    self.treatment_effect_ = self.treatment_ratio_ - self.control_ratio_

    # Standard error of difference: sqrt( Var(R_C) + Var(R_T) )
    self.pooled_se_ = float(np.sqrt(self.control_var_ + self.treatment_var_))

    if self.pooled_se_ > 0.0:
        self.z_statistic_ = self.treatment_effect_ / self.pooled_se_
        self.p_value_ = float(2 * (1 - norm.cdf(abs(self.z_statistic_))))
    else:
        self.z_statistic_ = 0.0
        self.p_value_ = 1.0

    return self

MetricRegistry

MetricRegistry()

Manages a registry of metrics and evaluates them using topological sorting of a DAG.

TODO: Implement out-of-core streaming batch evaluations for high-velocity telemetry pipelines. TODO: Add symbolic differentiation support to automatically derive Delta Method variance paths for complex algebraic combinations of parents.

METHOD DESCRIPTION
add_raw

Registers a raw, primitive input metric name.

add_derived

Registers a derived metric calculated from dependency parents.

evaluate

Evaluates all registered metrics topologically, using caching.

Source code in src\xpyrment\analyze\registry.py
def __init__(self) -> None:
    """Initializes the metric registry."""
    self.nodes: Dict[str, Dict[str, Any]] = {}

add_raw

add_raw(name: str) -> MetricRegistry

Registers a raw, primitive input metric name.

Source code in src\xpyrment\analyze\registry.py
def add_raw(self, name: str) -> "MetricRegistry":
    """Registers a raw, primitive input metric name."""
    self.nodes[name] = {
        "type": "raw",
        "dependencies": [],
        "callable": None,
    }
    return self

add_derived

add_derived(
    name: str,
    dependencies: List[str],
    formula: Callable[..., ndarray],
) -> MetricRegistry

Registers a derived metric calculated from dependency parents.

PARAMETER DESCRIPTION
name

Name of the derived metric.

TYPE: str

dependencies

List of dependency metric names.

TYPE: List[str]

formula

A function returning an np.ndarray given dependency arrays.

TYPE: Callable

Source code in src\xpyrment\analyze\registry.py
def add_derived(self, name: str, dependencies: List[str], formula: Callable[..., np.ndarray]) -> "MetricRegistry":
    """Registers a derived metric calculated from dependency parents.

    Args:
        name (str): Name of the derived metric.
        dependencies (List[str]): List of dependency metric names.
        formula (Callable): A function returning an np.ndarray given dependency arrays.
    """
    self.nodes[name] = {
        "type": "derived",
        "dependencies": dependencies,
        "callable": formula,
    }
    return self

evaluate

evaluate(inputs: Dict[str, ndarray]) -> Dict[str, ndarray]

Evaluates all registered metrics topologically, using caching.

PARAMETER DESCRIPTION
inputs

Dictionary of raw primitive arrays.

TYPE: Dict[str, ndarray]

RETURNS DESCRIPTION
Dict[str, ndarray]

Dict[str, np.ndarray]: Complete dictionary containing both raw and derived arrays.

Source code in src\xpyrment\analyze\registry.py
def evaluate(self, inputs: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
    """Evaluates all registered metrics topologically, using caching.

    Args:
        inputs (Dict[str, np.ndarray]): Dictionary of raw primitive arrays.

    Returns:
        Dict[str, np.ndarray]: Complete dictionary containing both raw and derived arrays.
    """
    # Sort node evaluation order
    eval_order = self._topological_sort()
    cache: Dict[str, np.ndarray] = {k: np.asarray(v) for k, v in inputs.items()}

    for name in eval_order:
        if name in cache:
            continue

        node = self.nodes.get(name)
        if node is None or node["type"] == "raw":
            # Raw node not provided in inputs
            raise ValueError(f"Raw input data is missing for required metric: {name}")

        # Collect arguments for the formula
        args = []
        for dep in node["dependencies"]:
            if dep not in cache:
                raise ValueError(f"Dependency metric {dep} was not evaluated for {name}.")
            args.append(cache[dep])

        # Compute and cache
        cache[name] = np.asarray(node["callable"](*args))

    return cache

run_analysis

run_analysis(
    experiment: Experiment,
    control: str = "control",
    treatment: str = "treatment",
    alpha: float = 0.05,
    multi_test_correction: Optional[str] = None,
    covariates: Optional[List[str]] = None,
) -> AnalysisResult

Executes the statistical analysis across all registered metrics in an Experiment container.

Iterates over each registered metric in the experiment, calculates means, relative lifts, p-values, confidence intervals, and power. If requested, applies multiple testing corrections across the p-values, updates the experiment state to ANALYZED, and returns a structured AnalysisResult.

PARAMETER DESCRIPTION
experiment

The initialized, pre-registered experiment setup container.

TYPE: Experiment

control

The label of the control variant in the treatment column. Defaults to "control".

TYPE: str DEFAULT: 'control'

treatment

The label of the treatment variant in the treatment column. Defaults to "treatment".

TYPE: str DEFAULT: 'treatment'

alpha

Significance level (Type I error probability) for confidence intervals. Defaults to 0.05.

TYPE: float DEFAULT: 0.05

multi_test_correction

Multiple testing correction algorithm to apply across the registered metrics. Options: "bonferroni", "holm", "fdr_bh", "fdr_by", "hochberg". Defaults to None.

TYPE: str DEFAULT: None

covariates

List of covariates to check balance and adjust.

TYPE: List[str] DEFAULT: None

RETURNS DESCRIPTION
AnalysisResult

A rich, summarized results container.

TYPE: AnalysisResult

RAISES DESCRIPTION
ValueError

If no metrics have been registered, or if control/treatment labels are missing from the active dataset.

PhaseOrderError

If the experiment is in an invalid state for running analysis.

Source code in src\xpyrment\analyze\orchestrator.py
def run_analysis(
    experiment: Experiment,
    control: str = "control",
    treatment: str = "treatment",
    alpha: float = 0.05,
    multi_test_correction: Optional[str] = None,
    covariates: Optional[List[str]] = None,
) -> AnalysisResult:
    """Executes the statistical analysis across all registered metrics in an Experiment container.

    Iterates over each registered metric in the experiment, calculates means, relative lifts, p-values,
    confidence intervals, and power. If requested, applies multiple testing corrections across the p-values,
    updates the experiment state to `ANALYZED`, and returns a structured `AnalysisResult`.

    Args:
        experiment (Experiment): The initialized, pre-registered experiment setup container.
        control (str): The label of the control variant in the treatment column. Defaults to `"control"`.
        treatment (str): The label of the treatment variant in the treatment column. Defaults to `"treatment"`.
        alpha (float): Significance level (Type I error probability) for confidence intervals. Defaults to 0.05.
        multi_test_correction (str, optional): Multiple testing correction algorithm to apply across the
            registered metrics. Options: `"bonferroni"`, `"holm"`, `"fdr_bh"`, `"fdr_by"`, `"hochberg"`. Defaults to None.
        covariates (List[str], optional): List of covariates to check balance and adjust.

    Returns:
        AnalysisResult: A rich, summarized results container.

    Raises:
        ValueError: If no metrics have been registered, or if control/treatment labels are missing from
            the active dataset.
        PhaseOrderError: If the experiment is in an invalid state for running analysis.
    """
    unique_variants = experiment.data[experiment.treatment_col].unique()
    if control not in unique_variants:
        raise ValueError(f"Control label '{control}' not found.")
    if treatment not in unique_variants:
        raise ValueError(f"Treatment label '{treatment}' not found.")

    # 1. Topological sorted evaluation of MetricRegistry DAG if present
    if getattr(experiment, "metric_registry", None) is not None:
        registry = experiment.metric_registry
        raw_inputs = {}
        for node_name, node_info in registry.nodes.items():
            if node_info["type"] == "raw" and node_name in experiment.data.columns:
                raw_inputs[node_name] = experiment.data[node_name].to_numpy()

        evaluated_cache = registry.evaluate(raw_inputs)
        for key, val in evaluated_cache.items():
            if (
                key not in experiment.data.columns
                or registry.nodes.get(key, {}).get("type") == "derived"
            ):
                if len(val) == len(experiment.data):
                    experiment.data[key] = val

        # Auto-populate metrics from DAG if metrics list is currently empty
        if not experiment.metrics:
            from xpyrment.metrics.taxonomy import MeanMetric

            for name in evaluated_cache.keys():
                experiment.metrics.append(MeanMetric(name, value_col=name))

    if not experiment.metrics:
        raise ValueError("No metrics have been added to the experiment.")

    # Resolve global and method-specific covariates
    covs_to_check = (
        covariates if covariates is not None else getattr(experiment, "covariates", [])
    )

    # 2. Automated Covariate-adjusted CUPED routing
    if covs_to_check:
        for metric in experiment.metrics:
            from xpyrment.metrics.taxonomy import MeanMetric

            if isinstance(metric, MeanMetric) and not getattr(
                metric, "pre_period_col", None
            ):
                possible_candidates = [
                    f"pre_{metric.value_col}",
                    f"{metric.value_col}_pre",
                    f"{metric.value_col}_baseline",
                ]
                for cand in possible_candidates:
                    if cand in covs_to_check and cand in experiment.data.columns:
                        metric.pre_period_col = cand
                        break

    # 3. Covariate imbalance checking in a single call
    balance_checker = None
    if covs_to_check:
        valid_covs = [c for c in covs_to_check if c in experiment.data.columns]
        if valid_covs:
            from xpyrment.quasi.balance import CovariateBalanceChecker

            sub_df = experiment.data[
                experiment.data[experiment.treatment_col].isin([control, treatment])
            ].dropna(subset=valid_covs)
            if len(sub_df) > 0:
                import numpy as np

                X = sub_df[valid_covs].to_numpy()
                T = (
                    (sub_df[experiment.treatment_col] == treatment)
                    .astype(int)
                    .to_numpy()
                )
                balance_checker = CovariateBalanceChecker(covariate_names=valid_covs)
                balance_checker.fit(X, T)

    # 4. Statistical Evaluation of each Metric
    results = []
    for metric in experiment.metrics:
        res = metric.calculate(
            experiment.data,
            treatment_col=experiment.treatment_col,
            control=control,
            treatment=treatment,
            alpha=alpha,
        )
        results.append(res)

    # Apply multiple testing corrections if requested
    if multi_test_correction and len(results) > 1:
        p_vals = [res["p_value"] for res in results]
        adjusted_p = apply_multiple_testing_correction(
            p_vals, alpha=alpha, method=multi_test_correction
        )
        for i, val in enumerate(adjusted_p):
            results[i]["p_value"] = val

    experiment.transition_to(ExperimentState.ANALYZED)
    return AnalysisResult(results, alpha=alpha, balance_checker=balance_checker)

setup

setup(
    data: DataFrame,
    treatment_col: str,
    id_col: Optional[str] = None,
    covariates: Optional[List[str]] = None,
) -> Experiment

Initializes the experimental setup container, serving as the library's primary entrypoint.

Sets up the Experiment object with the target dataset, identifying variant and unit columns, and locks the state machine to ExperimentState.DESIGNED.

PARAMETER DESCRIPTION
data

The main experiment dataset containing exposure logs and outcomes.

TYPE: DataFrame

treatment_col

Column name containing variant strings (e.g., "variant").

TYPE: str

id_col

Column name containing unique unit identifiers (e.g., "user_id").

TYPE: str DEFAULT: None

covariates

Optional list of baseline covariates.

TYPE: List[str] DEFAULT: None

RETURNS DESCRIPTION
Experiment

A state-gated Experiment orchestrator instance, ready for metric registration and planning.

TYPE: Experiment

Source code in src\xpyrment\analyze\orchestrator.py
def setup(
    data: pd.DataFrame,
    treatment_col: str,
    id_col: Optional[str] = None,
    covariates: Optional[List[str]] = None,
) -> Experiment:
    """Initializes the experimental setup container, serving as the library's primary entrypoint.

    Sets up the `Experiment` object with the target dataset, identifying variant and unit columns,
    and locks the state machine to `ExperimentState.DESIGNED`.

    Args:
        data (pd.DataFrame): The main experiment dataset containing exposure logs and outcomes.
        treatment_col (str): Column name containing variant strings (e.g., `"variant"`).
        id_col (str, optional): Column name containing unique unit identifiers (e.g., `"user_id"`).
        covariates (List[str], optional): Optional list of baseline covariates.

    Returns:
        Experiment: A state-gated `Experiment` orchestrator instance, ready for metric registration and planning.
    """
    print("==========================================")
    print("      Initializing xpyrment Setup         ")
    print("==========================================")
    print(f"Total rows in dataset:  {len(data)}")
    print(f"Treatment column:      {treatment_col}")
    if id_col:
        print(f"ID column:             {id_col}")

    exp = Experiment(data, treatment_col, id_col, covariates=covariates)
    return exp

apply_cuped

apply_cuped(
    df: DataFrame, target_col: str, pre_col: str
) -> Series

Applies Controlled-experiments Using Pre-Experiment Data (CUPED) on a series.

CUPED (Deng et al., 2013) is the standard variance reduction method in modern online experimentation. It uses pre-experiment covariate data to remove pre-existing user-level variation, leaving a highly concentrated treatment signal.

PARAMETER DESCRIPTION
df

The dataset containing both target and pre-period columns.

TYPE: DataFrame

target_col

Column name representing the post-experiment metric of interest (\(Y\)).

TYPE: str

pre_col

Column name representing the pre-period covariate (\(X\)).

TYPE: str

RETURNS DESCRIPTION
Series

pd.Series: A pandas Series containing the CUPED-adjusted values.

Source code in src\xpyrment\analyze\variance_reduction.py
def apply_cuped(df: pd.DataFrame, target_col: str, pre_col: str) -> pd.Series:
    """Applies Controlled-experiments Using Pre-Experiment Data (CUPED) on a series.

    CUPED (Deng et al., 2013) is the standard variance reduction method in modern online experimentation. It uses
    pre-experiment covariate data to remove pre-existing user-level variation, leaving a highly concentrated treatment signal.

    Args:
        df (pd.DataFrame): The dataset containing both target and pre-period columns.
        target_col (str): Column name representing the post-experiment metric of interest ($Y$).
        pre_col (str): Column name representing the pre-period covariate ($X$).

    Returns:
        pd.Series: A pandas Series containing the CUPED-adjusted values.
    """
    clean_df = df[[target_col, pre_col]].dropna()
    if len(clean_df) < 2:
        return df[target_col]

    cov_yx = clean_df[target_col].cov(clean_df[pre_col])
    var_x = clean_df[pre_col].var()

    if var_x > 0.0:
        theta = cov_yx / var_x
    else:
        theta = 0.0

    mean_x = clean_df[pre_col].mean()

    # Apply CUPED adjustment to all rows, maintaining the original Series shape and index
    adjusted = df[target_col] - theta * (df[pre_col] - mean_x)
    return adjusted

apply_multiple_testing_correction

apply_multiple_testing_correction(
    p_values: List[float],
    alpha: float = 0.05,
    method: str = "fdr_bh",
) -> List[float]

Applies multiple testing corrections on p-values using statsmodels.

TODO: Implement step-down Dunnett's correction procedure for multi-arm comparisons against a common control. TODO: Support family-wise bootstrap-based resampling corrections to account for non-normal dependency structures.

When performing multiple statistical tests simultaneously, the probability of obtaining at least one false positive (rejecting \(H_0\) when it is actually true) increases with the number of tests. This inflation of Type I error is known as the Multiple Testing Problem.

Mathematical Background of FWER Inflation

For \(m\) independent tests, each run at nominal significance level \(\\alpha\): $$ \text{FWER} = P(\text{at least one false positive}) = 1 - (1 - \alpha)^m $$ - If \(m = 1\) and \(\\alpha = 0.05\), \(\\text{FWER} = 0.05\). - If \(m = 10\) and \(\\alpha = 0.05\), \(\\text{FWER} = 1 - (0.95)^{10} \\approx 0.40\) (\(40\\%\) false positive probability). - If \(m = 50\) and \(\\alpha = 0.05\), \(\\text{FWER} \\approx 0.92\) (near-certainty of committing a false positive).

Supported Correction Methodologies
  1. Bonferroni Correction ("bonferroni"): Controls the Family-Wise Error Rate (FWER) in the strong sense. It adjusts each p-value by multiplying it by the total number of tests \(m\): $$ p^{\text{adj}}_i = \min(p_i \times m, \ 1.0) $$ Highly conservative; has low statistical power when \(m\) is large or when tests are highly correlated.
  2. Holm-Bonferroni Procedure ("holm"): A step-down FWER control method that is uniformly more powerful than the standard Bonferroni correction. It orders the raw p-values: \(p_{(1)} \\le p_{(2)} \\le \\dots \\le p_{(m)}\). The adjusted p-values are computed sequentially as: $$ p^{\text{adj}}{(i)} = \max \left( (m - i + 1) \times p{(i)}, \ p^{\text{adj}}_{(i-1)} \right) \quad \text{for } i \ge 1 $$ (with \(p^{\\text{adj}}_{(0)} = 0\), bounded above by \(1.0\)).
  3. Benjamini-Hochberg (BH) Procedure ("fdr_bh"): Controls the False Discovery Rate (FDR), which is the expected proportion of false positives among all rejections. This is the preferred method for digital product experimentation (A/B testing with multiple secondary metrics), as it provides vastly superior statistical power compared to FWER controllers. It orders raw p-values: \(p_{(1)} \\le p_{(2)} \\le \\dots \\le p_{(m)}\). The adjusted p-values are calculated as: $$ p^{\text{adj}}{(i)} = \min \left( \frac{m}{i} \times p{(i)}, \ p^{\text{adj}}_{(i+1)} \right) \quad \text{for } i \le m - 1 $$ (with \(p^{\\text{adj}}_{(m)} = p_{(m)}\), bounded above by \(1.0\)).
  4. Benjamini-Yekutieli (BY) Procedure ("fdr_by"): Controls the False Discovery Rate under arbitrary dependency structures (i.e. positive regression dependency or negative correlation) among test statistics. BY applies an additional harmonic penalty: $$ P_{(i)} \le \frac{i}{m \sum_{j=1}^m \frac{1}{j}} \alpha $$
  5. Hochberg Step-up Procedure ("hochberg"): A step-up FWER controlling procedure that is uniformly more powerful than Holm-Bonferroni, but requires the test statistics to be independent or satisfy Simes' inequality. It starts from the largest p-value down to the smallest.
PARAMETER DESCRIPTION
p_values

List of raw, unadjusted p-values calculated from various metric tests.

TYPE: List[float]

alpha

Nominal significance level (e.g., 0.05). Defaults to 0.05.

TYPE: float DEFAULT: 0.05

method

Correction algorithm. Options include "bonferroni", "holm", "fdr_bh", "fdr_by", "hochberg". Defaults to "fdr_bh".

TYPE: str DEFAULT: 'fdr_bh'

RETURNS DESCRIPTION
List[float]

List[float]: A list of adjusted p-values, in the same index order as the input.

Source code in src\xpyrment\analyze\corrections.py
def apply_multiple_testing_correction(
    p_values: List[float], alpha: float = 0.05, method: str = "fdr_bh"
) -> List[float]:
    r"""Applies multiple testing corrections on p-values using statsmodels.

    TODO: Implement step-down Dunnett's correction procedure for multi-arm comparisons against a common control.
    TODO: Support family-wise bootstrap-based resampling corrections to account for non-normal dependency structures.

    When performing multiple statistical tests simultaneously, the probability of obtaining at least one
    false positive (rejecting $H_0$ when it is actually true) increases with the number of tests.
    This inflation of Type I error is known as the **Multiple Testing Problem**.

    Mathematical Background of FWER Inflation:
        For $m$ independent tests, each run at nominal significance level $\\alpha$:
        $$
        \\text{FWER} = P(\\text{at least one false positive}) = 1 - (1 - \\alpha)^m
        $$
        - If $m = 1$ and $\\alpha = 0.05$, $\\text{FWER} = 0.05$.
        - If $m = 10$ and $\\alpha = 0.05$, $\\text{FWER} = 1 - (0.95)^{10} \\approx 0.40$ ($40\\%$ false positive probability).
        - If $m = 50$ and $\\alpha = 0.05$, $\\text{FWER} \\approx 0.92$ (near-certainty of committing a false positive).

    Supported Correction Methodologies:
        1. **Bonferroni Correction** (`"bonferroni"`):
           Controls the Family-Wise Error Rate (FWER) in the strong sense. It adjusts each p-value by multiplying
           it by the total number of tests $m$:
           $$
           p^{\\text{adj}}_i = \\min(p_i \\times m, \\ 1.0)
           $$
           Highly conservative; has low statistical power when $m$ is large or when tests are highly correlated.
        2. **Holm-Bonferroni Procedure** (`"holm"`):
           A step-down FWER control method that is uniformly more powerful than the standard Bonferroni correction.
           It orders the raw p-values: $p_{(1)} \\le p_{(2)} \\le \\dots \\le p_{(m)}$.
           The adjusted p-values are computed sequentially as:
           $$
           p^{\\text{adj}}_{(i)} = \\max \\left( (m - i + 1) \\times p_{(i)}, \\ p^{\\text{adj}}_{(i-1)} \\right) \\quad \\text{for } i \\ge 1
           $$
           (with $p^{\\text{adj}}_{(0)} = 0$, bounded above by $1.0$).
        3. **Benjamini-Hochberg (BH) Procedure** (`"fdr_bh"`):
           Controls the **False Discovery Rate (FDR)**, which is the expected proportion of false positives among all
           rejections. This is the preferred method for digital product experimentation (A/B testing with multiple secondary metrics),
           as it provides vastly superior statistical power compared to FWER controllers.
           It orders raw p-values: $p_{(1)} \\le p_{(2)} \\le \\dots \\le p_{(m)}$.
           The adjusted p-values are calculated as:
           $$
           p^{\\text{adj}}_{(i)} = \\min \\left( \\frac{m}{i} \\times p_{(i)}, \\ p^{\\text{adj}}_{(i+1)} \\right) \\quad \\text{for } i \\le m - 1
           $$
           (with $p^{\\text{adj}}_{(m)} = p_{(m)}$, bounded above by $1.0$).
        4. **Benjamini-Yekutieli (BY) Procedure** (`"fdr_by"`):
           Controls the False Discovery Rate under arbitrary dependency structures (i.e. positive regression dependency or negative correlation)
           among test statistics. BY applies an additional harmonic penalty:
           $$
           P_{(i)} \\le \\frac{i}{m \\sum_{j=1}^m \\frac{1}{j}} \\alpha
           $$
        5. **Hochberg Step-up Procedure** (`"hochberg"`):
           A step-up FWER controlling procedure that is uniformly more powerful than Holm-Bonferroni, but requires the test statistics
           to be independent or satisfy Simes' inequality. It starts from the largest p-value down to the smallest.

    Args:
        p_values (List[float]): List of raw, unadjusted p-values calculated from various metric tests.
        alpha (float): Nominal significance level (e.g., 0.05). Defaults to 0.05.
        method (str): Correction algorithm. Options include `"bonferroni"`, `"holm"`, `"fdr_bh"`, `"fdr_by"`, `"hochberg"`.
            Defaults to `"fdr_bh"`.

    Returns:
        List[float]: A list of adjusted p-values, in the same index order as the input.
    """
    if not p_values:
        return []

    # Handle NaNs
    import numpy as np
    p_array = np.array(p_values)
    mask = ~np.isnan(p_array)

    if not np.any(mask):
        return p_values

    adjusted_p = p_array.copy()

    # Map friendly names to statsmodels internal keys
    statsmodels_method = method
    if method.lower() == "hochberg":
        statsmodels_method = "simes-hochberg"

    _, adj, _, _ = multipletests(p_array[mask], alpha=alpha, method=statsmodels_method)
    adjusted_p[mask] = adj

    return adjusted_p.tolist()