Skip to content

HandleSQSSend allow for array body #9

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions example-spec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,11 @@ role: generate
# the starenv derefers, lambdafy adds the following derefers:
#
# - lambdafy_sqs_send: This derefer will be replaced with a URL which when POSTed
# to will send a message to the SQS queue whose ARN is specified. The body
# of the POST will be sent as the SQS message body. If header
# 'Lambdafy-SQS-Group-Id' is set, it will be used as Group ID for the
# to will send a message to the SQS queue whose ARN is specified. This accepts
# either a JSON array of messages or a single message. If an array, the body
# of the POST will be split into batches and sent as entries in SQS send message batch.
# Otherwise, if a single messsage, the body of the POST will be sent as the SQS message body.
# If header 'Lambdafy-SQS-Group-Id' is set, it will be used as Group ID for the
# message. A 2xx/3xx response is considered a success, otherwise a fail. See
# the example below for usage.
# Note: The necessary IAM role permissions to send SQS messages are added
Expand Down
68 changes: 58 additions & 10 deletions proxy/sqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"context"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"io/ioutil"
Expand All @@ -18,6 +19,7 @@ import (
"github.com/aws/aws-sdk-go-v2/aws"
awsconfig "github.com/aws/aws-sdk-go-v2/config"
sqs "github.com/aws/aws-sdk-go-v2/service/sqs"
sqstypes "github.com/aws/aws-sdk-go-v2/service/sqs/types"
)

var sqsARNPat = regexp.MustCompile(`^arn:aws:sqs:([^:]+):([^:]+):(.+)$`)
Expand Down Expand Up @@ -168,17 +170,63 @@ func handleSQSSend(w http.ResponseWriter, r *http.Request) {
}
sqsCl := sqs.NewFromConfig(c)

if _, err := sqsCl.SendMessage(context.Background(), &sqs.SendMessageInput{
MessageBody: aws.String(string(body)),
QueueUrl: aws.String(qURL),
MessageGroupId: groupID,
}); err != nil {
log.Printf("error sending SQS message: %v", err)
http.Error(w, fmt.Sprintf("Error sending SQS message: %v", err), http.StatusInternalServerError)
return
}
// Check if body starts with '[' to detect JSON array (multiple messages)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if the message starts with [. This will break existing clients.

A safe and "non-magical" option is to check for a new header SQS-Batch-Message and if set (value is not important) AND the Content-Type header starts with application/json (not == since you can have things like ; charset=utf-8 suffix — best to use ParseMediaType from stdlib to properly parse the MIME type), THEN and only then is the request valid and will be treated as a JSON array of string objects to be sent as a batch of messages to SQS.

Why new header

Since we allowed HTTP requests with any content and any content type to be treated as a byte string and to be sent on the queue, we essentially removed any possibility of the content type OR the content to define anything meaningful.

Therefore, we need an extra signal. The SQS-Batch-Message is that signal.

Why Content-Type: application/json

You should never assume the content type of an HTTP request based on its content. HTTP content is typed, unlike say a file on disk. Compliant clients/servers will treat the same bytes in the body differently based on what the content type is. It's therefore best to build conforming servers/clients that don't fall over when interacting with unforeseen peers in the future.

if len(body) > 0 && body[0] == '[' {
// Multiple messages - parse and use batch send
var messages []json.RawMessage
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is assuming each message itself is also JSON. Why? This will be limiting what you can send to a queue unnecessarily and requires the receiving end to parse the message as JSON.

One alternative is to accept JSON array of strings:

var messages []string

if err := json.Unmarshal(body, &messages); err != nil {
http.Error(w, "Invalid JSON array", http.StatusBadRequest)
return
}

if len(messages) == 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unless there is a significant performance advantage, I suggest accepting up to the maximum of 10 messages allowed by SQS API and thus making a single call to SQS.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would the best way to determine this be to deploy a simple lambdafied function in the dev account and run requests against it?

http.Error(w, "Empty message array", http.StatusBadRequest)
return
}

// Build entries for batch send (split into groups of 10)
var allEntries []sqstypes.SendMessageBatchRequestEntry
for i, msg := range messages {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: You can construct the message instances in the next loop where you're actually sending them to SQS 10 at a time, instead of all at once, taking up memory.

allEntries = append(allEntries, sqstypes.SendMessageBatchRequestEntry{
Id: aws.String(fmt.Sprintf("msg-%d", i)),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why the extra msg- prefix? This only needs to be unique within the batch and we know they are messages.

MessageBody: aws.String(string(msg)),
MessageGroupId: groupID,
})
}

// Send in batches of 10 (SQS limit)
for i := 0; i < len(allEntries); i += 10 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Avoid magic numbers and define these as file level const:

const maxSQSBatchSize = 10

end := i + 10
if end > len(allEntries) {
end = len(allEntries)
}
batch := allEntries[i:end]

if _, err := sqsCl.SendMessageBatch(context.Background(), &sqs.SendMessageBatchInput{
QueueUrl: aws.String(qURL),
Entries: batch,
}); err != nil {
log.Printf("error sending SQS message batch: %v", err)
http.Error(w, fmt.Sprintf("Error sending SQS message batch: %v", err), http.StatusInternalServerError)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is swallowing individual errors of the batch send. The client would want to know if some of the items managed to get through.

return
}
}

log.Printf("sent an SQS message to '%s'", qURL)
log.Printf("sent %d SQS messages to '%s'", len(messages), qURL)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

readability: should return early here and unindent the rest of the code instead of using else. Better yet, move the single message case to the top since it's smaller and easier to read and return early after that.

} else {
// Single message - use regular send
if _, err := sqsCl.SendMessage(context.Background(), &sqs.SendMessageInput{
MessageBody: aws.String(string(body)),
QueueUrl: aws.String(qURL),
MessageGroupId: groupID,
}); err != nil {
log.Printf("error sending SQS message: %v", err)
http.Error(w, fmt.Sprintf("Error sending SQS message: %v", err), http.StatusInternalServerError)
return
}

log.Printf("sent an SQS message to '%s'", qURL)
}

}

Expand Down