build a fully serverless pipeline where uploads arrive via a REST endpoint, land in an ingest S3 bucket, get scanned by Amazon GuardDuty Malware Protection for S3, and — if malicious — are automatically quarantined to a separate bucket

gvoden/serverless-malware-scanning-guardduty


Serverless Malware Scanning on S3 with Amazon GuardDuty (and a Quarantine Flow)

TL;DR: This project shows how to build a fully serverless pipeline where uploads arrive via a REST endpoint, land in an ingest S3 bucket, get scanned by Amazon GuardDuty Malware Protection for S3, and, if malicious, are automatically quarantined to a separate bucket. We’ll walk through the architecture, the IAM and KMS gotchas, and the Lambda code for both functions.


Why this pattern?

  • Shift-left scanning: Catch malware before any consumer system reads the file.
  • Serverless: No EC2 / antivirus engines to manage.
  • Native: GuardDuty’s managed malware scans + S3 object tagging = simple and auditable.
  • Secure by default: SSE-KMS everywhere, least-privilege IAM, and read-denies on unscanned objects.

Architecture Overview

Client            API Gateway (HTTP API)     Lambda: get-upload-url        S3 (ingest, SSE-KMS)
  |                         |                         |                                |
  | POST /upload-url        |------------------------>|                                |
  |------------------------>|  presigned PUT URL     |                                 |
  |                         |<-----------------------|                                 |
  | PUT object to S3 (URL)  |                         |                                |
  |------------------------> S3 (ingest)  ---(KMS)----> object stored, versioned       |
  |                                                                               (1)
  |                                                     GuardDuty S3 Malware Plan      |
  |                                                     (monitors the ingest bucket)   |
  |                                                              |                     |
  |                                        tags object: GuardDutyMalwareScanStatus     |
  |                                        = NO_THREATS_FOUND | THREATS_FOUND | ...    |
  |                                                              |                     |
  |              EventBridge Rule (only THREATS_FOUND) ----------+-----> Lambda: dispatcher
  |                                                                         |
  |                                                          copy to S3 (quarantine, SSE-KMS)
  |                                                                         |
  v                                                                         v
Consumers                                                               Forensics/Review
(get only clean)                                                        (restricted access)

Key points

  1. SSE-KMS is used on both buckets. GuardDuty must be allowed to decrypt the same KMS key used by the ingest bucket.
  2. GuardDuty sets the GuardDutyMalwareScanStatus tag on the object (NO_THREATS_FOUND, THREATS_FOUND, UNSUPPORTED, ACCESS_DENIED, or FAILED).
  3. An EventBridge rule triggers a dispatcher Lambda only on THREATS_FOUND, which copies the exact version to a quarantine bucket (and optionally deletes the source).
  4. The ingest bucket policy denies reads unless the tag is NO_THREATS_FOUND, but it must exempt both the GuardDuty scan role and the dispatcher (including their assumed-role ARNs).
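
The key points above boil down to one small decision per tag value. A minimal sketch of that decision, assuming the five status values listed for the GuardDutyMalwareScanStatus tag; the function name and the action labels are hypothetical and not part of this repo:

```python
# Hypothetical helper: maps the GuardDutyMalwareScanStatus tag value to what
# this pipeline does with the object. The action labels are illustrative only.
from typing import Optional

TAG_KEY = "GuardDutyMalwareScanStatus"


def action_for_status(status: Optional[str]) -> str:
    if status is None:
        return "block"        # not scanned yet: the bucket policy denies reads
    if status == "NO_THREATS_FOUND":
        return "allow"        # consumers may read the object
    if status == "THREATS_FOUND":
        return "quarantine"   # dispatcher copies this version to quarantine
    # ACCESS_DENIED, UNSUPPORTED, FAILED, or anything unexpected: stay blocked
    return "block"
```

Treating every value other than NO_THREATS_FOUND as blocked mirrors the deny-until-clean bucket policy, so a failed or skipped scan never becomes readable by accident.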

Components

1) API Gateway + get-upload-url Lambda

A tiny Lambda that returns a presigned S3 PUT URL scoped to a safe prefix. It takes JSON like:

{ "tenant_id": "dev", "file_name": "my.docx" }

Handler (Python 3.12)

# lambda_get_upload_url.py
import json, os, uuid, boto3

S3 = boto3.client("s3")

INGEST_BUCKET   = os.environ["INGEST_BUCKET"]       # e.g. my-ingest-bucket
UPLOAD_PREFIX   = os.environ.get("UPLOAD_PREFIX", "uploads")
PRESIGN_EXPIRES = int(os.environ.get("PRESIGN_EXPIRES", "600"))  # seconds

def _ok(body):
    return {"statusCode": 200, "headers": {"content-type":"application/json"}, "body": json.dumps(body)}

def _err(code, msg):
    return {"statusCode": code, "headers": {"content-type":"application/json"}, "body": json.dumps({"error": msg})}

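def handler(event, context):
    # Parse the JSON body; reject malformed input with a 400 instead of a 500
    try:
        body = event.get("body") or {}
        if isinstance(body, str):
            body = json.loads(body)
    except json.JSONDecodeError:
        return _err(400, "request body must be valid JSON")
    if not isinstance(body, dict):
        return _err(400, "request body must be a JSON object")

    try:
        # Keep tenant and file name to a single path segment each
        tenant = str(body.get("tenant_id") or "public").strip().replace("/", "_")
        file_name = str(body.get("file_name") or "file.bin").replace("/", "_")

        # namespacing per-tenant, with a random UUID for uniqueness
        key = f"{UPLOAD_PREFIX}/{tenant}/{uuid.uuid4()}_{file_name}"

        # presign a PUT with minimal headers (let client set Content-Type)
        url = S3.generate_presigned_url(
            "put_object",
            Params={
                "Bucket": INGEST_BUCKET,
                "Key": key
            },
            ExpiresIn=PRESIGN_EXPIRES
        )

        return _ok({"upload_url": url, "bucket": INGEST_BUCKET, "key": key})
    except Exception as e:
        # Log details server-side; don't echo internal errors to the caller
        print(f"[ERROR] presign failed: {e}")
        return _err(500, "could not create upload URL")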

Environment variables

  • INGEST_BUCKET – your ingest S3 bucket name
  • UPLOAD_PREFIX – optional, default uploads
  • PRESIGN_EXPIRES – optional, default 600

API Gateway: Create an HTTP API with POST /upload-url integration to this Lambda.
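
With an HTTP API (payload format 2.0), the request body reaches the Lambda as a string and can arrive base64-encoded, flagged by isBase64Encoded. A minimal sketch of a body parser that covers both cases; parse_json_body is a hypothetical helper and not part of the handler above:

```python
import base64
import json


def parse_json_body(event: dict) -> dict:
    """Return the request body as a dict, decoding base64 when flagged."""
    raw = event.get("body")
    if not raw:
        return {}
    if isinstance(raw, dict):
        # Direct test invocations sometimes pass an already-parsed body
        return raw
    if event.get("isBase64Encoded"):
        raw = base64.b64decode(raw).decode("utf-8")
    parsed = json.loads(raw)
    if not isinstance(parsed, dict):
        raise ValueError("request body must be a JSON object")
    return parsed
```

A handler could call this first and map ValueError (which json.JSONDecodeError subclasses) to a 400 response.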


2) GuardDuty Malware Protection Plan (for S3)

  • Enable GuardDuty.
  • Create a Malware Protection Plan targeting your ingest bucket.
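  • Give the plan a role trusted by "Service": "malware-protection-plan.guardduty.amazonaws.com" that can:
    • s3:GetObject* on the ingest bucket,
    • s3:PutObject*Tagging on the ingest bucket,
    • kms:Decrypt, kms:GenerateDataKey on the ingest bucket’s CMK (via service: s3.<region>.amazonaws.com).
  • This is the short list that matters for scanning and tagging. The role policy in the GuardDuty documentation also covers the S3 event notifications and the managed EventBridge rule that the plan relies on to see new uploads, so treat that policy as the starting point.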

Role trust policy (plan role)

{
  "Version": "2012-10-17",
  "Statement": [{
    "Effect": "Allow",
    "Principal": { "Service": "malware-protection-plan.guardduty.amazonaws.com" },
    "Action": "sts:AssumeRole"
  }]
}
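
Once the role exists, the plan itself can be created with boto3. The sketch below assumes the guardduty client's create_malware_protection_plan call and builds its arguments in a separate function so they can be inspected without AWS access. The role ARN, bucket name, and prefix in the usage comment are placeholders, and tagging is switched on explicitly because the rest of this pipeline depends on the tag:

```python
from typing import Iterable, Optional


def plan_request(role_arn: str, bucket: str,
                 prefixes: Optional[Iterable[str]] = None) -> dict:
    """Build keyword arguments for create_malware_protection_plan."""
    s3_bucket = {"BucketName": bucket}
    if prefixes:
        # Limit scanning to specific key prefixes (optional)
        s3_bucket["ObjectPrefixes"] = list(prefixes)
    return {
        "Role": role_arn,
        "ProtectedResource": {"S3Bucket": s3_bucket},
        # Without tagging enabled, no GuardDutyMalwareScanStatus tag is written
        "Actions": {"Tagging": {"Status": "ENABLED"}},
    }


def create_plan(role_arn: str, bucket: str,
                prefixes: Optional[Iterable[str]] = None) -> str:
    import boto3  # imported lazily so plan_request works without boto3 installed
    guardduty = boto3.client("guardduty")
    resp = guardduty.create_malware_protection_plan(
        **plan_request(role_arn, bucket, prefixes)
    )
    return resp["MalwareProtectionPlanId"]

# Example (placeholders):
# create_plan("arn:aws:iam::<ACCOUNT_ID>:role/<PlanRoleName>", "<INGEST_BUCKET>", ["uploads/"])
```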

KMS key policy must include the plan role (and your dispatcher role). Example statements:

{
  "Sid": "AllowGDS3ScanUseOfKey",
  "Effect": "Allow",
  "Principal": { "AWS": "arn:aws:iam::<ACCOUNT_ID>:role/<PlanRoleName>" },
  "Action": ["kms:Decrypt","kms:GenerateDataKey","kms:DescribeKey"],
  "Resource": "*",
  "Condition": { "StringLike": { "kms:ViaService": "s3.<region>.amazonaws.com" } }
},
{
  "Sid": "AllowDispatcherUseOfKey",
  "Effect": "Allow",
  "Principal": { "AWS": "arn:aws:iam::<ACCOUNT_ID>:role/<DispatcherRoleName>" },
  "Action": ["kms:Encrypt","kms:Decrypt","kms:GenerateDataKey","kms:DescribeKey"],
  "Resource": "*",
  "Condition": { "StringLike": { "kms:ViaService": "s3.<region>.amazonaws.com" } }
}

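Tip: For a KMS key, the key policy has the final say. It must either allow the role directly (as in the statements above) or delegate to IAM through the account root, in which case the role’s IAM policy must also allow the actions. If the plan role ends up without kms:Decrypt or kms:GenerateDataKey on the ingest key, GuardDuty will tag objects as ACCESS_DENIED.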
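
To check the key-policy side before uploading anything, it can help to inspect the policy document directly. A rough sketch, assuming the policy has already been loaded as a dict (for example from the JSON string returned by the KMS get_key_policy call). key_policy_allows is a hypothetical helper that only looks for an exact principal and action in Allow statements; it ignores conditions, wildcards in principals, and delegation to the account root:

```python
def _as_list(value):
    """Policy fields may be a single string or a list of strings."""
    if value is None:
        return []
    return [value] if isinstance(value, str) else list(value)


def key_policy_allows(policy: dict, principal_arn: str, action: str) -> bool:
    """True if some Allow statement names principal_arn and the action."""
    for stmt in _as_list_of_statements(policy):
        if stmt.get("Effect") != "Allow":
            continue
        principal = stmt.get("Principal")
        aws = _as_list(principal.get("AWS")) if isinstance(principal, dict) else []
        actions = _as_list(stmt.get("Action"))
        if principal_arn in aws and (action in actions or "kms:*" in actions):
            return True
    return False


def _as_list_of_statements(policy: dict) -> list:
    stmts = policy.get("Statement", [])
    return [stmts] if isinstance(stmts, dict) else list(stmts)
```

A False result for the plan role and kms:Decrypt is a strong hint that scans will end in ACCESS_DENIED, although a True result does not prove the opposite because conditions are not evaluated.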


3) EventBridge Rule → dispatcher Lambda

Trigger only on threats:

Event pattern (JSON)

{
  "source": ["aws.guardduty"],
  "detail-type": ["GuardDuty Malware Protection Object Scan Result"],
  "detail": { "scanResultDetails": { "scanResultStatus": ["THREATS_FOUND"] } }
}
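
Before wiring up the rule, the matching logic can be checked locally. The sketch below implements only the small subset of EventBridge matching this pattern needs (nested objects, with a list of allowed exact values at each leaf). The sample event is an assumed shape built from the field names used in this README, the pattern is trimmed to source and detail, and matches is a hypothetical helper:

```python
def matches(pattern: dict, event: dict) -> bool:
    """Subset of EventBridge matching: nested dicts, leaf = list of allowed values."""
    for key, expected in pattern.items():
        if key not in event:
            return False
        actual = event[key]
        if isinstance(expected, dict):
            if not isinstance(actual, dict) or not matches(expected, actual):
                return False
        elif actual not in expected:
            return False
    return True


PATTERN = {
    "source": ["aws.guardduty"],
    "detail": {"scanResultDetails": {"scanResultStatus": ["THREATS_FOUND"]}},
}

# Assumed event shape, for illustration only
SAMPLE_EVENT = {
    "source": "aws.guardduty",
    "detail": {
        "s3ObjectDetails": {
            "bucketName": "my-ingest-bucket",
            "objectKey": "uploads/dev/example_eicar.com",
            "versionId": "example-version-id",
        },
        "scanResultDetails": {"scanResultStatus": "THREATS_FOUND"},
    },
}

if __name__ == "__main__":
    print(matches(PATTERN, SAMPLE_EVENT))
```

The same sample event can be passed to the dispatcher handler in a local test to confirm that bucket, key, and version are extracted as expected.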

Handler (Python 3.12)

This code:

  • Parses GuardDuty’s event safely,
  • Copies the exact offending version to the quarantine bucket (SSE-KMS),
  • Optionally deletes the source,
  • No-ops on other outcomes.

# lambda_dispatcher.py
import json, os, boto3, botocore

S3 = boto3.client("s3")
QUAR_BUCKET   = os.environ.get("QUARANTINE_BUCKET")   # e.g., my-quarantine-bucket
INGEST_BUCKET = os.environ.get("INGEST_BUCKET")       # e.g., my-ingest-bucket
KMS_KEY_ARN   = os.environ.get("KMS_KEY_ARN")         # CMK for quarantine bucket (optional if bucket default)
DELETE_SOURCE_ON_THREAT = os.environ.get("DELETE_SOURCE_ON_THREAT", "false").lower() == "true"

def _extract(event):
    d = event.get("detail", {}) or {}
    result = (d.get("scanResultDetails") or {}).get("scanResultStatus")
    o = d.get("s3ObjectDetails") or {}
    bucket = o.get("bucketName")
    key = o.get("objectKey")
    version = o.get("versionId")
    if (not bucket or not key) and "objectArn" in o:
        try:
            # arn:aws:s3:::bucket/key -> the resource is the 6th colon-separated field
            bkt_and_key = o["objectArn"].split(":", 5)[5]
            bkt, k = bkt_and_key.split("/", 1)
            bucket = bucket or bkt
            key = key or k
        except (IndexError, ValueError, AttributeError):
            pass
    return bucket, key, version, result

def _latest_version(bucket, key):
    try:
        resp = S3.head_object(Bucket=bucket, Key=key)
        return resp.get("VersionId")
    except botocore.exceptions.ClientError:
        return None

def copy_to_quarantine(bucket, key, version=None):
    if not QUAR_BUCKET:
        raise RuntimeError("QUARANTINE_BUCKET env var is not set")

    dest_key = f"quarantine/{key}"
    copy_source = {"Bucket": bucket, "Key": key}
    if version:
        copy_source["VersionId"] = version

    args = {
        "Bucket": QUAR_BUCKET,
        "Key": dest_key,
        "CopySource": copy_source,
        "MetadataDirective": "COPY",
        "TaggingDirective": "REPLACE",  # don't carry ingest tags into quarantine
    }
    if KMS_KEY_ARN:
        args.update({
            "ServerSideEncryption": "aws:kms",
            "SSEKMSKeyId": KMS_KEY_ARN,
            "BucketKeyEnabled": True,
        })

    # copy_object handles sources up to 5 GB; larger objects need a multipart copy
    S3.copy_object(**args)

    if DELETE_SOURCE_ON_THREAT:
        del_args = {"Bucket": bucket, "Key": key}
        if version:
            del_args["VersionId"] = version
        S3.delete_object(**del_args)

def handler(event, context):
    bucket, key, version, result = _extract(event)
    print(f"Scan outcome: result={result} bucket={bucket} key={key} version={version}")

    if result != "THREATS_FOUND":
        return {"statusCode": 200}  # nothing to do

    if not bucket or not key:
        print("Missing bucket/key in event; nothing copied.")
        return {"statusCode": 200}

    if not version:
        version = _latest_version(bucket, key)
        print(f"Resolved latest version: {version}")

    try:
        copy_to_quarantine(bucket, key, version)
        print(f"Quarantined s3://{bucket}/{key} (v={version}) -> s3://{QUAR_BUCKET}/quarantine/{key}")
        return {"statusCode": 200}
    except botocore.exceptions.ClientError as e:
        print(f"[ERROR] Copy failed: {e}")
        # Returning normally suppresses Lambda's async retries, so a failed
        # quarantine shows up only in the logs; alarm on this message.
        return {"statusCode": 200, "error": str(e)}

Environment variables

  • QUARANTINE_BUCKET – your quarantine S3 bucket
  • INGEST_BUCKET – ingest bucket (handy for logging)
  • KMS_KEY_ARN – CMK used by quarantine (optional if bucket default)
  • DELETE_SOURCE_ON_THREAT – "true" / "false" (default false)

Dispatcher role permissions

  • On ingest bucket: s3:ListBucket, s3:ListBucketVersions, s3:GetObject*, s3:GetObjectVersion*, plus s3:DeleteObject and s3:DeleteObjectVersion if DELETE_SOURCE_ON_THREAT is "true"
  • On quarantine bucket: s3:PutObject*
  • KMS on both keys (if different): kms:Encrypt/Decrypt/GenerateDataKey/DescribeKey with kms:ViaService = "s3.<region>.amazonaws.com"

…and make sure those roles are also present in the KMS key policy (see earlier).
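
The permission list above can be turned into an IAM policy document programmatically, which keeps bucket names and key ARNs in one place. A minimal sketch; dispatcher_policy and its parameters are hypothetical, and the allow_delete flag adds the delete actions that the DELETE_SOURCE_ON_THREAT option needs:

```python
import json
from typing import Iterable


def dispatcher_policy(ingest_bucket: str, quarantine_bucket: str,
                      kms_key_arns: Iterable[str], region: str,
                      allow_delete: bool = False) -> dict:
    """Build an identity policy for the dispatcher role."""
    ingest_object_actions = ["s3:GetObject*"]
    if allow_delete:
        # Only needed when DELETE_SOURCE_ON_THREAT is "true"
        ingest_object_actions += ["s3:DeleteObject", "s3:DeleteObjectVersion"]
    return {
        "Version": "2012-10-17",
        "Statement": [
            {"Sid": "ListIngest", "Effect": "Allow",
             "Action": ["s3:ListBucket", "s3:ListBucketVersions"],
             "Resource": f"arn:aws:s3:::{ingest_bucket}"},          # bucket-level
            {"Sid": "ReadIngestObjects", "Effect": "Allow",
             "Action": ingest_object_actions,
             "Resource": f"arn:aws:s3:::{ingest_bucket}/*"},        # object-level
            {"Sid": "WriteQuarantine", "Effect": "Allow",
             "Action": ["s3:PutObject*"],
             "Resource": f"arn:aws:s3:::{quarantine_bucket}/*"},
            {"Sid": "UseKeysViaS3", "Effect": "Allow",
             "Action": ["kms:Encrypt", "kms:Decrypt",
                        "kms:GenerateDataKey", "kms:DescribeKey"],
             "Resource": list(kms_key_arns),
             "Condition": {"StringEquals": {
                 "kms:ViaService": f"s3.{region}.amazonaws.com"}}},
        ],
    }


if __name__ == "__main__":
    # Placeholder values for illustration
    doc = dispatcher_policy("my-ingest-bucket", "my-quarantine-bucket",
                            ["arn:aws:kms:us-east-1:111122223333:key/example"],
                            "us-east-1")
    print(json.dumps(doc, indent=2))
```

Note that the list actions attach to the bucket ARN while the object actions attach to the bucket ARN followed by /*; mixing those up is a common cause of unexpected AccessDenied errors.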


4) Ingest bucket policy (deny until clean, with exemptions)

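Do not use NotPrincipal with role ARNs: GuardDuty and Lambda call S3 as assumed-role sessions, which a role ARN in NotPrincipal does not match. Instead, deny reads unless the tag is NO_THREATS_FOUND, and exempt the roles with ArnNotEquals on aws:PrincipalArn. For a role session, aws:PrincipalArn resolves to the IAM role ARN, so the role entries are what take effect; the assumed-role patterns below are redundant but harmless.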

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DenyReadUntilClean",
      "Effect": "Deny",
      "Principal": "*",
      "Action": ["s3:GetObject","s3:GetObjectVersion"],
      "Resource": "arn:aws:s3:::<INGEST_BUCKET>/*",
      "Condition": {
        "StringNotEquals": {
          "s3:ExistingObjectTag/GuardDutyMalwareScanStatus": "NO_THREATS_FOUND"
        },
        "ArnNotEquals": {
          "aws:PrincipalArn": [
            "arn:aws:iam::<ACCOUNT_ID>:role/<DispatcherRoleName>",
            "arn:aws:iam::<ACCOUNT_ID>:role/<PlanRoleName>",
            "arn:aws:sts::<ACCOUNT_ID>:assumed-role/<DispatcherRoleName>/*",
            "arn:aws:sts::<ACCOUNT_ID>:assumed-role/<PlanRoleName>/*",
            "arn:aws:iam::<ACCOUNT_ID>:root"
          ]
        }
      }
    },
    {
      "Sid": "DenyReadWhenNoTag",
      "Effect": "Deny",
      "Principal": "*",
      "Action": ["s3:GetObject","s3:GetObjectVersion"],
      "Resource": "arn:aws:s3:::<INGEST_BUCKET>/*",
      "Condition": {
        "Null": { "s3:ExistingObjectTag/GuardDutyMalwareScanStatus": "true" },
        "ArnNotEquals": {
          "aws:PrincipalArn": [
            "arn:aws:iam::<ACCOUNT_ID>:role/<DispatcherRoleName>",
            "arn:aws:iam::<ACCOUNT_ID>:role/<PlanRoleName>",
            "arn:aws:sts::<ACCOUNT_ID>:assumed-role/<DispatcherRoleName>/*",
            "arn:aws:sts::<ACCOUNT_ID>:assumed-role/<PlanRoleName>/*",
            "arn:aws:iam::<ACCOUNT_ID>:root"
          ]
        }
      }
    }
  ]
}
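
Because both statements repeat the same exemption list, generating the policy avoids the two lists drifting apart. A minimal sketch that reproduces the policy above from a bucket name, an account ID, and the role names; ingest_bucket_policy and exempt_principals are hypothetical helpers:

```python
import json
from typing import Iterable, List

TAG_CONDITION_KEY = "s3:ExistingObjectTag/GuardDutyMalwareScanStatus"


def exempt_principals(account_id: str, role_names: Iterable[str]) -> List[str]:
    """Role ARNs, their assumed-role patterns, and the account root."""
    names = list(role_names)
    arns = [f"arn:aws:iam::{account_id}:role/{n}" for n in names]
    arns += [f"arn:aws:sts::{account_id}:assumed-role/{n}/*" for n in names]
    arns.append(f"arn:aws:iam::{account_id}:root")
    return arns


def ingest_bucket_policy(bucket: str, account_id: str,
                         role_names: Iterable[str]) -> dict:
    exempt = {"ArnNotEquals": {
        "aws:PrincipalArn": exempt_principals(account_id, role_names)}}

    def deny(sid: str, tag_condition: dict) -> dict:
        # Both condition blocks must hold for the Deny to apply
        return {"Sid": sid, "Effect": "Deny", "Principal": "*",
                "Action": ["s3:GetObject", "s3:GetObjectVersion"],
                "Resource": f"arn:aws:s3:::{bucket}/*",
                "Condition": {**tag_condition, **exempt}}

    return {"Version": "2012-10-17", "Statement": [
        deny("DenyReadUntilClean",
             {"StringNotEquals": {TAG_CONDITION_KEY: "NO_THREATS_FOUND"}}),
        deny("DenyReadWhenNoTag", {"Null": {TAG_CONDITION_KEY: "true"}}),
    ]}


if __name__ == "__main__":
    # Placeholder values for illustration
    policy = ingest_bucket_policy("my-ingest-bucket", "111122223333",
                                  ["DispatcherRole", "PlanRole"])
    print(json.dumps(policy, indent=2))
```

The printed JSON can be pasted into the bucket policy editor or passed as a string to the S3 put_bucket_policy call.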

Testing

Clean file

# 1) Get presigned URL
BASE="https://<api-id>.execute-api.<region>.amazonaws.com"
RESP=$(curl -sS -f -X POST "$BASE/upload-url" -H 'content-type: application/json'   -d '{"tenant_id":"dev","file_name":"hello.txt"}')
URL=$(echo "$RESP" | python3 -c 'import sys,json;print(json.load(sys.stdin)["upload_url"])')
BUCKET=$(echo "$RESP"| python3 -c 'import sys,json;print(json.load(sys.stdin)["bucket"])')
KEY=$(echo "$RESP"   | python3 -c 'import sys,json;print(json.load(sys.stdin)["key"])')

# 2) Upload
printf '%s' 'hello world' > /tmp/hello.txt
curl -sS --fail-with-body --http1.1 -T /tmp/hello.txt "$URL"

# 3) Poll the tag
for i in $(seq 1 18); do
  aws s3api get-object-tagging --bucket "$BUCKET" --key "$KEY"     | python3 -c 'import sys,json;t=json.load(sys.stdin).get("TagSet",[]);print({d["Key"]:d["Value"] for d in t}.get("GuardDutyMalwareScanStatus","<no-tag-yet>"))'
  sleep 5
done
# Expect: NO_THREATS_FOUND

EICAR test string (benign test file recognized by scanners)

printf '%s' 'X5O!P%@AP[4\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*' > /tmp/eicar.com
RESP=$(curl -sS -f -X POST "$BASE/upload-url" -H 'content-type: application/json'   -d '{"tenant_id":"dev","file_name":"eicar.com"}')
URL=$(echo "$RESP" | python3 -c 'import sys,json;print(json.load(sys.stdin)["upload_url"])')
BUCKET=$(echo "$RESP"| python3 -c 'import sys,json;print(json.load(sys.stdin)["bucket"])')
KEY=$(echo "$RESP"   | python3 -c 'import sys,json;print(json.load(sys.stdin)["key"])')
curl -sS --fail-with-body --http1.1 -T /tmp/eicar.com "$URL"

for i in $(seq 1 18); do
  aws s3api get-object-tagging --bucket "$BUCKET" --key "$KEY"     | python3 -c 'import sys,json;t=json.load(sys.stdin).get("TagSet",[]);print({d["Key"]:d["Value"] for d in t}.get("GuardDutyMalwareScanStatus","<no-tag-yet>"))'
  sleep 5
done
# Expect: THREATS_FOUND

# Quarantine check
aws s3api head-object --bucket <QUARANTINE_BUCKET> --key "quarantine/$KEY" || echo "not quarantined"

If you ever see ACCESS_DENIED, it almost always means the plan role isn’t permitted by the KMS key policy of the ingest bucket’s CMK, or your bucket policy Deny is catching the assumed-role principal.
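
The shell loops above always run all 18 iterations. For scripted tests, a Python poller that stops as soon as the tag appears may be more convenient. A sketch under the assumption that the caller supplies a function returning the object's TagSet (for example a small lambda around the S3 get_object_tagging call); wait_for_scan_status is a hypothetical helper:

```python
import time
from typing import Callable, Dict, List, Optional

TAG_KEY = "GuardDutyMalwareScanStatus"


def scan_status_from_tagset(tag_set: List[Dict[str, str]]) -> Optional[str]:
    """Return the scan status from an S3 TagSet, or None if not tagged yet."""
    for tag in tag_set:
        if tag.get("Key") == TAG_KEY:
            return tag.get("Value")
    return None


def wait_for_scan_status(fetch_tag_set: Callable[[], List[Dict[str, str]]],
                         attempts: int = 18, delay: float = 5.0,
                         sleep: Callable[[float], None] = time.sleep) -> Optional[str]:
    """Poll until the scan tag appears; return None if it never does."""
    for attempt in range(attempts):
        status = scan_status_from_tagset(fetch_tag_set())
        if status is not None:
            return status
        if attempt < attempts - 1:
            sleep(delay)  # no sleep after the final attempt
    return None

# Example with boto3 (placeholders):
# s3 = boto3.client("s3")
# wait_for_scan_status(
#     lambda: s3.get_object_tagging(Bucket="<INGEST_BUCKET>", Key="<KEY>")["TagSet"])
```

The defaults match the shell loop (18 attempts, 5 seconds apart), and injecting sleep keeps the function easy to unit test.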


Common gotchas (and fixes)

  • ACCESS_DENIED status on the tag
    Add the plan role to the KMS key policy used by the ingest bucket (and keep IAM policy aligned). Confirm the plan actually uses that role.

  • Bucket policy Deny still blocks GuardDuty
    Don’t use NotPrincipal with IAM role ARNs; instead use ArnNotEquals on aws:PrincipalArn and include both IAM role and assumed-role ARNs.

  • Dispatcher fails on clean events
    Trigger it only on THREATS_FOUND, or make the handler no-op when scanResultStatus != THREATS_FOUND (code above).

  • Versioning
    Enable S3 Versioning on both buckets. Always copy a specific version to quarantine.

  • Cross-account
    Add the other account’s role ARNs to the KMS key policy and adjust bucket policy principals accordingly.


Optional: Adapting this to Amazon Connect Chat uploads

  • Where to hook: When a customer uploads an attachment in Connect Chat, route the object into the same ingest bucket/prefix (or a dedicated Connect prefix).
  • Scan & quarantine: GuardDuty plan covers that prefix; the same dispatcher moves threats to quarantine.
  • Agent UX: Use a Lambda Function in your Contact Flow to fetch an artifact status (read the object tag) and decide whether to show or block the download.

The core pattern stays identical.
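
As an illustration of the Agent UX step, a contact-flow Lambda could look roughly like this. It assumes Amazon Connect passes flow parameters under Details.Parameters and expects a flat response of string values; the parameter names bucket and key, the response keys, and the PENDING and UNKNOWN labels are all hypothetical. The tag lookup is injected so the decision logic can be tested without AWS:

```python
from typing import Callable, Dict, List

TAG_KEY = "GuardDutyMalwareScanStatus"


def attachment_decision(event: dict,
                        fetch_tag_set: Callable[[str, str], List[Dict[str, str]]]
                        ) -> Dict[str, str]:
    """Decide whether an attachment may be offered for download."""
    params = (event.get("Details") or {}).get("Parameters") or {}
    bucket, key = params.get("bucket"), params.get("key")
    if not bucket or not key:
        return {"scanStatus": "UNKNOWN", "allowDownload": "false"}
    tags = {t["Key"]: t["Value"] for t in fetch_tag_set(bucket, key)}
    status = tags.get(TAG_KEY, "PENDING")  # no tag yet means the scan is not done
    allow = "true" if status == "NO_THREATS_FOUND" else "false"
    return {"scanStatus": status, "allowDownload": allow}


def handler(event, context):
    import boto3  # imported lazily so attachment_decision is testable offline
    s3 = boto3.client("s3")
    return attachment_decision(
        event,
        lambda b, k: s3.get_object_tagging(Bucket=b, Key=k)["TagSet"],
    )
```

The flow can then branch on allowDownload. The Lambda's role needs s3:GetObjectTagging on the ingest bucket, and reading tags is not blocked by the deny-until-clean policy, which only covers s3:GetObject and s3:GetObjectVersion.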


Wrapping up

This pattern gives you:

  • Safe upload via presigned URLs,
  • Automated malware scanning using a fully managed service,
  • Strict read controls enforced by S3 (tag-gated access),
  • A reliable quarantine path for forensics.

It’s cloud-native, low-ops, and ready for production once you layer in alerting (EventBridge → SNS/Slack), retention policies, and access controls on the quarantine bucket.
