pdstools.pega_io

Submodules

Classes

Anonymization

A utility class to efficiently anonymize Pega Datasets

S3Data

Functions

_read_client_credential_file(credential_file)

get_token(credential_file[, verify])

Get API credentials to a Pega Platform instance.

cache_to_file(…)

Very simple convenience function to cache data.

find_files(files_dir, target)

get_latest_file(→ str)

Convenience method to find the latest model snapshot.

read_dataflow_output(files[, cache_file_name, ...])

Reads the file output of a dataflow run.

read_ds_export(→ Optional[polars.LazyFrame])

Read in most out of the box Pega dataset export formats

read_multi_zip(→ polars.LazyFrame)

Reads multiple zipped ndjson files and concatenates them into one Polars dataframe.

read_zipped_file(→ Tuple[io.BytesIO, str])

Read a zipped NDJSON file.

Package Contents

class Anonymization(path_to_files: str, temporary_path: str = '/tmp/anonymisation', output_file: str = 'anonymised.parquet', skip_columns_with_prefix: List[str] | None = None, batch_size: int = 500, file_limit: int | None = None)

A utility class to efficiently anonymize Pega Datasets

In particular, this class is aimed at anonymizing the Historical Dataset.

Parameters:
  • path_to_files (str)

  • temporary_path (str)

  • output_file (str)

  • skip_columns_with_prefix (Optional[List[str]])

  • batch_size (int)

  • file_limit (Optional[int])

path_to_files
temp_path = '/tmp/anonymisation'
output_file = 'anonymised.parquet'
skip_col_prefix = ('Context_', 'Decision_')
batch_size = 500
file_limit = None
anonymize(verbose: bool = True)

Anonymize the data.

This method performs the anonymization process on the data files specified during initialization. It writes temporary parquet files, processes them, and combines the anonymized data into the specified output file.

Parameters:

verbose (bool, optional) – Whether to print verbose output during the anonymization process. Defaults to True.
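
A minimal usage sketch, assuming a directory of Historical Dataset export files (the path value is illustrative, not part of the API):

>>> from pdstools.pega_io import Anonymization
>>> # Point path_to_files at the Historical Dataset export files to anonymize
>>> anon = Anonymization(path_to_files="data/hds")
>>> anon.anonymize(verbose=True)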

static min_max(column_name: str, range: List[Dict[str, float]]) → polars.Expr

Normalize the values in a column using the min-max scaling method.

Parameters:
  • column_name (str) – The name of the column to be normalized.

  • range (List[Dict[str, float]]) – A list of dictionaries containing the minimum and maximum values for the column.

Returns:

A Polars expression representing the normalized column.

Return type:

pl.Expr

Examples

>>> range = [{"min": 0.0, "max": 100.0}]
>>> min_max("age", range)
Column "age" normalized using min-max scaling.
static _infer_types(df: polars.DataFrame)

Infers the types of columns in a DataFrame.

Parameters:
  • df (polars.DataFrame) – The DataFrame for which to infer column types.

Returns:

A dictionary mapping column names to their inferred types. The inferred types can be either “numeric” or “symbolic”.

Return type:

dict

static chunker(files: List[str], size: int)

Split a list of files into chunks of a specified size.

Parameters:
  • files (List[str]) – A list of file names.

  • size (int) – The size of each chunk.

Returns:

A generator that yields chunks of files.

Return type:

generator

Examples

>>> files = ['file1.txt', 'file2.txt', 'file3.txt', 'file4.txt', 'file5.txt']
>>> chunks = chunker(files, 2)
>>> for chunk in chunks:
...     print(chunk)
['file1.txt', 'file2.txt']
['file3.txt', 'file4.txt']
['file5.txt']
chunk_to_parquet(files: List[str], i) → str

Convert a chunk of files to Parquet format.

Parameters:
  • files (List[str]) – List of file paths to be converted.

  • i – Index of the chunk.

Returns:

File path of the converted Parquet file; the file is saved to the temporary directory (temp_path).

Return type:

str

preprocess(verbose: bool) → List[str]

Preprocesses the files in the specified path.

Parameters:
  • verbose (bool) – Set to True to get a progress bar for the file count.

Returns:

A list of the temporary bundled parquet files.

Return type:

list[str]

process(chunked_files: List[str], verbose: bool = True)

Process the data for anonymization.

Parameters:
  • chunked_files (List[str]) – A list of the bundled temporary parquet files to process.

  • verbose (bool) – Whether to print verbose output. Default is True.

Raises:
  • ImportError – If polars-hash is not installed.

Returns:

None

_read_client_credential_file(credential_file: os.PathLike)
Parameters:

credential_file (os.PathLike)

get_token(credential_file: os.PathLike, verify: bool = True)

Get API credentials to a Pega Platform instance.

After setting up OAuth2 authentication in Dev Studio, you should be able to download a credential file. Simply point this method to that file, and it’ll read the relevant properties and give you your access token.

Parameters:
  • credential_file (os.PathLike) – The credential file downloaded after setting up OAuth in a Pega system.

  • verify (bool, default = True) – Whether to only allow safe SSL requests. In case you're connecting to an unsecured API endpoint, you need to explicitly set verify to False; otherwise Python will raise an SSL error.
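
A minimal usage sketch (the credential file name is illustrative):

>>> from pdstools.pega_io import get_token
>>> token = get_token("prediction_studio_credentials.txt")
>>> # For an endpoint without a valid SSL certificate, disable verification explicitly
>>> token = get_token("prediction_studio_credentials.txt", verify=False)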

cache_to_file(df: polars.DataFrame | polars.LazyFrame, path: str | os.PathLike, name: str, cache_type: Literal['parquet'] = 'parquet', compression: polars._typing.ParquetCompression = 'uncompressed') → pathlib.Path
cache_to_file(df: polars.DataFrame | polars.LazyFrame, path: str | os.PathLike, name: str, cache_type: Literal['ipc'] = 'ipc', compression: polars._typing.IpcCompression = 'uncompressed') → pathlib.Path

Very simple convenience function to cache data. Caches in arrow format for very fast reading.

Parameters:
  • df (pl.DataFrame | pl.LazyFrame) – The dataframe to cache

  • path (os.PathLike) – The location to cache the data

  • name (str) – The name to give to the file

  • cache_type (str) – The type of file to export, either 'parquet' or 'ipc'

  • compression (str) – The compression to apply, default is uncompressed

Returns:

The filepath to the cached file

Return type:

os.PathLike
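
A minimal usage sketch (the frame contents and paths are illustrative):

>>> import polars as pl
>>> from pdstools.pega_io import cache_to_file
>>> df = pl.DataFrame({"ModelID": ["a", "b"], "Positives": [10, 20]})
>>> cached_file = cache_to_file(df, path="cache", name="model_snapshots", cache_type="parquet")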

find_files(files_dir, target)
get_latest_file(path: str | os.PathLike, target: str, verbose: bool = False) → str

Convenience method to find the latest model snapshot. It has a set of default names to search for and finds all files that match them. Once it finds all matching files in the directory, it chooses the most recent one. Supports [".json", ".csv", ".zip", ".parquet", ".feather", ".ipc"]. Needs a path to the directory and a target of either 'model_data' or 'predictor_data'.

Parameters:
  • path (str) – The filepath where the data is stored

  • target (str in ['model_data', 'predictor_data']) – Whether to look for data about the predictive models ('model_data') or the predictor bins ('predictor_data')

  • verbose (bool, default = False) – Whether to print all found files before comparing name criteria for debugging purposes

Returns:

The most recent file given the file name criteria.

Return type:

str
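
A minimal usage sketch, assuming the target names from the parameter list above (the 'data' directory is illustrative):

>>> from pdstools.pega_io import get_latest_file
>>> model_file = get_latest_file(path="data", target="model_data")
>>> predictor_file = get_latest_file(path="data", target="predictor_data")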

read_dataflow_output(files: Iterable[str] | str, cache_file_name: str | None = None, *, extension: Literal['json'] = 'json', compression: Literal['gzip'] = 'gzip', cache_directory: str | os.PathLike = 'cache')

Reads the file output of a dataflow run.

By default, the Prediction Studio data export also uses dataflows, thus this function can be used for those use cases as well.

Because dataflows have good resiliency, they can produce a great number of files. By default, every few seconds each dataflow node writes a file for each partition. While this helps the system stay healthy, it makes the output a bit more difficult to consume. This function can take in a list of files (or a glob pattern) and read in all of the files.

If cache_file_name is specified, this function caches the data it has read as a parquet file. This not only reduces the file size, it is also very fast. When this function is run and a parquet file with the name specified in cache_file_name already exists, it reads only the files that weren't read before and adds them to that parquet file. If no new files are found, it simply returns the contents of the parquet file, significantly speeding up operations.

In a future version, the functionality of this function will be extended to also read from S3 or other remote file systems directly using the same caching method.

Parameters:
  • files (Union[str, Iterable[str]]) – An iterable (list or a glob) of file strings to read. If a string is provided, we call glob() on it to find all matching files

  • cache_file_name (str, Optional) – If given, caches the files to a file with the given name. If None, does not use the cache at all

  • extension (Literal["json"]) – The extension of the files, by default “json”

  • compression (Literal["gzip"]) – The compression of the files, by default “gzip”

  • cache_directory (os.PathLike) – The file path to cache the previously read files

Usage

>>> from glob import glob
>>> read_dataflow_output(files=glob("model_snapshots_*.json"))
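
A hedged sketch of the caching behaviour described above (the glob pattern and the "snapshots" cache name are illustrative):

>>> from glob import glob
>>> from pdstools.pega_io import read_dataflow_output
>>> # First call: reads every matching file and writes a parquet cache in cache_directory
>>> df = read_dataflow_output(
...     files=glob("model_snapshots_*.json"),
...     cache_file_name="snapshots",
...     cache_directory="cache",
... )
>>> # Later calls with the same cache: only files not read before are added to the cache
>>> df = read_dataflow_output(
...     files=glob("model_snapshots_*.json"),
...     cache_file_name="snapshots",
...     cache_directory="cache",
... )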

read_ds_export(filename: str | io.BytesIO, path: str | os.PathLike = '.', verbose: bool = False, **reading_opts) → polars.LazyFrame | None

Read in most out-of-the-box Pega dataset export formats. Accepts one of the following formats:

  • .csv

  • .json

  • .zip (zipped json or CSV)

  • .feather

  • .ipc

  • .parquet

It automatically infers the default file names for both model data and predictor data. If you supply either 'modelData' or 'predictorData' as the 'filename' argument, it will search for them. If you supply the full name of a file in the 'path' directory, it will import that instead. Since pdstools v3.x, this returns a Polars LazyFrame. Simply call .collect() to get an eager frame.

Parameters:
  • filename (Union[str, BytesIO]) – Can be one of the following: a string with the full path to the file, a string with the name of the file (to be searched in the given path), or a BytesIO object containing the file data (e.g., from an uploaded file in a webapp)

  • path (str, default = '.') – The location of the file

  • verbose (bool, default = False) – Whether to print out which file will be imported

Keyword Arguments:

Any – Any arguments to plug into the scan_* function from Polars.

Returns:

The (lazy) dataframe

Return type:

Optional[polars.LazyFrame]

Examples

>>> df = read_ds_export(filename='full/path/to/ModelSnapshot.json')
>>> df = read_ds_export(filename='ModelSnapshot.json', path='data/ADMData')
>>> df = read_ds_export(filename=uploaded_file)  # Where uploaded_file is a BytesIO object

read_multi_zip(files: Iterable[str], zip_type: Literal['gzip'] = 'gzip', add_original_file_name: bool = False, verbose: bool = True) → polars.LazyFrame

Reads multiple zipped ndjson files and concatenates them into one Polars dataframe.

Parameters:
  • files (list) – The list of files to concat

  • zip_type (Literal['gzip']) – At this point, only ‘gzip’ is supported

  • verbose (bool, default = True) – Whether to print out the progress of the import

  • add_original_file_name (bool)

Return type:

polars.LazyFrame
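
A minimal usage sketch (the glob pattern is illustrative):

>>> from glob import glob
>>> from pdstools.pega_io import read_multi_zip
>>> df = read_multi_zip(files=glob("hds_export_*.json.gz"))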

read_zipped_file(file: str | io.BytesIO, verbose: bool = False) → Tuple[io.BytesIO, str]

Read a zipped NDJSON file. Reads a dataset export file as exported and downloaded from Pega. The export file is formatted as a zipped multi-line JSON file. The file is read and returned as a BytesIO object.

Parameters:
  • file (Union[str, BytesIO]) – The full path to the file, or a BytesIO object containing the file data

  • verbose (bool, default=False) – Whether to print the names of the files within the unzipped file for debugging purposes

Returns:

The raw bytes object to pass through to Polars

Return type:

io.BytesIO
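
A minimal usage sketch (the file name is illustrative; the second element of the returned tuple is assumed to be the name of the file inside the archive):

>>> from pdstools.pega_io import read_zipped_file
>>> raw_bytes, inner_name = read_zipped_file("Data-Decision-ADM-ModelSnapshot_All.zip")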

class S3Data(bucketName: str, temp_dir='./s3_download')
Parameters:

bucketName (str)

bucketName
temp_dir = './s3_download'
async getS3Files(prefix, use_meta_files=False, verbose=True)

OOTB file exports can be written as many very small files.

This method asynchronously retrieves these files and puts them in a temporary directory.

The logic, if use_meta_files is True, is:

1. Take the prefix and add a . in front of the file name part ('path/to/files' becomes 'path/to/.files'):

  • rsplit on '/' (['path/to', 'files'])

  • take the last element ('files')

  • add '.' in front of it ('.files')

  • concat back to a filepath ('path/to/.files')

2. Fetch all files in the repo that adhere to that prefix ('path/to/.files*')

3. For each of those files, if the file ends with .meta:

  • rsplit on '/' (['path/to', '.files_001.json.meta'])

  • for the last element (just the filename), strip the leading period and the .meta suffix (['path/to', 'files_001.json'])

  • concat back to a filepath ('path/to/files_001.json')

4. Import all files in the resulting list

If use_meta_files is False, the logic is as simple as:

1. Import all files starting with the prefix ('path/to/files' gives ['path/to/files_001.json', 'path/to/files_002.json', etc.], irrespective of whether a .meta file exists).
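
A minimal sketch of the name manipulations described above, using plain string operations (not the actual implementation; the paths are illustrative):

>>> prefix = "path/to/files"
>>> head, _, name = prefix.rpartition("/")
>>> meta_prefix = f"{head}/.{name}"  # 'path/to/.files', listed in S3 with a '*' suffix
>>> meta_key = "path/to/.files_001.json.meta"
>>> head, _, meta_name = meta_key.rpartition("/")
>>> f"{head}/{meta_name[1:].removesuffix('.meta')}"  # strip the leading '.' and '.meta'
'path/to/files_001.json'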

Parameters:
  • prefix (str) – The prefix, pointing to the s3 files. See boto3 docs for filter.

  • use_meta_files (bool, default=False) – Whether to use the meta files to check for eligible files

Notes

We don't import/copy over the .meta files at all. There is an internal function, getNewFiles(), that checks if the filename exists in the local file system. Since the meta files are not really useful for local processing, there's no sense in copying them over. This logic also works with use_meta_files=True: we first check which files are 'eligible' in S3 because they have a meta file, then we check if the 'real' files exist on disk. If the file is already on disk, we don't copy it over.

async getDatamartData(table, datamart_folder: str = 'datamart', verbose: bool = True)

Wrapper method to import one of the tables in the datamart.

Parameters:
  • table (str) – One of the datamart tables. See notes for the full list.

  • datamart_folder (str, default='datamart') – The path to the ‘datamart’ folder within the s3 bucket. Typically, this is the top-level folder in the bucket.

  • verbose (bool, default = True) – Whether to print out the progress of the import

Note

Supports the following tables:

  • "modelSnapshot": "Data-Decision-ADM-ModelSnapshot_pzModelSnapshots"

  • "predictorSnapshot": "Data-Decision-ADM-PredictorBinningSnapshot_pzADMPredictorSnapshots"

  • "binaryDistribution": "Data-DM-BinaryDistribution"

  • "contingencyTable": "Data-DM-ContingencyTable"

  • "histogram": "Data-DM-Histogram"

  • "snapshot": "Data-DM-Snapshot"

  • "notification": "Data-DM-Notification"
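
A minimal usage sketch, to be awaited from async code (the bucket name is illustrative; the table key comes from the list above):

>>> from pdstools.pega_io import S3Data
>>> s3 = S3Data(bucketName="my-pega-repository")
>>> model_snapshots = await s3.getDatamartData("modelSnapshot")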

async get_ADMDatamart(datamart_folder: str = 'datamart', verbose: bool = True)

Get the ADMDatamart class directly from files in S3

In the Prediction Studio settings, you can configure an automatic export of the monitoring tables to a chosen repository. This method interacts with that repository to retrieve files.

Because this is an async function, you need to await it. See Examples for how to use this in a Jupyter notebook.

It checks for files that are already on your local device, but it always concatenates the raw zipped files together when calling the function, which can potentially make it slow. If you don’t always need the latest data, just use pdstools.adm.ADMDatamart.save_data() to save the data to more easily digestible files.

Parameters:
  • verbose (bool) – Whether to print out the progress of the imports

  • datamart_folder (str, default='datamart') – The path to the ‘datamart’ folder within the s3 bucket. Typically, this is the top-level folder in the bucket.

Examples

>>> dm = await S3Data(bucketName='testbucket').get_ADMDatamart()