Monitor
Live telemetry monitoring, cumulative traffic accumulation, and telemetry audit feeds.
This module provides the LiveMonitor class, which aggregates exposure logs into time-series
bins. This enables real-time diagnostic auditing to detect mid-experiment routing anomalies
and telemetry dropouts.
| CLASS | DESCRIPTION |
|---|---|
| WebhookAlertDispatcher | Manages pluggable webhook registrations and broadcasts alerts to external systems. |
| LiveMonitor | Provides active monitoring of experimental groups to build diagnostics dashboards. |
WebhookAlertDispatcher
Manages pluggable webhook registrations and broadcasts alerts to external systems.
| METHOD | DESCRIPTION |
|---|---|
| register_slack | Registers a Slack webhook URL. |
| register_email | Registers an Email webhook URL (JSON POST to an email notification service). |
| register_custom | Registers a custom generic HTTP POST webhook URL. |
| register_callback | Registers a custom callback function for local testing or custom notification logic. |
| dispatch | Dispatches an alert to all registered handlers and callbacks. |
Source code in src\xpyrment\run\monitor.py
register_slack
Registers a Slack webhook URL.
register_email
Registers an Email webhook URL (JSON POST to an email notification service).
register_custom
Registers a custom generic HTTP POST webhook URL.
register_callback
Registers a custom callback function for local testing or custom notification logic.
dispatch
Dispatches an alert to all registered handlers and callbacks.
Guarantees failure isolation: an exception in one handler does not halt execution of others.
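The failure-isolation guarantee can be illustrated with a minimal sketch. The `MiniDispatcher` class below is a hypothetical stand-in written for this example, not the library's implementation; it only assumes that each handler is invoked inside its own `try`/`except` so that one failure cannot block the rest.

```python
from typing import Any, Callable, Dict, List

Alert = Dict[str, Any]


class MiniDispatcher:
    """Hypothetical, simplified dispatcher used only to illustrate failure isolation."""

    def __init__(self) -> None:
        self._callbacks: List[Callable[[Alert], None]] = []

    def register_callback(self, fn: Callable[[Alert], None]) -> None:
        self._callbacks.append(fn)

    def dispatch(self, alert: Alert) -> List[Exception]:
        """Call every handler; collect exceptions instead of propagating them."""
        errors: List[Exception] = []
        for fn in self._callbacks:
            try:
                fn(alert)
            except Exception as exc:  # isolate this handler's failure
                errors.append(exc)
        return errors


def broken_handler(alert: Alert) -> None:
    raise RuntimeError("webhook endpoint unreachable")


received: List[Alert] = []
dispatcher = MiniDispatcher()
dispatcher.register_callback(broken_handler)   # fails on every alert
dispatcher.register_callback(received.append)  # still runs afterwards
errors = dispatcher.dispatch({"message": "SRM detected"})
```

Returning the collected exceptions is a design choice of this sketch; whether the real `dispatch` logs, returns, or silently drops handler errors is not stated here.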
LiveMonitor
LiveMonitor(
df: DataFrame,
time_col: str,
dispatcher: Optional[WebhookAlertDispatcher] = None,
)
Provides active monitoring of experimental groups to build diagnostics dashboards.
Accumulates exposure logs chronologically to generate time-series metrics. By evaluating traffic trends in real time, experimenters can verify that the randomization splits remain stable and that no asymmetric telemetry dropouts or scheduling anomalies occur.
Temporal Binning and Accumulation Theory
Let there be \(k\) variants. Let the experimental logs be grouped into sequential, non-overlapping temporal intervals (bins) \(t \in \{1, 2, \dots, H\}\) (such as hours or days).

- Let \(n_v(t)\) be the number of unique units newly exposed to variant \(v\) during time bin \(t\).
- The cumulative traffic \(C_v(t)\) for variant \(v\) up to time bin \(t\) is calculated as:

$$ C_v(t) = \sum_{\tau=1}^{t} n_v(\tau) $$

The ratio of cumulative traffic across variants should remain statistically stable and proportional to the designed allocation ratios. A sudden shift or step-function deviation in:

$$ R(t) = \frac{C_{\text{treatment}}(t)}{C_{\text{control}}(t)} $$

indicates a critical operational failure (e.g., treatment servers crashing, CDN configuration issues, or regional tracking bugs).
Pseudocode for Binning and Accumulation
function get_cumulative_traffic(df, time_col, variant_col, bin_frequency):
1. Truncate timestamps in time_col to the specified bin_frequency (e.g., 'h' for Hour, 'D' for Day).
2. Group by binned time and variant_col, calculating count of unique units.
3. Pivot the grouped DataFrame to have binned time as index and variant names as columns.
4. Fill missing values with 0.
5. Compute cumulative sums along the rows (axis=0) for each column.
6. Return the resulting cumulative DataFrame.
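The steps above can be sketched in pandas as follows. This is an illustrative sketch rather than the module's source: the function name `cumulative_traffic_sketch`, the `unit_col` parameter, and the `user_id` column are assumptions introduced for the example, since the pseudocode does not name the unit identifier column.

```python
import pandas as pd


def cumulative_traffic_sketch(
    df: pd.DataFrame,
    time_col: str,
    variant_col: str = "variant",
    unit_col: str = "user_id",  # hypothetical name for the unit identifier column
    freq: str = "D",
) -> pd.DataFrame:
    # 1. Truncate each timestamp to the start of its bin.
    bins = pd.to_datetime(df[time_col]).dt.floor(freq)
    # 2. Count unique units per (bin, variant). A unit seen in two bins is
    #    counted once in each of them.
    counts = (
        df.assign(_bin=bins)
        .groupby(["_bin", variant_col])[unit_col]
        .nunique()
    )
    # 3-4. Pivot variants into columns; (bin, variant) pairs with no exposure become 0.
    binned = counts.unstack(variant_col, fill_value=0)
    # 5-6. Accumulate down the rows (axis=0) and return.
    return binned.cumsum(axis=0)


logs = pd.DataFrame(
    {
        "ts": [
            "2024-01-01 09:00", "2024-01-01 10:00", "2024-01-01 11:00",
            "2024-01-02 08:00", "2024-01-02 09:00",
        ],
        "variant": ["control", "treatment", "control", "treatment", "treatment"],
        "user_id": [1, 2, 3, 4, 5],
    }
)
cumulative = cumulative_traffic_sketch(logs, "ts")
# R(t) from the theory section: cumulative treatment over cumulative control.
ratio = cumulative["treatment"] / cumulative["control"]
```

Bins in which no variant received traffic do not appear in the output of this sketch; reindexing onto a complete date range would be needed if gaps should show up as flat segments.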
| ATTRIBUTE | DESCRIPTION |
|---|---|
| df | Raw log DataFrame containing exposure details. |
| time_col | Name of the column containing assignment timestamps. |
| dispatcher | Alert dispatcher. |
| shutoff_triggered | Status flag indicating whether an active SRM or critical alert has suspended assignment. |
| PARAMETER | DESCRIPTION |
|---|---|
| df | The experimental dataset. TYPE: DataFrame |
| time_col | The column representing assignment timestamps. TYPE: str |
| dispatcher | Alert dispatcher. Defaults to None. TYPE: Optional[WebhookAlertDispatcher] |
| METHOD | DESCRIPTION |
|---|---|
| get_cumulative_traffic | Calculates cumulative traffic counts over time for each variant. |
| get_binned_traffic | Calculates binned (non-cumulative) traffic counts over time for each variant. |
| check_traffic_anomaly | Performs a Z-score anomaly test on non-cumulative temporal traffic volumes. |
| check_live_srm | Calculates cumulative chi-square p-value and SRM flagging on the latest cumulative traffic. |
| check_sequential_srm | Runs Wald's SPRT on the assignment series. |
| run_telemetry_checks | Runs traffic anomaly, cumulative SRM, and sequential SRM checks, dispatching alerts if needed. |
get_cumulative_traffic
Calculates cumulative traffic counts over time for each variant.
Processes and groups timestamps, returning a cumulative summation matrix suitable for charting and structural allocation audits.
| PARAMETER | DESCRIPTION |
|---|---|
| variant_col | Column representing treatment assignment groups. Defaults to "variant". |
| freq | Binning frequency (e.g., "h" for Hour, "D" for Day). Defaults to "D". |

| RETURNS | DESCRIPTION |
|---|---|
| DataFrame | A pandas DataFrame indexed by time bins, with columns representing variants and cells containing cumulative exposure counts. |
get_binned_traffic
Calculates binned (non-cumulative) traffic counts over time for each variant.
Processes and groups timestamps, returning a non-cumulative summation matrix indexed by time bins with variants as columns.
| PARAMETER | DESCRIPTION |
|---|---|
| variant_col | Column representing treatment assignment groups. Defaults to "variant". |
| freq | Binning frequency (e.g., "h" for Hour, "D" for Day). Defaults to "D". |

| RETURNS | DESCRIPTION |
|---|---|
| DataFrame | A pandas DataFrame indexed by time bins, with columns representing variants and cells containing binned exposure counts. |
check_traffic_anomaly
check_traffic_anomaly(
variant_col: str = "variant",
freq: str = "D",
window: int = 7,
z_threshold: float = 3.0,
drop_threshold: float = 0.5,
) -> Dict[str, Any]
Performs a Z-score anomaly test on non-cumulative temporal traffic volumes.
Calculates the unique units per time bin, and checks if the latest bin represents a significant traffic drop compared to the simple moving average and standard deviation of the preceding window.
Z-Score Traffic Drop Check Theory
Let \(x_t\) be the total unique units exposed across all variants in time bin \(t\). For a history window of size \(W\), the baseline mean \(\mu_T\) and sample standard deviation \(\sigma_T\) of the preceding window are computed as:

$$ \mu_T = \frac{1}{K} \sum_{\tau=1}^{K} x_{T-\tau} $$

$$ \sigma_T = \sqrt{\frac{1}{K-1} \sum_{\tau=1}^{K} (x_{T-\tau} - \mu_T)^2} $$

where \(K = \min(T-1, W)\). The Z-score for the latest bin \(x_T\) is defined as:

$$ Z = \frac{x_T - \mu_T}{\sigma_T} $$

If \(\sigma_T > 0\) and \(Z < -z_{\text{threshold}}\), a sudden traffic drop anomaly is flagged. If \(\sigma_T = 0\), a fallback check flags an anomaly if:

$$ x_T < \mu_T \cdot (1 - \text{drop_threshold}) $$
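The check above can be worked through on a plain list of per-bin totals. The sketch below is an assumption-laden illustration, not the method's source: `traffic_drop_check` is a hypothetical helper, the result keys mirror the documented return dict, and treating \(\sigma_T\) as 0 when only one historical bin exists (where the \(K-1\) denominator is undefined) is a choice made here.

```python
import statistics
from typing import Any, Dict, Optional, Sequence


def traffic_drop_check(
    volumes: Sequence[float],  # x_1 .. x_T, oldest first
    window: int = 7,
    z_threshold: float = 3.0,
    drop_threshold: float = 0.5,
) -> Dict[str, Any]:
    if len(volumes) < 2:
        raise ValueError("need at least one historical bin plus the latest bin")
    latest = volumes[-1]
    # The K = min(T - 1, W) bins immediately preceding the latest one.
    history = list(volumes[-(window + 1):-1])
    mean = statistics.fmean(history)
    # Sample standard deviation (K - 1 denominator); set to 0 when K == 1 (assumption).
    std = statistics.stdev(history) if len(history) > 1 else 0.0
    z: Optional[float] = None
    if std > 0:
        z = (latest - mean) / std
        anomaly = z < -z_threshold
    else:
        # Zero-variance fallback: compare against a fractional drop from the mean.
        anomaly = latest < mean * (1 - drop_threshold)
    return {
        "anomaly_detected": anomaly,
        "latest_volume": float(latest),
        "historical_mean": mean,
        "historical_std": std,
        "z_score": z,
    }


# Seven stable days followed by a collapse to 40 units.
drop = traffic_drop_check([100, 102, 98, 101, 99, 100, 100, 40])
# Perfectly flat history: the fallback rule applies (threshold is 50 * 0.5 = 25).
flat_drop = traffic_drop_check([50, 50, 50, 20])
flat_ok = traffic_drop_check([50, 50, 50, 30])
```

In the first call the seven historical bins average 100 with a sample standard deviation of about 1.29, so the latest bin sits far below the \(-3\) threshold and is flagged.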
| PARAMETER | DESCRIPTION |
|---|---|
| variant_col | Column representing treatment groups. TYPE: str |
| freq | Binning frequency. TYPE: str |
| window | Moving window size for baseline computation. TYPE: int |
| z_threshold | Z-score threshold for drop detection (must be positive). TYPE: float |
| drop_threshold | Fractional drop threshold used as a fallback when the historical standard deviation is 0. TYPE: float |

| RETURNS | DESCRIPTION |
|---|---|
| Dict[str, Any] | Dict containing the keys anomaly_detected (bool), latest_volume (float), historical_mean (float), historical_std (float), z_score (float), and message (str). |
check_live_srm
Calculates cumulative chi-square p-value and SRM flagging on the latest cumulative traffic.
| PARAMETER | DESCRIPTION |
|---|---|
| expected_ratios | Target allocation proportions/ratios. |
| variant_col | Column representing treatment assignment groups. |

| RETURNS | DESCRIPTION |
|---|---|
| Dict[str, Any] | Dict containing the keys srm_detected (bool), p_value (float), observed_counts (List[int]), expected_ratios (List[float]), and message (str). |
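A chi-square goodness-of-fit test on the latest cumulative counts can be sketched with the standard library alone. Everything below is illustrative: `live_srm_check` and `chi2_sf` are hypothetical helpers, and the `alpha=0.001` flagging level is an assumption, since the threshold used by `check_live_srm` is not documented here.

```python
import math
from typing import Any, Dict, Sequence


def chi2_sf(x: float, dof: int) -> float:
    """Upper-tail probability of a chi-square variable with integer dof >= 1."""
    half = x / 2.0
    if dof % 2 == 1:
        q, a = math.erfc(math.sqrt(half)), 0.5  # closed form for dof = 1
    else:
        q, a = math.exp(-half), 1.0             # closed form for dof = 2
    # Recurrence Q(a + 1, h) = Q(a, h) + h^a e^{-h} / Gamma(a + 1); each step adds 2 dof.
    while a < dof / 2.0:
        q += half ** a * math.exp(-half) / math.gamma(a + 1.0)
        a += 1.0
    return q


def live_srm_check(
    observed_counts: Sequence[int],
    expected_ratios: Sequence[float],
    alpha: float = 0.001,  # assumed flagging level, not taken from the library
) -> Dict[str, Any]:
    if len(observed_counts) != len(expected_ratios) or len(observed_counts) < 2:
        raise ValueError("need matching counts and ratios for at least two variants")
    total = sum(observed_counts)
    if total <= 0:
        raise ValueError("no traffic observed")
    weight = sum(expected_ratios)  # accept proportions or raw ratios such as [1, 1]
    expected = [total * r / weight for r in expected_ratios]
    chi2 = sum((o - e) ** 2 / e for o, e in zip(observed_counts, expected))
    p_value = chi2_sf(chi2, len(observed_counts) - 1)
    return {
        "srm_detected": p_value < alpha,
        "p_value": p_value,
        "observed_counts": list(observed_counts),
        "expected_ratios": [r / weight for r in expected_ratios],
    }


balanced = live_srm_check([5000, 5000], [0.5, 0.5])
skewed = live_srm_check([5200, 4800], [0.5, 0.5])
```

For the skewed split the statistic is \((200^2 / 5000) \cdot 2 = 16\) with one degree of freedom, which is well below the assumed 0.001 level and is therefore flagged.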
check_sequential_srm
check_sequential_srm(
variant_col: str = "variant",
target_treatment_ratio: float = 0.5,
delta: float = 0.02,
alpha: float = 0.01,
) -> Dict[str, Any]
Runs Wald's SPRT on the assignment series.
Sequential Sample Ratio Mismatch SPRT Theory
Wald's Sequential Probability Ratio Test (SPRT) is applied to the assignment sequence \(y_1, y_2, \dots, y_N\) where \(y_i \in \{0, 1\}\) (0 = control, 1 = treatment). The likelihood ratio \(LR_N\) at step \(N\) is calculated under a mixture of alternative hypotheses:

$$ LR_N = 0.5 \cdot \prod_{i=1}^N \frac{f(y_i \mid p_0 + \delta)}{f(y_i \mid p_0)} + 0.5 \cdot \prod_{i=1}^N \frac{f(y_i \mid p_0 - \delta)}{f(y_i \mid p_0)} $$

where \(p_0\) is the target treatment allocation ratio. If \(LR_N \ge \frac{1}{\alpha}\), the null hypothesis of balanced allocation is rejected, indicating a sample ratio mismatch.
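The mixture likelihood ratio can be accumulated in log space to avoid multiplying many small factors. The following is a sketch under stated assumptions rather than the method's implementation: `sequential_srm_sketch` is a hypothetical name, stopping at the first boundary crossing is a choice made here, and `stopped_index` is `None` when the boundary is never reached.

```python
import math
from typing import Any, Dict, List, Optional, Sequence


def sequential_srm_sketch(
    assignments: Sequence[int],  # 0 = control, 1 = treatment, in arrival order
    p0: float = 0.5,
    delta: float = 0.02,
    alpha: float = 0.01,
) -> Dict[str, Any]:
    if not (0.0 < p0 - delta and p0 + delta < 1.0):
        raise ValueError("p0 - delta and p0 + delta must lie strictly inside (0, 1)")
    # Per-observation log-likelihood-ratio increments, indexed by y (0 or 1).
    up = (math.log((1 - p0 - delta) / (1 - p0)), math.log((p0 + delta) / p0))
    down = (math.log((1 - p0 + delta) / (1 - p0)), math.log((p0 - delta) / p0))
    log_up = log_down = 0.0
    ratios: List[float] = []
    stopped: Optional[int] = None
    for i, y in enumerate(assignments):
        log_up += up[y]
        log_down += down[y]
        # Equal-weight mixture of the two alternatives p0 + delta and p0 - delta.
        lr = 0.5 * math.exp(log_up) + 0.5 * math.exp(log_down)
        ratios.append(lr)
        if lr >= 1.0 / alpha:
            stopped = i  # zero-based index of the first crossing
            break
    return {
        "srm_detected": stopped is not None,
        "running_likelihood_ratios": ratios,
        "stopped_index": stopped,
    }


# Every unit lands in treatment: the boundary 1 / alpha = 100 is crossed quickly.
one_sided = sequential_srm_sketch([1] * 200, p0=0.5, delta=0.1, alpha=0.01)
# Perfectly alternating assignments never accumulate evidence of a mismatch.
alternating = sequential_srm_sketch([0, 1] * 2000, p0=0.5, delta=0.1, alpha=0.01)
```

With \(p_0 = 0.5\) and \(\delta = 0.1\), an all-treatment stream gives \(LR_N = 0.5 \cdot 1.2^N + 0.5 \cdot 0.8^N\), which first reaches 100 at \(N = 30\).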
| PARAMETER | DESCRIPTION |
|---|---|
| variant_col | Column representing treatment assignment groups. TYPE: str |
| target_treatment_ratio | Target allocation ratio \(p_0\) for the treatment group. TYPE: float |
| delta | Allocation shift \(\delta\) that defines the two alternative hypotheses \(p_0 \pm \delta\). TYPE: float |
| alpha | Type I error level \(\alpha\); a mismatch is flagged when the likelihood ratio reaches \(1/\alpha\). TYPE: float |

| RETURNS | DESCRIPTION |
|---|---|
| Dict[str, Any] | Dict containing the keys srm_detected (bool), running_likelihood_ratios (np.ndarray), stopped_index (int), and message (str). |
run_telemetry_checks
run_telemetry_checks(
expected_ratios: List[float],
variant_col: str = "variant",
freq: str = "D",
window: int = 7,
z_threshold: float = 3.0,
) -> Dict[str, Any]
Runs traffic anomaly, cumulative SRM, and sequential SRM checks, dispatching alerts if needed.
| PARAMETER | DESCRIPTION |
|---|---|
| expected_ratios | Target allocation proportions/ratios. TYPE: List[float] |
| variant_col | Column representing treatment assignment groups. TYPE: str |
| freq | Binning frequency. TYPE: str |
| window | Moving window size for traffic anomaly check. TYPE: int |
| z_threshold | Z-score threshold for drop detection. TYPE: float |

| RETURNS | DESCRIPTION |
|---|---|
| Dict[str, Any] | Combined results of all checks. |