pdstools¶
Pega Data Scientist Tools Python library
Submodules¶
Classes¶
Monitor and analyze ADM data from the Pega Datamart. |
|
Analyze Interaction History data from Pega CDH. |
|
Analyze and visualize Impact Analyzer experiment results from Pega CDH. |
|
Monitor and analyze Pega Prediction Studio Predictions. |
|
Analyze the Value Finder dataset for detailed insights |
Functions¶
|
Read in most out of the box Pega dataset export formats |
|
Function to determine the 'category' of a predictor. |
|
Import a sample dataset from the CDH Sample application |
|
Import a sample dataset of a Value Finder simulation |
Get a list of currently installed versions of pdstools and its dependencies. |
Package Contents¶
- class ADMDatamart(model_df: polars.LazyFrame | None = None, predictor_df: polars.LazyFrame | None = None, *, query: pdstools.utils.types.QUERY | None = None, extract_pyname_keys: bool = True)¶
Monitor and analyze ADM data from the Pega Datamart.
To initialize this class, either 1. Initialize directly with the model_df and predictor_df polars LazyFrames 2. Use one of the class methods: from_ds_export, from_s3, from_dataflow_export etc.
This class will read in the data from different sources, properly structure them from further analysis, and apply correct typing and useful renaming.
There is also a few “namespaces” that you can call from this class:
.plot contains ready-made plots to analyze the data with
.aggregates contains mostly internal data aggregations queries
.agb contains analysis utilities for Adaptive Gradient Boosting models
.generate leads to some ready-made reports, such as the Health Check
.bin_aggregator allows you to compare the bins across various models
- Parameters:
model_df (pl.LazyFrame, optional) – The Polars LazyFrame representation of the model snapshot table.
predictor_df (pl.LazyFrame, optional) – The Polars LazyFrame represenation of the predictor binning table.
query (QUERY, optional) – An optional query to apply to the input data. For details, see
pdstools.utils.cdh_utils._apply_query().extract_pyname_keys (bool, default = True) – Whether to extract extra keys from the pyName column. In older Pega versions, this contained pyTreatment among other (customizable) fields. By default True
Examples
>>> from pdstools import ADMDatamart >>> from glob import glob >>> dm = ADMDatamart( model_df = pl.scan_parquet('models.parquet'), predictor_df = pl.scan_parquet('predictors.parquet') query = {"Configuration":["Web_Click_Through"]} ) >>> dm = ADMDatamart.from_ds_export(base_path='/my_export_folder') >>> dm = ADMDatamart.from_s3("pega_export") >>> dm = ADMDatamart.from_dataflow_export(glob("data/models*"), glob("data/preds*"))
Note
This class depends on two datasets:
pyModelSnapshots corresponds to the model_data attribute
pyADMPredictorSnapshots corresponds to the predictor_data attribute
For instructions on how to download these datasets, please refer to the following article: https://docs.pega.com/bundle/platform/page/platform/decision-management/exporting-monitoring-database.html
See also
pdstools.adm.PlotsThe out of the box plots on the Datamart data
pdstools.adm.ReportsMethods to generate the Health Check and Model Report
pdstools.utils.cdh_utils._apply_queryHow to query the ADMDatamart class and methods
- aggregates: pdstools.adm.Aggregates.Aggregates¶
- generate: pdstools.adm.Reports.Reports¶
- bin_aggregator: pdstools.adm.BinAggregator.BinAggregator¶
- _get_first_action_dates(df: polars.LazyFrame | None) polars.LazyFrame | None¶
- Parameters:
df (Optional[polars.LazyFrame])
- Return type:
Optional[polars.LazyFrame]
- classmethod from_ds_export(model_filename: str | None = None, predictor_filename: str | None = None, base_path: os.PathLike | str = '.', *, query: pdstools.utils.types.QUERY | None = None, extract_pyname_keys: bool = True)¶
Import the ADMDatamart class from a Pega Dataset Export
- Parameters:
model_filename (Optional[str], optional) – The full path or name (if base_path is given) to the model snapshot files, by default None
predictor_filename (Optional[str], optional) – The full path or name (if base_path is given) to the predictor binning snapshot files, by default None
base_path (Union[os.PathLike, str], optional) – A base path to provide so that we can automatically find the most recent files for both the model and predictor snapshots, if model_filename and predictor_filename are not given as full paths, by default “.”
query (Optional[QUERY], optional) – An optional argument to filter out selected data, by default None
extract_pyname_keys (bool, optional) – Whether to extract additional keys from the pyName column, by default True
- Returns:
The properly initialized ADMDatamart class
- Return type:
Examples
>>> from pdstools import ADMDatamart
>>> # To automatically find the most recent files in the 'my_export_folder' dir: >>> dm = ADMDatamart.from_ds_export(base_path='/my_export_folder')
>>> # To specify individual files: >>> dm = ADMDatamart.from_ds_export( model_df='/Downloads/model_snapshots.parquet', predictor_df = '/Downloads/predictor_snapshots.parquet' )
Note
By default, the dataset export in Infinity returns a zip file per table. You do not need to open up this zip file! You can simply point to the zip, and this method will be able to read in the underlying data.
See also
pdstools.pega_io.File.read_ds_exportMore information on file compatibility
pdstools.utils.cdh_utils._apply_queryHow to query the ADMDatamart class and methods
- classmethod from_s3()¶
Not implemented yet. Please let us know if you would like this functionality!
- classmethod from_dataflow_export(model_data_files: Iterable[str] | str, predictor_data_files: Iterable[str] | str, *, query: pdstools.utils.types.QUERY | None = None, extract_pyname_keys: bool = True, cache_file_prefix: str = '', extension: Literal['json'] = 'json', compression: Literal['gzip'] = 'gzip', cache_directory: os.PathLike | str = 'cache')¶
Read in data generated by a data flow, such as the Prediction Studio export.
Dataflows are able to export data from and to various sources. As they are meant to be used in production, they are highly resiliant. For every partition and every node, a dataflow will output a small json file every few seconds. While this is great for production loads, it can be a bit more tricky to read in the data for smaller-scale and ad-hoc analyses.
This method aims to make the ingestion of such highly partitioned data easier. It reads in every individual small json file that the dataflow has output, and caches them to a parquet file in the cache_directory folder. As such, if you re-run this method later with more data added since the last export, we will not read in from the (slow) dataflow files, but rather from the (much faster) cache.
- Parameters:
model_data_files (Union[Iterable[str], str]) – A list of files to read in as the model snapshots
predictor_data_files (Union[Iterable[str], str]) – A list of files to read in as the predictor snapshots
query (Optional[QUERY], optional) – A, by default None
extract_pyname_keys (bool, optional) – Whether to extract extra keys from the pyName column, by default True
cache_file_prefix (str, optional) – An optional prefix for the cache files, by default “”
extension (Literal["json"], optional) – The extension of the source data, by default “json”
compression (Literal["gzip"], optional) – The compression of the source files, by default “gzip”
cache_directory (Union[os.PathLike, str], optional) – Where to store the cached files, by default “cache”
- Returns:
An initialized instance of the datamart class
- Return type:
Examples
>>> from pdstools import ADMDatamart >>> import glob >>> dm = ADMDatamart.from_dataflow_export(glob("data/models*"), glob("data/preds*"))
See also
pdstools.utils.cdh_utils._apply_queryHow to query the ADMDatamart class and methods
globMakes creating lists of files much easier
- classmethod from_pdc(df: polars.LazyFrame, return_df=False)¶
- Parameters:
df (polars.LazyFrame)
- _validate_model_data(df: polars.LazyFrame | None, extract_pyname_keys: bool = True) polars.LazyFrame | None¶
Internal method to validate model data
- Parameters:
df (Optional[polars.LazyFrame])
extract_pyname_keys (bool)
- Return type:
Optional[polars.LazyFrame]
- _validate_predictor_data(df: polars.LazyFrame | None) polars.LazyFrame | None¶
Internal method to validate predictor data
- Parameters:
df (Optional[polars.LazyFrame])
- Return type:
Optional[polars.LazyFrame]
- apply_predictor_categorization(categorization: polars.Expr | Callable[Ellipsis, polars.Expr] | Dict[str, str | List[str]] = cdh_utils.default_predictor_categorization, *, use_regexp: bool = False, df: polars.LazyFrame | None = None)¶
Apply a new predictor categorization to the datamart tables
In certain plots, we use the predictor categorization to indicate what ‘kind’ a certain predictor is, such as IH, Customer, etc. Call this method with a custom Polars Expression (or a method that returns one) or a simple mapping and it will be applied to the predictor data (and the combined dataset too). When the categorization provides no match, the existing categories are kept as they are.
For a reference implementation of a custom predictor categorization, refer to pdstools.utils.cdh_utils.default_predictor_categorization.
- Parameters:
categorization (Union[pl.Expr, Callable[..., pl.Expr], Dict[str, Union[str, List[str]]]]) – A Polars Expression (or method that returns one) that returns the predictor categories. Should be based on Polars’ when.then.otherwise syntax. Alternatively can be a dictionary of categories to (list of) string matches which can be either exact (the default) or regular expressions. By default, pdstools.utils.cdh_utils.default_predictor_categorization is used.
use_regexp (bool, optional) – Treat the mapping patterns in the categorization dictionary as regular expressions rather than plain strings. When treated as regular expressions, they will be interpreted in non-strict mode, so invalid expressions will return in no match. See https://docs.pola.rs/api/python/stable/reference/series/api/polars.Series.str.contains.html for exact behavior of the regular expressions. By default, False
df (Optional[pl.LazyFrame], optional) – A Polars Lazyframe to apply the categorization to. If not provided, applies it over the predictor data and combined datasets. By default, None
See also
pdstools.utils.cdh_utils.default_predictor_categorizationThe default method
Examples
>>> dm = ADMDatamart(my_data) #uses the OOTB predictor categorization
>>> # Uses a custom Polars expression to set the categories >>> dm.apply_predictor_categorization(categorization=pl.when( >>> pl.col("PredictorName").cast(pl.Utf8).str.contains("Propensity") >>> ).then(pl.lit("External Model") >>> )
>>> # Uses a simple dictionary to set the categories >>> dm.apply_predictor_categorization(categorization={ >>> "External Model" : ["Score", "Propensity"]} >>> )
- save_data(path: os.PathLike | str = '.', selected_model_ids: List[str] | None = None) Tuple[pathlib.Path | None, pathlib.Path | None]¶
Caches model_data and predictor_data to files.
- property unique_channels¶
A consistently ordered set of unique channels in the data
Used for making the color schemes in different plots consistent
- property unique_configurations¶
A consistently ordered set of unique configurations in the data
Used for making the color schemes in different plots consistent
- property unique_channel_direction¶
A consistently ordered set of unique channel+direction combos in the data Used for making the color schemes in different plots consistent
- property unique_configuration_channel_direction¶
A consistently ordered set of unique configuration+channel+direction Used for making the color schemes in different plots consistent
- property unique_predictor_categories¶
A consistently ordered set of unique predictor categories in the data Used for making the color schemes in different plots consistent
- classmethod _minMaxScoresPerModel(bin_data: polars.LazyFrame) polars.LazyFrame¶
- Parameters:
bin_data (polars.LazyFrame)
- Return type:
polars.LazyFrame
- active_ranges(model_ids: str | List[str] | None = None) polars.LazyFrame¶
Calculate the active, reachable bins in classifiers.
The classifiers exported by Pega contain (in certain product versions) more than the bins that can be reached given the current state of the predictors. This method first calculates the min and max score range from the predictor log odds, then maps that to the interval boundaries of the classifier(s) to find the min and max index.
It returns a LazyFrame with the score min/max, the min/max index, as well as the AUC as reported in the datamart data, when calculated from the full range, and when calculated from the reachable bins only.
This information can be used in the Health Check documents or when verifying the AUC numbers from the datamart.
- Parameters:
model_ids (Optional[Union[str, List[str]]], optional) – An optional list of model id’s, or just a single one, to report on. When not given, the information is returned for all models.
- Returns:
A table with all the index and AUC information for all the models with the following fields:
Model Identification: - ModelID - The unique identifier for the model
AUC Metrics: - AUC_Datamart - The AUC value as reported in the datamart - AUC_FullRange - The AUC calculated from the full range of bins in the classifier - AUC_ActiveRange - The AUC calculated from only the active/reachable bins
Classifier Information: - Bins - The total number of bins in the classifier - nActivePredictors - The number of active predictors in the model
Log Odds Information (mostly for internal use): - classifierLogOffset - The log offset of the classifier (baseline log odds) - sumMinLogOdds - The sum of minimum log odds across all active predictors - sumMaxLogOdds - The sum of maximum log odds across all active predictors - score_min - The minimum score (normalized sum of log odds including classifier offset) - score_max - The maximum score (normalized sum of log odds including classifier offset)
Active Range Information: - idx_min - The minimum bin index that can be reached given the current binning of all predictors - idx_max - The maximum bin index that can be reached given the current binning of all predictors
- Return type:
pl.LazyFrame
- class IH(data: polars.LazyFrame)¶
Analyze Interaction History data from Pega CDH.
The IH class provides analysis and visualization capabilities for customer interaction data from Pega’s Customer Decision Hub. It supports engagement, conversion, and open rate metrics through customizable outcome label mappings.
- Parameters:
data (polars.LazyFrame)
- data¶
The underlying interaction history data.
- Type:
pl.LazyFrame
- aggregates¶
Aggregation methods accessor.
- Type:
See also
pdstools.adm.ADMDatamartFor ADM model analysis.
pdstools.impactanalyzer.ImpactAnalyzerFor Impact Analyzer experiments.
Examples
>>> from pdstools import IH >>> ih = IH.from_ds_export("interaction_history.zip") >>> ih.aggregates.summary_by_channel().collect() >>> ih.plot.response_count_trend()
- data: polars.LazyFrame¶
- aggregates¶
- plot¶
- classmethod from_ds_export(ih_filename: os.PathLike | str, query: pdstools.utils.types.QUERY | None = None) IH¶
Create an IH instance from a Pega Dataset Export.
- Parameters:
ih_filename (Union[os.PathLike, str]) – Path to the dataset export file (parquet, csv, ndjson, or zip).
query (Optional[QUERY], optional) – Polars expression to filter the data. Default is None.
- Returns:
Initialized IH instance.
- Return type:
Examples
>>> ih = IH.from_ds_export("Data-pxStrategyResult_pxInteractionHistory.zip") >>> ih.data.collect_schema()
- classmethod from_s3() IH¶
- Abstractmethod:
- Return type:
Create an IH instance from S3 data.
Note
Not implemented yet. Please let us know if you would like this!
- Raises:
NotImplementedError – This method is not yet implemented.
- Return type:
- classmethod from_mock_data(days: int = 90, n: int = 100000) IH¶
Create an IH instance with synthetic sample data.
Generates realistic interaction history data for testing and demonstration purposes. Includes inbound (Web) and outbound (Email) channels with configurable propensities and model noise.
- Parameters:
- Returns:
IH instance with synthetic data.
- Return type:
Examples
>>> ih = IH.from_mock_data(days=30, n=10000) >>> ih.data.select("pyChannel").collect().unique()
- get_sequences(positive_outcome_label: str, level: str, outcome_column: str, customerid_column: str) Tuple[List[Tuple[str, Ellipsis]], List[Tuple[int, Ellipsis]], List[collections.defaultdict], List[collections.defaultdict]]¶
Extract customer action sequences for PMI analysis.
Processes customer interaction data to produce action sequences, outcome labels, and frequency counts needed for Pointwise Mutual Information (PMI) calculations.
- Parameters:
- Returns:
customer_sequences (List[Tuple[str, …]]) – Action sequences per customer.
customer_outcomes (List[Tuple[int, …]]) – Binary outcomes (1=positive, 0=other) per sequence position.
count_actions (List[defaultdict]) – Action frequency counts: - [0]: First element counts in bigrams - [1]: Second element counts in bigrams
count_sequences (List[defaultdict]) – Sequence frequency counts: - [0]: All bigrams - [1]: ≥3-grams ending with positive outcome - [2]: Bigrams ending with positive outcome - [3]: Unique n-grams per customer
- Return type:
Tuple[List[Tuple[str, Ellipsis]], List[Tuple[int, Ellipsis]], List[collections.defaultdict], List[collections.defaultdict]]
See also
calculate_pmiCompute PMI scores from sequence counts.
pmi_overviewGenerate PMI analysis summary.
- static calculate_pmi(count_actions: List[collections.defaultdict], count_sequences: List[collections.defaultdict]) Dict[Tuple[str, Ellipsis], float | Dict[str, float | Dict]]¶
Compute PMI scores for action sequences.
Calculates Pointwise Mutual Information scores for bigrams and higher-order n-grams. Higher values indicate more informative or surprising action sequences.
- Parameters:
count_actions (List[defaultdict]) – Action frequency counts from
get_sequences().count_sequences (List[defaultdict]) – Sequence frequency counts from
get_sequences().
- Returns:
PMI scores for sequences: - Bigrams: Direct PMI value (float) - N-grams (n≥3): Dict with ‘average_pmi’ and ‘links’ (constituent bigram PMIs)
- Return type:
See also
get_sequencesExtract sequences for PMI analysis.
pmi_overviewGenerate PMI analysis summary.
Notes
Bigram PMI is calculated as:
\[PMI(a, b) = \log_2 \frac{P(a, b)}{P(a) \cdot P(b)}\]N-gram PMI is the average of constituent bigram PMIs.
- static pmi_overview(ngrams_pmi: Dict[Tuple[str, Ellipsis], float | Dict], count_sequences: List[collections.defaultdict], customer_sequences: List[Tuple[str, Ellipsis]], customer_outcomes: List[Tuple[int, Ellipsis]]) polars.DataFrame¶
Generate PMI analysis summary DataFrame.
Creates a summary of action sequences ranked by their significance in predicting positive outcomes.
- Parameters:
ngrams_pmi (Dict[Tuple[str, ...], Union[float, Dict]]) – PMI scores from
calculate_pmi().count_sequences (List[defaultdict]) – Sequence frequency counts from
get_sequences().customer_sequences (List[Tuple[str, ...]]) – Customer action sequences from
get_sequences().customer_outcomes (List[Tuple[int, ...]]) – Customer outcome sequences from
get_sequences().
- Returns:
Summary DataFrame with columns:
Sequence: Action sequence tuple
Length: Number of actions in sequence
Avg PMI: Average PMI value
Frequency: Total occurrence count
Unique freq: Unique customer count
Score: PMI × log(Frequency), sorted descending
- Return type:
pl.DataFrame
See also
get_sequencesExtract sequences for analysis.
calculate_pmiCompute PMI scores.
Examples
>>> seqs, outs, actions, counts = ih.get_sequences( ... "Conversion", "pyName", "pyOutcome", "pxInteractionID" ... ) >>> pmi = IH.calculate_pmi(actions, counts) >>> IH.pmi_overview(pmi, counts, seqs, outs)
- class ImpactAnalyzer(raw_data: polars.LazyFrame)¶
Analyze and visualize Impact Analyzer experiment results from Pega CDH.
The ImpactAnalyzer class provides analysis and visualization capabilities for NBA (Next-Best-Action) Impact Analyzer experiments. It processes experiment data from Pega’s Customer Decision Hub to compare the effectiveness of different NBA strategies including adaptive models, propensity prioritization, lever usage, and engagement policies.
Data can be loaded from three sources:
PDC exports via
from_pdc(): Uses pre-aggregated experiment data from PDC JSON exports. Value Lift is copied from PDC data as it cannot be re-calculated from the available numbers.VBD exports via
from_vbd(): Reconstructs experiment metrics from raw VBD Actuals or Scenario Planner Actuals data. Allows flexible time ranges and data selection. Value Lift is calculated from ValuePerImpression.Interaction History via
from_ih(): Loads experiment metrics from Interaction History data. Not yet implemented.
\[\text{Engagement Lift} = \frac{\text{SuccessRate}_{test} - \text{SuccessRate}_{control}}{\text{SuccessRate}_{control}}\]\[\text{Value Lift} = \frac{\text{ValueCapture}_{test} - \text{ValueCapture}_{control}}{\text{ValueCapture}_{control}}\]- Parameters:
raw_data (polars.LazyFrame)
- ia_data¶
The underlying experiment data containing control group metrics.
- Type:
pl.LazyFrame
See also
pdstools.adm.ADMDatamartFor ADM model analysis.
pdstools.ih.IHFor Interaction History analysis.
Examples
>>> from pdstools import ImpactAnalyzer >>> ia = ImpactAnalyzer.from_pdc("impact_analyzer_export.json") >>> ia.overall_summary().collect() >>> ia.plot.overview()
- ia_data: polars.LazyFrame¶
- default_ia_experiments¶
Default experiments mapping experiment names to (control, test) group tuples.
- outcome_labels¶
Mapping of metric names to outcome labels used for aggregation.
- default_ia_controlgroups¶
- plot¶
- classmethod from_pdc(pdc_source: os.PathLike | str | List[os.PathLike] | List[str], *, reader: Callable | None = None, query: pdstools.utils.types.QUERY | None = None, return_wide_df: bool = False, return_df: bool = False) ImpactAnalyzer | polars.LazyFrame¶
Create an ImpactAnalyzer instance from PDC JSON export(s).
Loads pre-aggregated experiment data from Pega Decision Central JSON exports. Value Lift metrics are copied directly from the PDC data.
- Parameters:
pdc_source (Union[os.PathLike, str, List[os.PathLike], List[str]]) – Path to PDC JSON file, or a list of paths to concatenate.
reader (Optional[Callable], optional) – Custom function to read source data into a dict. If None, uses standard JSON file reader. Default is None.
query (Optional[QUERY], optional) – Polars expression to filter the data. Default is None.
return_wide_df (bool, optional) – If True, return the raw wide-format data as a LazyFrame for debugging. Default is False.
return_df (bool, optional) – If True, return the processed data as a LazyFrame instead of an ImpactAnalyzer instance. Default is False.
- Returns:
ImpactAnalyzer instance, or LazyFrame if return_df or return_wide_df is True.
- Return type:
ImpactAnalyzer or pl.LazyFrame
- Raises:
ValueError – If an empty list of source files is provided.
Examples
>>> ia = ImpactAnalyzer.from_pdc("CDH_Metrics_ImpactAnalyzer.json") >>> ia.overall_summary().collect()
- classmethod from_vbd(vbd_source: os.PathLike | str, *, return_df: bool = False) ImpactAnalyzer | polars.LazyFrame | None¶
Create an ImpactAnalyzer instance from VBD data.
Processes VBD Actuals or Scenario Planner Actuals data to reconstruct Impact Analyzer experiment metrics. Provides more flexible time ranges and data selection compared to PDC exports.
Value Lift is calculated from ValuePerImpression since raw value data is available in VBD exports.
- Parameters:
vbd_source (Union[os.PathLike, str]) – Path to VBD export file (parquet, csv, ndjson, or zip).
return_df (bool, optional) – If True, return processed data as LazyFrame instead of ImpactAnalyzer instance. Default is False.
- Returns:
ImpactAnalyzer instance, LazyFrame if return_df is True, or None if the source contains no data.
- Return type:
ImpactAnalyzer or pl.LazyFrame or None
Examples
>>> ia = ImpactAnalyzer.from_vbd("ScenarioPlannerActuals.zip") >>> ia.summary_by_channel().collect()
- classmethod from_ih(ih_source: os.PathLike | str, *, return_df: bool = False) ImpactAnalyzer | polars.LazyFrame | None¶
- Abstractmethod:
- Parameters:
ih_source (Union[os.PathLike, str])
return_df (bool)
- Return type:
Union[ImpactAnalyzer, polars.LazyFrame, None]
Create an ImpactAnalyzer instance from Interaction History data.
Note
This method is not yet implemented.
Reconstructs experiment metrics from Interaction History data, allowing analysis of experiments using detailed interaction-level records.
- Parameters:
ih_source (Union[os.PathLike, str]) – Path to Interaction History export file.
return_df (bool, optional) – If True, return processed data as LazyFrame instead of ImpactAnalyzer instance. Default is False.
- Returns:
ImpactAnalyzer instance, LazyFrame if return_df is True, or None if the source contains no data.
- Return type:
ImpactAnalyzer or pl.LazyFrame or None
- Raises:
NotImplementedError – This method is not yet implemented.
- classmethod _normalize_pdc_ia_data(json_data: dict, *, query: pdstools.utils.types.QUERY | None = None, return_wide_df: bool = False) polars.LazyFrame¶
Transform PDC Impact Analyzer JSON into normalized long format.
Converts the hierarchical PDC JSON structure (organized by experiments) into a flat structure organized by control groups with impression and accept counts.
- Parameters:
- Returns:
Normalized data with columns: SnapshotTime, Channel, ControlGroup, Impressions, Accepts, ValuePerImpression, Pega_ValueLift.
- Return type:
pl.LazyFrame
- summary_by_channel() polars.LazyFrame¶
Get experiment summary pivoted by channel.
Returns experiment lift metrics (CTR_Lift and Value_Lift) for each experiment, with one row per channel.
- Returns:
Wide-format summary with columns:
Channel: Channel name
CTR_Lift <Experiment>: Engagement lift for each experiment
Value_Lift <Experiment>: Value lift for each experiment
- Return type:
pl.LazyFrame
See also
overall_summarySummary without channel breakdown.
summarize_experimentsLong-format experiment summary.
Examples
>>> ia.summary_by_channel().collect()
- overall_summary() polars.LazyFrame¶
Get overall experiment summary aggregated across all channels.
Returns experiment lift metrics (CTR_Lift and Value_Lift) for each experiment, aggregated across all data.
- Returns:
Single-row wide-format summary with columns:
CTR_Lift <Experiment>: Engagement lift for each experiment
Value_Lift <Experiment>: Value lift for each experiment
- Return type:
pl.LazyFrame
See also
summary_by_channelSummary with channel breakdown.
summarize_experimentsLong-format experiment summary.
Examples
>>> ia.overall_summary().collect()
- summarize_control_groups(by: List[str] | List[polars.Expr] | str | polars.Expr | None = None, drop_internal_cols: bool = True) polars.LazyFrame¶
Aggregate metrics by control group.
Summarizes impressions, accepts, CTR, and value metrics for each control group, optionally grouped by additional dimensions.
- Parameters:
- Returns:
Aggregated metrics with columns: ControlGroup, Impressions, Accepts, CTR, ValuePerImpression, plus any grouping columns.
- Return type:
pl.LazyFrame
Examples
>>> ia.summarize_control_groups().collect() >>> ia.summarize_control_groups(by="Channel").collect()
- summarize_experiments(by: List[str] | List[polars.Expr] | str | polars.Expr | None = None) polars.LazyFrame¶
Summarize experiment metrics comparing test vs control groups.
Computes lift metrics for each defined experiment by comparing test and control group performance.
Note
Returns all default experiments regardless of whether they are active in the data. Experiments without data will have null values for all metrics (Impressions, Accepts, CTR_Lift, Value_Lift, etc.).
- Parameters:
by (Optional[Union[List[str], List[pl.Expr], str, pl.Expr]], optional) – Column name(s) or expression(s) to group by. Default is None (aggregate all data).
- Returns:
Experiment summary with columns:
Experiment: Experiment name
Test, Control: Control group names for the experiment
Impressions_Test, Impressions_Control: Impression counts (null if not active)
Accepts_Test, Accepts_Control: Accept counts (null if not active)
CTR_Test, CTR_Control: Click-through rates (null if not active)
Control_Fraction: Fraction of impressions in control group
CTR_Lift: Engagement lift (null if experiment not active)
Value_Lift: Value lift (null if experiment not active)
- Return type:
pl.LazyFrame
See also
summarize_control_groupsLower-level control group aggregation.
overall_summaryPivoted overall summary.
summary_by_channelPivoted summary by channel.
Examples
>>> ia.summarize_experiments().collect() >>> ia.summarize_experiments(by="Channel").collect()
- read_ds_export(filename: str | io.BytesIO, path: str | os.PathLike = '.', verbose: bool = False, **reading_opts) polars.LazyFrame | None¶
Read in most out of the box Pega dataset export formats Accepts one of the following formats: - .csv - .json - .zip (zipped json or CSV) - .feather - .ipc - .parquet
It automatically infers the default file names for both model data as well as predictor data. If you supply either ‘modelData’ or ‘predictorData’ as the ‘file’ argument, it will search for them. If you supply the full name of the file in the ‘path’ directory, it will import that instead. Since pdstools V3.x, returns a Polars LazyFrame. Simply call .collect() to get an eager frame.
- Parameters:
filename (Union[str, BytesIO]) – Can be one of the following: - A string with the full path to the file - A string with the name of the file (to be searched in the given path) - A BytesIO object containing the file data (e.g., from an uploaded file in a webapp)
path (str, default = '.') – The location of the file
verbose (bool, default = True) – Whether to print out which file will be imported
- Keyword Arguments:
Any – Any arguments to plug into the scan_* function from Polars.
- Returns:
pl.LazyFrame – The (lazy) dataframe
Examples – >>> df = read_ds_export(filename=’full/path/to/ModelSnapshot.json’) >>> df = read_ds_export(filename=’ModelSnapshot.json’, path=’data/ADMData’) >>> df = read_ds_export(filename=uploaded_file) # Where uploaded_file is a BytesIO object
- Return type:
Optional[polars.LazyFrame]
- class Prediction(df: polars.LazyFrame, *, query: pdstools.utils.types.QUERY | None = None)¶
Monitor and analyze Pega Prediction Studio Predictions.
To initialize this class, either 1. Initialize directly with the df polars LazyFrame 2. Use one of the class methods
This class will read in the data from different sources, properly structure them for further analysis, and apply correct typing and useful renaming.
There is also a “namespace” that you can call from this class:
.plot contains ready-made plots to analyze the prediction data with
- Parameters:
df (pl.LazyFrame) – The Polars LazyFrame representation of the prediction data.
query (QUERY, optional) – An optional query to apply to the input data. For details, see
pdstools.utils.cdh_utils._apply_query().
Examples
>>> pred = Prediction.from_ds_export('/my_export_folder/predictions.zip') >>> pred = Prediction.from_mock_data(days=70) >>> from pdstools import Prediction >>> import polars as pl >>> pred = Prediction( df = pl.scan_parquet('predictions.parquet'), query = {"Class":["DATA-DECISION-REQUEST-CUSTOMER-CDH"]} )
See also
pdstools.prediction.PredictionPlotsThe out of the box plots on the Prediction data
pdstools.utils.cdh_utils._apply_queryHow to query the Prediction class and methods
- predictions: polars.LazyFrame¶
- plot: PredictionPlots¶
- prediction_validity_expr¶
- classmethod from_ds_export(predictions_filename: os.PathLike | str, base_path: os.PathLike | str = '.', *, query: pdstools.utils.types.QUERY | None = None)¶
Import from a Pega Dataset Export of the PR_DATA_DM_SNAPSHOTS table.
- Parameters:
predictions_filename (Union[os.PathLike, str]) – The full path or name (if base_path is given) to the prediction snapshot files
base_path (Union[os.PathLike, str], optional) – A base path to provide if predictions_filename is not given as a full path, by default “.”
query (Optional[QUERY], optional) – An optional argument to filter out selected data, by default None
- Returns:
The properly initialized Prediction class
- Return type:
Examples
>>> from pdstools import Prediction >>> pred = Prediction.from_ds_export('predictions.zip', '/my_export_folder')
Note
By default, the dataset export in Infinity returns a zip file per table. You do not need to open up this zip file! You can simply point to the zip, and this method will be able to read in the underlying data.
See also
pdstools.pega_io.File.read_ds_exportMore information on file compatibility
pdstools.utils.cdh_utils._apply_queryHow to query the Prediction class and methods
- classmethod from_s3()¶
Not implemented yet. Please let us know if you would like this functionality!
- Returns:
The properly initialized Prediction class
- Return type:
- classmethod from_dataflow_export()¶
Import from a data flow, such as the Prediction Studio export. Not implemented yet. Please let us know if you would like this functionality!
- Returns:
The properly initialized Prediction class
- Return type:
- classmethod from_pdc(df: polars.LazyFrame, *, return_df=False, query: pdstools.utils.types.QUERY | None = None)¶
Import from (Pega-internal) PDC data, which is a combination of the PR_DATA_DM_SNAPSHOTS and PR_DATA_DM_ADMMART_MDL_FACT tables.
- Parameters:
df (pl.LazyFrame) – The Polars LazyFrame containing the PDC data
return_df (bool, optional) – If True, returns the processed DataFrame instead of initializing the class, by default False
query (Optional[QUERY], optional) – An optional query to apply to the input data, by default None
- Returns:
Either the initialized Prediction class or the processed DataFrame if return_df is True
- Return type:
Union[Prediction, pl.LazyFrame]
See also
pdstools.utils.cdh_utils._read_pdcMore information on PDC data processing
pdstools.utils.cdh_utils._apply_queryHow to query the Prediction class and methods
- save_data(path: os.PathLike | str = '.') os.PathLike | None¶
Cache predictions to a file.
- Parameters:
path (Union[os.PathLike, str]) – Where to place the file
- Returns:
The path to the cached prediction data file, or None if no data available
- Return type:
Optional[os.PathLike]
- classmethod from_processed_data(df: polars.LazyFrame)¶
Load a Prediction from already-processed data (e.g., from cache).
This bypasses the normal data transformation pipeline and directly assigns the data to self.predictions. Use this when loading data that has already been processed by the Prediction class constructor, such as data saved via save_data().
- Parameters:
df (pl.LazyFrame) – A LazyFrame containing already-processed prediction data with columns like ‘Positives’, ‘CTR’, ‘Performance’, etc. rather than the raw ‘pyPositives’, ‘pyModelType’, etc.
- Returns:
A Prediction instance with the processed data loaded
- Return type:
Examples
>>> # Load from a cached file >>> cached_data = pl.scan_parquet('cached_predictions.parquet') >>> pred = Prediction.from_processed_data(cached_data)
- classmethod from_mock_data(days=70)¶
Create a Prediction instance with mock data for testing and demonstration purposes.
- Parameters:
days (int, optional) – Number of days of mock data to generate, by default 70
- Returns:
The initialized Prediction class with mock data
- Return type:
Examples
>>> from pdstools import Prediction >>> pred = Prediction.from_mock_data(days=30) >>> pred.plot.performance_trend()
- property is_available: bool¶
Check if prediction data is available.
- Returns:
True if prediction data is available, False otherwise
- Return type:
- property is_valid: bool¶
Check if prediction data is valid.
A valid prediction meets the criteria defined in prediction_validity_expr, which requires positive and negative responses in both test and control groups.
- Returns:
True if prediction data is valid, False otherwise
- Return type:
- summary_by_channel(custom_predictions: List[List] | None = None, *, start_date: datetime.datetime | None = None, end_date: datetime.datetime | None = None, window: int | datetime.timedelta | None = None, every: str | None = None, debug: bool = False) polars.LazyFrame¶
Summarize prediction per channel
- Parameters:
custom_predictions (Optional[List[List]], optional) – Optional list with custom prediction name to channel mappings. Each item should be [PredictionName, Channel, Direction, isMultiChannel]. Defaults to None.
start_date (datetime.datetime, optional) – Start date of the summary period. If None (default) uses the end date minus the window, or if both absent, the earliest date in the data
end_date (datetime.datetime, optional) – End date of the summary period. If None (default) uses the start date plus the window, or if both absent, the latest date in the data
window (int or datetime.timedelta, optional) – Number of days to use for the summary period or an explicit timedelta. If None (default) uses the whole period. Can’t be given if start and end date are also given.
every (str, optional) – Optional additional grouping by time period. Format string as in polars.Expr.dt.truncate (https://docs.pola.rs/api/python/stable/reference/expressions/api/polars.Expr.dt.truncate.html), for example “1mo”, “1w”, “1d” for calendar month, week day. Defaults to None.
debug (bool, optional) – If True, enables debug mode for additional logging or outputs. Defaults to False.
- Returns:
Summary across all Predictions as a dataframe with the following fields:
Time and Configuration Fields: - DateRange Min - The minimum date in the summary time range - DateRange Max - The maximum date in the summary time range - Duration - The duration in seconds between the minimum and maximum snapshot times - Prediction: The prediction name - Channel: The channel name - Direction: The direction (e.g., Inbound, Outbound) - ChannelDirectionGroup: Combined Channel/Direction identifier - isValid: Boolean indicating if the prediction data is valid - usesNBAD: Boolean indicating if this is a standard NBAD prediction - isMultiChannel: Boolean indicating if this is a multichannel prediction - ControlPercentage: Percentage of responses in control group - TestPercentage: Percentage of responses in test group
Performance Metrics: - Performance: Weighted model performance (AUC) - Positives: Sum of positive responses - Negatives: Sum of negative responses - Responses: Sum of all responses - Positives_Test: Sum of positive responses in test group - Positives_Control: Sum of positive responses in control group - Positives_NBA: Sum of positive responses in NBA group - Negatives_Test: Sum of negative responses in test group - Negatives_Control: Sum of negative responses in control group - Negatives_NBA: Sum of negative responses in NBA group - CTR: Clickthrough rate (Positives over Positives + Negatives) - CTR_Test: Clickthrough rate for test group (model propensitities) - CTR_Control: Clickthrough rate for control group (random propensities) - CTR_NBA: Clickthrough rate for NBA group (available only when Impact Analyzer is used) - Lift: Lift in Engagement when testing prioritization with just Adaptive Models vs just Random Propensity
Technology Usage Indicators: - usesImpactAnalyzer: Boolean indicating if Impact Analyzer is used
- Return type:
pl.LazyFrame
- overall_summary(custom_predictions: List[List] | None = None, *, start_date: datetime.datetime | None = None, end_date: datetime.datetime | None = None, window: int | datetime.timedelta | None = None, every: str | None = None, debug: bool = False) polars.LazyFrame¶
Overall prediction summary. Only valid prediction data is included.
- Parameters:
custom_predictions (Optional[List[List]], optional) – Optional list with custom prediction name to channel mappings. Each item should be [PredictionName, Channel, Direction, isMultiChannel]. Defaults to None.
start_date (datetime.datetime, optional) – Start date of the summary period. If None (default) uses the end date minus the window, or if both absent, the earliest date in the data
end_date (datetime.datetime, optional) – End date of the summary period. If None (default) uses the start date plus the window, or if both absent, the latest date in the data
window (int or datetime.timedelta, optional) – Number of days to use for the summary period or an explicit timedelta. If None (default) uses the whole period. Can’t be given if start and end date are also given.
every (str, optional) – Optional additional grouping by time period. Format string as in polars.Expr.dt.truncate (https://docs.pola.rs/api/python/stable/reference/expressions/api/polars.Expr.dt.truncate.html), for example “1mo”, “1w”, “1d” for calendar month, week day. Defaults to None.
debug (bool, optional) – If True, enables debug mode for additional logging or outputs. Defaults to False.
- Returns:
Summary across all Predictions as a dataframe with the following fields:
Time and Configuration Fields: - DateRange Min - The minimum date in the summary time range - DateRange Max - The maximum date in the summary time range - Duration - The duration in seconds between the minimum and maximum snapshot times - ControlPercentage: Weighted average percentage of control group responses - TestPercentage: Weighted average percentage of test group responses - usesNBAD: Boolean indicating if any of the predictions is a standard NBAD prediction
Performance Metrics: - Performance: Weighted average performance across all valid channels - Positives Inbound: Sum of positive responses across all valid inbound channels - Positives Outbound: Sum of positive responses across all valid outbound channels - Responses Inbound: Sum of all responses across all valid inbound channels - Responses Outbound: Sum of all responses across all valid outbound channels - Overall Lift: Weighted average lift across all valid channels - Minimum Negative Lift: The lowest negative lift value found
Channel Statistics: - Number of Valid Channels: Count of unique valid channel/direction combinations - Channel with Minimum Negative Lift: Channel with the lowest negative lift value
Technology Usage Indicators: - usesImpactAnalyzer: Boolean indicating if any channel uses Impact Analyzer
- Return type:
pl.LazyFrame
- default_predictor_categorization(x: str | polars.Expr = pl.col('PredictorName')) polars.Expr¶
Function to determine the ‘category’ of a predictor.
It is possible to supply a custom function. This function can accept an optional column as input And as output should be a Polars expression. The most straight-forward way to implement this is with pl.when().then().otherwise(), which you can chain.
By default, this function returns “Primary” whenever there is no ‘.’ anywhere in the name string, otherwise returns the first string before the first period
- Parameters:
x (Union[str, pl.Expr], default = pl.col('PredictorName')) – The column to parse
- Return type:
polars.Expr
- cdh_sample(query: pdstools.utils.types.QUERY | None = None) pdstools.adm.ADMDatamart.ADMDatamart¶
Import a sample dataset from the CDH Sample application
- Parameters:
query (Optional[QUERY], optional) – An optional query to apply to the data, by default None
- Returns:
The ADM Datamart class populated with CDH Sample data
- Return type:
- sample_value_finder(threshold: float | None = None) pdstools.valuefinder.ValueFinder.ValueFinder¶
Import a sample dataset of a Value Finder simulation
This simulation was ran on a stock CDH Sample system.
- Parameters:
threshold (Optional[float], optional) – Optional override of the propensity threshold in the system, by default None
- Returns:
The Value Finder class populated with the Value Finder simulation data
- Return type:
- show_versions(print_output: Literal[True] = True) None¶
- show_versions(print_output: Literal[False] = False) str
Get a list of currently installed versions of pdstools and its dependencies.
- Parameters:
print_output (bool, optional) – If True, print the version information to stdout. If False, return the version information as a string. Default is True.
include_dependencies (bool, optional) – If True, include the versions of dependencies in the output. If False, only include the pdstools version and system information. Default is True.
- Returns:
Version information as a string if print_output is False, else None.
- Return type:
Optional[str]
Examples
>>> from pdstools import show_versions >>> show_versions() --- Version info --- pdstools: 4.0.0-alpha Platform: macOS-14.7-arm64-arm-64bit Python: 3.12.4 (main, Jun 6 2024, 18:26:44) [Clang 15.0.0 (clang-1500.3.9.4)]
— Dependencies — typing_extensions: 4.12.2 polars>=1.9: 1.9.0
— Dependency group: adm — plotly>=5.5.0: 5.24.1
— Dependency group: api — pydantic: 2.9.2 httpx: 0.27.2
- class ValueFinder(df: polars.LazyFrame, *, query: pdstools.utils.types.QUERY | None = None, n_customers: int | None = None, threshold: float | None = None)¶
Analyze the Value Finder dataset for detailed insights
- Parameters:
- df: polars.LazyFrame¶
- nbad_stages = ['Eligibility', 'Applicability', 'Suitability', 'Arbitration']¶
- aggregates¶
- plot¶
- classmethod from_ds_export(filename: str | None = None, base_path: os.PathLike | str = '.', *, query: pdstools.utils.types.QUERY | None = None, n_customers: int | None = None, threshold: float | None = None)¶
- Parameters:
filename (Optional[str])
base_path (Union[os.PathLike, str])
query (Optional[pdstools.utils.types.QUERY])
n_customers (Optional[int])
threshold (Optional[float])
- classmethod from_dataflow_export(files: Iterable[str] | str, *, query: pdstools.utils.types.QUERY | None = None, n_customers: int | None = None, threshold: float | None = None, cache_file_prefix: str = '', extension: Literal['json'] = 'json', compression: Literal['gzip'] = 'gzip', cache_directory: os.PathLike | str = 'cache')¶
- property threshold¶
- save_data(path: os.PathLike | str = '.') pathlib.Path | None¶
Cache the pyValueFinder dataset to a Parquet file
- Parameters:
path (str) – Where to place the file
- Returns:
The paths to the model and predictor data files
- Return type:
(Optional[Path], Optional[Path])