AWS-KinesisAnalytics-pre-process data

2018年12月23日

*
In most real-world cases, inbound records arriving Kinesis Firehose need pre-processing before persisted to S3. Same case applies to Kinesis Analytics, where data need pre-processing before reaching the SQL engine. Therefore, I evolve the previous lab environment a little bit to fulfill this demand.

Tier-1 Architecture

Tier-2 Architecture
Based on the lab result, the working mechanism seems to be like this:


Steps
*
Follow the Step 1 and Step 2 as illustrated in "AWS-Glue-Play with thermostats data", https://tianzhui.cloud/post/3378/
Create a Lambda function to pre-process data before persisted to S3 by Kinesis Firehose.
Sample Lambda code for Kinesis Firehose (in Python 2.7):

from __future__ import print_function

import base64
import json

print('Loading function')


def lambda_handler(event, context):
    output = []

    for record in event['records']:
        # print(record['recordId'])
        payload = base64.b64decode(record['data'])

        # Do custom processing on the payload here
        # Preprocessing data
        payload_dict = json.loads(payload)
        # print("INCOMING payload:")
        # print(payload_dict)
        temp = payload_dict["nestedstructure"]["temp"]
        payload_dict[u"temp"] = temp
        payload_dict.pop("nestedstructure")
        # print("OUTGOING payload:")
        # print(payload_dict)
        payload_new = json.dumps(payload_dict)+"\n"

        output_record = {
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': base64.b64encode(payload_new)
        }
        output.append(output_record)

    print('Successfully processed {} records.'.format(len(event['records'])))
    """
    {
      "records": [
        {
          "recordId" : record ID that was passed,
          "result"   : string with value - Ok, Dropped, ProcessingFailed
          "data"     : processed base64-encoded user payload
        }
      ]
    }
    """
    return {'records': output}
*
Update Kinesis Firehose role by attaching a new inline policy:
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "lambda:InvokeFunction",
                "lambda:InvokeAsync"
            ],
            "Resource": "*"
        }
    ]
}
*
Update Kinesis Analytics role by attaching a new inline policy:
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "lambda:InvokeFunction",
                "lambda:InvokeAsync"
            ],
            "Resource": "*"
        }
    ]
}
*
Although Kinesis Firehose has its pre-processing logic associated (via Lambda function), the Kinesis Analytics sitting behind it also need to have pre-processing worker (via Lambda function), if same or similar transformation or data enrichment logic is needed there. Seen from the lab result, the raw input stream that Kinesis Analytics get from Firehose is the input stream of Kinesis Firehose, meaning the transformation work on Firehose has nothing to do with Analytics.

The raw source input data to Kinesis Analytics is as below:
{  "uuid": "cd5ea76a-678c-46c8-bc1d-0253b3912b8c",  "devicets": "2018-12-23 10:49:45.821",  "deviceid": 382,  "nestedstructure": {    "temp": 71  }}
, ...
When the raw data entering Analytics, it need transforming again into desired format using Lambda function for Kinesis Analytics (in Python 2.7). This Lambda function pre-processes data before it entering SQL engine in Kinesis Analytics.
from __future__ import print_function

import base64
import json

print('Loading function')


def lambda_handler(event, context):
    output = []

    for record in event['records']:
        payload = base64.b64decode(record['data'])
        
        # Do custom processing on the record payload here
        payload_dict = json.loads(payload)
        temp = payload_dict["nestedstructure"]["temp"]
        payload_dict[u"temp"] = temp
        payload_dict.pop("nestedstructure")
        payload_new = json.dumps(payload_dict)
        
        output_record = {
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': base64.b64encode(payload_new)
        }
        
        output.append(output_record)

    print('Successfully processed {} records.'.format(len(event['records'])))

    return {'records': output}
*

Go to the Kinesis Firehose console, and edit:
  • Enable "source record transformation" and specify the Lambda function created in the previous step for Kinesis Firehose
  • For IAM role, select a policy that's under use role without modification
After the first CloudFormation stack is created, go to the Kinesis Analytics console, and edit:
  • Enable "record pre-processing" and specify the Lambda function created in the previous step for Kinesis Analytics
  • For IAM role, select a policy that's under use role without modification
*
Then, changed the template in KDG
from:
{
  "uuid": "{{random.uuid}}",
  "devicets": "{{date.utc("YYYY-MM-DD HH:mm:ss.SSS")}}",
  "deviceid": {{random.number(1000)}},
  "temp": {{random.weightedArrayElement(
    {"weights":[0.33, 0.33, 0.33, 0.01],"data":[70, 71, 72, 78]}
  )}}
}
to:
{
  "uuid": "{{random.uuid}}",
  "devicets": "{{date.utc("YYYY-MM-DD HH:mm:ss.SSS")}}",
  "deviceid": {{random.number(1000)}},
  "nestedstructure": {
    "temp": {{random.weightedArrayElement(
      {"weights":[0.33, 0.33, 0.33, 0.01],"data":[70, 71, 72, 78]}
    )}}
  }
}
In order to simulate a scenario where a transformation (or in other cases, data enrichment) is better to have before entering the processing engine (SQL execution). After that, start sending data, same as mentioned in article "AWS-Glue-Play with thermostats data".
*
This way, we changed an inbound record, from, for example:
{u'devicets': u'2018-12-23 04:50:32.862', u'nestedstructure': {u'temp': 70}, u'uuid': u'ca247184-90b1-419c-b4f3-7701817b6b9d', u'deviceid': 18}
to:
{u'devicets': u'2018-12-23 04:50:32.862', u'uuid': u'ca247184-90b1-419c-b4f3-7701817b6b9d', 'temp': 70, u'deviceid': 18}
*
The content of the preserved data in S3 (S3 bucket: glue-serverless-X, prefix:raw/...) done by Kinesis Firehose (called "AWSBigDataBlog-RawDeliveryStream" in the lab env).
{"devicets": "2018-12-23 09:23:21.811", "uuid": "bd6fb819-e95f-4730-a4cb-afe554fe6e18", "temp": 72, "deviceid": 692}
...
*
The content of the preserved data in S3 (S3 bucket: glue-serverless-X, prefix: results/...) done by Kinesis Analytics and then Firehose (called "AWSBigDataBlog-ResultsDeliveryStream" in the lab env).
8cc8a12c-90b6-4a93-8b02-38584b84536f,835,2018-12-23 11:49:53.824,78,70,0.11428571428571428
...
*
Follow the rest steps in the previous document, now you are good to go!
*
Reference
Preprocessing Data in Amazon Kinesis Analytics with AWS Lambda

*

Category: data Tags: public

Upvote


Downvote