pdstools.pega_io.Anonymization

Hash-based anonymisation of Pega Historical Datasets.

Attributes

logger

Classes

Anonymization

Anonymise Pega datasets (in particular, the Historical Dataset).

Module Contents

logger
class Anonymization(path_to_files: str, temporary_path: str | None = None, output_file: str = 'anonymised.parquet', skip_columns_with_prefix: list[str] | tuple[str, ...] | None = None, batch_size: int = 500, file_limit: int | None = None)

Anonymise Pega datasets (in particular, the Historical Dataset).

Numeric columns are min-max scaled to [0, 1]. Symbolic columns are hashed with SHA-256. Columns whose name starts with one of the skip_columns_with_prefix values are passed through unchanged (by default Context_* and Decision_*).

Once constructed, call anonymize() to run the pipeline. All file system work happens then; __init__ is pure.
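The symbolic transform described above boils down to a SHA-256 digest of each value. A minimal standalone sketch (the function name is illustrative, not part of the pdstools API):

```python
import hashlib


def hash_symbolic(value: str) -> str:
    # SHA-256 of the UTF-8 bytes, as a 64-character hex digest.
    # Equal inputs map to equal outputs, so group-bys and joins on the
    # column still behave the same after anonymisation.
    return hashlib.sha256(value.encode("utf-8")).hexdigest()
```

Because the digest is deterministic, the anonymised column preserves cardinality and value equality while hiding the original strings.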

Parameters:
  • path_to_files (str) – Glob pattern matching the input files, e.g. "~/Downloads/*.json".

  • temporary_path (str, optional) – Directory used for intermediate parquet chunks. Defaults to a fresh tempfile.mkdtemp directory created on first use.

  • output_file (str, default="anonymised.parquet") – Path to write the final anonymised parquet file.

  • skip_columns_with_prefix (list[str], optional) – Column-name prefixes to leave unchanged. Defaults to ("Context_", "Decision_").

  • batch_size (int, default=500) – Number of input files combined per intermediate parquet chunk.

  • file_limit (int, optional) – Process at most this many files (useful for testing).

Examples

>>> Anonymization(
...     path_to_files="~/Downloads/*.json",
...     batch_size=1000,
...     file_limit=10,
... ).anonymize()
path_to_files
_temp_path: str | None = None
output_file = 'anonymised.parquet'
skip_col_prefix: tuple[str, ...] = ('Context_', 'Decision_')
batch_size = 500
file_limit = None
property temp_path: str

Lazily create (and cache) the temp directory.

Return type:

str

anonymize(verbose: bool = True) → None

Run the full anonymisation pipeline.

Parameters:

verbose (bool, default=True) – Print progress messages between stages.

Return type:

None

static min_max(column_name: str, value_range: list[dict[str, float]]) → polars.Expr

Return a min-max scaling expression for column_name.

Parameters:
  • column_name (str) – Column to normalise.

  • value_range (list[dict[str, float]]) – Single-element list whose dict has "min" and "max" keys, matching the shape produced by Polars when collecting a struct of min/max aggregations.

Returns:

(col - min) / (max - min), or the literal 0.0 when min == max.

Return type:

pl.Expr
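Per value, the expression above reduces to simple arithmetic. A plain-Python sketch of the same formula, including the constant-column guard (the function name is illustrative, not part of the API):

```python
def min_max_scale(value: float, mn: float, mx: float) -> float:
    # (value - min) / (max - min), with constant columns
    # (min == max) collapsing to the literal 0.0
    if mx == mn:
        return 0.0
    return (value - mn) / (mx - mn)
```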

static _infer_types(df: polars.DataFrame) → dict[str, str]

Classify each column as "numeric" or "symbolic".

A column is considered numeric if its values can be cast to Float64 (after replacing empty strings with null).

Parameters:

df (polars.DataFrame)

Return type:

dict[str, str]
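A hypothetical per-column analogue of this check, written without polars to show the classification rule (empty strings count as null; the column is numeric only if every remaining value casts cleanly to float):

```python
def infer_type(values: list[str]) -> str:
    # Sketch of the rule above, not the packaged implementation
    for v in values:
        if v == "":
            # empty strings are treated as null and ignored
            continue
        try:
            float(v)
        except ValueError:
            return "symbolic"
    return "numeric"
```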

static chunker(files: list[str], size: int) → collections.abc.Iterator[list[str]]

Yield successive size-element slices of files.

Parameters:
  • files (list[str]) – File paths to slice into chunks.

  • size (int) – Maximum number of paths per slice.

Return type:

collections.abc.Iterator[list[str]]
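The slicing behaviour described above can be re-implemented in a few lines (a sketch for illustration, not the packaged code):

```python
from collections.abc import Iterator


def chunker(files: list[str], size: int) -> Iterator[list[str]]:
    # Yield successive size-element slices; the last slice
    # may hold fewer than size elements
    for start in range(0, len(files), size):
        yield files[start:start + size]
```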

chunk_to_parquet(files: list[str], i: int) → str

Read a chunk of NDJSON files and write them as a parquet file.

Parameters:
  • files (list[str]) – NDJSON file paths to combine.

  • i (int) – Chunk index (used in the output filename).

Returns:

Path to the parquet file produced.

Return type:

str

preprocess(verbose: bool) → list[str]

Convert input files into intermediate parquet chunks.

Parameters:

verbose (bool) – Show a tqdm progress bar over chunks (if installed).

Returns:

Paths to the temporary chunked parquet files.

Return type:

list[str]

process(chunked_files: list[str], verbose: bool = True) → None

Hash, scale, and write the final anonymised parquet file.

Parameters:
  • chunked_files (list[str]) – Intermediate parquet files produced by preprocess().

  • verbose (bool, default=True) – Print which columns will be hashed / scaled / preserved.

Raises:

MissingDependenciesException – When polars-hash is not installed.

Return type:

None