TL;DR: This project shows how to build a fully serverless pipeline where uploads arrive via a REST endpoint, land in an ingest S3 bucket, get scanned by Amazon GuardDuty Malware Protection for S3, and—if malicious—are automatically quarantined to a separate bucket. We’ll walk through the architecture, the IAM & KMS gotchas, and share production-ready Lambda code you can use right away.
- Shift-left scanning: Catch malware before any consumer system reads the file.
- Serverless: No EC2 / antivirus engines to manage.
- Native: GuardDuty’s managed malware scans + S3 object tagging = simple and auditable.
- Secure by default: SSE-KMS everywhere, least-privilege IAM, and read-denies on unscanned objects.
Client -> API Gateway (HTTP API) -> Lambda: get-upload-url -> S3 (ingest, SSE-KMS)

1. Client -> API Gateway: POST /upload-url, forwarded to Lambda: get-upload-url
2. Lambda -> Client: presigned PUT URL
3. Client -> S3 (ingest): PUT object via the URL; object stored (KMS-encrypted), versioned
4. GuardDuty S3 Malware Plan (monitors the ingest bucket) tags the object: GuardDutyMalwareScanStatus = NO_THREATS_FOUND | THREATS_FOUND | ...
5. EventBridge Rule (only THREATS_FOUND) -> Lambda: dispatcher -> copy to S3 (quarantine, SSE-KMS) -> Forensics/Review (restricted access)
6. Consumers read from S3 (ingest) and get only clean objects
Key points
- SSE-KMS is used on both buckets. GuardDuty must be allowed to decrypt the same KMS key used by the ingest bucket.
GuardDutyMalwareScanStatus on the object (NO_THREATS_FOUND, THREATS_FOUND, ACCESS_DENIED, etc.).
- An EventBridge rule triggers a dispatcher Lambda only on THREATS_FOUND, which copies the exact version to a quarantine bucket (and optionally deletes the source).
- The ingest bucket policy denies reads unless the tag is NO_THREATS_FOUND, but it must exempt both the GuardDuty scan role and the dispatcher (including their assumed-role ARNs).
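As a rough mental model of the tag-gated read rule in the last key point, the decision can be mirrored in plain Python. This is a simplified sketch, not how S3 evaluates policies; `read_allowed`, `EXEMPT_ARN_PATTERNS`, and the example ARNs are illustrative assumptions.

```python
from fnmatch import fnmatchcase
from typing import Optional

# Hypothetical exempt principals: the GuardDuty plan role and the dispatcher
# role, plus their assumed-role session ARNs.
EXEMPT_ARN_PATTERNS = [
    "arn:aws:iam::111122223333:role/PlanRole",
    "arn:aws:iam::111122223333:role/DispatcherRole",
    "arn:aws:sts::111122223333:assumed-role/PlanRole/*",
    "arn:aws:sts::111122223333:assumed-role/DispatcherRole/*",
]


def read_allowed(principal_arn: str, scan_status: Optional[str]) -> bool:
    """Return True when the deny rule would NOT block the read.

    Exempt principals always pass the gate; everyone else passes only
    when the object carries the NO_THREATS_FOUND tag value.
    """
    if any(fnmatchcase(principal_arn, p) for p in EXEMPT_ARN_PATTERNS):
        return True
    return scan_status == "NO_THREATS_FOUND"
```

Passing the gate only means the explicit Deny does not apply; the caller still needs an Allow for `s3:GetObject` from its own IAM policy.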
A tiny Lambda that returns a presigned S3 PUT URL scoped to a safe prefix. It takes JSON like:
{ "tenant_id": "dev", "file_name": "my.docx" }

Handler (Python 3.12)

# lambda_get_upload_url.py
import json
import os
import uuid

import boto3

S3 = boto3.client("s3")
INGEST_BUCKET = os.environ["INGEST_BUCKET"]  # e.g. my-ingest-bucket
UPLOAD_PREFIX = os.environ.get("UPLOAD_PREFIX", "uploads")
PRESIGN_EXPIRES = int(os.environ.get("PRESIGN_EXPIRES", "600"))  # seconds


def _ok(body):
    return {"statusCode": 200, "headers": {"content-type": "application/json"}, "body": json.dumps(body)}


def _err(code, msg):
    return {"statusCode": code, "headers": {"content-type": "application/json"}, "body": json.dumps({"error": msg})}


def handler(event, context):
    try:
        body = {}
        if "body" in event and event["body"]:
            # HTTP API delivers the body as a string; direct test invokes may pass a dict
            body = json.loads(event["body"]) if isinstance(event["body"], str) else event["body"]
        tenant = (body.get("tenant_id") or "public").strip().replace("/", "_")
        file_name = (body.get("file_name") or "file.bin").replace("/", "_")
        # namespace per tenant; a random UUID keeps keys unique
        key = f"{UPLOAD_PREFIX}/{tenant}/{uuid.uuid4()}_{file_name}"
        # presign a PUT with minimal headers (let client set Content-Type)
        url = S3.generate_presigned_url(
            "put_object",
            Params={
                "Bucket": INGEST_BUCKET,
                "Key": key,
            },
            ExpiresIn=PRESIGN_EXPIRES,
        )
        return _ok({"upload_url": url, "bucket": INGEST_BUCKET, "key": key})
    except json.JSONDecodeError:
        # malformed request body is a client error, not a server error
        return _err(400, "request body must be valid JSON")
    except Exception as e:
        return _err(500, str(e))

Environment variables
- INGEST_BUCKET – your ingest S3 bucket name
- UPLOAD_PREFIX – optional, default uploads
- PRESIGN_EXPIRES – optional, default 600

API Gateway: Create an HTTP API with a POST /upload-url integration to this Lambda.
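The handler above only replaces `/` in the tenant and file name. If stricter key hygiene is wanted, one possible approach is an allow-list of characters. The helpers `safe_component` and `build_key` below are hypothetical and not part of the original code; the allowed character set and length cap are assumptions to adjust.

```python
import re
import uuid

# Anything outside this conservative set is replaced with "_".
_DISALLOWED = re.compile(r"[^A-Za-z0-9._-]")


def safe_component(value: str, default: str, max_len: int = 128) -> str:
    """Sanitize one path component of an S3 key."""
    cleaned = _DISALLOWED.sub("_", (value or "").strip())
    cleaned = cleaned.lstrip(".")  # avoid components such as ".." or ".hidden"
    return cleaned[:max_len] or default


def build_key(prefix: str, tenant: str, file_name: str) -> str:
    """Build '<prefix>/<tenant>/<uuid>_<file_name>' from sanitized parts."""
    tenant_part = safe_component(tenant, "public")
    name_part = safe_component(file_name, "file.bin")
    return f"{prefix}/{tenant_part}/{uuid.uuid4()}_{name_part}"
```

An allow-list is easier to reason about than a block-list here, because the key later appears in bucket policy resources, log lines, and the quarantine path.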
- Enable GuardDuty.
- Create a Malware Protection Plan targeting your ingest bucket.
- Give the plan a role trusted by "Service": "malware-protection-plan.guardduty.amazonaws.com" that can:
  - s3:GetObject* on the ingest bucket,
  - s3:PutObject*Tagging on the ingest bucket,
  - kms:Decrypt, kms:GenerateDataKey on the ingest bucket’s CMK (via service: s3.<region>.amazonaws.com).
Role trust policy (plan role)
{
  "Version": "2012-10-17",
  "Statement": [{
    "Effect": "Allow",
    "Principal": { "Service": "malware-protection-plan.guardduty.amazonaws.com" },
    "Action": "sts:AssumeRole"
  }]
}

KMS key policy must include the plan role (and your dispatcher role). Example statements:
{
  "Sid": "AllowGDS3ScanUseOfKey",
  "Effect": "Allow",
  "Principal": { "AWS": "arn:aws:iam::<ACCOUNT_ID>:role/<PlanRoleName>" },
  "Action": ["kms:Decrypt","kms:GenerateDataKey","kms:DescribeKey"],
  "Resource": "*",
  "Condition": { "StringLike": { "kms:ViaService": "s3.<region>.amazonaws.com" } }
},
{
  "Sid": "AllowDispatcherUseOfKey",
  "Effect": "Allow",
  "Principal": { "AWS": "arn:aws:iam::<ACCOUNT_ID>:role/<DispatcherRoleName>" },
  "Action": ["kms:Encrypt","kms:Decrypt","kms:GenerateDataKey","kms:DescribeKey"],
  "Resource": "*",
  "Condition": { "StringLike": { "kms:ViaService": "s3.<region>.amazonaws.com" } }
}

Tip: KMS access is evaluated against both the IAM policy and the key policy. If the plan role is not allowed to use the key, GuardDuty will tag objects as ACCESS_DENIED.
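When chasing an ACCESS_DENIED tag, it can help to check mechanically whether a key policy names the plan role for the actions listed above. The function below is a minimal sketch under loud assumptions: it matches exact principal ARNs and exact action strings (plus `kms:*`) and ignores conditions, wildcards, and IAM-side grants. `missing_kms_actions` is a hypothetical helper.

```python
from typing import Iterable, Set


def _as_list(value):
    """IAM JSON allows a single value where a list is expected."""
    return value if isinstance(value, list) else [value]


def missing_kms_actions(key_policy: dict, role_arn: str, required: Iterable[str]) -> Set[str]:
    """Return the required actions that no Allow statement grants to role_arn."""
    granted: Set[str] = set()
    for stmt in _as_list(key_policy.get("Statement", [])):
        if stmt.get("Effect") != "Allow":
            continue
        principal = stmt.get("Principal")
        arns = _as_list(principal.get("AWS", [])) if isinstance(principal, dict) else []
        if role_arn in arns:
            granted.update(_as_list(stmt.get("Action", [])))
    if "kms:*" in granted:
        return set()
    return set(required) - granted
```

The policy document itself can be fetched with boto3's `kms.get_key_policy(KeyId=..., PolicyName="default")` and parsed with `json.loads` before being passed in.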
Trigger only on threats:
Event pattern (JSON)
{
  "source": ["aws.guardduty"],
  "detail-type": ["GuardDuty Malware Protection Object Scan Result"],
  "detail": { "scanResultDetails": { "scanResultStatus": ["THREATS_FOUND"] } }
}

Handler (Python 3.12)
This code:
- Parses GuardDuty’s event safely,
- Copies the exact offending version to the quarantine bucket (SSE-KMS),
- Optionally deletes the source,
- No-ops on other outcomes.
# lambda_dispatcher.py
import os

import boto3
from botocore.exceptions import ClientError

S3 = boto3.client("s3")
QUAR_BUCKET = os.environ.get("QUARANTINE_BUCKET")  # e.g., my-quarantine-bucket
INGEST_BUCKET = os.environ.get("INGEST_BUCKET")  # e.g., my-ingest-bucket
KMS_KEY_ARN = os.environ.get("KMS_KEY_ARN")  # CMK for quarantine bucket (optional if bucket default)
DELETE_SOURCE_ON_THREAT = os.environ.get("DELETE_SOURCE_ON_THREAT", "false").lower() == "true"


def _extract(event):
    """Pull bucket, key, version and scan result out of the GuardDuty event."""
    d = event.get("detail", {}) or {}
    result = (d.get("scanResultDetails") or {}).get("scanResultStatus")
    o = d.get("s3ObjectDetails") or {}
    bucket = o.get("bucketName")
    key = o.get("objectKey")
    version = o.get("versionId")
    if (not bucket or not key) and "objectArn" in o:
        try:
            # arn:aws:s3:::bucket/key has five colons before "bucket/key"
            bkt_and_key = o["objectArn"].split(":", 5)[5]
            bkt, k = bkt_and_key.split("/", 1)
            bucket = bucket or bkt
            key = key or k
        except (IndexError, ValueError):
            pass
    return bucket, key, version, result


def _latest_version(bucket, key):
    """Fall back to the current version when the event carries no versionId."""
    try:
        resp = S3.head_object(Bucket=bucket, Key=key)
        return resp.get("VersionId")
    except ClientError:
        return None


def copy_to_quarantine(bucket, key, version=None):
    if not QUAR_BUCKET:
        raise RuntimeError("QUARANTINE_BUCKET env var is not set")
    dest_key = f"quarantine/{key}"
    copy_source = {"Bucket": bucket, "Key": key}
    if version:
        copy_source["VersionId"] = version
    args = {
        "Bucket": QUAR_BUCKET,
        "Key": dest_key,
        "CopySource": copy_source,
        "MetadataDirective": "COPY",
        "TaggingDirective": "REPLACE",  # don't carry ingest tags into quarantine
    }
    if KMS_KEY_ARN:
        args.update({
            "ServerSideEncryption": "aws:kms",
            "SSEKMSKeyId": KMS_KEY_ARN,
            "BucketKeyEnabled": True,
        })
    S3.copy_object(**args)
    # delete only after the copy has succeeded
    if DELETE_SOURCE_ON_THREAT:
        del_args = {"Bucket": bucket, "Key": key}
        if version:
            del_args["VersionId"] = version
        S3.delete_object(**del_args)


def handler(event, context):
    bucket, key, version, result = _extract(event)
    print(f"Scan outcome: result={result} bucket={bucket} key={key} version={version}")
    if result != "THREATS_FOUND":
        return {"statusCode": 200}  # nothing to do
    if not bucket or not key:
        print("Missing bucket/key in event; nothing copied.")
        return {"statusCode": 200}
    if not version:
        version = _latest_version(bucket, key)
        print(f"Resolved latest version: {version}")
    try:
        copy_to_quarantine(bucket, key, version)
        print(f"Quarantined s3://{bucket}/{key} (v={version}) -> s3://{QUAR_BUCKET}/quarantine/{key}")
        return {"statusCode": 200}
    except ClientError as e:
        print(f"[ERROR] Copy failed: {e}")
        # Avoid endless retries; log and return 200
        return {"statusCode": 200, "error": str(e)}

Environment variables
- QUARANTINE_BUCKET – your quarantine S3 bucket
- INGEST_BUCKET – ingest bucket (handy for logging)
- KMS_KEY_ARN – CMK used by quarantine (optional if bucket default)
- DELETE_SOURCE_ON_THREAT – "true"/"false" (default false)
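To exercise the event-parsing path without deploying anything, a hand-built event can be run through the same field lookups the dispatcher uses. The sample below contains only the fields the handler reads (`scanResultDetails.scanResultStatus` and `s3ObjectDetails.*`); all values are made up, real events carry more fields, and `parse_scan_event` and `should_quarantine` are illustrative names.

```python
# Hand-written sample; bucket, key and version values are invented.
SAMPLE_EVENT = {
    "source": "aws.guardduty",
    "detail": {
        "s3ObjectDetails": {
            "bucketName": "my-ingest-bucket",
            "objectKey": "uploads/dev/1234_eicar.com",
            "versionId": "v1",
        },
        "scanResultDetails": {"scanResultStatus": "THREATS_FOUND"},
    },
}


def parse_scan_event(event: dict):
    """Return (bucket, key, version, status); missing fields become None."""
    detail = event.get("detail") or {}
    obj = detail.get("s3ObjectDetails") or {}
    status = (detail.get("scanResultDetails") or {}).get("scanResultStatus")
    return obj.get("bucketName"), obj.get("objectKey"), obj.get("versionId"), status


def should_quarantine(event: dict) -> bool:
    """True only for a THREATS_FOUND result that identifies an object."""
    bucket, key, _version, status = parse_scan_event(event)
    return status == "THREATS_FOUND" and bool(bucket) and bool(key)
```

Keeping the parsing separate from the S3 calls makes it possible to unit-test the decision with plain dictionaries.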
Dispatcher role permissions
- On ingest bucket: s3:ListBucket, s3:ListBucketVersions, s3:GetObject*, s3:GetObjectVersion* (plus s3:DeleteObject and s3:DeleteObjectVersion if DELETE_SOURCE_ON_THREAT is "true")
- On quarantine bucket: s3:PutObject*
- KMS on both keys (if different): kms:Encrypt/Decrypt/GenerateDataKey/DescribeKey with kms:ViaService = "s3.<region>.amazonaws.com"
…and make sure those roles are also present in the KMS key policy (see earlier).
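The permission list above can be turned into an IAM policy document programmatically. The builder below is a sketch that mirrors that list; `dispatcher_policy`, its parameters, and the `Sid` values are hypothetical, and the output should be reviewed before attaching it to a role.

```python
import json


def dispatcher_policy(ingest_bucket, quarantine_bucket, kms_key_arns, region, allow_delete=False):
    """Build an identity policy for the dispatcher role as a plain dict."""
    ingest_object_actions = ["s3:GetObject*"]  # also matches s3:GetObjectVersion*
    if allow_delete:
        # only needed when DELETE_SOURCE_ON_THREAT is "true"
        ingest_object_actions += ["s3:DeleteObject", "s3:DeleteObjectVersion"]
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "ListIngest",
                "Effect": "Allow",
                "Action": ["s3:ListBucket", "s3:ListBucketVersions"],
                "Resource": f"arn:aws:s3:::{ingest_bucket}",
            },
            {
                "Sid": "ReadIngestObjects",
                "Effect": "Allow",
                "Action": ingest_object_actions,
                "Resource": f"arn:aws:s3:::{ingest_bucket}/*",
            },
            {
                "Sid": "WriteQuarantine",
                "Effect": "Allow",
                "Action": ["s3:PutObject*"],
                "Resource": f"arn:aws:s3:::{quarantine_bucket}/*",
            },
            {
                "Sid": "UseKeysViaS3",
                "Effect": "Allow",
                "Action": ["kms:Encrypt", "kms:Decrypt", "kms:GenerateDataKey", "kms:DescribeKey"],
                "Resource": list(kms_key_arns),
                "Condition": {"StringEquals": {"kms:ViaService": f"s3.{region}.amazonaws.com"}},
            },
        ],
    }


if __name__ == "__main__":
    # Example values are placeholders, not real resources.
    doc = dispatcher_policy("my-ingest-bucket", "my-quarantine-bucket",
                            ["arn:aws:kms:eu-west-1:111122223333:key/example"], "eu-west-1")
    print(json.dumps(doc, indent=2))
```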
Do not use NotPrincipal with role ARNs; GuardDuty uses assumed-role principals. Instead, deny reads unless the tag is NO_THREATS_FOUND, and exempt both the IAM role ARNs and their assumed-role ARNs via aws:PrincipalArn.
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DenyReadUntilClean",
      "Effect": "Deny",
      "Principal": "*",
      "Action": ["s3:GetObject","s3:GetObjectVersion"],
      "Resource": "arn:aws:s3:::<INGEST_BUCKET>/*",
      "Condition": {
        "StringNotEquals": {
          "s3:ExistingObjectTag/GuardDutyMalwareScanStatus": "NO_THREATS_FOUND"
        },
        "ArnNotEquals": {
          "aws:PrincipalArn": [
            "arn:aws:iam::<ACCOUNT_ID>:role/<DispatcherRoleName>",
            "arn:aws:iam::<ACCOUNT_ID>:role/<PlanRoleName>",
            "arn:aws:sts::<ACCOUNT_ID>:assumed-role/<DispatcherRoleName>/*",
            "arn:aws:sts::<ACCOUNT_ID>:assumed-role/<PlanRoleName>/*",
            "arn:aws:iam::<ACCOUNT_ID>:root"
          ]
        }
      }
    },
    {
      "Sid": "DenyReadWhenNoTag",
      "Effect": "Deny",
      "Principal": "*",
      "Action": ["s3:GetObject","s3:GetObjectVersion"],
      "Resource": "arn:aws:s3:::<INGEST_BUCKET>/*",
      "Condition": {
        "Null": { "s3:ExistingObjectTag/GuardDutyMalwareScanStatus": "true" },
        "ArnNotEquals": {
          "aws:PrincipalArn": [
            "arn:aws:iam::<ACCOUNT_ID>:role/<DispatcherRoleName>",
            "arn:aws:iam::<ACCOUNT_ID>:role/<PlanRoleName>",
            "arn:aws:sts::<ACCOUNT_ID>:assumed-role/<DispatcherRoleName>/*",
            "arn:aws:sts::<ACCOUNT_ID>:assumed-role/<PlanRoleName>/*",
            "arn:aws:iam::<ACCOUNT_ID>:root"
          ]
        }
      }
    }
  ]
}

# 1) Get presigned URL
BASE="https://<api-id>.execute-api.<region>.amazonaws.com"
RESP=$(curl -sS -f -X POST "$BASE/upload-url" -H 'content-type: application/json' -d '{"tenant_id":"dev","file_name":"hello.txt"}')
URL=$(echo "$RESP" | python3 -c 'import sys,json;print(json.load(sys.stdin)["upload_url"])')
BUCKET=$(echo "$RESP"| python3 -c 'import sys,json;print(json.load(sys.stdin)["bucket"])')
KEY=$(echo "$RESP" | python3 -c 'import sys,json;print(json.load(sys.stdin)["key"])')
# 2) Upload
printf '%s' 'hello world' > /tmp/hello.txt
curl -sS --fail-with-body --http1.1 -T /tmp/hello.txt "$URL"
# 3) Poll the tag
for i in $(seq 1 18); do
aws s3api get-object-tagging --bucket "$BUCKET" --key "$KEY" | python3 -c 'import sys,json;t=json.load(sys.stdin).get("TagSet",[]);print({d["Key"]:d["Value"] for d in t}.get("GuardDutyMalwareScanStatus","<no-tag-yet>"))'
sleep 5
done
# Expect: NO_THREATS_FOUND

# EICAR test file (harmless; should be detected as a threat)
printf '%s' 'X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*' > /tmp/eicar.com
RESP=$(curl -sS -f -X POST "$BASE/upload-url" -H 'content-type: application/json' -d '{"tenant_id":"dev","file_name":"eicar.com"}')
URL=$(echo "$RESP" | python3 -c 'import sys,json;print(json.load(sys.stdin)["upload_url"])')
BUCKET=$(echo "$RESP"| python3 -c 'import sys,json;print(json.load(sys.stdin)["bucket"])')
KEY=$(echo "$RESP" | python3 -c 'import sys,json;print(json.load(sys.stdin)["key"])')
curl -sS --fail-with-body --http1.1 -T /tmp/eicar.com "$URL"
for i in $(seq 1 18); do
aws s3api get-object-tagging --bucket "$BUCKET" --key "$KEY" | python3 -c 'import sys,json;t=json.load(sys.stdin).get("TagSet",[]);print({d["Key"]:d["Value"] for d in t}.get("GuardDutyMalwareScanStatus","<no-tag-yet>"))'
sleep 5
done
# Expect: THREATS_FOUND
# Quarantine check
aws s3api head-object --bucket "<QUARANTINE_BUCKET>" --key "quarantine/$KEY" || echo "not quarantined"

If you ever see ACCESS_DENIED, it almost always means the plan role isn’t permitted by the KMS key policy of the ingest bucket’s CMK, or your bucket policy Deny is catching the assumed-role principal.
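The tag-polling loop from the test script can also be written in Python, which is convenient inside integration tests. The sketch below takes the tag lookup as a callable so it runs without AWS access; `wait_for_scan_status` and its parameters are hypothetical, and the defaults mirror the 18 attempts at 5-second intervals used in the shell loop.

```python
import time
from typing import Callable, List, Optional

TAG_KEY = "GuardDutyMalwareScanStatus"


def wait_for_scan_status(
    get_tag_set: Callable[[], List[dict]],
    attempts: int = 18,
    delay: float = 5.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Optional[str]:
    """Poll until the scan-status tag appears; return its value or None on timeout."""
    for attempt in range(attempts):
        tags = {t["Key"]: t["Value"] for t in get_tag_set()}
        if TAG_KEY in tags:
            return tags[TAG_KEY]
        if attempt < attempts - 1:
            sleep(delay)  # no need to wait after the final attempt
    return None
```

With boto3, the callable could be `lambda: s3.get_object_tagging(Bucket=bucket, Key=key)["TagSet"]`, where `s3` is a `boto3.client("s3")`.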
- ACCESS_DENIED status on the tag
  Add the plan role to the KMS key policy used by the ingest bucket (and keep IAM policy aligned). Confirm the plan actually uses that role.
- Bucket policy Deny still blocks GuardDuty
  Don’t use NotPrincipal with IAM role ARNs; instead use ArnNotEquals on aws:PrincipalArn and include both IAM role and assumed-role ARNs.
- Dispatcher fails on clean events
  Trigger it only on THREATS_FOUND, or make the handler no-op when scanResultStatus != THREATS_FOUND (code above).
- Versioning
  Enable S3 Versioning on both buckets. Always copy a specific version to quarantine.
- Cross-account
  Add the other account’s role ARNs to the KMS key policy and adjust bucket policy principals accordingly.
- Where to hook: When a customer uploads an attachment in Connect Chat, route the object into the same ingest bucket/prefix (or a dedicated Connect prefix).
- Scan & quarantine: GuardDuty plan covers that prefix; the same dispatcher moves threats to quarantine.
- Agent UX: Use a Lambda Function in your Contact Flow to fetch an artifact status (read the object tag) and decide whether to show or block the download.
The core pattern stays identical.
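For the agent-UX step, the Contact Flow Lambda could look roughly like the sketch below. It assumes the flow passes hypothetical `bucket` and `key` parameters under `Details.Parameters` and that the function's role has `s3:GetObjectTagging`; the `decision` values and the `s3_client` argument (added so the logic can be tested with a stub) are also assumptions.

```python
def download_decision(tag_set):
    """Map an S3 TagSet to 'allow', 'pending', or 'block'."""
    status = {t["Key"]: t["Value"] for t in tag_set}.get("GuardDutyMalwareScanStatus")
    if status == "NO_THREATS_FOUND":
        return "allow"
    if status is None:
        return "pending"  # scan has not tagged the object yet
    return "block"  # THREATS_FOUND, ACCESS_DENIED, or any other outcome


def handler(event, context, s3_client=None):
    """Return a flat string map that a Contact Flow can branch on."""
    params = (event.get("Details") or {}).get("Parameters") or {}
    if s3_client is None:
        import boto3  # imported lazily so the pure logic stays testable

        s3_client = boto3.client("s3")
    resp = s3_client.get_object_tagging(Bucket=params["bucket"], Key=params["key"])
    return {"decision": download_decision(resp.get("TagSet", []))}
```

Treating every status other than NO_THREATS_FOUND as "block" keeps the agent-side behavior consistent with the bucket policy, which also fails closed.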
This pattern gives you:
- Safe upload via presigned URLs,
- Automated malware scanning using a fully managed service,
- Strict read controls enforced by S3 (tag-gated access),
- A reliable quarantine path for forensics.
It’s cloud-native, low-ops, and ready for production once you layer in alerting (EventBridge → SNS/Slack), retention policies, and access controls on the quarantine bucket.