Skip to content

Commit

Permalink
Demo for serverless integration with Textract
Browse files Browse the repository at this point in the history
  • Loading branch information
Vinod Kumar authored and Vinod Kumar committed Dec 12, 2024
1 parent 9331c49 commit 39ee2cf
Show file tree
Hide file tree
Showing 11 changed files with 1,818 additions and 0 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,6 @@
webapp/node_modules/
webapp/package-lock.json
iac/aws/terraform/creating-custom-vpc/.terraform/
iac/demo/textract/.terraform.lock.hcl
iac/demo/textract/.terraform/*

27 changes: 27 additions & 0 deletions iac/demo/textract/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Packages the two Textract demo Lambda handlers into the zip archives
# that main.tf uploads. Each archive contains exactly one Python file.

# Python Lambda files
LAMBDA1_FILE := lambda_function.py
LAMBDA2_FILE := sqs_to_csv_lambda.py

# Output zip files
LAMBDA1_ZIP := lambda_function.zip
LAMBDA2_ZIP := sqs_to_csv_lambda.zip

# These targets are commands, not files; without this declaration a stray
# file named "all" or "clean" would silently disable them.
.PHONY: all zip-lambdas clean

# Remove a half-written archive if zip fails, so it is never mistaken for
# an up-to-date build product.
.DELETE_ON_ERROR:

# Default target
all: zip-lambdas

# Zip the Lambda functions
zip-lambdas: $(LAMBDA1_ZIP) $(LAMBDA2_ZIP)

# Each archive is rebuilt only when its Python source is newer.
$(LAMBDA1_ZIP): $(LAMBDA1_FILE)
	@echo "Zipping $< into $@..."
	zip $@ $<

$(LAMBDA2_ZIP): $(LAMBDA2_FILE)
	@echo "Zipping $< into $@..."
	zip $@ $<

# Clean the zip files
clean:
	$(RM) $(LAMBDA1_ZIP) $(LAMBDA2_ZIP)
	@echo "Cleaned up old zip files!"

136 changes: 136 additions & 0 deletions iac/demo/textract/lambda_function.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
import boto3
import json
import os

# boto3 clients are created once at module import, outside the handler.
# NOTE(review): s3_client is not referenced by lambda_handler in this file.
s3_client = boto3.client('s3')
textract_client = boto3.client('textract')
sns_client = boto3.client('sns')

# Read at import time: a missing variable raises KeyError before the handler runs.
SNS_TOPIC_ARN = os.environ['SNS_TOPIC_ARN'] # Environment variable for SNS topic ARN

# Object-key suffixes the handler accepts; matching is done on the lower-cased key.
SUPPORTED_IMAGE_EXTENSIONS = ['.png', '.jpg', '.jpeg']
SUPPORTED_PDF_EXTENSION = '.pdf'

def lambda_handler(event, context):
    """Run Textract on a newly uploaded S3 object and publish the result to SNS.

    Image keys (.png/.jpg/.jpeg) go through ``detect_document_text`` and the
    detected lines, each with its confidence, are published as text.
    PDF keys go through ``analyze_document`` with two fixed queries and the
    answers are published keyed by query alias.

    Args:
        event: S3 event notification; only the first entry of ``Records`` is
            processed.
        context: Lambda context object (unused).

    Returns:
        dict with ``statusCode`` and a JSON-encoded ``body``:
        200 on success, 400 when Textract rejects the document format,
        500 for any other failure (including an unsupported file extension).
    """
    # Function-scope import keeps this block self-contained.
    from urllib.parse import unquote_plus

    try:
        print(f"Event: {json.dumps(event)}")

        # Get S3 bucket and object key from the S3 event
        record = event['Records'][0]
        bucket_name = record['s3']['bucket']['name']
        # S3 notifications URL-encode the key (a space arrives as '+');
        # Textract needs the real key, so decode it first.
        object_key = unquote_plus(record['s3']['object']['key'])

        print('bucket_name::', bucket_name, ' - object_key::', object_key)

        lowered_key = object_key.lower()
        document = {'S3Object': {'Bucket': bucket_name, 'Name': object_key}}

        if lowered_key.endswith(tuple(SUPPORTED_IMAGE_EXTENSIONS)):
            # Process image files
            response = textract_client.detect_document_text(Document=document)
            print('Textract detect_document_text response::', response)

            # One output line per detected LINE block, with its confidence
            extracted_text_with_confidence = '\n'.join(
                f"{block['Text']} (Confidence: {block['Confidence']:.2f})"
                for block in response['Blocks']
                if block['BlockType'] == 'LINE'
            )
            print('extracted_text_with_confidence::', extracted_text_with_confidence)

            # Send extracted text to SNS
            sns_client.publish(
                TopicArn=SNS_TOPIC_ARN,
                Message=json.dumps({
                    'bucket': bucket_name,
                    'key': object_key,
                    'text': extracted_text_with_confidence
                }),
                Subject='Textract Extracted Text from Image'
            )

        elif lowered_key.endswith(SUPPORTED_PDF_EXTENSION):
            # Process PDF files. Errors are deliberately NOT caught here so
            # that the handlers below report them instead of a false 200.
            response = textract_client.analyze_document(
                Document=document,
                FeatureTypes=['QUERIES'],
                QueriesConfig={
                    'Queries': [
                        {'Text': 'What is the event name?', 'Alias': 'EventName'},
                        {'Text': 'What is the Location?', 'Alias': 'Location'}
                    ]
                }
            )
            print('Textract analyze_document response::', response)

            # The ANSWER relationship lives on the QUERY block and points at
            # the QUERY_RESULT block(s); index blocks by Id for direct lookup.
            blocks_by_id = {block['Id']: block for block in response['Blocks']}
            query_results = {}
            for block in response['Blocks']:
                if block['BlockType'] != 'QUERY':
                    continue
                query = block['Query']
                alias = query.get('Alias', query['Text'])
                # A query Textract could not answer has no Relationships key.
                for relationship in block.get('Relationships', []):
                    if relationship['Type'] != 'ANSWER':
                        continue
                    answer = blocks_by_id.get(relationship['Ids'][0])
                    if answer is not None:
                        query_results[alias] = {
                            'Text': answer.get('Text', ''),
                            'Confidence': answer.get('Confidence')
                        }

            print('query_results::', query_results)

            # Send key-value pairs to SNS
            sns_client.publish(
                TopicArn=SNS_TOPIC_ARN,
                Message=json.dumps({
                    'bucket': bucket_name,
                    'key': object_key,
                    'key_value_pairs': query_results
                }),
                Subject='Textract Extracted Key-Value Pairs from PDF'
            )

        else:
            raise ValueError(f"Unsupported file extension for file: {object_key}")

        return {
            'statusCode': 200,
            'body': json.dumps('File processed successfully and data sent to SNS')
        }

    except textract_client.exceptions.UnsupportedDocumentException as e:
        print(f"Unsupported document format: {e}")
        return {
            'statusCode': 400,
            'body': json.dumps('Unsupported document format')
        }
    except Exception as e:
        print(f"Error processing file: {e}")
        return {
            'statusCode': 500,
            'body': json.dumps(f"Error processing file: {str(e)}")
        }
Binary file added iac/demo/textract/lambda_function.zip
Binary file not shown.
206 changes: 206 additions & 0 deletions iac/demo/textract/main.tf
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
# AWS provider; both the CLI profile and the region come from input variables.
provider "aws" {
  profile = var.aws_profile
  region  = var.region
}

# Bucket that receives the uploaded documents.
# var.s3_bucket_name is used as a name *prefix*, so the final bucket name
# is generated. force_destroy lets `terraform destroy` remove a non-empty bucket.
resource "aws_s3_bucket" "textract_bucket" {
  bucket_prefix = var.s3_bucket_name
  force_destroy = true

  tags = {
    Name = "TextractBucket"
  }
}

# Topic the Textract Lambda publishes its extraction results to.
resource "aws_sns_topic" "textract_topic" {
  name = "textract-sns-topic"

  tags = {
    Name = "TextractSNSTopic"
  }
}

# Lambda that runs Textract on uploaded documents and publishes to SNS.
# The archive path is anchored to path.module so the configuration also works
# when Terraform is run from another directory or used as a child module.
resource "aws_lambda_function" "textract_lambda" {
  function_name = var.lambda_function_name
  role          = aws_iam_role.lambda_role.arn
  handler       = "lambda_function.lambda_handler"
  runtime       = "python3.11"
  timeout       = 60

  filename = "${path.module}/lambda_function.zip"
  # Hash of the archive; a changed zip triggers a code update on apply.
  source_code_hash = filebase64sha256("${path.module}/lambda_function.zip")

  environment {
    variables = {
      SNS_TOPIC_ARN = aws_sns_topic.textract_topic.arn # Pass SNS Topic ARN as an environment variable
    }
  }
}

# S3 Bucket Notification to Lambda
# One trigger per file suffix the handler supports (.pdf, .png, .jpg, .jpeg).
# S3 suffix filters are case-sensitive, so an upper-case suffix such as ".PDF"
# will not trigger the function.
resource "aws_s3_bucket_notification" "bucket_notification" {
  bucket = aws_s3_bucket.textract_bucket.id

  dynamic "lambda_function" {
    for_each = [".pdf", ".png", ".jpg", ".jpeg"]

    content {
      lambda_function_arn = aws_lambda_function.textract_lambda.arn
      events              = ["s3:ObjectCreated:*"]
      filter_suffix       = lambda_function.value
    }
  }

  # S3 validates that it may invoke the function when the notification is
  # created, so the permission must exist first.
  depends_on = [aws_lambda_permission.allow_s3]
}

# Execution role assumed by the Lambda service; both Lambda functions in this
# file reference it.
resource "aws_iam_role" "lambda_role" {
  name = "lambda-textract-role"

  # Trust policy: only lambda.amazonaws.com may assume this role.
  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Action    = "sts:AssumeRole"
        Effect    = "Allow"
        Principal = { Service = "lambda.amazonaws.com" }
      }
    ]
  })
}

# Inline policy for the shared Lambda role. It grants:
#   - read/write on objects in the Textract bucket,
#   - publish to the Textract SNS topic,
#   - the two Textract APIs the handler calls,
#   - CloudWatch Logs write access,
#   - consume access on the SQS queue (for the SQS-triggered Lambda).
resource "aws_iam_role_policy" "lambda_policy" {
  name = "lambda-textract-policy"
  role = aws_iam_role.lambda_role.id

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect   = "Allow"
        Action   = ["s3:GetObject", "s3:PutObject"]
        Resource = "${aws_s3_bucket.textract_bucket.arn}/*"
      },
      {
        Effect   = "Allow"
        Action   = ["sns:Publish"]
        Resource = aws_sns_topic.textract_topic.arn
      },
      {
        # Both Textract actions are granted on every resource.
        Effect = "Allow"
        Action = [
          "textract:DetectDocumentText",
          "textract:AnalyzeDocument"
        ]
        Resource = "*"
      },
      {
        Effect = "Allow"
        Action = [
          "logs:CreateLogGroup",
          "logs:CreateLogStream",
          "logs:PutLogEvents"
        ]
        Resource = "*"
      },
      {
        Effect = "Allow"
        Action = [
          "sqs:ReceiveMessage",
          "sqs:DeleteMessage",
          "sqs:GetQueueAttributes"
        ]
        Resource = aws_sqs_queue.textract_queue.arn
      }
    ]
  })
}




# Resource-based permission letting the Textract bucket (and only that bucket,
# via source_arn) invoke the Textract Lambda.
resource "aws_lambda_permission" "allow_s3" {
  statement_id  = "AllowS3Invoke"
  function_name = aws_lambda_function.textract_lambda.function_name
  action        = "lambda:InvokeFunction"
  principal     = "s3.amazonaws.com"
  source_arn    = aws_s3_bucket.textract_bucket.arn
}

# SQS Queue
# Subscribed to the SNS topic and consumed by the sqs_to_csv Lambda.
resource "aws_sqs_queue" "textract_queue" {
  name = var.sqs_queue_name

  # AWS recommends a visibility timeout of at least 6x the consuming Lambda's
  # timeout (60s here), so a message is not redelivered while a slow or
  # retried invocation is still working on it.
  visibility_timeout_seconds = 360

  tags = {
    Name = "TextractQueue"
  }
}

# Queue policy: the SNS service may send messages to the queue, but only when
# the request originates from the Textract topic (aws:SourceArn condition).
resource "aws_sqs_queue_policy" "sns_publish_policy" {
  queue_url = aws_sqs_queue.textract_queue.id

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect    = "Allow"
        Principal = { Service = "sns.amazonaws.com" }
        Action    = "sqs:SendMessage"
        Resource  = aws_sqs_queue.textract_queue.arn
        Condition = {
          ArnEquals = { "aws:SourceArn" = aws_sns_topic.textract_topic.arn }
        }
      }
    ]
  })
}

# Subscribes the queue to the topic so every published result lands in SQS.
resource "aws_sns_topic_subscription" "sns_to_sqs" {
  protocol  = "sqs"
  topic_arn = aws_sns_topic.textract_topic.arn
  endpoint  = aws_sqs_queue.textract_queue.arn

  # The queue policy must be in place before the subscription is created.
  depends_on = [aws_sqs_queue_policy.sns_publish_policy]
}

# Second Lambda, triggered from the SQS queue. It receives the target bucket
# and key prefix for its CSV output through environment variables.
# The archive path is anchored to path.module so the configuration also works
# when Terraform is run from another directory or used as a child module.
resource "aws_lambda_function" "sqs_to_csv_lambda" {
  function_name = var.lambda_function_name_2
  role          = aws_iam_role.lambda_role.arn       # IAM role for Lambda
  handler       = "sqs_to_csv_lambda.lambda_handler" # Lambda function handler
  runtime       = "python3.11"                       # Lambda runtime
  timeout       = 60                                 # Timeout in seconds

  filename = "${path.module}/sqs_to_csv_lambda.zip" # Path to your Lambda zip file
  # Hash of the archive; a changed zip triggers a code update on apply.
  source_code_hash = filebase64sha256("${path.module}/sqs_to_csv_lambda.zip")

  environment {
    variables = {
      CSV_S3_BUCKET = aws_s3_bucket.textract_bucket.bucket # S3 bucket name
      CSV_S3_PREFIX = var.csv_s3_prefix                    # Prefix for CSV files
    }
  }
}

# Wires the queue to the sqs_to_csv Lambda; batch_size = 1 means each
# invocation receives a single message.
resource "aws_lambda_event_source_mapping" "sqs_trigger" {
  function_name    = aws_lambda_function.sqs_to_csv_lambda.arn
  event_source_arn = aws_sqs_queue.textract_queue.arn
  enabled          = true
  batch_size       = 1
}

19 changes: 19 additions & 0 deletions iac/demo/textract/outputs.tf
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
output "s3_bucket_name" {
  description = "Name of the S3 bucket that receives documents for Textract."
  value       = aws_s3_bucket.textract_bucket.id
}

output "lambda_function_arn" {
  description = "ARN of the Lambda function that runs Textract on uploaded objects."
  value       = aws_lambda_function.textract_lambda.arn
}

output "sqs_queue_url" {
  description = "URL of the SQS queue subscribed to the Textract SNS topic."
  value       = aws_sqs_queue.textract_queue.id
}

output "csv_lambda_function_arn" {
  description = "ARN of the SQS-triggered sqs_to_csv Lambda function."
  value       = aws_lambda_function.sqs_to_csv_lambda.arn
}

output "csv_s3_prefix" {
  description = "Key prefix passed to the sqs_to_csv Lambda as CSV_S3_PREFIX."
  value       = var.csv_s3_prefix
}
Loading

0 comments on commit 39ee2cf

Please sign in to comment.