Amazon Redshift Database Developer Guide - Create Streaming ingestion into materialized view
March 29, 2023
This lab combines two AWS tutorials: Getting started with streaming ingestion from Amazon Kinesis Data Streams and Electric vehicle station-data streaming ingestion tutorial, using Kinesis.
1. Add an inline policy to the Redshift IAM role:
```json
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "ReadStream",
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStreamSummary",
                "kinesis:GetShardIterator",
                "kinesis:GetRecords",
                "kinesis:DescribeStream"
            ],
            "Resource": "arn:aws:kinesis:*:<111122223333>:stream/*"
        },
        {
            "Sid": "ListStream",
            "Effect": "Allow",
            "Action": [
                "kinesis:ListStreams",
                "kinesis:ListShards"
            ],
            "Resource": "*"
        }
    ]
}
```
2. The Amazon Redshift cluster must have a route to the Kinesis Data Streams endpoints, either over the internet through a NAT gateway or through an internet gateway.
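For reference, the policy document from step 1 can also be built programmatically and inspected before attaching it to the role. A minimal Python sketch; the account ID is a placeholder, and the role and policy names in the commented boto3 call are assumptions:

```python
import json

ACCOUNT_ID = "111122223333"  # placeholder -- use your own AWS account ID

# The same inline policy as in step 1, built as a Python dict so it can be
# templated per account and validated locally.
policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "ReadStream",
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStreamSummary",
                "kinesis:GetShardIterator",
                "kinesis:GetRecords",
                "kinesis:DescribeStream",
            ],
            "Resource": f"arn:aws:kinesis:*:{ACCOUNT_ID}:stream/*",
        },
        {
            "Sid": "ListStream",
            "Effect": "Allow",
            "Action": ["kinesis:ListStreams", "kinesis:ListShards"],
            "Resource": "*",
        },
    ],
}

# Attaching it requires AWS credentials; shown only as a sketch:
# import boto3
# boto3.client("iam").put_role_policy(
#     RoleName="myRedshiftRole",
#     PolicyName="RedshiftKinesisRead",
#     PolicyDocument=json.dumps(policy),
# )
print(json.dumps(policy, indent=2))
```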
This procedure demonstrates how to ingest data from a Kinesis stream named ev_station_data, which contains consumption data from different EV charging stations, in JSON format. The schema is well defined. The example shows how to store the data as raw JSON and also how to convert the JSON data to Amazon Redshift data types as it's ingested.
Producer setup
1. Using Amazon Kinesis Data Streams, follow the steps to create a stream named ev_station_data. Choose On-demand for the Capacity mode.
2. The Amazon Kinesis Data Generator can help you generate test data for use with your stream. Follow the steps detailed in the tool to get started, and use the following data template for generating your data:
```
{
    "_id" : "{{random.uuid}}",
    "clusterID": "{{random.number( { "min":1, "max":50 } )}}",
    "connectionTime": "{{date.now("YYYY-MM-DD HH:mm:ss")}}",
    "kWhDelivered": "{{commerce.price}}",
    "stationID": "{{random.number( { "min":1, "max":467 } )}}",
    "spaceID": "{{random.word}}-{{random.number( { "min":1, "max":20 } )}}",
    "timezone": "America/Los_Angeles",
    "userID": "{{random.number( { "min":1000, "max":500000 } )}}"
}
```

Each JSON object in the stream data has the following properties:
```json
{
    "_id": "12084f2f-fc41-41fb-a218-8cc1ac6146eb",
    "clusterID": "49",
    "connectionTime": "2022-01-31 13:17:15",
    "kWhDelivered": "74.00",
    "stationID": "421",
    "spaceID": "technologies-2",
    "timezone": "America/Los_Angeles",
    "userID": "482329"
}
```
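If the Kinesis Data Generator is unavailable, records matching this template can be produced locally. A minimal Python sketch; the word list used for spaceID is an arbitrary stand-in for the `{{random.word}}` placeholder:

```python
import json
import random
import uuid
from datetime import datetime, timezone

def make_ev_record() -> dict:
    """Build one test record shaped like the Kinesis Data Generator template above."""
    return {
        "_id": str(uuid.uuid4()),
        "clusterID": str(random.randint(1, 50)),
        "connectionTime": datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S"),
        "kWhDelivered": f"{random.uniform(1.0, 1000.0):.2f}",
        "stationID": str(random.randint(1, 467)),
        # arbitrary word list, standing in for {{random.word}}
        "spaceID": f"{random.choice(['indexing', 'Towels', 'Outdoors'])}-{random.randint(1, 20)}",
        "timezone": "America/Los_Angeles",
        "userID": str(random.randint(1000, 500000)),
    }

record = make_ev_record()
print(json.dumps(record))
```

Such records could then be pushed to the stream, for example with the Kinesis `put_record` API.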
Amazon Redshift setup
Configure the materialized view to ingest data
1. Create an external schema to map the data from Kinesis to a schema.
```sql
CREATE EXTERNAL SCHEMA evdata FROM KINESIS
IAM_ROLE 'arn:aws:iam::<111122223333>:role/myRedshiftRole';
```

Streaming ingestion for Kinesis Data Streams uses the IAM role defined in the `CREATE EXTERNAL SCHEMA` statement for making Kinesis Data Streams requests.
2. Create a materialized view to consume the stream data.
The following examples show both methods of defining materialized views to ingest the JSON source data.
Note that the following view validates that the source data is valid JSON. Kinesis stream names are case sensitive and can contain both uppercase and lowercase letters. To use case-sensitive identifiers, set the configuration parameter `enable_case_sensitive_identifier` to `true` at either the session or cluster level. For more information, see Names and identifiers and enable_case_sensitive_identifier.

First, store stream records in semi-structured SUPER format. In this example, the JSON source is stored in Redshift without converting to Redshift types.
The following two statements are for reference only; they were not actually used in the lab.
```sql
CREATE MATERIALIZED VIEW my_view AUTO REFRESH YES AS
SELECT approximate_arrival_timestamp,
       JSON_PARSE(kinesis_data) AS Data
FROM schema_one.my_stream_name
WHERE CAN_JSON_PARSE(kinesis_data);
```

```sql
CREATE MATERIALIZED VIEW ev_station_data AS
SELECT approximate_arrival_timestamp,
       partition_key,
       shard_id,
       sequence_number,
       JSON_PARSE(kinesis_data) AS payload
FROM evdata."ev_station_data"
WHERE CAN_JSON_PARSE(kinesis_data);
```
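The `WHERE CAN_JSON_PARSE(kinesis_data)` predicate keeps only records whose payload parses as valid JSON, so malformed records never reach the view. A rough Python analogue of that filter (an illustration, not the actual Redshift implementation):

```python
import json

def can_json_parse(kinesis_data: bytes) -> bool:
    """Rough analogue of Redshift's CAN_JSON_PARSE: true if the payload
    decodes as UTF-8 and parses as valid JSON."""
    try:
        json.loads(kinesis_data.decode("utf-8"))
        return True
    except (UnicodeDecodeError, ValueError):
        return False

records = [
    b'{"_id": "abc", "kWhDelivered": "74.00"}',  # valid JSON -> kept
    b'not json at all',                          # invalid -> filtered out
]
kept = [r for r in records if can_json_parse(r)]
print(len(kept))  # prints 1: only the valid record survives the filter
```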
In contrast, the following materialized view definition has a defined schema in Redshift. The materialized view is distributed on the UUID value from the stream and is sorted by the `approximate_arrival_timestamp` value.

```sql
CREATE MATERIALIZED VIEW ev_station_data_extract DISTKEY(6) SORTKEY(1) AUTO REFRESH YES AS
SELECT refresh_time,
       approximate_arrival_timestamp,
       partition_key,
       shard_id,
       sequence_number,
       json_extract_path_text(from_varbyte(kinesis_data,'utf-8'),'_id',true)::character(36) AS ID,
       json_extract_path_text(from_varbyte(kinesis_data,'utf-8'),'clusterID',true)::varchar(30) AS clusterID,
       json_extract_path_text(from_varbyte(kinesis_data,'utf-8'),'connectionTime',true)::varchar(20) AS connectionTime,
       json_extract_path_text(from_varbyte(kinesis_data,'utf-8'),'kWhDelivered',true)::DECIMAL(10,2) AS kWhDelivered,
       json_extract_path_text(from_varbyte(kinesis_data,'utf-8'),'stationID',true)::DECIMAL(10,2) AS stationID,
       json_extract_path_text(from_varbyte(kinesis_data,'utf-8'),'spaceID',true)::varchar(100) AS spaceID,
       json_extract_path_text(from_varbyte(kinesis_data,'utf-8'),'timezone',true)::varchar(30) AS timezone,
       json_extract_path_text(from_varbyte(kinesis_data,'utf-8'),'userID',true)::varchar(30) AS userID
FROM evdata."ev_station_data"
WHERE LENGTH(kinesis_data) < 65355;
```
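Each `json_extract_path_text(from_varbyte(kinesis_data,'utf-8'), ...)::type` expression decodes the raw VARBYTE payload, pulls out one JSON property, and casts it to a Redshift type. The same transformation, sketched in Python against the sample record from the producer setup:

```python
import json
from decimal import Decimal

# Sample payload from the producer setup, as raw bytes (like kinesis_data).
payload = (b'{"_id":"12084f2f-fc41-41fb-a218-8cc1ac6146eb","clusterID":"49",'
           b'"connectionTime":"2022-01-31 13:17:15","kWhDelivered":"74.00",'
           b'"stationID":"421","spaceID":"technologies-2",'
           b'"timezone":"America/Los_Angeles","userID":"482329"}')

doc = json.loads(payload.decode("utf-8"))         # from_varbyte(..., 'utf-8') + parse
row = {
    "ID": doc["_id"],                             # ::character(36)
    "clusterID": doc["clusterID"],                # ::varchar(30)
    "connectionTime": doc["connectionTime"],      # ::varchar(20)
    "kWhDelivered": Decimal(doc["kWhDelivered"]), # ::DECIMAL(10,2)
    "stationID": Decimal(doc["stationID"]),       # ::DECIMAL(10,2)
    "spaceID": doc["spaceID"],                    # ::varchar(100)
    "timezone": doc["timezone"],                  # ::varchar(30)
    "userID": doc["userID"],                      # ::varchar(30)
}
print(row["kWhDelivered"])  # prints 74.00
```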
To turn on auto refresh, use `AUTO REFRESH YES`. The default behavior is manual refresh.

Metadata columns include the following:
| Metadata column | Data type | Description |
|---|---|---|
| approximate_arrival_timestamp | timestamp without time zone | The approximate time that the record was inserted into the Kinesis stream |
| partition_key | varchar(256) | The key used by Kinesis to assign the record to a shard |
| shard_id | char(20) | The unique identifier of the shard within the stream from which the record was retrieved |
| sequence_number | varchar(128) | The unique identifier of the record from the Kinesis shard |
| refresh_time | timestamp without time zone | The time the refresh started |
| kinesis_data | varbyte | The record from the Kinesis stream |
Refresh the view, which invokes Amazon Redshift to read from the stream and load data into the materialized view.
```sql
REFRESH MATERIALIZED VIEW ev_station_data;
```
Query data in the materialized view.
```sql
select * from ev_station_data;
```

Result:
| approximate_arrival_timestamp | partition_key | shard_id | sequence_number | payload |
|---|---|---|---|---|
| 2023-03-29 12:59:19.776 | 805097471 | shardId-000000000002 | 49639333974848913368740266442939676013780992094388617250 | {"_id":"f93c2d1a-3005-4fd6-8677-bfe585ff14cb","clusterID":"16","connectionTime":"2023-03-29 20:59:17","kWhDelivered":"284.00","stationID":"178","spaceID":"indexing-1","timezone":"America/Los_Angeles","userID":"205532"} |
| 2023-03-29 12:59:19.776 | 7689325901 | shardId-000000000002 | 49639333974848913368740266442940884939600606723563323426 | {"_id":"341b1ea1-9ce9-4344-82c6-b28ca628f92c","clusterID":"7","connectionTime":"2023-03-29 20:59:17","kWhDelivered":"659.00","stationID":"283","spaceID":"Officer-2","timezone":"America/Los_Angeles","userID":"27695"} |
| 2023-03-29 12:59:19.776 | 5391718530 | shardId-000000000002 | 49639333974848913368740266442942093865420221352738029602 | {"_id":"524a4b73-df12-4af0-bc82-55d4b3f1065d","clusterID":"11","connectionTime":"2023-03-29 20:59:17","kWhDelivered":"438.00","stationID":"40","spaceID":"Towels-20","timezone":"America/Los_Angeles","userID":"93873"} |
| 2023-03-29 12:59:19.776 | 7199931921 | shardId-000000000002 | 49639333974848913368740266442943302791239835981912735778 | {"_id":"7c3d8fb3-15e6-4c1f-8cc9-691ed6b8f34a","clusterID":"41","connectionTime":"2023-03-29 20:59:17","kWhDelivered":"965.00","stationID":"333","spaceID":"incubate-4","timezone":"America/Los_Angeles","userID":"94817"} |
| 2023-03-29 12:59:19.776 | 2773909401 | shardId-000000000002 | 49639333974848913368740266442944511717059450611087441954 | {"_id":"54de3369-324f-43df-8ee6-892f17aecd08","clusterID":"12","connectionTime":"2023-03-29 20:59:17","kWhDelivered":"567.00","stationID":"115","spaceID":"Outdoors-9","timezone":"America/Los_Angeles","userID":"473510"} |
| … | | | | |
Query the stream
Query the refreshed materialized view to get usage statistics.
```sql
SELECT to_timestamp(connectionTime, 'YYYY-MM-DD HH24:MI:SS') AS connectiontime,
       SUM(kWhDelivered) AS Energy_Consumed,
       COUNT(DISTINCT userID) AS #Users
FROM ev_station_data_extract
GROUP BY to_timestamp(connectionTime, 'YYYY-MM-DD HH24:MI:SS')
ORDER BY 1 DESC;
```

Result:
| connectiontime | energy_consumed | #users |
|---|---|---|
| 2023-03-29 21:04:22+00 | 49897 | 100 |
| 2023-03-29 21:04:01+00 | 50838 | 100 |
| 2023-03-29 21:04:00+00 | 48724 | 100 |
| 2023-03-29 21:03:58+00 | 52645 | 100 |
| ... | | |
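The aggregation in that query (total energy and distinct users per connection time) can be mimicked in Python over a few illustrative rows; the values below are made up, not actual stream data:

```python
from collections import defaultdict
from decimal import Decimal

# Illustrative (made-up) extracted rows: (connectionTime, kWhDelivered, userID).
rows = [
    ("2023-03-29 21:04:22", Decimal("284.00"), "205532"),
    ("2023-03-29 21:04:22", Decimal("659.00"), "27695"),
    ("2023-03-29 21:04:01", Decimal("438.00"), "93873"),
    ("2023-03-29 21:04:01", Decimal("120.00"), "93873"),  # same user again
]

energy = defaultdict(Decimal)   # SUM(kWhDelivered) per group
users = defaultdict(set)        # COUNT(DISTINCT userID) via a set per group
for connection_time, kwh, user_id in rows:
    energy[connection_time] += kwh
    users[connection_time].add(user_id)

# ORDER BY 1 DESC
for t in sorted(energy, reverse=True):
    print(t, energy[t], len(users[t]))
# prints:
# 2023-03-29 21:04:22 943.00 2
# 2023-03-29 21:04:01 558.00 1
```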
References
JSON_PARSE function
CAN_JSON_PARSE function