pdstools.pega_io

Submodules

Classes

Anonymization

Anonymise Pega datasets (in particular, the Historical Dataset).

S3Data

Asynchronous helper for downloading Pega datasets from S3.

Functions

get_token(→ str)

Fetch an OAuth2 access token for a Pega Platform instance.

cache_to_file(…)

Convenience function to cache a dataframe to disk.

find_files(→ list[str])

Filter a list of filenames down to those matching a Pega snapshot target.

get_latest_file(→ str | None)

Find the most recent Pega snapshot file matching a target type.

read_data(→ polars.LazyFrame)

Read data from various file formats and sources.

read_dataflow_output(files[, cache_file_name, ...])

Read the file output of a Pega dataflow run.

read_ds_export(→ polars.LazyFrame | None)

Read Pega dataset exports with additional capabilities.

read_multi_zip(→ polars.LazyFrame)

Read multiple gzip-compressed NDJSON files and concatenate them.

read_zipped_file(→ tuple[io.BytesIO, str])

Read a Pega zipped NDJSON dataset export.

Package Contents

class Anonymization(path_to_files: str, temporary_path: str | None = None, output_file: str = 'anonymised.parquet', skip_columns_with_prefix: list[str] | tuple[str, ...] | None = None, batch_size: int = 500, file_limit: int | None = None)

Anonymise Pega datasets (in particular, the Historical Dataset).

Numeric columns are min-max scaled to [0, 1]. Symbolic columns are hashed with SHA-256. Columns whose name starts with one of the skip_columns_with_prefix values are passed through unchanged (by default Context_* and Decision_*).

Once constructed, call anonymize() to run the pipeline. All file system work happens then; __init__ is pure.
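
Conceptually, the two per-column transforms work as in this plain-Python sketch (the class itself builds the equivalent Polars expressions):

```python
import hashlib

def scale_numeric(values):
    """Min-max scale a numeric column to [0, 1]."""
    lo, hi = min(values), max(values)
    if lo == hi:  # degenerate (constant) column: the pipeline emits 0.0
        return [0.0 for _ in values]
    return [(v - lo) / (hi - lo) for v in values]

def hash_symbolic(values):
    """Hash a symbolic column with SHA-256, hiding the raw values."""
    return [hashlib.sha256(v.encode()).hexdigest() for v in values]
```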

Parameters:
  • path_to_files (str) – Glob pattern matching the input files, e.g. "~/Downloads/*.json".

  • temporary_path (str, optional) – Directory used for intermediate parquet chunks. Defaults to a fresh tempfile.mkdtemp directory created on first use.

  • output_file (str, default="anonymised.parquet") – Path to write the final anonymised parquet file.

  • skip_columns_with_prefix (list[str], optional) – Column-name prefixes to leave unchanged. Defaults to ("Context_", "Decision_").

  • batch_size (int, default=500) – Number of input files combined per intermediate parquet chunk.

  • file_limit (int, optional) – Process at most this many files (useful for testing).

Examples

>>> Anonymization(
...     path_to_files="~/Downloads/*.json",
...     batch_size=1000,
...     file_limit=10,
... ).anonymize()

path_to_files
_temp_path: str | None = None
output_file = 'anonymised.parquet'
skip_col_prefix: tuple[str, ...] = ('Context_', 'Decision_')
batch_size = 500
file_limit = None

property temp_path: str

Lazily create (and cache) the temp directory.

Return type:

str

anonymize(verbose: bool = True) None

Run the full anonymisation pipeline.

Parameters:

verbose (bool, default=True) – Print progress messages between stages.

Return type:

None

static min_max(column_name: str, value_range: list[dict[str, float]]) polars.Expr

Return a min-max scaling expression for column_name.

Parameters:
  • column_name (str) – Column to normalise.

  • value_range (list[dict[str, float]]) – Single-element list whose dict has "min" and "max" keys, matching the shape produced by Polars when collecting a struct of min/max aggregations.

Returns:

(col - min) / (max - min), or the literal 0.0 when min == max.

Return type:

pl.Expr
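
As a sketch of what the returned expression computes (plain Python rather than a pl.Expr), given the single-element {"min", "max"} list described above:

```python
def min_max_value(value, value_range):
    """Mirror the arithmetic of the min-max expression for one value.

    value_range follows the single-element [{"min": ..., "max": ...}]
    shape that Polars produces when collecting min/max aggregations.
    """
    lo = value_range[0]["min"]
    hi = value_range[0]["max"]
    if lo == hi:  # constant column: the expression emits the literal 0.0
        return 0.0
    return (value - lo) / (hi - lo)
```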

static _infer_types(df: polars.DataFrame) dict[str, str]

Classify each column as "numeric" or "symbolic".

A column is considered numeric if its values can be cast to Float64 (after replacing empty strings with null).

Parameters:

df (polars.DataFrame)

Return type:

dict[str, str]
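
A rough stdlib sketch of the classification rule (the real method casts with Polars rather than Python's float):

```python
def infer_type(values):
    """Classify a column as "numeric" or "symbolic".

    Empty strings are treated as null and do not decide the type;
    any value that cannot be cast to float makes the column symbolic.
    """
    for v in values:
        if v == "" or v is None:
            continue
        try:
            float(v)
        except (TypeError, ValueError):
            return "symbolic"
    return "numeric"
```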

static chunker(files: list[str], size: int) collections.abc.Iterator[list[str]]

Yield successive size-element slices of files.

Parameters:
  • files (list[str]) – Filenames to slice into chunks.

  • size (int) – Maximum number of filenames per chunk.

Return type:

collections.abc.Iterator[list[str]]
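
The slicing behaviour is equivalent to this small sketch (the last chunk may be shorter than size):

```python
def chunker(files, size):
    """Yield successive size-element slices of files."""
    for start in range(0, len(files), size):
        yield files[start:start + size]
```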

chunk_to_parquet(files: list[str], i: int) str

Read a chunk of NDJSON files and write them as a parquet file.

Parameters:
  • files (list[str]) – NDJSON file paths to combine.

  • i (int) – Chunk index (used in the output filename).

Returns:

Path to the parquet file produced.

Return type:

str

preprocess(verbose: bool) list[str]

Convert input files into intermediate parquet chunks.

Parameters:

verbose (bool) – Show a tqdm progress bar over chunks (if installed).

Returns:

Paths to the temporary chunked parquet files.

Return type:

list[str]

process(chunked_files: list[str], verbose: bool = True) None

Hash, scale, and write the final anonymised parquet file.

Parameters:
  • chunked_files (list[str]) – Intermediate parquet files produced by preprocess().

  • verbose (bool, default=True) – Print which columns will be hashed / scaled / preserved.

Raises:

MissingDependenciesException – When polars-hash is not installed.

Return type:

None

get_token(credential_file: os.PathLike, verify: bool = True) str

Fetch an OAuth2 access token for a Pega Platform instance.

After configuring OAuth2 in Dev Studio, download the credential file and point this helper at it.

Parameters:
  • credential_file (PathLike) – Path to the credential file downloaded from Pega.

  • verify (bool, default=True) – Whether to verify TLS certificates. Set to False only for unsecured test endpoints.

Returns:

The bearer access token.

Return type:

str

cache_to_file(df: polars.DataFrame | polars.LazyFrame, path: str | os.PathLike, name: str, cache_type: Literal['parquet'] = 'parquet', compression: polars._typing.ParquetCompression = 'uncompressed') pathlib.Path
cache_to_file(df: polars.DataFrame | polars.LazyFrame, path: str | os.PathLike, name: str, cache_type: Literal['ipc'] = 'ipc', compression: polars._typing.IpcCompression = 'uncompressed') pathlib.Path

Convenience function to cache a dataframe to disk. Caching in Arrow (IPC) format in particular allows for very fast reads.

Parameters:
  • df (pl.DataFrame or pl.LazyFrame) – The dataframe to cache.

  • path (os.PathLike) – The directory in which to cache the data.

  • name (str) – The name to give to the cached file.

  • cache_type (str) – The file format to write: "ipc" (Arrow, the default) or "parquet".

  • compression (str) – The compression to apply; default is uncompressed.

Returns:

The filepath to the cached file

Return type:

os.PathLike

find_files(files_dir: collections.abc.Iterable[str], target: str) list[str]

Filter a list of filenames down to those matching a Pega snapshot target.

Parameters:
  • files_dir (Iterable[str]) – Filenames to scan (typically the contents of a directory).

  • target (str) – One of "model_data", "predictor_data", "prediction_data", "value_finder".

Returns:

Filenames whose names match one of the known patterns for target.

Return type:

list[str]

Raises:

ValueError – If target is not one of the supported names.
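
The gist of the filtering is a substring match per target, as in this sketch. The pattern strings below are illustrative placeholders, not the library's actual pattern list:

```python
# Placeholder patterns for illustration only; the real function knows
# the full set of Pega snapshot filename patterns per target.
TARGET_PATTERNS = {
    "model_data": ["ModelSnapshot"],
    "predictor_data": ["PredictorBinningSnapshot"],
}

def find_files_sketch(files_dir, target):
    """Keep only filenames matching one of the known patterns for target."""
    if target not in TARGET_PATTERNS:
        raise ValueError(f"Unsupported target: {target!r}")
    patterns = TARGET_PATTERNS[target]
    return [f for f in files_dir if any(p in f for p in patterns)]
```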

get_latest_file(path: str | os.PathLike, target: str) str | None

Find the most recent Pega snapshot file matching a target type.

Searches path for files whose name matches one of the well-known Pega snapshot patterns for target, then returns the most recent one (parsed from the filename’s GMT timestamp, falling back to file ctime). Supports .json, .csv, .zip, .parquet, .feather, .ipc, .arrow.

Parameters:
  • path (str or os.PathLike) – Directory to search.

  • target (str) – One of "model_data", "predictor_data", "prediction_data", "value_finder".

Returns:

Full path to the most recent matching file, or None when no matching file exists.

Return type:

str or None

Raises:

ValueError – If target is not one of the supported names.
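
The "most recent" selection can be sketched as parsing the embedded GMT timestamp and taking the maximum. This sketch skips the ctime fallback the real function applies when no timestamp can be parsed:

```python
import re
from datetime import datetime

def latest_by_gmt_timestamp(filenames):
    """Pick the most recent file by the GMT timestamp in its name.

    Pega snapshot exports embed a timestamp such as 20210101T010000_GMT.
    """
    def parse(name):
        m = re.search(r"(\d{8}T\d{6})_GMT", name)
        return datetime.strptime(m.group(1), "%Y%m%dT%H%M%S") if m else None

    stamped = [(parse(n), n) for n in filenames]
    stamped = [(t, n) for t, n in stamped if t is not None]
    return max(stamped)[1] if stamped else None
```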

read_data(path: str | pathlib.Path | io.BytesIO) polars.LazyFrame

Read data from various file formats and sources.

Supports multiple formats: parquet, csv, arrow, feather, ndjson, json, xlsx, xls, zip, tar, tar.gz, tgz, gz. Handles both individual files and directories (including Hive-partitioned structures). Archives (zip, tar) are automatically extracted to temporary directories. Gzip files (.gz) are automatically decompressed.

Parameters:

path (str, Path, or BytesIO) – Path to a data file, archive, directory, or BytesIO object. When using BytesIO (e.g., from Streamlit file uploads), the object must have a ‘name’ attribute indicating the file extension. Supported formats:
  • Parquet files or directories
  • CSV files
  • Arrow/IPC/Feather files
  • NDJSON/JSONL files
  • Excel files (.xlsx, .xls — requires the optional fastexcel package)
  • GZIP-compressed files (.gz, .json.gz, .csv.gz, etc.)
  • ZIP archives, including the Pega Dataset Export format (extracted automatically)
  • TAR archives, including .tar.gz and .tgz (extracted automatically)
  • Hive-partitioned directories (scanned recursively)

Returns:

Lazy DataFrame ready for processing. Use .collect() to materialize.

Return type:

pl.LazyFrame

Raises:

ValueError – If no supported data files are found in a directory, or if the file type is not supported.

Examples

Read a parquet file:

>>> df = read_data("data.parquet")

Read from a ZIP archive:

>>> df = read_data("export.zip")

Read from a TAR archive:

>>> df = read_data("export.tar.gz")

Read from a Hive-partitioned directory:

>>> df = read_data("pxDecisionTime_day=08/")

Read a Pega Dataset Export file:

>>> df = read_data("Data-Decision-ADM-ModelSnapshot_pyModelSnapshots_20210101T010000_GMT.zip")

Read a gzip-compressed file:

>>> df = read_data("export.json.gz")
>>> df = read_data("data.csv.gz")

Read from a BytesIO object (e.g., Streamlit upload):

>>> from io import BytesIO
>>> uploaded_file = ...  # BytesIO with 'name' attribute
>>> df = read_data(uploaded_file)

Read a Feather file:

>>> df = read_data("data.feather")

Notes

Pega Dataset Export Support: This function fully supports the Pega Dataset Export format (e.g., Data-Decision-ADM-*.zip, Data-DM-*.zip). These are zip archives containing a data.json file (NDJSON format) and optionally a META-INF/MANIFEST.mf metadata file. The function automatically extracts and reads the data.json file.

Other Notes:
  • Archives are extracted to temporary directories with automatic cleanup.
  • OS artifacts (__MACOSX, .DS_Store, ._* files) are automatically removed.
  • For directories, the first supported file type found determines the format.

read_dataflow_output(files: collections.abc.Iterable[str] | str, cache_file_name: str | None = None, *, cache_directory: str | os.PathLike = 'cache')

Read the file output of a Pega dataflow run.

By default, the Prediction Studio data export also uses dataflows, so this function applies to those exports as well.

Dataflow nodes write many small .json.gz files for each partition. This helper takes a list of files (or a glob pattern) and concatenates them into a single polars.LazyFrame.

If cache_file_name is supplied, results are cached as a parquet file. Subsequent calls only read files that aren’t already in the cache, then update it.
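
The incremental part of the caching can be sketched as a simple set difference over file paths (the real function additionally reads the new files and rewrites the parquet cache):

```python
def files_to_read(all_files, cached_files):
    """Select only the files that are not already in the cache."""
    cached = set(cached_files)
    return [f for f in all_files if f not in cached]
```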

Parameters:
  • files (str or Iterable[str]) – File paths to read. If a string is provided, it’s expanded with glob.glob().

  • cache_file_name (str, optional) – If given, cache results to <cache_directory>/<cache_file_name>.parquet.

  • cache_directory (str or os.PathLike, keyword-only, default="cache") – Directory to store the parquet cache.

Examples

>>> from glob import glob
>>> read_dataflow_output(files=glob("model_snapshots_*.json"))

read_ds_export(filename: str | os.PathLike | io.BytesIO, path: str | os.PathLike = '.', *, infer_schema_length: int = 10000, separator: str = ',', ignore_errors: bool = False) polars.LazyFrame | None

Read Pega dataset exports with additional capabilities.

Extends read_data() with:

  • Smart file finding: accepts "model_data" or "predictor_data" and searches for matching files (ADM-specific).

  • URL downloads: fetches remote files when local paths are not found (useful for demos and examples).

  • Schema overrides: applies Pega-specific type corrections (e.g. PYMODELID as string).

For simple file reading without these features, use read_data().

Parameters:
  • filename (str, os.PathLike, or BytesIO) – File identifier. May be a full file path, a generic name like "model_data" / "predictor_data" (triggers smart search), or an io.BytesIO object (delegates to read_data()).

  • path (str or os.PathLike, default='.') – Directory to search for files (ignored for BytesIO or full paths).

  • infer_schema_length (int, keyword-only, default=10000) – Rows to scan for schema inference (CSV/JSON).

  • separator (str, keyword-only, default=",") – CSV delimiter.

  • ignore_errors (bool, keyword-only, default=False) – Whether to continue on parse errors (CSV).

Returns:

Lazy dataframe, or None if the file could not be located.

Return type:

pl.LazyFrame or None

Examples

>>> df = read_ds_export("model_data", path="data/ADMData")
>>> df = read_ds_export("ModelSnapshot_20210101.json", path="data")
>>> df = read_ds_export(
...     "ModelSnapshot.zip", path="https://example.com/exports"
... )
>>> df = read_ds_export("export.csv", infer_schema_length=200000)

read_multi_zip(files: collections.abc.Iterable[str], *, add_original_file_name: bool = False, verbose: bool = True) polars.LazyFrame

Read multiple gzip-compressed NDJSON files and concatenate them.

Parameters:
  • files (Iterable[str]) – Paths to the .json.gz files to read.

  • add_original_file_name (bool, keyword-only, default=False) – If True, add a file column recording each source path.

  • verbose (bool, keyword-only, default=True) – Show a tqdm progress bar (if installed) and print a completion line when done.

Returns:

Concatenated lazy frame across all input files.

Return type:

pl.LazyFrame
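
The reading logic can be sketched with the stdlib alone: decompress each file, parse one JSON object per line, and concatenate. The real function builds a lazy Polars frame instead of lists of dicts:

```python
import gzip
import json

def read_ndjson_gz(path, add_original_file_name=False):
    """Read one gzip-compressed NDJSON file into a list of dicts."""
    rows = []
    with gzip.open(path, "rt", encoding="utf-8") as fh:
        for line in fh:
            if line.strip():
                row = json.loads(line)
                if add_original_file_name:
                    row["file"] = str(path)  # mirrors the optional file column
                rows.append(row)
    return rows

def read_multi_gz(paths, add_original_file_name=False):
    """Concatenate rows across all input files."""
    out = []
    for p in paths:
        out.extend(read_ndjson_gz(p, add_original_file_name))
    return out
```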

read_zipped_file(file: str | io.BytesIO) tuple[io.BytesIO, str]

Read a Pega zipped NDJSON dataset export.

A Pega dataset export is a zip archive that contains a data.json file (NDJSON format) and optionally a META-INF/MANIFEST.mf metadata file. This helper opens the zip, locates data.json (top-level or nested), and returns its contents as an in-memory buffer together with the ".json" extension.

Parameters:

file (str or BytesIO) – Path to the zip file, or an in-memory zip buffer.

Returns:

A pair of (buffer, ".json") ready to be passed back into a Polars reader.

Return type:

tuple[BytesIO, str]

Raises:

FileNotFoundError – If the archive does not contain a data.json entry.
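
A minimal sketch of the lookup, using only the stdlib zipfile module (the function name here is illustrative, not the library's):

```python
import io
import zipfile

def read_zipped_data_json(file):
    """Return a Pega export zip's data.json as (BytesIO, ".json").

    Accepts a path or an in-memory zip buffer; finds data.json whether
    it sits at the top level or nested in a subdirectory.
    """
    with zipfile.ZipFile(file) as zf:
        candidates = [n for n in zf.namelist() if n.split("/")[-1] == "data.json"]
        if not candidates:
            raise FileNotFoundError("No data.json entry found in the archive.")
        return io.BytesIO(zf.read(candidates[0])), ".json"
```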

class S3Data(bucket_name: str, temp_dir: str = './s3_download')

Asynchronous helper for downloading Pega datasets from S3.

Use this when Prediction Studio is configured to export monitoring tables to an S3 bucket: it downloads the partitioned .json.gz files into a local directory and (optionally) hands them off to pdstools.adm.ADMDatamart.

Parameters:
  • bucket_name (str) – Name of the S3 bucket containing the dataset folder.

  • temp_dir (str, default="./s3_download") – Directory where downloaded files are placed. Should be a folder you don’t mind being filled with cached exports.

bucket_name
temp_dir = './s3_download'

async get_files(prefix: str, *, use_meta_files: bool = False, verbose: bool = True) list[str]

Download files from the bucket whose key starts with prefix.

Pega data exports are split into many small files. This method fetches them concurrently into temp_dir, skipping any file that already exists locally.

When use_meta_files is True, each real export file X is accompanied by a .X.meta sentinel file that signals the export has finished. We list keys under the dotted prefix (path/to/.files), keep entries ending in .meta, and map them back to the underlying file (path/to/files_001.json). .meta files themselves are never copied locally.

When use_meta_files is False, every key under prefix is downloaded.
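
The naming scheme described above suggests a mapping from sentinel key to data key like this sketch (the exact key layout is an assumption based on the description, and the helper name is illustrative):

```python
def meta_key_to_data_key(meta_key):
    """Map a .meta sentinel key back to its export file key.

    Assumes the sentinel for path/to/files_001.json lives at
    path/to/.files_001.json.meta (dotted basename plus a .meta suffix).
    """
    folder, _, name = meta_key.rpartition("/")
    name = name.removeprefix(".").removesuffix(".meta")
    return f"{folder}/{name}" if folder else name
```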

Parameters:
  • prefix (str) – S3 key prefix (see boto3 Bucket.objects.filter(Prefix=...)).

  • use_meta_files (bool, keyword-only, default=False) – Whether to use companion .meta files to gate downloads.

  • verbose (bool, keyword-only, default=True) – Show a tqdm progress bar (if installed) and print a summary.

Returns:

Local paths of all files that match prefix (newly downloaded and already cached).

Return type:

list[str]

async get_datamart_data(table: str, *, datamart_folder: str = 'datamart', verbose: bool = True) list[str]

Download a single datamart table from S3.

Parameters:
  • table (str) – Datamart table name. One of the keys in DATAMART_TABLE_PREFIXES: "modelSnapshot", "predictorSnapshot", "binaryDistribution", "contingencyTable", "histogram", "snapshot", "notification".

  • datamart_folder (str, keyword-only, default="datamart") – Top-level folder inside the bucket that contains the datamart export.

  • verbose (bool, keyword-only, default=True) – Show download progress.

Returns:

Local paths of the downloaded files.

Return type:

list[str]

async get_adm_datamart(*, datamart_folder: str = 'datamart', verbose: bool = True) pdstools.adm.ADMDatamart.ADMDatamart

Construct an ADMDatamart directly from S3.

Convenience wrapper that downloads the model and predictor snapshot exports and feeds them into ADMDatamart. Because this is an async function, it must be awaited.

Parameters:
  • datamart_folder (str, keyword-only, default="datamart") – Top-level folder inside the bucket that contains the datamart export.

  • verbose (bool, keyword-only, default=True) – Show download progress.

Returns:

A datamart populated with the freshly downloaded files.

Return type:

ADMDatamart

Examples

>>> dm = await S3Data(bucket_name="testbucket").get_adm_datamart()