pdstools.decision_analyzer.utils

Attributes

Classes

ColumnResolver

Resolves column mappings between raw data and a standardized schema.

Functions

apply_filter(df[, filters])

Apply a global set of filters.

area_under_curve(df, col_x, col_y)

gini_coefficient(df, col_x, col_y)

get_first_level_stats(interaction_data[, filters])

Returns first-level stats of a dataframe for the filter summary.

resolve_aliases(→ polars.LazyFrame)

Rename alias columns to their canonical raw key names before validation.

determine_extract_type(raw_data)

Detect whether the data is a Decision Analyzer (v2) or Explainability Extract (v1).

rename_and_cast_types(→ polars.LazyFrame)

Rename columns and cast data types based on table definition.

_cast_columns(→ polars.LazyFrame)

Cast columns to their target types.

get_table_definition(table)

create_hierarchical_selectors(→ dict[str, dict[str, ...)

Create hierarchical filter options and calculate indices for selectbox widgets.

get_scope_config(→ dict[str, ...)

Generate scope configuration for lever application and plotting based on user selections.

_get_interaction_id_candidates(→ list[str])

Build the set of possible interaction ID column names from the schema.

_find_interaction_id_column(→ str)

Return the first matching interaction ID column name from the data.

_determine_output_directory(→ pathlib.Path)

Determine the best output directory for cached/sampled files.

sample_interactions(→ polars.LazyFrame)

Sample interactions from a LazyFrame before ingestion.

prepare_and_save(→ tuple[polars.LazyFrame, ...)

Prepare data for analysis by sampling or caching, and persist as parquet.

parse_sample_flag(→ dict[str, int | float])

Parse the --sample CLI flag value into keyword arguments.

format_count_for_filename(→ str)

Format an interaction count for use in filenames.

should_cache_source(→ bool)

Return True if source should be cached as parquet.

_read_source_metadata(→ dict[str, str | float] | None)

Read pdstools metadata from a parquet file if it exists.

Module Contents

class ColumnResolver

Resolves column mappings between raw data and a standardized schema.

Raw decision data can come from multiple sources with different schemas:

  • Explainability Extract vs Decision Analyzer exports

  • Inbound vs Outbound channel data

For example, channel information may appear as:

  • ‘Channel’ (already using the display name)

  • ‘pyChannel’ (an alias for the display name)

  • ‘Primary_ContainerPayload_Channel’ (raw name needing rename)

  • Both raw key and display_name present (conflict requiring resolution)

This class normalizes these variations by:

  • Mapping raw column names to standardized display names

  • Resolving conflicts when both raw and display_name columns exist

  • Building the final schema with consistent column names

table_definition

Column definitions with ‘display_name’, ‘default’, and ‘type’ keys

Type:

dict

raw_columns

Column names present in the raw data

Type:

set[str]

table_definition: dict
raw_columns: set[str]
rename_mapping: dict[str, str]
type_mapping: dict[str, type[polars.DataType]]
columns_to_drop: list[str] = []
final_columns: list[str] = []
_resolved: bool = False
__post_init__()
resolve() ColumnResolver

Resolve all column mappings and conflicts.

Returns:

Self, for method chaining

Return type:

ColumnResolver

get_missing_columns() list[str]

Get list of required columns missing from the raw data.

Returns:

Column names that are marked as default but not found in raw data

Return type:

list[str]
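The resolution flow can be sketched in plain Python. The mini table definition below is hypothetical, and the conflict rule shown (prefer the column already carrying the display name, drop the raw duplicate) is an assumption for illustration, not necessarily the library's exact behavior:

```python
# Simplified sketch of ColumnResolver's mapping logic.
# Table definition keys and the conflict-resolution choice are assumptions.
table_definition = {
    "Primary_ContainerPayload_Channel": {"display_name": "Channel", "default": True},
    "pyIssue": {"display_name": "Issue", "default": True},
}
raw_columns = {"Primary_ContainerPayload_Channel", "Channel", "pyIssue"}

rename_mapping, columns_to_drop = {}, []
for raw_key, spec in table_definition.items():
    display = spec["display_name"]
    if raw_key in raw_columns and display in raw_columns:
        # Conflict: both raw and display columns exist -> keep the display one.
        columns_to_drop.append(raw_key)
    elif raw_key in raw_columns:
        rename_mapping[raw_key] = display

print(rename_mapping)   # {'pyIssue': 'Issue'}
print(columns_to_drop)  # ['Primary_ContainerPayload_Channel']
```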

SCOPE_HIERARCHY = ['Issue', 'Group', 'Action']
PRIO_FACTORS = ['Propensity', 'Value', 'Context Weight', 'Levers']
PRIO_COMPONENTS = ['Propensity', 'Value', 'Context Weight', 'Levers', 'Priority']
apply_filter(df: polars.LazyFrame, filters: polars.Expr | list[polars.Expr] | None = None)

Apply a global set of filters. Kept outside the DecisionData class because this is a general utility function, not bound to that class.

Parameters:
  • df (polars.LazyFrame)

  • filters (polars.Expr | list[polars.Expr] | None)

area_under_curve(df: polars.DataFrame, col_x: str, col_y: str)
Parameters:
  • df (polars.DataFrame)

  • col_x (str)

  • col_y (str)

gini_coefficient(df: polars.DataFrame, col_x: str, col_y: str)
Parameters:
  • df (polars.DataFrame)

  • col_x (str)

  • col_y (str)
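Neither function is documented here, so the convention below is an assumption: the standard trapezoidal construction over a curve normalized to the unit square, with the Gini coefficient derived as 2 · AUC − 1:

```python
# Assumed convention (not stated in the docstrings above): trapezoidal AUC
# over (x, y) points in the unit square, Gini = 2 * AUC - 1.
def area_under_curve(points):
    auc = 0.0
    for (x0, y0), (x1, y1) in zip(points, points[1:]):
        auc += (x1 - x0) * (y0 + y1) / 2.0  # trapezoid between consecutive points
    return auc

def gini_coefficient(points):
    return 2.0 * area_under_curve(points) - 1.0

curve = [(0.0, 0.0), (0.5, 0.8), (1.0, 1.0)]
print(area_under_curve(curve))  # 0.65
print(gini_coefficient(curve))  # 0.3 (within float tolerance)
```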

get_first_level_stats(interaction_data: polars.LazyFrame, filters: list[polars.Expr] | None = None)

Returns first-level stats of a dataframe for the filter summary.

Shows unique actions (Issue/Group/Action combinations), unique interactions (decisions), and total rows so users understand the impact of their filters.

Parameters:
  • interaction_data (polars.LazyFrame)

  • filters (list[polars.Expr] | None)

resolve_aliases(df: polars.LazyFrame, *table_definitions: dict) polars.LazyFrame

Rename alias columns to their canonical raw key names before validation.

Scans all table definitions for aliases entries. If an alias is found in the data but neither the raw key nor the display_name is present, the column is renamed to the raw key so downstream processing can find it.

Parameters:
  • df (pl.LazyFrame) – Raw data that may use alternative column names.

  • *table_definitions (dict) – One or more table definition dicts (DecisionAnalyzer, ExplainabilityExtract).

Returns:

Data with alias columns renamed to canonical raw key names.

Return type:

pl.LazyFrame
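The alias pass can be sketched in plain Python: an alias is only renamed when neither the raw key nor the display name is already present. The mini table definition is hypothetical:

```python
# Sketch of the alias-resolution rule described above.
def alias_renames(columns, *table_definitions):
    rename = {}
    for table in table_definitions:
        for raw_key, spec in table.items():
            if raw_key in columns or spec["display_name"] in columns:
                continue  # canonical column already present; leave aliases alone
            for alias in spec.get("aliases", ()):
                if alias in columns:
                    rename[alias] = raw_key
                    break
    return rename

definition = {
    "Primary_ContainerPayload_Channel": {
        "display_name": "Channel",
        "aliases": ["pyChannel"],
    }
}
print(alias_renames({"pyChannel", "pyIssue"}, definition))
# {'pyChannel': 'Primary_ContainerPayload_Channel'}
print(alias_renames({"Channel", "pyChannel"}, definition))  # {} — display name wins
```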

determine_extract_type(raw_data)

Detect whether the data is a Decision Analyzer (v2) or Explainability Extract (v1).

V2 data must have both a strategy name column and stage pipeline columns (Stage_pyStageGroup / Stage Group). Data that has strategy names but no stage information (e.g. pre-aggregated or anonymized exports) is treated as v1 so the synthetic-stage fallback is used.

rename_and_cast_types(df: polars.LazyFrame, table_definition: dict) polars.LazyFrame

Rename columns and cast data types based on table definition.

Performs a single-pass rename from raw column keys to display names, then casts types for default columns.

Parameters:
  • df (pl.LazyFrame) – The input dataframe to process

  • table_definition (dict) – Dictionary containing column definitions with ‘display_name’, ‘default’, and ‘type’ keys

Returns:

Processed dataframe with renamed columns and cast types

Return type:

pl.LazyFrame

_cast_columns(df: polars.LazyFrame, type_mapping: dict[str, type[polars.DataType]]) polars.LazyFrame

Cast columns to their target types.

Parameters:
  • df (pl.LazyFrame) – The dataframe to process

  • type_mapping (dict[str, type[pl.DataType]]) – Mapping of column names to their target types

Returns:

Dataframe with columns cast to target types

Return type:

pl.LazyFrame

get_table_definition(table: str)
Parameters:

table (str)

create_hierarchical_selectors(data: polars.LazyFrame, selected_issue: str | None = None, selected_group: str | None = None, selected_action: str | None = None) dict[str, dict[str, list[str] | int]]

Create hierarchical filter options and calculate indices for selectbox widgets.

Args:

data: LazyFrame with hierarchical data (should be pre-filtered to desired stage)
selected_issue: Currently selected issue (optional)
selected_group: Currently selected group (optional)
selected_action: Currently selected action (optional)

Returns:

dict with structure:

{
    "issues": {"options": [...], "index": 0},
    "groups": {"options": ["All", ...], "index": 0},
    "actions": {"options": ["All", ...], "index": 0},
}

Parameters:
  • data (polars.LazyFrame)

  • selected_issue (str | None)

  • selected_group (str | None)

  • selected_action (str | None)

Return type:

dict[str, dict[str, list[str] | int]]
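The per-level options/index computation can be sketched in plain Python. The "All" sentinel and index semantics follow the returned structure shown above; whether every level gets an "All" entry is left as a parameter here:

```python
# Sketch of one level's selectbox options and pre-selected index.
def level_selector(values, selected=None, include_all=True):
    options = (["All"] if include_all else []) + sorted(set(values))
    index = options.index(selected) if selected in options else 0
    return {"options": options, "index": index}

groups = ["Cards", "Loans", "Cards"]
print(level_selector(groups, selected="Loans"))
# {'options': ['All', 'Cards', 'Loans'], 'index': 2}
```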

get_scope_config(selected_issue: str, selected_group: str, selected_action: str) dict[str, str | polars.Expr | list[str]]

Generate scope configuration for lever application and plotting based on user selections.

Parameters:
  • selected_issue (str) – Selected issue value from dropdown (can be “All”)

  • selected_group (str) – Selected group value from dropdown (can be “All”)

  • selected_action (str) – Selected action value from dropdown (can be “All”)

Returns:

Configuration dictionary containing:

  • level: “Action”, “Group”, or “Issue” indicating scope level

  • lever_condition: Polars expression for filtering selected actions

  • group_cols: List of column names for grouping operations

  • x_col: Column name to use for x-axis in plots

  • selected_value: The actual selected value for highlighting

  • plot_title_prefix: Prefix for plot titles

Return type:

dict[str, str | pl.Expr | list[str]]
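The scope-level decision can be sketched as "the most specific non-'All' selection wins". The precedence and the exact `group_cols` per level are assumptions here; the lever_condition and plotting fields are omitted for brevity:

```python
# Sketch of the scope-level decision (assumed precedence: Action > Group > Issue).
def scope_level(selected_issue, selected_group, selected_action):
    if selected_action != "All":
        return {"level": "Action", "group_cols": ["Issue", "Group", "Action"], "x_col": "Action"}
    if selected_group != "All":
        return {"level": "Group", "group_cols": ["Issue", "Group"], "x_col": "Group"}
    return {"level": "Issue", "group_cols": ["Issue"], "x_col": "Issue"}

print(scope_level("Retention", "All", "All")["level"])    # Issue
print(scope_level("Retention", "Cards", "All")["level"])  # Group
```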

logger
_INTERACTION_ID_RAW_KEY = 'pxInteractionID'
_get_interaction_id_candidates() list[str]

Build the set of possible interaction ID column names from the schema.

Collects the raw key, display name, and aliases from both table definitions so this stays in sync with column_schema.py.

Return type:

list[str]

_find_interaction_id_column(columns: set[str]) str

Return the first matching interaction ID column name from the data.

Parameters:

columns (set[str])

Return type:

str

_determine_output_directory(source_path: str | None, output_dir: str | None) pathlib.Path

Determine the best output directory for cached/sampled files.

Priority order:

  1. If output_dir is explicitly provided, use that
  2. Otherwise, if source_path is a file, use its parent directory (if writeable)
  3. Otherwise, if source_path is a directory, use its parent directory (if writeable)
  4. Otherwise, fall back to the current directory

Parameters:
  • source_path (str | None) – Path to the source file or directory.

  • output_dir (str | None) – Explicitly requested output directory (takes precedence if provided).

Returns:

Directory to use for output.

Return type:

Path
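The priority order can be sketched as follows (a simplified illustration that collapses the file and directory cases, since both fall back to the parent directory; not the library's exact code):

```python
import os
import tempfile
from pathlib import Path

def determine_output_directory(source_path, output_dir):
    if output_dir:
        return Path(output_dir)              # 1. explicit request wins
    if source_path:
        parent = Path(source_path).parent    # 2./3. next to the source
        if parent.exists() and os.access(parent, os.W_OK):
            return parent
    return Path(".")                         # 4. fallback to current directory

with tempfile.TemporaryDirectory() as tmp:
    src = Path(tmp) / "export.zip"
    src.touch()
    print(determine_output_directory(str(src), None) == Path(tmp))  # True
```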

sample_interactions(df: polars.LazyFrame, n: int | None = None, fraction: float | None = None, id_column: str | None = None, use_random: bool = False, total_interactions: int | None = None) polars.LazyFrame

Sample interactions from a LazyFrame before ingestion.

By default, uses deterministic hash-based filtering so the same data and limit always produce the same sample. When sampling already-sampled data, uses random sampling to avoid bias from repeated deterministic sampling.

All rows belonging to a selected interaction are kept (stratified on interaction ID).

Exactly one of n or fraction must be provided.

Parameters:
  • df (pl.LazyFrame) – Raw data to sample from.

  • n (int, optional) – Maximum number of unique interactions to keep.

  • fraction (float, optional) – Fraction of interactions to keep (0.0–1.0).

  • id_column (str, optional) – Name of the interaction ID column. Auto-detected if not given.

  • use_random (bool, default False) – If True, use random sampling instead of deterministic hash-based sampling. This should be set when sampling already-sampled data to avoid bias.

  • total_interactions (int, optional) – Pre-computed total number of unique interactions. If provided, avoids an expensive full-data scan to count them.

Returns:

Filtered LazyFrame containing only the sampled interactions.

Return type:

pl.LazyFrame

Examples

Sample ~50 000 interactions from a large export and save to parquet:

>>> from pdstools.pega_io.File import read_ds_export
>>> from pdstools.decision_analyzer.utils import sample_interactions
>>> df = read_ds_export("big_export.zip")
>>> sampled = sample_interactions(df, n=50_000)
>>> sampled.collect().write_parquet("my_sample.parquet")

Sample 10% of interactions (lazy — no full scan needed):

>>> sampled = sample_interactions(df, fraction=0.10)

See also

prepare_and_save

Sample and persist as parquet with source metadata.
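The deterministic, stratified idea behind the default sampling mode can be sketched without polars: hash each interaction ID into a stable bucket, so every row of an interaction lands in the same bucket and the same data plus limit always yield the same sample. (Illustration only; the library's actual hash and bucketing are internal.)

```python
import hashlib

def keep_interaction(interaction_id: str, fraction: float) -> bool:
    # Hash the ID to a stable bucket in [0, 10_000); the same ID always
    # maps to the same bucket, so the sample is reproducible.
    digest = hashlib.sha256(interaction_id.encode()).hexdigest()
    bucket = int(digest, 16) % 10_000
    return bucket < fraction * 10_000

ids = [f"interaction-{i}" for i in range(1_000)]
kept = [i for i in ids if keep_interaction(i, 0.5)]
print(len(kept))  # roughly 500
print(kept == [i for i in ids if keep_interaction(i, 0.5)])  # True — deterministic
```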

prepare_and_save(df: polars.LazyFrame, n: int | None = None, fraction: float | None = None, output_dir: str | None = None, source_path: str | None = None) tuple[polars.LazyFrame, pathlib.Path | None]

Prepare data for analysis by sampling or caching, and persist as parquet.

Sampling mode (when n or fraction provided): Writes decision_analyzer_sample_<count>.parquet into output_dir (defaults to the current working directory). Returns a LazyFrame scanning the written file plus the file path.

Caching mode (when neither n nor fraction provided): Writes decision_analyzer_cache_<count>.parquet into output_dir with 100% sample metadata. Useful for caching non-parquet sources (CSV, JSON, ZIP) for faster reloading.

The parquet file includes metadata tracking:

  • Original source file path

  • Sample percentage relative to original data (100% for caching mode)

  • Whether percentage was calculated exactly or approximated

If sampling is requested but the data is smaller than the requested sample, sampling is skipped and the original LazyFrame is returned unchanged (no file is written).

Parameters:
  • df (pl.LazyFrame) – Raw data to process.

  • n (int, optional) – Maximum number of unique interactions to keep (sampling mode).

  • fraction (float, optional) – Fraction of interactions to keep 0.0–1.0 (sampling mode).

  • output_dir (str, optional) – Directory for the output parquet file. If not provided, defaults to the source file’s directory (when source is a file and directory is writeable), otherwise current directory ".".

  • source_path (str, optional) – Path to the original source file for metadata tracking and determining output directory.

Returns:

The (possibly sampled/cached) LazyFrame and the path to the written parquet file, or None when no file was written.

Return type:

tuple[pl.LazyFrame, Path | None]

Examples

Sample ~50 000 interactions from a large zip and save as parquet (equivalent to pdstools da --data-path big_export.zip --sample 50000 but usable without the Streamlit app):

>>> from pdstools.pega_io.File import read_ds_export
>>> from pdstools.decision_analyzer.utils import prepare_and_save
>>> df = read_ds_export("big_export.zip")
>>> sampled, path = prepare_and_save(
...     df, n=50_000, source_path="big_export.zip"
... )
>>> print(path)
decision_analyzer_sample_50k.parquet

Sample from a parquet file:

>>> df = pl.scan_parquet("large_data.parquet")
>>> sampled, path = prepare_and_save(
...     df,
...     n=100000,
...     source_path="large_data.parquet"
... )

Cache non-parquet data (no sampling, just convert to parquet):

>>> df = read_ds_export("export.csv")
>>> cached, path = prepare_and_save(df, source_path="export.csv")

Read metadata from a prepared file:

>>> metadata = pl.read_parquet_metadata("decision_analyzer_sample_50k.parquet")
>>> print(metadata["pdstools:source_file"])
big_export.zip
>>> print(metadata["pdstools:sample_percentage"])
5.0

See also

sample_interactions

Lower-level sampling without file persistence.

parse_sample_flag(value: str) dict[str, int | float]

Parse the --sample CLI flag value into keyword arguments.

Delegates to pdstools.utils.streamlit_utils.parse_sample_spec().

Parameters:

value (str)

Return type:

dict[str, int | float]
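A hedged sketch of such a parser: a plain integer is read as a count (n), a trailing ‘%’ as a fraction. The accepted formats are an assumption here — the real parsing lives in parse_sample_spec():

```python
# Assumed --sample formats: "50000" -> count, "10%" -> fraction.
def parse_sample_flag(value: str) -> dict:
    value = value.strip()
    if value.endswith("%"):
        return {"fraction": float(value[:-1]) / 100.0}
    return {"n": int(value)}

print(parse_sample_flag("50000"))  # {'n': 50000}
print(parse_sample_flag("10%"))    # {'fraction': 0.1}
```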

format_count_for_filename(count: int) str

Format an interaction count for use in filenames.

Uses human-readable abbreviations with 2 significant figures.

Parameters:

count (int) – Number of interactions.

Returns:

Formatted count (e.g., “87k”, “1.2M”, “2.5B”).

Return type:

str

Examples

>>> format_count_for_filename(42)
'42'
>>> format_count_for_filename(1500)
'1.5k'
>>> format_count_for_filename(87432)
'87k'
>>> format_count_for_filename(1234567)
'1.2M'
should_cache_source(source_path: str | None) bool

Return True if source should be cached as parquet.

Caching is beneficial for non-parquet sources (CSV, JSON, ZIP, directories) but unnecessary for single parquet files which are already optimized.

Parameters:

source_path (str | None) – Path to the source file or directory.

Returns:

True if source should be cached, False otherwise.

Return type:

bool

Examples

>>> should_cache_source("/data/export.csv")
True
>>> should_cache_source("/data/export.parquet")
False
>>> should_cache_source(None)
False
_read_source_metadata(source_path: str) dict[str, str | float] | None

Read pdstools metadata from a parquet file if it exists.

Parameters:

source_path (str) – Path to the parquet file to check.

Returns:

Dictionary with keys source_file, sample_percentage, and method. Returns None if the file doesn’t exist, is not parquet, or lacks metadata.

Return type:

dict or None