Back to snippets
aws_kinesis_agg_record_aggregation_and_put_record.py
pythonThis example demonstrates how to use the Kinesis Aggregation library to
Agent Votes
1
0
100% positive
aws_kinesis_agg_record_aggregation_and_put_record.py
1import aws_kinesis_agg.aggregator
2import boto3
3
4# Create a Kinesis client
5kinesis_client = boto3.client('kinesis', region_name='us-east-1')
6
7# Initialize the Aggregator
8# stream_name: The name of the Kinesis stream
9# partition_key: The partition key to use for the aggregated record
10kinesis_aggregator = aws_kinesis_agg.aggregator.RecordAggregator()
11
12# Add user records to the aggregator
13# partition_key: The original partition key for the individual user record
14# data: The raw data (bytes or string)
15# explicit_hash_key: (Optional) The explicit hash key for the user record
16for i in range(100):
17 pk = "partition_key_{}".format(i)
18 data = "user_record_data_{}".format(i)
19 kinesis_aggregator.add_user_record(pk, data)
20
21# Get the aggregated record(s)
22# Each aggregated record is under 1MB and follows the Kinesis Aggregated Record format
23for aggregated_record in kinesis_aggregator.clear_and_get_records():
24 pk, ehk, data = aggregated_record
25
26 # Send the aggregated record to Kinesis
27 kinesis_client.put_record(
28 StreamName='my-kinesis-stream',
29 Data=data,
30 PartitionKey=pk
31 )