Ingestion

ingestion

Data ingestion adapters, validation gates, and normalization utilities.

This module provides standard connectors and validation gates for feeding raw client-side or server-side datasets (such as SQL tables or CSVs) into the xpyrment experimental workflow.

CLASS DESCRIPTION
DuckDBIngester

High-performance out-of-core data ingestion and computation adapter using DuckDB.

FUNCTION DESCRIPTION
load_from_sql

Loads experimental telemetry and assignment logs from an external relational SQL database.

ingest_dataframe

Ingests, validates, and copies an in-memory pandas DataFrame into the xpyrment lifecycle.

ingest_chunks

Ingests and yields an iterable of pandas DataFrames (chunks) for out-of-core processing.

ingest_dask_dataframe

Ingests, validates, and sets up a computation graph for a Dask DataFrame.

DuckDBIngester

DuckDBIngester(db_path: str = ':memory:')

High-performance out-of-core data ingestion and computation adapter using DuckDB.

Provides streaming, memory-efficient statistical computations on Parquet files or directories, such as covariate balance checks and Welch's t-test statistics, without loading the full dataset into RAM.

Mathematical Specifications
  1. Standardized Mean Difference (SMD) for continuous/numeric covariates: Let \(\bar{X}_1\) and \(\bar{X}_0\) be the sample means of a covariate \(X\) in the treatment and control groups, and let \(s_1^2\) and \(s_0^2\) be their sample variances. $$ \text{SMD} = \frac{\bar{X}_1 - \bar{X}_0}{\sqrt{\frac{s_1^2 + s_0^2}{2}}} $$
  2. Welch's \(t\)-test for unequal variances: Let \(N_0, N_1\) be the sample sizes, \(\bar{X}_0, \bar{X}_1\) be the sample means, and \(s_0^2, s_1^2\) be the sample variances. $$ t = \frac{\bar{X}_1 - \bar{X}_0}{\sqrt{\frac{s_0^2}{N_0} + \frac{s_1^2}{N_1}}} $$ The Welch-Satterthwaite degrees of freedom \(\nu\) is calculated as: $$ \nu = \frac{\left( \frac{s_0^2}{N_0} + \frac{s_1^2}{N_1} \right)^2}{\frac{\left( \frac{s_0^2}{N_0} \right)^2}{N_0 - 1} + \frac{\left( \frac{s_1^2}{N_1} \right)^2}{N_1 - 1}} $$ The two-sided \(p\)-value is: $$ p = 2 \times \left(1 - F_{t, \nu}(|t|)\right) $$ where \(F_{t, \nu}\) is the cumulative distribution function (CDF) of the Student's \(t\)-distribution with \(\nu\) degrees of freedom.
  3. Pearson \(\chi^2\) Test of Independence for categorical covariates: $$ \chi^2 = \sum_{i=1}^R \sum_{j=1}^C \frac{(O_{i,j} - E_{i,j})^2}{E_{i,j}} $$ with degrees of freedom: $$ \text{df} = (R - 1)(C - 1) $$ where \(O_{i,j}\) and \(E_{i,j}\) are the observed and expected frequency counts, respectively.
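The SMD and Welch formulas above can be checked by hand on a tiny sample. The following is a minimal sketch using only the Python standard library; the function names `smd` and `welch_t` and the sample values are illustrative assumptions, not part of xpyrment. The p-value step is left out because it needs a Student's t CDF, which the standard library does not provide.

```python
import math
import statistics


def smd(treated, control):
    """Standardized mean difference using the average of the two sample variances."""
    mean_1, mean_0 = statistics.fmean(treated), statistics.fmean(control)
    var_1, var_0 = statistics.variance(treated), statistics.variance(control)
    return (mean_1 - mean_0) / math.sqrt((var_1 + var_0) / 2.0)


def welch_t(treated, control):
    """Return (t, nu): Welch's t statistic and the Welch-Satterthwaite df."""
    n_1, n_0 = len(treated), len(control)
    mean_1, mean_0 = statistics.fmean(treated), statistics.fmean(control)
    a = statistics.variance(control) / n_0  # s_0^2 / N_0
    b = statistics.variance(treated) / n_1  # s_1^2 / N_1
    t = (mean_1 - mean_0) / math.sqrt(a + b)
    nu = (a + b) ** 2 / (a ** 2 / (n_0 - 1) + b ** 2 / (n_1 - 1))
    return t, nu


# Hypothetical toy data: both groups have sample variance 2.5.
control = [1.0, 2.0, 3.0, 4.0, 5.0]
treated = [2.0, 3.0, 4.0, 5.0, 6.0]

print(smd(treated, control))      # 1 / sqrt(2.5)
print(welch_t(treated, control))  # t = 1.0, nu = 8.0
```

With equal variances and equal group sizes, the Welch-Satterthwaite degrees of freedom reduce to \(N_0 + N_1 - 2\), which is why the toy data gives \(\nu = 8\).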
PARAMETER DESCRIPTION
db_path

The file path to a persistent DuckDB database, or ':memory:' for transient sessions.

TYPE: str DEFAULT: ':memory:'

RAISES DESCRIPTION
ImportError

If duckdb or pyarrow are not installed.

METHOD DESCRIPTION
close

Closes the underlying DuckDB connection if open to release locks.

query

Executes a raw SQL query against DuckDB and returns the result as a pandas DataFrame.

compute_covariate_balance

Computes covariate balance statistics out-of-core using DuckDB.

compute_welch_statistics

Computes Welch's t-test statistics for experimental metrics out-of-core using DuckDB.

Source code in src\xpyrment\run\ingestion.py
def __init__(self, db_path: str = ":memory:"):
    """Initializes the DuckDBIngester and opens a connection.

    Args:
        db_path (str): The file path to a persistent DuckDB database, or ':memory:' for transient sessions.

    Raises:
        ImportError: If duckdb or pyarrow are not installed.
    """
    try:
        import duckdb
        import pyarrow
    except ImportError as exc:
        # Chain the original error so the missing package is visible in the traceback.
        raise ImportError(
            "Both 'duckdb' and 'pyarrow' packages are required for DuckDBIngester. "
            "Please install them via: pip install duckdb pyarrow"
        ) from exc

    self.db_path = db_path
    self._conn = duckdb.connect(database=self.db_path)

close

close()

Closes the underlying DuckDB connection if open to release locks.

Source code in src\xpyrment\run\ingestion.py
def close(self):
    """Closes the underlying DuckDB connection if open to release locks."""
    if hasattr(self, "_conn") and self._conn is not None:
        try:
            self._conn.close()
        except Exception:
            pass
        self._conn = None

query

query(sql_query: str, params: list = None) -> DataFrame

Executes a raw SQL query against DuckDB and returns the result as a pandas DataFrame. Raises RuntimeError if the connection has already been closed.

PARAMETER DESCRIPTION
sql_query

A standard SQL query.

TYPE: str

params

Parameters to pass into the query safely.

TYPE: list DEFAULT: None

RETURNS DESCRIPTION
DataFrame

pd.DataFrame: The queried records.
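Passing values through params rather than formatting them into the SQL string keeps quoting and escaping out of the caller's hands. The sketch below illustrates the idea with the standard-library sqlite3 module purely as a stand-in, so that it runs without extra packages; DuckDB is understood to accept the same `?` placeholder style in `execute`, but the table and values here are hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (user_id INTEGER, variant TEXT)")
conn.executemany(
    "INSERT INTO events VALUES (?, ?)",
    [(1, "control"), (2, "treatment"), (3, "o'brien")],
)

# The value contains a single quote. Binding it as a parameter passes it
# through untouched, whereas naive string interpolation would yield broken SQL.
variant = "o'brien"
rows = conn.execute(
    "SELECT user_id FROM events WHERE variant = ?", [variant]
).fetchall()
conn.close()

print(rows)  # [(3,)]
```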

Source code in src\xpyrment\run\ingestion.py
def query(self, sql_query: str, params: list = None) -> pd.DataFrame:
    """Executes a raw SQL query against DuckDB and returns the result as a pandas DataFrame.

    Args:
        sql_query (str): A standard SQL query.
        params (list, optional): Parameters to pass into the query safely.

    Returns:
        pd.DataFrame: The queried records.
    """
    if self._conn is None:
        raise RuntimeError("DuckDB connection is closed.")
    if params is not None:
        return self._conn.execute(sql_query, params).df()
    return self._conn.execute(sql_query).df()

compute_covariate_balance

compute_covariate_balance(
    parquet_path: str,
    treatment_col: str,
    covariate_cols: list,
    control_group: str = None,
    treatment_group: str = None,
) -> dict

Computes covariate balance statistics out-of-core using DuckDB.

Performs streaming, out-of-core scans to calculate Standardized Mean Difference (SMD) for continuous/numeric covariates and Pearson Chi-Square tests of independence for categorical covariates.
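For the categorical case, the Pearson chi-square statistic can be sketched directly from a contingency table of counts. This is a standard-library illustration with a hypothetical `pearson_chi2` helper and made-up counts. Note one assumption about the real implementation: the source listing calls SciPy's `chi2_contingency` with default arguments, which applies Yates' continuity correction when the table has one degree of freedom, so a 2 x 2 table would give a slightly different statistic than this uncorrected formula.

```python
def pearson_chi2(observed):
    """Return (chi2, df) for an R x C table of observed counts."""
    row_totals = [sum(row) for row in observed]
    col_totals = [sum(col) for col in zip(*observed)]
    total = sum(row_totals)
    chi2 = 0.0
    for i, row in enumerate(observed):
        for j, o_ij in enumerate(row):
            e_ij = row_totals[i] * col_totals[j] / total  # expected count
            chi2 += (o_ij - e_ij) ** 2 / e_ij
    df = (len(observed) - 1) * (len(col_totals) - 1)
    return chi2, df


# Hypothetical counts: rows are categories, columns are (control, treatment).
table = [[10, 20], [20, 10], [15, 15]]
chi2, df = pearson_chi2(table)
print(chi2, df)  # chi2 = 100/15, df = 2
```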

PARAMETER DESCRIPTION
parquet_path

Absolute or relative path to the Parquet file or directory.

TYPE: str

treatment_col

Column name identifying experimental groups/arms.

TYPE: str

covariate_cols

List of column names representing categorical or continuous pre-experiment covariates.

TYPE: list

control_group

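The label/value of the control group. Defaults to None. When exactly two arms exist and either group is left unspecified, the arms are taken in sorted order and the first is treated as control; with more than two arms, both control_group and treatment_group are required.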

TYPE: str DEFAULT: None

treatment_group

The label/value of the treatment group. Defaults to None.

TYPE: str DEFAULT: None
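How control_group and treatment_group interact with the arms found in the data can be sketched as a small standalone function. The name `resolve_comparison_groups` is hypothetical; the branching is a paraphrase of the group-validation step in the source listing, not a separate API.

```python
def resolve_comparison_groups(groups, control_group=None, treatment_group=None):
    """Pick the [control, treatment] pair from the distinct arm labels."""
    groups = sorted(groups)
    if len(groups) < 2:
        raise ValueError(f"Need at least 2 distinct groups, found {len(groups)}.")
    if control_group is not None and treatment_group is not None:
        if control_group not in groups or treatment_group not in groups:
            raise ValueError("control_group or treatment_group not among the arms.")
        return [control_group, treatment_group]
    if len(groups) == 2:
        # With exactly two arms, sorted order decides which one is the control.
        return groups
    raise ValueError("More than two arms: specify both groups explicitly.")


print(resolve_comparison_groups(["treatment", "control"]))  # ['control', 'treatment']
print(resolve_comparison_groups(["a", "b", "c"], "b", "c"))  # ['b', 'c']
```

Because the two-arm default relies on sort order, labels such as "A" and "B" make "A" the control; passing both groups explicitly avoids depending on naming.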

RETURNS DESCRIPTION
dict

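A dictionary mapping each covariate to its balance statistics:
  - For numeric covariates: {"type": "numeric", "smd": float, "p_value": float}, where p_value comes from Welch's t-test.
  - For categorical covariates: {"type": "categorical", "p_value": float}, where p_value comes from the chi-square test.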

TYPE: dict

RAISES DESCRIPTION
FileNotFoundError

If the parquet path does not exist.

KeyError

If columns are not found in the Parquet schema.

ValueError

If the dataset is empty, has fewer than 2 distinct treatment arms, or contains invalid/degenerate data.
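A returned balance dictionary is typically scanned for covariates that look imbalanced. The sketch below assumes a hand-written result of the documented shape (the numbers are invented for illustration) and a hypothetical `flag_imbalanced` helper; the thresholds of 0.1 for |SMD| and 0.05 for the p-value are common conventions chosen here as assumptions, not values defined by xpyrment.

```python
# Hypothetical result shaped like the documented return value.
balance = {
    "age": {"type": "numeric", "smd": 0.03, "p_value": 0.41},
    "prior_spend": {"type": "numeric", "smd": -0.18, "p_value": 0.002},
    "country": {"type": "categorical", "p_value": 0.63},
}


def flag_imbalanced(balance, smd_threshold=0.1, alpha=0.05):
    """Return covariate names whose balance statistics exceed the thresholds."""
    flagged = []
    for name, entry in balance.items():
        if entry["type"] == "numeric":
            # Numeric covariates are judged on the magnitude of the SMD.
            if abs(entry["smd"]) > smd_threshold:
                flagged.append(name)
        elif entry["p_value"] < alpha:
            # Categorical covariates only carry a p-value.
            flagged.append(name)
    return flagged


print(flag_imbalanced(balance))  # ['prior_spend']
```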

Source code in src\xpyrment\run\ingestion.py
def compute_covariate_balance(
    self,
    parquet_path: str,
    treatment_col: str,
    covariate_cols: list,
    control_group: str = None,
    treatment_group: str = None,
) -> dict:
    r"""Computes covariate balance statistics out-of-core using DuckDB.

    Performs streaming, out-of-core scans to calculate Standardized Mean Difference (SMD)
    for continuous/numeric covariates and Pearson Chi-Square tests of independence for
    categorical covariates.

    Args:
        parquet_path (str): Absolute or relative path to the Parquet file or directory.
        treatment_col (str): Column name identifying experimental groups/arms.
        covariate_cols (list): List of column names representing categorical or continuous pre-experiment covariates.
        control_group (str, optional): The label/value of the control group. Defaults to None.
        treatment_group (str, optional): The label/value of the treatment group. Defaults to None.

    Returns:
        dict: A dictionary mapping each covariate to its balance statistics:
            - For numeric: {"type": "numeric", "smd": float, "p_value": float}
            - For categorical: {"type": "categorical", "p_value": float}

    Raises:
        FileNotFoundError: If the parquet path does not exist.
        KeyError: If columns are not found in the Parquet schema.
        ValueError: If the dataset is empty, has fewer than 2 distinct treatment arms,
                    or contains invalid/degenerate data.
    """
    import os
    from pathlib import Path
    import numpy as np
    from scipy import stats

    # 1. Verify file existence
    if not os.path.exists(parquet_path):
        raise FileNotFoundError(
            f"Parquet file/directory not found at path: {parquet_path}"
        )

    path_str = (
        str(Path(parquet_path).resolve()).replace("\\", "/")
    )

    # 2. Schema pre-validation & column presence verification
    safe_path_str = _quote_string(path_str)
    try:
        schema_df = self.query(f"DESCRIBE SELECT * FROM read_parquet({safe_path_str})")
    except Exception as e:
        raise ValueError(f"Failed to parse Parquet schema at {parquet_path}: {e}")

    col_types = dict(zip(schema_df["column_name"], schema_df["column_type"]))

    if treatment_col not in col_types:
        raise KeyError(
            f"Treatment column '{treatment_col}' not found in Parquet schema."
        )

    for cov in covariate_cols:
        if cov not in col_types:
            raise KeyError(f"Covariate column '{cov}' not found in Parquet schema.")

    # 3. Check if dataset is empty and get total row count
    count_df = self.query(f"SELECT COUNT(*) as cnt FROM read_parquet({safe_path_str})")
    if count_df.empty or count_df.iloc[0]["cnt"] == 0:
        raise ValueError("Dataset is empty.")

    # 4. Retrieve distinct treatment arms and validate groups
    safe_treatment_col = _quote_identifier(treatment_col)
    groups_df = self.query(
        f"SELECT DISTINCT {safe_treatment_col} FROM read_parquet({safe_path_str}) WHERE {safe_treatment_col} IS NOT NULL"
    )
    groups = sorted(groups_df[treatment_col].tolist())
    if len(groups) < 2:
        raise ValueError(
            f"Balance check requires at least 2 distinct groups in '{treatment_col}'. Found {len(groups)}."
        )

    if control_group is not None and treatment_group is not None:
        if control_group not in groups or treatment_group not in groups:
            raise ValueError(
                f"Specified control_group '{control_group}' or treatment_group '{treatment_group}' "
                f"not found in distinct groups: {groups}."
            )
        comp_groups = [control_group, treatment_group]
    elif len(groups) == 2:
        comp_groups = groups
    else:
        raise ValueError(
            f"Multiple treatment arms detected: {groups}. "
            f"You must specify both 'control_group' and 'treatment_group' parameters."
        )

    # Helper to convert python value to SQL literal
    def to_sql_val(val):
        if isinstance(val, str):
            return _quote_string(val)
        return str(val)

    # 5. Partition covariates into numeric vs categorical
    def is_duckdb_numeric(col_name: str) -> bool:
        t = col_types.get(col_name)
        if not t:
            return False
        t_upper = str(t).upper()
        for kw in ["INT", "FLOAT", "DOUBLE", "DECIMAL", "NUMERIC", "REAL"]:
            if kw in t_upper:
                return True
        return False

    numeric_covs = []
    categorical_covs = []
    for cov in covariate_cols:
        if is_duckdb_numeric(cov):
            numeric_covs.append(cov)
        else:
            categorical_covs.append(cov)

    results = {}

    # 6. Compute statistics for continuous/numeric covariates
    if numeric_covs:
        # Build and run optimized group aggregation SQL query for all numeric covariates in a single scan
        select_parts = []
        for cov in numeric_covs:
            safe_cov = _quote_identifier(cov)
            # We also need to quote the aliases so we can reliably fetch them
            safe_count_alias = _quote_identifier(f"count_{cov}")
            safe_mean_alias = _quote_identifier(f"mean_{cov}")
            safe_var_alias = _quote_identifier(f"var_{cov}")
            select_parts.append(f"COUNT({safe_cov}) as {safe_count_alias}")
            select_parts.append(f"AVG({safe_cov}) as {safe_mean_alias}")
            select_parts.append(f"VAR_SAMP({safe_cov}) as {safe_var_alias}")

        safe_treatment_col = _quote_identifier(treatment_col)

        sql = f"""
            SELECT
                {safe_treatment_col},
                {', '.join(select_parts)}
            FROM read_parquet({safe_path_str})
            WHERE {safe_treatment_col} IN ({to_sql_val(comp_groups[0])}, {to_sql_val(comp_groups[1])})
            GROUP BY {safe_treatment_col}
        """
        group_stats_df = self.query(sql)

        row_0 = (
            group_stats_df[group_stats_df[treatment_col] == comp_groups[0]].iloc[0]
            if comp_groups[0] in group_stats_df[treatment_col].values
            else None
        )
        row_1 = (
            group_stats_df[group_stats_df[treatment_col] == comp_groups[1]].iloc[0]
            if comp_groups[1] in group_stats_df[treatment_col].values
            else None
        )

        for cov in numeric_covs:
            n_0 = (
                int(row_0[f"count_{cov}"])
                if (row_0 is not None and not pd.isna(row_0[f"count_{cov}"]))
                else 0
            )
            mean_0 = (
                float(row_0[f"mean_{cov}"])
                if (row_0 is not None and not pd.isna(row_0[f"mean_{cov}"]))
                else 0.0
            )
            var_0 = (
                float(row_0[f"var_{cov}"])
                if (row_0 is not None and not pd.isna(row_0[f"var_{cov}"]))
                else 0.0
            )

            n_1 = (
                int(row_1[f"count_{cov}"])
                if (row_1 is not None and not pd.isna(row_1[f"count_{cov}"]))
                else 0
            )
            mean_1 = (
                float(row_1[f"mean_{cov}"])
                if (row_1 is not None and not pd.isna(row_1[f"mean_{cov}"]))
                else 0.0
            )
            var_1 = (
                float(row_1[f"var_{cov}"])
                if (row_1 is not None and not pd.isna(row_1[f"var_{cov}"]))
                else 0.0
            )

            # Guard against degenerate / small sample size edge cases
            if n_0 < 2 or n_1 < 2:
                raise ValueError(
                    f"Sample size too small for covariate '{cov}': control_N={n_0}, treatment_N={n_1}. Must be >= 2."
                )

            if var_0 < 0.0 or var_1 < 0.0:
                raise ValueError(
                    f"Negative variance detected for covariate '{cov}'."
                )

            if var_0 == 0.0 and var_1 == 0.0:
                raise ValueError(
                    f"Degenerate variance (zero variance) in treatment arms for covariate '{cov}'."
                )

            pooled_sd = np.sqrt((var_0 + var_1) / 2.0)
            if pooled_sd == 0.0:
                smd = 0.0
            else:
                smd = (mean_1 - mean_0) / pooled_sd

            # Welch's t-test
            se_diff = np.sqrt(var_0 / n_0 + var_1 / n_1)
            diff = mean_1 - mean_0
            if se_diff > 0.0:
                t_stat = diff / se_diff
                num = (var_0 / n_0 + var_1 / n_1) ** 2
                den = ((var_0 / n_0) ** 2) / (n_0 - 1) + ((var_1 / n_1) ** 2) / (
                    n_1 - 1
                )
                df_val = num / den if den > 0 else (n_0 + n_1 - 2)
                p_val = 2 * (1.0 - stats.t.cdf(np.abs(t_stat), df=df_val))
            else:
                p_val = 1.0

            results[cov] = {
                "type": "numeric",
                "smd": float(smd),
                "p_value": float(p_val),
            }

    # 7. Compute statistics for categorical covariates
    for cov in categorical_covs:
        safe_cov = _quote_identifier(cov)
        safe_treatment_col = _quote_identifier(treatment_col)
        sql = f"""
            SELECT
                {safe_cov},
                {safe_treatment_col},
                COUNT(*) as cnt
            FROM read_parquet({safe_path_str})
            WHERE {safe_treatment_col} IN ({to_sql_val(comp_groups[0])}, {to_sql_val(comp_groups[1])}) AND {safe_cov} IS NOT NULL
            GROUP BY {safe_cov}, {safe_treatment_col}
        """
        cat_df = self.query(sql)

        if not cat_df.empty:
            contingency = cat_df.pivot(
                index=cov, columns=treatment_col, values="cnt"
            ).fillna(0)
            for g in [comp_groups[0], comp_groups[1]]:
                if g not in contingency.columns:
                    contingency[g] = 0.0

            # CRITICAL: chi2_contingency requires df > 0 -> shape must be >= (2, 2)
            if (
                contingency.shape[0] >= 2
                and contingency.shape[1] >= 2
                and contingency.values.sum() > 0
            ):
                chi2_res = stats.chi2_contingency(contingency.values)
                p_val = chi2_res.pvalue
            else:
                p_val = 1.0  # Completely balanced (e.g., only 1 category exists across all arms)
        else:
            p_val = 1.0

        results[cov] = {"type": "categorical", "p_value": float(p_val)}

    return results

compute_welch_statistics

compute_welch_statistics(
    parquet_path: str,
    treatment_col: str,
    metric_cols: list,
    control_group: str = None,
    treatment_group: str = None,
    alpha: float = 0.05,
) -> dict

Computes Welch's t-test statistics for experimental metrics out-of-core using DuckDB.

Calculates means, sample variances, sample sizes, standard errors, Welch's t-statistic, degrees of freedom, p-value, confidence intervals, and statistical significance.
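The confidence interval for the difference in means is the point estimate plus or minus a critical value times the standard error. The source listing uses the Student's t quantile at the Welch-Satterthwaite degrees of freedom; the sketch below deliberately swaps in the standard normal quantile from the standard library, which is only a close approximation when the degrees of freedom are large. The function name `approx_ci` and the summary statistics are hypothetical.

```python
import math
from statistics import NormalDist


def approx_ci(mean_0, var_0, n_0, mean_1, var_1, n_1, alpha=0.05):
    """Large-sample CI for (treatment mean - control mean) using a normal quantile."""
    diff = mean_1 - mean_0
    se_diff = math.sqrt(var_0 / n_0 + var_1 / n_1)
    z = NormalDist().inv_cdf(1.0 - alpha / 2.0)  # about 1.96 for alpha = 0.05
    return diff - z * se_diff, diff + z * se_diff


# Hypothetical per-arm summaries: mean, sample variance, sample size.
lower, upper = approx_ci(10.0, 4.0, 400, 10.5, 4.0, 400)
print(lower, upper)
```

An interval that excludes zero corresponds to a two-sided test rejecting at the same alpha, under the same normal approximation.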

PARAMETER DESCRIPTION
parquet_path

Absolute or relative path to the Parquet file or directory.

TYPE: str

treatment_col

Column name identifying experimental groups/arms.

TYPE: str

metric_cols

List of continuous metric column names.

TYPE: list

control_group

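The label/value of the control group. Defaults to None. When exactly two arms exist and either group is left unspecified, the arms are taken in sorted order and the first is treated as control; with more than two arms, both control_group and treatment_group are required.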

TYPE: str DEFAULT: None

treatment_group

The label/value of the treatment group. Defaults to None.

TYPE: str DEFAULT: None

alpha

Significance level used for the (1 - alpha) confidence interval and for the significant flag. Defaults to 0.05.

TYPE: float DEFAULT: 0.05

RETURNS DESCRIPTION
dict

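A dictionary mapping each metric name to a sub-dictionary of Welch's statistics with the keys control_mean, treatment_mean, control_var, treatment_var, control_n, treatment_n, t_statistic, p_value, df, difference (treatment mean minus control mean), ci_lower, ci_upper, and significant (True when p_value < alpha).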

TYPE: dict

RAISES DESCRIPTION
FileNotFoundError

If the parquet path does not exist.

KeyError

If columns are not found in the Parquet schema.

ValueError

If the dataset is empty, has fewer than 2 distinct treatment arms, or contains zero-variance/degenerate data in treatment arms.

Source code in src\xpyrment\run\ingestion.py
def compute_welch_statistics(
    self,
    parquet_path: str,
    treatment_col: str,
    metric_cols: list,
    control_group: str = None,
    treatment_group: str = None,
    alpha: float = 0.05,
) -> dict:
    r"""Computes Welch's t-test statistics for experimental metrics out-of-core using DuckDB.

    Calculates means, sample variances, sample sizes, standard errors, Welch's t-statistic,
    degrees of freedom, p-value, confidence intervals, and statistical significance.

    Args:
        parquet_path (str): Absolute or relative path to the Parquet file or directory.
        treatment_col (str): Column name identifying experimental groups/arms.
        metric_cols (list): List of continuous metric column names.
        control_group (str, optional): The label/value of the control group. Defaults to None.
        treatment_group (str, optional): The label/value of the treatment group. Defaults to None.
        alpha (float): Significance level for confidence interval calculation. Defaults to 0.05.

    Returns:
        dict: A dictionary mapping each metric name to a sub-dictionary of Welch's statistics.

    Raises:
        FileNotFoundError: If the parquet path does not exist.
        KeyError: If columns are not found in the Parquet schema.
        ValueError: If the dataset is empty, has fewer than 2 distinct treatment arms,
                    or contains zero-variance/degenerate data in treatment arms.
    """
    import os
    from pathlib import Path
    import numpy as np
    from scipy import stats

    # 1. Verify file existence
    if not os.path.exists(parquet_path):
        raise FileNotFoundError(
            f"Parquet file/directory not found at path: {parquet_path}"
        )

    path_str = (
        str(Path(parquet_path).resolve()).replace("\\", "/")
    )

    # 2. Schema pre-validation & column presence verification
    safe_path_str = _quote_string(path_str)
    try:
        schema_df = self.query(f"DESCRIBE SELECT * FROM read_parquet({safe_path_str})")
    except Exception as e:
        raise ValueError(f"Failed to parse Parquet schema at {parquet_path}: {e}")

    col_types = dict(zip(schema_df["column_name"], schema_df["column_type"]))

    if treatment_col not in col_types:
        raise KeyError(
            f"Treatment column '{treatment_col}' not found in Parquet schema."
        )

    for m in metric_cols:
        if m not in col_types:
            raise KeyError(f"Metric column '{m}' not found in Parquet schema.")

    # 3. Check if dataset is empty
    count_df = self.query(f"SELECT COUNT(*) as cnt FROM read_parquet({safe_path_str})")
    if count_df.empty or count_df.iloc[0]["cnt"] == 0:
        raise ValueError("Dataset is empty.")

    # 4. Retrieve distinct treatment arms and validate groups
    safe_treatment_col = _quote_identifier(treatment_col)
    groups_df = self.query(
        f"SELECT DISTINCT {safe_treatment_col} FROM read_parquet({safe_path_str}) WHERE {safe_treatment_col} IS NOT NULL"
    )
    groups = sorted(groups_df[treatment_col].tolist())
    if len(groups) < 2:
        raise ValueError(
            f"Welch statistics calculation requires at least 2 distinct groups in '{treatment_col}'. Found {len(groups)}."
        )

    if control_group is not None and treatment_group is not None:
        if control_group not in groups or treatment_group not in groups:
            raise ValueError(
                f"Specified control_group '{control_group}' or treatment_group '{treatment_group}' "
                f"not found in distinct groups: {groups}."
            )
        comp_groups = [control_group, treatment_group]
    elif len(groups) == 2:
        comp_groups = groups
    else:
        raise ValueError(
            f"Multiple treatment arms detected: {groups}. "
            f"You must specify both 'control_group' and 'treatment_group' parameters."
        )

    # Helper to convert python value to SQL literal
    def to_sql_val(val):
        if isinstance(val, str):
            return _quote_string(val)
        return str(val)

    # 5. Construct SQL query to aggregate all metrics at once
    select_parts = []
    for m in metric_cols:
        safe_m = _quote_identifier(m)
        safe_count_alias = _quote_identifier(f"count_{m}")
        safe_mean_alias = _quote_identifier(f"mean_{m}")
        safe_var_alias = _quote_identifier(f"var_{m}")

        select_parts.append(f"COUNT({safe_m}) as {safe_count_alias}")
        select_parts.append(f"AVG({safe_m}) as {safe_mean_alias}")
        select_parts.append(f"VAR_SAMP({safe_m}) as {safe_var_alias}")

    safe_treatment_col = _quote_identifier(treatment_col)

    sql = f"""
        SELECT
            {safe_treatment_col},
            {', '.join(select_parts)}
        FROM read_parquet({safe_path_str})
        WHERE {safe_treatment_col} IN ({to_sql_val(comp_groups[0])}, {to_sql_val(comp_groups[1])})
        GROUP BY {safe_treatment_col}
    """
    stats_df = self.query(sql)

    row_0 = (
        stats_df[stats_df[treatment_col] == comp_groups[0]].iloc[0]
        if comp_groups[0] in stats_df[treatment_col].values
        else None
    )
    row_1 = (
        stats_df[stats_df[treatment_col] == comp_groups[1]].iloc[0]
        if comp_groups[1] in stats_df[treatment_col].values
        else None
    )

    results = {}
    for m in metric_cols:
        n_0 = (
            int(row_0[f"count_{m}"])
            if (row_0 is not None and not pd.isna(row_0[f"count_{m}"]))
            else 0
        )
        mean_0 = (
            float(row_0[f"mean_{m}"])
            if (row_0 is not None and not pd.isna(row_0[f"mean_{m}"]))
            else 0.0
        )
        var_0 = (
            float(row_0[f"var_{m}"])
            if (row_0 is not None and not pd.isna(row_0[f"var_{m}"]))
            else 0.0
        )

        n_1 = (
            int(row_1[f"count_{m}"])
            if (row_1 is not None and not pd.isna(row_1[f"count_{m}"]))
            else 0
        )
        mean_1 = (
            float(row_1[f"mean_{m}"])
            if (row_1 is not None and not pd.isna(row_1[f"mean_{m}"]))
            else 0.0
        )
        var_1 = (
            float(row_1[f"var_{m}"])
            if (row_1 is not None and not pd.isna(row_1[f"var_{m}"]))
            else 0.0
        )

        # Guard against degenerate / small sample sizes
        if n_0 < 2 or n_1 < 2:
            raise ValueError(
                f"Sample size too small for metric '{m}': control_N={n_0}, treatment_N={n_1}. Must be >= 2."
            )

        if var_0 < 0.0 or var_1 < 0.0:
            raise ValueError(f"Negative variance detected for metric '{m}'.")

        if var_0 == 0.0 and var_1 == 0.0:
            raise ValueError(
                f"Degenerate variance (zero variance) in treatment arms for metric '{m}'."
            )

        diff = mean_1 - mean_0
        se_diff = np.sqrt(var_0 / n_0 + var_1 / n_1)

        if se_diff > 0.0:
            t_stat = diff / se_diff
            num = (var_0 / n_0 + var_1 / n_1) ** 2
            den = ((var_0 / n_0) ** 2) / (n_0 - 1) + ((var_1 / n_1) ** 2) / (
                n_1 - 1
            )
            df_val = num / den if den > 0 else (n_0 + n_1 - 2)
            p_val = 2 * (1.0 - stats.t.cdf(np.abs(t_stat), df=df_val))

            # Confidence interval calculation
            ci_half = stats.t.ppf(1.0 - alpha / 2.0, df=df_val) * se_diff
            ci_lower = diff - ci_half
            ci_upper = diff + ci_half
            significant = bool(p_val < alpha)
        else:
            t_stat = 0.0
            p_val = 1.0
            df_val = float(n_0 + n_1 - 2)
            ci_lower = diff
            ci_upper = diff
            significant = False

        results[m] = {
            "control_mean": float(mean_0),
            "treatment_mean": float(mean_1),
            "control_var": float(var_0),
            "treatment_var": float(var_1),
            "control_n": int(n_0),
            "treatment_n": int(n_1),
            "t_statistic": float(t_stat),
            "p_value": float(p_val),
            "df": float(df_val),
            "difference": float(diff),
            "ci_lower": float(ci_lower),
            "ci_upper": float(ci_upper),
            "significant": significant,
        }

    return results

load_from_sql

load_from_sql(
    query: str, connection_string: str
) -> DataFrame

Loads experimental telemetry and assignment logs from an external relational SQL database.

Fetches exposure matrices, pre-period metrics, and covariate vectors. Only SQLite is currently supported, through the standard-library sqlite3 module; any other connection string raises ValueError.

PARAMETER DESCRIPTION
query

The SQL retrieval query (e.g., "SELECT user_id, variant, revenue FROM experimental_ledger").

TYPE: str

connection_string

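The database connection URI. An empty string or any string containing ":memory:" opens an in-memory SQLite database; otherwise a string containing "sqlite" is treated as a SQLite path after stripping the "sqlite:///" prefix.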

TYPE: str

RETURNS DESCRIPTION
DataFrame

pd.DataFrame: A pandas DataFrame containing the queried records.
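How a connection string maps to a SQLite target can be sketched as follows. The helper name `resolve_sqlite_path` is hypothetical and simply paraphrases the branching visible in the source listing; the example paths are made up.

```python
import sqlite3


def resolve_sqlite_path(connection_string):
    """Map a connection string to a path accepted by sqlite3.connect."""
    if connection_string == "" or ":memory:" in connection_string:
        return ":memory:"
    if "sqlite" in connection_string:
        return connection_string.replace("sqlite:///", "")
    raise ValueError("Non-SQLite databases are not supported.")


print(resolve_sqlite_path(""))                      # :memory:
print(resolve_sqlite_path("sqlite:///data/exp.db"))  # data/exp.db

# An in-memory database starts empty, so only self-contained queries return rows.
conn = sqlite3.connect(resolve_sqlite_path("sqlite:///:memory:"))
row = conn.execute("SELECT 1 AS answer").fetchone()
conn.close()
print(row)  # (1,)
```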

Source code in src\xpyrment\run\ingestion.py
def load_from_sql(query: str, connection_string: str) -> pd.DataFrame:
    r"""Loads experimental telemetry and assignment logs from an external relational SQL database.

    Fetches exposure matrices, pre-period metrics, and covariate vectors using high-performance
    database adapters.

    Args:
        query (str): The SQL retrieval query (e.g., `"SELECT user_id, variant, revenue FROM experimental_ledger"`).
        connection_string (str): The database connection URI.

    Returns:
        pd.DataFrame: A cleaned pandas DataFrame containing the queried records.
    """
    import sqlite3

    if (
        ":memory:" in connection_string
        or "sqlite" in connection_string
        or connection_string == ""
    ):
        db_path = (
            ":memory:"
            if (connection_string == "" or ":memory:" in connection_string)
            else connection_string.replace("sqlite:///", "")
        )
        conn = sqlite3.connect(db_path)
        try:
            # The finally clause closes the connection on both success and failure.
            return pd.read_sql_query(query, conn)
        finally:
            conn.close()
    else:
        raise ValueError(
            "Non-SQLite databases are not supported because sqlalchemy has been removed from the project."
        )

ingest_dataframe

ingest_dataframe(
    df: DataFrame,
    unit_id_col: Optional[str] = None,
    time_col: Optional[str] = None,
    metric_cols: Optional[List[str]] = None,
    categorical_cols: Optional[List[str]] = None,
    schema=None,
) -> DataFrame

Ingests, validates, and copies an in-memory pandas DataFrame into the xpyrment lifecycle.

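Validates the DataFrame against the declared columns (or a user-supplied Pandera schema) and returns a cleaned copy: rows with null unit identifiers are dropped, timestamps are parsed, and missing metric and categorical values are imputed.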

PARAMETER DESCRIPTION
df

The raw source DataFrame.

TYPE: DataFrame

unit_id_col

Column representing unit identifiers (nulls will be dropped).

TYPE: str DEFAULT: None

time_col

Column representing event timestamps (will be parsed to datetime).

TYPE: str DEFAULT: None

metric_cols

Continuous metric columns (nulls will be imputed to 0.0).

TYPE: list DEFAULT: None

categorical_cols

Categorical covariate columns (nulls will be imputed to "UNKNOWN").

TYPE: list DEFAULT: None

schema

A user-provided Pandera schema to validate against. If None, a schema is built dynamically based on the provided columns.

TYPE: DataFrameSchema DEFAULT: None

RETURNS DESCRIPTION
DataFrame

pd.DataFrame: An audited, isolated copy of the DataFrame ready for downstream operations.
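The cleaning rules listed in the parameter descriptions can be illustrated on plain dictionaries. This is a sketch under stated assumptions: it uses lists of dict rows rather than a pandas DataFrame so that it runs with the standard library alone, the `clean_rows` helper and the sample rows are hypothetical, and Pandera schema validation is not modeled.

```python
from datetime import datetime


def clean_rows(rows, unit_id_col=None, time_col=None,
               metric_cols=(), categorical_cols=()):
    """Apply the documented drop/parse/impute rules to a list of dict rows."""
    cleaned = []
    for row in rows:
        row = dict(row)  # copy so the caller's data is never mutated
        if unit_id_col is not None and row.get(unit_id_col) is None:
            continue  # drop rows with a null unit identifier
        if time_col is not None and isinstance(row.get(time_col), str):
            row[time_col] = datetime.fromisoformat(row[time_col])
        for col in metric_cols:
            if row.get(col) is None:
                row[col] = 0.0  # impute missing metrics
        for col in categorical_cols:
            if row.get(col) is None:
                row[col] = "UNKNOWN"  # impute missing categories
        cleaned.append(row)
    return cleaned


raw = [
    {"user_id": 1, "ts": "2024-01-01T00:00:00", "revenue": None, "country": "DE"},
    {"user_id": None, "ts": "2024-01-02T00:00:00", "revenue": 3.5, "country": "US"},
    {"user_id": 2, "ts": "2024-01-03T00:00:00", "revenue": 1.0, "country": None},
]
cleaned = clean_rows(raw, "user_id", "ts", ["revenue"], ["country"])
print(len(cleaned))  # 2
```

Imputing missing metrics to 0.0 changes means and variances downstream, so it is worth confirming that a missing value really means "no activity" for each metric.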

Source code in src\xpyrment\run\ingestion.py
def ingest_dataframe(
    df: pd.DataFrame,
    unit_id_col: Optional[str] = None,
    time_col: Optional[str] = None,
    metric_cols: Optional[List[str]] = None,
    categorical_cols: Optional[List[str]] = None,
    schema=None,
) -> pd.DataFrame:
    """Ingests, validates, and copies an in-memory pandas DataFrame into the xpyrment lifecycle.

    Performs localized validation checks on the pandas DataFrame, ensuring all required column signatures
    are mapped correctly.

    Args:
        df (pd.DataFrame): The raw source DataFrame.
        unit_id_col (str): Column representing unit identifiers (nulls will be dropped).
        time_col (str): Column representing event timestamps (will be parsed to datetime).
        metric_cols (list): Continuous metric columns (nulls will be imputed to 0.0).
        categorical_cols (list): Categorical covariate columns (nulls will be imputed to "UNKNOWN").
        schema (pandera.DataFrameSchema, optional): A user-provided Pandera schema to validate against.
            If None, a schema is built dynamically based on the provided columns.

    Returns:
        pd.DataFrame: An audited, isolated copy of the DataFrame ready for downstream operations.
    """
    return _clean_dataframe_like(
        df=df,
        unit_id_col=unit_id_col,
        time_col=time_col,
        metric_cols=metric_cols,
        categorical_cols=categorical_cols,
        to_datetime=pd.to_datetime,
        frame_label="DataFrame",
        copy_frame=True,
        schema=schema,
    )

ingest_chunks

ingest_chunks(
    chunks: Iterable[DataFrame],
    unit_id_col: str = None,
    time_col: str = None,
    metric_cols: list = None,
    categorical_cols: list = None,
    schema=None,
) -> Iterator[DataFrame]

Ingests and yields an iterable of pandas DataFrames (chunks) for out-of-core processing.

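Applies the same validation and imputation checks as ingest_dataframe to each chunk independently. Because chunks are cleaned and yielded one at a time, memory use is bounded by the chunk size rather than the dataset size, provided the caller does not retain earlier chunks. A typical source of chunks is pd.read_csv(..., chunksize=N).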

PARAMETER DESCRIPTION
chunks

An iterable or generator of raw pandas DataFrames.

TYPE: Iterable[DataFrame]

unit_id_col

Column representing unit identifiers (nulls will be dropped).

TYPE: str DEFAULT: None

time_col

Column representing event timestamps (will be parsed to datetime).

TYPE: str DEFAULT: None

metric_cols

Continuous metric columns (nulls will be imputed to 0.0).

TYPE: list DEFAULT: None

categorical_cols

Categorical covariate columns (nulls will be imputed to "UNKNOWN").

TYPE: list DEFAULT: None

schema

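A user-provided Pandera schema to validate each chunk against. The value is passed unchanged to ingest_dataframe, so if None, a schema is built dynamically based on the provided columns.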

TYPE: DataFrameSchema DEFAULT: None

YIELDS DESCRIPTION
DataFrame

An audited, isolated chunk of the dataset, ready for downstream operations.

Source code in src\xpyrment\run\ingestion.py
def ingest_chunks(
    chunks: Iterable[pd.DataFrame],
    unit_id_col: Optional[str] = None,
    time_col: Optional[str] = None,
    metric_cols: Optional[List[str]] = None,
    categorical_cols: Optional[List[str]] = None,
    schema=None,
) -> Iterator[pd.DataFrame]:
    """Ingests and yields an iterable of pandas DataFrames (chunks) for out-of-core processing.

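    Applies the same validation and imputation checks as `ingest_dataframe` to each chunk independently.
    Because chunks are cleaned and yielded one at a time, memory use is bounded by the chunk size rather
    than the dataset size, provided the caller does not retain earlier chunks. A typical source of chunks
    is `pd.read_csv(..., chunksize=N)`.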

    Args:
        chunks (Iterable[pd.DataFrame]): An iterable or generator of raw pandas DataFrames.
        unit_id_col (str): Column representing unit identifiers (nulls will be dropped).
        time_col (str): Column representing event timestamps (will be parsed to datetime).
        metric_cols (list): Continuous metric columns (nulls will be imputed to 0.0).
        categorical_cols (list): Categorical covariate columns (nulls will be imputed to "UNKNOWN").
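        schema (pandera.DataFrameSchema, optional): A user-provided Pandera schema to validate each chunk against.
            Passed unchanged to `ingest_dataframe`; if None, a schema is built dynamically based on the provided columns.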

    Yields:
        pd.DataFrame: An audited, isolated chunk of the dataset ready for downstream operations.
    """
    for chunk in chunks:
        yield ingest_dataframe(
            df=chunk,
            unit_id_col=unit_id_col,
            time_col=time_col,
            metric_cols=metric_cols,
            categorical_cols=categorical_cols,
            schema=schema,
        )
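The generator pattern above can be illustrated without the library itself. The sketch below reads a small in-memory CSV in chunks of two rows and keeps only running totals, so no more than one chunk needs to be alive at a time. `clean_chunks_sketch` is a hypothetical stand-in for `ingest_chunks` that applies only the null-dropping and zero-imputation steps, and the column names are invented for the example.

```python
import io
from typing import Iterable, Iterator

import pandas as pd

# Five data rows: one has no user_id, one has no revenue.
CSV_TEXT = """user_id,revenue
u1,10.0
,3.0
u3,
u4,2.5
u5,4.0
"""


def clean_chunks_sketch(chunks: Iterable[pd.DataFrame]) -> Iterator[pd.DataFrame]:
    """Hypothetical stand-in for ingest_chunks: clean and yield one chunk at a time."""
    for chunk in chunks:
        cleaned = chunk.dropna(subset=["user_id"])  # drop rows without a unit ID
        yield cleaned.assign(revenue=cleaned["revenue"].fillna(0.0))  # impute metric


# With chunksize set, read_csv returns an iterator of DataFrames instead of one frame.
reader = pd.read_csv(io.StringIO(CSV_TEXT), chunksize=2)

total_rows = 0
total_revenue = 0.0
for chunk in clean_chunks_sketch(reader):
    # Only aggregates are kept; each cleaned chunk can be garbage-collected afterwards.
    total_rows += len(chunk)
    total_revenue += float(chunk["revenue"].sum())
```

Since the function is a generator, nothing is read or cleaned until the loop starts iterating. With xpyrment installed, the loop would instead iterate over `ingest_chunks(reader, unit_id_col="user_id", metric_cols=["revenue"])`, following the documented signature.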

ingest_dask_dataframe

ingest_dask_dataframe(
    ddf: Any,
    unit_id_col: Optional[str] = None,
    time_col: Optional[str] = None,
    metric_cols: Optional[List[str]] = None,
    categorical_cols: Optional[List[str]] = None,
    schema=None,
) -> Any

Ingests, validates, and sets up a computation graph for a Dask DataFrame.

Performs localized validation checks on the Dask DataFrame, similar to ingest_dataframe, using lazy Dask operations without triggering computation.

PARAMETER DESCRIPTION
ddf

The raw source Dask DataFrame.

TYPE: DataFrame

unit_id_col

Column representing unit identifiers (nulls will be dropped).

TYPE: str DEFAULT: None

time_col

Column representing event timestamps (will be parsed to datetime).

TYPE: str DEFAULT: None

metric_cols

Continuous metric columns (nulls will be imputed to 0.0).

TYPE: list DEFAULT: None

categorical_cols

Categorical covariate columns (nulls will be imputed to "UNKNOWN").

TYPE: list DEFAULT: None

schema

A user-provided Pandera schema to validate against.

TYPE: DataFrameSchema DEFAULT: None

RETURNS DESCRIPTION
Any

dask.dataframe.DataFrame: A lazy Dask DataFrame with data cleaning operations appended to its graph.

RAISES DESCRIPTION
ImportError

If the 'dask' library is not installed.

TypeError

If the input is not a dask.dataframe.DataFrame.
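The ImportError case follows a common pattern for optional dependencies: attempt the import only when the feature is used, and re-raise with an installation hint if it fails. A minimal, standard-library-only sketch of that pattern is shown here. The helper name `require_optional` is hypothetical and not part of xpyrment, and chaining the original exception with `from exc` is a choice made for this sketch.

```python
import importlib
from types import ModuleType


def require_optional(module_name: str, install_hint: str) -> ModuleType:
    """Import an optional dependency, or raise ImportError with an install hint."""
    try:
        return importlib.import_module(module_name)
    except ImportError as exc:
        # Chain the original error so the real cause stays visible in the traceback.
        raise ImportError(
            f"The '{module_name}' library is required for this feature. "
            f"Please install it via: {install_hint}"
        ) from exc


# A module that exists is returned as usual.
json_module = require_optional("json", "it ships with Python")

# A missing module produces the friendlier, chained error.
try:
    require_optional(
        "surely_not_installed_pkg_xyz", "pip install surely_not_installed_pkg_xyz"
    )
except ImportError as err:
    message = str(err)
    chained = err.__cause__
```

Deferring the import to call time means the rest of the module stays usable when the optional package is absent.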

Source code in src\xpyrment\run\ingestion.py
def ingest_dask_dataframe(
    ddf: Any,
    unit_id_col: Optional[str] = None,
    time_col: Optional[str] = None,
    metric_cols: Optional[List[str]] = None,
    categorical_cols: Optional[List[str]] = None,
    schema=None,
) -> Any:
    """Ingests, validates, and sets up a computation graph for a Dask DataFrame.

    Performs localized validation checks on the Dask DataFrame, similar to `ingest_dataframe`,
    using lazy Dask operations without triggering computation.

    Args:
        ddf (dask.dataframe.DataFrame): The raw source Dask DataFrame.
        unit_id_col (str): Column representing unit identifiers (nulls will be dropped).
        time_col (str): Column representing event timestamps (will be parsed to datetime).
        metric_cols (list): Continuous metric columns (nulls will be imputed to 0.0).
        categorical_cols (list): Categorical covariate columns (nulls will be imputed to "UNKNOWN").
        schema (pandera.DataFrameSchema, optional): A user-provided Pandera schema to validate against.

    Returns:
        dask.dataframe.DataFrame: A lazy Dask DataFrame with data cleaning operations appended to its graph.

    Raises:
        ImportError: If the 'dask' library is not installed.
        TypeError: If the input is not a dask.dataframe.DataFrame.
    """
    try:
        import dask.dataframe as dd
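    except ImportError as exc:
        # Chain the original exception so the underlying import failure stays in the traceback.
        raise ImportError(
            "The 'dask' library is required to use 'ingest_dask_dataframe'. "
            "Please install it via: pip install dask"
        ) from exc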

    if not isinstance(ddf, dd.DataFrame):
        raise TypeError("ingest_dask_dataframe expects a dask.dataframe.DataFrame.")

    return _clean_dataframe_like(
        df=ddf,
        unit_id_col=unit_id_col,
        time_col=time_col,
        metric_cols=metric_cols,
        categorical_cols=categorical_cols,
        to_datetime=dd.to_datetime,
        frame_label="Dask DataFrame",
        copy_frame=False,
        schema=schema,
    )