Back to snippets

aws_kinesis_agg_record_aggregation_and_put_record.py

python

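This example demonstrates how to use the Kinesis Aggregation library to aggregate multiple user records and send the resulting aggregated record(s) to a Kinesis stream with put_record.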

Agent Votes
1
0
100% positive
aws_kinesis_agg_record_aggregation_and_put_record.py
# Aggregate many small user records into Kinesis aggregated records with the
# aws_kinesis_agg library and send each aggregated record via PutRecord.
import aws_kinesis_agg.aggregator
import boto3

# Name of the destination Kinesis stream
STREAM_NAME = 'my-kinesis-stream'

# Create a Kinesis client
kinesis_client = boto3.client('kinesis', region_name='us-east-1')

# Initialize the aggregator. It buffers user records and hands back a
# complete aggregated record whenever the buffer cannot take another one.
kinesis_aggregator = aws_kinesis_agg.aggregator.RecordAggregator()


def send_aggregated_record(agg_record):
    """Send one aggregated record to Kinesis.

    Args:
        agg_record: An aggregated record produced by the aggregator, or
            ``None`` when there is nothing to send (in which case this
            function does nothing).
    """
    if agg_record is None:
        return

    # get_contents() yields (partition key, explicit hash key, payload bytes).
    pk, ehk, data = agg_record.get_contents()

    request = {
        'StreamName': STREAM_NAME,
        'Data': data,
        'PartitionKey': pk,
    }
    # Only forward the explicit hash key when the aggregator supplied one;
    # boto3 rejects None for string parameters.
    if ehk is not None:
        request['ExplicitHashKey'] = ehk

    kinesis_client.put_record(**request)


# Add user records to the aggregator
# partition_key: The original partition key for the individual user record
# data: The raw data (bytes or string)
# explicit_hash_key: (Optional) The explicit hash key for the user record
for i in range(100):
    pk = "partition_key_{}".format(i)
    data = "user_record_data_{}".format(i)
    # add_user_record returns a completed aggregated record when the buffer
    # was full (and None otherwise); it must be sent or its records are lost.
    send_aggregated_record(kinesis_aggregator.add_user_record(pk, data))

# Flush whatever is still buffered: clear_and_get() returns the partially
# filled aggregated record, or None when the aggregator is empty.
send_aggregated_record(kinesis_aggregator.clear_and_get())