pdstools.decision_analyzer.utils
=================================

.. py:module:: pdstools.decision_analyzer.utils


Attributes
----------

.. autoapisummary::

   pdstools.decision_analyzer.utils.logger
   pdstools.decision_analyzer.utils.SCOPE_HIERARCHY
   pdstools.decision_analyzer.utils.PRIO_FACTORS
   pdstools.decision_analyzer.utils.PRIO_COMPONENTS


Classes
-------

.. autoapisummary::

   pdstools.decision_analyzer.utils.ColumnResolver


Functions
---------

.. autoapisummary::

   pdstools.decision_analyzer.utils.apply_filter
   pdstools.decision_analyzer.utils.area_under_curve
   pdstools.decision_analyzer.utils.gini_coefficient
   pdstools.decision_analyzer.utils.get_first_level_stats
   pdstools.decision_analyzer.utils.resolve_aliases
   pdstools.decision_analyzer.utils.determine_extract_type
   pdstools.decision_analyzer.utils.rename_and_cast_types
   pdstools.decision_analyzer.utils.get_table_definition
   pdstools.decision_analyzer.utils.create_hierarchical_selectors
   pdstools.decision_analyzer.utils.get_scope_config
   pdstools.decision_analyzer.utils.sample_interactions
   pdstools.decision_analyzer.utils.prepare_and_save
   pdstools.decision_analyzer.utils.parse_sample_flag
   pdstools.decision_analyzer.utils.resolve_filter_column
   pdstools.decision_analyzer.utils.parse_filter_specs
   pdstools.decision_analyzer.utils.format_count_for_filename
   pdstools.decision_analyzer.utils.should_cache_source


Module Contents
---------------

.. py:data:: logger

.. py:class:: ColumnResolver

   Resolves column mappings between raw data and a standardized schema.

   Raw decision data can come from multiple sources with different schemas:

   - Explainability Extract vs Decision Analyzer exports
   - Inbound vs Outbound channel data

   For example, channel information may appear as:

   - 'Channel' (already using the display name)
   - 'pyChannel' (an alias for the display name)
   - 'Primary_ContainerPayload_Channel' (raw name needing rename)
   - Both raw key and display_name present (conflict requiring resolution)

   This class normalizes these variations by:

   - Mapping raw column names to standardized display names
   - Resolving conflicts when both raw and display_name columns exist
   - Building the final schema with consistent column names

   .. attribute:: table_definition

      Column definitions with 'display_name', 'default', and 'type' keys

      :type: dict

   .. attribute:: raw_columns

      Column names present in the raw data

      :type: set[str]

   .. py:attribute:: table_definition
      :type: dict

   .. py:attribute:: raw_columns
      :type: set[str]

   .. py:attribute:: rename_mapping
      :type: dict[str, str]

   .. py:attribute:: type_mapping
      :type: dict[str, type[polars.DataType]]

   .. py:attribute:: columns_to_drop
      :type: list[str]
      :value: []

   .. py:attribute:: final_columns
      :type: list[str]
      :value: []

   .. py:method:: __post_init__()

   .. py:method:: resolve() -> ColumnResolver

      Resolve all column mappings and conflicts.

      :returns: Self, for method chaining
      :rtype: ColumnResolver

   .. py:method:: get_missing_columns() -> list[str]

      Get list of required columns missing from the raw data.

      :returns: Column names that are marked as default but not found in raw data
      :rtype: list[str]

.. py:data:: SCOPE_HIERARCHY
   :value: ['Issue', 'Group', 'Action']

.. py:data:: PRIO_FACTORS
   :value: ['Propensity', 'Value', 'Context Weight', 'Levers']

.. py:data:: PRIO_COMPONENTS
   :value: ['Propensity', 'Value', 'Context Weight', 'Levers', 'Priority']

.. py:function:: apply_filter(df: polars.LazyFrame, filters: polars.Expr | list[polars.Expr] | None = None)

   Apply a global set of filters.
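
   A minimal sketch of typical use (the file path and the ``Channel``/``Issue``
   column names are illustrative, not part of the function's contract):

   >>> import polars as pl
   >>> from pdstools.decision_analyzer.utils import apply_filter
   >>> df = pl.scan_parquet("decisions.parquet")  # any LazyFrame of decision data
   >>> web_only = apply_filter(df, pl.col("Channel") == "Web")
   >>> web_sales = apply_filter(
   ...     df, [pl.col("Channel") == "Web", pl.col("Issue") == "Sales"]
   ... )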

   Kept outside of the DecisionData class as this is really more of a utility
   function, not bound to that class at all.

.. py:function:: area_under_curve(df: polars.DataFrame, col_x: str, col_y: str)

   Area under curve.

.. py:function:: gini_coefficient(df: polars.DataFrame, col_x: str, col_y: str)

   Gini coefficient.

.. py:function:: get_first_level_stats(interaction_data: polars.LazyFrame, filters: list[polars.Expr] | None = None)

   Returns first-level stats of a dataframe for the filter summary.

   Shows unique actions (Issue/Group/Action combinations), unique interactions
   (decisions), and total rows so users understand the impact of their filters.

.. py:function:: resolve_aliases(df: polars.LazyFrame, *table_definitions: dict) -> polars.LazyFrame

   Rename alias columns to their canonical raw key names before validation.

   Scans all table definitions for ``aliases`` entries. If an alias is found in
   the data but neither the raw key nor the display_name is present, the column
   is renamed to the raw key so downstream processing can find it.

   :param df: Raw data that may use alternative column names.
   :type df: pl.LazyFrame
   :param \*table_definitions: One or more table definition dicts
                               (DecisionAnalyzer, ExplainabilityExtract).
   :type \*table_definitions: dict

   :returns: Data with alias columns renamed to canonical raw key names.
   :rtype: pl.LazyFrame

.. py:function:: determine_extract_type(raw_data)

   Detect whether the data is a Decision Analyzer (v2) or Explainability Extract (v1).

   V2 data must have both a strategy name column *and* stage pipeline columns
   (``Stage_pyStageGroup`` / ``Stage Group``). Data that has strategy names but
   no stage information (e.g. pre-aggregated or anonymized exports) is treated
   as v1 so the synthetic-stage fallback is used.

.. py:function:: rename_and_cast_types(df: polars.LazyFrame, table_definition: dict) -> polars.LazyFrame

   Rename columns and cast data types based on table definition.

   Performs a single-pass rename from raw column keys to display names, then
   casts types for default columns.

   :param df: The input dataframe to process
   :type df: pl.LazyFrame
   :param table_definition: Dictionary containing column definitions with
                            'display_name', 'default', and 'type' keys
   :type table_definition: dict

   :returns: Processed dataframe with renamed columns and cast types
   :rtype: pl.LazyFrame

.. py:function:: get_table_definition(table: str)

   Get table definition.

.. py:function:: create_hierarchical_selectors(data: polars.LazyFrame, selected_issue: str | None = None, selected_group: str | None = None, selected_action: str | None = None) -> dict[str, dict[str, list[str] | int]]

   Create hierarchical filter options and calculate indices for selectbox widgets.

   :param data: LazyFrame with hierarchical data (should be pre-filtered to desired stage)
   :type data: pl.LazyFrame
   :param selected_issue: Currently selected issue (optional)
   :type selected_issue: str | None
   :param selected_group: Currently selected group (optional)
   :type selected_group: str | None
   :param selected_action: Currently selected action (optional)
   :type selected_action: str | None

   :returns: dict with structure:

             {
                 "issues": {"options": [...], "index": 0},
                 "groups": {"options": ["All", ...], "index": 0},
                 "actions": {"options": ["All", ...], "index": 0}
             }
   :rtype: dict[str, dict[str, list[str] | int]]

.. py:function:: get_scope_config(selected_issue: str, selected_group: str, selected_action: str) -> dict[str, str | polars.Expr | list[str]]

   Generate scope configuration for lever application and plotting based on user selections.
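
   A sketch of how dropdown selections map to a scope (the values are
   hypothetical; the returned keys are described below):

   >>> scope = get_scope_config("Acquisition", "CreditCards", "All")
   >>> condition = scope["lever_condition"]  # pl.Expr selecting the chosen scope
   >>> group_cols = scope["group_cols"]      # columns to group by at this level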

   :param selected_issue: Selected issue value from dropdown (can be "All")
   :type selected_issue: str
   :param selected_group: Selected group value from dropdown (can be "All")
   :type selected_group: str
   :param selected_action: Selected action value from dropdown (can be "All")
   :type selected_action: str

   :returns: Configuration dictionary containing:

             - level: "Action", "Group", or "Issue" indicating scope level
             - lever_condition: Polars expression for filtering selected actions
             - group_cols: List of column names for grouping operations
             - x_col: Column name to use for x-axis in plots
             - selected_value: The actual selected value for highlighting
             - plot_title_prefix: Prefix for plot titles
   :rtype: dict[str, str | pl.Expr | list[str]]

.. py:function:: sample_interactions(df: polars.LazyFrame, n: int | None = None, fraction: float | None = None, id_column: str | None = None, use_random: bool = False, total_interactions: int | None = None) -> polars.LazyFrame

   Sample interactions from a LazyFrame before ingestion.

   By default, uses deterministic hash-based filtering so the same data and
   limit always produce the same sample. When sampling already-sampled data,
   uses random sampling to avoid bias from repeated deterministic sampling.
   All rows belonging to a selected interaction are kept (stratified on
   interaction ID).

   Exactly one of *n* or *fraction* must be provided.

   :param df: Raw data to sample from.
   :type df: pl.LazyFrame
   :param n: Maximum number of unique interactions to keep.
   :type n: int, optional
   :param fraction: Fraction of interactions to keep (0.0–1.0).
   :type fraction: float, optional
   :param id_column: Name of the interaction ID column. Auto-detected if not given.
   :type id_column: str, optional
   :param use_random: If True, use random sampling instead of deterministic
                      hash-based sampling. This should be set when sampling
                      already-sampled data to avoid bias.
   :type use_random: bool, default False
   :param total_interactions: Pre-computed total number of unique interactions.
                              If provided, avoids an expensive full-data scan to
                              count them.
   :type total_interactions: int, optional

   :returns: Filtered LazyFrame containing only the sampled interactions.
   :rtype: pl.LazyFrame

   .. rubric:: Examples

   Sample ~50 000 interactions from a large export and save to parquet:

   >>> from pdstools.pega_io.File import read_ds_export
   >>> from pdstools.decision_analyzer.utils import sample_interactions
   >>> df = read_ds_export("big_export.zip")
   >>> sampled = sample_interactions(df, n=50_000)
   >>> sampled.collect().write_parquet("my_sample.parquet")

   Sample 10 % of interactions (lazy — no full scan needed):

   >>> sampled = sample_interactions(df, fraction=0.10)

   .. seealso::

      :py:obj:`prepare_and_save`
          Sample *and* persist as parquet with source metadata.

.. py:function:: prepare_and_save(df: polars.LazyFrame, n: int | None = None, fraction: float | None = None, output_dir: str | None = None, source_path: str | None = None) -> tuple[polars.LazyFrame, pathlib.Path | None]

   Prepare data for analysis by sampling or caching, and persist as parquet.

   **Sampling mode** (when n or fraction provided):
   Writes ``decision_analyzer_sample_<count>.parquet`` into *output_dir*
   (defaults to the current working directory). Returns a LazyFrame scanning
   the written file plus the file path.

   **Caching mode** (when neither n nor fraction provided):
   Writes ``decision_analyzer_cache_<count>.parquet`` into *output_dir* with
   100% sample metadata. Useful for caching non-parquet sources (CSV, JSON,
   ZIP) for faster reloading.

   The parquet file includes metadata tracking:

   - Original source file path
   - Sample percentage relative to original data (100% for caching mode)
   - Whether percentage was calculated exactly or approximated

   If sampling is requested but the data is smaller than the requested sample,
   sampling is skipped and the original LazyFrame is returned unchanged (no
   file is written).

   :param df: Raw data to process.
   :type df: pl.LazyFrame
   :param n: Maximum number of unique interactions to keep (sampling mode).
   :type n: int, optional
   :param fraction: Fraction of interactions to keep, 0.0–1.0 (sampling mode).
   :type fraction: float, optional
   :param output_dir: Directory for the output parquet file. If not provided,
                      defaults to the source file's directory (when the source
                      is a file and the directory is writeable), otherwise the
                      current directory ``"."``.
   :type output_dir: str, optional
   :param source_path: Path to the original source file for metadata tracking
                       and determining the output directory.
   :type source_path: str, optional

   :returns: The (possibly sampled/cached) LazyFrame and the path to the written
             parquet file, or ``None`` when no file was written.
   :rtype: tuple[pl.LazyFrame, Path | None]

   .. rubric:: Examples

   Sample ~50 000 interactions from a large zip and save as parquet
   (equivalent to ``pdstools da --data-path big_export.zip --sample 50000``
   but usable without the Streamlit app):

   >>> from pdstools.pega_io.File import read_ds_export
   >>> from pdstools.decision_analyzer.utils import prepare_and_save
   >>> df = read_ds_export("big_export.zip")
   >>> sampled, path = prepare_and_save(
   ...     df, n=50_000, source_path="big_export.zip"
   ... )
   >>> print(path)
   decision_analyzer_sample_50k.parquet

   Sample from a parquet file:

   >>> df = pl.scan_parquet("large_data.parquet")
   >>> sampled, path = prepare_and_save(
   ...     df,
   ...     n=100000,
   ...     source_path="large_data.parquet"
   ... )

   Cache non-parquet data (no sampling, just convert to parquet):

   >>> df = read_ds_export("export.csv")
   >>> cached, path = prepare_and_save(df, source_path="export.csv")

   Read metadata from a prepared file:

   >>> metadata = pl.read_parquet_metadata("decision_analyzer_sample_50k.parquet")
   >>> print(metadata["pdstools:source_file"])
   big_export.zip
   >>> print(metadata["pdstools:sample_percentage"])
   5.0

   .. seealso::

      :py:obj:`sample_interactions`
          Lower-level sampling without file persistence.

.. py:function:: parse_sample_flag(value: str) -> dict[str, int | float]

   Parse the ``--sample`` CLI flag value into keyword arguments.

   Delegates to :func:`pdstools.utils.streamlit_utils.parse_sample_spec`.

.. py:function:: resolve_filter_column(name: str, available_columns: set[str]) -> str

   Resolve a user-friendly column name to the actual column in the data.

   Checks display names, aliases, and raw keys from both the
   ``DecisionAnalyzer`` and ``ExplainabilityExtract`` schemas. Resolution is
   case-insensitive.

   :param name: User-provided column name (display name, alias, or raw key).
   :type name: str
   :param available_columns: Column names actually present in the raw data.
   :type available_columns: set[str]

   :returns: The actual column name present in *available_columns*.
   :rtype: str

   :raises ValueError: If *name* cannot be resolved to any column in *available_columns*.

.. py:function:: parse_filter_specs(filter_specs: list[str], available_columns: set[str]) -> polars.Expr

   Parse ``--filter`` specs into a combined Polars filter expression.

   Supported syntax:

   * **Categorical:** ``"Column Name=value1,value2,..."`` — exact-match on any
     listed value.
   * **Numeric:** ``"Column>=N"``, ``"Column<=N"``, ``"Column>N"``, ``"Column<N"``

.. py:function:: format_count_for_filename(count: int) -> str

   Format an interaction count for use in filenames.

   Uses human-readable abbreviations with 2 significant figures.

   :param count: Number of interactions.
   :type count: int

   :returns: Formatted count (e.g., "87k", "1.2M", "2.5B").
   :rtype: str

   .. rubric:: Examples

   >>> format_count_for_filename(42)
   '42'
   >>> format_count_for_filename(1500)
   '1.5k'
   >>> format_count_for_filename(87432)
   '87k'
   >>> format_count_for_filename(1234567)
   '1.2M'

.. py:function:: should_cache_source(source_path: str | None) -> bool

   Return True if source should be cached as parquet.

   Caching is beneficial for non-parquet sources (CSV, JSON, ZIP, directories)
   but unnecessary for single parquet files, which are already optimized.

   :param source_path: Path to the source file or directory.
   :type source_path: str | None

   :returns: True if source should be cached, False otherwise.
   :rtype: bool

   .. rubric:: Examples

   >>> should_cache_source("/data/export.csv")
   True
   >>> should_cache_source("/data/export.parquet")
   False
   >>> should_cache_source(None)
   False
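
   A directory source (for example an unzipped export folder) is not a single
   parquet file, so it would likewise be cached; the path below is illustrative:

   >>> should_cache_source("/data/extracted_export/")
   True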