{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# HDS (Historical Data Set) Analysis and Finding new Features\n", "\n", "This notebook demonstrates how to work with a sample HDS dataset, explore its structure, and build a simple XGBoost model to show feature importance.\n", "\n", "It then shows how to combine this data with a sample dataset from the Data Lake and analyze which features are worth pulling into Pega to improve the models." ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import polars as pl\n", "import polars.selectors as cs\n", "import plotly.express as px\n", "import requests, zipfile, io\n", "from pathlib import Path\n", "from sklearn.model_selection import train_test_split\n", "from sklearn.preprocessing import LabelEncoder\n", "from sklearn.metrics import classification_report, roc_auc_score\n", "from pdstools.utils import cdh_utils" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "## Data Read\n", "\n", "Read the HDS data. Depending on how the data was extracted and stored, you may need different reading methods using\n", "zipfile and/or Polars.\n", "\n", "We also cast the data to the appropriate types and drop some features that shouldn't be used for models." ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "if Path(\"../../data/hds.zip\").exists():\n", "    archive = zipfile.ZipFile(\"../../data/hds.zip\", \"r\")\n", "else:\n", "    # Download the dataset if it does not exist locally\n", "    archive = zipfile.ZipFile(io.BytesIO(requests.get('https://github.com/pegasystems/pega-datascientist-tools/raw/master/data/hds.zip').content))\n", "hds_data = (\n", "    pl.concat([pl.read_ndjson(archive.open(f)) for f in archive.namelist()])\n", "    .rename({\"Customer_C_CIFNBR\": \"Customer_ID\"})\n", "    .with_columns(\n", "        cdh_utils.parse_pega_date_time_formats(\"Decision_DecisionTime\"),\n", "        cdh_utils.parse_pega_date_time_formats(\"Decision_OutcomeTime\"),\n", "        cs.ends_with(\"_DaysSince\", \"_pyHistoricalOutcomeCount\").cast(pl.Float64),\n", "        pl.col(\n", "            [\n", "                \"Customer_NetWealth\",\n", "                \"Customer_CreditScore\",\n", "                \"Customer_CLV_VALUE\",\n", "                \"Customer_RelationshipStartDate\",\n", "                \"Customer_Date_of_Birth\",\n", "                \"Customer_NoOfDependents\"\n", "            ]\n", "        ).cast(pl.Float64),\n", "        cs.starts_with(\"Param_ExtGroup\").cast(pl.Float64),\n", "    )\n", "    .drop([\"Customer_Gender\", \"Customer_Prefix\"])\n", "    .sort([\"Customer_ID\", \"Decision_DecisionTime\"])\n", ")\n", "hds_data.describe()" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "## Available Fields in the HDS dataset\n", "\n", "The HDS data contains all the payload sent to the ADM models (over a period of time) plus the outcomes (Accepted/Declined/Clicked etc). 
There are a few categories of fields, which can be identified by their prefix:\n", "\n", "* \"Customer\" fields, representing the fields/predictors configured in ADM\n", "* \"Context\" fields, these are Channel/Direction/Issue/Group/Name, the usual \"context identifiers\" of the ADM models\n", "* \"IH\" fields, these are Pega-generated fields derived from Interaction History\n", "* Optional \"Param\" fields, also user-defined fields/predictors, but configured in the strategies, rather than defined in the ADM model configuration\n", "\n", "Meta information about the decisions is in Decision and internal fields, containing info about the time of decision, the sample size etc. These are not used in the models.\n", "\n" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "hds_data_dictionary = (\n", "    pl.DataFrame(\n", "        {\n", "            \"Field\": hds_data.schema.names(),\n", "            \"Numeric\": [x.is_numeric() for x in hds_data.schema.dtypes()],\n", "        }\n", "    )\n", "    .with_columns(\n", "        Category=pl.when(pl.col(\"Field\").str.contains(\"_\", literal=True))\n", "        .then(pl.col(\"Field\").str.replace(r\"([^_]+)_.*\", \"${1}\"))\n", "        .otherwise(pl.lit(\"Internal\"))\n", "    )\n", "    .sort(\"Category\")\n", ")\n", "hds_data_dictionary.to_pandas().style.hide()\n" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "category_counts = (\n", "    hds_data_dictionary\n", "    .group_by(\"Category\", \"Numeric\")\n", "    .agg(Count=pl.len())\n", "    .sort(\"Category\")\n", ")\n", "fig = px.bar(\n", "    category_counts, #.to_dict(as_series=False),\n", "    y=\"Category\",\n", "    x=\"Count\",\n", "    title=\"Number of Fields by Category\",\n", "    color=\"Numeric\",\n", "    text=\"Count\",\n", "    orientation=\"h\",\n", ")\n", "\n", "fig.update_layout(\n", "    yaxis_title=\"Field Category\",\n", "    xaxis_title=\"Number of Fields\",\n", ")\n", "\n", "fig.show()" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "## Create XGBoost model\n", "\n", "From this HDS data we create a simple XGBoost model. We create a simple model over all channels and all actions, not split up like ADM does. This could be changed, but the goal here is to get a list of the features ranked by importance, not to create a model that is better than ADM.\n", "\n", "First, we prepare the data: the categorical features and the target are label encoded." ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def data_prep(data, data_dictionary):\n", "    categorical_fields = (\n", "        data_dictionary.filter(~pl.col(\"Numeric\"))\n", "        .filter((pl.col(\"Category\") != \"Internal\") & (pl.col(\"Category\") != \"Decision\"))\n", "        .select(\"Field\")\n", "        .to_series()\n", "        .to_list()\n", "    )\n", "    numerical_fields = (\n", "        data_dictionary.filter(pl.col(\"Numeric\"))\n", "        .filter((pl.col(\"Category\") != \"Internal\") & (pl.col(\"Category\") != \"Decision\"))\n", "        .select(\"Field\")\n", "        .to_series()\n", "        .to_list()\n", "    )\n", "\n", "    print(f\"Categorical fields: {categorical_fields}\")\n", "    print(f\"Numerical fields: {numerical_fields}\")\n", "\n", "    # Simple encoding for categorical features\n", "    for column in categorical_fields:\n", "        if column != \"Decision_Outcome\":\n", "            # Handle missing values\n", "            data = data.with_columns(\n", "                pl.col(column).fill_null(\"missing\").alias(column)\n", "            )\n", "\n", "            # Create a simple label encoder\n", "            le = LabelEncoder()\n", "            encoded = le.fit_transform(data[column].to_list())\n", "            data = data.with_columns(\n", "                pl.Series(name=column + \"_encoded\", values=encoded)\n", "            )\n", "\n", "    # Encode target variable\n", "    le_target = LabelEncoder()\n", "    encoded_target = le_target.fit_transform(data[\"Decision_Outcome\"].to_list())\n", "    data = data.with_columns(\n", "        pl.Series(name=\"target\", values=encoded_target)\n", "    )\n", "\n", "    # Show target encoding\n", "    target_mapping = dict(zip(le_target.classes_, range(len(le_target.classes_))))\n", "    print(f\"\\nTarget encoding: {target_mapping}\")\n", "\n", "    # Select features and target\n", "    feature_cols = [\n", "        col for col in data.columns if col.endswith(\"_encoded\")\n", "    ] + numerical_fields\n", "\n", "    X = data[feature_cols]\n", "    y = data[\"target\"]\n", "\n", "    # Split the data\n", "    X_train, X_test, y_train, y_test = train_test_split(\n", "        X, y, test_size=0.2, random_state=42\n", "    )\n", "\n", "    print(f\"\\nTraining set size: {X_train.shape[0]} samples\")\n", "    print(f\"Test set size: {X_test.shape[0]} samples\")\n", "\n", "    return X_train, X_test, y_train, y_test, le_target, feature_cols\n", "\n", "\n", "X_train, X_test, y_train, y_test, target_encoder, feature_cols = data_prep(hds_data, hds_data_dictionary)" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from xgboost import XGBClassifier\n", "def create_classifier(X_train, X_test, y_train, y_test, target_encoder):\n", "    # Create and train the model\n", "    xgb_model = XGBClassifier(random_state=42)\n", "    xgb_model.fit(X_train, y_train)\n", "\n", "    # Make predictions and evaluate\n", "    y_pred = xgb_model.predict(X_test)\n", "    # AUC is computed on the predicted probability of the positive class\n", "    # rather than on hard labels (this assumes a binary outcome)\n", "    y_score = xgb_model.predict_proba(X_test)[:, 1]\n", "    print(f\"Model AUC: {round(roc_auc_score(y_test, y_score), 5)}\")\n", "\n", "    # Classification report\n", "    print(\"\\nClassification Report:\")\n", "    print(classification_report(y_test, y_pred, target_names=target_encoder.classes_))\n", "\n", "    return xgb_model\n", "\n", "classifier = create_classifier(X_train, X_test, y_train, y_test, target_encoder)" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def plot_feature_imp(classifier, data, feature_cols):\n", "    importances = classifier.feature_importances_\n", "\n", "    # Create a DataFrame for feature importances\n", "    feature_importance_df = (\n", "        pl.DataFrame({\"Feature\": feature_cols, \"Importance\": importances.tolist()})\n", "        .with_columns(\n", "            Feature=pl.when(pl.col(\"Feature\").str.ends_with(\"_encoded\"))\n", "            .then(pl.col(\"Feature\").str.replace(r\"_encoded$\", \"\"))\n", "            .otherwise(pl.col(\"Feature\"))\n", "        )\n", "        .with_columns(\n", "            Category=pl.when(pl.col(\"Feature\").str.contains(\"_\", literal=True))\n", "            .then(pl.col(\"Feature\").str.replace(r\"([^_]+)_.*\", \"${1}\"))\n", "            .otherwise(pl.lit(\"Internal\"))\n", "        )\n", "        .sort(\"Importance\", descending=True)\n", "    )\n", "\n", "    # Get top N features by importance\n", "    top_features_df = feature_importance_df.head(50)\n", "\n", "    # Get the ordered list of feature names\n", "    feature_order = top_features_df[\"Feature\"].to_list()\n", "\n", "    # Correlation test of new features\n", "    features_encoded_name = [\n", "        f\"{feat}_encoded\" if f\"{feat}_encoded\" in feature_cols else feat\n", "        for feat in feature_order\n", "    ]\n", "\n", "    similar_features = []\n", "    n = len(features_encoded_name)\n", "\n", "    for i in range(n):\n", "        for j in range(i + 1, n):\n", "            col1 = features_encoded_name[i]\n", "            col2 = features_encoded_name[j]\n", "\n", "            correlation = data.select(pl.corr(col1, col2)).item()\n", "            if abs(correlation) >= 0.95:\n", "                found = False\n", "\n", "                # Use a separate index (k) so the outer loop variable i is not overwritten\n", "                for k, tup in enumerate(similar_features):\n", "                    if col1 in tup:\n", "                        similar_features[k] = tup + (col2,)\n", "                        found = True\n", "                        break\n", "                    elif col2 in tup:\n", "                        similar_features[k] = tup + (col1,)\n", "                        found = True\n", "                        break\n", "\n", "                if not found:\n", "                    similar_features.append((col1, col2))\n", "\n", "    # Creating the group label\n", "    group_mapping = {}\n", "    for i, group in enumerate(similar_features, 1):\n", "        for feature in group:\n", "            # Key on the display name (without \"_encoded\") so it matches the Feature column\n", "            group_mapping[feature.removesuffix(\"_encoded\")] = f\"Group {i}\"\n", "\n", "    top_features_df = top_features_df.with_columns(\n", "        pl.col(\"Feature\").map_elements(lambda f: group_mapping.get(f, \"\"), return_dtype=pl.Utf8).alias(\"GroupLabel\")\n", "    )\n", "\n", "    # Plot feature importances\n", "    fig = px.bar(\n", "        top_features_df,\n", "        x=\"Importance\",\n", "        y=\"Feature\",\n", "        orientation=\"h\",\n", "        title=\"Feature Importance\",\n", "        template=\"plotly_white\",\n", "        color=\"Category\",\n", "        text=\"GroupLabel\",\n", "        color_discrete_map={\n", "            \"Context\": \"orange\",\n", "            \"IH\": \"green\",\n", "            \"Customer\": \"blue\",\n", "            \"Param\": \"lightblue\",\n", "            \"DataLake\": \"red\",\n", "            \"Internal\": \"gray\",\n", "            \"Decision\": \"purple\",\n", "        },\n", "    )\n", "\n", "    fig.update_layout(\n", "        xaxis_title=\"Importance\",\n", "        yaxis_title=\"Feature\",\n", "        yaxis=dict(\n", "            categoryorder=\"array\",\n", "            categoryarray=feature_order,\n", "            autorange=\"reversed\",\n", "            dtick=1,\n", "        ),\n", "    )\n", "\n", "    fig.update_traces(\n", "        textposition=\"outside\",\n", "        textfont=dict(color=\"black\", size=12),\n", "    )\n", "    return fig\n", "\n", "plot_feature_imp(classifier, X_train, feature_cols).update_layout(height=800).show()" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "Features that are strongly correlated (absolute correlation of at least 0.95) are shown with an indication of the group (e.g. \"Group 1\"). Colors are used to differentiate the different sources of the features. In this demo data set, you see that Group and Channel are very important features, as is expected." ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "## Finding new Features from the Data Lake\n", "\n", "Now, suppose you have external data from your data lake that you want to consider adding to Pega to improve the performance of your models.\n", "\n", "If you have such data, you can merge it with the HDS data and run the model again to see how these features fare against what ADM already uses.\n", "\n", "Such data is typically time-stamped, so we need to be careful to only pull in data from before the decisions were made.\n", "\n", "### Create (fake) External Data\n", "\n", "We first create an example of external data. All features are captured over time there, so there is a feature name, a timestamp, and a value.\n", "\n", "This code (and the resulting data) is just an example. 
You can use any data you want; we just highlight the structure.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import random\n", "\n", "random.seed(101)\n", "datalake_fake_data = hds_data.with_columns(\n", "    DataLake_BadFeature=pl.Series([random.random() for _ in range(hds_data.height)]),\n", "    DataLake_GoodFeature=(pl.col(\"Decision_Outcome\") == \"Accepted\") * 0.9\n", "    + pl.Series([random.random() for _ in range(hds_data.height)]) * 0.1,\n", "    DataLake_GoodFeatureCorrelated=(pl.col(\"Decision_Outcome\") == \"Accepted\") * 0.8\n", "    + pl.Series([random.random() for _ in range(hds_data.height)]) * 0.1\n", ").select(\n", "    [\n", "        pl.col(\"Customer_ID\"),\n", "        pl.col(\"Decision_DecisionTime\").dt.truncate(\"1d\").alias(\"SnapshotTime\"),\n", "        pl.col(\"DataLake_BadFeature\"),\n", "        pl.col(\"DataLake_GoodFeature\"),\n", "        pl.col(\"DataLake_GoodFeatureCorrelated\")\n", "    ]\n", ").group_by(\n", "    [\"Customer_ID\", \"SnapshotTime\"]\n", ").agg(\n", "    cs.all().mean()\n", ").sort([\"Customer_ID\", \"SnapshotTime\"])\n", "\n", "datalake_fake_data.head()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Joining that data with the HDS data is straightforward: we match by customer ID and timestamp. To avoid leakage, for each decision we only join in the latest data lake snapshot for that customer taken at or before the decision time.\n", "\n", "Polars provides a convenient way to do this with the join_asof function."
 ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "augmented_data = hds_data.join_asof(\n", "    datalake_fake_data,\n", "    left_on=\"Decision_DecisionTime\",\n", "    right_on=\"SnapshotTime\",\n", "    by=\"Customer_ID\",\n", "    check_sortedness=False,\n", ")\n", "augmented_data_dictionary = pl.concat(\n", "    [\n", "        hds_data_dictionary,\n", "        pl.DataFrame(\n", "            {\n", "                \"Field\": [\"DataLake_BadFeature\", \"DataLake_GoodFeature\", \"DataLake_GoodFeatureCorrelated\"],\n", "                \"Numeric\": [True, True, True],\n", "                \"Category\": [\"DataLake\", \"DataLake\", \"DataLake\"],\n", "            }\n", "        ),\n", "    ]\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "X_train, X_test, y_train, y_test, target_encoder, feature_cols = data_prep(augmented_data, augmented_data_dictionary)\n", "classifier = create_classifier(X_train, X_test, y_train, y_test, target_encoder)\n", "\n", "plot_feature_imp(classifier, X_train, feature_cols).update_layout(height=800).show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Conclusions\n", "\n", "The resulting feature importance shows how the new \"GoodFeature\" and its correlated variant are on top of the list of features, and the \"BadFeature\" is at the bottom. You should consider the new features with high feature importance for inclusion in Pega.\n", "\n", "We also do a correlation check, so very similar features are labeled with their group index. 
In this example, you would want to include only the best feature of \"Group 1\"; the other one won't add much value since it is so highly correlated.\n" ] } ], "metadata": { "kernelspec": { "display_name": "pega-datascientist-tools", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.12.6" } }, "nbformat": 4, "nbformat_minor": 4 }