AWS-KinesisAnalytics-pre-process data
*
In most real-world cases, inbound records arriving at Kinesis Firehose need pre-processing before being persisted to S3. The same applies to Kinesis Analytics, where data needs pre-processing before it reaches the SQL engine. Therefore, I evolved the previous lab environment a little to fulfill this demand.
Tier-1 Architecture

Tier-2 Architecture
Based on the lab results, the working mechanism appears to be as follows:
Steps
*
Follow Step 1 and Step 2 as illustrated in "AWS-Glue-Play with thermostats data", https://tianzhui.cloud/post/3378/
Create a Lambda function to pre-process data before it is persisted to S3 by Kinesis Firehose.
Sample Lambda code for Kinesis Firehose (in Python 2.7):
from __future__ import print_function
import base64
import json

print('Loading function')


def lambda_handler(event, context):
    output = []
    for record in event['records']:
        # print(record['recordId'])
        payload = base64.b64decode(record['data'])

        # Do custom processing on the payload here.
        # Preprocessing: lift the nested "temp" field to the top level.
        payload_dict = json.loads(payload)
        # print("INCOMING payload:")
        # print(payload_dict)
        temp = payload_dict["nestedstructure"]["temp"]
        payload_dict[u"temp"] = temp
        payload_dict.pop("nestedstructure")
        # print("OUTGOING payload:")
        # print(payload_dict)
        # Trailing newline so Firehose persists newline-delimited JSON to S3.
        payload_new = json.dumps(payload_dict) + "\n"

        output_record = {
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': base64.b64encode(payload_new)
        }
        output.append(output_record)

    print('Successfully processed {} records.'.format(len(event['records'])))
    """
    Expected return shape:
    {
        "records": [
            {
                "recordId": <the record ID that was passed in>,
                "result": "Ok" | "Dropped" | "ProcessingFailed",
                "data": <processed base64-encoded user payload>
            }
        ]
    }
    """
    return {'records': output}
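To sanity-check the transformation without deploying, the handler can be invoked locally with a hand-built event. Below is a minimal sketch (Python 2.7, same as above); the event follows the Firehose transformation contract, and the record ID and payload are made up for illustration:
import base64
import json

sample = {
    "uuid": "cd5ea76a-678c-46c8-bc1d-0253b3912b8c",
    "devicets": "2018-12-23 10:49:45.821",
    "deviceid": 382,
    "nestedstructure": {"temp": 71}
}
test_event = {
    "records": [
        {
            "recordId": "1",  # hypothetical record ID
            "data": base64.b64encode(json.dumps(sample))
        }
    ]
}

result = lambda_handler(test_event, None)
print(base64.b64decode(result['records'][0]['data']))
# Expected: {"devicets": ..., "uuid": ..., "temp": 71, "deviceid": 382}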
*Update Kinesis Firehose role by attaching a new inline policy:
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "lambda:InvokeFunction",
                "lambda:InvokeAsync"
            ],
            "Resource": "*"
        }
    ]
}
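The same inline policy can also be attached from the CLI with put-role-policy; the role name, policy name, and file path below are placeholders rather than values from the lab:
aws iam put-role-policy \
    --role-name firehose_delivery_role \
    --policy-name AllowLambdaInvoke \
    --policy-document file://lambda-invoke-policy.json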
*Update Kinesis Analytics role by attaching a new inline policy:
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "lambda:InvokeFunction",
                "lambda:InvokeAsync"
            ],
            "Resource": "*"
        }
    ]
}
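Both policies use a wildcard Resource for lab simplicity. For least privilege, the wildcard could be narrowed to the specific function, along the lines of (region, account ID, and function name are placeholders):
"Resource": "arn:aws:lambda:us-east-1:123456789012:function:my-preprocessor"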
*The raw source input data to Kinesis Analytics is as below:
{ "uuid": "cd5ea76a-678c-46c8-bc1d-0253b3912b8c", "devicets": "2018-12-23 10:49:45.821", "deviceid": 382, "nestedstructure": { "temp": 71 }}
, ...
Sample Lambda code for Kinesis Analytics (in Python 2.7). It is nearly identical to the Firehose version; the notable difference is that no trailing newline is appended, since each record is handed straight to the SQL engine rather than concatenated into S3 objects:
from __future__ import print_function
import base64
import json

print('Loading function')


def lambda_handler(event, context):
    output = []
    for record in event['records']:
        payload = base64.b64decode(record['data'])

        # Do custom processing on the record payload here:
        # lift the nested "temp" field to the top level.
        payload_dict = json.loads(payload)
        temp = payload_dict["nestedstructure"]["temp"]
        payload_dict[u"temp"] = temp
        payload_dict.pop("nestedstructure")
        payload_new = json.dumps(payload_dict)  # note: no trailing newline

        output_record = {
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': base64.b64encode(payload_new)
        }
        output.append(output_record)

    print('Successfully processed {} records.'.format(len(event['records'])))
    return {'records': output}
*Configure the pre-processing in the consoles (a scripted alternative for the Firehose side is sketched after this list):
- In the Kinesis Firehose console, enable "source record transformation" and specify the Lambda function created in the previous step for Kinesis Firehose. For the IAM role, select the role currently in use, without modification.
- In the Kinesis Analytics console, enable "record pre-processing" and specify the Lambda function created in the previous step for Kinesis Analytics. For the IAM role, select the role currently in use, without modification.
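For reference, the Firehose-side change can also be scripted with boto3. This is a sketch only: the delivery stream name matches the lab environment, but the Lambda ARN is a placeholder to substitute:
# Sketch: enable the Lambda transform on an existing Firehose delivery stream.
import boto3

firehose = boto3.client('firehose')

# Look up the current version and destination of the delivery stream.
desc = firehose.describe_delivery_stream(
    DeliveryStreamName='AWSBigDataBlog-RawDeliveryStream'
)['DeliveryStreamDescription']

firehose.update_destination(
    DeliveryStreamName='AWSBigDataBlog-RawDeliveryStream',
    CurrentDeliveryStreamVersionId=desc['VersionId'],
    DestinationId=desc['Destinations'][0]['DestinationId'],
    ExtendedS3DestinationUpdate={
        'ProcessingConfiguration': {
            'Enabled': True,
            'Processors': [{
                'Type': 'Lambda',
                'Parameters': [{
                    'ParameterName': 'LambdaArn',
                    # Placeholder ARN -- replace with the function created above.
                    'ParameterValue': 'arn:aws:lambda:us-east-1:123456789012:function:firehose-preprocessor'
                }]
            }]
        }
    }
)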
Then, change the template in the Kinesis Data Generator (KDG)
from:
{
"uuid": "{{random.uuid}}",
"devicets": "{{date.utc("YYYY-MM-DD HH:mm:ss.SSS")}}",
"deviceid": {{random.number(1000)}},
"temp": {{random.weightedArrayElement(
{"weights":[0.33, 0.33, 0.33, 0.01],"data":[70, 71, 72, 78]}
)}}
}
to:
{
"uuid": "{{random.uuid}}",
"devicets": "{{date.utc("YYYY-MM-DD HH:mm:ss.SSS")}}",
"deviceid": {{random.number(1000)}},
"nestedstructure": {
"temp": {{random.weightedArrayElement(
{"weights":[0.33, 0.33, 0.33, 0.01],"data":[70, 71, 72, 78]}
)}}
}
}
This simulates a scenario where a transformation (or, in other cases, data enrichment) is better done before the data enters the processing engine (SQL execution). After that, start sending data, the same as described in the article "AWS-Glue-Play with thermostats data".*
This way, an inbound record is changed from, for example:
{u'devicets': u'2018-12-23 04:50:32.862', u'nestedstructure': {u'temp': 70}, u'uuid': u'ca247184-90b1-419c-b4f3-7701817b6b9d', u'deviceid': 18}
to:
{u'devicets': u'2018-12-23 04:50:32.862', u'uuid': u'ca247184-90b1-419c-b4f3-7701817b6b9d', 'temp': 70, u'deviceid': 18}
*The content of the data persisted to S3 (S3 bucket: glue-serverless-X, prefix: raw/...) by Kinesis Firehose (the delivery stream called "AWSBigDataBlog-RawDeliveryStream" in the lab environment):
{"devicets": "2018-12-23 09:23:21.811", "uuid": "bd6fb819-e95f-4730-a4cb-afe554fe6e18", "temp": 72, "deviceid": 692}
...
*The content of the data persisted to S3 (S3 bucket: glue-serverless-X, prefix: results/...) by Kinesis Analytics and then Firehose (the delivery stream called "AWSBigDataBlog-ResultsDeliveryStream" in the lab environment):
8cc8a12c-90b6-4a93-8b02-38584b84536f,835,2018-12-23 11:49:53.824,78,70,0.11428571428571428 ...*
Follow the remaining steps in the previous document, and you are good to go!
*
Reference
Preprocessing Data in Amazon Kinesis Analytics with AWS Lambda, AWS Big Data Blog
*