Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Gracefully Handle 5xx errors when trying to write to kinesis #2547

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 4 additions & 1 deletion internal/impl/aws/output_kinesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,8 @@ func (a *kinesisWriter) WriteBatch(ctx context.Context, batch service.MessageBat
continue
}

// requeue any individual records that failed due to throttling
// requeue any individual records that failed due to throttling, or encountered an internal failure
// AWS states that 5xx errors can occur for several minutes, and should be retired: https://aws.amazon.com/blogs/big-data/implementing-efficient-and-reliable-producers-with-the-amazon-kinesis-producer-library/
failed = nil
if output.FailedRecordCount != nil {
for i, entry := range output.Records {
Expand All @@ -252,6 +253,8 @@ func (a *kinesisWriter) WriteBatch(ctx context.Context, batch service.MessageBat
a.log.Errorf("Kinesis record write request rate too high, either the frequency or the size of the data exceeds your available throughput.")
case "KMSThrottlingException":
a.log.Errorf("Kinesis record write request throttling exception, the send traffic exceeds your request quota.")
case "InternalFailure":
a.log.Errorf("Kinesis record write request failed with an internal service failure: %s", *entry.ErrorMessage)
default:
err = fmt.Errorf("record failed with code [%s] %s: %+v", *entry.ErrorCode, *entry.ErrorMessage, input.Records[i])
a.log.Errorf("kinesis record write error: %v\n", err)
Expand Down