pdstools.decision_analyzer.utils¶
Attributes¶
- SCOPE_HIERARCHY
- PRIO_FACTORS
- PRIO_COMPONENTS
- logger
- _INTERACTION_ID_RAW_KEY
Classes¶
- ColumnResolver: Resolves column mappings between raw data and a standardized schema.
Functions¶
- apply_filter(): Apply a global set of filters.
- get_first_level_stats(): Returns first-level stats of a dataframe for the filter summary.
- resolve_aliases(): Rename alias columns to their canonical raw key names before validation.
- determine_extract_type(): Detect whether the data is a Decision Analyzer (v2) or Explainability Extract (v1).
- rename_and_cast_types(): Rename columns and cast data types based on table definition.
- _cast_columns(): Cast columns to their target types.
- create_hierarchical_selectors(): Create hierarchical filter options and calculate indices for selectbox widgets.
- get_scope_config(): Generate scope configuration for lever application and plotting based on user selections.
- _get_interaction_id_candidates(): Build the set of possible interaction ID column names from the schema.
- _find_interaction_id_column(): Return the first matching interaction ID column name from the data.
- _determine_output_directory(): Determine the best output directory for cached/sampled files.
- sample_interactions(): Sample interactions from a LazyFrame before ingestion.
- prepare_and_save(): Prepare data for analysis by sampling or caching, and persist as parquet.
- parse_sample_flag(): Parse the --sample CLI flag value into keyword arguments.
- format_count_for_filename(): Format an interaction count for use in filenames.
- should_cache_source(): Return True if source should be cached as parquet.
- Read pdstools metadata from a parquet file if it exists.
Module Contents¶
- class ColumnResolver¶
Resolves column mappings between raw data and a standardized schema.
Raw decision data can come from multiple sources with different schemas:
- Explainability Extract vs Decision Analyzer exports
- Inbound vs Outbound channel data
For example, channel information may appear as:
- ‘Channel’ (already using the display name)
- ‘pyChannel’ (an alias for the display name)
- ‘Primary_ContainerPayload_Channel’ (raw name needing rename)
- Both raw key and display_name present (conflict requiring resolution)
This class normalizes these variations by:
- Mapping raw column names to standardized display names
- Resolving conflicts when both raw and display_name columns exist
- Building the final schema with consistent column names
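The resolution rules above can be sketched as follows. This is a simplified, hypothetical illustration: the schema dict and helper name are made up for the example, and the real definitions live in column_schema.py.

```python
# Illustrative schema entry only; the real definitions live in column_schema.py.
SCHEMA = {
    "Primary_ContainerPayload_Channel": {
        "display_name": "Channel",
        "aliases": ["pyChannel"],
    },
}


def resolve_columns(columns: set, schema: dict) -> dict:
    """Return an {old_name: display_name} rename map for the given columns."""
    renames = {}
    for raw, spec in schema.items():
        display = spec["display_name"]
        if display in columns:
            # Display name already present; a raw duplicate would be a
            # conflict that the real resolver decides how to drop.
            continue
        if raw in columns:
            renames[raw] = display
            continue
        for alias in spec.get("aliases", []):
            if alias in columns:
                renames[alias] = display
                break
    return renames


print(resolve_columns({"pyChannel", "Propensity"}, SCHEMA))
# {'pyChannel': 'Channel'}
```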
- __post_init__()¶
- resolve() ColumnResolver¶
Resolve all column mappings and conflicts.
- Returns:
Self, for method chaining
- Return type:
ColumnResolver
- SCOPE_HIERARCHY = ['Issue', 'Group', 'Action']¶
- PRIO_FACTORS = ['Propensity', 'Value', 'Context Weight', 'Levers']¶
- PRIO_COMPONENTS = ['Propensity', 'Value', 'Context Weight', 'Levers', 'Priority']¶
- apply_filter(df: polars.LazyFrame, filters: polars.Expr | list[polars.Expr] | None = None)¶
Apply a global set of filters. Kept outside the DecisionData class because it is a general utility function, not bound to that class.
- Parameters:
df (polars.LazyFrame)
filters (polars.Expr | list[polars.Expr] | None)
- get_first_level_stats(interaction_data: polars.LazyFrame, filters: list[polars.Expr] | None = None)¶
Returns first-level stats of a dataframe for the filter summary.
Shows unique actions (Issue/Group/Action combinations), unique interactions (decisions), and total rows so users understand the impact of their filters.
- Parameters:
interaction_data (polars.LazyFrame)
filters (list[polars.Expr] | None)
- resolve_aliases(df: polars.LazyFrame, *table_definitions: dict) polars.LazyFrame¶
Rename alias columns to their canonical raw key names before validation.
Scans all table definitions for aliases entries. If an alias is found in the data but neither the raw key nor the display_name is present, the column is renamed to the raw key so downstream processing can find it.
- Parameters:
df (pl.LazyFrame) – Raw data that may use alternative column names.
*table_definitions (dict) – One or more table definition dicts (DecisionAnalyzer, ExplainabilityExtract).
- Returns:
Data with alias columns renamed to canonical raw key names.
- Return type:
pl.LazyFrame
- determine_extract_type(raw_data)¶
Detect whether the data is a Decision Analyzer (v2) or Explainability Extract (v1).
V2 data must have both a strategy name column and stage pipeline columns (Stage_pyStageGroup / Stage Group). Data that has strategy names but no stage information (e.g. pre-aggregated or anonymized exports) is treated as v1 so the synthetic-stage fallback is used.
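The detection rule reduces to column-presence checks; a sketch where "pyStrategyName" stands in for the real strategy-name column (an assumption for illustration):

```python
def determine_extract_type(columns: set) -> str:
    # Sketch of the documented rule: v2 needs both a strategy-name column
    # and stage pipeline columns; anything else falls back to v1.
    has_stage = "Stage_pyStageGroup" in columns or "Stage Group" in columns
    has_strategy = "pyStrategyName" in columns  # hypothetical column name
    return "decision_analyzer" if has_strategy and has_stage else "explainability_extract"


print(determine_extract_type({"pyStrategyName", "Stage_pyStageGroup"}))
# decision_analyzer
print(determine_extract_type({"pyStrategyName"}))  # strategy names but no stages
# explainability_extract
```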
- rename_and_cast_types(df: polars.LazyFrame, table_definition: dict) polars.LazyFrame¶
Rename columns and cast data types based on table definition.
Performs a single-pass rename from raw column keys to display names, then casts types for default columns.
- Parameters:
df (pl.LazyFrame) – The input dataframe to process
table_definition (dict) – Dictionary containing column definitions with ‘display_name’, ‘default’, and ‘type’ keys
- Returns:
Processed dataframe with renamed columns and cast types
- Return type:
pl.LazyFrame
- _cast_columns(df: polars.LazyFrame, type_mapping: dict[str, type[polars.DataType]]) polars.LazyFrame¶
Cast columns to their target types.
- create_hierarchical_selectors(data: polars.LazyFrame, selected_issue: str | None = None, selected_group: str | None = None, selected_action: str | None = None) dict[str, dict[str, list[str] | int]]¶
Create hierarchical filter options and calculate indices for selectbox widgets.
- Args:
data: LazyFrame with hierarchical data (should be pre-filtered to the desired stage)
selected_issue: Currently selected issue (optional)
selected_group: Currently selected group (optional)
selected_action: Currently selected action (optional)
- Returns:
dict with structure:
{
"issues": {"options": [...], "index": 0},
"groups": {"options": ["All", ...], "index": 0},
"actions": {"options": ["All", ...], "index": 0}
}
- get_scope_config(selected_issue: str, selected_group: str, selected_action: str) dict[str, str | polars.Expr | list[str]]¶
Generate scope configuration for lever application and plotting based on user selections.
- Parameters:
selected_issue (str)
selected_group (str)
selected_action (str)
- Returns:
Configuration dictionary containing:
- level: “Action”, “Group”, or “Issue” indicating scope level
- lever_condition: Polars expression for filtering selected actions
- group_cols: List of column names for grouping operations
- x_col: Column name to use for x-axis in plots
- selected_value: The actual selected value for highlighting
- plot_title_prefix: Prefix for plot titles
- Return type:
dict[str, str | polars.Expr | list[str]]
- logger¶
- _INTERACTION_ID_RAW_KEY = 'pxInteractionID'¶
- _get_interaction_id_candidates() list[str]¶
Build the set of possible interaction ID column names from the schema.
Collects the raw key, display name, and aliases from both table definitions so this stays in sync with column_schema.py.
- _find_interaction_id_column(columns: set[str]) str¶
Return the first matching interaction ID column name from the data.
- _determine_output_directory(source_path: str | None, output_dir: str | None) pathlib.Path¶
Determine the best output directory for cached/sampled files.
Priority order:
1. If output_dir is explicitly provided, use that
2. Otherwise, if source_path is a file, use its parent directory (if writeable)
3. Otherwise, if source_path is a directory, use its parent directory (if writeable)
4. Otherwise, fall back to the current directory
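The priority order above translates almost directly into pathlib; a minimal sketch (the real helper may log or validate more):

```python
import os
from pathlib import Path


def determine_output_directory(source_path, output_dir) -> Path:
    # 1. Explicit output_dir wins.
    if output_dir is not None:
        return Path(output_dir)
    # 2./3. Parent of the source file or directory, if writeable.
    if source_path is not None:
        p = Path(source_path)
        if (p.is_file() or p.is_dir()) and os.access(p.parent, os.W_OK):
            return p.parent
    # 4. Fall back to the current directory.
    return Path(".")


print(determine_output_directory(None, "out"))
```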
- sample_interactions(df: polars.LazyFrame, n: int | None = None, fraction: float | None = None, id_column: str | None = None, use_random: bool = False, total_interactions: int | None = None) polars.LazyFrame¶
Sample interactions from a LazyFrame before ingestion.
By default, uses deterministic hash-based filtering so the same data and limit always produce the same sample. When sampling already-sampled data, uses random sampling to avoid bias from repeated deterministic sampling.
All rows belonging to a selected interaction are kept (stratified on interaction ID).
Exactly one of n or fraction must be provided.
- Parameters:
df (pl.LazyFrame) – Raw data to sample from.
n (int, optional) – Maximum number of unique interactions to keep.
fraction (float, optional) – Fraction of interactions to keep (0.0–1.0).
id_column (str, optional) – Name of the interaction ID column. Auto-detected if not given.
use_random (bool, default False) – If True, use random sampling instead of deterministic hash-based sampling. This should be set when sampling already-sampled data to avoid bias.
total_interactions (int, optional) – Pre-computed total number of unique interactions. If provided, avoids an expensive full-data scan to count them.
- Returns:
Filtered LazyFrame containing only the sampled interactions.
- Return type:
pl.LazyFrame
Examples
Sample ~50 000 interactions from a large export and save to parquet:
>>> from pdstools.pega_io.File import read_ds_export
>>> from pdstools.decision_analyzer.utils import sample_interactions
>>> df = read_ds_export("big_export.zip")
>>> sampled = sample_interactions(df, n=50_000)
>>> sampled.collect().write_parquet("my_sample.parquet")
Sample 10% of interactions (lazy; no full scan needed):
>>> sampled = sample_interactions(df, fraction=0.10)
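The deterministic hash-based selection can be illustrated without polars. This is a sketch of the idea (stable bucketing on the interaction ID), not pdstools' exact hash function:

```python
import hashlib


def keep_interaction(interaction_id: str, fraction: float) -> bool:
    # Map the ID to a stable number in [0, 1); the same ID always lands
    # in the same bucket, so repeated runs select the same interactions.
    digest = hashlib.sha256(interaction_id.encode()).digest()
    bucket = int.from_bytes(digest[:8], "big") / 2**64
    return bucket < fraction


rows = [
    {"pxInteractionID": "i1", "Action": "GoldCard"},
    {"pxInteractionID": "i1", "Action": "CarLoan"},
    {"pxInteractionID": "i2", "Action": "PaymentPlan"},
]
# Stratified on interaction ID: both "i1" rows are kept or dropped together.
sample = [r for r in rows if keep_interaction(r["pxInteractionID"], 0.5)]
```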
See also
prepare_and_save: Sample and persist as parquet with source metadata.
- prepare_and_save(df: polars.LazyFrame, n: int | None = None, fraction: float | None = None, output_dir: str | None = None, source_path: str | None = None) tuple[polars.LazyFrame, pathlib.Path | None]¶
Prepare data for analysis by sampling or caching, and persist as parquet.
Sampling mode (when n or fraction is provided): writes decision_analyzer_sample_<count>.parquet into output_dir (defaults to the current working directory). Returns a LazyFrame scanning the written file plus the file path.
Caching mode (when neither n nor fraction is provided): writes decision_analyzer_cache_<count>.parquet into output_dir with 100% sample metadata. Useful for caching non-parquet sources (CSV, JSON, ZIP) for faster reloading.
The parquet file includes metadata tracking:
- Original source file path
- Sample percentage relative to original data (100% for caching mode)
- Whether the percentage was calculated exactly or approximated
If sampling is requested but the data is smaller than the requested sample, sampling is skipped and the original LazyFrame is returned unchanged (no file is written).
- Parameters:
df (pl.LazyFrame) – Raw data to process.
n (int, optional) – Maximum number of unique interactions to keep (sampling mode).
fraction (float, optional) – Fraction of interactions to keep 0.0–1.0 (sampling mode).
output_dir (str, optional) – Directory for the output parquet file. If not provided, defaults to the source file’s directory (when the source is a file and the directory is writeable), otherwise the current directory ".".
source_path (str, optional) – Path to the original source file for metadata tracking and determining the output directory.
- Returns:
The (possibly sampled/cached) LazyFrame and the path to the written parquet file, or None when no file was written.
- Return type:
tuple[pl.LazyFrame, Path | None]
Examples
Sample ~50 000 interactions from a large zip and save as parquet (equivalent to pdstools da --data-path big_export.zip --sample 50000, but usable without the Streamlit app):
>>> from pdstools.pega_io.File import read_ds_export
>>> from pdstools.decision_analyzer.utils import prepare_and_save
>>> df = read_ds_export("big_export.zip")
>>> sampled, path = prepare_and_save(
...     df, n=50_000, source_path="big_export.zip"
... )
>>> print(path)
decision_analyzer_sample_50k.parquet
Sample from a parquet file:
>>> df = pl.scan_parquet("large_data.parquet")
>>> sampled, path = prepare_and_save(
...     df,
...     n=100000,
...     source_path="large_data.parquet"
... )
Cache non-parquet data (no sampling, just convert to parquet):
>>> df = read_ds_export("export.csv")
>>> cached, path = prepare_and_save(df, source_path="export.csv")
Read metadata from a prepared file:
>>> metadata = pl.read_parquet_metadata("decision_analyzer_sample_50k.parquet")
>>> print(metadata["pdstools:source_file"])
big_export.zip
>>> print(metadata["pdstools:sample_percentage"])
5.0
See also
sample_interactions: Lower-level sampling without file persistence.
- parse_sample_flag(value: str) dict[str, int | float]¶
Parse the --sample CLI flag value into keyword arguments.
Delegates to pdstools.utils.streamlit_utils.parse_sample_spec().
- format_count_for_filename(count: int) str¶
Format an interaction count for use in filenames.
Uses human-readable abbreviations with 2 significant figures.
- Parameters:
count (int) – Number of interactions.
- Returns:
Formatted count (e.g., “87k”, “1.2M”, “2.5B”).
- Return type:
str
Examples
>>> format_count_for_filename(42)
'42'
>>> format_count_for_filename(1500)
'1.5k'
>>> format_count_for_filename(87432)
'87k'
>>> format_count_for_filename(1234567)
'1.2M'
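A sketch that reproduces the documented examples (the real helper's rounding at boundary values may differ):

```python
def format_count(count: int) -> str:
    # Two significant figures: one decimal below 10, none otherwise.
    for threshold, suffix in ((1_000_000_000, "B"), (1_000_000, "M"), (1_000, "k")):
        if count >= threshold:
            value = count / threshold
            if value < 10:
                return f"{value:.1f}".rstrip("0").rstrip(".") + suffix
            return f"{value:.0f}" + suffix
    return str(count)


print(format_count(87432), format_count(1234567))
# 87k 1.2M
```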
- should_cache_source(source_path: str | None) bool¶
Return True if source should be cached as parquet.
Caching is beneficial for non-parquet sources (CSV, JSON, ZIP, directories) but unnecessary for single parquet files which are already optimized.
- Parameters:
source_path (str | None) – Path to the source file or directory.
- Returns:
True if source should be cached, False otherwise.
- Return type:
bool
Examples
>>> should_cache_source("/data/export.csv")
True
>>> should_cache_source("/data/export.parquet")
False
>>> should_cache_source(None)
False
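The rule reduces to a suffix check; a minimal sketch matching the examples above (the real helper may inspect directories more carefully):

```python
from pathlib import Path


def should_cache_source(source_path) -> bool:
    # Single parquet files are already fast to scan; everything else
    # (CSV, JSON, ZIP, directories) benefits from a parquet cache.
    if source_path is None:
        return False
    return Path(source_path).suffix.lower() != ".parquet"


print(should_cache_source("/data/export.csv"), should_cache_source("/data/export.parquet"))
# True False
```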