Skip to content

Dummy commit for CodeGuru testing #98

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -258,4 +258,4 @@ public Executor getExecutor() {
return this.executor;
}
}
}
}
3 changes: 2 additions & 1 deletion python/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@
.idea
build
.pytest_cache
python_lambda_build.zip
dist/*
*.zip
58 changes: 53 additions & 5 deletions python/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,67 @@ Alternately, you can simply copy the aws_kinesis_agg module from this repository

The [aggregator.py](aggregator.py) module contains Python classes that allow you to aggregate records using the [Kinesis Aggregated Record Format](https://github.com/awslabs/amazon-kinesis-producer/blob/master/aggregation-format.md). Using record aggregation improves throughput and reduces costs when writing producer applications that publish data to Amazon Kinesis.

### Caution - this module is only suitable for low-value messages which are processed in aggregate. Do not use Kinesis Aggregation for data which is sensitive or where every message must be delivered, and where the KCL (including with AWS Lambda) is used for processing. [DATA LOSS CAN OCCUR.](../potential_data_loss.md)

### Usage

The record aggregation module provides a simple interface for creating protocol buffers encoded data in a producer application. The `aws_kinesis_agg` module provides methods for efficiently packing individual records into larger aggregated records.
The record aggregation module provides a simple interface for creating protocol buffers encoded data in a producer application. The `aws_kinesis_agg` module provides methods for efficiently packing individual records into larger aggregated records, and deaggregating large records into a set of 'real' user records.

When using aggregation, you create a RecordAggregator object and then provide a partition key, raw data and (optionally) an explicit hash key for each record. You can choose to either provide a callback function that will be invoked when a fully-packed aggregated record is available or you can add records and check byte sizes or number of records until the aggregated record is suitably full. You're guaranteed that any aggregated record returned from the RecordAggregator object will fit within a single PutRecord request to Kinesis.
When using aggregation, you create a managing class which helps you to target the correct Kinesis Shard, and then provide a partition key, raw data and (optionally) an explicit hash key for each record. You can choose to either provide a callback function that will be invoked when a fully-packed aggregated record is available or you can add records and check byte sizes or number of records until the aggregated record is suitably full. You're guaranteed that any aggregated record returned from the RecordAggregator object will fit within a single PutRecord request to Kinesis. As you produce records in your producer application, you will aggregate them using a base `RecordAggregator` object, which provides methods to do both iterative aggregation and callback-based aggregation.

To get started, import the `aws_kinesis_agg` module:
There are two ways to create aggregated user records. The first is to use a raw `RecordAggregator`, which can aggregate messages *which are targeted for a single Shard*, or use the `AggregationManager` to aggregate messages which may span Shards. From version `1.2.0`, we __highly__ recommend the use of `AggregationManager` to limit any exposure to data loss.

### Aggregation Manager

Record Aggregation results in a single message that tightly packs `UserRecords`, and in v2 of the Kinesis Client Library all of the messages in a single aggregated payload must target the same Shard. Version 1.2.0 of Kinesis Aggregation for Python includes the `AggregationManager` class which ensures that aggregated messages will only target a single Kinesis Shard at a time. It does this by periodically refreshing the underlying Shard topology, and managing one `RecordAggregator` per destination Shard.

To use `AggregationManager`, import it:

```
import aws_kinesis_agg as agg
aggregation_manager = agg.AggregationManager(
stream_name: str,
region_name: str,
refresh_shard_frequency_count: int
)
```

Where:

* `stream_name` is the name of the destination Stream
* `region_name` is the AWS Region in which the Stream is provisioned. Default is `us-east-1`
* `refresh_shard_frequency_count` is the number of aggregated records that can be added before a Shard refresh occurs. Default is 1000.

You can then perform aggregation by executing:

```
aggregation_manager.add_user_record(
partition_key: str,
explicit_hash_key: int = None,
data
)
```

This will generate (including callbacks) or extract the `RecordAggregator` for the target Shard, and add the user record to it. You can also use the non-encapsulated format which access the underlying `RecordAggregator` directly:

```
import aws_kinesis_agg
aggregator = aggregation_manager.get_record_aggregator(
partition_key:str,
explicit_hash_key:int = None
)
aggregator.add_user_record(...)
```

As you produce records in your producer application, you will aggregate them using the aggregation methods available in the `aws_kinesis_agg` module. The `aws_kinesis_agg` module provides methods to do both iterative aggregation and callback-based aggregation.
Please __NOTE__ that every time the Shard cache is refreshed, all Callbacks will be executed and then `RecordAggregator` will be discarded. Therefore the use of Callbacks is mandatory.

### Raw Aggregation

You can construct a raw `RecordAggregator` class with:

```
import aws_kinesis_agg as agg
kinesis_aggregator = agg.RecordAggregator()
```

#### Iterative Aggregation

Expand Down
68 changes: 58 additions & 10 deletions python/aws_kinesis_agg.egg-info/PKG-INFO
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
Metadata-Version: 1.1
Metadata-Version: 2.1
Name: aws-kinesis-agg
Version: 1.1.3
Version: 1.2.0
Summary: Python module to assist in taking advantage of the Kinesis message aggregation format for both aggregation and deaggregation.
Home-page: http://github.com/awslabs/kinesis-aggregation
Author: Brent Nash
Author-email: brenash@amazon.com
Author: Ian Meyers
Author-email: meyersi@amazon.com
License: Apache-2.0
Description-Content-Type: text/markdown
Description: # Python Kinesis Aggregation & Deaggregation Modules

The Kinesis Aggregation/Deaggregation Modules for Python provide the ability to do in-memory aggregation and deaggregation of standard Kinesis user records using the [Kinesis Aggregated Record Format](https://github.com/awslabs/amazon-kinesis-producer/blob/master/aggregation-format.md) to allow for more efficient transmission of records.
Expand All @@ -25,19 +24,67 @@ Description: # Python Kinesis Aggregation & Deaggregation Modules

The [aggregator.py](aggregator.py) module contains Python classes that allow you to aggregate records using the [Kinesis Aggregated Record Format](https://github.com/awslabs/amazon-kinesis-producer/blob/master/aggregation-format.md). Using record aggregation improves throughput and reduces costs when writing producer applications that publish data to Amazon Kinesis.

### Caution - this module is only suitable for low-value messages which are processed in aggregate. Do not use Kinesis Aggregation for data which is sensitive or where every message must be delivered, and where the KCL (including with AWS Lambda) is used for processing. [DATA LOSS CAN OCCUR.](../potential_data_loss.md)

### Usage

The record aggregation module provides a simple interface for creating protocol buffers encoded data in a producer application. The `aws_kinesis_agg` module provides methods for efficiently packing individual records into larger aggregated records.
The record aggregation module provides a simple interface for creating protocol buffers encoded data in a producer application. The `aws_kinesis_agg` module provides methods for efficiently packing individual records into larger aggregated records, and deaggregating large records into a set of 'real' user records.

When using aggregation, you create a RecordAggregator object and then provide a partition key, raw data and (optionally) an explicit hash key for each record. You can choose to either provide a callback function that will be invoked when a fully-packed aggregated record is available or you can add records and check byte sizes or number of records until the aggregated record is suitably full. You're guaranteed that any aggregated record returned from the RecordAggregator object will fit within a single PutRecord request to Kinesis.
When using aggregation, you create a managing class which helps you to target the correct Kinesis Shard, and then provide a partition key, raw data and (optionally) an explicit hash key for each record. You can choose to either provide a callback function that will be invoked when a fully-packed aggregated record is available or you can add records and check byte sizes or number of records until the aggregated record is suitably full. You're guaranteed that any aggregated record returned from the RecordAggregator object will fit within a single PutRecord request to Kinesis. As you produce records in your producer application, you will aggregate them using a base `RecordAggregator` object, which provides methods to do both iterative aggregation and callback-based aggregation.

To get started, import the `aws_kinesis_agg` module:
There are two ways to create aggregated user records. The first is to use a raw `RecordAggregator`, which can aggregate messages *which are targeted for a single Shard*, or use the `AggregationManager` to aggregate messages which may span Shards. From version `1.2.0`, we __highly__ recommend the use of `AggregationManager` to limit any exposure to data loss.

### Aggregation Manager

Record Aggregation results in a single message that tightly packs `UserRecords`, and in v2 of the Kinesis Client Library all of the messages in a single aggregated payload must target the same Shard. Version 1.2.0 of Kinesis Aggregation for Python includes the `AggregationManager` class which ensures that aggregated messages will only target a single Kinesis Shard at a time. It does this by periodically refreshing the underlying Shard topology, and managing one `RecordAggregator` per destination Shard.

To use `AggregationManager`, import it:

```
import aws_kinesis_agg as agg
aggregation_manager = agg.AggregationManager(
stream_name: str,
region_name: str,
refresh_shard_frequency_count: int
)
```

Where:

* `stream_name` is the name of the destination Stream
* `region_name` is the AWS Region in which the Stream is provisioned. Default is `us-east-1`
* `refresh_shard_frequency_count` is the number of aggregated records that can be added before a Shard refresh occurs. Default is 1000.

You can then perform aggregation by executing:

```
aggregation_manager.add_user_record(
partition_key: str,
explicit_hash_key: int = None,
data
)
```

This will generate (including callbacks) or extract the `RecordAggregator` for the target Shard, and add the user record to it. You can also use the non-encapsulated format which access the underlying `RecordAggregator` directly:

```
import aws_kinesis_agg
aggregator = aggregation_manager.get_record_aggregator(
partition_key:str,
explicit_hash_key:int = None
)
aggregator.add_user_record(...)
```

As you produce records in your producer application, you will aggregate them using the aggregation methods available in the `aws_kinesis_agg` module. The `aws_kinesis_agg` module provides methods to do both iterative aggregation and callback-based aggregation.
Please __NOTE__ that every time the Shard cache is refreshed, all Callbacks will be executed and then `RecordAggregator` will be discarded. Therefore the use of Callbacks is mandatory.

### Raw Aggregation

You can construct a raw `RecordAggregator` class with:

```
import aws_kinesis_agg as agg
kinesis_aggregator = agg.RecordAggregator()
```

#### Iterative Aggregation

Expand Down Expand Up @@ -277,3 +324,4 @@ Classifier: Programming Language :: Python :: 2.7
Classifier: Programming Language :: Python :: 3.6
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Classifier: Topic :: Utilities
Description-Content-Type: text/markdown
Loading