11-Step End-to-End Flow: This architecture shows the complete MLOps pipeline from data ingestion through training, deployment, inference monitoring, and governance. See ARCHITECTURE_STEPS.md for detailed descriptions of each numbered step.
Follow these steps to implement the 11-step architecture shown above. Each step corresponds to the numbered components in the diagram.
SageMaker AI Domain Setup
This solution requires a SageMaker AI domain with MLflow tracking server. If you don't have one set up already, use this CloudFormation template to create a domain with VPC configuration:
SageMaker Domain with VPC Setup
Copy .env.example to .env and update with your specific environment variables. Settings in .env will override all other defaults and settings in config.yaml.
git clone https://github.com/aws-samples/sample-mlops-bestpractices.git
cd sagemaker-automated-drift-and-trend-monitoring
cp .env.example .envRequired .env settings:
SAGEMAKER_EXEC_ROLE- ARN of SageMaker execution roleMLFLOW_TRACKING_URI- ARN of SageMaker MLflow tracking serverDATA_S3_BUCKET- S3 bucket name for data storageAWS_DEFAULT_REGION- AWS region (must match your domain region)
Step 1 - Data Ingestion
- S3 data loaded into Training Data table
- The repository provides scripts to generate the training dataset (
generate_datasets.py) if you would like to test with the sample dataset - Alternatively, you can replace
training_datawith your own dataset
Step 2 - Feature Engineering
- PySpark processing transforms raw data into features
- Handles missing values, derives velocity scores, time-based features
Step 3 - Model Training
- XGBoost training on preprocessed features
- Hyperparameters configured in pipeline definition
Step 4 - Model Evaluation & Registration
- Training steps and evaluation steps log to SageMaker AI MLflow App
- Unified experiment tracking and drift computation environment
- Model registered with versioning and metadata
Step 5 - Model Deployment
- Custom handler deployed to SageMaker AI inference endpoint
- Once deployed, the inference handler writes all inferences to SQS and Lambda
- All notebooks include convenience script invocations for role creation and infrastructure setup (SQS, Lambda, roles, etc.)
Step 6 - Real-Time Inference Logging
- Endpoint logs predictions to SQS β Lambda β Athena
inference_responsestable - Async logging with zero latency impact
Step 7 - Ground Truth Collection (Optional for testing)
- Simulate ground truth to test data and model drift detection
- In production, fraud confirmations arrive from investigation teams
Step 8 - Ground Truth Backfill
- Merge ground truth with inference responses
- Reconcile actual fraud outcomes with predictions based on
inference_id - Athena MERGE operation on Iceberg table (ACID compatible)
Step 9 - Scheduled Drift Detection
- EventBridge triggers Lambda drift monitor (daily 2 AM UTC)
- Compares training data (baseline) vs inference responses (current data)
- Runs Evidently AI drift analysis (PSI, KS tests)
Step 10 - Drift Report Generation
lambda_drift_monitoruses Evidently to generate interactive charts- Reports logged to SageMaker AI MLflow App
- Drift monitor checks thresholds and triggers alarms if exceeded
- Results pushed to SQS β Lambda writer β
monitoring_responsestable
Step 11 - QuickSight Visualization
- Create feature drift datasets, drift scores, and severity visualizations
- Observe trends for feature drift, model drift, and data drift
- Dashboard queries Athena tables directly (inference_responses, monitoring_responses)
- Auto-refresh via EventBridge + Lambda (3 AM UTC daily)
Machine learning models in production degrade silently. Feature distributions shift, fraud patterns evolve, and by the time business metrics reveal the problem, the damage is done. Most teams invest heavily in training pipelines but leave inference monitoring as an afterthought β relying on expensive managed platforms or discovering issues through customer complaints.
This solution provides an end-to-end, open-source MLOps system built on Amazon SageMaker, MLflow, and Evidently AI that closes the ML governance gap. It trains an XGBoost fraud detection model via SageMaker Pipelines, logs every prediction to an Athena Iceberg data lake with zero-latency async writes, and runs automated daily drift checks using EventBridge-triggered Lambda functions. Evidently AI generates interactive data drift and classification reports, while configurable thresholds in a central config.yaml let teams tune sensitivity for both data and model drift without code changes. SNS alerts fire when drift exceeds thresholds, and an Amazon QuickSight governance dashboard β refreshed automatically via a dedicated EventBridge + Lambda pipeline β surfaces inference trends, drift history, and model performance in a single pane of glass.
The result is a production-ready monitoring system with pay-as-you-go pricing that scales linearly with inference volume, runs entirely on open-source SDKs portable across clouds, and handles real-world challenges like delayed ground truth confirmations, concept drift, and multi-feature drift analysis. Three guided Jupyter notebooks walk you from training through monitoring to dashboard creation, making it straightforward to adapt this pattern to your own models and datasets.
Understanding Drift Through Visualization
The QuickSight governance dashboard features drift score trendlines that plot each feature's Population Stability Index (PSI) over time. These time-series visualizations let you identify which specific features are drifting (e.g., credit_limit spiking to 74.47, merchant_category_code at 28.25), distinguish temporary spikes from permanent shifts, and correlate drift events with model deployments or business changes. Instead of a single "drift detected" alert, you get granular per-feature analysis showing exactly which input distributions have changed since training β enabling targeted investigation of data quality issues, pipeline changes, or genuine shifts in customer behavior. For an accessible deep dive into interpreting drift scores and PSI calculations, see Understanding Drift Scores: A Visual Guide. To extend your dashboard with additional feature-level drift visuals (timelines, top drifting features, severity distributions), see Feature-Level Drift Analysis Implementation Guide.
Production-grade ML pipeline for credit card fraud detection using AWS SageMaker Pipelines, MLflow tracking, and Athena data lake integration with comprehensive inference monitoring and drift detection.
Why This Project Matters
Machine learning models degrade over time. Production models face data drift (feature distributions change), concept drift (prediction-target relationships shift), and performance degradation as the real world evolves. Without continuous monitoring, ML systems fail silently β making increasingly poor decisions while appearing healthy on dashboards.
This solution addresses a critical gap: lack of production-ready inference monitoring options in the ML ecosystem. While training pipelines are well-served by managed platforms, comprehensive inference monitoring with drift detection, ground truth integration, and automated alerting requires custom solutions or expensive enterprise platforms.
This is especially critical for fraud detection, where:
- Fraudsters adapt: Attack patterns evolve to evade detection
- Customer behavior shifts: Economic conditions, seasonal trends, new payment methods
- Delayed ground truth: Fraud confirmation takes days or weeks, making real-time accuracy impossible to measure
- High stakes: False positives annoy customers, false negatives cost money
The ML Governance Gap
Most organizations have robust training pipelines but lack production inference monitoring. They deploy models and hope for the best. When performance degrades, they discover it through business metrics (customer complaints, financial losses) rather than automated alerts. By then, damage is done.
Existing solutions are inadequate:
- Managed platforms require significant upfront licensing fees and lock you into proprietary systems
- Built-in monitoring tools often lack integration with open-source drift detection libraries and have limited serverless endpoint support
- Custom solutions require months of engineering effort to build drift detection, ground truth integration, and alerting
This Project's Solution
A ** Open-source MLOps system** that establishes automated monitoring and governance for ML inference:
- Training Pipeline: SageMaker Pipelines with automated preprocessing, training (XGBoost), evaluation, and deployment
- Experiment Tracking: MLflow for model versioning, metrics, and artifact management
- Data Lake: Athena Iceberg tables for training data and inference logging
- Custom Inference Handler: Automatic logging of all predictions with buffered async writes (zero latency impact)
- Monitoring & Drift Detection: Continuous model performance tracking with Evidently-powered interactive reports, automated daily checks, and SNS email alerts
- Ground Truth Integration: Asynchronous ground truth capture with Athena MERGE updates for delayed fraud confirmations
- Governance Dashboard: QuickSight dashboard with automated daily refresh showing inference trends, drift history, and model performance
Key Benefits:
- Cost Efficient: Pay-as-you-go serverless architecture scales to zero when idle. Variable costs (inference invocations, Athena queries, Lambda executions) scale linearly with traffic, while fixed costs (S3 storage, MLflow tracking) remain minimal and predictable. No upfront licensing fees or reserved capacity required.
- Portable: Open-source SDKs (MLflow, Evidently, Pandas) run anywhere β AWS, GCP, Azure, on-prem
- Production Ready: Handles real-world ML challenges (delayed ground truth, data drift, concept drift, alerting)
- Automated: EventBridge schedules drift checks at 2 AM, QuickSight refreshes at 3 AM β no manual intervention
- Comprehensive: Training, inference, monitoring, drift detection, and governance in one integrated system
13-Step Detailed Flow: This diagram shows the complete inference monitoring pipeline from real-time predictions through ground truth backfill, drift detection, and governance visualization. Each data flow is numbered and labeled for clarity.
This detailed diagram shows the end-to-end inference monitoring flow with MLflow as the central monitoring interface:
1. Real-Time Inference (Top)
- Application sends transaction with 30 features β SageMaker Endpoint
- Endpoint runs XGBoost model, returns prediction + fraud probability
- Zero latency added to inference (async logging happens in background)
2. Async Logging Pipeline
- Inference handler sends prediction to SQS queue (fire-and-forget)
- Lambda consumer batches messages (10 msgs or 30s window)
- Writes batch to Athena inference_responses table
- Partitioned by date for efficient querying
3. Ground Truth Integration
- Fraud investigations confirm actual fraud (days/weeks later)
- Updates flow to ground_truth_updates table
- Athena MERGE updates inference_responses with confirmed labels
- Enables model performance tracking with real outcomes
4. MLflow Monitoring Interface (Central Hub) MLflow serves as the unified monitoring dashboard where all monitoring workflows converge:
Metrics Logged:
- Data Drift:
drifted_columns_count,drifted_columns_share, per-featuredrift_score_*(via Evidently DataDriftPreset) - Model Performance:
current_roc_auc,accuracy,precision,recall, plus Evidently classification metrics (evidently_accuracy,evidently_f1, etc.) - Degradation:
roc_auc_degradation,roc_auc_degradation_pct - Detection Flags:
data_drift_detected,model_drift_detected
Visualizations (artifacts/evidently_reports/):
- Interactive Evidently HTML data drift report (PSI, KS, distribution comparisons per feature)
- Interactive Evidently HTML classification report (ROC curve, PR curve, confusion matrix, accuracy, F1)
Artifacts:
evidently_reports/data_drift_*.html- Full interactive data drift dashboardevidently_reports/classification_*.html- Full interactive classification dashboarddrift_reports/drift_summary_*.json- Structured JSON drift summary
5. Monitoring Workflows
- Manual: Data scientists run
2a_inference_monitoring.ipynbβ query Athena β run Evidently reports β log interactive HTML reports to MLflow - Automated: EventBridge triggers Lambda daily β runs Evidently DataDriftPreset + ClassificationPreset β logs HTML reports & metrics to MLflow β sends SNS alert if drift detected
6. Alerting
- SNS sends email notifications when thresholds exceeded
- Includes drifted features, performance metrics, and actionable recommendations
- Email contains links to MLflow for detailed analysis
β MLflow as Single Pane of Glass - All monitoring metrics, Evidently HTML reports, and artifacts in one interface β Evidently-Powered Drift Detection - Interactive HTML reports with PSI, KS, distribution comparisons per feature β Automated + Manual - Both notebook-based exploration and scheduled Lambda checks β Ground Truth Integration - Handles delayed fraud confirmation typical in fraud detection β Zero Inference Latency - Async logging doesn't impact prediction response time β Cost-Efficient - $5/month Lambda + $25/month Athena vs. $200+/month for SageMaker Model Monitor
Key Differentiators:
-
Open-Source SDKs (MLflow, Evidently, Pandas, Scikit-learn) - Ensures portability across AWS, GCP, Azure, or on-prem. No vendor lock-in. Industry-standard tools = easier hiring.
-
Cost Efficiency - Serverless architecture scales to zero when idle, charging only for actual usage rather than reserved capacity. Variable costs (inference invocations, Athena queries, Lambda executions) scale linearly with traffic. Fixed costs (S3 storage, MLflow tracking) remain minimal. No upfront licensing or minimum commitments required.
-
Custom Inference Monitoring - Evidently-powered drift detection (DataDriftPreset for all features, ClassificationPreset for model performance), automated ground truth integration, and EventBridge/Lambda alerting with interactive HTML reports logged to MLflow. Most platforms lack comprehensive inference monitoring or charge premium rates.
-
Production-Grade Without Platform Costs - SageMaker provides reliability (99.9% SLA) while MLflow provides portability. Best of both worlds: enterprise reliability + startup agility.
vs. Alternatives: This solution balances pay-as-you-go pricing, portability, and operational simplicity. Fully managed platforms require significant upfront licensing and vendor lock-in. Pure cloud-native solutions lack portability. Open-source platforms are portable but require substantial DevOps investment. This architecture delivers enterprise reliability with startup agility.
Ideal For: Teams needing production ML with comprehensive monitoring, audit trails, and multi-cloud optionality without enterprise platform licensing costs or operational complexity.
SageMaker provides a built-in DataCaptureConfig that captures raw request/response payloads from real-time endpoints and writes them to S3 in jsonl format. But it only supports real-time endpoints as of today.
DataCaptureConfig makes sense if you are using real-time endpoints. For this architecture β which prioritizes Evidently-powered drift detection, MLflow as the monitoring hub, Athena as the data lake, and cost efficiency β a custom SQSβLambdaβAthena pipeline is the better fit.
sagemaker-automated-drift-and-trend-monitoring/
βββ src/
β βββ pipeline/
β β βββ pipeline.py # Complete pipeline definition
β β βββ inference_handler.py # Custom inference handler with Athena logging
β β βββ deploy.py # Manual model deployment (with custom handler)
β β βββ train.py # Manual training script
β β βββ test_endpoint.py # Endpoint testing with analytics
β β βββ batch_transform.py # Batch transform for bulk scoring
β β βββ pipeline_cli.py # Pipeline CLI commands
β β βββ inference_requirements.txt # Custom handler dependencies (awswrangler)
β β βββ pipeline_steps/
β β β βββ preprocessing.py # Data preprocessing (ScriptProcessor)
β β β βββ preprocessing_pyspark.py # PySpark preprocessing alternative
β β β βββ train.py # XGBoost training script
β β β βββ evaluation.py # Model evaluation with quality gates
β β β βββ inference_monitoring.py # Drift detection (PSI, KS, model drift)
β β β βββ inference.py # Inference script
β β β βββ lambda_deploy_endpoint.py # Lambda for endpoint deployment
β β β βββ lambda_test_inference.py # Lambda for inference testing
β β β βββ deploy_endpoint.py # Endpoint deployment utilities
β β β βββ test_inference.py # Inference test utilities
β β β βββ requirements_train.txt # Training step dependencies
β β β βββ requirements_evaluation.txt # Evaluation step dependencies
β β β βββ requirements_preprocessing.txt # Preprocessing step dependencies
β β βββ athena/
β β βββ athena_client.py # Athena query operations
β β βββ athena_client_pyspark.py # PySpark Athena integration
β β βββ data_migrator.py # CSV β Athena Iceberg migration
β β βββ iceberg_manager.py # Iceberg table management
β β βββ schema_definitions.py # Table schema definitions
β βββ config/
β β βββ config.py # Configuration constants
β β βββ config.yaml # Central configuration
β βββ monitoring/
β β βββ evidently_reports.py # Evidently-based drift & classification reports
β β βββ monitor_model_performance.py # Performance monitoring & alerts
β β βββ lambda_drift_monitor.py # Automated drift monitoring (Evidently + MLflow)
β β βββ lambda_inference_logger.py # SQS-to-Athena inference log consumption
β β βββ generate_drift_dataset.py # Generate drifted test data
β β βββ simulate_ground_truth_from_athena.py # Ground truth simulator (dev/test)
β β βββ generate_ground_truth_confirmations.py # Generate test confirmations
β β βββ update_ground_truth.py # Merge ground truth into inference records
β βββ setup/
β β βββ setup_athena_tables.py # Create Athena DB and tables
β β βββ setup_inference_logging.py # SQS + Lambda inference logging
β β βββ setup_drift_monitoring.py # Drift monitoring infrastructure
β β βββ setup_scheduled_inference.py # EventBridge scheduled inference
β β βββ setup_scheduled_batch_transform.py # Scheduled batch transform
β β βββ upload_data_to_s3.py # Upload local CSV data to S3
β β βββ create_or_update_sagemaker_role.py # IAM role setup
β β βββ create_lambda_role.py # Lambda execution role
β β βββ deploy_drift_monitoring.sh # CI/CD deployment script
β βββ utils/
β β βββ aws_session.py # AWS session helpers
β β βββ aws_utils.py # AWS session and role utilities
β β βββ mlflow_utils.py # MLflow tracking helpers
β β βββ visualization_utils.py # Chart generation for MLflow
β βββ governance/
β βββ setup_quicksight_governance.py # QuickSight governance dashboard setup
βββ notebooks/
β βββ 1_training_pipeline.ipynb # Interactive pipeline control notebook
β βββ 2a_inference_monitoring.ipynb # Monitoring & drift detection notebook
β βββ inference_monitoring_with_pipeline.ipynb # Pipeline-based automated drift monitoring with Evidently
β βββ 3_governance_dashboard.ipynb # QuickSight governance dashboard setup
β βββ 4_optional_version_validation.ipynb # Version consistency validation (MLflow, SageMaker, Athena)
β βββ 5_optional_cleanup.ipynb # Resource cleanup (Lambda, SageMaker, Athena, S3, IAM)
βββ data/
β βββ creditcard_predictions_final.csv # Training data (284K rows, 30 features)
β βββ creditcard_ground_truth.csv # Ground truth labels
β βββ creditcard_drifted.csv # Drifted data for testing
βββ docs/
β βββ MetaMonitoring.png # 11-step end-to-end architecture diagram
β βββ inference_monitoring_processflow.png # 13-step inference monitoring flow
β βββ ARCHITECTURE_STEPS.md # Detailed descriptions of all 11 architecture steps
β βββ CRITICAL_CONFIG_SETTINGS.md # Essential config.yaml settings guide
β βββ GROUND_TRUTH_FLOW.md # Two-table ground truth architecture
β βββ generate_architecture_diagram.py # [Legacy] Generates architecture_diagram.png (AWS icons)
β βββ generate_inference_monitoring_diagram.py # [Legacy] Generates inference_monitoring_diagram.png (AWS icons)
β βββ generate_mlflow_evidently_diagram.py # Generates mermaid-diagram-mlflow-evidently.png (AWS icons)
β βββ icons/ # Official AWS Architecture Icons + third-party logos
β βββ guides/ # Architecture diagrams and references
β β βββ MetaMonitoring.png # Main architecture diagram (11 steps)
β β βββ inference_monitoring_processflow.png # Detailed monitoring flow (13 steps)
β β βββ architecture_diagram.png # [Legacy] Programmatically generated architecture
β β βββ inference_monitoring_diagram.png # [Legacy] Programmatically generated monitoring flow
β β βββ mermaid-diagram-mlflow-evidently.png # MLflow + Evidently monitoring flow
β βββ screenshots/ # Screenshots and visual guides
β βββ DirectTestingInSGPlayground-custom-handler.png # SageMaker Studio screenshot
β βββ quicksight/ # QuickSight dashboard screenshots
β βββ README.md # Detailed guide to understanding drift scores
β βββ Quicksight-Governance-dashboard.pdf # QuickSight governance dashboard
βββ main.py # CLI entry point
βββ .env.example # Environment template
βββ README.md # This file
The main architecture diagrams are:
- MetaMonitoring.png β 11-step end-to-end MLOps pipeline (hand-built in Excalidraw)
- inference_monitoring_processflow.png β 13-step detailed monitoring flow (hand-built in Excalidraw)
Editable source files:
docs/guides/architecture_diagram.excalidrawβ Open at excalidraw.comdocs/inference_monitoring_flow.excalidrawβ Generated programmatically, editable in Excalidraw
Previous programmatically-generated diagrams (now superseded by hand-built versions) can be regenerated:
pip install diagrams
brew install graphviz # macOS
python docs/generate_architecture_diagram.py
python docs/generate_inference_monitoring_diagram.py
python docs/generate_mlflow_evidently_diagram.py- Python 3.12+
- AWS account with SageMaker access
uvpackage manager
# Configure environment
cp .env.example .env
# Edit .env with your AWS settingsThe CSV data files (data/*.csv) are not checked into git. If you don't plan to use your own Athena tables for training and need local data to get started, use the generator script to produce synthetic datasets with the same schema and statistical properties:
# Generate all three CSVs (predictions, drifted, ground truth)
python data/generate_datasets.py
# Or generate individually
python data/generate_datasets.py --predictions
python data/generate_datasets.py --drifted
python data/generate_datasets.py --ground-truthThis creates files with a generated_ prefix. Rename them to use in the pipeline:
cd data
mv generated_creditcard_predictions_final.csv creditcard_predictions_final.csv
mv generated_creditcard_drifted.csv creditcard_drifted.csv
mv generated_creditcard_ground_truth.csv creditcard_ground_truth.csvVerify the files are in place:
ls -lh data/*.csv
# Expected files:
# creditcard_predictions_final.csv (~50 MB, 284K rows, training data)
# creditcard_ground_truth.csv (ground truth labels)
# creditcard_drifted.csv (drifted data for testing)Step 2: Upload to S3
# Preview what will be uploaded (dry run)
python -m src.setup.upload_data_to_s3 --dry-run
# Upload all CSV files to S3
python -m src.setup.upload_data_to_s3
# Upload a single file
python -m src.setup.upload_data_to_s3 --file data/creditcard_predictions_final.csvThis uploads the CSV files to s3://<DATA_S3_BUCKET>/fraud-detection/data/. The S3 paths are used by the Athena data migration step (main.py setup --migrate-data) and by the inference monitoring notebook for baseline drift detection.
# MLflow - Use ARN format for programmatic access
MLFLOW_TRACKING_URI=arn:aws:sagemaker:us-east-1:YOUR_ACCOUNT:mlflow-app/app-YOUR_ID
MLFLOW_EXPERIMENT_NAME=credit-card-fraud-detection-training
MLFLOW_MODEL_NAME=xgboost-fraud-detector
# AWS
AWS_DEFAULT_REGION=us-east-1
SAGEMAKER_EXEC_ROLE=arn:aws:iam::YOUR_ACCOUNT:role/SageMakerExecutionRole
# Athena
ATHENA_DATABASE=fraud_detection
DATA_S3_BUCKET=fraud-detection-data-lake-YOUR_ACCOUNT
ATHENA_OUTPUT_S3=s3://fraud-detection-data-lake-YOUR_ACCOUNT/athena-query-results/
# SQS
SQS_QUEUE_URL=<Set after configuring SQS>
SQS_QUEUE_NAME=fraud-inference-logs
LAMBDA_LOGGER_NAME=fraud-inference-log-consumerAdd the following inline policy to your execution role through the UI to give S3 access:
json
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:CreateBucket",
"s3:PutBucketVersioning",
"s3:PutObject",
"s3:GetObject",
"s3:ListBucket",
"s3:DeleteObject"
],
"Resource": [
"arn:aws:s3:::<DATA_S3_BUCKET>",
"arn:aws:s3:::<DATA_S3_BUCKET>/*"
]
}
]
}# Upload CSV data to S3 (required before migration)
python -m src.setup.upload_data_to_s3
# Create S3 bucket, Athena database, Iceberg tables, and migrate CSV data
python main.py setup --migrate-dataThis creates:
- S3 bucket for data lake and model artifacts
- Athena database
fraud_detection - 5 Iceberg tables:
training_data,inference_responses,ground_truth,ground_truth_updates,drifted_data - Migrates CSV data to Athena for pipeline access
Add the following inline policy to your execution role to allow it to manage Lambda functions.
{
"Effect": "Allow",
"Action": [
"lambda:CreateFunction",
"lambda:UpdateFunctionCode",
"lambda:UpdateFunctionConfiguration",
"lambda:GetFunction",
"lambda:DeleteFunction",
"lambda:InvokeFunction"
],
"Resource": "arn:aws:lambda:us-east-1:YOUR_ACCOUNT:function:fraud-detection-*"
},
{
"Effect": "Allow",
"Action": "iam:PassRole",
"Resource": "arn:aws:iam::YOUR_ACCOUNT:role/*",
"Condition": {
"StringEquals": {
"iam:PassedToService": "lambda.amazonaws.com"
}
}
}Add the following trust policy to allow Lambda to assume the execution role
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": [
"sagemaker.amazonaws.com",
"lambda.amazonaws.com"
]
},
"Action": "sts:AssumeRole"
}
]
}Add the following permissions to allow the creation of an SQS queue and lambda function for inference logging to Athena:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"sqs:CreateQueue",
"sqs:GetQueueUrl"
],
"Resource": "*"
},
{
"Effect": "Allow",
"Action": [
"sqs:ReceiveMessage",
"sqs:DeleteMessage",
"sqs:GetQueueAttributes",
"sqs:SendMessage"
],
"Resource": "*"
},
{
"Effect": "Allow",
"Action": [
"lambda:CreateFunction",
"lambda:UpdateFunctionCode",
"lambda:GetFunction",
"lambda:CreateEventSourceMapping"
],
"Resource": "*"
},
{
"Effect": "Allow",
"Action": "iam:PassRole",
"Resource": "<EXECUTION_ROLE>",
"Condition": {
"StringEquals": {
"iam:PassedToService": "lambda.amazonaws.com"
}
}
}
]
}The CloudWatchLogsFullAccess managed policy only covers Logs. To publish custom metrics, create alarms, and build dashboards (Cell 40 in 2a_inference_monitoring.ipynb), add this inline policy to your execution role:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"cloudwatch:PutMetricData",
"cloudwatch:PutMetricAlarm",
"cloudwatch:DescribeAlarms",
"cloudwatch:PutDashboard",
"cloudwatch:GetDashboard",
"cloudwatch:ListDashboards"
],
"Resource": "*"
}
]
}uv run main.py setup-logging
Update the SQS_URL in .env with the created SQS queue.
Option 1: Jupyter Notebook (Recommended)
# In SageMaker Studio, open:
notebooks/1_training_pipeline.ipynb
# Run cells sequentially:
# Cell 1-3: Setup and configuration
# Cell 4: Create/update pipeline definition
# Cell 5: Execute pipeline and wait for completion (~25 minutes)Option 2: CLI
# Create pipeline
python main.py pipeline create --pipeline-name fraud-detection-pipeline
# Execute pipeline
python main.py pipeline start --pipeline-name fraud-detection-pipeline --wait
# Check status
python main.py pipeline status --pipeline-name fraud-detection-pipeline# 1. Open monitoring notebook
# notebooks/2a_inference_monitoring.ipynb
# 2. Run inference tests (Cells 1-9 in the notebook)
# Or via CLI:
# uv run main.py test --endpoint-name fraud-detector-endpoint --num-samples 50
# 3. Simulate ground truth (for development/testing)
python -m src.monitoring.simulate_ground_truth_from_athena --accuracy 0.85
# 4. Apply ground truth updates
python -m src.monitoring.update_ground_truth --mode batch
# 5. Run monitoring & drift detection (Cells 27-40 in 2a_inference_monitoring.ipynb)
# Generates 8+ charts, calculates metrics, detects driftExpected Output:
- Predictions logged to Athena automatically
- 100% ground truth coverage (simulated)
- Performance metrics: ROC-AUC, precision, recall
- Drift detection with visualizations
- 8 charts uploaded to MLflow
The pipeline deploys models with a custom inference handler instead of SageMaker's built-in XGBoost serving:
| Feature | Built-in XGBoost | Custom Handler |
|---|---|---|
| Predictions | β | β |
| Input Format | CSV, libsvm | JSON (extensible) |
| Output Format | Single probability | Detailed predictions + probabilities |
| Athena Logging | β Manual | β Automatic |
| MLflow Integration | β | β |
| Performance Metrics | β | β Latency, confidence |
| Monitoring Flags | β | β High/low confidence detection |
| Production-Ready | β Enterprise-grade |
Every inference request is logged with:
- Prediction metadata: inference_id, timestamp, prediction, probabilities
- Performance: latency (total, preprocessing, model), confidence scores
- Transaction context: transaction_id, amount, customer_id
- Model info: endpoint_name, model_version, mlflow_run_id
Inference logs are written asynchronously via SQS to avoid impacting prediction latency:
Custom Handler β SQS (fire-and-forget, per prediction)
β
SQS batches messages (10 msgs or 30s window)
β
Lambda triggered (lambda_inference_logger.py)
β
INSERT INTO fraud_detection.inference_responses
- Zero latency impact β SQS
send_messageis fire-and-forget in the inference handler - Batched writes β Lambda receives up to 10 SQS records per invocation
- Athena INSERT β Lambda builds a multi-row
INSERT INTOstatement and executes via Athena - Graceful failure β SQS retains messages for 24 hours if Lambda fails
Request (30 features):
{
"transaction_hour": 14,
"transaction_day_of_week": 2,
"transaction_amount": 149.62,
"customer_age": 42,
"customer_gender": 0,
"distance_from_home_km": 5.2,
"merchant_category_code": 5411,
"chip_transaction": 1,
"num_transactions_24h": 3,
"credit_limit": 5000.0,
"available_credit_ratio": 0.75
// ... (all 30 training features)
}Response:
{
"predictions": [0],
"probabilities": {
"fraud": [0.1234],
"non_fraud": [0.8766]
}
}After deployment, you can test the endpoint directly in SageMaker Studio without writing code.
Access Path:
SageMaker Studio > Deployments > Endpoints > Fraud Detector Endpoint > Playground
Steps:
-
Navigate to endpoint:
- Open SageMaker Studio
- Left sidebar: Deployments β Endpoints
- Click on
fraud-detector-endpoint - Click "Playground" tab
-
Configure request:
- Content-Type:
application/json - Request body: Paste JSON payload with 30 features
- Content-Type:
-
Example payload:
{ "transaction_hour": 14, "transaction_day_of_week": 2, "transaction_amount": 149.62, "transaction_type_code": 1, "customer_age": 42, "customer_gender": 0, "customer_tenure_months": 36, "account_age_days": 1095, "distance_from_home_km": 5.2, "distance_from_last_transaction_km": 2.3, "time_since_last_transaction_min": 120, "online_transaction": 1, "international_transaction": 0, "high_risk_country": 0, "merchant_category_code": 5411, "merchant_reputation_score": 0.85, "chip_transaction": 1, "pin_used": 1, "card_present": 1, "cvv_match": 1, "address_verification_match": 1, "num_transactions_24h": 3, "num_transactions_7days": 12, "avg_transaction_amount_30days": 125.50, "max_transaction_amount_30days": 450.00, "velocity_score": 0.3, "recurring_transaction": 0, "previous_fraud_incidents": 0, "credit_limit": 5000.0, "available_credit_ratio": 0.75 } -
Click "Test"
-
Expected response:
{ "predictions": [0], "probabilities": { "fraud": [0.1234], "non_fraud": [0.8766] } }
Screenshot: See docs/screenshots/DirectTestingInSGPlayground-custom-handler.png for visual guide
When to use:
- Quick manual testing without code
- Debugging endpoint issues
- Demonstrating to stakeholders
- Validating deployment after pipeline execution
- Generating sample payloads for documentation
Note: The playground sends requests to your deployed endpoint, so predictions will be automatically logged to Athena (custom handler feature).
- Inference Handler (SQS):
src/train_pipeline/pipeline_steps/inference.pyβ sends each prediction to SQS - Lambda Consumer:
lambda_inference_logger.pyβ reads SQS batches, writes to Athena - Setup Script:
src/setup/setup_inference_logging.pyβ creates SQS queue + Lambda + event source mapping - Legacy Handler (Direct Athena):
src/train_pipeline/inference_handler.pyβ buffered awswrangler writes (alternative) - Dependencies:
src/train_pipeline/inference_requirements.txt - Pipeline Integration:
src/train_pipeline/pipeline.py
How it works (SQS path):
# In the inference handler (predict_fn)
sqs.send_message(
QueueUrl=SQS_QUEUE_URL,
MessageBody=json.dumps({
'inference_id': str(uuid.uuid4()),
'request_timestamp': datetime.utcnow().isoformat(),
'prediction': int(predictions[idx]),
'probability_fraud': fraud_prob,
# ... 22 more fields
}),
)
# Lambda consumer (lambda_inference_logger.py)
# Triggered by SQS with batch of up to 10 messages
# Builds: INSERT INTO fraud_detection.inference_responses VALUES (row1), (row2), ...
# Executes via athena.start_query_execution()- Preprocess - Read from Athena, validate, encode categorical, split train/test (80/20)
- Train - XGBoost with automatic class imbalance handling (scale_pos_weight)
- Evaluate - Calculate ROC-AUC, PR-AUC, precision, recall, F1, confusion matrix
- Quality Gate - Register model only if ROC-AUC β₯ 0.85 AND PR-AUC β₯ 0.50
- Deploy - Create endpoint with custom handler and Athena logging
Via Notebook:
# In 1_training_pipeline.ipynb Cell 4
result = pipeline_builder.upsert_pipeline(
pipeline_name='fraud-detection-pipeline',
pipeline_display_name='Credit Card Fraud Detection Pipeline'
)Via CLI:
python main.py pipeline create --pipeline-name fraud-detection-pipelineVia Notebook:
# Cell 5: Execute and wait
execution = pipeline.start()
execution.wait(delay=60, max_attempts=30)Via CLI:
# With waiting
python main.py pipeline start --pipeline-name fraud-detection-pipeline --wait
# Without waiting (monitor in console)
python main.py pipeline start --pipeline-name fraud-detection-pipelineAWS Console:
SageMaker β Pipelines β fraud-detection-pipeline β Executions
CLI:
# Get latest execution status
python main.py pipeline status --pipeline-name fraud-detection-pipeline
# List all executions
python main.py pipeline list-executions --pipeline-name fraud-detection-pipelineExpected Duration: ~25 minutes
- Preprocess: 3-5 min
- Train: 10-15 min
- Evaluate: 2-3 min
- Deploy: 5-7 min
ARN Format (Programmatic Access):
# Use ARN for SDK and pipeline access
MLFLOW_TRACKING_URI = "arn:aws:sagemaker:us-east-1:<ACCOUNT_ID>:mlflow-app/app-ABC123"
mlflow.set_tracking_uri(MLFLOW_TRACKING_URI)HTTPS Format (Web UI Only):
# Use HTTPS URL only for opening in browser
https://abc123def.mlflow-tracking.us-east-1.sagemaker.aws/
The training step automatically logs to MLflow:
# In training step
mlflow.xgboost.autolog() # Automatic parameter and metric logging
with mlflow.start_run():
model.fit(X_train, y_train)
# MLflow automatically logs:
# - Hyperparameters (max_depth, learning_rate, etc.)
# - Training metrics (logloss, error)
# - Model artifacts
# - Feature importance
# - Confusion matrixFrom SageMaker Studio:
1. Left sidebar β Partner AI Apps β MLflow
2. Click on your MLflow app
3. Opens in new tab with authentication
Experiments:
credit-card-fraud-detection-training- Training runscredit-card-fraud-detection-inference- Inference monitoringcredit-card-fraud-detection-batch- Batch transform jobs
Models are automatically registered on successful evaluation:
# In evaluation step (if ROC-AUC β₯ 0.85)
mlflow.register_model(
model_uri=f"runs:/{run_id}/model",
name="xgboost-fraud-detector"
)Model Versions:
- Version 1, 2, 3... (incremented automatically)
- Stages:
NoneβStagingβProductionβArchived - Tags: training_date, pipeline_execution_id, roc_auc, pr_auc
Inference (T+0) β SQS β Lambda β Athena: inference_responses (ground_truth=NULL)
β
Ground Truth Capture (T+1 to T+30) β ground_truth_updates
β
Batch MERGE β inference_responses (ground_truth populated)
β
Monitoring β Calculate metrics where ground_truth IS NOT NULL
π IMPORTANT: Production Implementation Required
The current implementation uses simulation for development/testing. In production, replace with actual business processes:
Production Sources:
- Fraud Investigation Systems: Confirmed fraud cases from investigation team
- Chargeback Notifications: Disputed transactions from payment processors
- Customer Complaints: Fraud reports from customer service
- Merchant Reports: Suspicious activity flagged by merchants
- Law Enforcement: Cases confirmed by authorities
Implementation Pattern:
# Example: Fraud investigation system integration
def capture_investigation_result(transaction_id, is_fraud, source):
"""
Called when fraud investigation completes.
Replace simulate_ground_truth_from_athena.py with this in production.
"""
ground_truth_update = {
'transaction_id': transaction_id,
'inference_id': lookup_inference_id(transaction_id),
'actual_fraud': is_fraud,
'confirmation_timestamp': datetime.now(),
'confirmation_source': source, # e.g., 'fraud_investigation'
'investigation_notes': get_investigation_notes(transaction_id)
}
# Write to ground_truth_updates table
write_to_athena(ground_truth_update)Development/Testing (Current):
# Simulates realistic ground truth with configurable accuracy
python -m src.monitoring.simulate_ground_truth_from_athena --accuracy 0.85This creates realistic confirmations:
- Fraud cases: 1-7 days confirmation delay
- Non-fraud: 1-30 days confirmation delay
- Configurable accuracy (default 85%)
- Realistic confirmation sources
# Batch mode: Process all pending updates
python -m src.monitoring.update_ground_truth --mode batch
# Streaming mode: Process recent updates (last 24 hours)
python -m src.monitoring.update_ground_truth --mode streaming --window-hours 24This performs Athena MERGE to update ground_truth column in inference_responses.
# Monitor last 30 days
python -m src.monitoring.monitor_model_performance --days 30
# With custom alert threshold
python -m src.monitoring.monitor_model_performance \
--days 30 \
--alert-threshold 0.80 \
--endpoint fraud-detector-prodOutput:
Ground Truth Coverage:
Total predictions: 50,000
With ground truth: 12,500 (25.00%)
Avg days to confirmation: 8.5
Overall Performance (last 30 days):
ROC-AUC: 0.8742
PR-AUC: 0.6521
Precision: 0.7245
Recall: 0.6891
F1-Score: 0.7063
β No performance degradation detected
All drift detection is powered by Evidently AI (v0.7.x). The notebook and Lambda both use evidently_reports.py which wraps Evidently's DataDriftPreset and ClassificationPreset.
Evidently's DataDriftPreset automatically selects the appropriate statistical test per feature (KS for numeric, chi-square for categorical) and computes drift scores. The interactive HTML report includes per-feature distribution comparisons, PSI values, and statistical test results.
Legacy reference: The calculate_psi() and calculate_ks_statistic() functions in lambda_drift_monitor.py are preserved as legacy examples showing how to compute these metrics manually without Evidently.
Population Stability Index (PSI):
PSI measures how much a feature's distribution has shifted between the baseline (training) period and the current (inference) period.
Formula:
PSI = Ξ£ (actual_pct_i - expected_pct_i) Γ ln(actual_pct_i / expected_pct_i)
Where expected_pct_i and actual_pct_i are the proportion of observations in bin i for the baseline and current distributions, respectively.
How it's computed (see inference_monitoring.py:86-124):
- Create bin edges from the baseline (training) data using percentile-based breakpoints (10 bins by default)
- Histogram both baseline and current data into the same bins
- Normalize each histogram to proportions (percentage of total in each bin)
- Apply a floor of
0.0001to avoidln(0)or division by zero - Sum the PSI contribution from each bin:
(actual% - expected%) Γ ln(actual% / expected%)
Interpretation thresholds:
| PSI Value | Interpretation | Action |
|---|---|---|
| < 0.1 | No significant shift | None |
| 0.1 β 0.2 | Moderate shift | Monitor closely |
| β₯ 0.2 | Significant shift | Investigate / retrain |
Avg PSI (Population-Level Drift Score):
Avg PSI is the simple arithmetic mean of per-feature PSI values across all monitored features. It provides a single scalar summarizing overall dataset drift.
# From detect_data_drift() β summary
avg_psi = mean(psi_feature_1, psi_feature_2, ..., psi_feature_N)This metric is logged to MLflow as drift_avg_psi alongside drift_max_psi and drift_median_psi.
A feature is flagged as drifted if either its PSI β₯ 0.2 or its KS test p-value < 0.05.
Feature Extraction for Drift Detection:
The drift detection process (Cell 31 in 2a_inference_monitoring.ipynb) analyzes all 30 training features by parsing the input_features JSON column:
import json
# Parse JSON to extract all features from inference data
feature_rows = []
for idx, row in inference_raw.iterrows():
try:
# Parse the input_features JSON string
features_dict = json.loads(row['input_features'])
# Extract exactly the 30 training features
feature_values = {feat: features_dict.get(feat, np.nan) for feat in TRAINING_FEATURES}
feature_rows.append(feature_values)
except Exception as e:
print(f' β Failed to parse row {idx}: {e}')
continue
# Create DataFrame with all 30 features
inference_numeric = pd.DataFrame(feature_rows)
# Run drift detection on ALL features
drift_results = detect_data_drift(
baseline_data=baseline_numeric[common_features], # 284K rows Γ 30 features
current_data=inference_numeric, # Inference rows Γ 30 features
feature_names=common_features, # All 30 features
threshold_psi=0.2,
threshold_ks=0.05,
)Expected Output:
================================================================================
DATA DRIFT ANALYSIS SUMMARY
================================================================================
Total features analyzed: 30 β All features extracted from JSON
Drifted features: 5
Drift percentage: 16.7%
Avg PSI: 0.0842
Max PSI: 0.2145
β DRIFTED FEATURES (5):
================================================================================
π΄ CRITICAL transaction_amount: PSI=0.2145
π MODERATE customer_age: PSI=0.1523
π MODERATE distance_from_home_km: PSI=0.1205
π‘ MINOR online_transaction: PSI=0.0923
π‘ MINOR merchant_reputation_score: PSI=0.0812
Chi-Square Test:
# For categorical features
chi2_stat, p_value = chi2_test(reference_counts, current_counts)
# Alert if p_value < 0.05 (distribution changed significantly)Kolmogorov-Smirnov (KS) Test:
# For continuous feature distributions
ks_stat, p_value = ks_test(reference_values, current_values)
# Alert if p_value < 0.05This project uses Evidently for advanced drift detection with configurable statistical tests. For fraud detection, we default to the Kolmogorov-Smirnov (KS) test over Wasserstein distance.
KS Test Advantages:
- Tail Sensitivity β - Detects maximum difference in CDFs, crucial for catching rare fraud pattern changes
- Statistical Significance - Returns p-value (e.g., p < 0.05 = 95% confidence), helps distinguish real drift from noise
- Distribution-Free - No assumptions about underlying distributions (robust to non-normal data)
- Shape Detection - Catches changes in distribution shape, not just location/scale shifts
- Interpretable - KS statistic = max vertical distance between CDFs (0-1 scale)
Why Not Wasserstein?
- Less Tail-Sensitive - Can miss subtle changes in extreme values (where fraud patterns emerge)
- No P-Value - Harder to set universal thresholds across different features
- Mean-Focused - Better for overall distribution shifts than detecting new fraud tactics
- Computationally Heavier - O(n log n) vs O(n) for KS test
Real-World Example:
Scenario: New fraud ring targeting $500-$600 transactions
Baseline: Transactions spread $0-$1000 (normal distribution)
Current: Slight spike at $500-600 (2% of transactions)
Wasserstein: 0.08 (small overall shift, might not alert)
KS Test: 0.15, p=0.001 (significant tail change, alerts)
For fraud detection, catching rare pattern changes early (KS strength) is more critical than measuring overall distribution distance (Wasserstein strength).
Configure the statistical test in .env:
# Statistical test for numerical features
# Options: 'ks' (default), 'wasserstein', 'kl_div', 'psi', 'jensenshannon'
EVIDENTLY_NUM_STAT_TEST=ks
# KS test thresholds
KS_DRIFT_THRESHOLD=0.1 # Moderate drift threshold
KS_PVALUE_THRESHOLD=0.05 # Statistical significance (95% confidence)
KS_ENABLE_CDF_PLOTS=true # Generate CDF comparison visualizations
KS_MAX_FEATURES_TO_PLOT=10 # Top N drifted features to visualize
# Categorical feature test (unchanged)
EVIDENTLY_CAT_STAT_TEST=chi_squareConfiguration in Code:
# src/config/config.py (lines 202-210)
EVIDENTLY_NUM_STAT_TEST = os.getenv('EVIDENTLY_NUM_STAT_TEST', 'ks')
KS_DRIFT_THRESHOLD = float(os.getenv('KS_DRIFT_THRESHOLD', '0.1'))
KS_PVALUE_THRESHOLD = float(os.getenv('KS_PVALUE_THRESHOLD', '0.05'))When KS test is enabled (EVIDENTLY_NUM_STAT_TEST=ks), additional visualizations are generated:
1. CDF Comparison Plots - Shows empirical CDFs for baseline vs current distributions with maximum KS distance marked
2. KS Statistics Heatmap - Two-panel visualization:
- Left: KS statistic bars (0-1 scale, color-coded by severity)
- Right: -log10(p-value) bars (statistical significance)
3. Enhanced MLflow Metrics:
# Aggregated KS metrics
ks_mean_ks_statistic
ks_max_ks_statistic
ks_median_ks_statistic
ks_features_moderate_drift # 0.1 β€ KS < 0.2
ks_features_high_drift # KS β₯ 0.2
ks_features_significant # p-value < 0.05The KS test uses both the KS statistic and p-value for robust drift detection:
# Drift detection logic
if ks_stat > 0.2 and p_value < 0.05:
severity = "CRITICAL" # Immediate investigation
elif ks_stat > 0.1 and p_value < 0.05:
severity = "MODERATE" # Monitor closely
elif ks_stat > 0.1 or p_value < 0.05:
severity = "WARNING" # Single threshold met
else:
severity = "NORMAL"This dual-threshold approach reduces false positives while maintaining high sensitivity to real drift.
To use Wasserstein distance instead:
# In .env
EVIDENTLY_NUM_STAT_TEST=wasserstein
EVIDENTLY_DRIFT_THRESHOLD=0.2 # Overall drift thresholdWhen to use Wasserstein:
- General ML pipelines (non-fraud use cases)
- Measuring overall distribution distance
- When p-values are not required
- Benchmarking against academic papers using EMD
Best Practice: Monitor both metrics and compare. If both spike β high confidence drift. If only KS spikes β investigate tail changes.
Metrics Tracked:
- ROC-AUC degradation (alert if drops > 5%)
- Precision/Recall changes
- False positive rate increase
- False negative rate increase
- High confidence error rate
Evidently Reports Logged to MLflow:
evidently_reports/data_drift_*.htmlβ Interactive data drift dashboard with per-feature PSI, KS, distribution comparisonsevidently_reports/classification_*.htmlβ Interactive classification dashboard with ROC curve, PR curve, confusion matrix, accuracy, F1drift_reports/drift_summary_*.jsonβ Structured JSON summary
All reports are logged to MLflow experiment: credit-card-fraud-detection-monitoring
| Chart | Purpose | Location | When Generated |
|---|---|---|---|
| Preprocessing | |||
| Class Distribution | Verify imbalance handling | MLflow Training Run | During preprocessing |
| Feature Correlations | Identify multicollinearity | MLflow Training Run | During preprocessing |
| Missing Values | Data quality check | MLflow Training Run | During preprocessing |
| Feature Distributions | Understand data | MLflow Training Run | During preprocessing |
| Training | |||
| Training Loss Curve | Convergence check | MLflow Training Run | During training |
| Feature Importance | Top predictive features | MLflow Training Run | After training |
| ROC Curve | Model discrimination | MLflow Training Run | During evaluation |
| Precision-Recall Curve | Performance at thresholds | MLflow Training Run | During evaluation |
| Confusion Matrix | Classification accuracy | MLflow Training Run | During evaluation |
| Monitoring (Evidently) | |||
| Data Drift HTML Report | Per-feature drift analysis (PSI, KS, distributions) | MLflow evidently_reports/ |
2a_inference_monitoring.ipynb Cell 37 |
| Classification HTML Report | Model performance (ROC, PR, confusion matrix, F1) | MLflow evidently_reports/ |
2a_inference_monitoring.ipynb Cell 37 |
| Drift Summary JSON | Structured drift summary | MLflow drift_reports/ |
2a_inference_monitoring.ipynb Cell 37 |
Accessing in MLflow:
1. Open MLflow UI (SageMaker Studio β Partner AI Apps)
2. Select experiment:
- Training: credit-card-fraud-detection-training
- Monitoring: credit-card-fraud-detection-monitoring
3. Click on run
4. Scroll to "Artifacts" section
5. Evidently HTML reports available under:
- evidently_reports/data_drift_*.html (open in browser for interactive dashboard)
- evidently_reports/classification_*.html (open in browser for interactive dashboard)
- drift_reports/drift_summary_*.json
Drift detection compares current inference feature distributions against a baseline. The baseline should represent the data the model was trained on.
Current approach (2a_inference_monitoring.ipynb):
baseline_df = pd.read_csv(CSV_TRAINING_DATA)This loads the CSV file directly from disk. Since the same data has already been migrated to the Athena training_data table (via main.py setup --migrate-data), the baseline can alternatively be loaded from Athena using PySpark or boto3:
Option 1: PySpark (for large-scale processing):
from athena.athena_client_pyspark import AthenaClientPySpark
client = AthenaClientPySpark(database="fraud_detection")
baseline_spark_df = client.read_table("training_data")
baseline_df = baseline_spark_df.toPandas() # Convert to pandas for drift analysisOption 2: Boto3 + Pandas (for monitoring/small queries):
import boto3
import pandas as pd
import io
import time
def read_athena_query(sql, database, output_location):
"""Execute Athena query and return pandas DataFrame."""
athena = boto3.client('athena')
s3 = boto3.client('s3')
# Start query execution
response = athena.start_query_execution(
QueryString=sql,
QueryExecutionContext={'Database': database},
ResultConfiguration={'OutputLocation': output_location}
)
execution_id = response['QueryExecutionId']
# Wait for completion
while True:
status = athena.get_query_execution(QueryExecutionId=execution_id)
state = status['QueryExecution']['Status']['State']
if state in ['SUCCEEDED', 'FAILED', 'CANCELLED']:
break
time.sleep(1)
if state != 'SUCCEEDED':
raise Exception(f"Query failed: {state}")
# Download results from S3
result_s3_path = status['QueryExecution']['ResultConfiguration']['OutputLocation']
bucket, key = result_s3_path.replace("s3://", "").split("/", 1)
obj = s3.get_object(Bucket=bucket, Key=key)
return pd.read_csv(io.BytesIO(obj['Body'].read()))
# Usage
baseline_df = read_athena_query(
sql="SELECT * FROM training_data LIMIT 10000",
database="fraud_detection",
output_location="s3://your-bucket/athena-results/"
)Why use the Athena training_data table as baseline:
- Single source of truth β the same table the training pipeline reads from, so schema and column types are guaranteed to match
- No local file dependency β works in any SageMaker environment without needing the CSV on disk
- Consistent with pipeline β the preprocessing step reads from
training_datavia PySpark, so monitoring and training reference the same data - Versioned via Iceberg β if the training data is updated (new features, augmented samples), the Athena table reflects that without needing to re-distribute CSV files
Note: For the current PoC, CSV loading is sufficient. In production, the Athena table is the preferred source since it stays in sync with whatever the training pipeline consumes. awswrangler has been deprecated in favor of PySpark (for large-scale processing) and boto3 (for monitoring queries).
NEW: Automated drift detection with EventBridge, Lambda, and SNS email alerts.
The system automatically monitors for data drift and model performance drift, sending email alerts when thresholds are exceeded.
Architecture:
EventBridge Rule β Lambda Function β SNS Topic β Email
(daily at 2am) (drift detection) (alerts) (ops team)
β
Athena Data Lake
(inference_responses)
Option 1: Using Notebook (Recommended)
- Open
notebooks/2a_inference_monitoring.ipynb - Navigate to Section 6: Automated Drift Monitoring Setup
- Run cells 6.1-6.3 to deploy infrastructure
- Confirm email subscription
Option 2: Using CI/CD Script
export ALERT_EMAIL="ops@example.com"
export ATHENA_DATABASE="fraud_detection"
export AWS_ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text)
export ATHENA_OUTPUT_S3="s3://fraud-detection-data-lake-skoppar-${AWS_ACCOUNT_ID}/athena-query-results/"
bash src/setup/deploy_drift_monitoring.shOption 3: Interactive Setup
python src/setup/setup_drift_monitoring.py- SNS Topic (
fraud-detection-drift-alerts) - Email notifications - Lambda Function (
fraud-detection-drift-monitor) - Drift detection logic - IAM Role - Lambda permissions (Athena, S3, SNS)
- EventBridge Rule (
fraud-detection-drift-check) - Daily schedule (2 AM UTC)
Data Drift Detection:
- Engine: Evidently DataDriftPreset (KS, PSI, and other statistical tests)
- Scope: All 30 training features
- Output: Interactive HTML report + per-column drift scores
- Method: Compares recent 24h inference data to training baseline
Model Performance Drift:
- Engine: Evidently ClassificationPreset (accuracy, F1, ROC, PR, confusion matrix)
- Threshold: >5% ROC-AUC degradation (configurable)
- Method: Compares current performance to baseline (0.92)
- Requires: Ground truth labels for predictions
MLflow Integration:
- Automatic Logging: All drift metrics and Evidently HTML reports logged to MLflow on each check
- Experiment:
fraud-detection-drift_monitoring - Metrics Logged:
- Per-feature drift scores (
drift_score_<feature_name>) - Aggregate drift statistics (drifted_columns_count, drifted_columns_share)
- Model performance metrics (ROC-AUC, accuracy, precision, recall)
- Evidently classification metrics (
evidently_accuracy,evidently_f1, etc.) - Sample sizes and detection flags
- Per-feature drift scores (
- Artifacts:
evidently_reports/data_drift_*.htmlβ Interactive data drift dashboardevidently_reports/classification_*.htmlβ Interactive classification dashboarddrift_reports/drift_summary_*.jsonβ Structured JSON summary
- Configuration: Set
MLFLOW_TRACKING_URIenvironment variable in Lambda
================================================================================
ML MODEL DRIFT ALERT
================================================================================
Time: 2026-02-24 02:00:15
Detection Engine: Evidently AI
π΄ DATA DRIFT DETECTED (Evidently DataDriftPreset)
Features Analyzed: 30
Drifted Features: 5 (16.7%)
Drifted Columns Share: 16.7%
Top Drifted Features (by drift score):
- transaction_amount: drift_score=0.0012
- customer_age: drift_score=0.0034
- distance_from_home_km: drift_score=0.0089
π΄ MODEL PERFORMANCE DRIFT DETECTED (Evidently ClassificationPreset)
Baseline ROC-AUC: 0.9200
Current ROC-AUC: 0.5713
Degradation: 0.3487 (37.9%)
RECOMMENDED ACTIONS:
1. Review Evidently HTML reports in MLflow monitoring experiment
2. Investigate root cause
3. Consider retraining with recent data
4. Review decision thresholds
================================================================================
Thresholds (customizable):
export DATA_DRIFT_THRESHOLD=0.2 # PSI threshold
export MODEL_DRIFT_THRESHOLD=0.05 # 5% performance degradation
export BASELINE_ROC_AUC=0.92 # Expected model performance
export MLFLOW_TRACKING_URI="arn:aws:sagemaker:us-east-1:ACCOUNT:mlflow-app/app-ID" # For MLflow loggingSchedule Options:
# Daily at 2 AM UTC (default)
cron(0 2 * * ? *)
# Every 6 hours
cron(0 */6 * * ? *)
# Every Monday at 9 AM
cron(0 9 ? * MON *)Manual Trigger:
# Via AWS CLI
aws lambda invoke \
--function-name fraud-detection-drift-monitor \
output.json
# Via notebook (Cell 6.4)
lambda_client.invoke(FunctionName='fraud-detection-drift-monitor')View Logs:
aws logs tail /aws/lambda/fraud-detection-drift-monitor --followsrc/drift_monitoring/lambda_drift_monitor.py- Lambda function codesrc/drift_monitoring/lambda_inference_logger.py- Lambda inference loggersrc/drift_monitoring/generate_drift_dataset.py- Generate test drift datasrc/setup/setup_drift_monitoring.py- Interactive setupsrc/setup/deploy_drift_monitoring.sh- CI/CD deployment scriptnotebooks/2a_inference_monitoring.ipynb- Section 6 (setup cells)
Data Drift (PSI Thresholds):
| PSI Value | Interpretation | Action |
|---|---|---|
| < 0.1 | No significant shift | None |
| 0.1 - 0.2 | Moderate shift | Monitor closely |
| β₯ 0.2 | Significant shift | Alert triggered |
Model Drift (Performance Degradation):
| Degradation | Interpretation | Action |
|---|---|---|
| < 3% | Normal variance | None |
| 3-5% | Minor degradation | Monitor |
| > 5% | Significant degradation | Alert triggered |
Change Thresholds:
# In 2a_inference_monitoring.ipynb, Cell 6.6
lambda_client.update_function_configuration(
FunctionName='fraud-detection-drift-monitor',
Environment={
'Variables': {
'DATA_DRIFT_THRESHOLD': '0.15', # More sensitive
'MODEL_DRIFT_THRESHOLD': '0.03', # 3% degradation
# ... other vars
}
}
)Change Schedule:
aws events put-rule \
--name fraud-detection-drift-check \
--schedule-expression "cron(0 */6 * * ? *)" # Every 6 hoursDisable/Enable:
# In notebook or via AWS CLI
events = boto3.client('events')
# Disable
events.disable_rule(Name='fraud-detection-drift-check')
# Enable
events.enable_rule(Name='fraud-detection-drift-check')No Emails Received:
- Check email confirmation - did you click the SNS subscription link?
- Check spam folder
- Verify SNS subscription status:
aws sns list-subscriptions-by-topic \ --topic-arn arn:aws:sns:us-east-1:{account}:fraud-detection-drift-alerts
Lambda Errors:
# Check CloudWatch logs
aws logs tail /aws/lambda/fraud-detection-drift-monitor --since 1hCommon issues:
- Insufficient IAM permissions
- Athena query timeout (increase Lambda timeout)
- Not enough inference samples (need β₯100 samples)
No Drift Detected:
- Insufficient recent data (Lambda looks at last 24 hours)
- No ground truth for model drift (need labeled predictions)
- Drift below thresholds (check logs for actual PSI values)
To remove all drift monitoring resources:
# Delete EventBridge rule
aws events remove-targets --rule fraud-detection-drift-check --ids 1
aws events delete-rule --name fraud-detection-drift-check
# Delete Lambda function
aws lambda delete-function --function-name fraud-detection-drift-monitor
# Delete IAM role and policies
aws iam detach-role-policy \
--role-name fraud-detection-drift-monitor-role \
--policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
aws iam delete-role-policy \
--role-name fraud-detection-drift-monitor-role \
--policy-name SNSPublishPolicy
aws iam delete-role --role-name fraud-detection-drift-monitor-role
# Delete SNS topic
aws sns delete-topic --topic-arn arn:aws:sns:us-east-1:{account}:fraud-detection-drift-alerts- Set Appropriate Thresholds: Start with defaults (PSI=0.2, 5% degradation), adjust based on your use case
- Monitor Regularly: Review CloudWatch logs weekly to understand drift patterns
- Investigate Quickly: When alerts fire, investigate within 24 hours
- Update Baseline: After retraining, update
BASELINE_ROC_AUCin Lambda environment - Test Monthly: Manually trigger Lambda to ensure monitoring is working
- Keep Email List Current: Update SNS subscriptions when team members change
- Document Investigations: Log drift investigations in MLflow or tracking system
By default, 2a_inference_monitoring.ipynb checks if Lambda functions already exist before deploying them. This prevents unnecessary redeployment during notebook reruns.
Configuration Flag (Cell 26):
# Set to True to force redeployment even if Lambda exists
REDEPLOY_LAMBDAS = False # Default: skip if existsBehavior:
False(default): Checks if Lambda exists, skips deployment if foundTrue: Always redeploys Lambda functions, even if they already exist
Use Cases:
- Set to True when you've modified Lambda code (e.g.,
lambda_drift_monitor.py,lambda_inference_logger.py) - Keep False for routine notebook reruns to avoid unnecessary deployments and IAM role recreation
Affected Lambdas:
fraud-monitoring-results-writer(Cell 28) - Writes drift monitoring results to Athenafraud-detection-drift-monitor(Cell 60) - Performs automated drift detection
The ground truth simulator allows configurable drift scenarios for testing the monitoring system. This is critical for validating that drift detection triggers correctly before deploying to production.
Configuration (Cell 15 in 2a_inference_monitoring.ipynb):
# Ground Truth Simulation Configuration
SIM_ACCURACY = 0.85 # Base model accuracy (0.0-1.0)
# Feature drift (manifests as accuracy degradation)
SIM_FEATURE_DRIFT_MAG = 0.0 # How much features drift (0.0-1.0, semantic)
SIM_FEATURE_DRIFT_COUNT = 0 # Number of features drifting (semantic)
SIM_FEATURE_DRIFT_IMPACT = 0.0 # Accuracy reduction from feature drift (0.0-1.0)
# Model drift (direct accuracy degradation)
SIM_MODEL_DRIFT_MAG = 0.0 # Model accuracy degradation (0.0-1.0)How It Works:
- Base Accuracy: Starting model performance (default 85%)
- Feature Drift: Simulates feature distribution changes that degrade model accuracy
SIM_FEATURE_DRIFT_MAGandSIM_FEATURE_DRIFT_COUNTare semantic parameters (documentation only)SIM_FEATURE_DRIFT_IMPACTdirectly reduces effective accuracy (e.g., 0.10 = 10% reduction)
- Model Drift: Simulates direct model performance degradation
SIM_MODEL_DRIFT_MAGdirectly reduces effective accuracy (e.g., 0.15 = 15% reduction)
- Effective Accuracy:
base_accuracy - feature_drift_impact - model_drift_mag(minimum 50%)
Testing Scenarios:
Scenario 1 - No Drift (Baseline):
SIM_ACCURACY = 0.85
SIM_FEATURE_DRIFT_IMPACT = 0.0
SIM_MODEL_DRIFT_MAG = 0.0
# Effective accuracy: 85%
# Expected: No drift alertsScenario 2 - Feature Drift:
SIM_ACCURACY = 0.85
SIM_FEATURE_DRIFT_MAG = 0.5 # 50% drift magnitude (semantic)
SIM_FEATURE_DRIFT_COUNT = 10 # 10 features drifting (semantic)
SIM_FEATURE_DRIFT_IMPACT = 0.10 # 10% accuracy reduction
SIM_MODEL_DRIFT_MAG = 0.0
# Effective accuracy: 75%
# Expected: Data drift detected, accuracy degradation alertScenario 3 - Model Drift:
SIM_ACCURACY = 0.85
SIM_FEATURE_DRIFT_IMPACT = 0.0
SIM_MODEL_DRIFT_MAG = 0.15 # 15% degradation
# Effective accuracy: 70%
# Expected: Model drift alert, significant accuracy dropScenario 4 - Combined Drift:
SIM_ACCURACY = 0.85
SIM_FEATURE_DRIFT_IMPACT = 0.08 # 8% from features
SIM_MODEL_DRIFT_MAG = 0.07 # 7% from model
# Effective accuracy: 70%
# Expected: Both data drift and model drift detectedProduction Replacement:
Replace GroundTruthSimulator with actual fraud investigation system that writes to ground_truth_updates table:
- Chargeback notifications
- Customer fraud reports
- Investigation team confirmations
- Merchant alerts
All drift generation parameters are centralized in src/config/config.yaml under the drift_generation section. This eliminates hardcoded values and makes it easy to adjust drift amounts for testing.
Configuration Location: src/config/config.yaml (lines 241-383)
Default Drift Dataset (generate_drift_dataset.py):
drift_generation:
default_drift:
transaction_amount:
type: "multiplicative"
factor: 1.4 # 40% increase (simulates inflation)
noise: 0.1 # Β±10% random variation
distance_from_home_km:
type: "multiplicative"
factor: 2.0 # 100% increase (travel/remote transactions)
noise: 0.3 # Β±30% random variation
velocity_score:
type: "multiplicative"
factor: 1.5 # 50% increase (more active users)
noise: 0.2
num_transactions_24h:
type: "additive"
shift: 3 # Add 3 more transactions on average
noise: 1
transaction_timestamp:
type: "additive"
shift: 50000 # Shift time forward
noise: 5000
# Number of samples and reproducibility
num_samples: 5000
random_state: 123Variable Drift Patterns (generate_variable_drift_dataset.py):
The system supports 6 time-varying drift scenarios for realistic testing:
drift_generation:
variable_patterns:
run1: # Baseline - minimal drift
transaction_amount: {factor: 1.05, noise: 0.05}
distance_from_home_km: {factor: 1.1, noise: 0.1}
run2: # Distance spike - travel surge
distance_from_home_km: {factor: 3.5, noise: 0.5} # 3.5x increase!
velocity_score: {factor: 1.3, noise: 0.2}
run3: # Credit limit anomaly - system change
credit_limit: {factor: 8.0, noise: 2.0} # 8x increase! (PSI ~74)
merchant_category_code: {shift: 500, noise: 200}
run4: # High velocity period
velocity_score: {factor: 2.5, noise: 0.4}
num_transactions_24h: {shift: 5, noise: 2}
run5: # Recovery - returning to normal
transaction_amount: {factor: 1.2, noise: 0.15}
run6: # Account age anomaly - new user cohort
account_age_days: {factor: 0.3, noise: 0.1} # Younger accountsTo Adjust Drift Amounts:
-
Edit config.yaml:
drift_generation: variable_patterns: run3: credit_limit: factor: 1.5 # Reduce from 8.0 to 1.5 (20% drift instead of 800%) noise: 0.2 # Reduce from 2.0 to 0.2
-
Regenerate drifted datasets:
python src/drift_monitoring/generate_drift_dataset.py python src/drift_monitoring/generate_variable_drift_dataset.py
-
Test the new drift levels in notebook 2a or via endpoint invocations
Drift Types:
- Multiplicative:
new_value = original_value Γ (factor Β± noise Γ factor)- Example:
factor: 2.0, noise: 0.3β value is multiplied by 1.4-2.6x
- Example:
- Additive:
new_value = original_value + (shift Β± noise)- Example:
shift: 3, noise: 1β value increased by 2-4
- Example:
Time-Based Drift Detection:
The monitoring system now compares data within configurable time windows:
monitoring:
lookback_days: 30 # Default monitoring window
data_drift_lookback_days: 7 # Compare last 7 days for data drift
model_drift_lookback_days: 30 # Compare last 30 days for model performance
min_samples_for_drift: 100 # Minimum samples requiredBefore: Drift compared all historical inference data (LIMIT 5000)
After: Drift compares recent inference data (last N days) vs training baseline
This ensures:
- Fair comparison (recent vs recent data)
- Consistent time periods
- No mixing of old and new inference data
- Configurable via environment variables for Lambda deployments
Lambda Environment Variables:
# Override config.yaml defaults via environment variables
DATA_DRIFT_LOOKBACK_DAYS=14 # 2 weeks instead of 7 days
MODEL_DRIFT_LOOKBACK_DAYS=60 # 2 months instead of 30 daysEach drift monitoring execution creates a unique MLflow run with a UUID for complete traceability.
Implementation:
- Lambda function
fraud-detection-drift-monitorgenerates unique run ID on every execution - Stored in
monitoring_responsesAthena table undermlflow_run_idcolumn - Enables tracking of drift metrics over time in MLflow experiments
Verification (in 4_optional_version_validation.ipynb):
# Cell 12-13: Validate unique run IDs
query = """
SELECT monitoring_run_id, monitoring_timestamp, mlflow_run_id,
data_drift_detected, model_drift_detected
FROM fraud_detection.monitoring_responses
ORDER BY monitoring_timestamp DESC LIMIT 10
"""
unique_runs = athena_df['mlflow_run_id'].nunique()
print(f"β Unique MLflow run IDs: {unique_runs}")Expected Behavior:
- Each drift test generates new UUID (e.g.,
a3f7b2c1-...,d8e9f0a2-...) - No more generic "unknown" or "pipeline" values
- Full audit trail of all drift monitoring executions
MLflow Experiment:
- Experiment:
fraud-detection-drift_monitoring - Each run contains:
- Drift metrics (per-feature drift scores, aggregate statistics)
- Performance metrics (ROC-AUC, accuracy, precision, recall)
- Evidently HTML reports (data drift dashboard, classification dashboard)
- Detection flags (data_drift_detected, model_drift_detected)
The governance dashboard automatically refreshes daily with the latest drift monitoring results.
Architecture:
2:00 AM UTC β Drift Monitoring Lambda (fraud-detection-drift-monitor)
ββ Runs Evidently drift detection
ββ Logs metrics to MLflow
ββ Writes results to monitoring_responses Athena table
β±οΈ Wait 1 hour for data to settle...
3:00 AM UTC β QuickSight Refresh Lambda (quicksight-dashboard-refresh)
ββ Triggers SPICE ingestion for 3 datasets:
β ββ Inference Monitoring Dataset
β ββ Drift Monitoring Dataset
β ββ Feature Drift Analysis Dataset
ββ Dashboard shows updated data by morning
Components Created (Cells 74-75 in 2a_inference_monitoring.ipynb):
-
Lambda Function:
quicksight-dashboard-refresh- Runtime: Python 3.11
- Memory: 128 MB
- Environment Variables:
AWS_ACCOUNT_IDINFERENCE_DATASET_IDDRIFT_DATASET_IDFEATURE_DRIFT_DATASET_ID
-
IAM Role:
quicksight-dashboard-refresh-role- Permissions:
quicksight:CreateIngestion,quicksight:DescribeIngestion
- Permissions:
-
EventBridge Rule:
quicksight-dashboard-daily-refresh- Schedule:
cron(0 3 * * ? *)(3:00 AM UTC daily) - Target:
quicksight-dashboard-refreshLambda
- Schedule:
Manual Refresh (Cell 2 in 3_governance_dashboard.ipynb):
# Quick refresh cell at top of governance dashboard notebook
# Refreshes all 3 QuickSight datasets immediately
# Use after new drift monitoring runs or inference predictionsVerification:
# Check EventBridge schedule
aws events describe-rule --name quicksight-dashboard-daily-refresh
# View Lambda logs
aws logs tail /aws/lambda/quicksight-dashboard-refresh --follow
# Check QuickSight ingestion history
# Go to QuickSight Console β Datasets β Refresh tabBenefits:
- No manual dashboard refresh required
- Always see latest data by morning (after 3 AM UTC refresh)
- Automated after drift monitoring completes
- Cost: <$1/month (Lambda + EventBridge)
Customization:
# Change refresh schedule (in Cell 75)
events.put_rule(
Name='quicksight-dashboard-daily-refresh',
ScheduleExpression='cron(0 6 * * ? *)', # 6 AM UTC instead
State='ENABLED'
)training_data:
- 284,080 rows, 33 columns (30 features + 3 metadata)
- Partitioned by: None (static training data)
- Features: See "Feature Format" section below
inference_responses:
- All predictions from deployed endpoint
- Partitioned by:
year,month,day - Schema: inference_id, request_timestamp, prediction, probability_fraud, ground_truth, latency_ms, confidence_score, transaction_context
- The
ground_truthcolumn starts as NULL and gets populated by the ground truth update process
ground_truth_updates:
- Lightweight table for merging confirmed labels back into
inference_responses - Schema: inference_id, actual_fraud, confirmation_timestamp, confirmation_source, investigation_notes
- Populated by: Ground truth simulator (dev/test) or fraud investigation systems (production)
- Consumed by:
src/drift_monitoring/update_ground_truth.pywhich runs an Athena MERGE to update theground_truthcolumn ininference_responses - JOINs on
inference_id(nottransaction_id)
ground_truth (not used in current PoC):
- Designed for a future batch retraining workflow
- Stores complete feature rows (all 33 columns) alongside confirmed fraud labels
- Unlike
ground_truth_updates(which only patchesinference_responses), this table holds full feature vectors for training a new model version - Would be populated by a batch job that combines
inference_responses+ confirmed labels into training-ready rows - Not needed for the current monitoring workflow β only relevant when building an automated retraining pipeline
drifted_data:
- Samples flagged for drift analysis
- Used for retraining prioritization
inference_responses ground_truth_updates
(predictions, ground_truth=NULL) (inference_id + actual_fraud)
β β
βββββββββ MERGE ON inference_id βββββββ
β
βΌ
inference_responses
(ground_truth populated)
β
βΌ
Monitoring & Drift Detection
(metrics where ground_truth IS NOT NULL)
Key distinction:
ground_truth_updatesβ patches existing predictions (lightweight, used now)ground_truthβ stores full feature rows for retraining (heavyweight, future use)
Check inference logging:
-- Recent predictions
SELECT
inference_id,
request_timestamp,
prediction,
probability_fraud,
ground_truth,
confidence_score
FROM fraud_detection.inference_responses
ORDER BY request_timestamp DESC
LIMIT 10;Ground truth coverage:
-- Coverage by day
SELECT
DATE(request_timestamp) as date,
COUNT(*) as total_predictions,
SUM(CASE WHEN ground_truth IS NOT NULL THEN 1 ELSE 0 END) as with_ground_truth,
CAST(SUM(CASE WHEN ground_truth IS NOT NULL THEN 1 ELSE 0 END) AS DOUBLE) / COUNT(*) * 100 as coverage_pct
FROM fraud_detection.inference_responses
GROUP BY DATE(request_timestamp)
ORDER BY date DESC;Model performance:
-- Confusion matrix
SELECT
prediction,
ground_truth,
COUNT(*) as count
FROM fraud_detection.inference_responses
WHERE ground_truth IS NOT NULL
GROUP BY prediction, ground_truth;Your model expects these exact 30 features in JSON format:
{
"transaction_hour": 14,
"transaction_day_of_week": 2,
"transaction_amount": 149.62,
"transaction_type_code": 1,
"customer_age": 42,
"customer_gender": 0,
"customer_tenure_months": 36,
"account_age_days": 1095,
"distance_from_home_km": 5.2,
"distance_from_last_transaction_km": 2.3,
"time_since_last_transaction_min": 120,
"online_transaction": 1,
"international_transaction": 0,
"high_risk_country": 0,
"merchant_category_code": 5411,
"merchant_reputation_score": 0.85,
"chip_transaction": 1,
"pin_used": 1,
"card_present": 1,
"cvv_match": 1,
"address_verification_match": 1,
"num_transactions_24h": 3,
"num_transactions_7days": 12,
"avg_transaction_amount_30days": 125.50,
"max_transaction_amount_30days": 450.00,
"velocity_score": 0.3,
"recurring_transaction": 0,
"previous_fraud_incidents": 0,
"credit_limit": 5000.0,
"available_credit_ratio": 0.75
}| Category | Count | Features |
|---|---|---|
| Transaction | 4 | hour, day_of_week, amount, type_code |
| Customer Profile | 4 | age, gender, tenure_months, account_age_days |
| Geographic/Temporal | 4 | distance_from_home, distance_from_last, time_since_last, online |
| Risk Indicators | 2 | international, high_risk_country |
| Merchant | 2 | category_code, reputation_score |
| Payment Security | 5 | chip, pin, card_present, cvv_match, address_verification |
| Behavioral | 7 | num_transactions_24h/7d, avg/max_amount_30d, velocity, recurring, previous_fraud |
| Credit | 2 | credit_limit, available_credit_ratio |
# Upload CSV data to S3 (required before migration, files not in git)
python -m src.setup.upload_data_to_s3
# Create all infrastructure (S3, Athena DB, tables)
python main.py setup --migrate-data
# Just create infrastructure (no data migration)
python main.py setup# Create pipeline
python main.py pipeline create --pipeline-name fraud-detection-pipeline
# Update existing pipeline
python main.py pipeline update --pipeline-name fraud-detection-pipeline
# Start execution
python main.py pipeline start --pipeline-name fraud-detection-pipeline [--wait]
# Get status
python main.py pipeline status --pipeline-name fraud-detection-pipeline
# List executions
python main.py pipeline list-executions --pipeline-name fraud-detection-pipeline
# Delete pipeline
python main.py pipeline delete --pipeline-name fraud-detection-pipeline# Simulate ground truth (development)
python -m src.monitoring.simulate_ground_truth_from_athena --accuracy 0.85 [--limit 1000]
# Apply ground truth updates
python -m src.monitoring.update_ground_truth --mode batch
python -m src.monitoring.update_ground_truth --mode streaming --window-hours 24
# Monitor performance
python -m src.monitoring.monitor_model_performance --days 30 [--alert-threshold 0.80]# Create/update SageMaker execution role
python -m src.setup.create_or_update_sagemaker_role
# Create Lambda execution role
python -m src.setup.create_lambda_roleIssue: Preprocessing fails with "Unable to verify/create output bucket"
InvalidRequestException: Unable to verify/create output bucket fraud-detection-data-lake
Solution:
- Check
.envhas correct bucket name with account ID suffix:DATA_S3_BUCKET=fraud-detection-data-lake-YOUR_ACCOUNT
- Verify bucket exists:
aws s3 ls s3://fraud-detection-data-lake-YOUR_ACCOUNT/
- Create bucket if missing:
python main.py setup
Issue: Training fails with "Feature count mismatch"
ModelError: Expected 30 features, got 32
Solution:
- Model expects exactly 30 training features
- Check
TRAINING_FEATURESlist in2a_inference_monitoring.ipynbCell 4 - Do not include
transaction_idortransaction_timestamp(metadata columns)
Issue: Pipeline fails with "FileNotFoundError: 'awswrangler'"
FileNotFoundError: [Errno 2] No such file or directory: 'awswrangler'
Solution:
- Verify
inference_requirements.txtexists:cat src/train_pipeline/inference_requirements.txt
- Should contain:
awswrangler>=3.0.0 boto3>=1.34.0 pandas>=2.0.0 - Restart Jupyter kernel if running in notebook
- Redeploy pipeline
Issue: Can't access MLflow UI
Solution:
- Ensure you're in SageMaker Studio (not local environment)
- Navigate: Left sidebar β Partner AI Apps β MLflow
- Click on your MLflow app name
- Opens authenticated session in new tab
Issue: MLflow tracking URI not working
MlflowException: Unable to connect to tracking server
Solution:
- Check
.envhas ARN format (not HTTPS):# β Correct (for SDK) MLFLOW_TRACKING_URI=arn:aws:sagemaker:us-east-1:123456:mlflow-app/app-ABC123 # β Wrong (for web UI only) MLFLOW_TRACKING_URI=https://abc123.mlflow-tracking.us-east-1.sagemaker.aws/
- Verify IAM role has MLflow access:
python -m src.setup.create_or_update_sagemaker_role
Issue: Training dependencies not installed
ModuleNotFoundError: No module named 'sagemaker_mlflow'
Solution:
- Check training step has dependencies configured:
# In pipeline.py training step dependencies=['pipeline_steps/requirements_train.txt']
- Verify
requirements_train.txtcontains:sagemaker-mlflow>=0.1.0 mlflow>=2.17.0
Issue: No records in Athena after inference
Solution:
- Check endpoint uses custom handler (not built-in XGBoost)
- Verify CloudWatch logs show:
β Athena client initialized for inference logging - Make 50+ predictions to trigger buffer flush:
# Buffer flushes at 50 records # Cell 6 (5 predictions) + Cell 9 (50 predictions) = 55 total # This should automatically trigger the batch flush
- Or wait 5 minutes for time-based flush
Issue: Timestamp precision error in Athena queries
NOT_SUPPORTED: Incorrect timestamp precision for timestamp(6)
Solution:
- Athena Iceberg tables use
TIMESTAMP(3)(milliseconds) - Cast aggregations explicitly:
CAST(MIN(request_timestamp) AS TIMESTAMP(3))
- All timestamp aggregations (MIN, MAX, AVG) must be cast
- See
TIMESTAMP_FIX.mdfor complete fix details
Issue: Ground truth simulation returns 0 records
No predictions found without ground truth
Solution:
- Verify inference_responses table has data:
SELECT COUNT(*) FROM fraud_detection.inference_responses;
- Check predictions don't already have ground truth:
SELECT COUNT(*) FROM fraud_detection.inference_responses WHERE ground_truth IS NULL;
- Run inference tests first (notebook Cells 6-9)
Issue: IAM role lacks permissions
AccessDeniedException: User is not authorized to perform: athena:StartQueryExecution
Solution:
# Update SageMaker role with all required permissions
python -m src.setup.create_or_update_sagemaker_role
# Grants access to:
# - S3 (read/write)
# - Athena (queries, Iceberg tables)
# - Glue (data catalog)
# - Lake Formation (table permissions)
# - MLflow (tracking server)
# - Lambda (for deployment step)Issue: Corrupted Athena query results
SYNTAX_ERROR: line 1:8: Column 'cnt' cannot be resolved
Solution:
- Corrupted metadata in S3
- Clean up corrupted results:
./src/setup/cleanup_corrupted_athena_results.sh
- Or manually delete:
aws s3 rm s3://YOUR_BUCKET/athena-query-results/tables/ --recursive
Issue: Inference latency too high (>500ms)
Expected: 100-200ms P95 latency
Solution:
- Check if cold start (first request ~2-5 seconds)
- Verify endpoint instance type (ml.m5.xlarge recommended)
- Check CloudWatch metrics for throttling
- Consider increasing serverless concurrency
- Background Athena logging should add <50ms
# One-time setup
python -m src.setup.upload_data_to_s3
python main.py setup --migrate-dataResult: S3 bucket, Athena database, Iceberg tables created, CSV data uploaded and migrated
Option A: Jupyter Notebook (Recommended)
# In SageMaker Studio, open:
notebooks/1_training_pipeline.ipynb
# Run sequentially:
# - Cells 1-3: Setup and configuration
# - Cell 4: Create/update pipeline definition
# - Cell 5: Execute pipeline and wait (~25 minutes)Option B: CLI
python main.py pipeline create --pipeline-name fraud-detection-pipeline
python main.py pipeline start --pipeline-name fraud-detection-pipeline --waitResult:
- Model trained with XGBoost
- Evaluated (ROC-AUC, PR-AUC, confusion matrix)
- Registered in MLflow Model Registry
- Deployed to serverless endpoint with custom handler
Open: notebooks/2a_inference_monitoring.ipynb
This notebook handles the complete inference and monitoring workflow:
# In 2a_inference_monitoring.ipynb:
# Cell 1-2: Setup (load environment, initialize clients)
# Cell 4: Load test data and verify features
# Cell 5: Quick single test (optional)
# Cell 6: Single inference tests (5 samples)
# Cell 8: Generate playground samples (for SageMaker Studio testing)
# Cell 9: Bulk inference test (50 samples)Result:
- Predictions returned with fraud probabilities
- Automatically logged to
inference_responsestable (custom handler) - Latency metrics captured
Verify automatic logging:
# Cell 10: Check Athena for logged predictions
# Should show records after 100 predictions or 5 minutesFor Development/Testing:
# In 2a_inference_monitoring.ipynb:
# Cell 20: Run ground truth simulator
# - Simulates fraud investigation outcomes
# - Creates realistic confirmation delays
# - Configurable accuracy (default 85%)
# Or via CLI:
python -m src.monitoring.simulate_ground_truth_from_athena --accuracy 0.85For Production:
Replace simulation with actual fraud investigation system that writes to ground_truth_updates table:
- Chargeback notifications
- Customer fraud reports
- Investigation team confirmations
- Merchant alerts
Result: Ground truth updates in ground_truth_updates table
# In 2a_inference_monitoring.ipynb:
# Cell 19: Apply updates using notebook
# Or via CLI:
python -m src.monitoring.update_ground_truth --mode batchResult: inference_responses.ground_truth column populated via MERGE
# Cell 22: Check ground truth coverage
# Shows: total predictions, with/without ground truth, coverage %Expected: 100% coverage (simulated) or partial coverage (production)
# Cell 27: Initialize performance monitor
# Cell 28: Generate performance report
# Cell 29: Display metrics (ROC-AUC, precision, recall, F1)
# Cell 32: Run Evidently data drift detection (DataDriftPreset)
# Cell 33: Display interactive Evidently data drift report
# Cell 35: Run Evidently model drift detection (ClassificationPreset)
# Cell 36: Display interactive Evidently classification report
# Cell 38: Log Evidently HTML reports and metrics to MLflow
# Cell 40: Check for alerts (performance degradation)Result:
- Performance metrics calculated (where ground truth exists)
- Evidently data drift report generated (PSI, KS, distribution comparisons)
- Evidently classification report generated (ROC, PR, confusion matrix, F1)
- Interactive HTML reports logged to MLflow as artifacts
- Per-feature drift scores and classification metrics logged as MLflow metrics
- Alerts if performance degraded >5%
π Note: Evidently reports in cells 33 and 36 show as interactive HTML when running locally, but don't render on GitHub. See report screenshots or check MLflow artifacts under
evidently_reports/in production.
Alternative: CLI monitoring (no charts):
python -m src.monitoring.monitor_model_performance --days 30# In SageMaker Studio:
# 1. Left sidebar β Partner AI Apps β MLflow
# 2. Select experiments:
# - credit-card-fraud-detection-training (training runs)
# - credit-card-fraud-detection-inference (monitoring runs)
# 3. View charts in Artifacts sectionCharts available:
- Training: ROC curve, confusion matrix, feature importance
- Monitoring: Evidently interactive HTML reports (data drift dashboard, classification dashboard)
# If drift detected or performance degraded:
# Option A: Via 1_training_pipeline.ipynb
# Cell 5: Re-run pipeline execution
# Option B: Via CLI
python main.py pipeline start --pipeline-name fraud-detection-pipeline --waitResult:
- New model version trained with latest data
- Automatically registered in MLflow
- Deployed to same endpoint (replaces old model)
The entire PoC is driven by three notebooks in SageMaker Studio. Each notebook cell maps to a CLI command (see Next Steps for CI/CD equivalents).
| Notebook | Purpose | Key Cells |
|---|---|---|
1_training_pipeline.ipynb |
Training, evaluation, model registration, endpoint deployment | Cells 1-5 |
2a_inference_monitoring.ipynb |
Inference testing, ground truth, drift detection, CloudWatch alarms | Cells 1-40 |
inference_monitoring_with_pipeline.ipynb |
Pipeline-based automated monitoring: creates a SageMaker Pipeline (fraud-detection-monitoring-pipeline) with steps for ground truth simulation, drift computation, MLflow logging, Athena writes, threshold checks, and alarm creation |
Cells 1-end |
3_governance_dashboard.ipynb |
QuickSight governance dashboard: creates Athena data source, dataset, analysis with 6 visuals, and published dashboard β all via API | Cells 1-11 |
4_optional_version_validation.ipynb |
Version traceability: validates MLflow model version matches SageMaker endpoint and Athena logs | Cells 1-20 |
5_optional_cleanup.ipynb |
Resource cleanup: deletes all AWS resources (endpoints, Lambda, Athena tables, S3, CloudWatch) | Cells 1-38 |
Workflow:
- Train & Deploy:
1_training_pipeline.ipynbβ Cells 1-5 (creates pipeline, trains model, deploys endpoint) - Test Inference:
2a_inference_monitoring.ipynbβ Cells 1-9 (single + bulk predictions, auto-logged to Athena) - Simulate Ground Truth:
2a_inference_monitoring.ipynbβ Cells 19-24 (generate and apply ground truth labels) - Monitor & Detect Drift:
2a_inference_monitoring.ipynbβ Cells 26-39 (performance metrics, data drift, model drift, MLflow logging) - CloudWatch Alarms:
2a_inference_monitoring.ipynbβ Cell 40 (publish metrics, create alarms & dashboard) - MLflow Review: SageMaker Studio β Partner AI Apps β MLflow
- Retrain:
1_training_pipeline.ipynbβ Cell 5 (re-execute pipeline)
Alternative: Pipeline-Based Monitoring
inference_monitoring_with_pipeline.ipynb wraps steps 3-5 into a single SageMaker Pipeline (fraud-detection-monitoring-pipeline) with automated steps: SimulateGroundTruth β ComputeDrift β LogToMLflow β WriteToAthena β CheckThresholds β CreateAlarms. Use this for scheduled, hands-off monitoring.
All in SageMaker Studio β no CLI needed for development workflow.
notebooks/inference_monitoring_with_pipeline.ipynb sets up inference monitoring as a fully automated SageMaker Pipeline instead of running individual notebook cells manually.
What it does:
- Creates a 6-step SageMaker Pipeline: SimulateGroundTruth β ComputeDrift β LogToMLflow β WriteToAthena β CheckThresholds β CreateAlarms
- Auto-resolves the MLflow run ID and model version from the MLflow Model Registry (no hardcoded run IDs)
- Queries monitoring results from Athena and visualizes drift trends over time
- Configures EventBridge scheduling for daily automated execution (Section 11) β creates the IAM role, rule, and wires the pipeline as a direct EventBridge target using
SageMakerPipelineParameters
Key sections:
- Setup β auto-resolves model version from MLflow Model Registry (prefers Production stage)
- Pipeline creation and execution with configurable thresholds
- Athena result queries and drift trend visualization
- EventBridge scheduling for daily hands-off monitoring (no Lambda intermediary)
notebooks/3_governance_dashboard.ipynb programmatically creates a complete QuickSight governance dashboard β no manual UI steps required.
What it creates (all via QuickSight Definition API):
- Athena data source pointing to the
fraud_detectiondatabase - Dataset from
inference_responsestable with calculated fields (prediction_accuracy,risk_tier) - Analysis with 6 visuals: prediction volume over time, fraud probability distribution, prediction accuracy breakdown (donut), risk tier distribution, inference latency trend, and total inferences KPI
- Published dashboard with filtering and CSV export enabled
Prerequisites: QuickSight Enterprise subscription, data in inference_responses Athena table, IAM permissions for QuickSight/Athena/S3.
QuickSight Governance Dashboard:
Extending with Feature-Level Drift Analysis:
The default dashboard provides high-level inference and model performance metrics. For detailed feature-level drift analysis, you can extend the dashboard with additional visuals:
- Feature Drift Timeline: Line chart showing how individual features drift over time (compare credit_limit vs. merchant_category vs. account_age)
- Top Drifting Features: Horizontal bar chart ranking features by average drift score
- Drift Severity Distribution: Stacked bar chart showing Low/Moderate/Significant drift counts per feature
- Feature Drift Detail Table: Sortable table with all feature drift metrics
Implementation: The Feature-Level Drift Analysis Guide provides:
- Pre-built Athena view (
feature_drift_detail) that unpacks per-feature drift scores - Copy-paste Python code to add 6 new visuals to your dashboard
- SQL queries and visualization configurations
- Step-by-step implementation checklist
When to use: Add these visuals when you need to:
- Investigate which specific features are drifting (not just overall drift detection)
- Prioritize retraining efforts by identifying the worst offenders
- Distinguish data quality issues from genuine distribution shifts
- Monitor drift patterns across your 29+ input features
Training:
- Use Spot instances for training: significantly lower compute costs
- Training completes in 10-15 min, low spot interruption risk
- Configure in pipeline:
instance_type="ml.m5.xlarge",use_spot_instances=True
Inference:
- Serverless inference: Pay per invocation with automatic scaling to zero when idle
- Cold start: 10-30 seconds (acceptable for fraud detection)
- Alternative: Provision dedicated instances for high-volume scenarios with predictable traffic
Storage:
- Athena Iceberg tables: Pay per query, not storage
- S3 Intelligent-Tiering for model artifacts
- Compress old inference logs to Glacier after 90 days
Estimated Monthly Costs (1000 predictions/day):
- Training: ~$5-10 (spot instances, 2-3 retrainings/month)
- Inference: ~$20-30 (serverless)
- Storage: ~$5-10 (S3 + Athena)
- MLflow: Included in SageMaker AI Notebooks
- Total: ~$30-50/month
Models progress through stages in MLflow Model Registry:
- None - Newly registered, not validated
- Staging - Undergoing validation/testing
- Production - Serving live traffic
- Archived - Retired
# In MLflow UI or via SDK
from mlflow import MlflowClient
client = MlflowClient()
# Promote to staging
client.transition_model_version_stage(
name="xgboost-fraud-detector",
version=3,
stage="Staging"
)
# After validation, promote to production
client.transition_model_version_stage(
name="xgboost-fraud-detector",
version=3,
stage="Production"
)
# Archive old version
client.transition_model_version_stage(
name="xgboost-fraud-detector",
version=2,
stage="Archived"
)Each version includes:
- Metrics: ROC-AUC, PR-AUC, precision, recall, F1
- Parameters: max_depth, learning_rate, scale_pos_weight, etc.
- Tags:
training_date,pipeline_execution_id,deployed_endpoint - Artifacts: model.xgb, feature_metadata.json, preprocessing config
# If new model performs poorly:
# 1. Check recent performance
python -m src.monitoring.monitor_model_performance --days 7
# 2. If degraded, rollback to previous version in MLflow UI:
# - Mark current version as "Archived"
# - Mark previous version as "Production"
# 3. Redeploy with previous model version
# Update pipeline to use previous run_id
python main.py pipeline start --pipeline-name fraud-detection-pipelineEach notebook cell has a direct CLI equivalent. For CI/CD pipelines, replace notebook interactions with these commands:
| Phase | Notebook Cell | CLI Command |
|---|---|---|
| Infrastructure | 1_training_pipeline.ipynb Cell 3 |
python main.py setup --migrate-data |
| SQS + Lambda | 1_training_pipeline.ipynb Cell 3 |
python main.py setup-logging |
| Create Pipeline | 1_training_pipeline.ipynb Cell 4 |
python main.py pipeline create --pipeline-name fraud-detection-pipeline |
| Train & Deploy | 1_training_pipeline.ipynb Cell 5 |
python main.py pipeline start --pipeline-name fraud-detection-pipeline --wait |
| Test Inference | 2a_inference_monitoring.ipynb Cells 6-9 |
python main.py test --endpoint-name fraud-detector-endpoint --num-samples 100 |
| Simulate Ground Truth | 2a_inference_monitoring.ipynb Cell 19 |
python -m src.monitoring.simulate_ground_truth_from_athena --accuracy 0.85 |
| Apply Ground Truth | 2a_inference_monitoring.ipynb Cell 24 |
python -m src.monitoring.update_ground_truth --mode batch |
| Monitor Drift | 2a_inference_monitoring.ipynb Cells 26-39 |
python -m src.monitoring.monitor_model_performance --days 30 |
Automate drift checks with EventBridge + Lambda:
- EventBridge Rule: Schedule
monitor_model_performance.pydaily/hourly via a Lambda function - Lambda wrapper: Package the monitoring script as a Lambda that queries Athena and publishes CloudWatch metrics
- Trigger retraining: If drift exceeds thresholds, start the SageMaker Pipeline automatically
Connect CloudWatch alarms (created in Cell 40) to email or Slack:
# 1. Create SNS topic
aws sns create-topic --name FraudDetection-DriftAlerts
# 2. Subscribe email
aws sns subscribe \
--topic-arn arn:aws:sns:us-east-1:YOUR_ACCOUNT:FraudDetection-DriftAlerts \
--protocol email \
--notification-endpoint your-team@example.com
# 3. Update alarms to notify (repeat for each alarm)
aws cloudwatch put-metric-alarm \
--alarm-name FraudDetection-ModelDrift-ROC-AUC \
--alarm-actions arn:aws:sns:us-east-1:YOUR_ACCOUNT:FraudDetection-DriftAlerts \
# ... (keep existing alarm configuration)For Slack, use an SNS β Lambda β Slack webhook integration or AWS Chatbot.
MIT License - See LICENSE file for details
Documentation Version: 3.1 Last Updated: 2026-03-21 Pipeline Version: 1.0 Model Version: xgboost-fraud-detector v1+
| Date | Summary |
|---|---|
| 2026-03-24 | README Major Revision: Removed duplicate "Complete Architecture" ASCII art section (136 lines) since visual diagrams (architecture_diagram.png, inference_monitoring_diagram.png) provide better visualization. Enhanced Overview section with comprehensive explanation of ML governance importance, production monitoring challenges, and solution benefits. Added "Advanced Configuration" section documenting recent features: (1) REDEPLOY_LAMBDAS flag for Lambda redeployment control, (2) drift simulation configuration with test scenarios (SIM_ACCURACY, SIM_FEATURE_DRIFT_IMPACT, SIM_MODEL_DRIFT_MAG), (3) MLflow run ID tracking with unique UUIDs per drift test, (4) QuickSight dashboard auto-refresh automation (3 AM UTC daily after drift monitoring at 2 AM). Updated all three architecture diagrams to reflect latest pipeline additions. Added EventBridge + Lambda dataset refresh flow to Governance Dashboard (QuickSight) lane. Added config.yaml drift metrics configuration node with file icon. Regenerated all PNG diagrams. |
| 2026-03-21 | Added drift trend analysis visuals to 3_governance_dashboard.ipynb: second QuickSight dataset (monitoring_responses) and Sheet 2 with 6 new visuals (drift share over time, drifted feature counts, ROC-AUC baseline vs current, model performance multi-line, drift alerts timeline, drift KPI). Aligned setup_quicksight_governance.py column names with actual monitoring_responses Iceberg schema. Added Lake Formation grants for all tables (caller IAM role + Lambda role). Fixed SSO assumed-role ARN resolution for Lake Formation. Standardized table name to monitoring_responses across codebase. Added notebook cell to write Evidently drift metrics directly to monitoring_responses Athena table. Inference handler now resolves model_version and mlflow_run_id dynamically from MLflow registry at model load time. Pipeline model step passes MODEL_VERSION and MLFLOW_RUN_ID env vars. Added "Why Not SageMaker DataCaptureConfig?" section to README. Fixed notebook cell ordering, MLflow tracking URI, Lake Formation Principal parameter, screenshot path, and various module/dependency issues. Renamed directory from automated-drift_monitoring-evidently to sagemaker-automated-drift-and-trend-monitoring. |
| 2026-03-18 | Initial documentation v3.0 with Evidently integration, MLflow tracking, and SageMaker pipeline. |
This codebase has been migrated to SageMaker Python SDK v3.7.1, which reorganizes the SDK into separate packages (sagemaker-core, sagemaker-train, sagemaker-mlops, sagemaker-serve). Key changes:
- Training:
XGBoostestimator replaced byModelTrainerfromsagemaker.train.model_trainerwith explicitSourceCode,Compute, andOutputDataConfigobjects - Model registration:
RegisterModelstep collection replaced byModelStep+ModelBuilder.register()fromsagemaker.serve.model_builder - Model creation:
CreateModelStep+XGBoostModelreplaced byModelStep+ModelBuilder.build() - Deployment:
Model.deploy()replaced byModelBuilder.deploy()withServerlessInferenceConfigfromsagemaker.serve.serverless - Session:
sagemaker.Session()replaced bySessionfromsagemaker.core.helper.session_helper - Image URIs:
sagemaker.image_uris.retrieve()replaced byretrievefromsagemaker.core.image_uris - Processing: Imports moved from
sagemaker.processingtosagemaker.core.processing;SKLearnProcessorremoved (useScriptProcessororFrameworkProcessor) - Workflow: Pipeline and step imports moved from
sagemaker.workflow.*tosagemaker.mlops.workflow.*andsagemaker.core.workflow.*
All pipeline behavior, step flow, parameters, and environment variables are preserved β only the SDK API surface changed.

