Streaming Aurora MySQL Data Changes to Lambda
Attention
This post applies only to Aurora MySQL version 2. There is an updated version of this topic for Aurora MySQL version 3; please refer to that post.
Using the stored procedure is deprecated. We strongly recommend using an Aurora MySQL native function instead if you are using one of the following Aurora MySQL versions:
- Aurora MySQL version 2, for MySQL 5.7-compatible clusters.
- Aurora MySQL version 3.01 and higher, for MySQL 8.0-compatible clusters. The stored procedure isn't available in Aurora MySQL version 3.
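For reference, the recommended native function takes the Lambda function ARN and a JSON payload directly. A minimal sketch, assuming the same placeholder ARN and payload format used later in this post (on Aurora MySQL 2 the caller also needs the INVOKE LAMBDA privilege):

-- Invoke a Lambda function asynchronously via the native function.
SELECT lambda_async(
    'arn:aws:lambda:us-west-2:123456789012:function:[LambdaFunctionName]',
    '{ "action" : "UPDATE", "PK" : "42" }'
);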
--------------
Prior to today, my website needed to populate several databases explicitly: the application stored data for different purposes in different data stores synchronously. This increased the application's burden and negatively impacted the user experience.
Last weekend, we changed that behavior from synchronous to asynchronous.
Now, content first lands in our main DB. After that, stored procedures trigger asynchronous calls to AWS Lambda, which streams the data to the other data services, each serving a different purpose.
MySQL 5.6 does not support multiple triggers with the same action time and event on one table, so the Aurora MySQL engine needed to be upgraded from 5.6 to 5.7 to support multiple triggers. Multiple triggers are needed because actions should be taken when both UPDATE and DELETE happen on the table (INSERT is not needed in this scenario), and in MySQL you cannot handle UPDATE and DELETE in a single trigger statement.
Before upgrading the Aurora engine, a new DB cluster parameter group should be created, because the default parameter group is not modifiable.
Next, specify the IAM role ARN in the aws_default_lambda_role parameter. This is the role that the RDS service will assume to make calls to Lambda.
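Both steps can also be scripted. Below is a minimal boto3 sketch; the parameter group name and role ARN are hypothetical placeholders:

import boto3

rds = boto3.client("rds")

# Create a custom DB cluster parameter group (the default one is read-only).
rds.create_db_cluster_parameter_group(
    DBClusterParameterGroupName="aurora-mysql57-lambda",  # hypothetical name
    DBParameterGroupFamily="aurora-mysql5.7",
    Description="Custom group that sets aws_default_lambda_role",
)

# Point aws_default_lambda_role at the IAM role that RDS will assume.
rds.modify_db_cluster_parameter_group(
    DBClusterParameterGroupName="aurora-mysql57-lambda",
    Parameters=[{
        "ParameterName": "aws_default_lambda_role",
        "ParameterValue": "arn:aws:iam::123456789012:role/aurora-to-lambda",  # hypothetical role
        "ApplyMethod": "immediate",
    }],
)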
Because Aurora does not support an in-place upgrade, we took a snapshot of the Aurora instance and rebuilt a new Aurora cluster from that snapshot, this time with version 5.7. The cluster was configured to use the parameter group created just now.
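The rebuild can be scripted as well; a hedged boto3 sketch with hypothetical identifiers:

import boto3

rds = boto3.client("rds")

# Restore a new MySQL 5.7-compatible cluster from the snapshot,
# attaching the custom parameter group created above.
rds.restore_db_cluster_from_snapshot(
    DBClusterIdentifier="blog-aurora-57",           # hypothetical cluster name
    SnapshotIdentifier="blog-aurora-56-snapshot",   # hypothetical snapshot name
    Engine="aurora-mysql",                          # the 5.7-compatible engine
    DBClusterParameterGroupName="aurora-mysql57-lambda",
)

Note that restoring the cluster does not create a DB instance in it; one still has to be added afterwards.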
PS
If you want to change the DB parameter group associated with a DB instance, you must manually reboot the instance before the new DB parameter group is used by the DB instance.
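If you do swap the parameter group on a running instance, the reboot itself is a single call (the instance identifier is a hypothetical placeholder):

import boto3

rds = boto3.client("rds")

# Reboot so the newly associated DB parameter group takes effect.
rds.reboot_db_instance(DBInstanceIdentifier="blog-aurora-57-instance-1")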
After the Aurora cluster had been restored, associate the IAM role with the cluster.
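This can be done from the RDS console, or with one API call (same hypothetical identifiers as above):

import boto3

rds = boto3.client("rds")

# Attach the IAM role so the cluster is allowed to invoke Lambda.
rds.add_role_to_db_cluster(
    DBClusterIdentifier="blog-aurora-57",
    RoleArn="arn:aws:iam::123456789012:role/aurora-to-lambda",
)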
Log in to a jump server and connect to the Aurora MySQL server using MySQL Workbench.
Because the content of a post can be very large, such data is not suitable to be carried over an async call. Also, my Lambda function only cares about when the actions of interest happen, so only a few columns need to be sent to Lambda: in other words, which post is undergoing which type of action.
After figuring out the necessary data to be consumed by Lambda, I could continue to configure the stored procedure in Aurora.
DROP PROCEDURE IF EXISTS TO_LAMBDA;
DELIMITER ;;
CREATE PROCEDURE TO_LAMBDA (
    IN `action` VARCHAR(8),
    IN `[PK]` INT(11)
)
LANGUAGE SQL
BEGIN
    CALL mysql.lambda_async(
        'arn:aws:lambda:us-west-2:123456789012:function:[LambdaFunctionName]',
        CONCAT('{ "action" : "', `action`, '", "PK" : "', `[PK]`, '"}')
    );
END
;;
DELIMITER ;

Fill in the column name, type, and attributes, as well as the Lambda function ARN and the data format that will be sent to Lambda.
Next, configure triggers.
DROP TRIGGER IF EXISTS To_Lambda_Trigger_Update;
DELIMITER ;;
CREATE TRIGGER To_Lambda_Trigger_Update
AFTER UPDATE ON [TableName]
FOR EACH ROW
BEGIN
    SELECT 'UPDATE', NEW.`PK` INTO @`action`, @`PK`;
    CALL TO_LAMBDA(@`action`, @`PK`);
END
;;
DELIMITER ;

DROP TRIGGER IF EXISTS To_Lambda_Trigger_Delete;
DELIMITER ;;
CREATE TRIGGER To_Lambda_Trigger_Delete
AFTER DELETE ON [TableName]
FOR EACH ROW
BEGIN
    SELECT 'DELETE', OLD.`PK` INTO @`action`, @`PK`;
    CALL TO_LAMBDA(@`action`, @`PK`);
END
;;
DELIMITER ;

Fill in the field name (i.e. PK) and the table name to suit your use case.
So, that's all.
To test, I updated and deleted some test posts. On the Lambda side, the function got notified with the necessary information.
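Any ordinary statements against the table fire the triggers. For example (using the [TableName] and PK placeholders from above):

-- 'title' is a hypothetical column; substitute your own.
UPDATE [TableName] SET `title` = 'updated title' WHERE `PK` = 1;
DELETE FROM [TableName] WHERE `PK` = 2;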
For a demonstration, consider the Python code below.

import json

def lambda_handler(event, context):
    # The payload built by the stored procedure, e.g. {"action": "UPDATE", "PK": "42"}
    print(event["action"], event["PK"])
    # Handle the event ...
    return {
        'statusCode': 200,
        'body': json.dumps('OK')
    }
From here, you are good to go. Try feeding the data into other data stores, for example to run full-text search in Elasticsearch or to store metadata in DynamoDB.
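For instance, the handler could persist the change metadata to DynamoDB. A minimal sketch, assuming a hypothetical post-metadata table with PK as its partition key:

import boto3

dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table("post-metadata")  # hypothetical table name

def record_change(action, pk):
    # Record which post underwent which type of action.
    table.put_item(Item={"PK": str(pk), "last_action": action})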
Have fun!
References
Capturing Data Changes in Amazon Aurora Using AWS Lambda
Error: This version of MariaDB doesn't yet support 'multiple triggers with the same action time and event for one table'
MySQL error code 1235
Invoking a Lambda Function from an Amazon Aurora MySQL DB Cluster
Associating an IAM Role with an Amazon Aurora MySQL DB Cluster
Creating an IAM Role to Allow Amazon Aurora to Access AWS Services
Creating an IAM Policy to Access AWS Lambda Resources
13.1.17 CREATE PROCEDURE and CREATE FUNCTION Statements
24.3.1 Trigger Syntax and Examples
Create Trigger in MySQL
Working with DB Parameter Groups
Syntax error due to using a reserved word as a table or column name in MySQL
Call AWS Lambda from Aurora RDS Stored Procedure Permissions Issue