AWS-KinesisAnalytics-pre-process data
In most real-world cases, inbound records arriving at Kinesis Data Firehose need pre-processing before being persisted to S3. The same applies to Kinesis Data Analytics, where data needs pre-processing before it reaches the SQL engine. Therefore, I evolved the previous lab environment a little to fulfill this demand.
Tier-1 Architecture
Tier-2 Architecture
Based on the lab results, the working mechanism appears to be as follows:
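A rough sketch of the flow as I read the lab setup (stream and bucket names are those used later in this post; the exact wiring follows the previous lab):

KDG ──> Kinesis Firehose "AWSBigDataBlog-RawDeliveryStream"
          ├── Lambda "source record transformation" (flattens "nestedstructure")
          │     └──> S3: s3://glue-serverless-X/raw/...
          └── (as streaming source for) Kinesis Analytics application
                ├── Lambda "record pre-processing" (flattens "nestedstructure")
                ├── SQL engine
                └──> Kinesis Firehose "AWSBigDataBlog-ResultsDeliveryStream"
                        └──> S3: s3://glue-serverless-X/results/...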
Steps
Follow Step 1 and Step 2 as illustrated in "AWS-Glue-Play with thermostats data": https://tianzhui.cloud/post/3378/
Create a Lambda function to pre-process data before it is persisted to S3 by Kinesis Firehose.
Sample Lambda code for Kinesis Firehose (in Python 2.7):
from __future__ import print_function

import base64
import json

print('Loading function')


def lambda_handler(event, context):
    output = []

    for record in event['records']:
        # print(record['recordId'])
        payload = base64.b64decode(record['data'])

        # Do custom processing on the payload here
        # Preprocessing data
        payload_dict = json.loads(payload)
        # print("INCOMING payload:")
        # print(payload_dict)
        temp = payload_dict["nestedstructure"]["temp"]
        payload_dict[u"temp"] = temp
        payload_dict.pop("nestedstructure")
        # print("OUTGOING payload:")
        # print(payload_dict)
        payload_new = json.dumps(payload_dict) + "\n"

        output_record = {
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': base64.b64encode(payload_new)
        }
        output.append(output_record)

    print('Successfully processed {} records.'.format(len(event['records'])))

    """
    Expected return shape:
    {
        "records": [
            {
                "recordId": record ID that was passed,
                "result": string with value - Ok, Dropped, ProcessingFailed,
                "data": processed base64-encoded user payload
            }
        ]
    }
    """
    return {'records': output}
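To sanity-check the transformation locally before deploying, you can feed the handler a hand-built event that mimics the shape Firehose sends to the function (a minimal sketch; the record ID below is made up):

import base64
import json

# Hypothetical test event mimicking the Firehose transformation input shape
sample_payload = json.dumps({
    "uuid": "cd5ea76a-678c-46c8-bc1d-0253b3912b8c",
    "devicets": "2018-12-23 10:49:45.821",
    "deviceid": 382,
    "nestedstructure": {"temp": 71}
})
event = {
    "records": [
        {"recordId": "49570000000000000000001", "data": base64.b64encode(sample_payload)}
    ]
}

result = lambda_handler(event, None)
# Should print the flattened record:
# {"devicets": "2018-12-23 10:49:45.821", "uuid": "...", "temp": 71, "deviceid": 382}
print(base64.b64decode(result['records'][0]['data']))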
Update the Kinesis Firehose role by attaching a new inline policy:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "lambda:InvokeFunction",
                "lambda:InvokeAsync"
            ],
            "Resource": "*"
        }
    ]
}
Update the Kinesis Analytics role by attaching a new inline policy:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "lambda:InvokeFunction",
                "lambda:InvokeAsync"
            ],
            "Resource": "*"
        }
    ]
}
The raw source input data to Kinesis Analytics is as below:
{ "uuid": "cd5ea76a-678c-46c8-bc1d-0253b3912b8c", "devicets": "2018-12-23 10:49:45.821", "deviceid": 382, "nestedstructure": { "temp": 71 }} , ...
Sample Lambda code for Kinesis Analytics (in Python 2.7). It is nearly identical to the Firehose version, except that no trailing newline is appended to the payload:

from __future__ import print_function

import base64
import json

print('Loading function')


def lambda_handler(event, context):
    output = []

    for record in event['records']:
        payload = base64.b64decode(record['data'])

        # Do custom processing on the record payload here
        payload_dict = json.loads(payload)
        temp = payload_dict["nestedstructure"]["temp"]
        payload_dict[u"temp"] = temp
        payload_dict.pop("nestedstructure")
        payload_new = json.dumps(payload_dict)

        output_record = {
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': base64.b64encode(payload_new)
        }
        output.append(output_record)

    print('Successfully processed {} records.'.format(len(event['records'])))
    return {'records': output}
Go to the Kinesis Firehose console and edit the delivery stream:
- Enable "source record transformation" and specify the Lambda function created in the previous step for Kinesis Firehose
- For the IAM role, keep the role that is currently in use, without modification

Then go to the Kinesis Analytics console and edit the application's source configuration:
- Enable "record pre-processing" and specify the Lambda function created in the previous step for Kinesis Analytics
- For the IAM role, keep the role that is currently in use, without modification
Then, change the template in the Kinesis Data Generator (KDG)
from:
{ "uuid": "{{random.uuid}}", "devicets": "{{date.utc("YYYY-MM-DD HH:mm:ss.SSS")}}", "deviceid": {{random.number(1000)}}, "temp": {{random.weightedArrayElement( {"weights":[0.33, 0.33, 0.33, 0.01],"data":[70, 71, 72, 78]} )}} }to:
{ "uuid": "{{random.uuid}}", "devicets": "{{date.utc("YYYY-MM-DD HH:mm:ss.SSS")}}", "deviceid": {{random.number(1000)}}, "nestedstructure": { "temp": {{random.weightedArrayElement( {"weights":[0.33, 0.33, 0.33, 0.01],"data":[70, 71, 72, 78]} )}} } }In order to simulate a scenario where a transformation (or in other cases, data enrichment) is better to have before entering the processing engine (SQL execution). After that, start sending data, same as mentioned in article "AWS-Glue-Play with thermostats data".
This way, we change an inbound record from, for example:

{u'devicets': u'2018-12-23 04:50:32.862', u'nestedstructure': {u'temp': 70}, u'uuid': u'ca247184-90b1-419c-b4f3-7701817b6b9d', u'deviceid': 18}

to:

{u'devicets': u'2018-12-23 04:50:32.862', u'uuid': u'ca247184-90b1-419c-b4f3-7701817b6b9d', 'temp': 70, u'deviceid': 18}
The content of the data persisted to S3 (S3 bucket: glue-serverless-X, prefix: raw/...) by Kinesis Firehose (called "AWSBigDataBlog-RawDeliveryStream" in the lab environment):

{"devicets": "2018-12-23 09:23:21.811", "uuid": "bd6fb819-e95f-4730-a4cb-afe554fe6e18", "temp": 72, "deviceid": 692}
...
The content of the data persisted to S3 (S3 bucket: glue-serverless-X, prefix: results/...) by Kinesis Analytics and then Firehose (called "AWSBigDataBlog-ResultsDeliveryStream" in the lab environment):

8cc8a12c-90b6-4a93-8b02-38584b84536f,835,2018-12-23 11:49:53.824,78,70,0.11428571428571428
...
Follow the remaining steps in the previous document, and you are good to go!
Reference
Preprocessing Data in Amazon Kinesis Analytics with AWS Lambda