pdstools.prediction.Prediction
==============================

.. py:module:: pdstools.prediction.Prediction


Attributes
----------

.. autoapisummary::

   pdstools.prediction.Prediction.logger


Classes
-------

.. autoapisummary::

   pdstools.prediction.Prediction.Prediction


Module Contents
---------------

.. py:data:: logger

.. py:class:: Prediction(df: polars.LazyFrame, *, query: pdstools.utils.types.QUERY | None = None)

   Monitor and analyze Pega Prediction Studio Predictions.

   To initialize this class, either

   1. Initialize directly with the df polars LazyFrame
   2. Use one of the class methods

   This class will read in the data from different sources, properly structure
   it for further analysis, and apply correct typing and useful renaming.

   There is also a "namespace" that you can call from this class:

   - `.plot` contains ready-made plots to analyze the prediction data with

   :param df: The Polars LazyFrame representation of the prediction data.
   :type df: pl.LazyFrame
   :param query: An optional query to apply to the input data.
       For details, see :meth:`pdstools.utils.cdh_utils._apply_query`.
   :type query: QUERY, optional

   .. rubric:: Examples

   >>> pred = Prediction.from_ds_export('/my_export_folder/predictions.zip')
   >>> pred = Prediction.from_mock_data(days=70)

   >>> from pdstools import Prediction
   >>> import polars as pl
   >>> pred = Prediction(
   ...     df=pl.scan_parquet('predictions.parquet'),
   ...     query={"Class": ["DATA-DECISION-REQUEST-CUSTOMER-CDH"]},
   ... )

   .. seealso::

      :py:obj:`pdstools.prediction.PredictionPlots`
          The out of the box plots on the Prediction data

      :py:obj:`pdstools.utils.cdh_utils._apply_query`
          How to query the Prediction class and methods

   .. py:attribute:: predictions
      :type: polars.LazyFrame

   .. py:attribute:: plot
      :type: pdstools.prediction.Plots.PredictionPlots

   .. py:attribute:: prediction_validity_expr

   .. py:method:: from_ds_export(predictions_filename: os.PathLike | str, base_path: os.PathLike | str = '.', *, query: pdstools.utils.types.QUERY | None = None, infer_schema_length: int = 10000)
      :classmethod:

      Import from a Pega Dataset Export of the PR_DATA_DM_SNAPSHOTS table.

      :param predictions_filename: The full path or name (if base_path is given) of the prediction
          snapshot file
      :type predictions_filename: Union[os.PathLike, str]
      :param base_path: A base path to provide if predictions_filename is not given as a
          full path, by default "."
      :type base_path: Union[os.PathLike, str], optional
      :param query: An optional argument to filter out selected data, by default None
      :type query: Optional[QUERY], optional
      :param infer_schema_length: Number of rows to scan when inferring the schema for CSV/JSON
          files. For large production datasets, increase this value (e.g.,
          200000) if columns are not being detected correctly. Higher values
          use more memory but provide more accurate schema detection.
          By default 10000
      :type infer_schema_length: int, optional

      :returns: The properly initialized Prediction class
      :rtype: Prediction

      .. rubric:: Examples

      >>> from pdstools import Prediction
      >>> pred = Prediction.from_ds_export('predictions.zip', '/my_export_folder')
      >>> # For large datasets with schema detection issues:
      >>> pred = Prediction.from_ds_export(
      ...     'predictions.zip',
      ...     '/my_export_folder',
      ...     infer_schema_length=200000,
      ... )

      .. note::

         By default, the dataset export in Infinity returns a zip file per table.
         You do not need to open up this zip file! You can simply point to the
         zip, and this method will be able to read in the underlying data.

      .. seealso::

         :py:obj:`pdstools.pega_io.File.read_ds_export`
             More information on file compatibility

         :py:obj:`pdstools.utils.cdh_utils._apply_query`
             How to query the Prediction class and methods
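      A minimal sketch combining the zip import with the ``query`` argument
      (the class name in the filter is illustrative):

      >>> pred = Prediction.from_ds_export(
      ...     'predictions.zip',
      ...     '/my_export_folder',
      ...     query={"Class": ["DATA-DECISION-REQUEST-CUSTOMER-CDH"]},
      ... )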
   .. py:method:: from_s3(bucket: str, key: str, *, region: str | None = None, boto3_client=None, query: pdstools.utils.types.QUERY | None = None, infer_schema_length: int = 10000) -> Prediction
      :classmethod:

      Import the Prediction class from a single object stored in S3.

      Downloads the prediction snapshot file from the given S3 bucket to a
      temporary directory, then delegates to :meth:`from_ds_export` for
      parsing.

      :param bucket: Name of the S3 bucket holding the export file.
      :type bucket: str
      :param key: S3 object key for the prediction snapshot file.
      :type key: str
      :param region: AWS region name. Ignored if ``boto3_client`` is provided.
      :type region: str | None, optional
      :param boto3_client: Pre-configured ``boto3`` S3 client. Use this to inject custom
          credentials, endpoints, or sessions. When omitted, a default client
          is created via ``boto3.client("s3", region_name=region)``.
      :type boto3_client: optional
      :param query: An optional argument to filter the data, by default None.
      :type query: QUERY | None, optional
      :param infer_schema_length: Number of rows to scan when inferring the schema for CSV/JSON
          files. By default 10000.
      :type infer_schema_length: int, optional

      :returns: The properly initialized Prediction class.
      :rtype: Prediction

      .. rubric:: Examples

      >>> from pdstools import Prediction
      >>> pred = Prediction.from_s3(
      ...     bucket="my-pega-exports",
      ...     key="datamart/prediction_snapshots.parquet",
      ... )

      .. note::

         ``boto3`` is an optional dependency; install the ``pega_io`` extra
         (or install ``boto3`` directly) before calling this method.

      .. seealso::

         :py:obj:`Prediction.from_ds_export`
             Underlying parser for downloaded files.

   .. py:method:: from_dataflow_export(prediction_data_files: collections.abc.Iterable[str] | str, *, query: pdstools.utils.types.QUERY | None = None, cache_file_prefix: str = '', cache_directory: os.PathLike | str = 'cache') -> Prediction
      :classmethod:

      Read prediction data generated by a data flow (e.g. Prediction Studio export).

      Dataflows can export data from and to various sources. As they are meant
      to be used in production, they are highly resilient. For every partition
      and every node, a dataflow will output a small JSON file every few
      seconds. While this is great for production loads, it can be tricky to
      read the data back for smaller-scale and ad-hoc analyses.

      This method reads in every individual small JSON file that the dataflow
      has output and caches them to a parquet file in the ``cache_directory``
      folder. Re-running the method later with more data added since the last
      export reads from the (much faster) cache rather than the (slow) raw
      dataflow files.

      :param prediction_data_files: A list of files (or a glob pattern string) to read in as the
          prediction snapshots.
      :type prediction_data_files: Iterable[str] or str
      :param query: An optional argument to filter the data, by default None.
      :type query: QUERY, optional
      :param cache_file_prefix: An optional prefix for the cache files, by default "".
      :type cache_file_prefix: str, optional
      :param cache_directory: Where to store the cached files, by default ``"cache"``.
      :type cache_directory: os.PathLike or str, optional

      :returns: An initialized instance of the Prediction class.
      :rtype: Prediction

      .. rubric:: Examples

      >>> from pdstools import Prediction
      >>> from glob import glob
      >>> pred = Prediction.from_dataflow_export(glob("data/predictions*"))

      .. seealso::

         :py:obj:`pdstools.pega_io.read_dataflow_output`
             Underlying file reader.

         :py:obj:`pdstools.utils.cdh_utils._apply_query`
             How to query the Prediction class and methods.
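      A short sketch of the caching behavior described above; the ``data``
      folder and the cache prefix are illustrative:

      >>> from pdstools import Prediction
      >>> from glob import glob
      >>> # First run: reads each small JSON file, then writes a parquet cache
      >>> pred = Prediction.from_dataflow_export(
      ...     glob("data/predictions*"),
      ...     cache_file_prefix="prod_",
      ...     cache_directory="cache",
      ... )
      >>> # A later run with the same arguments reads from the parquet cache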
   .. py:method:: from_pdc(df: polars.LazyFrame, *, query: pdstools.utils.types.QUERY | None = None)
      :classmethod:

      Import from (Pega-internal) PDC data, which is a combination of the
      PR_DATA_DM_SNAPSHOTS and PR_DATA_DM_ADMMART_MDL_FACT tables.

      :param df: The Polars LazyFrame containing the PDC data
      :type df: pl.LazyFrame
      :param query: An optional query to apply to the input data, by default None
      :type query: Optional[QUERY], optional

      :returns: The initialized Prediction class. Use ``pred.predictions`` to access
          the transformed prediction frame directly.
      :rtype: Prediction

      .. seealso::

         :py:obj:`pdstools.utils.cdh_utils._read_pdc`
             More information on PDC data processing

         :py:obj:`pdstools.utils.cdh_utils._apply_query`
             How to query the Prediction class and methods

   .. py:method:: save_data(path: os.PathLike | str = '.') -> os.PathLike | None

      Cache predictions to a file.

      :param path: Where to place the file
      :type path: Union[os.PathLike, str]

      :returns: The path to the cached prediction data file, or None if no data is
          available
      :rtype: Optional[os.PathLike]

   .. py:method:: from_processed_data(df: polars.LazyFrame)
      :classmethod:

      Load a Prediction from already-processed data (e.g., from cache).

      This bypasses the normal data transformation pipeline and directly
      assigns the data to self.predictions. Use this when loading data that
      has already been processed by the Prediction class constructor, such as
      data saved via save_data().

      :param df: A LazyFrame containing already-processed prediction data with columns
          like 'Positives', 'CTR', 'Performance', etc. rather than the raw
          'pyPositives', 'pyModelType', etc.
      :type df: pl.LazyFrame

      :returns: A Prediction instance with the processed data loaded
      :rtype: Prediction

      .. rubric:: Examples

      >>> # Load from a cached file
      >>> cached_data = pl.scan_parquet('cached_predictions.parquet')
      >>> pred = Prediction.from_processed_data(cached_data)

   .. py:method:: from_mock_data(days=70)
      :classmethod:

      Create a Prediction instance with mock data for testing and
      demonstration purposes.

      :param days: Number of days of mock data to generate, by default 70
      :type days: int, optional

      :returns: The initialized Prediction class with mock data
      :rtype: Prediction

      .. rubric:: Examples

      >>> from pdstools import Prediction
      >>> pred = Prediction.from_mock_data(days=30)
      >>> pred.plot.performance_trend()

   .. py:property:: is_available
      :type: bool

      Check if prediction data is available.

      :returns: True if prediction data is available, False otherwise
      :rtype: bool

   .. py:property:: is_valid
      :type: bool

      Check if prediction data is valid.

      A valid prediction meets the criteria defined in
      prediction_validity_expr, which requires positive and negative responses
      in both test and control groups.

      :returns: True if prediction data is valid, False otherwise
      :rtype: bool
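      A minimal sketch, checking availability and validity after a cache
      round-trip with :meth:`save_data` and :meth:`from_processed_data`
      (the ``cache`` folder is illustrative):

      >>> from pdstools import Prediction
      >>> import polars as pl
      >>> pred = Prediction.from_mock_data(days=7)
      >>> cache_path = pred.save_data('cache')  # path to the written file, or None
      >>> if cache_path is not None:
      ...     restored = Prediction.from_processed_data(pl.scan_parquet(cache_path))
      ...     print(restored.is_available, restored.is_valid)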
   .. py:method:: summary_by_channel(custom_predictions: list[list] | None = None, *, start_date: datetime.datetime | None = None, end_date: datetime.datetime | None = None, window: int | datetime.timedelta | None = None, every: str | None = None, debug: bool = False) -> polars.LazyFrame

      Summarize prediction per channel.

      :param custom_predictions: Optional list with custom prediction name to channel mappings. Each
          item should be [PredictionName, Channel, Direction, isMultiChannel].
          Defaults to None.
      :type custom_predictions: Optional[list[list]], optional
      :param start_date: Start date of the summary period. If None (default) uses the end
          date minus the window, or if both absent, the earliest date in the
          data
      :type start_date: datetime.datetime, optional
      :param end_date: End date of the summary period. If None (default) uses the start
          date plus the window, or if both absent, the latest date in the data
      :type end_date: datetime.datetime, optional
      :param window: Number of days to use for the summary period or an explicit
          timedelta. If None (default) uses the whole period. Can't be given
          if start and end date are also given.
      :type window: int or datetime.timedelta, optional
      :param every: Optional additional grouping by time period. Format string as in
          polars.Expr.dt.truncate
          (https://docs.pola.rs/api/python/stable/reference/expressions/api/polars.Expr.dt.truncate.html),
          for example "1mo", "1w", "1d" for calendar month, week and day.
          Defaults to None.
      :type every: str, optional
      :param debug: If True, include the Period column in output when `every` is
          specified. If False, the Period column is dropped from the results.
          This parameter affects the return value structure, not logging
          output. For debug logging, use
          logging.basicConfig(level=logging.DEBUG).
      :type debug: bool, default False

      :returns: Summary across all Predictions as a dataframe with the following
          fields:

          Time and Configuration Fields:

          - DateRange Min: The minimum date in the summary time range
          - DateRange Max: The maximum date in the summary time range
          - Duration: The duration in seconds between the minimum and maximum snapshot times
          - Prediction: The prediction name
          - Channel: The channel name
          - Direction: The direction (e.g., Inbound, Outbound)
          - ChannelDirectionGroup: Combined Channel/Direction identifier
          - isValid: Boolean indicating if the prediction data is valid
          - usesNBAD: Boolean indicating if this is a standard NBAD prediction
          - isMultiChannel: Boolean indicating if this is a multichannel prediction
          - ControlPercentage: Percentage of responses in the control group
          - TestPercentage: Percentage of responses in the test group

          Performance Metrics:

          - Performance: Weighted model performance (AUC) in range 0.5-1.0
          - Positives: Sum of positive responses
          - Negatives: Sum of negative responses
          - Responses: Sum of all responses
          - Positives_Test: Sum of positive responses in the test group
          - Positives_Control: Sum of positive responses in the control group
          - Positives_NBA: Sum of positive responses in the NBA group
          - Negatives_Test: Sum of negative responses in the test group
          - Negatives_Control: Sum of negative responses in the control group
          - Negatives_NBA: Sum of negative responses in the NBA group
          - CTR: Clickthrough rate (Positives over Positives + Negatives)
          - CTR_Test: Clickthrough rate for the test group (model propensities)
          - CTR_Control: Clickthrough rate for the control group (random propensities)
          - CTR_NBA: Clickthrough rate for the NBA group (available only when Impact Analyzer is used)
          - Lift: Lift in Engagement when testing prioritization with just
            Adaptive Models vs just Random Propensity

          Technology Usage Indicators:

          - usesImpactAnalyzer: Boolean indicating if Impact Analyzer is used
      :rtype: pl.LazyFrame
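      A minimal usage sketch with a weekly breakdown; mock data stands in for
      a real export:

      >>> from pdstools import Prediction
      >>> pred = Prediction.from_mock_data(days=70)
      >>> weekly = pred.summary_by_channel(every="1w")
      >>> weekly.collect()  # materialize the returned LazyFrame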
   .. py:method:: overall_summary(custom_predictions: list[list] | None = None, *, start_date: datetime.datetime | None = None, end_date: datetime.datetime | None = None, window: int | datetime.timedelta | None = None, every: str | None = None, debug: bool = False) -> polars.LazyFrame

      Overall prediction summary. Only valid prediction data is included.

      :param custom_predictions: Optional list with custom prediction name to channel mappings. Each
          item should be [PredictionName, Channel, Direction, isMultiChannel].
          Defaults to None.
      :type custom_predictions: Optional[list[list]], optional
      :param start_date: Start date of the summary period. If None (default) uses the end
          date minus the window, or if both absent, the earliest date in the
          data
      :type start_date: datetime.datetime, optional
      :param end_date: End date of the summary period. If None (default) uses the start
          date plus the window, or if both absent, the latest date in the data
      :type end_date: datetime.datetime, optional
      :param window: Number of days to use for the summary period or an explicit
          timedelta. If None (default) uses the whole period. Can't be given
          if start and end date are also given.
      :type window: int or datetime.timedelta, optional
      :param every: Optional additional grouping by time period. Format string as in
          polars.Expr.dt.truncate
          (https://docs.pola.rs/api/python/stable/reference/expressions/api/polars.Expr.dt.truncate.html),
          for example "1mo", "1w", "1d" for calendar month, week and day.
          Defaults to None.
      :type every: str, optional
      :param debug: If True, include the Period column in output when `every` is
          specified. If False, the Period column is dropped from the results.
          This parameter affects the return value structure, not logging
          output. For debug logging, use
          logging.basicConfig(level=logging.DEBUG).
      :type debug: bool, default False

      :returns: Summary across all Predictions as a dataframe with the following
          fields:

          Time and Configuration Fields:

          - DateRange Min: The minimum date in the summary time range
          - DateRange Max: The maximum date in the summary time range
          - Duration: The duration in seconds between the minimum and maximum snapshot times
          - ControlPercentage: Weighted average percentage of control group responses
          - TestPercentage: Weighted average percentage of test group responses
          - usesNBAD: Boolean indicating if any of the predictions is a standard NBAD prediction

          Performance Metrics:

          - Performance: Weighted average performance (AUC) across all valid channels in range 0.5-1.0
          - Positives Inbound: Sum of positive responses across all valid inbound channels
          - Positives Outbound: Sum of positive responses across all valid outbound channels
          - Responses Inbound: Sum of all responses across all valid inbound channels
          - Responses Outbound: Sum of all responses across all valid outbound channels
          - Overall Lift: Weighted average lift across all valid channels
          - Minimum Negative Lift: The lowest negative lift value found

          Channel Statistics:

          - Number of Valid Channels: Count of unique valid channel/direction combinations
          - Channel with Minimum Negative Lift: Channel with the lowest negative lift value

          Technology Usage Indicators:

          - usesImpactAnalyzer: Boolean indicating if any channel uses Impact Analyzer
      :rtype: pl.LazyFrame
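      A minimal usage sketch restricting the summary to the last 30 days of
      data; mock data stands in for a real export:

      >>> from pdstools import Prediction
      >>> pred = Prediction.from_mock_data(days=70)
      >>> summary = pred.overall_summary(window=30)
      >>> summary.collect()  # materialize the returned LazyFrame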