pdstools.pega_io.File

Attributes

Functions

_is_artifact(→ bool)

Return True for OS-generated junk entries (macOS, Windows, etc.).

_clean_artifacts(→ None)

Remove OS-generated artifact files and directories after archive extraction.

_extract_tar(→ str)

Extract a tar archive to a temporary directory and return the path.

_extract_zip(→ str)

Extract a zip archive to a temporary directory and return the path.

_read_from_bytesio(→ polars.LazyFrame)

Read data from a BytesIO object (e.g., from Streamlit file upload).

read_data(→ polars.LazyFrame)

Read data from various file formats and sources.

read_ds_export(→ polars.LazyFrame | None)

Read Pega dataset exports with additional capabilities.

_fill_context_field_nulls(→ polars.LazyFrame)

Fill nulls in context fields to prevent issues in downstream operations.

import_file(→ polars.LazyFrame)

Import a file with Pega-specific schema handling.

read_zipped_file(→ tuple[io.BytesIO, str])

Read a zipped NDJSON file.

read_multi_zip(→ polars.LazyFrame)

Read multiple zipped NDJSON files and concatenate them into one Polars dataframe.

get_latest_file(→ str | None)

Convenience method to find the latest model snapshot.

find_files(files_dir, target)

cache_to_file(…)

Very simple convenience function to cache data.

read_dataflow_output(files[, cache_file_name, ...])

Reads the file output of a dataflow run.

Module Contents

logger
_SUPPORTED_EXTENSIONS: set[str]
_is_artifact(name: str) bool

Return True for OS-generated junk entries (macOS, Windows, etc.).

Parameters:

name (str)

Return type:

bool

_clean_artifacts(directory: str) None

Remove OS-generated artifact files and directories after archive extraction.

Polars glob patterns (e.g. **/*.parquet) cannot skip hidden files or __MACOSX resource-fork directories, so we delete them from the extracted tree before scanning.

Parameters:

directory (str)

Return type:

None
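The two artifact helpers above could be sketched as follows. This is an illustrative reimplementation based only on the descriptions; the helper names and the exact set of junk names are assumptions, not the module's actual code:

```python
import os
import shutil

# Names treated as OS-generated junk; the real module may match more patterns.
_JUNK_NAMES = {".DS_Store", "Thumbs.db", "__MACOSX"}

def is_artifact(name: str) -> bool:
    """Return True for OS-generated junk entries (macOS, Windows, etc.)."""
    return name in _JUNK_NAMES or name.startswith("._")

def clean_artifacts(directory: str) -> None:
    """Remove artifact files and directories from an extracted tree."""
    # Walk bottom-up so emptied directories can be removed after their files.
    for root, dirs, files in os.walk(directory, topdown=False):
        for fname in files:
            if is_artifact(fname):
                os.remove(os.path.join(root, fname))
        for dname in dirs:
            if is_artifact(dname):
                shutil.rmtree(os.path.join(root, dname), ignore_errors=True)
```

Deleting these entries before scanning matters because, as noted below for `_clean_artifacts`, Polars glob patterns cannot skip hidden files or `__MACOSX` directories.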

_extract_tar(archive_path: pathlib.Path) str

Extract a tar archive to a temporary directory and return the path.

Parameters:

archive_path (pathlib.Path)

Return type:

str

_extract_zip(archive_path: pathlib.Path) str

Extract a zip archive to a temporary directory and return the path.

Parameters:

archive_path (pathlib.Path)

Return type:

str
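A minimal sketch of what these extraction helpers might look like using only the standard library; the function names and temp-directory prefixes are assumptions based on the signatures above:

```python
import tarfile
import tempfile
import zipfile
from pathlib import Path

def extract_zip(archive_path: Path) -> str:
    """Extract a zip archive into a fresh temporary directory, return its path."""
    tmp_dir = tempfile.mkdtemp(prefix="pdstools_zip_")
    with zipfile.ZipFile(archive_path) as zf:
        zf.extractall(tmp_dir)
    return tmp_dir

def extract_tar(archive_path: Path) -> str:
    """Extract a tar archive (including .tar.gz/.tgz) the same way."""
    tmp_dir = tempfile.mkdtemp(prefix="pdstools_tar_")
    with tarfile.open(archive_path) as tf:  # compression mode is inferred
        tf.extractall(tmp_dir)
    return tmp_dir
```

Returning a plain `str` path lets the caller hand the directory straight to a Polars `scan_*` glob.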

_read_from_bytesio(file: io.BytesIO, extension: str) polars.LazyFrame

Read data from a BytesIO object (e.g., from Streamlit file upload).

Parameters:
  • file (BytesIO) – The BytesIO object containing file data.

  • extension (str) – The file extension (e.g., ‘.csv’, ‘.json’, ‘.zip’, ‘.gz’).

Returns:

Lazy DataFrame ready for processing.

Return type:

pl.LazyFrame

read_data(path: str | pathlib.Path | io.BytesIO) polars.LazyFrame

Read data from various file formats and sources.

Supports multiple formats: parquet, csv, arrow, feather, ndjson, json, zip, tar, tar.gz, tgz, gz. Handles both individual files and directories (including Hive-partitioned structures). Archives (zip, tar) are automatically extracted to temporary directories. Gzip files (.gz) are automatically decompressed.

Parameters:

path (str, Path, or BytesIO) – Path to a data file, archive, directory, or BytesIO object. When using BytesIO (e.g., from Streamlit file uploads), the object must have a ‘name’ attribute indicating the file extension. Supported formats:
  - Parquet files or directories
  - CSV files
  - Arrow/IPC/Feather files
  - NDJSON/JSONL files
  - GZIP compressed files (.gz, .json.gz, .csv.gz, etc.)
  - ZIP archives including Pega Dataset Export format (extracted automatically)
  - TAR archives including .tar.gz and .tgz (extracted automatically)
  - Hive-partitioned directories (scanned recursively)

Returns:

Lazy DataFrame ready for processing. Use .collect() to materialize.

Return type:

pl.LazyFrame

Raises:

ValueError – If no supported data files are found in a directory, or if the file type is not supported.

Examples

Read a parquet file:

>>> df = read_data("data.parquet")

Read from a ZIP archive:

>>> df = read_data("export.zip")

Read from a TAR archive:

>>> df = read_data("export.tar.gz")

Read from a Hive-partitioned directory:

>>> df = read_data("pxDecisionTime_day=08/")

Read a Pega Dataset Export file:

>>> df = read_data("Data-Decision-ADM-ModelSnapshot_pyModelSnapshots_20210101T010000_GMT.zip")

Read a gzip-compressed file:

>>> df = read_data("export.json.gz")
>>> df = read_data("data.csv.gz")

Read from a BytesIO object (e.g., Streamlit upload):

>>> from io import BytesIO
>>> uploaded_file = ...  # BytesIO with 'name' attribute
>>> df = read_data(uploaded_file)

Read a Feather file:

>>> df = read_data("data.feather")

Notes

Pega Dataset Export Support: This function fully supports Pega Dataset Export format (e.g., Data-Decision-ADM-*.zip, Data-DM-*.zip). These are zip archives containing a data.json file (NDJSON format) and optionally a META-INF/MANIFEST.mf metadata file. The function automatically extracts and reads the data.json file.

Other Notes:
  - Archives are extracted to temporary directories with automatic cleanup
  - OS artifacts (__MACOSX, .DS_Store, ._* files) are automatically removed
  - For directories, the first supported file type found determines the format

read_ds_export(filename: str | os.PathLike | io.BytesIO, path: str | os.PathLike = '.', verbose: bool = False, **reading_opts) polars.LazyFrame | None

Read Pega dataset exports with additional capabilities.

This function extends read_data() with:
  - Smart file finding: accepts ‘modelData’ or ‘predictorData’ and searches for matching files (ADM-specific)
  - URL downloads: fetches remote files when local paths are not found (useful for demos and examples)
  - Schema overrides: applies Pega-specific type corrections (e.g., PYMODELID as string)

For simple file reading without these features, use read_data() instead.

Parameters:
  • filename (str, os.PathLike, or BytesIO) – File identifier. Can be:
    - Full file path
    - Generic name like ‘modelData’ or ‘predictorData’ (triggers smart search)
    - BytesIO object (delegates to read_data)

  • path (str or os.PathLike, default='.') – Directory to search for files (ignored for BytesIO or full paths)

  • verbose (bool, default=False) – Print file selection details

  • **reading_opts – Additional Polars scan_* options. Common options include:
    - infer_schema_length (int, default=10000): Rows to scan for schema inference
    - separator (str): CSV delimiter
    - ignore_errors (bool): Continue on parse errors

Returns:

Lazy dataframe, or None if file not found

Return type:

pl.LazyFrame or None

Examples

Smart file finding:

>>> df = read_ds_export('modelData', path='data/ADMData')

Specific file:

>>> df = read_ds_export('ModelSnapshot_20210101.json', path='data')

URL download:

>>> df = read_ds_export('ModelSnapshot.zip', path='https://example.com/exports')

Schema control:

>>> df = read_ds_export('export.csv', infer_schema_length=200000)

_fill_context_field_nulls(df: polars.LazyFrame) polars.LazyFrame

Fill nulls in context fields to prevent issues in downstream operations.

Context fields (Channel, Direction, Issue, Group, Name) often have nulls in source data which can cause errors in group_by, transpose, and concat_str operations. This function fills nulls with “Unknown” to ensure these operations work correctly.

Note: Treatment is intentionally NOT filled because null Treatment has semantic meaning (no treatment variation exists for that action).

Parameters:

df (pl.LazyFrame) – Input dataframe

Returns:

Dataframe with nulls filled in context fields

Return type:

pl.LazyFrame

import_file(file: str | io.BytesIO, extension: str, **reading_opts) polars.LazyFrame

Import a file with Pega-specific schema handling.

Applies ADM-specific type corrections and schema overrides during import. Used internally by read_ds_export() for backward compatibility with legacy code.

Parameters:
  • file (str or BytesIO) – File path or BytesIO object

  • extension (str) – File extension (e.g., ‘.csv’, ‘.json’, ‘.parquet’)

  • **reading_opts – Polars reading options (infer_schema_length, separator, ignore_errors, etc.)

Returns:

Lazy dataframe with schema corrections applied

Return type:

pl.LazyFrame

read_zipped_file(file: str | io.BytesIO, verbose: bool = False) tuple[io.BytesIO, str]

Read a zipped NDJSON file. Reads a dataset export file as exported and downloaded from Pega. The export is formatted as a zipped multi-line JSON file; this function reads it and returns the contents as a BytesIO object.

Parameters:
  • file (str) – The full path to the file

  • verbose (bool, default=False) – Whether to print the names of the files inside the archive for debugging purposes

Returns:

The raw bytes object to pass through to Polars

Return type:

tuple[io.BytesIO, str]
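A sketch of how such a function might work with the standard library's zipfile module. This is not the module's actual implementation; in particular, the assumption that the second tuple element is the inner file's extension is inferred from the `tuple[io.BytesIO, str]` signature:

```python
import io
import zipfile

def read_zipped_file(file, verbose: bool = False) -> tuple:
    """Return the data payload of a Pega export zip as (BytesIO, extension)."""
    with zipfile.ZipFile(file) as zf:
        names = zf.namelist()
        if verbose:
            print(names)
        # Skip metadata such as META-INF/MANIFEST.mf; take the NDJSON payload.
        data_name = next(n for n in names if n.endswith(".json"))
        return io.BytesIO(zf.read(data_name)), ".json"
```

The returned BytesIO can then be handed to a Polars NDJSON reader without writing anything to disk.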

read_multi_zip(files: collections.abc.Iterable[str], zip_type: Literal['gzip'] = 'gzip', add_original_file_name: bool = False, verbose: bool = True) polars.LazyFrame

Read multiple zipped NDJSON files and concatenate them into one Polars dataframe.

Parameters:
  • files (Iterable[str]) – The list of files to concatenate

  • zip_type (Literal['gzip']) – At this point, only ‘gzip’ is supported

  • verbose (bool, default = True) – Whether to print out the progress of the import

  • add_original_file_name (bool)

Return type:

polars.LazyFrame

get_latest_file(path: str | os.PathLike, target: str, verbose: bool = False) str | None

Convenience method to find the latest model snapshot. It has a set of default names to search for and finds all files that match them. Once it finds all matching files in the directory, it chooses the most recent one. Supports [“.json”, “.csv”, “.zip”, “.parquet”, “.feather”, “.ipc”]. Needs a path to the directory and a target of either ‘modelData’ or ‘predictorData’.

Parameters:
  • path (str) – The filepath where the data is stored

  • target (str in ['model_data', 'predictor_data', 'prediction_data']) – Whether to look for data about the predictive models (‘model_data’) or the predictor bins (‘predictor_data’)

  • verbose (bool, default = False) – Whether to print all found files before comparing name criteria for debugging purposes

Returns:

The most recent file given the file name criteria, or None if no matching file is found.

Return type:

str | None
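A simplified sketch of the selection logic. The real function matches Pega's export naming conventions for the given target; this version just filters by a keyword and picks the newest file by modification time, which is an assumption, not the documented timestamp-parsing behaviour:

```python
from pathlib import Path

# Extensions listed in the docstring above.
SUPPORTED = {".json", ".csv", ".zip", ".parquet", ".feather", ".ipc"}

def get_latest_file(path, keyword: str, verbose: bool = False):
    """Return the most recently modified supported file containing keyword."""
    candidates = [
        p for p in Path(path).iterdir()
        if p.suffix in SUPPORTED and keyword in p.name
    ]
    if verbose:
        print([p.name for p in candidates])
    if not candidates:
        return None  # mirrors the str | None return type
    return str(max(candidates, key=lambda p: p.stat().st_mtime))
```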

find_files(files_dir, target)
cache_to_file(df: polars.DataFrame | polars.LazyFrame, path: str | os.PathLike, name: str, cache_type: Literal['parquet'] = 'parquet', compression: polars._typing.ParquetCompression = 'uncompressed') pathlib.Path
cache_to_file(df: polars.DataFrame | polars.LazyFrame, path: str | os.PathLike, name: str, cache_type: Literal['ipc'] = 'ipc', compression: polars._typing.IpcCompression = 'uncompressed') pathlib.Path

Very simple convenience function to cache data. Caches in Arrow IPC or Parquet format for very fast reading.

Parameters:
  • df (pl.DataFrame or pl.LazyFrame) – The dataframe to cache

  • path (os.PathLike) – The location to cache the data

  • name (str) – The name to give to the file

  • cache_type (str) – The type of file to export. Default is IPC, also supports parquet

  • compression (str) – The compression to apply, default is uncompressed

Returns:

The filepath to the cached file

Return type:

pathlib.Path

read_dataflow_output(files: collections.abc.Iterable[str] | str, cache_file_name: str | None = None, *, extension: Literal['json'] = 'json', compression: Literal['gzip'] = 'gzip', cache_directory: str | os.PathLike = 'cache')

Reads the file output of a dataflow run.

By default, the Prediction Studio data export also uses dataflows, thus this function can be used for those use cases as well.

Because dataflows have good resiliency, they can produce a great number of files. By default, every few seconds each dataflow node writes a file for each partition. While this helps the system stay healthy, it makes the output a bit more difficult to consume. This function can take in a list of files (or a glob pattern) and read in all of the files.

If cache_file_name is specified, this function caches the data it read as a parquet file. This not only reduces the file size, it is also very fast. When this function is run and there is a pre-existing parquet file with the name specified in cache_file_name, it will read only the files that weren't read before and add them to the parquet file. If no new files are found, it simply returns the contents of that parquet file, significantly speeding up operations.

In a future version, the functionality of this function will be extended to also read from S3 or other remote file systems directly using the same caching method.

Parameters:
  • files (Union[str, Iterable[str]]) – An iterable (list or a glob) of file strings to read. If a string is provided, we call glob() on it to find all matching files

  • cache_file_name (str, Optional) – If given, caches the files to a file with the given name. If None, does not use the cache at all

  • extension (Literal["json"]) – The extension of the files, by default “json”

  • compression (Literal["gzip"]) – The compression of the files, by default “gzip”

  • cache_directory (os.PathLike) – The file path to cache the previously read files

Usage

>>> from glob import glob
>>> read_dataflow_output(files=glob("model_snapshots_*.json"))