Automated Drift and Trend Monitoring for ML Models on Amazon SageMaker

Architecture Diagram

MLOps Architecture

11-Step End-to-End Flow: This architecture shows the complete MLOps pipeline from data ingestion through training, deployment, inference monitoring, and governance. See ARCHITECTURE_STEPS.md for detailed descriptions of each numbered step.

Quickstart: Implementation Steps

Follow these steps to implement the 11-step architecture shown above. Each step corresponds to the numbered components in the diagram.

Prerequisites

SageMaker AI Domain Setup

This solution requires a SageMaker AI domain with MLflow tracking server. If you don't have one set up already, use this CloudFormation template to create a domain with VPC configuration:

SageMaker Domain with VPC Setup

Configure Environment

Copy .env.example to .env and update with your specific environment variables. Settings in .env will override all other defaults and settings in config.yaml.

git clone https://github.com/aws-samples/sample-mlops-bestpractices.git
cd sagemaker-automated-drift-and-trend-monitoring
cp .env.example .env

Required .env settings:

  • SAGEMAKER_EXEC_ROLE - ARN of SageMaker execution role
  • MLFLOW_TRACKING_URI - ARN of SageMaker MLflow tracking server
  • DATA_S3_BUCKET - S3 bucket name for data storage
  • AWS_DEFAULT_REGION - AWS region (must match your domain region)

Steps 1-5: Training Pipeline (1_training_pipeline.ipynb)

Step 1 - Data Ingestion

  • S3 data loaded into Training Data table
  • The repository provides scripts to generate the training dataset (generate_datasets.py) if you would like to test with the sample dataset
  • Alternatively, you can replace training_data with your own dataset

Step 2 - Feature Engineering

  • PySpark processing transforms raw data into features
  • Handles missing values, derives velocity scores, time-based features
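
Though the actual logic lives in preprocessing_pyspark.py, a minimal sketch of this kind of transformation looks like the following; the raw-data path and the exact velocity formula are illustrative assumptions:

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("fraud-feature-engineering").getOrCreate()

# Hypothetical raw-data location; the real step reads the Athena training_data table
raw = spark.read.parquet("s3://<DATA_S3_BUCKET>/fraud-detection/raw/")

features = (
    raw
    # Handle missing values with simple defaults (illustrative)
    .fillna({"transaction_amount": 0.0, "merchant_reputation_score": 0.5})
    # Time-based features derived from a timestamp column (assumed to exist)
    .withColumn("transaction_hour", F.hour("transaction_timestamp"))
    .withColumn("transaction_day_of_week", F.dayofweek("transaction_timestamp"))
    # Simple velocity score: short-term activity relative to the weekly count
    .withColumn(
        "velocity_score",
        F.col("num_transactions_24h") / (F.col("num_transactions_7days") + F.lit(1)),
    )
)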

Step 3 - Model Training

  • XGBoost training on preprocessed features
  • Hyperparameters configured in pipeline definition

Step 4 - Model Evaluation & Registration

  • Training steps and evaluation steps log to SageMaker AI MLflow App
  • Unified experiment tracking and drift computation environment
  • Model registered with versioning and metadata

Step 5 - Model Deployment

  • Custom handler deployed to SageMaker AI inference endpoint
  • Once deployed, the inference handler writes all inferences to SQS, and a Lambda consumer persists them to Athena
  • All notebooks include convenience script invocations for role creation and infrastructure setup (SQS, Lambda, roles, etc.)

Steps 6-10: Inference Monitoring (2a_inference_monitoring.ipynb)

Step 6 - Real-Time Inference Logging

  • Endpoint logs predictions to SQS → Lambda → Athena inference_responses table
  • Async logging with zero latency impact

Step 7 - Ground Truth Collection (Optional for testing)

  • Simulate ground truth to test data and model drift detection
  • In production, fraud confirmations arrive from investigation teams

Step 8 - Ground Truth Backfill

  • Merge ground truth with inference responses
  • Reconcile actual fraud outcomes with predictions based on inference_id
  • Athena MERGE operation on Iceberg table (ACID compatible)

Step 9 - Scheduled Drift Detection

  • EventBridge triggers Lambda drift monitor (daily 2 AM UTC)
  • Compares training data (baseline) vs inference responses (current data)
  • Runs Evidently AI drift analysis (PSI, KS tests)

Step 10 - Drift Report Generation

  • lambda_drift_monitor uses Evidently to generate interactive charts
  • Reports logged to SageMaker AI MLflow App
  • Drift monitor checks thresholds and triggers alarms if exceeded
  • Results pushed to SQS → Lambda writer → monitoring_responses table

Step 11: Governance Dashboard (3_governance_dashboard.ipynb)

Step 11 - QuickSight Visualization

  • Create feature drift datasets, drift scores, and severity visualizations
  • Observe trends for feature drift, model drift, and data drift
  • Dashboard queries Athena tables directly (inference_responses, monitoring_responses)
  • Auto-refresh via EventBridge + Lambda (3 AM UTC daily)
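
Assuming the drift datasets are imported into SPICE, the daily refresh boils down to a dataset ingestion; a hedged boto3 sketch (account ID and dataset ID are placeholders, the repository's setup scripts handle the actual wiring):

import time
import boto3

quicksight = boto3.client("quicksight")

# Kick off a SPICE refresh for the governance dataset (IDs below are placeholders)
quicksight.create_ingestion(
    AwsAccountId="123456789012",
    DataSetId="fraud-governance-dataset",
    IngestionId=f"scheduled-{int(time.time())}",
)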

Introduction

Machine learning models in production degrade silently. Feature distributions shift, fraud patterns evolve, and by the time business metrics reveal the problem, the damage is done. Most teams invest heavily in training pipelines but leave inference monitoring as an afterthought, relying on expensive managed platforms or discovering issues through customer complaints.

This solution provides an end-to-end, open-source MLOps system built on Amazon SageMaker, MLflow, and Evidently AI that closes the ML governance gap. It trains an XGBoost fraud detection model via SageMaker Pipelines, logs every prediction to an Athena Iceberg data lake with zero-latency async writes, and runs automated daily drift checks using EventBridge-triggered Lambda functions. Evidently AI generates interactive data drift and classification reports, while configurable thresholds in a central config.yaml let teams tune sensitivity for both data and model drift without code changes. SNS alerts fire when drift exceeds thresholds, and an Amazon QuickSight governance dashboard, refreshed automatically via a dedicated EventBridge + Lambda pipeline, surfaces inference trends, drift history, and model performance in a single pane of glass.

The result is a production-ready monitoring system with pay-as-you-go pricing that scales linearly with inference volume, runs entirely on open-source SDKs portable across clouds, and handles real-world challenges like delayed ground truth confirmations, concept drift, and multi-feature drift analysis. Three guided Jupyter notebooks walk you from training through monitoring to dashboard creation, making it straightforward to adapt this pattern to your own models and datasets.

Understanding Drift Through Visualization

The QuickSight governance dashboard features drift score trendlines that plot each feature's Population Stability Index (PSI) over time. These time-series visualizations let you identify which specific features are drifting (e.g., credit_limit spiking to 74.47, merchant_category_code at 28.25), distinguish temporary spikes from permanent shifts, and correlate drift events with model deployments or business changes. Instead of a single "drift detected" alert, you get granular per-feature analysis showing exactly which input distributions have changed since training, enabling targeted investigation of data quality issues, pipeline changes, or genuine shifts in customer behavior. For an accessible deep dive into interpreting drift scores and PSI calculations, see Understanding Drift Scores: A Visual Guide. To extend your dashboard with additional feature-level drift visuals (timelines, top drifting features, severity distributions), see Feature-Level Drift Analysis Implementation Guide.


Credit Card Fraud Detection with SageMaker Pipelines

Production-grade ML pipeline for credit card fraud detection using AWS SageMaker Pipelines, MLflow tracking, and Athena data lake integration with comprehensive inference monitoring and drift detection.

Why This Project Matters

Machine learning models degrade over time. Production models face data drift (feature distributions change), concept drift (prediction-target relationships shift), and performance degradation as the real world evolves. Without continuous monitoring, ML systems fail silently, making increasingly poor decisions while appearing healthy on dashboards.

This solution addresses a critical gap: lack of production-ready inference monitoring options in the ML ecosystem. While training pipelines are well-served by managed platforms, comprehensive inference monitoring with drift detection, ground truth integration, and automated alerting requires custom solutions or expensive enterprise platforms.

This is especially critical for fraud detection, where:

  • Fraudsters adapt: Attack patterns evolve to evade detection
  • Customer behavior shifts: Economic conditions, seasonal trends, new payment methods
  • Delayed ground truth: Fraud confirmation takes days or weeks, making real-time accuracy impossible to measure
  • High stakes: False positives annoy customers, false negatives cost money

The ML Governance Gap

Most organizations have robust training pipelines but lack production inference monitoring. They deploy models and hope for the best. When performance degrades, they discover it through business metrics (customer complaints, financial losses) rather than automated alerts. By then, damage is done.

Existing solutions are inadequate:

  • Managed platforms require significant upfront licensing fees and lock you into proprietary systems
  • Built-in monitoring tools often lack integration with open-source drift detection libraries and have limited serverless endpoint support
  • Custom solutions require months of engineering effort to build drift detection, ground truth integration, and alerting

This Project's Solution

An open-source MLOps system that establishes automated monitoring and governance for ML inference:

  • Training Pipeline: SageMaker Pipelines with automated preprocessing, training (XGBoost), evaluation, and deployment
  • Experiment Tracking: MLflow for model versioning, metrics, and artifact management
  • Data Lake: Athena Iceberg tables for training data and inference logging
  • Custom Inference Handler: Automatic logging of all predictions with buffered async writes (zero latency impact)
  • Monitoring & Drift Detection: Continuous model performance tracking with Evidently-powered interactive reports, automated daily checks, and SNS email alerts
  • Ground Truth Integration: Asynchronous ground truth capture with Athena MERGE updates for delayed fraud confirmations
  • Governance Dashboard: QuickSight dashboard with automated daily refresh showing inference trends, drift history, and model performance

Key Benefits:

  • Cost Efficient: Pay-as-you-go serverless architecture scales to zero when idle. Variable costs (inference invocations, Athena queries, Lambda executions) scale linearly with traffic, while fixed costs (S3 storage, MLflow tracking) remain minimal and predictable. No upfront licensing fees or reserved capacity required.
  • Portable: Open-source SDKs (MLflow, Evidently, Pandas) run anywhere: AWS, GCP, Azure, on-prem
  • Production Ready: Handles real-world ML challenges (delayed ground truth, data drift, concept drift, alerting)
  • Automated: EventBridge schedules drift checks at 2 AM, QuickSight refreshes at 3 AM; no manual intervention
  • Comprehensive: Training, inference, monitoring, drift detection, and governance in one integrated system

Inference Monitoring Process Flow

Inference Monitoring Process Flow

13-Step Detailed Flow: This diagram shows the complete inference monitoring pipeline from real-time predictions through ground truth backfill, drift detection, and governance visualization. Each data flow is numbered and labeled for clarity.

This detailed diagram shows the end-to-end inference monitoring flow with MLflow as the central monitoring interface:

Flow Breakdown:

1. Real-Time Inference (Top)

  • Application sends transaction with 30 features → SageMaker Endpoint
  • Endpoint runs XGBoost model, returns prediction + fraud probability
  • Zero latency added to inference (async logging happens in background)

2. Async Logging Pipeline

  • Inference handler sends prediction to SQS queue (fire-and-forget)
  • Lambda consumer batches messages (10 msgs or 30s window)
  • Writes batch to Athena inference_responses table
  • Partitioned by date for efficient querying

3. Ground Truth Integration

  • Fraud investigations confirm actual fraud (days/weeks later)
  • Updates flow to ground_truth_updates table
  • Athena MERGE updates inference_responses with confirmed labels
  • Enables model performance tracking with real outcomes

4. MLflow Monitoring Interface (Central Hub)

MLflow serves as the unified monitoring dashboard where all monitoring workflows converge:

Metrics Logged:

  • Data Drift: drifted_columns_count, drifted_columns_share, per-feature drift_score_* (via Evidently DataDriftPreset)
  • Model Performance: current_roc_auc, accuracy, precision, recall, plus Evidently classification metrics (evidently_accuracy, evidently_f1, etc.)
  • Degradation: roc_auc_degradation, roc_auc_degradation_pct
  • Detection Flags: data_drift_detected, model_drift_detected

Visualizations (artifacts/evidently_reports/):

  • Interactive Evidently HTML data drift report (PSI, KS, distribution comparisons per feature)
  • Interactive Evidently HTML classification report (ROC curve, PR curve, confusion matrix, accuracy, F1)

Artifacts:

  • evidently_reports/data_drift_*.html - Full interactive data drift dashboard
  • evidently_reports/classification_*.html - Full interactive classification dashboard
  • drift_reports/drift_summary_*.json - Structured JSON drift summary

5. Monitoring Workflows

  • Manual: Data scientists run 2a_inference_monitoring.ipynb → query Athena → run Evidently reports → log interactive HTML reports to MLflow
  • Automated: EventBridge triggers Lambda daily → runs Evidently DataDriftPreset + ClassificationPreset → logs HTML reports & metrics to MLflow → sends SNS alert if drift detected

6. Alerting

  • SNS sends email notifications when thresholds exceeded
  • Includes drifted features, performance metrics, and actionable recommendations
  • Email contains links to MLflow for detailed analysis

Key Differentiators:

✅ MLflow as Single Pane of Glass - All monitoring metrics, Evidently HTML reports, and artifacts in one interface
✅ Evidently-Powered Drift Detection - Interactive HTML reports with PSI, KS, distribution comparisons per feature
✅ Automated + Manual - Both notebook-based exploration and scheduled Lambda checks
✅ Ground Truth Integration - Handles delayed fraud confirmation typical in fraud detection
✅ Zero Inference Latency - Async logging doesn't impact prediction response time
✅ Cost-Efficient - $5/month Lambda + $25/month Athena vs. $200+/month for SageMaker Model Monitor

Key Differentiators:

  1. Open-Source SDKs (MLflow, Evidently, Pandas, Scikit-learn) - Ensures portability across AWS, GCP, Azure, or on-prem. No vendor lock-in. Industry-standard tools = easier hiring.

  2. Cost Efficiency - Serverless architecture scales to zero when idle, charging only for actual usage rather than reserved capacity. Variable costs (inference invocations, Athena queries, Lambda executions) scale linearly with traffic. Fixed costs (S3 storage, MLflow tracking) remain minimal. No upfront licensing or minimum commitments required.

  3. Custom Inference Monitoring - Evidently-powered drift detection (DataDriftPreset for all features, ClassificationPreset for model performance), automated ground truth integration, and EventBridge/Lambda alerting with interactive HTML reports logged to MLflow. Most platforms lack comprehensive inference monitoring or charge premium rates.

  4. Production-Grade Without Platform Costs - SageMaker provides reliability (99.9% SLA) while MLflow provides portability. Best of both worlds: enterprise reliability + startup agility.

vs. Alternatives: This solution balances pay-as-you-go pricing, portability, and operational simplicity. Fully managed platforms require significant upfront licensing and vendor lock-in. Pure cloud-native solutions lack portability. Open-source platforms are portable but require substantial DevOps investment. This architecture delivers enterprise reliability with startup agility.

Ideal For: Teams needing production ML with comprehensive monitoring, audit trails, and multi-cloud optionality without enterprise platform licensing costs or operational complexity.

Why Not SageMaker DataCaptureConfig?

SageMaker provides a built-in DataCaptureConfig that captures raw request/response payloads from real-time endpoints and writes them to S3 in jsonl format. But it only supports real-time endpoints as of today.

DataCaptureConfig makes sense if you are running real-time endpoints and only need raw payload capture. For this architecture, which prioritizes Evidently-powered drift detection, MLflow as the monitoring hub, Athena as the data lake, and cost efficiency, a custom SQS → Lambda → Athena pipeline is the better fit.

Project Structure

sagemaker-automated-drift-and-trend-monitoring/
├── src/
│   ├── pipeline/
│   │   ├── pipeline.py                       # Complete pipeline definition
│   │   ├── inference_handler.py              # Custom inference handler with Athena logging
│   │   ├── deploy.py                         # Manual model deployment (with custom handler)
│   │   ├── train.py                          # Manual training script
│   │   ├── test_endpoint.py                  # Endpoint testing with analytics
│   │   ├── batch_transform.py                # Batch transform for bulk scoring
│   │   ├── pipeline_cli.py                   # Pipeline CLI commands
│   │   ├── inference_requirements.txt        # Custom handler dependencies (awswrangler)
│   │   ├── pipeline_steps/
│   │   │   ├── preprocessing.py              # Data preprocessing (ScriptProcessor)
│   │   │   ├── preprocessing_pyspark.py      # PySpark preprocessing alternative
│   │   │   ├── train.py                      # XGBoost training script
│   │   │   ├── evaluation.py                 # Model evaluation with quality gates
│   │   │   ├── inference_monitoring.py       # Drift detection (PSI, KS, model drift)
│   │   │   ├── inference.py                  # Inference script
│   │   │   ├── lambda_deploy_endpoint.py     # Lambda for endpoint deployment
│   │   │   ├── lambda_test_inference.py      # Lambda for inference testing
│   │   │   ├── deploy_endpoint.py            # Endpoint deployment utilities
│   │   │   ├── test_inference.py             # Inference test utilities
│   │   │   ├── requirements_train.txt        # Training step dependencies
│   │   │   ├── requirements_evaluation.txt   # Evaluation step dependencies
│   │   │   └── requirements_preprocessing.txt # Preprocessing step dependencies
│   │   └── athena/
│   │       ├── athena_client.py              # Athena query operations
│   │       ├── athena_client_pyspark.py      # PySpark Athena integration
│   │       ├── data_migrator.py              # CSV → Athena Iceberg migration
│   │       ├── iceberg_manager.py            # Iceberg table management
│   │       └── schema_definitions.py         # Table schema definitions
│   ├── config/
│   │   ├── config.py                         # Configuration constants
│   │   └── config.yaml                       # Central configuration
│   ├── monitoring/
│   │   ├── evidently_reports.py              # Evidently-based drift & classification reports
│   │   ├── monitor_model_performance.py      # Performance monitoring & alerts
│   │   ├── lambda_drift_monitor.py           # Automated drift monitoring (Evidently + MLflow)
│   │   ├── lambda_inference_logger.py        # SQS-to-Athena inference log consumption
│   │   ├── generate_drift_dataset.py         # Generate drifted test data
│   │   ├── simulate_ground_truth_from_athena.py # Ground truth simulator (dev/test)
│   │   ├── generate_ground_truth_confirmations.py # Generate test confirmations
│   │   └── update_ground_truth.py            # Merge ground truth into inference records
│   ├── setup/
│   │   ├── setup_athena_tables.py            # Create Athena DB and tables
│   │   ├── setup_inference_logging.py        # SQS + Lambda inference logging
│   │   ├── setup_drift_monitoring.py         # Drift monitoring infrastructure
│   │   ├── setup_scheduled_inference.py      # EventBridge scheduled inference
│   │   ├── setup_scheduled_batch_transform.py # Scheduled batch transform
│   │   ├── upload_data_to_s3.py              # Upload local CSV data to S3
│   │   ├── create_or_update_sagemaker_role.py # IAM role setup
│   │   ├── create_lambda_role.py             # Lambda execution role
│   │   └── deploy_drift_monitoring.sh        # CI/CD deployment script
│   ├── utils/
│   │   ├── aws_session.py                    # AWS session helpers
│   │   ├── aws_utils.py                      # AWS session and role utilities
│   │   ├── mlflow_utils.py                   # MLflow tracking helpers
│   │   └── visualization_utils.py            # Chart generation for MLflow
│   └── governance/
│       └── setup_quicksight_governance.py    # QuickSight governance dashboard setup
├── notebooks/
│   ├── 1_training_pipeline.ipynb             # Interactive pipeline control notebook
│   ├── 2a_inference_monitoring.ipynb         # Monitoring & drift detection notebook
│   ├── inference_monitoring_with_pipeline.ipynb # Pipeline-based automated drift monitoring with Evidently
│   ├── 3_governance_dashboard.ipynb          # QuickSight governance dashboard setup
│   ├── 4_optional_version_validation.ipynb   # Version consistency validation (MLflow, SageMaker, Athena)
│   └── 5_optional_cleanup.ipynb              # Resource cleanup (Lambda, SageMaker, Athena, S3, IAM)
├── data/
│   ├── creditcard_predictions_final.csv      # Training data (284K rows, 30 features)
│   ├── creditcard_ground_truth.csv           # Ground truth labels
│   └── creditcard_drifted.csv                # Drifted data for testing
├── docs/
│   ├── MetaMonitoring.png                    # 11-step end-to-end architecture diagram
│   ├── inference_monitoring_processflow.png  # 13-step inference monitoring flow
│   ├── ARCHITECTURE_STEPS.md                 # Detailed descriptions of all 11 architecture steps
│   ├── CRITICAL_CONFIG_SETTINGS.md           # Essential config.yaml settings guide
│   ├── GROUND_TRUTH_FLOW.md                  # Two-table ground truth architecture
│   ├── generate_architecture_diagram.py      # [Legacy] Generates architecture_diagram.png (AWS icons)
│   ├── generate_inference_monitoring_diagram.py # [Legacy] Generates inference_monitoring_diagram.png (AWS icons)
│   ├── generate_mlflow_evidently_diagram.py  # Generates mermaid-diagram-mlflow-evidently.png (AWS icons)
│   ├── icons/                                # Official AWS Architecture Icons + third-party logos
│   ├── guides/                               # Architecture diagrams and references
│   │   ├── MetaMonitoring.png                # Main architecture diagram (11 steps)
│   │   ├── inference_monitoring_processflow.png # Detailed monitoring flow (13 steps)
│   │   ├── architecture_diagram.png          # [Legacy] Programmatically generated architecture
│   │   ├── inference_monitoring_diagram.png  # [Legacy] Programmatically generated monitoring flow
│   │   └── mermaid-diagram-mlflow-evidently.png # MLflow + Evidently monitoring flow
│   └── screenshots/                          # Screenshots and visual guides
│       ├── DirectTestingInSGPlayground-custom-handler.png  # SageMaker Studio screenshot
│       └── quicksight/                       # QuickSight dashboard screenshots
│           ├── README.md                     # Detailed guide to understanding drift scores
│           └── Quicksight-Governance-dashboard.pdf  # QuickSight governance dashboard
├── main.py                                   # CLI entry point
├── .env.example                              # Environment template
└── README.md                                 # This file

Architecture Diagrams

The main architecture diagrams are:

  • MetaMonitoring.png - 11-step end-to-end MLOps pipeline (hand-built in Excalidraw)
  • inference_monitoring_processflow.png - 13-step detailed monitoring flow (hand-built in Excalidraw)

Editable source files:

  • docs/guides/architecture_diagram.excalidraw - Open at excalidraw.com
  • docs/inference_monitoring_flow.excalidraw - Generated programmatically, editable in Excalidraw

Legacy Diagram Generation

Previous programmatically-generated diagrams (now superseded by hand-built versions) can be regenerated:

pip install diagrams
brew install graphviz  # macOS

python docs/generate_architecture_diagram.py
python docs/generate_inference_monitoring_diagram.py
python docs/generate_mlflow_evidently_diagram.py

Quick Start

Prerequisites

  • Python 3.12+
  • AWS account with SageMaker access
  • uv package manager

Installation

# Configure environment
cp .env.example .env
# Edit .env with your AWS settings

Upload Data Files to S3

Step 1: Generate Sample Data

The CSV data files (data/*.csv) are not checked into git. If you don't plan to use your own Athena tables for training and need local data to get started, use the generator script to produce synthetic datasets with the same schema and statistical properties:

# Generate all three CSVs (predictions, drifted, ground truth)
python data/generate_datasets.py

# Or generate individually
python data/generate_datasets.py --predictions
python data/generate_datasets.py --drifted
python data/generate_datasets.py --ground-truth

This creates files with a generated_ prefix. Rename them to use in the pipeline:

cd data
mv generated_creditcard_predictions_final.csv creditcard_predictions_final.csv
mv generated_creditcard_drifted.csv creditcard_drifted.csv
mv generated_creditcard_ground_truth.csv creditcard_ground_truth.csv

Verify the files are in place:

ls -lh data/*.csv
# Expected files:
#   creditcard_predictions_final.csv  (~50 MB, 284K rows, training data)
#   creditcard_ground_truth.csv       (ground truth labels)
#   creditcard_drifted.csv            (drifted data for testing)

Step 2: Upload to S3

# Preview what will be uploaded (dry run)
python -m src.setup.upload_data_to_s3 --dry-run

# Upload all CSV files to S3
python -m src.setup.upload_data_to_s3

# Upload a single file
python -m src.setup.upload_data_to_s3 --file data/creditcard_predictions_final.csv

This uploads the CSV files to s3://<DATA_S3_BUCKET>/fraud-detection/data/. The S3 paths are used by the Athena data migration step (main.py setup --migrate-data) and by the inference monitoring notebook for baseline drift detection.

Configuration (.env) - change these in .env to match your environment

# MLflow - Use ARN format for programmatic access
MLFLOW_TRACKING_URI=arn:aws:sagemaker:us-east-1:YOUR_ACCOUNT:mlflow-app/app-YOUR_ID
MLFLOW_EXPERIMENT_NAME=credit-card-fraud-detection-training
MLFLOW_MODEL_NAME=xgboost-fraud-detector

# AWS
AWS_DEFAULT_REGION=us-east-1
SAGEMAKER_EXEC_ROLE=arn:aws:iam::YOUR_ACCOUNT:role/SageMakerExecutionRole

# Athena
ATHENA_DATABASE=fraud_detection
DATA_S3_BUCKET=fraud-detection-data-lake-YOUR_ACCOUNT
ATHENA_OUTPUT_S3=s3://fraud-detection-data-lake-YOUR_ACCOUNT/athena-query-results/

# SQS
SQS_QUEUE_URL=<Set after configuring SQS>
SQS_QUEUE_NAME=fraud-inference-logs

LAMBDA_LOGGER_NAME=fraud-inference-log-consumer

Add S3 Permissions For Data Migration

Add the following inline policy to your execution role through the UI to give S3 access:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:CreateBucket",
        "s3:PutBucketVersioning",
        "s3:PutObject",
        "s3:GetObject",
        "s3:ListBucket",
        "s3:DeleteObject"
      ],
      "Resource": [
        "arn:aws:s3:::<DATA_S3_BUCKET>",
        "arn:aws:s3:::<DATA_S3_BUCKET>/*"
      ]
    }
  ]
}

Setup Infrastructure

# Upload CSV data to S3 (required before migration)
python -m src.setup.upload_data_to_s3

# Create S3 bucket, Athena database, Iceberg tables, and migrate CSV data
python main.py setup --migrate-data

This creates:

  • S3 bucket for data lake and model artifacts
  • Athena database fraud_detection
  • 5 Iceberg tables: training_data, inference_responses, ground_truth, ground_truth_updates, drifted_data
  • Migrates CSV data to Athena for pipeline access

Add Permissions For Lambda Access

Add the following inline policy to your execution role to allow it to manage Lambda functions.

{
  "Effect": "Allow",
  "Action": [
    "lambda:CreateFunction",
    "lambda:UpdateFunctionCode",
    "lambda:UpdateFunctionConfiguration",
    "lambda:GetFunction",
    "lambda:DeleteFunction",
    "lambda:InvokeFunction"
  ],
  "Resource": "arn:aws:lambda:us-east-1:YOUR_ACCOUNT:function:fraud-detection-*"
},
{
  "Effect": "Allow",
  "Action": "iam:PassRole",
  "Resource": "arn:aws:iam::YOUR_ACCOUNT:role/*",
  "Condition": {
    "StringEquals": {
      "iam:PassedToService": "lambda.amazonaws.com"
    }
  }
}

Add the following trust policy to allow Lambda to assume the execution role:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": [
          "sagemaker.amazonaws.com",
          "lambda.amazonaws.com"
        ]
      },
      "Action": "sts:AssumeRole"
    }
  ]
}

Configure SQS + Lambda Permissions

Add the following permissions to allow the creation of an SQS queue and lambda function for inference logging to Athena:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "sqs:CreateQueue",
        "sqs:GetQueueUrl"
      ],
      "Resource": "*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "sqs:ReceiveMessage",
        "sqs:DeleteMessage",
        "sqs:GetQueueAttributes",
        "sqs:SendMessage"
      ],
      "Resource": "*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "lambda:CreateFunction",
        "lambda:UpdateFunctionCode",
        "lambda:GetFunction",
        "lambda:CreateEventSourceMapping"
      ],
      "Resource": "*"
    },
    {
      "Effect": "Allow",
      "Action": "iam:PassRole",
      "Resource": "<EXECUTION_ROLE>",
      "Condition": {
        "StringEquals": {
          "iam:PassedToService": "lambda.amazonaws.com"
        }
      }
    }
  ]
}

Add CloudWatch Permissions (for Drift Monitoring Dashboard & Alarms)

The CloudWatchLogsFullAccess managed policy only covers Logs. To publish custom metrics, create alarms, and build dashboards (Cell 40 in 2a_inference_monitoring.ipynb), add this inline policy to your execution role:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "cloudwatch:PutMetricData",
        "cloudwatch:PutMetricAlarm",
        "cloudwatch:DescribeAlarms",
        "cloudwatch:PutDashboard",
        "cloudwatch:GetDashboard",
        "cloudwatch:ListDashboards"
      ],
      "Resource": "*"
    }
  ]
}

Setup SQS + Lambda

uv run main.py setup-logging

Update SQS_QUEUE_URL in .env with the URL of the queue created above.
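
If you need to look the URL up afterwards, a quick boto3 sketch (assuming the default queue name fraud-inference-logs from .env):

import boto3

sqs = boto3.client("sqs")

# Resolve the queue URL by name so it can be copied into .env
queue_url = sqs.get_queue_url(QueueName="fraud-inference-logs")["QueueUrl"]
print(queue_url)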

Run Complete Pipeline

Option 1: Jupyter Notebook (Recommended)

# In SageMaker Studio, open:
notebooks/1_training_pipeline.ipynb

# Run cells sequentially:
# Cell 1-3: Setup and configuration
# Cell 4: Create/update pipeline definition
# Cell 5: Execute pipeline and wait for completion (~25 minutes)

Option 2: CLI

# Create pipeline
python main.py pipeline create --pipeline-name fraud-detection-pipeline

# Execute pipeline
python main.py pipeline start --pipeline-name fraud-detection-pipeline --wait

# Check status
python main.py pipeline status --pipeline-name fraud-detection-pipeline

Test Inference & Monitoring

# 1. Open monitoring notebook
# notebooks/2a_inference_monitoring.ipynb

# 2. Run inference tests (Cells 1-9 in the notebook)
# Or via CLI:
# uv run main.py test --endpoint-name fraud-detector-endpoint --num-samples 50

# 3. Simulate ground truth (for development/testing)
python -m src.monitoring.simulate_ground_truth_from_athena --accuracy 0.85

# 4. Apply ground truth updates
python -m src.monitoring.update_ground_truth --mode batch

# 5. Run monitoring & drift detection (Cells 27-40 in 2a_inference_monitoring.ipynb)
# Generates 8+ charts, calculates metrics, detects drift

Expected Output:

  • Predictions logged to Athena automatically
  • 100% ground truth coverage (simulated)
  • Performance metrics: ROC-AUC, precision, recall
  • Drift detection with visualizations
  • 8 charts uploaded to MLflow

Custom Inference Handler

Why a Custom Handler?

The pipeline deploys models with a custom inference handler instead of SageMaker's built-in XGBoost serving:

Feature              | Built-in XGBoost      | Custom Handler
Predictions          | ✅                    | ✅
Input Format         | CSV, libsvm           | JSON (extensible)
Output Format        | Single probability    | Detailed predictions + probabilities
Athena Logging       | ❌ Manual             | ✅ Automatic
MLflow Integration   | ❌                    | ✅
Performance Metrics  | ❌                    | ✅ Latency, confidence
Monitoring Flags     | ❌                    | ✅ High/low confidence detection
Production-Ready     | ⚠️ Simple use cases   | ✅ Enterprise-grade

Key Features

1. Automatic Athena Logging

Every inference request is logged with:

  • Prediction metadata: inference_id, timestamp, prediction, probabilities
  • Performance: latency (total, preprocessing, model), confidence scores
  • Transaction context: transaction_id, amount, customer_id
  • Model info: endpoint_name, model_version, mlflow_run_id

2. SQS + Lambda Async Writes

Inference logs are written asynchronously via SQS to avoid impacting prediction latency:

Custom Handler → SQS (fire-and-forget, per prediction)
                   ↓
              SQS batches messages (10 msgs or 30s window)
                   ↓
              Lambda triggered (lambda_inference_logger.py)
                   ↓
              INSERT INTO fraud_detection.inference_responses
  • Zero latency impact - SQS send_message is fire-and-forget in the inference handler
  • Batched writes - Lambda receives up to 10 SQS records per invocation
  • Athena INSERT - Lambda builds a multi-row INSERT INTO statement and executes via Athena
  • Graceful failure - SQS retains messages for 24 hours if Lambda fails

3. JSON Input/Output Format

Request (30 features):

{
  "transaction_hour": 14,
  "transaction_day_of_week": 2,
  "transaction_amount": 149.62,
  "customer_age": 42,
  "customer_gender": 0,
  "distance_from_home_km": 5.2,
  "merchant_category_code": 5411,
  "chip_transaction": 1,
  "num_transactions_24h": 3,
  "credit_limit": 5000.0,
  "available_credit_ratio": 0.75
  // ... (all 30 training features)
}

Response:

{
  "predictions": [0],
  "probabilities": {
    "fraud": [0.1234],
    "non_fraud": [0.8766]
  }
}

Testing in SageMaker Studio Endpoint Playground

After deployment, you can test the endpoint directly in SageMaker Studio without writing code.

Access Path:

SageMaker Studio > Deployments > Endpoints > Fraud Detector Endpoint > Playground

Steps:

  1. Navigate to endpoint:

    • Open SageMaker Studio
    • Left sidebar: Deployments → Endpoints
    • Click on fraud-detector-endpoint
    • Click "Playground" tab
  2. Configure request:

    • Content-Type: application/json
    • Request body: Paste JSON payload with 30 features
  3. Example payload:

    {
      "transaction_hour": 14,
      "transaction_day_of_week": 2,
      "transaction_amount": 149.62,
      "transaction_type_code": 1,
      "customer_age": 42,
      "customer_gender": 0,
      "customer_tenure_months": 36,
      "account_age_days": 1095,
      "distance_from_home_km": 5.2,
      "distance_from_last_transaction_km": 2.3,
      "time_since_last_transaction_min": 120,
      "online_transaction": 1,
      "international_transaction": 0,
      "high_risk_country": 0,
      "merchant_category_code": 5411,
      "merchant_reputation_score": 0.85,
      "chip_transaction": 1,
      "pin_used": 1,
      "card_present": 1,
      "cvv_match": 1,
      "address_verification_match": 1,
      "num_transactions_24h": 3,
      "num_transactions_7days": 12,
      "avg_transaction_amount_30days": 125.50,
      "max_transaction_amount_30days": 450.00,
      "velocity_score": 0.3,
      "recurring_transaction": 0,
      "previous_fraud_incidents": 0,
      "credit_limit": 5000.0,
      "available_credit_ratio": 0.75
    }
  4. Click "Test"

  5. Expected response:

    {
      "predictions": [0],
      "probabilities": {
        "fraud": [0.1234],
        "non_fraud": [0.8766]
      }
    }

Screenshot: See docs/screenshots/DirectTestingInSGPlayground-custom-handler.png for visual guide

When to use:

  • Quick manual testing without code
  • Debugging endpoint issues
  • Demonstrating to stakeholders
  • Validating deployment after pipeline execution
  • Generating sample payloads for documentation

Note: The playground sends requests to your deployed endpoint, so predictions will be automatically logged to Athena (custom handler feature).

Implementation Location

  • Inference Handler (SQS): src/pipeline/pipeline_steps/inference.py - sends each prediction to SQS
  • Lambda Consumer: lambda_inference_logger.py - reads SQS batches, writes to Athena
  • Setup Script: src/setup/setup_inference_logging.py - creates SQS queue + Lambda + event source mapping
  • Legacy Handler (Direct Athena): src/pipeline/inference_handler.py - buffered awswrangler writes (alternative)
  • Dependencies: src/pipeline/inference_requirements.txt
  • Pipeline Integration: src/pipeline/pipeline.py

How it works (SQS path):

# In the inference handler (predict_fn)
sqs.send_message(
    QueueUrl=SQS_QUEUE_URL,
    MessageBody=json.dumps({
        'inference_id': str(uuid.uuid4()),
        'request_timestamp': datetime.utcnow().isoformat(),
        'prediction': int(predictions[idx]),
        'probability_fraud': fraud_prob,
        # ... 22 more fields
    }),
)

# Lambda consumer (lambda_inference_logger.py)
# Triggered by SQS with batch of up to 10 messages
# Builds: INSERT INTO fraud_detection.inference_responses VALUES (row1), (row2), ...
# Executes via athena.start_query_execution()
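
A trimmed-down sketch of the consumer follows (not the exact lambda_inference_logger.py code; only a few columns are shown and the timestamp handling is simplified):

import json
import os
import boto3

athena = boto3.client("athena")

def handler(event, context):
    # Each SQS record body is one inference log entry (JSON from the inference handler)
    rows = [json.loads(record["body"]) for record in event["Records"]]

    # Build one multi-row INSERT (illustrative subset of columns)
    values = ", ".join(
        f"('{r['inference_id']}', "
        f"TIMESTAMP '{r['request_timestamp'].replace('T', ' ')}', "
        f"{int(r['prediction'])}, {float(r['probability_fraud'])})"
        for r in rows
    )
    query = (
        "INSERT INTO fraud_detection.inference_responses "
        "(inference_id, request_timestamp, prediction, probability_fraud) "
        f"VALUES {values}"
    )

    athena.start_query_execution(
        QueryString=query,
        QueryExecutionContext={"Database": "fraud_detection"},
        ResultConfiguration={"OutputLocation": os.environ["ATHENA_OUTPUT_S3"]},
    )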

SageMaker Pipeline Workflow

Pipeline Steps

  1. Preprocess - Read from Athena, validate, encode categorical, split train/test (80/20)
  2. Train - XGBoost with automatic class imbalance handling (scale_pos_weight)
  3. Evaluate - Calculate ROC-AUC, PR-AUC, precision, recall, F1, confusion matrix
  4. Quality Gate - Register model only if ROC-AUC ≥ 0.85 AND PR-AUC ≥ 0.50 (see the sketch below)
  5. Deploy - Create endpoint with custom handler and Athena logging
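
The quality gate in step 4 maps to a SageMaker Pipelines ConditionStep. A hedged sketch follows; the JSON paths and step objects are illustrative, and the real wiring lives in pipeline.py:

from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.functions import JsonGet

# evaluation_step, evaluation_report, register_step, and deploy_step come from the
# pipeline definition; the JSON paths below are illustrative.
roc_auc = JsonGet(
    step_name=evaluation_step.name,
    property_file=evaluation_report,
    json_path="metrics.roc_auc.value",
)
pr_auc = JsonGet(
    step_name=evaluation_step.name,
    property_file=evaluation_report,
    json_path="metrics.pr_auc.value",
)

quality_gate = ConditionStep(
    name="CheckModelQuality",
    conditions=[
        ConditionGreaterThanOrEqualTo(left=roc_auc, right=0.85),
        ConditionGreaterThanOrEqualTo(left=pr_auc, right=0.50),
    ],
    if_steps=[register_step, deploy_step],   # only register/deploy when both thresholds hold
    else_steps=[],
)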

Create Pipeline

Via Notebook:

# In 1_training_pipeline.ipynb Cell 4
result = pipeline_builder.upsert_pipeline(
    pipeline_name='fraud-detection-pipeline',
    pipeline_display_name='Credit Card Fraud Detection Pipeline'
)

Via CLI:

python main.py pipeline create --pipeline-name fraud-detection-pipeline

Execute Pipeline

Via Notebook:

# Cell 5: Execute and wait
execution = pipeline.start()
execution.wait(delay=60, max_attempts=30)

Via CLI:

# With waiting
python main.py pipeline start --pipeline-name fraud-detection-pipeline --wait

# Without waiting (monitor in console)
python main.py pipeline start --pipeline-name fraud-detection-pipeline

Monitor Execution

AWS Console:

SageMaker → Pipelines → fraud-detection-pipeline → Executions

CLI:

# Get latest execution status
python main.py pipeline status --pipeline-name fraud-detection-pipeline

# List all executions
python main.py pipeline list-executions --pipeline-name fraud-detection-pipeline

Expected Duration: ~25 minutes

  • Preprocess: 3-5 min
  • Train: 10-15 min
  • Evaluate: 2-3 min
  • Deploy: 5-7 min

MLflow Integration

Configuration

ARN Format (Programmatic Access):

# Use ARN for SDK and pipeline access
MLFLOW_TRACKING_URI = "arn:aws:sagemaker:us-east-1:<ACCOUNT_ID>:mlflow-app/app-ABC123"
mlflow.set_tracking_uri(MLFLOW_TRACKING_URI)

HTTPS Format (Web UI Only):

# Use HTTPS URL only for opening in browser
https://abc123def.mlflow-tracking.us-east-1.sagemaker.aws/

Training Integration

The training step automatically logs to MLflow:

# In training step
mlflow.xgboost.autolog()  # Automatic parameter and metric logging

with mlflow.start_run():
    model.fit(X_train, y_train)
    # MLflow automatically logs:
    # - Hyperparameters (max_depth, learning_rate, etc.)
    # - Training metrics (logloss, error)
    # - Model artifacts
    # - Feature importance
    # - Confusion matrix

Accessing MLflow UI

From SageMaker Studio:

1. Left sidebar → Partner AI Apps → MLflow
2. Click on your MLflow app
3. Opens in new tab with authentication

Experiments:

  • credit-card-fraud-detection-training - Training runs
  • credit-card-fraud-detection-inference - Inference monitoring
  • credit-card-fraud-detection-batch - Batch transform jobs

Model Registry

Models are automatically registered on successful evaluation:

# In evaluation step (if ROC-AUC ≥ 0.85)
mlflow.register_model(
    model_uri=f"runs:/{run_id}/model",
    name="xgboost-fraud-detector"
)

Model Versions:

  • Version 1, 2, 3... (incremented automatically)
  • Stages: None → Staging → Production → Archived
  • Tags: training_date, pipeline_execution_id, roc_auc, pr_auc
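
Promoting a registered version between stages can be done with the MLflow client; a minimal sketch (the version number is illustrative, and newer MLflow releases favor model aliases over stages):

from mlflow import MlflowClient

client = MlflowClient()

# Promote a registered version to Production (illustrative version number)
client.transition_model_version_stage(
    name="xgboost-fraud-detector",
    version="3",
    stage="Production",
    archive_existing_versions=True,
)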

Inference Monitoring & Drift Detection

Monitoring Architecture

Inference (T+0) → SQS → Lambda → Athena: inference_responses (ground_truth=NULL)
                                                    ↓
Ground Truth Capture (T+1 to T+30)        → ground_truth_updates
                                                    ↓
Batch MERGE                               → inference_responses (ground_truth populated)
                                                    ↓
Monitoring                                → Calculate metrics where ground_truth IS NOT NULL

Ground Truth Capture

🔔 IMPORTANT: Production Implementation Required

The current implementation uses simulation for development/testing. In production, replace with actual business processes:

Production Sources:

  • Fraud Investigation Systems: Confirmed fraud cases from investigation team
  • Chargeback Notifications: Disputed transactions from payment processors
  • Customer Complaints: Fraud reports from customer service
  • Merchant Reports: Suspicious activity flagged by merchants
  • Law Enforcement: Cases confirmed by authorities

Implementation Pattern:

# Example: Fraud investigation system integration
def capture_investigation_result(transaction_id, is_fraud, source):
    """
    Called when fraud investigation completes.
    Replace simulate_ground_truth_from_athena.py with this in production.
    """
    ground_truth_update = {
        'transaction_id': transaction_id,
        'inference_id': lookup_inference_id(transaction_id),
        'actual_fraud': is_fraud,
        'confirmation_timestamp': datetime.now(),
        'confirmation_source': source,  # e.g., 'fraud_investigation'
        'investigation_notes': get_investigation_notes(transaction_id)
    }

    # Write to ground_truth_updates table
    write_to_athena(ground_truth_update)

Development/Testing (Current):

# Simulates realistic ground truth with configurable accuracy
python -m src.monitoring.simulate_ground_truth_from_athena --accuracy 0.85

This creates realistic confirmations:

  • Fraud cases: 1-7 days confirmation delay
  • Non-fraud: 1-30 days confirmation delay
  • Configurable accuracy (default 85%)
  • Realistic confirmation sources

Applying Ground Truth Updates

# Batch mode: Process all pending updates
python -m src.monitoring.update_ground_truth --mode batch

# Streaming mode: Process recent updates (last 24 hours)
python -m src.monitoring.update_ground_truth --mode streaming --window-hours 24

This performs Athena MERGE to update ground_truth column in inference_responses.
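
A hedged sketch of the kind of MERGE statement involved (column names other than inference_id and ground_truth are simplified; see update_ground_truth.py for the real query):

import os
import boto3

athena = boto3.client("athena")

# Illustrative MERGE: copy confirmed labels from ground_truth_updates into inference_responses
merge_sql = """
MERGE INTO fraud_detection.inference_responses AS t
USING fraud_detection.ground_truth_updates AS u
    ON t.inference_id = u.inference_id
WHEN MATCHED THEN UPDATE SET
    ground_truth = u.actual_fraud
"""

athena.start_query_execution(
    QueryString=merge_sql,
    QueryExecutionContext={"Database": "fraud_detection"},
    ResultConfiguration={"OutputLocation": os.environ["ATHENA_OUTPUT_S3"]},
)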

Performance Monitoring

# Monitor last 30 days
python -m src.monitoring.monitor_model_performance --days 30

# With custom alert threshold
python -m src.monitoring.monitor_model_performance \
  --days 30 \
  --alert-threshold 0.80 \
  --endpoint fraud-detector-prod

Output:

Ground Truth Coverage:
  Total predictions: 50,000
  With ground truth: 12,500 (25.00%)
  Avg days to confirmation: 8.5

Overall Performance (last 30 days):
  ROC-AUC: 0.8742
  PR-AUC: 0.6521
  Precision: 0.7245
  Recall: 0.6891
  F1-Score: 0.7063

✓ No performance degradation detected

Drift Detection Methods

All drift detection is powered by Evidently AI (v0.7.x). The notebook and Lambda both use evidently_reports.py which wraps Evidently's DataDriftPreset and ClassificationPreset.

Data Drift (Feature Distribution Changes)

Evidently's DataDriftPreset automatically selects the appropriate statistical test per feature (KS for numeric, chi-square for categorical) and computes drift scores. The interactive HTML report includes per-feature distribution comparisons, PSI values, and statistical test results.

Legacy reference: The calculate_psi() and calculate_ks_statistic() functions in lambda_drift_monitor.py are preserved as legacy examples showing how to compute these metrics manually without Evidently.

Population Stability Index (PSI):

PSI measures how much a feature's distribution has shifted between the baseline (training) period and the current (inference) period.

Formula:

PSI = Σ (actual_pct_i - expected_pct_i) × ln(actual_pct_i / expected_pct_i)

Where expected_pct_i and actual_pct_i are the proportion of observations in bin i for the baseline and current distributions, respectively.

How it's computed (see inference_monitoring.py:86-124):

  1. Create bin edges from the baseline (training) data using percentile-based breakpoints (10 bins by default)
  2. Histogram both baseline and current data into the same bins
  3. Normalize each histogram to proportions (percentage of total in each bin)
  4. Apply a floor of 0.0001 to avoid ln(0) or division by zero
  5. Sum the PSI contribution from each bin: (actual% - expected%) × ln(actual% / expected%)
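
A simplified NumPy sketch of those five steps (not the exact inference_monitoring.py code):

import numpy as np

def calculate_psi(baseline: np.ndarray, current: np.ndarray, bins: int = 10) -> float:
    """Population Stability Index between a baseline and a current sample."""
    # 1. Percentile-based bin edges from the baseline distribution
    edges = np.unique(np.percentile(baseline, np.linspace(0, 100, bins + 1)))

    # 2. Histogram both samples into the same bins (clip so out-of-range values land in edge bins)
    expected_counts, _ = np.histogram(np.clip(baseline, edges[0], edges[-1]), bins=edges)
    actual_counts, _ = np.histogram(np.clip(current, edges[0], edges[-1]), bins=edges)

    # 3. Normalize to proportions and 4. apply a 0.0001 floor to avoid log(0)
    expected_pct = np.maximum(expected_counts / len(baseline), 1e-4)
    actual_pct = np.maximum(actual_counts / len(current), 1e-4)

    # 5. Sum the per-bin contributions
    return float(np.sum((actual_pct - expected_pct) * np.log(actual_pct / expected_pct)))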

Interpretation thresholds:

PSI Value  | Interpretation        | Action
< 0.1      | No significant shift  | None
0.1 - 0.2  | Moderate shift        | Monitor closely
≥ 0.2      | Significant shift     | Investigate / retrain

Avg PSI (Population-Level Drift Score):

Avg PSI is the simple arithmetic mean of per-feature PSI values across all monitored features. It provides a single scalar summarizing overall dataset drift.

# From detect_data_drift() → summary
avg_psi = mean(psi_feature_1, psi_feature_2, ..., psi_feature_N)

This metric is logged to MLflow as drift_avg_psi alongside drift_max_psi and drift_median_psi.

A feature is flagged as drifted if either its PSI ≥ 0.2 or its KS test p-value < 0.05.

Feature Extraction for Drift Detection:

The drift detection process (Cell 31 in 2a_inference_monitoring.ipynb) analyzes all 30 training features by parsing the input_features JSON column:

import json

import numpy as np
import pandas as pd

# Parse JSON to extract all features from inference data
feature_rows = []
for idx, row in inference_raw.iterrows():
    try:
        # Parse the input_features JSON string
        features_dict = json.loads(row['input_features'])

        # Extract exactly the 30 training features
        feature_values = {feat: features_dict.get(feat, np.nan) for feat in TRAINING_FEATURES}
        feature_rows.append(feature_values)
    except Exception as e:
        print(f'  ⚠ Failed to parse row {idx}: {e}')
        continue

# Create DataFrame with all 30 features
inference_numeric = pd.DataFrame(feature_rows)

# Run drift detection on ALL features
drift_results = detect_data_drift(
    baseline_data=baseline_numeric[common_features],  # 284K rows × 30 features
    current_data=inference_numeric,                   # Inference rows × 30 features
    feature_names=common_features,                    # All 30 features
    threshold_psi=0.2,
    threshold_ks=0.05,
)

Expected Output:

================================================================================
DATA DRIFT ANALYSIS SUMMARY
================================================================================
  Total features analyzed: 30          ← All features extracted from JSON
  Drifted features: 5
  Drift percentage: 16.7%
  Avg PSI: 0.0842
  Max PSI: 0.2145

⚠ DRIFTED FEATURES (5):
================================================================================
  🔴 CRITICAL transaction_amount: PSI=0.2145
  🟠 MODERATE customer_age: PSI=0.1523
  🟠 MODERATE distance_from_home_km: PSI=0.1205
  🟡 MINOR online_transaction: PSI=0.0923
  🟡 MINOR merchant_reputation_score: PSI=0.0812

Chi-Square Test:

# For categorical features (e.g., with scipy's chi2_contingency)
from scipy.stats import chi2_contingency

chi2_stat, p_value, dof, expected = chi2_contingency([reference_counts, current_counts])

# Alert if p_value < 0.05 (distribution changed significantly)

Kolmogorov-Smirnov (KS) Test:

# For continuous feature distributions (e.g., with scipy's ks_2samp)
from scipy.stats import ks_2samp

ks_stat, p_value = ks_2samp(reference_values, current_values)

# Alert if p_value < 0.05

Statistical Tests: KS vs Wasserstein (Configuration Guide)

This project uses Evidently for advanced drift detection with configurable statistical tests. For fraud detection, we default to the Kolmogorov-Smirnov (KS) test over Wasserstein distance.

Why KS Test for Fraud Detection?

KS Test Advantages:

  • Tail Sensitivity ⭐ - Detects maximum difference in CDFs, crucial for catching rare fraud pattern changes
  • Statistical Significance - Returns p-value (e.g., p < 0.05 = 95% confidence), helps distinguish real drift from noise
  • Distribution-Free - No assumptions about underlying distributions (robust to non-normal data)
  • Shape Detection - Catches changes in distribution shape, not just location/scale shifts
  • Interpretable - KS statistic = max vertical distance between CDFs (0-1 scale)

Why Not Wasserstein?

  • Less Tail-Sensitive - Can miss subtle changes in extreme values (where fraud patterns emerge)
  • No P-Value - Harder to set universal thresholds across different features
  • Mean-Focused - Better for overall distribution shifts than detecting new fraud tactics
  • Computationally Heavier - O(n log n) vs O(n) for KS test

Real-World Example:

Scenario: New fraud ring targeting $500-$600 transactions

Baseline:  Transactions spread $0-$1000 (normal distribution)
Current:   Slight spike at $500-600 (2% of transactions)

Wasserstein: 0.08 (small overall shift, might not alert)
KS Test:     0.15, p=0.001 (significant tail change, alerts)

For fraud detection, catching rare pattern changes early (KS strength) is more critical than measuring overall distribution distance (Wasserstein strength).
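
A quick way to reproduce this intuition with scipy on synthetic data (the exact numbers will differ from the illustration above, and the Wasserstein value is reported on the raw dollar scale):

import numpy as np
from scipy.stats import ks_2samp, wasserstein_distance

rng = np.random.default_rng(42)

# Baseline: transaction amounts spread over roughly $0-$1000
baseline = rng.uniform(0, 1000, size=50_000)

# Current: same spread, plus a small fraud-ring spike at $500-$600 (2% of traffic)
current = np.concatenate([
    rng.uniform(0, 1000, size=49_000),
    rng.uniform(500, 600, size=1_000),
])

ks_stat, p_value = ks_2samp(baseline, current)
w_dist = wasserstein_distance(baseline, current)

print(f"KS statistic: {ks_stat:.3f} (p={p_value:.4f})")  # small shift, but statistically significant
print(f"Wasserstein:  {w_dist:.2f}")                     # overall distance stays modest relative to the $0-1000 range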

Configuration

Configure the statistical test in .env:

# Statistical test for numerical features
# Options: 'ks' (default), 'wasserstein', 'kl_div', 'psi', 'jensenshannon'
EVIDENTLY_NUM_STAT_TEST=ks

# KS test thresholds
KS_DRIFT_THRESHOLD=0.1          # Moderate drift threshold
KS_PVALUE_THRESHOLD=0.05        # Statistical significance (95% confidence)
KS_ENABLE_CDF_PLOTS=true        # Generate CDF comparison visualizations
KS_MAX_FEATURES_TO_PLOT=10      # Top N drifted features to visualize

# Categorical feature test (unchanged)
EVIDENTLY_CAT_STAT_TEST=chi_square

Configuration in Code:

# src/config/config.py (lines 202-210)
EVIDENTLY_NUM_STAT_TEST = os.getenv('EVIDENTLY_NUM_STAT_TEST', 'ks')
KS_DRIFT_THRESHOLD = float(os.getenv('KS_DRIFT_THRESHOLD', '0.1'))
KS_PVALUE_THRESHOLD = float(os.getenv('KS_PVALUE_THRESHOLD', '0.05'))

KS-Specific Visualizations

When KS test is enabled (EVIDENTLY_NUM_STAT_TEST=ks), additional visualizations are generated:

1. CDF Comparison Plots - Shows empirical CDFs for baseline vs current distributions with maximum KS distance marked

2. KS Statistics Heatmap - Two-panel visualization:

  • Left: KS statistic bars (0-1 scale, color-coded by severity)
  • Right: -log10(p-value) bars (statistical significance)

3. Enhanced MLflow Metrics:

# Aggregated KS metrics
ks_mean_ks_statistic
ks_max_ks_statistic
ks_median_ks_statistic
ks_features_moderate_drift    # 0.1 ≤ KS < 0.2
ks_features_high_drift        # KS ≥ 0.2
ks_features_significant       # p-value < 0.05

Dual-Threshold Alerting

The KS test uses both the KS statistic and p-value for robust drift detection:

# Drift detection logic
if ks_stat > 0.2 and p_value < 0.05:
    severity = "CRITICAL"    # Immediate investigation
elif ks_stat > 0.1 and p_value < 0.05:
    severity = "MODERATE"    # Monitor closely
elif ks_stat > 0.1 or p_value < 0.05:
    severity = "WARNING"     # Single threshold met
else:
    severity = "NORMAL"

This dual-threshold approach reduces false positives while maintaining high sensitivity to real drift.

Switching to Wasserstein (If Needed)

To use Wasserstein distance instead:

# In .env
EVIDENTLY_NUM_STAT_TEST=wasserstein
EVIDENTLY_DRIFT_THRESHOLD=0.2    # Overall drift threshold

When to use Wasserstein:

  • General ML pipelines (non-fraud use cases)
  • Measuring overall distribution distance
  • When p-values are not required
  • Benchmarking against academic papers using EMD

Best Practice: Monitor both metrics and compare. If both spike → high confidence drift. If only KS spikes → investigate tail changes.

Concept Drift (Prediction-Target Relationship Changes)

Metrics Tracked:

  • ROC-AUC degradation (alert if drops > 5%)
  • Precision/Recall changes
  • False positive rate increase
  • False negative rate increase
  • High confidence error rate

Interactive Monitoring (Notebook)

Evidently Reports Logged to MLflow:

  1. evidently_reports/data_drift_*.html - Interactive data drift dashboard with per-feature PSI, KS, distribution comparisons
  2. evidently_reports/classification_*.html - Interactive classification dashboard with ROC curve, PR curve, confusion matrix, accuracy, F1
  3. drift_reports/drift_summary_*.json - Structured JSON summary

All reports are logged to MLflow experiment: credit-card-fraud-detection-monitoring

Visualizations Quick Reference

Chart                      | Purpose                                              | Location                  | When Generated
Preprocessing
Class Distribution         | Verify imbalance handling                            | MLflow Training Run       | During preprocessing
Feature Correlations       | Identify multicollinearity                           | MLflow Training Run       | During preprocessing
Missing Values             | Data quality check                                   | MLflow Training Run       | During preprocessing
Feature Distributions      | Understand data                                      | MLflow Training Run       | During preprocessing
Training
Training Loss Curve        | Convergence check                                    | MLflow Training Run       | During training
Feature Importance         | Top predictive features                              | MLflow Training Run       | After training
ROC Curve                  | Model discrimination                                 | MLflow Training Run       | During evaluation
Precision-Recall Curve     | Performance at thresholds                            | MLflow Training Run       | During evaluation
Confusion Matrix           | Classification accuracy                              | MLflow Training Run       | During evaluation
Monitoring (Evidently)
Data Drift HTML Report     | Per-feature drift analysis (PSI, KS, distributions)  | MLflow evidently_reports/ | 2a_inference_monitoring.ipynb Cell 37
Classification HTML Report | Model performance (ROC, PR, confusion matrix, F1)    | MLflow evidently_reports/ | 2a_inference_monitoring.ipynb Cell 37
Drift Summary JSON         | Structured drift summary                             | MLflow drift_reports/     | 2a_inference_monitoring.ipynb Cell 37

Accessing in MLflow:

1. Open MLflow UI (SageMaker Studio → Partner AI Apps)
2. Select experiment:
   - Training: credit-card-fraud-detection-training
   - Monitoring: credit-card-fraud-detection-monitoring
3. Click on run
4. Scroll to "Artifacts" section
5. Evidently HTML reports available under:
   - evidently_reports/data_drift_*.html (open in browser for interactive dashboard)
   - evidently_reports/classification_*.html (open in browser for interactive dashboard)
   - drift_reports/drift_summary_*.json

Baseline Data for Drift Detection

Drift detection compares current inference feature distributions against a baseline. The baseline should represent the data the model was trained on.

Current approach (2a_inference_monitoring.ipynb):

baseline_df = pd.read_csv(CSV_TRAINING_DATA)

This loads the CSV file directly from disk. Since the same data has already been migrated to the Athena training_data table (via main.py setup --migrate-data), the baseline can alternatively be loaded from Athena using PySpark or boto3:

Option 1: PySpark (for large-scale processing):

from athena.athena_client_pyspark import AthenaClientPySpark

client = AthenaClientPySpark(database="fraud_detection")
baseline_spark_df = client.read_table("training_data")
baseline_df = baseline_spark_df.toPandas()  # Convert to pandas for drift analysis

Option 2: Boto3 + Pandas (for monitoring/small queries):

import boto3
import pandas as pd
import io
import time

def read_athena_query(sql, database, output_location):
    """Execute Athena query and return pandas DataFrame."""
    athena = boto3.client('athena')
    s3 = boto3.client('s3')

    # Start query execution
    response = athena.start_query_execution(
        QueryString=sql,
        QueryExecutionContext={'Database': database},
        ResultConfiguration={'OutputLocation': output_location}
    )
    execution_id = response['QueryExecutionId']

    # Wait for completion
    while True:
        status = athena.get_query_execution(QueryExecutionId=execution_id)
        state = status['QueryExecution']['Status']['State']
        if state in ['SUCCEEDED', 'FAILED', 'CANCELLED']:
            break
        time.sleep(1)

    if state != 'SUCCEEDED':
        raise Exception(f"Query failed: {state}")

    # Download results from S3
    result_s3_path = status['QueryExecution']['ResultConfiguration']['OutputLocation']
    bucket, key = result_s3_path.replace("s3://", "").split("/", 1)
    obj = s3.get_object(Bucket=bucket, Key=key)

    return pd.read_csv(io.BytesIO(obj['Body'].read()))

# Usage
baseline_df = read_athena_query(
    sql="SELECT * FROM training_data LIMIT 10000",
    database="fraud_detection",
    output_location="s3://your-bucket/athena-results/"
)

Why use the Athena training_data table as baseline:

  • Single source of truth - the same table the training pipeline reads from, so schema and column types are guaranteed to match
  • No local file dependency - works in any SageMaker environment without needing the CSV on disk
  • Consistent with pipeline - the preprocessing step reads from training_data via PySpark, so monitoring and training reference the same data
  • Versioned via Iceberg - if the training data is updated (new features, augmented samples), the Athena table reflects that without needing to re-distribute CSV files

Note: For the current PoC, CSV loading is sufficient. In production, the Athena table is the preferred source since it stays in sync with whatever the training pipeline consumes. This project no longer uses awswrangler for these reads; PySpark handles large-scale processing and boto3 handles monitoring queries.

Automated Drift Monitoring

NEW: Automated drift detection with EventBridge, Lambda, and SNS email alerts.

Overview

The system automatically monitors for data drift and model performance drift, sending email alerts when thresholds are exceeded.

Architecture:

EventBridge Rule → Lambda Function → SNS Topic → Email
(daily at 2am)     (drift detection)   (alerts)     (ops team)
                         ↓
                  Athena Data Lake
                (inference_responses)

Quick Setup

Option 1: Using Notebook (Recommended)

  1. Open notebooks/2a_inference_monitoring.ipynb
  2. Navigate to Section 6: Automated Drift Monitoring Setup
  3. Run cells 6.1-6.3 to deploy infrastructure
  4. Confirm email subscription

Option 2: Using CI/CD Script

export ALERT_EMAIL="ops@example.com"
export ATHENA_DATABASE="fraud_detection"
export AWS_ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text)
export ATHENA_OUTPUT_S3="s3://fraud-detection-data-lake-${AWS_ACCOUNT_ID}/athena-query-results/"

bash src/setup/deploy_drift_monitoring.sh

Option 3: Interactive Setup

python src/setup/setup_drift_monitoring.py

What Gets Created

  1. SNS Topic (fraud-detection-drift-alerts) - Email notifications
  2. Lambda Function (fraud-detection-drift-monitor) - Drift detection logic
  3. IAM Role - Lambda permissions (Athena, S3, SNS)
  4. EventBridge Rule (fraud-detection-drift-check) - Daily schedule (2 AM UTC)

Monitoring Capabilities

Data Drift Detection:

  • Engine: Evidently DataDriftPreset (KS, PSI, and other statistical tests)
  • Scope: All 30 training features
  • Output: Interactive HTML report + per-column drift scores
  • Method: Compares recent 24h inference data to training baseline
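
A minimal sketch of that comparison using Evidently's Report API (written against the ~0.4.x imports; newer Evidently releases reorganize them, and the two DataFrames are assumed to already contain the 30 training features):

from evidently.report import Report
from evidently.metric_preset import DataDriftPreset

# baseline_df: training data; recent_inference_df: last 24h of inference features (both pandas DataFrames)
report = Report(metrics=[DataDriftPreset()])
report.run(reference_data=baseline_df, current_data=recent_inference_df)
report.save_html("/tmp/data_drift_report.html")     # the interactive HTML logged to MLflow

summary = report.as_dict()["metrics"][0]["result"]  # DatasetDriftMetric result; key names vary by version
print("drifted columns:", summary["number_of_drifted_columns"],
      "share:", summary["share_of_drifted_columns"])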

Model Performance Drift:

  • Engine: Evidently ClassificationPreset (accuracy, F1, ROC, PR, confusion matrix)
  • Threshold: >5% ROC-AUC degradation (configurable)
  • Method: Compares current performance to baseline (0.92)
  • Requires: Ground truth labels for predictions

MLflow Integration:

  • Automatic Logging: All drift metrics and Evidently HTML reports logged to MLflow on each check
  • Experiment: fraud-detection-drift_monitoring
  • Metrics Logged:
    • Per-feature drift scores (drift_score_<feature_name>)
    • Aggregate drift statistics (drifted_columns_count, drifted_columns_share)
    • Model performance metrics (ROC-AUC, accuracy, precision, recall)
    • Evidently classification metrics (evidently_accuracy, evidently_f1, etc.)
    • Sample sizes and detection flags
  • Artifacts:
    • evidently_reports/data_drift_*.html - Interactive data drift dashboard
    • evidently_reports/classification_*.html - Interactive classification dashboard
    • drift_reports/drift_summary_*.json - Structured JSON summary
  • Configuration: Set MLFLOW_TRACKING_URI environment variable in Lambda
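
A sketch of what that logging amounts to with the MLflow client (metric values and file paths below are placeholders; logging to a SageMaker tracking-server ARN additionally requires the sagemaker-mlflow package):

import os
import mlflow

mlflow.set_tracking_uri(os.environ["MLFLOW_TRACKING_URI"])   # SageMaker MLflow ARN
mlflow.set_experiment("fraud-detection-drift_monitoring")

with mlflow.start_run(run_name="scheduled-drift-check"):
    mlflow.log_metric("drifted_columns_count", 5)             # placeholder values
    mlflow.log_metric("drifted_columns_share", 0.167)
    mlflow.log_metric("roc_auc_current", 0.87)
    mlflow.log_metric("data_drift_detected", 1)
    mlflow.log_artifact("/tmp/data_drift_report.html", artifact_path="evidently_reports")
    mlflow.log_artifact("/tmp/drift_summary.json", artifact_path="drift_reports")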

Email Alert Example

================================================================================
ML MODEL DRIFT ALERT
================================================================================
Time: 2026-02-24 02:00:15
Detection Engine: Evidently AI

🔴 DATA DRIFT DETECTED (Evidently DataDriftPreset)
Features Analyzed: 30
Drifted Features: 5 (16.7%)
Drifted Columns Share: 16.7%

Top Drifted Features (by drift score):
  - transaction_amount: drift_score=0.0012
  - customer_age: drift_score=0.0034
  - distance_from_home_km: drift_score=0.0089

🔴 MODEL PERFORMANCE DRIFT DETECTED (Evidently ClassificationPreset)
Baseline ROC-AUC: 0.9200
Current ROC-AUC: 0.5713
Degradation: 0.3487 (37.9%)

RECOMMENDED ACTIONS:
1. Review Evidently HTML reports in MLflow monitoring experiment
2. Investigate root cause
3. Consider retraining with recent data
4. Review decision thresholds
================================================================================

Configuration

Thresholds (customizable):

export DATA_DRIFT_THRESHOLD=0.2   # PSI threshold
export MODEL_DRIFT_THRESHOLD=0.05  # 5% performance degradation
export BASELINE_ROC_AUC=0.92      # Expected model performance
export MLFLOW_TRACKING_URI="arn:aws:sagemaker:us-east-1:ACCOUNT:mlflow-app/app-ID"  # For MLflow logging

Schedule Options:

# Daily at 2 AM UTC (default)
cron(0 2 * * ? *)

# Every 6 hours
cron(0 */6 * * ? *)

# Every Monday at 9 AM
cron(0 9 ? * MON *)

Testing

Manual Trigger:

# Via AWS CLI
aws lambda invoke \
    --function-name fraud-detection-drift-monitor \
    output.json

# Via notebook (Cell 6.4)
lambda_client.invoke(FunctionName='fraud-detection-drift-monitor')

View Logs:

aws logs tail /aws/lambda/fraud-detection-drift-monitor --follow

Files

  • src/drift_monitoring/lambda_drift_monitor.py - Lambda function code
  • src/drift_monitoring/lambda_inference_logger.py - Lambda inference logger
  • src/drift_monitoring/generate_drift_dataset.py - Generate test drift data
  • src/setup/setup_drift_monitoring.py - Interactive setup
  • src/setup/deploy_drift_monitoring.sh - CI/CD deployment script
  • notebooks/2a_inference_monitoring.ipynb - Section 6 (setup cells)

Thresholds Explained

Data Drift (PSI Thresholds):

PSI Value Interpretation Action
< 0.1 No significant shift None
0.1 - 0.2 Moderate shift Monitor closely
≥ 0.2 Significant shift Alert triggered

Model Drift (Performance Degradation):

Degradation Interpretation Action
< 3% Normal variance None
3-5% Minor degradation Monitor
> 5% Significant degradation Alert triggered
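
For reference, a PSI value like the ones above can be computed with a simple quantile-binning sketch (10 bins and the epsilon are arbitrary choices; baseline_feature and current_feature are assumed 1-D numeric arrays for a single continuous feature):

import numpy as np

def psi(baseline: np.ndarray, current: np.ndarray, bins: int = 10, eps: float = 1e-6) -> float:
    """Population Stability Index using quantile bins derived from the baseline."""
    edges = np.quantile(baseline, np.linspace(0, 1, bins + 1))
    edges[0], edges[-1] = -np.inf, np.inf                     # catch values outside the baseline range
    base_pct = np.histogram(baseline, bins=edges)[0] / len(baseline) + eps
    curr_pct = np.histogram(current, bins=edges)[0] / len(current) + eps
    return float(np.sum((curr_pct - base_pct) * np.log(curr_pct / base_pct)))

value = psi(baseline_feature, current_feature)
if value >= 0.2:
    print(f"PSI={value:.3f}: significant shift -> alert")
elif value >= 0.1:
    print(f"PSI={value:.3f}: moderate shift -> monitor closely")
else:
    print(f"PSI={value:.3f}: no significant shift")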

Updating Configuration

Change Thresholds:

# In 2a_inference_monitoring.ipynb, Cell 6.6
lambda_client.update_function_configuration(
    FunctionName='fraud-detection-drift-monitor',
    Environment={
        'Variables': {
            'DATA_DRIFT_THRESHOLD': '0.15',  # More sensitive
            'MODEL_DRIFT_THRESHOLD': '0.03',  # 3% degradation
            # ... other vars
        }
    }
)

Change Schedule:

aws events put-rule \
    --name fraud-detection-drift-check \
    --schedule-expression "cron(0 */6 * * ? *)"  # Every 6 hours

Disable/Enable:

# In notebook or via AWS CLI
events = boto3.client('events')

# Disable
events.disable_rule(Name='fraud-detection-drift-check')

# Enable
events.enable_rule(Name='fraud-detection-drift-check')

Troubleshooting

No Emails Received:

  1. Check email confirmation - did you click the SNS subscription link?
  2. Check spam folder
  3. Verify SNS subscription status:
    aws sns list-subscriptions-by-topic \
        --topic-arn arn:aws:sns:us-east-1:{account}:fraud-detection-drift-alerts

Lambda Errors:

# Check CloudWatch logs
aws logs tail /aws/lambda/fraud-detection-drift-monitor --since 1h

Common issues:

  • Insufficient IAM permissions
  • Athena query timeout (increase Lambda timeout)
  • Not enough inference samples (need ≥100 samples)

No Drift Detected:

  • Insufficient recent data (Lambda looks at last 24 hours)
  • No ground truth for model drift (need labeled predictions)
  • Drift below thresholds (check logs for actual PSI values)

Cleanup

To remove all drift monitoring resources:

# Delete EventBridge rule
aws events remove-targets --rule fraud-detection-drift-check --ids 1
aws events delete-rule --name fraud-detection-drift-check

# Delete Lambda function
aws lambda delete-function --function-name fraud-detection-drift-monitor

# Delete IAM role and policies
aws iam detach-role-policy \
    --role-name fraud-detection-drift-monitor-role \
    --policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
aws iam delete-role-policy \
    --role-name fraud-detection-drift-monitor-role \
    --policy-name SNSPublishPolicy
aws iam delete-role --role-name fraud-detection-drift-monitor-role

# Delete SNS topic
aws sns delete-topic --topic-arn arn:aws:sns:us-east-1:{account}:fraud-detection-drift-alerts

Best Practices

  1. Set Appropriate Thresholds: Start with defaults (PSI=0.2, 5% degradation), adjust based on your use case
  2. Monitor Regularly: Review CloudWatch logs weekly to understand drift patterns
  3. Investigate Quickly: When alerts fire, investigate within 24 hours
  4. Update Baseline: After retraining, update BASELINE_ROC_AUC in Lambda environment
  5. Test Monthly: Manually trigger Lambda to ensure monitoring is working
  6. Keep Email List Current: Update SNS subscriptions when team members change
  7. Document Investigations: Log drift investigations in MLflow or tracking system

Advanced Configuration

Lambda Redeployment Control

By default, 2a_inference_monitoring.ipynb checks if Lambda functions already exist before deploying them. This prevents unnecessary redeployment during notebook reruns.

Configuration Flag (Cell 26):

# Set to True to force redeployment even if Lambda exists
REDEPLOY_LAMBDAS = False  # Default: skip if exists

Behavior:

  • False (default): Checks if Lambda exists, skips deployment if found
  • True: Always redeploys Lambda functions, even if they already exist

Use Cases:

  • Set to True when you've modified Lambda code (e.g., lambda_drift_monitor.py, lambda_inference_logger.py)
  • Keep False for routine notebook reruns to avoid unnecessary deployments and IAM role recreation

Affected Lambdas:

  1. fraud-monitoring-results-writer (Cell 28) - Writes drift monitoring results to Athena
  2. fraud-detection-drift-monitor (Cell 60) - Performs automated drift detection
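
The existence check itself is a small boto3 call, roughly like this sketch (the function name and flag match the cells above; everything else is illustrative):

import boto3

lambda_client = boto3.client("lambda")
REDEPLOY_LAMBDAS = False   # set True after editing the Lambda source

def should_deploy(function_name: str) -> bool:
    """Deploy when the function is missing, or when redeployment is forced."""
    try:
        lambda_client.get_function(FunctionName=function_name)
        exists = True
    except lambda_client.exceptions.ResourceNotFoundException:
        exists = False
    return REDEPLOY_LAMBDAS or not exists

if should_deploy("fraud-detection-drift-monitor"):
    print("Deploying Lambda (missing, or redeploy forced)")
else:
    print("Lambda already exists and REDEPLOY_LAMBDAS=False: skipping deployment")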

Drift Simulation Configuration

The ground truth simulator allows configurable drift scenarios for testing the monitoring system. This is critical for validating that drift detection triggers correctly before deploying to production.

Configuration (Cell 15 in 2a_inference_monitoring.ipynb):

# Ground Truth Simulation Configuration
SIM_ACCURACY = 0.85              # Base model accuracy (0.0-1.0)

# Feature drift (manifests as accuracy degradation)
SIM_FEATURE_DRIFT_MAG = 0.0      # How much features drift (0.0-1.0, semantic)
SIM_FEATURE_DRIFT_COUNT = 0      # Number of features drifting (semantic)
SIM_FEATURE_DRIFT_IMPACT = 0.0   # Accuracy reduction from feature drift (0.0-1.0)

# Model drift (direct accuracy degradation)
SIM_MODEL_DRIFT_MAG = 0.0        # Model accuracy degradation (0.0-1.0)

How It Works:

  • Base Accuracy: Starting model performance (default 85%)
  • Feature Drift: Simulates feature distribution changes that degrade model accuracy
    • SIM_FEATURE_DRIFT_MAG and SIM_FEATURE_DRIFT_COUNT are semantic parameters (documentation only)
    • SIM_FEATURE_DRIFT_IMPACT directly reduces effective accuracy (e.g., 0.10 = 10% reduction)
  • Model Drift: Simulates direct model performance degradation
    • SIM_MODEL_DRIFT_MAG directly reduces effective accuracy (e.g., 0.15 = 15% reduction)
  • Effective Accuracy: base_accuracy - feature_drift_impact - model_drift_mag (minimum 50%)
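
A toy sketch of how these knobs could translate into simulated labels (the flip logic here is an assumption for illustration; the actual simulator lives in the notebook and CLI):

import numpy as np

SIM_ACCURACY = 0.85
SIM_FEATURE_DRIFT_IMPACT = 0.10
SIM_MODEL_DRIFT_MAG = 0.0

def simulate_ground_truth(predictions: np.ndarray, seed: int = 0) -> np.ndarray:
    """Return labels that agree with the predictions at the effective accuracy rate."""
    effective_accuracy = max(0.5, SIM_ACCURACY - SIM_FEATURE_DRIFT_IMPACT - SIM_MODEL_DRIFT_MAG)
    rng = np.random.default_rng(seed)
    flip = rng.random(len(predictions)) > effective_accuracy   # disagree with prob 1 - accuracy
    return np.where(flip, 1 - predictions, predictions)

preds = np.array([0, 1, 0, 0, 1])
print(simulate_ground_truth(preds))   # agrees with preds ~75% of the time with the settings above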

Testing Scenarios:

Scenario 1 - No Drift (Baseline):

SIM_ACCURACY = 0.85
SIM_FEATURE_DRIFT_IMPACT = 0.0
SIM_MODEL_DRIFT_MAG = 0.0
# Effective accuracy: 85%
# Expected: No drift alerts

Scenario 2 - Feature Drift:

SIM_ACCURACY = 0.85
SIM_FEATURE_DRIFT_MAG = 0.5      # 50% drift magnitude (semantic)
SIM_FEATURE_DRIFT_COUNT = 10     # 10 features drifting (semantic)
SIM_FEATURE_DRIFT_IMPACT = 0.10  # 10% accuracy reduction
SIM_MODEL_DRIFT_MAG = 0.0
# Effective accuracy: 75%
# Expected: Data drift detected, accuracy degradation alert

Scenario 3 - Model Drift:

SIM_ACCURACY = 0.85
SIM_FEATURE_DRIFT_IMPACT = 0.0
SIM_MODEL_DRIFT_MAG = 0.15       # 15% degradation
# Effective accuracy: 70%
# Expected: Model drift alert, significant accuracy drop

Scenario 4 - Combined Drift:

SIM_ACCURACY = 0.85
SIM_FEATURE_DRIFT_IMPACT = 0.08  # 8% from features
SIM_MODEL_DRIFT_MAG = 0.07       # 7% from model
# Effective accuracy: 70%
# Expected: Both data drift and model drift detected

Production Replacement: Replace GroundTruthSimulator with actual fraud investigation system that writes to ground_truth_updates table:

  • Chargeback notifications
  • Customer fraud reports
  • Investigation team confirmations
  • Merchant alerts

Drift Data Generation Configuration

All drift generation parameters are centralized in src/config/config.yaml under the drift_generation section. This eliminates hardcoded values and makes it easy to adjust drift amounts for testing.

Configuration Location: src/config/config.yaml (lines 241-383)

Default Drift Dataset (generate_drift_dataset.py):

drift_generation:
  default_drift:
    transaction_amount:
      type: "multiplicative"
      factor: 1.4      # 40% increase (simulates inflation)
      noise: 0.1       # ±10% random variation
    
    distance_from_home_km:
      type: "multiplicative"
      factor: 2.0      # 100% increase (travel/remote transactions)
      noise: 0.3       # ±30% random variation
    
    velocity_score:
      type: "multiplicative"
      factor: 1.5      # 50% increase (more active users)
      noise: 0.2
    
    num_transactions_24h:
      type: "additive"
      shift: 3         # Add 3 more transactions on average
      noise: 1
    
    transaction_timestamp:
      type: "additive"
      shift: 50000     # Shift time forward
      noise: 5000
  
  # Number of samples and reproducibility
  num_samples: 5000
  random_state: 123

Variable Drift Patterns (generate_variable_drift_dataset.py):

The system supports 6 time-varying drift scenarios for realistic testing:

drift_generation:
  variable_patterns:
    run1:  # Baseline - minimal drift
      transaction_amount: {factor: 1.05, noise: 0.05}
      distance_from_home_km: {factor: 1.1, noise: 0.1}
    
    run2:  # Distance spike - travel surge
      distance_from_home_km: {factor: 3.5, noise: 0.5}  # 3.5x increase!
      velocity_score: {factor: 1.3, noise: 0.2}
    
    run3:  # Credit limit anomaly - system change
      credit_limit: {factor: 8.0, noise: 2.0}  # 8x increase! (PSI ~74)
      merchant_category_code: {shift: 500, noise: 200}
    
    run4:  # High velocity period
      velocity_score: {factor: 2.5, noise: 0.4}
      num_transactions_24h: {shift: 5, noise: 2}
    
    run5:  # Recovery - returning to normal
      transaction_amount: {factor: 1.2, noise: 0.15}
    
    run6:  # Account age anomaly - new user cohort
      account_age_days: {factor: 0.3, noise: 0.1}  # Younger accounts

To Adjust Drift Amounts:

  1. Edit config.yaml:

    drift_generation:
      variable_patterns:
        run3:
          credit_limit:
            factor: 1.5  # Reduce from 8.0 to 1.5 (a 50% increase instead of 700%)
            noise: 0.2   # Reduce from 2.0 to 0.2
  2. Regenerate drifted datasets:

    python src/drift_monitoring/generate_drift_dataset.py
    python src/drift_monitoring/generate_variable_drift_dataset.py
  3. Test the new drift levels in notebook 2a or via endpoint invocations

Drift Types:

  • Multiplicative: new_value = original_value × (factor ± noise × factor)
    • Example: factor: 2.0, noise: 0.3 → value is multiplied by 1.4-2.6x
  • Additive: new_value = original_value + (shift ± noise)
    • Example: shift: 3, noise: 1 → value increased by 2-4
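
Applied to a pandas column, the two transforms above look roughly like this sketch (the uniform noise model is an assumption; the generator scripts remain the source of truth):

import numpy as np
import pandas as pd

def apply_drift(values: pd.Series, spec: dict, seed: int = 123) -> pd.Series:
    rng = np.random.default_rng(seed)
    noise = rng.uniform(-spec["noise"], spec["noise"], len(values))
    if spec["type"] == "multiplicative":
        return values * (spec["factor"] + noise * spec["factor"])
    if spec["type"] == "additive":
        return values + spec["shift"] + noise
    raise ValueError(f"unknown drift type: {spec['type']}")

amounts = pd.Series([100.0, 250.0, 40.0])
drifted = apply_drift(amounts, {"type": "multiplicative", "factor": 2.0, "noise": 0.3})
print(drifted)   # each value multiplied by roughly 1.4x-2.6x, matching the example above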

Time-Based Drift Detection:

The monitoring system now compares data within configurable time windows:

monitoring:
  lookback_days: 30                  # Default monitoring window
  data_drift_lookback_days: 7        # Compare last 7 days for data drift
  model_drift_lookback_days: 30      # Compare last 30 days for model performance
  min_samples_for_drift: 100         # Minimum samples required

Before: Drift compared all historical inference data (LIMIT 5000)
After: Drift compares recent inference data (last N days) vs training baseline

This ensures:

  • Fair comparison (recent vs recent data)
  • Consistent time periods
  • No mixing of old and new inference data
  • Configurable via environment variables for Lambda deployments

Lambda Environment Variables:

# Override config.yaml defaults via environment variables
DATA_DRIFT_LOOKBACK_DAYS=14     # 2 weeks instead of 7 days
MODEL_DRIFT_LOOKBACK_DAYS=60    # 2 months instead of 30 days

MLflow Run ID Tracking

Each drift monitoring execution creates a unique MLflow run with a UUID for complete traceability.

Implementation:

  • Lambda function fraud-detection-drift-monitor generates unique run ID on every execution
  • Stored in monitoring_responses Athena table under mlflow_run_id column
  • Enables tracking of drift metrics over time in MLflow experiments
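
Conceptually, each execution does something like the sketch below; the run-name convention is an assumption and the Athena write is omitted:

import uuid
import mlflow

monitoring_run_id = str(uuid.uuid4())                 # stored with the monitoring_responses row

with mlflow.start_run(run_name=f"drift-check-{monitoring_run_id[:8]}") as run:
    mlflow_run_id = run.info.run_id                   # written to monitoring_responses.mlflow_run_id
    mlflow.log_metric("drifted_columns_count", 5)     # placeholder metric values
    mlflow.log_metric("data_drift_detected", 1)

print(monitoring_run_id, mlflow_run_id)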

Verification (in 4_optional_version_validation.ipynb):

# Cell 12-13: Validate unique run IDs
query = """
    SELECT monitoring_run_id, monitoring_timestamp, mlflow_run_id,
           data_drift_detected, model_drift_detected
    FROM fraud_detection.monitoring_responses
    ORDER BY monitoring_timestamp DESC LIMIT 10
"""
# athena_df holds the query results above (loaded via your Athena client, e.g. the read_athena_query helper shown earlier)
unique_runs = athena_df['mlflow_run_id'].nunique()
print(f"✓ Unique MLflow run IDs: {unique_runs}")

Expected Behavior:

  • Each drift test generates new UUID (e.g., a3f7b2c1-..., d8e9f0a2-...)
  • No more generic "unknown" or "pipeline" values
  • Full audit trail of all drift monitoring executions

MLflow Experiment:

  • Experiment: fraud-detection-drift_monitoring
  • Each run contains:
    • Drift metrics (per-feature drift scores, aggregate statistics)
    • Performance metrics (ROC-AUC, accuracy, precision, recall)
    • Evidently HTML reports (data drift dashboard, classification dashboard)
    • Detection flags (data_drift_detected, model_drift_detected)

QuickSight Dashboard Auto-Refresh

The governance dashboard automatically refreshes daily with the latest drift monitoring results.

Architecture:

2:00 AM UTC → Drift Monitoring Lambda (fraud-detection-drift-monitor)
              ├─ Runs Evidently drift detection
              ├─ Logs metrics to MLflow
              └─ Writes results to monitoring_responses Athena table

                 ⏱️ Wait 1 hour for data to settle...

3:00 AM UTC → QuickSight Refresh Lambda (quicksight-dashboard-refresh)
              ├─ Triggers SPICE ingestion for 3 datasets:
              │  ├─ Inference Monitoring Dataset
              │  ├─ Drift Monitoring Dataset
              │  └─ Feature Drift Analysis Dataset
              └─ Dashboard shows updated data by morning

Components Created (Cells 74-75 in 2a_inference_monitoring.ipynb):

  1. Lambda Function: quicksight-dashboard-refresh

    • Runtime: Python 3.11
    • Memory: 128 MB
    • Environment Variables:
      • AWS_ACCOUNT_ID
      • INFERENCE_DATASET_ID
      • DRIFT_DATASET_ID
      • FEATURE_DRIFT_DATASET_ID
  2. IAM Role: quicksight-dashboard-refresh-role

    • Permissions: quicksight:CreateIngestion, quicksight:DescribeIngestion
  3. EventBridge Rule: quicksight-dashboard-daily-refresh

    • Schedule: cron(0 3 * * ? *) (3:00 AM UTC daily)
    • Target: quicksight-dashboard-refresh Lambda
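
The refresh Lambda essentially makes one create_ingestion call per dataset, along the lines of this sketch (dataset IDs are read from the environment variables listed above; error handling is omitted):

import os
import time
import boto3

quicksight = boto3.client("quicksight")
ACCOUNT_ID = os.environ["AWS_ACCOUNT_ID"]
DATASET_IDS = [
    os.environ["INFERENCE_DATASET_ID"],
    os.environ["DRIFT_DATASET_ID"],
    os.environ["FEATURE_DRIFT_DATASET_ID"],
]

def lambda_handler(event, context):
    started = []
    for dataset_id in DATASET_IDS:
        response = quicksight.create_ingestion(          # triggers a SPICE refresh for the dataset
            AwsAccountId=ACCOUNT_ID,
            DataSetId=dataset_id,
            IngestionId=f"scheduled-{int(time.time())}-{dataset_id[:8]}",
        )
        started.append(response["IngestionId"])
    return {"ingestions": started}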

Manual Refresh (Cell 2 in 3_governance_dashboard.ipynb):

# Quick refresh cell at top of governance dashboard notebook
# Refreshes all 3 QuickSight datasets immediately
# Use after new drift monitoring runs or inference predictions

Verification:

# Check EventBridge schedule
aws events describe-rule --name quicksight-dashboard-daily-refresh

# View Lambda logs
aws logs tail /aws/lambda/quicksight-dashboard-refresh --follow

# Check QuickSight ingestion history
# Go to QuickSight Console → Datasets → Refresh tab

Benefits:

  • No manual dashboard refresh required
  • Always see latest data by morning (after 3 AM UTC refresh)
  • Automated after drift monitoring completes
  • Cost: <$1/month (Lambda + EventBridge)

Customization:

# Change refresh schedule (in Cell 75)
events.put_rule(
    Name='quicksight-dashboard-daily-refresh',
    ScheduleExpression='cron(0 6 * * ? *)',  # 6 AM UTC instead
    State='ENABLED'
)

Athena Data Lake

Iceberg Tables

training_data:

  • 284,080 rows, 33 columns (30 features + 3 metadata)
  • Partitioned by: None (static training data)
  • Features: See "Feature Format" section below

inference_responses:

  • All predictions from deployed endpoint
  • Partitioned by: year, month, day
  • Schema: inference_id, request_timestamp, prediction, probability_fraud, ground_truth, latency_ms, confidence_score, transaction_context
  • The ground_truth column starts as NULL and gets populated by the ground truth update process

ground_truth_updates:

  • Lightweight table for merging confirmed labels back into inference_responses
  • Schema: inference_id, actual_fraud, confirmation_timestamp, confirmation_source, investigation_notes
  • Populated by: Ground truth simulator (dev/test) or fraud investigation systems (production)
  • Consumed by: src/drift_monitoring/update_ground_truth.py which runs an Athena MERGE to update the ground_truth column in inference_responses
  • JOINs on inference_id (not transaction_id)

ground_truth (not used in current PoC):

  • Designed for a future batch retraining workflow
  • Stores complete feature rows (all 33 columns) alongside confirmed fraud labels
  • Unlike ground_truth_updates (which only patches inference_responses), this table holds full feature vectors for training a new model version
  • Would be populated by a batch job that combines inference_responses + confirmed labels into training-ready rows
  • Not needed for the current monitoring workflow - only relevant when building an automated retraining pipeline

drifted_data:

  • Samples flagged for drift analysis
  • Used for retraining prioritization

Ground Truth Data Flow

inference_responses                ground_truth_updates
(predictions, ground_truth=NULL)   (inference_id + actual_fraud)
         │                                    │
         └──────── MERGE ON inference_id ─────┘
                          │
                          ▼
              inference_responses
              (ground_truth populated)
                          │
                          ▼
               Monitoring & Drift Detection
              (metrics where ground_truth IS NOT NULL)

Key distinction:

  • ground_truth_updates → patches existing predictions (lightweight, used now)
  • ground_truth → stores full feature rows for retraining (heavyweight, future use)
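
For reference, the MERGE run by update_ground_truth.py is conceptually like the statement below (a hedged example of Athena Iceberg MERGE syntax; the actual statement in the script may differ in column handling):

MERGE INTO fraud_detection.inference_responses AS t
USING fraud_detection.ground_truth_updates AS u
    ON t.inference_id = u.inference_id
WHEN MATCHED THEN
    UPDATE SET ground_truth = u.actual_fraud;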

Querying Athena

Check inference logging:

-- Recent predictions
SELECT
    inference_id,
    request_timestamp,
    prediction,
    probability_fraud,
    ground_truth,
    confidence_score
FROM fraud_detection.inference_responses
ORDER BY request_timestamp DESC
LIMIT 10;

Ground truth coverage:

-- Coverage by day
SELECT
    DATE(request_timestamp) as date,
    COUNT(*) as total_predictions,
    SUM(CASE WHEN ground_truth IS NOT NULL THEN 1 ELSE 0 END) as with_ground_truth,
    CAST(SUM(CASE WHEN ground_truth IS NOT NULL THEN 1 ELSE 0 END) AS DOUBLE) / COUNT(*) * 100 as coverage_pct
FROM fraud_detection.inference_responses
GROUP BY DATE(request_timestamp)
ORDER BY date DESC;

Model performance:

-- Confusion matrix
SELECT
    prediction,
    ground_truth,
    COUNT(*) as count
FROM fraud_detection.inference_responses
WHERE ground_truth IS NOT NULL
GROUP BY prediction, ground_truth;

Feature Format

Training Features (30 total)

Your model expects these exact 30 features in JSON format:

{
  "transaction_hour": 14,
  "transaction_day_of_week": 2,
  "transaction_amount": 149.62,
  "transaction_type_code": 1,
  "customer_age": 42,
  "customer_gender": 0,
  "customer_tenure_months": 36,
  "account_age_days": 1095,
  "distance_from_home_km": 5.2,
  "distance_from_last_transaction_km": 2.3,
  "time_since_last_transaction_min": 120,
  "online_transaction": 1,
  "international_transaction": 0,
  "high_risk_country": 0,
  "merchant_category_code": 5411,
  "merchant_reputation_score": 0.85,
  "chip_transaction": 1,
  "pin_used": 1,
  "card_present": 1,
  "cvv_match": 1,
  "address_verification_match": 1,
  "num_transactions_24h": 3,
  "num_transactions_7days": 12,
  "avg_transaction_amount_30days": 125.50,
  "max_transaction_amount_30days": 450.00,
  "velocity_score": 0.3,
  "recurring_transaction": 0,
  "previous_fraud_incidents": 0,
  "credit_limit": 5000.0,
  "available_credit_ratio": 0.75
}

⚠️ Note: Do NOT use V1, V2, V3 features - those are from a different dataset.
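
For example, a raw invocation with this payload can look like the sketch below (the endpoint name matches the CLI test command elsewhere in this README; the payload is abridged, and the exact request/response envelope of the custom handler may differ):

import json
import boto3

runtime = boto3.client("sagemaker-runtime")

payload = {
    "transaction_hour": 14,
    "transaction_day_of_week": 2,
    "transaction_amount": 149.62,
    # ... remaining features from the JSON example above ...
    "credit_limit": 5000.0,
    "available_credit_ratio": 0.75,
}

response = runtime.invoke_endpoint(
    EndpointName="fraud-detector-endpoint",   # assumed endpoint name
    ContentType="application/json",
    Body=json.dumps(payload),
)
print(json.loads(response["Body"].read()))    # e.g. prediction and probability_fraud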

Feature Categories

Category Count Features
Transaction 4 hour, day_of_week, amount, type_code
Customer Profile 4 age, gender, tenure_months, account_age_days
Geographic/Temporal 4 distance_from_home, distance_from_last, time_since_last, online
Risk Indicators 2 international, high_risk_country
Merchant 2 category_code, reputation_score
Payment Security 5 chip, pin, card_present, cvv_match, address_verification
Behavioral 7 num_transactions_24h/7d, avg/max_amount_30d, velocity, recurring, previous_fraud
Credit 2 credit_limit, available_credit_ratio

CLI Reference

Setup & Infrastructure

# Upload CSV data to S3 (required before migration, files not in git)
python -m src.setup.upload_data_to_s3

# Create all infrastructure (S3, Athena DB, tables)
python main.py setup --migrate-data

# Just create infrastructure (no data migration)
python main.py setup

Pipeline Operations

# Create pipeline
python main.py pipeline create --pipeline-name fraud-detection-pipeline

# Update existing pipeline
python main.py pipeline update --pipeline-name fraud-detection-pipeline

# Start execution
python main.py pipeline start --pipeline-name fraud-detection-pipeline [--wait]

# Get status
python main.py pipeline status --pipeline-name fraud-detection-pipeline

# List executions
python main.py pipeline list-executions --pipeline-name fraud-detection-pipeline

# Delete pipeline
python main.py pipeline delete --pipeline-name fraud-detection-pipeline

Ground Truth & Monitoring

# Simulate ground truth (development)
python -m src.monitoring.simulate_ground_truth_from_athena --accuracy 0.85 [--limit 1000]

# Apply ground truth updates
python -m src.monitoring.update_ground_truth --mode batch
python -m src.monitoring.update_ground_truth --mode streaming --window-hours 24

# Monitor performance
python -m src.monitoring.monitor_model_performance --days 30 [--alert-threshold 0.80]

IAM Roles

# Create/update SageMaker execution role
python -m src.setup.create_or_update_sagemaker_role

# Create Lambda execution role
python -m src.setup.create_lambda_role

Troubleshooting

Pipeline Execution Issues

Issue: Preprocessing fails with "Unable to verify/create output bucket"

InvalidRequestException: Unable to verify/create output bucket fraud-detection-data-lake

Solution:

  1. Check .env has correct bucket name with account ID suffix:
    DATA_S3_BUCKET=fraud-detection-data-lake-YOUR_ACCOUNT
  2. Verify bucket exists:
    aws s3 ls s3://fraud-detection-data-lake-YOUR_ACCOUNT/
  3. Create bucket if missing:
    python main.py setup

Issue: Training fails with "Feature count mismatch"

ModelError: Expected 30 features, got 32

Solution:

  • Model expects exactly 30 training features
  • Check TRAINING_FEATURES list in 2a_inference_monitoring.ipynb Cell 4
  • Do not include transaction_id or transaction_timestamp (metadata columns)

Issue: Pipeline fails with "FileNotFoundError: 'awswrangler'"

FileNotFoundError: [Errno 2] No such file or directory: 'awswrangler'

Solution:

  1. Verify inference_requirements.txt exists:
    cat src/train_pipeline/inference_requirements.txt
  2. Should contain:
    awswrangler>=3.0.0
    boto3>=1.34.0
    pandas>=2.0.0
    
  3. Restart Jupyter kernel if running in notebook
  4. Redeploy pipeline

MLflow Integration Issues

Issue: Can't access MLflow UI

Solution:

  1. Ensure you're in SageMaker Studio (not local environment)
  2. Navigate: Left sidebar → Partner AI Apps → MLflow
  3. Click on your MLflow app name
  4. Opens authenticated session in new tab

Issue: MLflow tracking URI not working

MlflowException: Unable to connect to tracking server

Solution:

  1. Check .env has ARN format (not HTTPS):
    # ✓ Correct (for SDK)
    MLFLOW_TRACKING_URI=arn:aws:sagemaker:us-east-1:123456:mlflow-app/app-ABC123
    
    # ✗ Wrong (for web UI only)
    MLFLOW_TRACKING_URI=https://abc123.mlflow-tracking.us-east-1.sagemaker.aws/
  2. Verify IAM role has MLflow access:
    python -m src.setup.create_or_update_sagemaker_role

Issue: Training dependencies not installed

ModuleNotFoundError: No module named 'sagemaker_mlflow'

Solution:

  1. Check training step has dependencies configured:
    # In pipeline.py training step
    dependencies=['pipeline_steps/requirements_train.txt']
  2. Verify requirements_train.txt contains:
    sagemaker-mlflow>=0.1.0
    mlflow>=2.17.0
    

Inference & Monitoring Issues

Issue: No records in Athena after inference

Solution:

  1. Check endpoint uses custom handler (not built-in XGBoost)
  2. Verify CloudWatch logs show:
    ✓ Athena client initialized for inference logging
    
  3. Make 50+ predictions to trigger buffer flush:
    # Buffer flushes at 50 records
    # Cell 6 (5 predictions) + Cell 9 (50 predictions) = 55 total
    # This should automatically trigger the batch flush
  4. Or wait 5 minutes for time-based flush

Issue: Timestamp precision error in Athena queries

NOT_SUPPORTED: Incorrect timestamp precision for timestamp(6)

Solution:

  1. Athena Iceberg tables use TIMESTAMP(3) (milliseconds)
  2. Cast aggregations explicitly:
    CAST(MIN(request_timestamp) AS TIMESTAMP(3))
  3. All timestamp aggregations (MIN, MAX, AVG) must be cast
  4. See TIMESTAMP_FIX.md for complete fix details

Issue: Ground truth simulation returns 0 records

No predictions found without ground truth

Solution:

  1. Verify inference_responses table has data:
    SELECT COUNT(*) FROM fraud_detection.inference_responses;
  2. Check predictions don't already have ground truth:
    SELECT COUNT(*) FROM fraud_detection.inference_responses
    WHERE ground_truth IS NULL;
  3. Run inference tests first (notebook Cells 6-9)

Infrastructure Issues

Issue: IAM role lacks permissions

AccessDeniedException: User is not authorized to perform: athena:StartQueryExecution

Solution:

# Update SageMaker role with all required permissions
python -m src.setup.create_or_update_sagemaker_role

# Grants access to:
# - S3 (read/write)
# - Athena (queries, Iceberg tables)
# - Glue (data catalog)
# - Lake Formation (table permissions)
# - MLflow (tracking server)
# - Lambda (for deployment step)

Issue: Corrupted Athena query results

SYNTAX_ERROR: line 1:8: Column 'cnt' cannot be resolved

Solution:

  1. Likely cause: corrupted query-result metadata in S3
  2. Clean up corrupted results:
    ./src/setup/cleanup_corrupted_athena_results.sh
  3. Or manually delete:
    aws s3 rm s3://YOUR_BUCKET/athena-query-results/tables/ --recursive

Performance Issues

Issue: Inference latency too high (>500ms)

Expected: 100-200ms P95 latency

Solution:

  1. Check if cold start (first request ~2-5 seconds)
  2. Verify endpoint instance type (ml.m5.xlarge recommended)
  3. Check CloudWatch metrics for throttling
  4. Consider increasing serverless concurrency
  5. Background Athena logging should add <50ms

Complete End-to-End Workflow

1. Setup Infrastructure

# One-time setup
python -m src.setup.upload_data_to_s3
python main.py setup --migrate-data

Result: S3 bucket, Athena database, Iceberg tables created, CSV data uploaded and migrated


2. Train & Deploy Model via Pipeline

Option A: Jupyter Notebook (Recommended)

# In SageMaker Studio, open:
notebooks/1_training_pipeline.ipynb

# Run sequentially:
# - Cells 1-3: Setup and configuration
# - Cell 4: Create/update pipeline definition
# - Cell 5: Execute pipeline and wait (~25 minutes)

Option B: CLI

python main.py pipeline create --pipeline-name fraud-detection-pipeline
python main.py pipeline start --pipeline-name fraud-detection-pipeline --wait

Result:

  • Model trained with XGBoost
  • Evaluated (ROC-AUC, PR-AUC, confusion matrix)
  • Registered in MLflow Model Registry
  • Deployed to serverless endpoint with custom handler

3. Run Inference & Monitoring

Open: notebooks/2a_inference_monitoring.ipynb

This notebook handles the complete inference and monitoring workflow:

3a. Test Inference (Cells 1-9)

# In 2a_inference_monitoring.ipynb:
# Cell 1-2: Setup (load environment, initialize clients)
# Cell 4: Load test data and verify features
# Cell 5: Quick single test (optional)
# Cell 6: Single inference tests (5 samples)
# Cell 8: Generate playground samples (for SageMaker Studio testing)
# Cell 9: Bulk inference test (50 samples)

Result:

  • Predictions returned with fraud probabilities
  • Automatically logged to inference_responses table (custom handler)
  • Latency metrics captured

Verify automatic logging:

# Cell 10: Check Athena for logged predictions
# Should show records after 100 predictions or 5 minutes

3b. Simulate Ground Truth (Cells 20-22)

For Development/Testing:

# In 2a_inference_monitoring.ipynb:
# Cell 20: Run ground truth simulator
# - Simulates fraud investigation outcomes
# - Creates realistic confirmation delays
# - Configurable accuracy (default 85%)

# Or via CLI:
python -m src.monitoring.simulate_ground_truth_from_athena --accuracy 0.85

For Production: Replace simulation with actual fraud investigation system that writes to ground_truth_updates table:

  • Chargeback notifications
  • Customer fraud reports
  • Investigation team confirmations
  • Merchant alerts

Result: Ground truth updates in ground_truth_updates table

3c. Apply Ground Truth Updates (Cell 19 or CLI)

# In 2a_inference_monitoring.ipynb:
# Cell 19: Apply updates using notebook

# Or via CLI:
python -m src.monitoring.update_ground_truth --mode batch

Result: inference_responses.ground_truth column populated via MERGE

3d. Verify Coverage (Cell 22)

# Cell 22: Check ground truth coverage
# Shows: total predictions, with/without ground truth, coverage %

Expected: 100% coverage (simulated) or partial coverage (production)

3e. Monitor Performance & Detect Drift (Cells 27-40)

# Cell 27: Initialize performance monitor
# Cell 28: Generate performance report
# Cell 29: Display metrics (ROC-AUC, precision, recall, F1)
# Cell 32: Run Evidently data drift detection (DataDriftPreset)
# Cell 33: Display interactive Evidently data drift report
# Cell 35: Run Evidently model drift detection (ClassificationPreset)
# Cell 36: Display interactive Evidently classification report
# Cell 38: Log Evidently HTML reports and metrics to MLflow
# Cell 40: Check for alerts (performance degradation)

Result:

  • Performance metrics calculated (where ground truth exists)
  • Evidently data drift report generated (PSI, KS, distribution comparisons)
  • Evidently classification report generated (ROC, PR, confusion matrix, F1)
  • Interactive HTML reports logged to MLflow as artifacts
  • Per-feature drift scores and classification metrics logged as MLflow metrics
  • Alerts if performance degraded >5%

📝 Note: Evidently reports in cells 33 and 36 render as interactive HTML when the notebook is run locally, but they do not render on GitHub. View the report screenshots or open the HTML artifacts logged to MLflow under evidently_reports/.

Alternative: CLI monitoring (no charts):

python -m src.monitoring.monitor_model_performance --days 30

4. Review Results in MLflow

# In SageMaker Studio:
# 1. Left sidebar → Partner AI Apps → MLflow
# 2. Select experiments:
#    - credit-card-fraud-detection-training (training runs)
#    - credit-card-fraud-detection-monitoring (monitoring runs)
# 3. View charts in Artifacts section

Charts available:

  • Training: ROC curve, confusion matrix, feature importance
  • Monitoring: Evidently interactive HTML reports (data drift dashboard, classification dashboard)

5. Retrain if Needed

# If drift detected or performance degraded:

# Option A: Via 1_training_pipeline.ipynb
# Cell 5: Re-run pipeline execution

# Option B: Via CLI
python main.py pipeline start --pipeline-name fraud-detection-pipeline --wait

Result:

  • New model version trained with latest data
  • Automatically registered in MLflow
  • Deployed to same endpoint (replaces old model)

Summary: Notebook Workflow

The entire PoC is driven by notebooks in SageMaker Studio: three core notebooks (training, inference monitoring, governance dashboard) plus alternative and optional ones. Each notebook cell maps to a CLI command (see Next Steps for CI/CD equivalents).

Notebook Purpose Key Cells
1_training_pipeline.ipynb Training, evaluation, model registration, endpoint deployment Cells 1-5
2a_inference_monitoring.ipynb Inference testing, ground truth, drift detection, CloudWatch alarms Cells 1-40
inference_monitoring_with_pipeline.ipynb Pipeline-based automated monitoring: creates a SageMaker Pipeline (fraud-detection-monitoring-pipeline) with steps for ground truth simulation, drift computation, MLflow logging, Athena writes, threshold checks, and alarm creation Cells 1-end
3_governance_dashboard.ipynb QuickSight governance dashboard: creates Athena data source, dataset, analysis with 6 visuals, and published dashboard - all via API Cells 1-11
4_optional_version_validation.ipynb Version traceability: validates MLflow model version matches SageMaker endpoint and Athena logs Cells 1-20
5_optional_cleanup.ipynb Resource cleanup: deletes all AWS resources (endpoints, Lambda, Athena tables, S3, CloudWatch) Cells 1-38

Workflow:

  1. Train & Deploy: 1_training_pipeline.ipynb → Cells 1-5 (creates pipeline, trains model, deploys endpoint)
  2. Test Inference: 2a_inference_monitoring.ipynb → Cells 1-9 (single + bulk predictions, auto-logged to Athena)
  3. Simulate Ground Truth: 2a_inference_monitoring.ipynb → Cells 19-24 (generate and apply ground truth labels)
  4. Monitor & Detect Drift: 2a_inference_monitoring.ipynb → Cells 26-39 (performance metrics, data drift, model drift, MLflow logging)
  5. CloudWatch Alarms: 2a_inference_monitoring.ipynb → Cell 40 (publish metrics, create alarms & dashboard)
  6. MLflow Review: SageMaker Studio → Partner AI Apps → MLflow
  7. Retrain: 1_training_pipeline.ipynb → Cell 5 (re-execute pipeline)

Alternative: Pipeline-Based Monitoring inference_monitoring_with_pipeline.ipynb wraps steps 3-5 into a single SageMaker Pipeline (fraud-detection-monitoring-pipeline) with automated steps: SimulateGroundTruth → ComputeDrift → LogToMLflow → WriteToAthena → CheckThresholds → CreateAlarms. Use this for scheduled, hands-off monitoring.

All in SageMaker Studio - no CLI needed for the development workflow.


Inference Monitoring with SageMaker Pipeline

notebooks/inference_monitoring_with_pipeline.ipynb sets up inference monitoring as a fully automated SageMaker Pipeline instead of running individual notebook cells manually.

What it does:

  • Creates a 6-step SageMaker Pipeline: SimulateGroundTruth → ComputeDrift → LogToMLflow → WriteToAthena → CheckThresholds → CreateAlarms
  • Auto-resolves the MLflow run ID and model version from the MLflow Model Registry (no hardcoded run IDs)
  • Queries monitoring results from Athena and visualizes drift trends over time
  • Configures EventBridge scheduling for daily automated execution (Section 11) - creates the IAM role, rule, and wires the pipeline as a direct EventBridge target using SageMakerPipelineParameters

Key sections:

  1. Setup - auto-resolves model version from MLflow Model Registry (prefers Production stage)
  2. Pipeline creation and execution with configurable thresholds
  3. Athena result queries and drift trend visualization
  4. EventBridge scheduling for daily hands-off monitoring (no Lambda intermediary)

Governance Dashboard (QuickSight)

notebooks/3_governance_dashboard.ipynb programmatically creates a complete QuickSight governance dashboard - no manual UI steps required.

What it creates (all via QuickSight Definition API):

  • Athena data source pointing to the fraud_detection database
  • Dataset from inference_responses table with calculated fields (prediction_accuracy, risk_tier)
  • Analysis with 6 visuals: prediction volume over time, fraud probability distribution, prediction accuracy breakdown (donut), risk tier distribution, inference latency trend, and total inferences KPI
  • Published dashboard with filtering and CSV export enabled

Prerequisites: QuickSight Enterprise subscription, data in inference_responses Athena table, IAM permissions for QuickSight/Athena/S3.

QuickSight Governance Dashboard:

📄 View Governance Dashboard (PDF)

Extending with Feature-Level Drift Analysis:

The default dashboard provides high-level inference and model performance metrics. For detailed feature-level drift analysis, you can extend the dashboard with additional visuals:

  • Feature Drift Timeline: Line chart showing how individual features drift over time (compare credit_limit vs. merchant_category vs. account_age)
  • Top Drifting Features: Horizontal bar chart ranking features by average drift score
  • Drift Severity Distribution: Stacked bar chart showing Low/Moderate/Significant drift counts per feature
  • Feature Drift Detail Table: Sortable table with all feature drift metrics

Implementation: The Feature-Level Drift Analysis Guide provides:

  • Pre-built Athena view (feature_drift_detail) that unpacks per-feature drift scores
  • Copy-paste Python code to add 6 new visuals to your dashboard
  • SQL queries and visualization configurations
  • Step-by-step implementation checklist

When to use: Add these visuals when you need to:

  • Investigate which specific features are drifting (not just overall drift detection)
  • Prioritize retraining efforts by identifying the worst offenders
  • Distinguish data quality issues from genuine distribution shifts
  • Monitor drift patterns across your 29+ input features

Cost Optimization

Training:

  • Use Spot instances for training: significantly lower compute costs
  • Training completes in 10-15 min, low spot interruption risk
  • Configure in pipeline: instance_type="ml.m5.xlarge", use_spot_instances=True

Inference:

  • Serverless inference: Pay per invocation with automatic scaling to zero when idle
  • Cold start: 10-30 seconds (acceptable for fraud detection)
  • Alternative: Provision dedicated instances for high-volume scenarios with predictable traffic

Storage:

  • Athena Iceberg tables: Pay per query, not storage
  • S3 Intelligent-Tiering for model artifacts
  • Compress old inference logs to Glacier after 90 days

Estimated Monthly Costs (1000 predictions/day):

  • Training: ~$5-10 (spot instances, 2-3 retrainings/month)
  • Inference: ~$20-30 (serverless)
  • Storage: ~$5-10 (S3 + Athena)
  • MLflow: Included in SageMaker AI Notebooks
  • Total: ~$30-50/month

Model Versioning & Lifecycle

Version Stages

Models progress through stages in MLflow Model Registry:

  1. None - Newly registered, not validated
  2. Staging - Undergoing validation/testing
  3. Production - Serving live traffic
  4. Archived - Retired

Promoting Models

# In MLflow UI or via SDK
from mlflow import MlflowClient

client = MlflowClient()

# Promote to staging
client.transition_model_version_stage(
    name="xgboost-fraud-detector",
    version=3,
    stage="Staging"
)

# After validation, promote to production
client.transition_model_version_stage(
    name="xgboost-fraud-detector",
    version=3,
    stage="Production"
)

# Archive old version
client.transition_model_version_stage(
    name="xgboost-fraud-detector",
    version=2,
    stage="Archived"
)

Version Metadata

Each version includes:

  • Metrics: ROC-AUC, PR-AUC, precision, recall, F1
  • Parameters: max_depth, learning_rate, scale_pos_weight, etc.
  • Tags: training_date, pipeline_execution_id, deployed_endpoint
  • Artifacts: model.xgb, feature_metadata.json, preprocessing config

Rollback Strategy

# If new model performs poorly:

# 1. Check recent performance
python -m src.monitoring.monitor_model_performance --days 7

# 2. If degraded, rollback to previous version in MLflow UI:
#    - Mark current version as "Archived"
#    - Mark previous version as "Production"

# 3. Redeploy with previous model version
#    Update pipeline to use previous run_id
python main.py pipeline start --pipeline-name fraud-detection-pipeline

Next Steps

CI/CD: From Notebooks to Automation

Each notebook cell has a direct CLI equivalent. For CI/CD pipelines, replace notebook interactions with these commands:

Phase Notebook Cell CLI Command
Infrastructure 1_training_pipeline.ipynb Cell 3 python main.py setup --migrate-data
SQS + Lambda 1_training_pipeline.ipynb Cell 3 python main.py setup-logging
Create Pipeline 1_training_pipeline.ipynb Cell 4 python main.py pipeline create --pipeline-name fraud-detection-pipeline
Train & Deploy 1_training_pipeline.ipynb Cell 5 python main.py pipeline start --pipeline-name fraud-detection-pipeline --wait
Test Inference 2a_inference_monitoring.ipynb Cells 6-9 python main.py test --endpoint-name fraud-detector-endpoint --num-samples 100
Simulate Ground Truth 2a_inference_monitoring.ipynb Cell 19 python -m src.monitoring.simulate_ground_truth_from_athena --accuracy 0.85
Apply Ground Truth 2a_inference_monitoring.ipynb Cell 24 python -m src.monitoring.update_ground_truth --mode batch
Monitor Drift 2a_inference_monitoring.ipynb Cells 26-39 python -m src.monitoring.monitor_model_performance --days 30

Scheduled Monitoring

Automate drift checks with EventBridge + Lambda:

  1. EventBridge Rule: Schedule monitor_model_performance.py daily/hourly via a Lambda function
  2. Lambda wrapper: Package the monitoring script as a Lambda that queries Athena and publishes CloudWatch metrics
  3. Trigger retraining: If drift exceeds thresholds, start the SageMaker Pipeline automatically
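
Step 3 can be as small as a conditional start_pipeline_execution call, sketched below (the pipeline name follows the rest of this README; the 0.2 share threshold and the wiring around it are illustrative):

import boto3

sagemaker = boto3.client("sagemaker")
RETRAIN_DRIFT_SHARE_THRESHOLD = 0.2   # assumed share of drifted columns that should trigger retraining

def maybe_trigger_retraining(drifted_columns_share: float) -> None:
    if drifted_columns_share < RETRAIN_DRIFT_SHARE_THRESHOLD:
        print(f"Drift share {drifted_columns_share:.2f} below threshold; no retraining")
        return
    response = sagemaker.start_pipeline_execution(
        PipelineName="fraud-detection-pipeline",
        PipelineExecutionDescription="Automated retraining triggered by drift monitor",
    )
    print("Started retraining:", response["PipelineExecutionArn"])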

SNS Notifications

Connect CloudWatch alarms (created in Cell 40) to email or Slack:

# 1. Create SNS topic
aws sns create-topic --name FraudDetection-DriftAlerts

# 2. Subscribe email
aws sns subscribe \
  --topic-arn arn:aws:sns:us-east-1:YOUR_ACCOUNT:FraudDetection-DriftAlerts \
  --protocol email \
  --notification-endpoint your-team@example.com

# 3. Update alarms to notify (repeat for each alarm)
aws cloudwatch put-metric-alarm \
  --alarm-name FraudDetection-ModelDrift-ROC-AUC \
  --alarm-actions arn:aws:sns:us-east-1:YOUR_ACCOUNT:FraudDetection-DriftAlerts \
  # ... (keep existing alarm configuration)

For Slack, use an SNS → Lambda → Slack webhook integration or AWS Chatbot.

License

MIT License - See LICENSE file for details


Documentation Version: 3.1 Last Updated: 2026-03-24 Pipeline Version: 1.0 Model Version: xgboost-fraud-detector v1+


Version History

Date Summary
2026-03-24 README Major Revision: Removed duplicate "Complete Architecture" ASCII art section (136 lines) since visual diagrams (architecture_diagram.png, inference_monitoring_diagram.png) provide better visualization. Enhanced Overview section with comprehensive explanation of ML governance importance, production monitoring challenges, and solution benefits. Added "Advanced Configuration" section documenting recent features: (1) REDEPLOY_LAMBDAS flag for Lambda redeployment control, (2) drift simulation configuration with test scenarios (SIM_ACCURACY, SIM_FEATURE_DRIFT_IMPACT, SIM_MODEL_DRIFT_MAG), (3) MLflow run ID tracking with unique UUIDs per drift test, (4) QuickSight dashboard auto-refresh automation (3 AM UTC daily after drift monitoring at 2 AM). Updated all three architecture diagrams to reflect latest pipeline additions. Added EventBridge + Lambda dataset refresh flow to Governance Dashboard (QuickSight) lane. Added config.yaml drift metrics configuration node with file icon. Regenerated all PNG diagrams.
2026-03-21 Added drift trend analysis visuals to 3_governance_dashboard.ipynb: second QuickSight dataset (monitoring_responses) and Sheet 2 with 6 new visuals (drift share over time, drifted feature counts, ROC-AUC baseline vs current, model performance multi-line, drift alerts timeline, drift KPI). Aligned setup_quicksight_governance.py column names with actual monitoring_responses Iceberg schema. Added Lake Formation grants for all tables (caller IAM role + Lambda role). Fixed SSO assumed-role ARN resolution for Lake Formation. Standardized table name to monitoring_responses across codebase. Added notebook cell to write Evidently drift metrics directly to monitoring_responses Athena table. Inference handler now resolves model_version and mlflow_run_id dynamically from MLflow registry at model load time. Pipeline model step passes MODEL_VERSION and MLFLOW_RUN_ID env vars. Added "Why Not SageMaker DataCaptureConfig?" section to README. Fixed notebook cell ordering, MLflow tracking URI, Lake Formation Principal parameter, screenshot path, and various module/dependency issues. Renamed directory from automated-drift_monitoring-evidently to sagemaker-automated-drift-and-trend-monitoring.
2026-03-18 Initial documentation v3.0 with Evidently integration, MLflow tracking, and SageMaker pipeline.

SageMaker SDK v3 Migration

This codebase has been migrated to SageMaker Python SDK v3.7.1, which reorganizes the SDK into separate packages (sagemaker-core, sagemaker-train, sagemaker-mlops, sagemaker-serve). Key changes:

  • Training: XGBoost estimator replaced by ModelTrainer from sagemaker.train.model_trainer with explicit SourceCode, Compute, and OutputDataConfig objects
  • Model registration: RegisterModel step collection replaced by ModelStep + ModelBuilder.register() from sagemaker.serve.model_builder
  • Model creation: CreateModelStep + XGBoostModel replaced by ModelStep + ModelBuilder.build()
  • Deployment: Model.deploy() replaced by ModelBuilder.deploy() with ServerlessInferenceConfig from sagemaker.serve.serverless
  • Session: sagemaker.Session() replaced by Session from sagemaker.core.helper.session_helper
  • Image URIs: sagemaker.image_uris.retrieve() replaced by retrieve from sagemaker.core.image_uris
  • Processing: Imports moved from sagemaker.processing to sagemaker.core.processing; SKLearnProcessor removed (use ScriptProcessor or FrameworkProcessor)
  • Workflow: Pipeline and step imports moved from sagemaker.workflow.* to sagemaker.mlops.workflow.* and sagemaker.core.workflow.*

All pipeline behavior, step flow, parameters, and environment variables are preserved - only the SDK API surface changed.