A Ruby gem for processing messages from Amazon SQS queues with configurable message handling and error recovery.
- Long Polling: Efficiently polls SQS with configurable wait times
- Batch Processing: Processes multiple messages per batch
- Error Handling: Robust error handling with message retention on failure
- Customizable: Extensible message processing logic
- Logging: Comprehensive logging with configurable levels
- Graceful Shutdown: Handles SIGTERM, SIGINT, and SIGQUIT signals gracefully
- Environment Variables: Support for AWS credentials and configuration via environment variables
- Ruby 2.6 or higher
- AWS credentials configured (via environment variables, IAM roles, or AWS CLI)
- SQS queue URL
```shell
gem install sqs_processor
```
- Clone the repository
- Install dependencies:

  ```shell
  bundle install
  ```

- Build and install the gem:

  ```shell
  bundle exec rake install
  ```

- Copy the environment example and configure your settings:

  ```shell
  cp env.example .env
  ```
- Edit `.env` with your AWS credentials and SQS queue URL:

  ```shell
  DATA_SYNC_AWS_ACCESS_KEY_ID=your_access_key_here
  DATA_SYNC_AWS_SECRET_ACCESS_KEY=your_secret_key_here
  DATA_SYNC_AWS_REGION=us-east-1
  DATA_SYNC_SQS_QUEUE_URL=https://sqs.us-east-1.amazonaws.com/123456789012/your-queue-name
  ```
The gem is designed to be used programmatically in your Ruby applications.
```ruby
require 'sqs_processor'

# Create a processor instance with AWS credentials
processor = SQSProcessor::Processor.new(
  queue_url: 'https://sqs.us-east-1.amazonaws.com/123456789012/my-queue',
  aws_access_key_id: 'your-access-key',
  aws_secret_access_key: 'your-secret-key',
  aws_region: 'us-east-1',
  max_messages: 10,
  visibility_timeout: 30
)

# Start processing messages
processor.process_messages
```
The gem uses a hook-based approach for message processing. You must implement the `handle_message` method in a subclass to define how messages should be processed.

The `handle_message(body)` method receives:

- `body`: The parsed JSON body of the message

Return `true` if processing was successful (the message will be deleted from the queue), or `false` if processing failed (the message will remain in the queue for retry).
```json
{
  "event_type": "data_sync",
  "dataset_id": "12345",
  "timestamp": "2024-01-01T00:00:00Z"
}
```
To implement custom message processing logic, create a subclass of `SQSProcessor::Processor` and override the `handle_message` method:
```ruby
require 'sqs_processor'

class MyCustomProcessor < SQSProcessor::Processor
  def handle_message(body)
    # This method receives the parsed message body.
    # Return true if processing was successful, false otherwise.
    case body['event_type']
    when 'data_sync'
      process_data_sync(body)
    when 'report_generation'
      process_report_generation(body)
    else
      logger.warn "Unknown event type: #{body['event_type']}"
      false
    end
  end

  private

  def process_data_sync(body)
    logger.info "Processing data sync for dataset: #{body['dataset_id']}"
    # Your custom logic here
    true
  end

  def process_report_generation(body)
    logger.info "Processing report generation for report: #{body['report_id']}"
    # Your custom logic here
    true
  end
end

# Usage
processor = MyCustomProcessor.new(
  queue_url: 'your-queue-url',
  aws_access_key_id: 'your-access-key',
  aws_secret_access_key: 'your-secret-key',
  aws_region: 'us-east-1'
)
processor.process_messages
```
The `SQSProcessor::Processor.new` method accepts the following parameters:

- `queue_url:` (required) - The SQS queue URL
- `aws_access_key_id:` (required) - AWS access key ID
- `aws_secret_access_key:` (required) - AWS secret access key
- `aws_region:` (optional, default: 'us-east-1') - AWS region
- `aws_session_token:` (optional) - AWS session token for temporary credentials
- `max_messages:` (optional, default: 10) - Maximum messages per batch
- `visibility_timeout:` (optional, default: 30) - Message visibility timeout in seconds
- `logger:` (optional) - Custom logger instance
- `DATA_SYNC_AWS_ACCESS_KEY_ID`: Your AWS access key
- `DATA_SYNC_AWS_SECRET_ACCESS_KEY`: Your AWS secret key
- `DATA_SYNC_AWS_SESSION_TOKEN`: Your AWS session token (optional, for temporary credentials)
- `DATA_SYNC_AWS_REGION`: AWS region (default: us-east-1)
- `DATA_SYNC_SQS_QUEUE_URL`: Your SQS queue URL
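As a sketch, these variables could be mapped onto the constructor options with a small helper. The helper below is hypothetical (not part of the gem's API) and only assembles an options hash:

```ruby
# Hypothetical helper (not part of the gem) that maps the
# DATA_SYNC_-prefixed environment variables onto the documented
# constructor options. Required variables raise if missing.
def options_from_env
  options = {
    queue_url: ENV.fetch('DATA_SYNC_SQS_QUEUE_URL'),
    aws_access_key_id: ENV.fetch('DATA_SYNC_AWS_ACCESS_KEY_ID'),
    aws_secret_access_key: ENV.fetch('DATA_SYNC_AWS_SECRET_ACCESS_KEY'),
    aws_region: ENV.fetch('DATA_SYNC_AWS_REGION', 'us-east-1')
  }
  # Session token is optional; include it only when set.
  token = ENV['DATA_SYNC_AWS_SESSION_TOKEN']
  options[:aws_session_token] = token if token
  options
end

# processor = SQSProcessor::Processor.new(**options_from_env)
```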
The gem supports multiple ways to provide AWS credentials:
- Initializer Configuration: Set credentials in the initializer block
- Environment Variables: Use the `DATA_SYNC_`-prefixed environment variables
- AWS SDK Default Chain: If no credentials are provided, the AWS SDK uses its default credential provider chain (IAM roles, AWS CLI, etc.)
- Direct Parameter: Pass credentials directly to the processor constructor
The script supports multiple ways to provide AWS credentials:
- Environment Variables: Set `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY`
- IAM Roles: If running on EC2 with IAM roles
- AWS CLI: If you have the AWS CLI configured
- AWS SDK Default Credential Provider Chain: Automatic credential resolution
- JSON Parse Errors: Messages with invalid JSON are logged but kept in queue
- Processing Errors: Failed messages remain in queue for retry
- Network Errors: Automatic retry with exponential backoff
- Queue Errors: Comprehensive error logging with stack traces
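The success/failure contract can be sketched in isolation. The standalone handler below is illustrative (not the gem's internals) and shows how returning `false` after a rescued error keeps a message in the queue for retry:

```ruby
require 'logger'

LOGGER = Logger.new($stdout)

# Illustrative handler: return true to have the message deleted from
# the queue, or false (e.g. after a rescued error) to leave it in the
# queue for a later retry.
def handle_message(body)
  raise ArgumentError, 'missing dataset_id' unless body['dataset_id']

  LOGGER.info "Synced dataset #{body['dataset_id']}"
  true
rescue StandardError => e
  LOGGER.error "Processing failed, message retained: #{e.message}"
  false
end
```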
The processor handles termination signals gracefully:
- SIGTERM: Standard termination signal (used by container orchestrators)
- SIGINT: Interrupt signal (Ctrl+C)
- SIGQUIT: Quit signal
When a shutdown signal is received:
- The processor immediately interrupts any blocking operations (like SQS polling)
- Stops accepting new messages
- Completes processing of any current message batch
- Logs the shutdown process
- Exits cleanly
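The underlying pattern can be sketched with Ruby's `Signal.trap`. This is a simplified illustration of trap-based shutdown; the gem's internals may differ:

```ruby
# Simplified sketch of trap-based graceful shutdown (illustrative only).
$shutdown_requested = false

%w[TERM INT QUIT].each do |sig|
  Signal.trap(sig) do
    # Trap handlers should stay minimal: just record the request.
    $shutdown_requested = true
  end
end

# A polling loop would then check the flag between batches:
# until $shutdown_requested
#   receive_and_process_batch
# end
```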
Immediate Response: Like Puma, the processor responds to shutdown signals immediately, interrupting any blocking operations without waiting for timeouts.
The script provides detailed logging including:
- Queue attributes (message counts)
- Message processing status
- Error details with stack traces
- Processing performance metrics
- Set appropriate visibility timeout: Should be longer than your processing time
- Use long polling: Reduces API calls and costs
- Handle errors gracefully: Return `false` from processing methods to keep messages in queue
- Monitor queue depth: Use the built-in queue attribute reporting
- Use appropriate batch sizes: Balance between throughput and memory usage
- Deploy with graceful shutdown: The processor handles SIGTERM gracefully for container deployments
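For the first point, a simple rule of thumb (our assumption, not a gem feature) is to set the visibility timeout to a multiple of your worst observed processing time:

```ruby
# Heuristic helper (an assumption, not part of the gem): keep the
# visibility timeout comfortably above the worst-case processing time
# so in-flight messages are not redelivered mid-processing.
def suggested_visibility_timeout(max_processing_seconds, safety_factor: 2)
  (max_processing_seconds * safety_factor).ceil
end
```

A batch that takes up to 12 seconds to process would then get a 24-second timeout, passed to the constructor as `visibility_timeout: 24`.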
- "Queue URL is required": Set the `DATA_SYNC_SQS_QUEUE_URL` environment variable or use the `-q` option
- "Access Denied": Check AWS credentials and SQS permissions
- "Queue does not exist": Verify queue URL and region
- Messages not being processed: Check visibility timeout and processing logic
To enable debug logging, set the logger level to `Logger::DEBUG` in the script:

```ruby
@logger.level = Logger::DEBUG
```
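Alternatively, since the constructor accepts a `logger:` parameter, you can pass a pre-configured logger instead of modifying the instance variable:

```ruby
require 'logger'

# Build a debug-level logger and hand it to the processor via the
# documented logger: parameter.
debug_logger = Logger.new($stdout)
debug_logger.level = Logger::DEBUG

# processor = SQSProcessor::Processor.new(
#   queue_url: 'your-queue-url',
#   logger: debug_logger,
#   ...
# )
```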
```shell
bundle exec rspec        # run the test suite
bundle exec rubocop      # run the linter
bundle exec rake build   # build the gem
bundle exec rake release # release a new version
```
This gem is licensed under the MIT License - see the LICENSE file for details.