pdstools.pega_io.File

Functions

read_ds_export(→ Optional[polars.LazyFrame])

Read in most out-of-the-box Pega dataset export formats

import_file(→ polars.LazyFrame)

Imports a file using Polars

read_zipped_file(→ Tuple[io.BytesIO, str])

Read a zipped NDJSON file.

read_multi_zip(→ polars.LazyFrame)

Reads multiple zipped NDJSON files and concatenates them into one Polars dataframe.

get_latest_file(→ str)

Convenience method to find the latest model snapshot.

find_files(files_dir, target)

cache_to_file(…)

Very simple convenience function to cache data.

read_dataflow_output(files[, cache_file_name, ...])

Reads the file output of a dataflow run.

Module Contents

read_ds_export(filename: str | io.BytesIO, path: str | os.PathLike = '.', verbose: bool = False, **reading_opts) polars.LazyFrame | None

Read in most out-of-the-box Pega dataset export formats. Accepts one of the following formats:

  • .csv
  • .json
  • .zip (zipped JSON or CSV)
  • .feather
  • .ipc
  • .parquet

It automatically infers the default file names for both model data and predictor data. If you supply either 'modelData' or 'predictorData' as the 'filename' argument, it will search for them. If you supply the full name of a file in the 'path' directory, it will import that instead. Since pdstools V3.x, this returns a Polars LazyFrame; simply call .collect() to get an eager frame.

Parameters:
  • filename (Union[str, BytesIO]) – Can be one of the following:
      - A string with the full path to the file
      - A string with the name of the file (to be searched in the given path)
      - A BytesIO object containing the file data (e.g., from an uploaded file in a webapp)

  • path (str, default = '.') – The location of the file

  • verbose (bool, default = False) – Whether to print out which file will be imported

Keyword Arguments:

Any – Any arguments to plug into the scan_* function from Polars.

Returns:

The (lazy) dataframe

Return type:

Optional[polars.LazyFrame]

Examples:

>>> df = read_ds_export(filename='full/path/to/ModelSnapshot.json')
>>> df = read_ds_export(filename='ModelSnapshot.json', path='data/ADMData')
>>> df = read_ds_export(filename=uploaded_file)  # Where uploaded_file is a BytesIO object
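
As noted above, the result is lazy; call .collect() to materialize it. A minimal sketch (the file name and path below are placeholders):

>>> from pdstools.pega_io.File import read_ds_export
>>> lf = read_ds_export(filename='ModelSnapshot.parquet', path='data')  # lazy, nothing is read yet
>>> df = lf.collect()  # materialize into an eager DataFrame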

import_file(file: str | io.BytesIO, extension: str, **reading_opts) polars.LazyFrame

Imports a file using Polars

Parameters:
  • file (Union[str, io.BytesIO]) – The path to the file, or an in-memory BytesIO object, passed directly to the read functions

  • extension (str) – The extension of the file, used to determine which function to use

Returns:

The (imported) lazy dataframe

Return type:

pl.LazyFrame
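
A minimal usage sketch; the file path is a placeholder, and passing the extension with a leading dot is an assumption:

>>> from pdstools.pega_io.File import import_file
>>> lf = import_file('exports/ModelSnapshot.csv', extension='.csv')  # extension selects the Polars reader
>>> df = lf.collect()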

read_zipped_file(file: str | io.BytesIO, verbose: bool = False) Tuple[io.BytesIO, str]

Read a zipped NDJSON file. Reads a dataset export file as exported and downloaded from Pega. The export file is formatted as a zipped multi-line JSON file. It reads the file and returns its contents as a BytesIO object.

Parameters:
  • file (str) – The full path to the file

  • verbose (bool, default = False) – Whether to print the names of the files within the unzipped file for debugging purposes

Returns:

The raw bytes object to pass through to Polars

Return type:

Tuple[io.BytesIO, str]
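
A minimal sketch; the file path is a placeholder, and treating the second tuple element as the extension of the inner file is an assumption based on the return annotation:

>>> import polars as pl
>>> from pdstools.pega_io.File import read_zipped_file
>>> data, ext = read_zipped_file('exports/Data-Decision-ADM-ModelSnapshot.zip')
>>> df = pl.read_ndjson(data)  # hand the raw bytes to Polars, e.g. for an NDJSON payload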

read_multi_zip(files: Iterable[str], zip_type: Literal['gzip'] = 'gzip', add_original_file_name: bool = False, verbose: bool = True) polars.LazyFrame

Reads multiple zipped NDJSON files and concatenates them into one Polars dataframe.

Parameters:
  • files (Iterable[str]) – The list of files to concatenate

  • zip_type (Literal['gzip']) – At this point, only ‘gzip’ is supported

  • verbose (bool, default = True) – Whether to print out the progress of the import

  • add_original_file_name (bool)

Return type:

polars.LazyFrame
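
A minimal sketch; the glob pattern is a placeholder:

>>> from glob import glob
>>> from pdstools.pega_io.File import read_multi_zip
>>> lf = read_multi_zip(glob('exports/part-*.json.gz'), zip_type='gzip')
>>> df = lf.collect()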

get_latest_file(path: str | os.PathLike, target: str, verbose: bool = False) str

Convenience method to find the latest model snapshot. It has a set of default names to search for and finds all files that match them. Once it finds all matching files in the directory, it chooses the most recent one. Supports [".json", ".csv", ".zip", ".parquet", ".feather", ".ipc"]. Needs a path to the directory and a target of either 'model_data' or 'predictor_data'.

Parameters:
  • path (str) – The filepath where the data is stored

  • target (str in ['model_data', 'predictor_data']) – Whether to look for data about the predictive models ('model_data') or the predictor bins ('predictor_data')

  • verbose (bool, default = False) – Whether to print all found files before comparing name criteria for debugging purposes

Returns:

The most recent file given the file name criteria.

Return type:

str
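
A minimal sketch combining this with read_ds_export; the directory is a placeholder:

>>> from pdstools.pega_io.File import get_latest_file, read_ds_export
>>> latest = get_latest_file(path='data/ADMData', target='model_data')
>>> lf = read_ds_export(filename=latest)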

find_files(files_dir, target)
cache_to_file(df: polars.DataFrame | polars.LazyFrame, path: str | os.PathLike, name: str, cache_type: Literal['parquet'] = 'parquet', compression: polars._typing.ParquetCompression = 'uncompressed') pathlib.Path
cache_to_file(df: polars.DataFrame | polars.LazyFrame, path: str | os.PathLike, name: str, cache_type: Literal['ipc'] = 'ipc', compression: polars._typing.IpcCompression = 'uncompressed') pathlib.Path

Very simple convenience function to cache data. Caches in Arrow (IPC) or Parquet format for very fast reading.

Parameters:
  • df (Union[pl.DataFrame, pl.LazyFrame]) – The dataframe to cache

  • path (os.PathLike) – The location to cache the data

  • name (str) – The name to give to the file

  • cache_type (str) – The type of file to export; default is IPC, parquet is also supported

  • compression (str) – The compression to apply, default is uncompressed

Returns:

The filepath to the cached file

Return type:

pathlib.Path
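
A minimal sketch; the path, name, and example data are placeholders:

>>> import polars as pl
>>> from pdstools.pega_io.File import cache_to_file
>>> df = pl.DataFrame({'ModelID': ['abc'], 'Positives': [10]})
>>> cached = cache_to_file(df, path='cache', name='model_snapshot', cache_type='ipc')
>>> lf = pl.scan_ipc(cached)  # read the cached file back lazily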

read_dataflow_output(files: Iterable[str] | str, cache_file_name: str | None = None, *, extension: Literal['json'] = 'json', compression: Literal['gzip'] = 'gzip', cache_directory: str | os.PathLike = 'cache')

Reads the file output of a dataflow run.

By default, the Prediction Studio data export also uses dataflows, so this function can be used for those exports as well.

Because dataflows are built for resiliency, they can produce a great number of files: by default, every few seconds each dataflow node writes a file for each partition. While this helps the system stay healthy, it makes the output more difficult to consume. This function can take in a list of files (or a glob pattern) and read in all of them.

If cache_file_name is specified, this function caches the data it has read as a parquet file. This not only reduces the file size, it also makes subsequent reads very fast. When this function is run and a parquet file with the name given in cache_file_name already exists, it reads only the files that weren't read in before and adds them to the parquet file. If no new files are found, it simply returns the contents of that parquet file, significantly speeding up operations.

In a future version, the functionality of this function will be extended to also read from S3 or other remote file systems directly using the same caching method.

Parameters:
  • files (Union[str, Iterable[str]]) – An iterable (list or a glob) of file strings to read. If a string is provided, glob() is called on it to find all matching files

  • cache_file_name (str, Optional) – If given, caches the files to a file with the given name. If None, does not use the cache at all

  • extension (Literal["json"]) – The extension of the files, by default “json”

  • compression (Literal["gzip"]) – The compression of the files, by default “gzip”

  • cache_directory (os.PathLike) – The file path to cache the previously read files

Usage:

>>> from glob import glob
>>> read_dataflow_output(files=glob("model_snapshots_*.json"))
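
A sketch of the caching behaviour described above; the glob pattern, cache file name, and directory are placeholders:

>>> from pdstools.pega_io.File import read_dataflow_output
>>> out = read_dataflow_output(
...     files='dataflow_output/*.json',
...     cache_file_name='dataflow_run',  # later runs only read files not yet in the cache
...     cache_directory='cache',
... )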