
Is data loss possible when consuming aggregated records with AWS Lambda? #175

Open
LilTwo opened this issue Apr 28, 2023 · 11 comments

Comments

@LilTwo

LilTwo commented Apr 28, 2023

I've read the following warning from this page

Caution - this module is only suitable for low-value messages which are processed in aggregate. Do not use Kinesis Aggregation for data which is sensitive or where every message must be delivered, and where the KCL (including with AWS Lambda) is used for processing. DATA LOSS CAN OCCUR.

But as far as I know, records are not consumed by the KCL in Lambda and need to be deaggregated manually by invoking deaggregate. Is data loss still possible in this case?

@IanMeyers
Contributor

Hello - if you are consuming data created by the Kinesis Producer Library and published onto Kinesis Data Streams, then you will be fine. The data loss issue occurs if you use the Aggregation Library from outside of the KPL and then try to publish messages. We'll update the documentation to make this more clear.

@LilTwo
Author

LilTwo commented Apr 28, 2023

Hi @IanMeyers, thanks for the reply. I am producing data by calling the putRecord API directly, not via the KPL. The data is aggregated by aggregate (https://github.com/awslabs/kinesis-aggregation/blob/master/node/lib/kpl-agg.js#L446) before putting it into the stream, and it is consumed by AWS Lambda. Does that mean data loss could happen? If so, what's the cause?

@IanMeyers
Contributor

OK - so in this case, if you are using stream autoscaling or scaling the stream manually, then it is possible that aggregated records will target shards that no longer exist. Therefore, when you call PutRecords you will need to process the failed records that come back, and those will then have to be re-aggregated to target their correct shards. This means that you have to retain the entire record base, unaggregated, until all of your records have been flushed to the stream.
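
A minimal redrive sketch of what this implies, assuming the aws-kinesis-agg npm module and that its aggregate callback is invoked synchronously once per flushed aggregate; sendWithRetry and the retry policy are illustrative names, not part of the module:

const agg = require('aws-kinesis-agg');
const AWS = require('aws-sdk');
const kinesis = new AWS.Kinesis();

async function sendWithRetry(originals, streamName, attempts = 3) {
  // Collect the flushed aggregates first (one callback per encoded record).
  const encoded = [];
  agg.aggregate(originals, (rec) => encoded.push(rec));

  let anyFailed = false;
  for (const rec of encoded) {
    try {
      await kinesis.putRecord({
        Data: rec.data,
        PartitionKey: rec.partitionKey,
        StreamName: streamName
      }).promise();
    } catch (err) {
      anyFailed = true; // e.g. throughput exceeded or shard-targeting failures
    }
  }

  if (anyFailed && attempts > 1) {
    // Redrive from the retained originals, never from the failed protobuf blob,
    // so hash keys are recomputed against the stream's current shard layout.
    // (Coarse-grained: the whole batch is retried, so consumers may see duplicates.)
    return sendWithRetry(originals, streamName, attempts - 1);
  }
  if (anyFailed) throw new Error('records still failing after all retries');
}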

@LilTwo
Author

LilTwo commented Apr 28, 2023

Hi @IanMeyers, much appreciated!
I'm still a little confused. If my producer looks like this:

// imports added for context; this module is published as aws-kinesis-agg on npm
const agg = require('aws-kinesis-agg');
const AWS = require('aws-sdk');
const kinesis = new AWS.Kinesis();
const name = 'my-stream'; // placeholder stream name

agg.aggregate([
  {
    data: '123',
    partitionKey: 'a'
  },
  {
    data: '456',
    partitionKey: 'b'
  }
], (d) => {
  // d is one encoded (aggregated) record ready for PutRecord
  kinesis.putRecord({
    Data: d.data,
    PartitionKey: d.partitionKey,
    StreamName: name
  }).promise().then(console.log)
})

As I understand it, the PartitionKey in putRecord is the only thing that determines which shard the record will be sent to.
Even after the stream is scaled, isn't the only difference that the same partition key could lead to a different (child) shard?

@IanMeyers
Contributor

IanMeyers commented Apr 28, 2023

Correct. Let's say (for example) that your Stream only has 1 Shard, and you perform aggregation. The encoded Protobuf message will have an ExplicitHashKey that will resolve to a put onto the single Shard. However, if the Stream mutates after you performed aggregation, then it's possible that your single Protobuf message should have been dispatched to 2 separate shards with 2 separate hash keys. In this case, your PutRecords request should return a set of Failed records which have to be retried. To redrive those, you will have to re-aggregate them using the original record image. If you do not, then when a KCL client deserializes the Protobuf message, the records that didn't map to the source Shard will be dropped silently. To address this you have to either use the KPL for puts, or only use non-KCL based consumers.
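
To make the silent-drop mechanics concrete: Kinesis maps each partition key to a 128-bit MD5 hash key, and a KCL-style deaggregator keeps only the sub-records whose hash key falls inside the hash range of the shard it read from. A self-contained illustration (not the KCL's actual code), assuming a parent shard covering [0, 2^128) that split into two equal children:

const crypto = require('crypto');

// Kinesis derives the 128-bit hash key from a partition key via MD5.
function hashKeyFor(partitionKey) {
  return BigInt('0x' + crypto.createHash('md5').update(partitionKey, 'utf8').digest('hex'));
}

// The lower child shard covers [0, 2^127).
const childA = { start: 0n, end: (1n << 127n) - 1n };

// An aggregated record built before the split lands on childA; a KCL-style
// reader keeps only the sub-records whose hash key lies in childA's range:
for (const pk of ['a', 'b']) {
  const kept = hashKeyFor(pk) >= childA.start && hashKeyFor(pk) <= childA.end;
  console.log(`partitionKey=${pk}: ${kept ? 'delivered' : 'silently dropped'}`);
}
// With these keys, 'a' hashes into the lower half and 'b' into the upper half,
// so 'b' would be dropped by a consumer reading only childA.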

@LilTwo
Author

LilTwo commented Apr 28, 2023

OK thanks, that resolves my question!
I think the paragraph I quoted in the issue is quite misleading, since AWS Lambda is not a KCL client and so should not have the problem we're discussing here.

@IanMeyers
Contributor

I've been trying to get specific validation on this issue, but the Kinesis Lambda Event Source does use the KCL internally, and so may display this behaviour.

@slootjes

@IanMeyers does this mean this tool is not safe to use when consuming Kinesis Data Streams from Lambda? I wanted to use this in combination with a QLDB stream, but I'm not sure anymore if this is a good idea, as it's crucial that I get all records from the stream. More insights are highly appreciated, thanks :)

@IanMeyers
Contributor

Consumption is OK, when you are just using the deaggregate methods. The only risk with this module is when you are publishing through the aggregate API.
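
For reference, a minimal Lambda consumer sketch along those lines, assuming the aws-kinesis-agg npm module; the deaggregateSync usage follows that module's README, and the handler shape is illustrative:

const deagg = require('aws-kinesis-agg');

exports.handler = async (event) => {
  for (const record of event.Records) {
    // record.kinesis carries the base64-encoded, possibly aggregated payload.
    deagg.deaggregateSync(record.kinesis, true /* computeChecksums */, (err, userRecords) => {
      if (err) throw err;
      for (const userRecord of userRecords) {
        // Each deaggregated user record's data is itself base64-encoded.
        const payload = Buffer.from(userRecord.data, 'base64').toString();
        console.log(userRecord.partitionKey, payload);
      }
    });
  }
};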

@slootjes

Great, thanks for confirming!

@jshlbrd

jshlbrd commented Mar 28, 2024

This part of the warning could use some clarification:

This can result in data loss unless Kinesis Aggregation is carefully designed.

The suggestion described in this issue is to check for failed records, then re-aggregate and send. If all records in the aggregated record have the same partition key, then does that also mitigate the problem (if the EHK is never provided in the protobuf)?
