pdstools.pega_io.File

Attributes

Functions

_read_excel(→ polars.DataFrame)

Read an Excel file via polars, surfacing the optional-dependency requirement.

_is_artifact(→ bool)

Return True for OS-generated junk entries (macOS, Windows, etc.).

_clean_artifacts(→ None)

Remove OS-generated artifact files and directories after archive extraction.

_extract_tar(→ str)

Extract a tar archive to a temporary directory and return the path.

_extract_zip(→ str)

Extract a zip archive to a temporary directory and return the path.

_read_from_bytesio(→ polars.LazyFrame)

Read data from a BytesIO object (e.g., from Streamlit file upload).

read_data(→ polars.LazyFrame)

Read data from various file formats and sources.

read_ds_export(→ polars.LazyFrame | None)

Read Pega dataset exports with additional capabilities.

_fill_context_field_nulls(→ polars.LazyFrame)

Fill nulls in context fields to prevent issues in downstream operations.

_import_file(→ polars.LazyFrame)

Import a file with Pega-specific schema handling.

read_zipped_file(→ tuple[io.BytesIO, str])

Read a Pega zipped NDJSON dataset export.

read_multi_zip(→ polars.LazyFrame)

Read multiple gzip-compressed NDJSON files and concatenate them.

get_latest_file(→ str | None)

Find the most recent Pega snapshot file matching a target type.

find_files(→ list[str])

Filter a list of filenames down to those matching a Pega snapshot target.

cache_to_file(…)

Very simple convenience function to cache data.

read_dataflow_output(files[, cache_file_name, ...])

Read the file output of a Pega dataflow run.

Module Contents

logger
_SUPPORTED_EXTENSIONS: set[str]
_read_excel(path, **kwargs) → polars.DataFrame

Read an Excel file via polars, surfacing the optional-dependency requirement.

Polars dispatches Excel reading to fastexcel/calamine and raises a bare ModuleNotFoundError when that backend is missing. We catch it here and re-raise as MissingDependenciesException so the rest of the codebase can rely on a single, package-manager-neutral error path.

Return type:

polars.DataFrame
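The re-raise pattern described above can be sketched as follows. This is an illustration under stated assumptions, not the pdstools code: `MissingDependenciesException` here is a hypothetical stand-in whose constructor may differ from the real one, and the reader is injected so the sketch runs without polars installed.

```python
from collections.abc import Callable
from typing import Any


class MissingDependenciesException(Exception):
    """Hypothetical stand-in for the package's own missing-dependency error."""

    def __init__(self, deps: list[str], namespace: str) -> None:
        self.deps = deps
        super().__init__(f"{namespace} requires missing optional dependencies: {', '.join(deps)}")


def read_excel_sketch(path: str, reader: Callable[..., Any], **kwargs: Any) -> Any:
    """Call an Excel reader, translating a missing backend into a single error type."""
    try:
        # In pdstools this would be polars' Excel reader; here it is injected (assumption).
        return reader(path, **kwargs)
    except ModuleNotFoundError as exc:
        # Chain the original error so the missing module name stays visible in tracebacks.
        raise MissingDependenciesException(["fastexcel"], "Excel reading") from exc
```

Chaining with `from exc` keeps the original `ModuleNotFoundError` available as `__cause__`, so callers get one exception type without losing the detail.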

_is_artifact(name: str) → bool

Return True for OS-generated junk entries (macOS, Windows, etc.).

Parameters:

name (str)

Return type:

bool

_clean_artifacts(directory: str) → None

Remove OS-generated artifact files and directories after archive extraction.

Polars glob patterns (e.g. **/*.parquet) cannot skip hidden files or __MACOSX resource-fork directories, so we delete them from the extracted tree before scanning.

Parameters:

directory (str)

Return type:

None
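A minimal standard-library sketch of the artifact check and cleanup pass described for _is_artifact and _clean_artifacts. The names __MACOSX, .DS_Store and ._* come from the notes on read_data; Thumbs.db and desktop.ini are assumed Windows examples, and the helper names are hypothetical.

```python
import os
import shutil

# __MACOSX, .DS_Store and the ._* prefix are documented; the Windows names are assumptions.
_ARTIFACT_NAMES = {"__MACOSX", ".DS_Store", "Thumbs.db", "desktop.ini"}


def is_artifact(name: str) -> bool:
    """Return True for OS-generated junk entries."""
    return name in _ARTIFACT_NAMES or name.startswith("._")


def clean_artifacts(directory: str) -> None:
    """Delete artifact files and directories below `directory`, in place."""
    for root, dirs, files in os.walk(directory, topdown=True):
        for d in list(dirs):
            if is_artifact(d):
                shutil.rmtree(os.path.join(root, d))
                # Pruning `dirs` stops os.walk from descending into the deleted tree.
                dirs.remove(d)
        for f in files:
            if is_artifact(f):
                os.remove(os.path.join(root, f))
```

Walking top-down and pruning `dirs` means a removed `__MACOSX` tree is never visited, so a subsequent `**/*.parquet` glob only sees real data files.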

_extract_tar(archive_path: pathlib.Path) → str

Extract a tar archive to a temporary directory and return the path.

Parameters:

archive_path (pathlib.Path)

Return type:

str

_extract_zip(archive_path: pathlib.Path) → str

Extract a zip archive to a temporary directory and return the path.

Parameters:

archive_path (pathlib.Path)

Return type:

str
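The two extraction helpers can be sketched with the standard library as below. The function names are hypothetical, and unlike the behaviour described for read_data, this sketch leaves removal of the temporary directory to the caller (for example with shutil.rmtree).

```python
import tarfile
import tempfile
import zipfile
from pathlib import Path


def extract_tar(archive_path: Path) -> str:
    """Extract a .tar/.tar.gz/.tgz archive into a fresh temp dir and return its path."""
    target = tempfile.mkdtemp(prefix="tar_extract_")
    # Mode "r:*" lets tarfile detect gzip/bz2/xz compression transparently.
    with tarfile.open(archive_path, mode="r:*") as tf:
        if hasattr(tarfile, "data_filter"):
            # Extraction filters reject absolute paths, links escaping the target, etc.
            tf.extractall(target, filter="data")
        else:
            # Older Pythons without filters: only extract archives you trust.
            tf.extractall(target)
    return target


def extract_zip(archive_path: Path) -> str:
    """Extract a .zip archive into a fresh temp dir and return its path."""
    target = tempfile.mkdtemp(prefix="zip_extract_")
    with zipfile.ZipFile(archive_path) as zf:
        # zipfile strips absolute prefixes and ".." components from member names.
        zf.extractall(target)
    return target
```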

_read_from_bytesio(file: io.BytesIO, extension: str) → polars.LazyFrame

Read data from a BytesIO object (e.g., from Streamlit file upload).

Parameters:
  • file (BytesIO) – The BytesIO object containing file data.

  • extension (str) – The file extension (e.g., ‘.csv’, ‘.json’, ‘.zip’, ‘.gz’).

Returns:

Lazy DataFrame ready for processing.

Return type:

pl.LazyFrame

read_data(path: str | pathlib.Path | io.BytesIO) → polars.LazyFrame

Read data from various file formats and sources.

Supports multiple formats: parquet, csv, arrow, feather, ndjson, json, xlsx, xls, zip, tar, tar.gz, tgz, gz. Handles both individual files and directories (including Hive-partitioned structures). Archives (zip, tar) are automatically extracted to temporary directories. Gzip files (.gz) are automatically decompressed.

Parameters:

path (str, Path, or BytesIO) – Path to a data file, archive, directory, or BytesIO object. When using BytesIO (e.g., from Streamlit file uploads), the object must have a ‘name’ attribute indicating the file extension. Supported formats:

  • Parquet files or directories

  • CSV files

  • Arrow/IPC/Feather files

  • NDJSON/JSONL files

  • Excel files (.xlsx, .xls; requires the optional fastexcel package)

  • GZIP-compressed files (.gz, .json.gz, .csv.gz, etc.)

  • ZIP archives, including the Pega Dataset Export format (extracted automatically)

  • TAR archives, including .tar.gz and .tgz (extracted automatically)

  • Hive-partitioned directories (scanned recursively)

Returns:

Lazy DataFrame ready for processing. Use .collect() to materialize.

Return type:

pl.LazyFrame

Raises:

ValueError – If no supported data files are found in a directory, or if the file type is not supported.

Examples

Read a parquet file:

>>> df = read_data("data.parquet")

Read from a ZIP archive:

>>> df = read_data("export.zip")

Read from a TAR archive:

>>> df = read_data("export.tar.gz")

Read from a Hive-partitioned directory:

>>> df = read_data("pxDecisionTime_day=08/")

Read a Pega Dataset Export file:

>>> df = read_data("Data-Decision-ADM-ModelSnapshot_pyModelSnapshots_20210101T010000_GMT.zip")

Read a gzip-compressed file:

>>> df = read_data("export.json.gz")
>>> df = read_data("data.csv.gz")

Read from a BytesIO object (e.g., Streamlit upload):

>>> from io import BytesIO
>>> uploaded_file = ...  # BytesIO with 'name' attribute
>>> df = read_data(uploaded_file)

Read a Feather file:

>>> df = read_data("data.feather")

Notes

Pega Dataset Export Support: This function supports the Pega Dataset Export format (e.g., Data-Decision-ADM-*.zip, Data-DM-*.zip). These are zip archives containing a data.json file (NDJSON format) and, optionally, a META-INF/MANIFEST.mf metadata file. The function automatically extracts and reads the data.json file.

Other Notes:

  • Archives are extracted to temporary directories with automatic cleanup.

  • OS artifacts (__MACOSX, .DS_Store, ._* files) are automatically removed.

  • For directories, the first supported file type found determines the format.
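The extension handling described for read_data can be pictured as a routing step on the file name. The sketch below is hypothetical (resolve_format and its route labels are illustrative, not pdstools API), and it covers single files only, not directories or Hive-partitioned trees.

```python
from pathlib import PurePath

# Hypothetical routing table for illustration; pdstools' real dispatch may differ.
_ARCHIVES = {".zip": "zip", ".tar": "tar", ".tgz": "tar"}
_PLAIN_FILES = {".parquet", ".csv", ".arrow", ".feather", ".ipc",
                ".ndjson", ".jsonl", ".json", ".xlsx", ".xls"}


def resolve_format(name: str) -> tuple[str, str]:
    """Map a file name to (route, extension) using its suffixes."""
    suffixes = [s.lower() for s in PurePath(name).suffixes]
    if not suffixes:
        raise ValueError(f"No file extension in {name!r}")
    last = suffixes[-1]
    if last in _ARCHIVES:
        return _ARCHIVES[last], last
    if last == ".gz":
        inner = suffixes[-2] if len(suffixes) > 1 else ""
        if inner == ".tar":
            return "tar", ".tar.gz"
        # e.g. "data.csv.gz": decompress first, then read as the inner format.
        return "gzip", inner
    if last in _PLAIN_FILES:
        return "file", last
    raise ValueError(f"Unsupported file type: {last}")
```

Looking at the last suffix first keeps compound names such as .tar.gz and .json.gz unambiguous.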

read_ds_export(filename: str | os.PathLike | io.BytesIO, path: str | os.PathLike = '.', *, infer_schema_length: int = 10000, separator: str = ',', ignore_errors: bool = False) → polars.LazyFrame | None

Read Pega dataset exports with additional capabilities.

Extends read_data() with:

  • Smart file finding: accepts "model_data" or "predictor_data" and searches for matching files (ADM-specific).

  • URL downloads: fetches remote files when local paths are not found (useful for demos and examples).

  • Schema overrides: applies Pega-specific type corrections (e.g. PYMODELID as string).

For simple file reading without these features, use read_data().

Parameters:
  • filename (str, os.PathLike, or BytesIO) – File identifier. May be a full file path, a generic name like "model_data" / "predictor_data" (triggers smart search), or an io.BytesIO object (delegates to read_data()).

  • path (str or os.PathLike, default='.') – Directory to search for files (ignored for BytesIO or full paths).

  • infer_schema_length (int, keyword-only, default=10000) – Rows to scan for schema inference (CSV/JSON).

  • separator (str, keyword-only, default=",") – CSV delimiter.

  • ignore_errors (bool, keyword-only, default=False) – Whether to continue on parse errors (CSV).

Returns:

Lazy dataframe, or None if the file could not be located.

Return type:

pl.LazyFrame or None

Examples

>>> df = read_ds_export("model_data", path="data/ADMData")
>>> df = read_ds_export("ModelSnapshot_20210101.json", path="data")
>>> df = read_ds_export(
...     "ModelSnapshot.zip", path="https://example.com/exports"
... )
>>> df = read_ds_export("export.csv", infer_schema_length=200000)

_fill_context_field_nulls(df: polars.LazyFrame) → polars.LazyFrame

Fill nulls in context fields to prevent issues in downstream operations.

Context fields (Channel, Direction, Issue, Group, Name) often contain nulls in the source data, which can cause errors in group_by, transpose, and concat_str operations. This function fills those nulls with “Unknown” so that these operations work correctly.

Note: Treatment is intentionally NOT filled because null Treatment has semantic meaning (no treatment variation exists for that action).

Parameters:

df (pl.LazyFrame) – Input dataframe

Returns:

Dataframe with nulls filled in context fields

Return type:

pl.LazyFrame
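The column selection described above (fill the five context fields, leave Treatment alone) is sketched below on plain row dictionaries rather than a polars LazyFrame, so it runs with the standard library alone. This is an illustration of the rule, not the pdstools implementation; in polars the same idea would map to fill_null on the selected columns.

```python
from typing import Any

# Field list taken from the description above; Treatment is deliberately absent.
CONTEXT_FIELDS = ("Channel", "Direction", "Issue", "Group", "Name")


def fill_context_nulls(rows: list[dict[str, Any]], fill: str = "Unknown") -> list[dict[str, Any]]:
    """Return copies of `rows` with None replaced by `fill` in context fields only."""
    filled = []
    for row in rows:
        new_row = dict(row)  # copy so the caller's data is not mutated
        for field in CONTEXT_FIELDS:
            # Only touch fields that are present; absent columns stay absent.
            if field in new_row and new_row[field] is None:
                new_row[field] = fill
        filled.append(new_row)
    return filled
```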

_import_file(file: str | io.BytesIO, extension: str, *, infer_schema_length: int = 10000, separator: str = ',', ignore_errors: bool = False) → polars.LazyFrame

Import a file with Pega-specific schema handling.

Applies ADM-specific type corrections and schema overrides during import. Used internally by read_ds_export().

Parameters:
  • file (str or BytesIO) – File path or BytesIO object.

  • extension (str) – File extension (e.g., .csv, .json, .parquet).

  • infer_schema_length (int, keyword-only, default=10000) – Rows to scan for schema inference (CSV/JSON).

  • separator (str, keyword-only, default=",") – CSV delimiter.

  • ignore_errors (bool, keyword-only, default=False) – Whether to continue on parse errors (CSV).

Returns:

Lazy dataframe with schema corrections applied.

Return type:

pl.LazyFrame

read_zipped_file(file: str | io.BytesIO) → tuple[io.BytesIO, str]

Read a Pega zipped NDJSON dataset export.

A Pega dataset export is a zip archive that contains a data.json file (NDJSON format) and optionally a META-INF/MANIFEST.mf metadata file. This helper opens the zip, locates data.json (top-level or nested) and returns its bytes.

Parameters:

file (str or BytesIO) – Path to the zip file, or an in-memory zip buffer.

Returns:

A pair of (buffer, ".json") ready to be passed back into a Polars reader.

Return type:

tuple[BytesIO, str]

Raises:

FileNotFoundError – If the archive does not contain a data.json entry.
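A standard-library sketch of locating data.json inside a Pega export zip, as described above. The helper name is hypothetical, and preferring the shallowest match when several data.json entries exist is an assumption of this sketch, not documented behaviour.

```python
import io
import zipfile
from typing import BinaryIO, Union


def read_zipped_data_json(file: Union[str, BinaryIO]) -> tuple[io.BytesIO, str]:
    """Return the bytes of the archive's data.json entry plus the ".json" extension."""
    with zipfile.ZipFile(file) as zf:
        # Match data.json at the top level or inside any nested folder.
        candidates = [n for n in zf.namelist() if n.rsplit("/", 1)[-1] == "data.json"]
        if not candidates:
            raise FileNotFoundError("Archive does not contain a data.json entry")
        # Assumption: prefer the least-nested match when several exist.
        best = min(candidates, key=lambda n: n.count("/"))
        return io.BytesIO(zf.read(best)), ".json"
```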

read_multi_zip(files: collections.abc.Iterable[str], *, add_original_file_name: bool = False, verbose: bool = True) → polars.LazyFrame

Read multiple gzip-compressed NDJSON files and concatenate them.

Parameters:
  • files (Iterable[str]) – Paths to the .json.gz files to read.

  • add_original_file_name (bool, keyword-only, default=False) – If True, add a file column recording each source path.

  • verbose (bool, keyword-only, default=True) – Show a tqdm progress bar (if installed) and print a completion line when done.

Returns:

Concatenated lazy frame across all input files.

Return type:

pl.LazyFrame
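The concatenation described for read_multi_zip is sketched below with the standard library, returning a list of dicts instead of a LazyFrame. The helper name is hypothetical and the progress bar (verbose) is omitted; the file key mirrors the documented add_original_file_name column.

```python
import gzip
import json
from collections.abc import Iterable


def read_multi_gz_ndjson(files: Iterable[str], *, add_original_file_name: bool = False) -> list[dict]:
    """Read gzip-compressed NDJSON files and concatenate their records in order."""
    records: list[dict] = []
    for path in files:
        # "rt" decompresses and decodes in one step, yielding text lines.
        with gzip.open(path, "rt", encoding="utf-8") as fh:
            for line in fh:
                if not line.strip():
                    continue  # tolerate blank lines between records
                record = json.loads(line)
                if add_original_file_name:
                    record["file"] = path
                records.append(record)
    return records
```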

get_latest_file(path: str | os.PathLike, target: str) → str | None

Find the most recent Pega snapshot file matching a target type.

Searches path for files whose name matches one of the well-known Pega snapshot patterns for target, then returns the most recent one (parsed from the filename’s GMT timestamp, falling back to file ctime). Supports .json, .csv, .zip, .parquet, .feather, .ipc, .arrow.

Parameters:
  • path (str or os.PathLike) – Directory to search.

  • target (str) – One of "model_data", "predictor_data", "prediction_data", "value_finder".

Returns:

Full path to the most recent matching file, or None when no matching file exists.

Return type:

str or None

Raises:

ValueError – If target is not one of the supported names.
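The "most recent" rule described above (GMT timestamp from the filename, file ctime as a fallback) can be sketched as follows. The helper names are hypothetical, and the regular expression assumes the YYYYMMDDTHHMMSS_GMT form seen in the example filenames, with an optional fractional-seconds part as an extra assumption.

```python
import os
import re
from datetime import datetime, timezone
from typing import Iterable, Optional

# Assumed filename stamp, e.g. "..._20210101T010000_GMT.zip" (optional ".123" fraction).
_GMT_STAMP = re.compile(r"(\d{8}T\d{6})(?:\.\d+)?_GMT")


def file_timestamp(path: str) -> datetime:
    """Parse the GMT stamp from the filename, else fall back to the file's ctime."""
    match = _GMT_STAMP.search(os.path.basename(path))
    if match:
        return datetime.strptime(match.group(1), "%Y%m%dT%H%M%S").replace(tzinfo=timezone.utc)
    # Both branches return timezone-aware UTC datetimes so they compare cleanly.
    return datetime.fromtimestamp(os.path.getctime(path), tz=timezone.utc)


def latest_file(paths: Iterable[str]) -> Optional[str]:
    """Return the newest path, or None when there are no candidates."""
    paths = list(paths)
    return max(paths, key=file_timestamp) if paths else None
```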

find_files(files_dir: collections.abc.Iterable[str], target: str) → list[str]

Filter a list of filenames down to those matching a Pega snapshot target.

Parameters:
  • files_dir (Iterable[str]) – Filenames to scan (typically the contents of a directory).

  • target (str) – One of "model_data", "predictor_data", "prediction_data", "value_finder".

Returns:

Filenames whose names match one of the known patterns for target.

Return type:

list[str]

Raises:

ValueError – If target is not one of the supported names.
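A sketch of the filtering and validation described for find_files. The real pdstools pattern lists are not shown in these docs, so the substrings in _TARGET_PATTERNS are purely illustrative, as is the helper name.

```python
from collections.abc import Iterable

# Illustrative substrings only; the actual pdstools patterns per target are not documented here.
_TARGET_PATTERNS: dict[str, tuple[str, ...]] = {
    "model_data": ("ModelSnapshot",),
    "predictor_data": ("PredictorBinningSnapshot",),
    "prediction_data": ("PredictionSnapshot",),
    "value_finder": ("ValueFinder",),
}


def find_files_sketch(files_dir: Iterable[str], target: str) -> list[str]:
    """Keep only names containing one of the target's (hypothetical) patterns."""
    if target not in _TARGET_PATTERNS:
        raise ValueError(f"Unknown target {target!r}; expected one of {sorted(_TARGET_PATTERNS)}")
    patterns = _TARGET_PATTERNS[target]
    return [name for name in files_dir if any(p in name for p in patterns)]
```

Validating target up front gives the documented ValueError instead of silently returning an empty list for a typo.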

cache_to_file(df: polars.DataFrame | polars.LazyFrame, path: str | os.PathLike, name: str, cache_type: Literal['parquet'] = 'parquet', compression: polars._typing.ParquetCompression = 'uncompressed') → pathlib.Path
cache_to_file(df: polars.DataFrame | polars.LazyFrame, path: str | os.PathLike, name: str, cache_type: Literal['ipc'] = 'ipc', compression: polars._typing.IpcCompression = 'uncompressed') → pathlib.Path

Very simple convenience function to cache data. By default, data is cached in Arrow IPC format for fast reading; Parquet is also supported.

Parameters:
  • df (pl.DataFrame or pl.LazyFrame) – The dataframe to cache.

  • path (str or os.PathLike) – The directory in which to store the cached file.

  • name (str) – The name to give to the file.

  • cache_type ({'ipc', 'parquet'}) – The type of file to export. Defaults to IPC; Parquet is also supported.

  • compression (str) – The compression to apply. Defaults to uncompressed.

Returns:

The filepath to the cached file

Return type:

pathlib.Path

read_dataflow_output(files: collections.abc.Iterable[str] | str, cache_file_name: str | None = None, *, cache_directory: str | os.PathLike = 'cache')

Read the file output of a Pega dataflow run.

By default, the Prediction Studio data export also uses dataflows, so this function applies to those exports as well.

Dataflow nodes write many small .json.gz files for each partition. This helper takes a list of files (or a glob pattern) and concatenates them into a single polars.LazyFrame.

If cache_file_name is supplied, results are cached as a parquet file. Subsequent calls only read files that aren’t already in the cache, then update it.

Parameters:
  • files (str or Iterable[str]) – File paths to read. If a string is provided, it’s expanded with glob.glob().

  • cache_file_name (str, optional) – If given, cache results to <cache_directory>/<cache_file_name>.parquet.

  • cache_directory (str or os.PathLike, keyword-only, default="cache") – Directory to store the parquet cache.

Examples

>>> from glob import glob
>>> read_dataflow_output(files=glob("model_snapshots_*.json.gz"))
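The incremental caching described above (only read files not yet in the cache, then update it) can be sketched as below. This is one plausible approach, not the pdstools implementation: it stores the cache as JSON rather than Parquet so it runs with the standard library alone, tags each record with its source file to detect what is already cached, and the helper name is hypothetical.

```python
import glob
import gzip
import json
import os
from collections.abc import Iterable
from typing import Optional, Union


def read_dataflow_output_sketch(files: Union[str, Iterable[str]], cache_path: Optional[str] = None) -> list[dict]:
    # A string is treated as a glob pattern, mirroring the documented behaviour.
    paths = sorted(glob.glob(files)) if isinstance(files, str) else sorted(files)
    cached: list[dict] = []
    if cache_path and os.path.exists(cache_path):
        with open(cache_path, encoding="utf-8") as fh:
            cached = json.load(fh)
    already_read = {rec["file"] for rec in cached}
    new_records: list[dict] = []
    for path in paths:
        if path in already_read:
            continue  # rows from this file are already in the cache
        with gzip.open(path, "rt", encoding="utf-8") as fh:
            for line in fh:
                if line.strip():
                    record = json.loads(line)
                    record["file"] = path  # provenance lets later calls skip this file
                    new_records.append(record)
    records = cached + new_records
    if cache_path:
        os.makedirs(os.path.dirname(cache_path) or ".", exist_ok=True)
        with open(cache_path, "w", encoding="utf-8") as fh:
            json.dump(records, fh)
    return records
```

Because cached rows are returned as-is, a file that disappears from disk still contributes its earlier rows; whether the real function behaves this way is not stated in the docs.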