pdstools

Pega Data Scientist Tools Python library

Submodules

Classes

ADMDatamart

Monitor and analyze ADM data from the Pega Datamart.

Prediction

Monitor Pega Prediction Studio Predictions

ValueFinder

Analyze the Value Finder dataset for detailed insights

Functions

read_ds_export(→ Optional[polars.LazyFrame])

Read in most out of the box Pega dataset export formats

default_predictor_categorization(→ polars.Expr)

Function to determine the 'category' of a predictor.

cdh_sample(→ pdstools.adm.ADMDatamart.ADMDatamart)

Import a sample dataset from the CDH Sample application

sample_value_finder(...)

Import a sample dataset of a Value Finder simulation

show_versions(…)

Get a list of currently installed versions of pdstools and its dependencies.

Package Contents

class ADMDatamart(model_df: polars.LazyFrame | None = None, predictor_df: polars.LazyFrame | None = None, *, query: pdstools.utils.types.QUERY | None = None, extract_pyname_keys: bool = True)

Monitor and analyze ADM data from the Pega Datamart.

To initialize this class, either:

  1. Initialize directly with the model_df and predictor_df polars LazyFrames

  2. Use one of the class methods: from_ds_export, from_s3, or from_dataflow_export

This class reads in the data from different sources, structures it properly for further analysis, and applies correct typing and useful renaming.

There are also a few “namespaces” that you can call from this class:

  • .plot contains ready-made plots to analyze the data with

  • .aggregates contains mostly internal data aggregation queries

  • .agb contains analysis utilities for Adaptive Gradient Boosting models

  • .generate leads to some ready-made reports, such as the Health Check

  • .bin_aggregator allows you to compare the bins across various models
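A minimal sketch of how these namespaces are typically used on an initialized instance; the specific method names below (bubble_chart, health_check) are illustrative assumptions rather than a guaranteed API:

>>> dm = ADMDatamart.from_ds_export(base_path='~/Downloads')
>>> fig = dm.plot.bubble_chart()   # illustrative: a ready-made plot from the .plot namespace
>>> dm.generate.health_check()     # illustrative: renders the Health Check report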

Parameters:
  • model_df (pl.LazyFrame, optional) – The Polars LazyFrame representation of the model snapshot table.

  • predictor_df (pl.LazyFrame, optional) – The Polars LazyFrame representation of the predictor binning table.

  • query (QUERY, optional) – An optional query to apply to the input data. For details, see pdstools.utils.cdh_utils._apply_query().

  • extract_pyname_keys (bool, default = True) – Whether to extract extra keys from the pyName column. In older Pega versions, this contained pyTreatment among other (customizable) fields. By default True

Examples

>>> import polars as pl
>>> from pdstools import ADMDatamart
>>> from glob import glob
>>> dm = ADMDatamart(
         model_df = pl.scan_parquet('models.parquet'),
         predictor_df = pl.scan_parquet('predictors.parquet'),
         query = {"Configuration":["Web_Click_Through"]}
         )
>>> dm = ADMDatamart.from_ds_export(base_path='/my_export_folder')
>>> dm = ADMDatamart.from_s3("pega_export")
>>> dm = ADMDatamart.from_dataflow_export(glob("data/models*"), glob("data/preds*"))

Note

This class depends on two datasets:

  • pyModelSnapshots corresponds to the model_data attribute

  • pyADMPredictorSnapshots corresponds to the predictor_data attribute

For instructions on how to download these datasets, please refer to the following article: https://docs.pega.com/bundle/platform/page/platform/decision-management/exporting-monitoring-database.html

See also

pdstools.adm.Plots

The out of the box plots on the Datamart data

pdstools.adm.Reports

Methods to generate the Health Check and Model Report

pdstools.utils.cdh_utils._apply_query

How to query the ADMDatamart class and methods

model_data: polars.LazyFrame | None
predictor_data: polars.LazyFrame | None
combined_data: polars.LazyFrame | None
plot: pdstools.adm.Plots.Plots
aggregates: pdstools.adm.Aggregates.Aggregates
agb: pdstools.adm.ADMTrees.AGB
generate: pdstools.adm.Reports.Reports
cdh_guidelines: pdstools.adm.CDH_Guidelines.CDHGuidelines
bin_aggregator: pdstools.adm.BinAggregator.BinAggregator
context_keys: List[str] = ['Channel', 'Direction', 'Issue', 'Group', 'Name']
classmethod from_ds_export(model_filename: str | None = None, predictor_filename: str | None = None, base_path: os.PathLike | str = '.', *, query: pdstools.utils.types.QUERY | None = None, extract_pyname_keys: bool = True)

Import the ADMDatamart class from a Pega Dataset Export

Parameters:
  • model_filename (Optional[str], optional) – The full path or name (if base_path is given) to the model snapshot files, by default None

  • predictor_filename (Optional[str], optional) – The full path or name (if base_path is given) to the predictor binning snapshot files, by default None

  • base_path (Union[os.PathLike, str], optional) – A base path to provide so that we can automatically find the most recent files for both the model and predictor snapshots, if model_filename and predictor_filename are not given as full paths, by default “.”

  • query (Optional[QUERY], optional) – An optional argument to filter out selected data, by default None

  • extract_pyname_keys (bool, optional) – Whether to extract additional keys from the pyName column, by default True

Returns:

The properly initialized ADMDatamart class

Return type:

ADMDatamart

Examples

>>> from pdstools import ADMDatamart
>>> # To automatically find the most recent files in the 'my_export_folder' dir:
>>> dm = ADMDatamart.from_ds_export(base_path='/my_export_folder')
>>> # To specify individual files:
>>> dm = ADMDatamart.from_ds_export(
        model_filename='/Downloads/model_snapshots.parquet',
        predictor_filename='/Downloads/predictor_snapshots.parquet'
        )

Note

By default, the dataset export in Infinity returns a zip file per table. You do not need to open up this zip file! You can simply point to the zip, and this method will be able to read in the underlying data.
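For example, pointing directly at the zipped exports (the file names below are only illustrative of a typical export, not exact):

>>> dm = ADMDatamart.from_ds_export(
        model_filename='Data-Decision-ADM-ModelSnapshot_pyModelSnapshots_20240101T000000_GMT.zip',
        predictor_filename='Data-Decision-ADM-PredictorBinningSnapshot_pyADMPredictorSnapshots_20240101T000000_GMT.zip',
        base_path='~/Downloads'
        )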

See also

pdstools.pega_io.File.read_ds_export

More information on file compatibility

pdstools.utils.cdh_utils._apply_query

How to query the ADMDatamart class and methods

classmethod from_s3()

Not implemented yet. Please let us know if you would like this functionality!

classmethod from_dataflow_export(model_data_files: Iterable[str] | str, predictor_data_files: Iterable[str] | str, *, query: pdstools.utils.types.QUERY | None = None, extract_pyname_keys: bool = True, cache_file_prefix: str = '', extension: Literal['json'] = 'json', compression: Literal['gzip'] = 'gzip', cache_directory: os.PathLike | str = 'cache')

Read in data generated by a data flow, such as the Prediction Studio export.

Dataflows are able to export data from and to various sources. As they are meant to be used in production, they are highly resilient. For every partition and every node, a dataflow will output a small JSON file every few seconds. While this is great for production loads, it can be a bit trickier to read the data in for smaller-scale and ad-hoc analyses.

This method aims to make the ingestion of such highly partitioned data easier. It reads in every individual small json file that the dataflow has output, and caches them to a parquet file in the cache_directory folder. As such, if you re-run this method later with more data added since the last export, we will not read in from the (slow) dataflow files, but rather from the (much faster) cache.

Parameters:
  • model_data_files (Union[Iterable[str], str]) – A list of files to read in as the model snapshots

  • predictor_data_files (Union[Iterable[str], str]) – A list of files to read in as the predictor snapshots

  • query (Optional[QUERY], optional) – An optional query to apply to the input data, by default None

  • extract_pyname_keys (bool, optional) – Whether to extract extra keys from the pyName column, by default True

  • cache_file_prefix (str, optional) – An optional prefix for the cache files, by default “”

  • extension (Literal["json"], optional) – The extension of the source data, by default “json”

  • compression (Literal["gzip"], optional) – The compression of the source files, by default “gzip”

  • cache_directory (Union[os.PathLike, str], optional) – Where to store the cached files, by default “cache”

Returns:

An initialized instance of the datamart class

Return type:

ADMDatamart

Examples

>>> from pdstools import ADMDatamart
>>> import glob
>>> dm = ADMDatamart.from_dataflow_export(glob("data/models*"), glob("data/preds*"))
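To control where the intermediate Parquet cache is written, the cache-related parameters can be passed as well (a sketch using the documented parameters):

>>> dm = ADMDatamart.from_dataflow_export(
        glob("data/models*"),
        glob("data/preds*"),
        cache_file_prefix="prod_",
        cache_directory="cache/dataflow"
        )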

See also

pdstools.utils.cdh_utils._apply_query

How to query the ADMDatamart class and methods

glob

Makes creating lists of files much easier

_validate_model_data(df: polars.LazyFrame | None, query: pdstools.utils.types.QUERY | None = None, extract_pyname_keys: bool = True) polars.LazyFrame | None

Internal method to validate model data

Parameters:
  • df (Optional[polars.LazyFrame])

  • query (Optional[pdstools.utils.types.QUERY])

  • extract_pyname_keys (bool)

Return type:

Optional[polars.LazyFrame]

_validate_predictor_data(df: polars.LazyFrame | None) polars.LazyFrame | None

Internal method to validate predictor data

Parameters:

df (Optional[polars.LazyFrame])

Return type:

Optional[polars.LazyFrame]

apply_predictor_categorization(df: polars.LazyFrame | None = None, categorization: polars.Expr | Callable[Ellipsis, polars.Expr] = cdh_utils.default_predictor_categorization)

Apply a new predictor categorization to the datamart tables

In certain plots, we use the predictor categorization to indicate what ‘kind’ a certain predictor is, such as IH, Customer, etc. Call this method with a custom Polars Expression (or a method that returns one) - and it will be applied to the predictor data (and the combined dataset too).

For a reference implementation of a custom predictor categorization, refer to pdstools.utils.cdh_utils.default_predictor_categorization.

Parameters:
  • df (Optional[pl.LazyFrame], optional) – A Polars Lazyframe to apply the categorization to. If not provided, applies it over the predictor data and combined datasets. By default, None

  • categorization (Union[pl.Expr, Callable[..., pl.Expr]]) – A polars Expression (or method that returns one) to apply the mapping with. Should be based on Polars’ when.then.otherwise syntax. By default, pdstools.utils.cdh_utils.default_predictor_categorization

Examples

>>> dm = ADMDatamart(my_data) #uses the OOTB predictor categorization
>>> dm.apply_predictor_categorization(
...     categorization=pl.when(
...         pl.col("PredictorName").cast(pl.Utf8).str.contains("Propensity")
...     )
...     .then(pl.lit("External Model"))
...     .otherwise(pl.lit("Adaptive Model"))
... )
>>> # Now, every subsequent plot will use the custom categorization
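Because categorization also accepts a callable that returns a Polars expression, the same mapping can be wrapped in a small helper function; a sketch (the name my_categorization is hypothetical, and how the callable is invoked internally is not specified here):

>>> def my_categorization() -> pl.Expr:
...     return (
...         pl.when(pl.col("PredictorName").cast(pl.Utf8).str.contains("Propensity"))
...         .then(pl.lit("External Model"))
...         .otherwise(pl.lit("Adaptive Model"))
...     )
>>> dm.apply_predictor_categorization(categorization=my_categorization)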
save_data(path: os.PathLike | str = '.', selected_model_ids: List[str] | None = None) Tuple[pathlib.Path | None, pathlib.Path | None]

Caches model_data and predictor_data to files.

Parameters:
  • path (str) – Where to place the files

  • selected_model_ids (List[str]) – Optional list of model IDs to restrict to

Returns:

The paths to the model and predictor data files

Return type:

(Optional[Path], Optional[Path])
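A minimal usage sketch, assuming an initialized dm instance:

>>> model_path, predictor_path = dm.save_data(path='cache')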

property unique_channels

A consistently ordered set of unique channels in the data

Used for making the color schemes in different plots consistent

property unique_configurations

A consistently ordered set of unique configurations in the data

Used for making the color schemes in different plots consistent

property unique_channel_direction

A consistently ordered set of unique channel+direction combos in the data

Used for making the color schemes in different plots consistent

property unique_configuration_channel_direction

A consistently ordered set of unique configuration+channel+direction combinations in the data

Used for making the color schemes in different plots consistent

property unique_predictor_categories

A consistently ordered set of unique predictor categories in the data

Used for making the color schemes in different plots consistent

read_ds_export(filename: str | io.BytesIO, path: str | os.PathLike = '.', verbose: bool = False, **reading_opts) polars.LazyFrame | None

Read in most out of the box Pega dataset export formats. Accepts one of the following formats:

  • .csv

  • .json

  • .zip (zipped json or CSV)

  • .feather

  • .ipc

  • .parquet

It automatically infers the default file names for both model data and predictor data. If you supply either ‘modelData’ or ‘predictorData’ as the filename argument, it will search for them. If you supply the full name of a file in the path directory, it will import that instead. Since pdstools V3.x, this function returns a Polars LazyFrame; simply call .collect() to get an eager frame.

Parameters:
  • filename (Union[str, BytesIO]) – Can be one of the following:

    - A string with the full path to the file

    - A string with the name of the file (to be searched in the given path)

    - A BytesIO object containing the file data (e.g., from an uploaded file in a webapp)

  • path (str, default = '.') – The location of the file

  • verbose (bool, default = False) – Whether to print out which file will be imported

Keyword Arguments:

Any – Any arguments to plug into the scan_* function from Polars.

Returns:

The (lazy) dataframe

Return type:

Optional[polars.LazyFrame]

Examples

>>> df = read_ds_export(filename='full/path/to/ModelSnapshot.json')
>>> df = read_ds_export(filename='ModelSnapshot.json', path='data/ADMData')
>>> df = read_ds_export(filename=uploaded_file) # Where uploaded_file is a BytesIO object
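Since extra keyword arguments are forwarded to the underlying Polars scan_* function, reader options can be passed through as well; a sketch assuming a CSV export, where separator is a polars.scan_csv argument:

>>> df = read_ds_export(filename='model_snapshots.csv', path='data', separator=';')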

class Prediction(df: polars.LazyFrame)

Monitor Pega Prediction Studio Predictions

Parameters:

df (polars.LazyFrame)

predictions: polars.LazyFrame
plot: PredictionPlots
prediction_validity_expr
cdh_guidelines
static from_mock_data(days=70)
property is_available: bool
Return type:

bool

property is_valid: bool
Return type:

bool

summary_by_channel(custom_predictions: List[List] | None = None, by_period: str = None) polars.LazyFrame

Summarize prediction per channel

Parameters:
  • custom_predictions (Optional[List[CDH_Guidelines.NBAD_Prediction]], optional) – Optional list with custom prediction name to channel mappings. Defaults to None.

  • by_period (str, optional) – Optional grouping by time period. Format string as in polars.Expr.dt.truncate (https://docs.pola.rs/api/python/stable/reference/expressions/api/polars.Expr.dt.truncate.html), for example "1mo", "1w", "1d" for calendar month, week, or day. If provided, creates a new Period column with the truncated date/time. Defaults to None.

Returns:

Dataframe with prediction summary (validity, numbers in test, control etc.)

Return type:

pl.LazyFrame
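A minimal usage sketch, using the documented mock-data constructor:

>>> from pdstools import Prediction
>>> pred = Prediction.from_mock_data(days=70)
>>> pred.summary_by_channel(by_period='1mo').collect()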

overall_summary(custom_predictions: List[List] | None = None, by_period: str = None) polars.LazyFrame

Overall prediction summary. Only valid prediction data is included.

Parameters:
  • custom_predictions (Optional[List[CDH_Guidelines.NBAD_Prediction]], optional) – Optional list with custom prediction name to channel mappings. Defaults to None.

  • by_period (str, optional) – Optional grouping by time period. Format string as in polars.Expr.dt.truncate (https://docs.pola.rs/api/python/stable/reference/expressions/api/polars.Expr.dt.truncate.html), for example "1mo", "1w", "1d" for calendar month, week, or day. If provided, creates a new Period column with the truncated date/time. Defaults to None.

Returns:

Summary across all valid predictions as a dataframe

Return type:

pl.LazyFrame
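Continuing the sketch above with the same pred instance:

>>> pred.overall_summary().collect()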

default_predictor_categorization(x: str | polars.Expr = pl.col('PredictorName')) polars.Expr

Function to determine the ‘category’ of a predictor.

It is possible to supply a custom function. This function can accept an optional column as input and should return a Polars expression. The most straightforward way to implement this is with pl.when().then().otherwise(), which you can chain.

By default, this function returns “Primary” whenever there is no ‘.’ anywhere in the name string, and otherwise returns the part of the name before the first period.

Parameters:

x (Union[str, pl.Expr], default = pl.col('PredictorName')) – The column to parse

Return type:

polars.Expr
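A minimal sketch of the default rule applied to a small frame (the alias is illustrative; the expected categories follow from the rule described above):

>>> import polars as pl
>>> from pdstools import default_predictor_categorization
>>> df = pl.DataFrame({"PredictorName": ["IH.Web.Clicks", "Customer.Age", "Age"]})
>>> df.with_columns(default_predictor_categorization().alias("PredictorCategory"))
>>> # "IH.Web.Clicks" -> "IH", "Customer.Age" -> "Customer", "Age" -> "Primary"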

cdh_sample(query: pdstools.utils.types.QUERY | None = None) pdstools.adm.ADMDatamart.ADMDatamart

Import a sample dataset from the CDH Sample application

Parameters:

query (Optional[QUERY], optional) – An optional query to apply to the data, by default None

Returns:

The ADM Datamart class populated with CDH Sample data

Return type:

ADMDatamart

sample_value_finder(threshold: float | None = None) pdstools.valuefinder.ValueFinder.ValueFinder

Import a sample dataset of a Value Finder simulation

This simulation was run on a stock CDH Sample system.

Parameters:

threshold (Optional[float], optional) – Optional override of the propensity threshold in the system, by default None

Returns:

The Value Finder class populated with the Value Finder simulation data

Return type:

ValueFinder
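A minimal usage sketch (the threshold value shown is arbitrary):

>>> from pdstools import sample_value_finder
>>> vf = sample_value_finder(threshold=0.05)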

show_versions(print_output: Literal[True] = True) None
show_versions(print_output: Literal[False] = False) str

Get a list of currently installed versions of pdstools and its dependencies.

Parameters:

print_output (bool, optional) – If True, print the version information to stdout. If False, return the version information as a string. Default is True.

Returns:

Version information as a string if print_output is False, else None.

Return type:

Optional[str]

Examples

>>> from pdstools import show_versions
>>> show_versions()
--- Version info ---
pdstools: 4.0.0-alpha
Platform: macOS-14.7-arm64-arm-64bit
Python: 3.12.4 (main, Jun  6 2024, 18:26:44) [Clang 15.0.0 (clang-1500.3.9.4)]

--- Dependencies ---
typing_extensions: 4.12.2
polars>=1.9: 1.9.0

--- Dependency group: adm ---
plotly>=5.5.0: 5.24.1

--- Dependency group: api ---
pydantic: 2.9.2
httpx: 0.27.2

class ValueFinder(df: polars.LazyFrame, *, query: pdstools.utils.types.QUERY | None = None, n_customers: int | None = None, threshold: float | None = None)

Analyze the Value Finder dataset for detailed insights

Parameters:
  • df (polars.LazyFrame)

  • query (Optional[pdstools.utils.types.QUERY])

  • n_customers (Optional[int])

  • threshold (Optional[float])

df: polars.LazyFrame
n_customers: int
nbad_stages = ['Eligibility', 'Applicability', 'Suitability', 'Arbitration']
aggregates
plot
classmethod from_ds_export(filename: str | None = None, base_path: os.PathLike | str = '.', *, query: pdstools.utils.types.QUERY | None = None, n_customers: int | None = None, threshold: float | None = None)
Parameters:
  • filename (Optional[str])

  • base_path (Union[os.PathLike, str])

  • query (Optional[pdstools.utils.types.QUERY])

  • n_customers (Optional[int])

  • threshold (Optional[float])
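A minimal usage sketch, assuming the Value Finder export lives in the given folder:

>>> from pdstools import ValueFinder
>>> vf = ValueFinder.from_ds_export(base_path='~/Downloads')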

classmethod from_dataflow_export(files: Iterable[str] | str, *, query: pdstools.utils.types.QUERY | None = None, n_customers: int | None = None, threshold: float | None = None, cache_file_prefix: str = '', extension: Literal['json'] = 'json', compression: Literal['gzip'] = 'gzip', cache_directory: os.PathLike | str = 'cache')
Parameters:
  • files (Union[Iterable[str], str])

  • query (Optional[pdstools.utils.types.QUERY])

  • n_customers (Optional[int])

  • threshold (Optional[float])

  • cache_file_prefix (str)

  • extension (Literal['json'])

  • compression (Literal['gzip'])

  • cache_directory (Union[os.PathLike, str])

set_threshold(new_threshold: float | None = None)
Parameters:

new_threshold (Optional[float])

property threshold
save_data(path: os.PathLike | str = '.') pathlib.Path | None

Cache the pyValueFinder dataset to a Parquet file

Parameters:

path (str) – Where to place the file

Returns:

The path to the cached Parquet file

Return type:

Optional[Path]