Amazon Redshift Database Developer Guide - Create streaming ingestion into a materialized view

2023-03-29


This experiment combines two pages of the guide: "Getting started with streaming ingestion from Amazon Kinesis Data Streams" and "Electric vehicle station-data streaming ingestion tutorial, using Kinesis".

1. Add an inline policy to the IAM role used by Redshift:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "ReadStream",
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStreamSummary",
                "kinesis:GetShardIterator",
                "kinesis:GetRecords",
                "kinesis:DescribeStream"
            ],
            "Resource": "arn:aws:kinesis:*:<111122223333>:stream/*"
        },
        {
            "Sid": "ListStream",
            "Effect": "Allow",
            "Action": [
                "kinesis:ListStreams",
                "kinesis:ListShards"
            ],
            "Resource": "*"
        }
    ]
}
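
The same policy can also be built and attached from a script rather than the console. The following is a minimal sketch, assuming boto3 is installed and AWS credentials are configured; the function names, role name, and policy name are hypothetical and not part of the original walkthrough.

```python
import json


def build_stream_policy(account_id: str) -> dict:
    """Return the inline policy shown above for the given account ID."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "ReadStream",
                "Effect": "Allow",
                "Action": [
                    "kinesis:DescribeStreamSummary",
                    "kinesis:GetShardIterator",
                    "kinesis:GetRecords",
                    "kinesis:DescribeStream",
                ],
                "Resource": f"arn:aws:kinesis:*:{account_id}:stream/*",
            },
            {
                "Sid": "ListStream",
                "Effect": "Allow",
                "Action": ["kinesis:ListStreams", "kinesis:ListShards"],
                "Resource": "*",
            },
        ],
    }


def attach_policy(role_name: str, policy_name: str, account_id: str) -> None:
    """Attach the policy inline to an existing role (names are placeholders)."""
    import boto3  # imported lazily so build_stream_policy works without it

    boto3.client("iam").put_role_policy(
        RoleName=role_name,
        PolicyName=policy_name,
        PolicyDocument=json.dumps(build_stream_policy(account_id)),
    )
```

For example, attach_policy("myRedshiftRole", "kinesis-read", "111122223333") would attach it to the role used later in CREATE EXTERNAL SCHEMA; the policy name here is only illustrative.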

2. The Amazon Redshift cluster needs a route to the Kinesis Data Streams endpoints over the internet, through either a NAT gateway or an internet gateway.

This procedure demonstrates how to ingest data from a Kinesis stream named ev_station_data, which contains consumption data from different EV charging stations, in JSON format. The schema is well defined. The example shows how to store the data as raw JSON and also how to convert the JSON data to Amazon Redshift data types as it's ingested.

Producer setup

1. Using Amazon Kinesis Data Streams, follow the steps to create a stream named ev_station_data. Choose On-demand for the Capacity mode.

2. The Amazon Kinesis Data Generator can help you generate test data for use with your stream. Follow the steps detailed in the tool to get started, and use the following data template for generating your data:
{
    "_id": "{{random.uuid}}",
    "clusterID": "{{random.number(
        {   "min":1,
            "max":50
        }
    )}}",
    "connectionTime": "{{date.now("YYYY-MM-DD HH:mm:ss")}}",
    "kWhDelivered": "{{commerce.price}}",
    "stationID": "{{random.number(
        {   "min":1,
            "max":467
        }
    )}}",
    "spaceID": "{{random.word}}-{{random.number(
        {   "min":1,
            "max":20
        }
    )}}",
    "timezone": "America/Los_Angeles",
    "userID": "{{random.number(
        {   "min":1000,
            "max":500000
        }
    )}}"
}
Each JSON object in the stream data has the following properties:
{
    "_id": "12084f2f-fc41-41fb-a218-8cc1ac6146eb",
    "clusterID": "49",
    "connectionTime": "2022-01-31 13:17:15",
    "kWhDelivered": "74.00",
    "stationID": "421",
    "spaceID": "technologies-2",
    "timezone": "America/Los_Angeles",
    "userID": "482329"
}
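
If the Kinesis Data Generator is not available, records of the same shape can be produced with a short script. This is a rough sketch rather than the tool's own logic: the word list, the whole-number kWhDelivered format, and the use of _id as the partition key are assumptions made for illustration, and the send step assumes boto3 with configured credentials.

```python
import json
import random
import uuid
from datetime import datetime

# Hypothetical word list standing in for the generator's {{random.word}}.
WORDS = ["technologies", "indexing", "incubate", "Outdoors", "Towels"]


def make_record(rng: random.Random, now: datetime) -> dict:
    """Build one record with the same keys and value shapes as the template."""
    return {
        "_id": str(uuid.UUID(int=rng.getrandbits(128), version=4)),
        "clusterID": str(rng.randint(1, 50)),
        "connectionTime": now.strftime("%Y-%m-%d %H:%M:%S"),
        # Assumption: a whole amount with two decimals, like "74.00" above.
        "kWhDelivered": f"{rng.randint(1, 1000)}.00",
        "stationID": str(rng.randint(1, 467)),
        "spaceID": f"{rng.choice(WORDS)}-{rng.randint(1, 20)}",
        "timezone": "America/Los_Angeles",
        "userID": str(rng.randint(1000, 500000)),
    }


def send(records, stream_name="ev_station_data"):
    """Put each record on the stream, one PutRecord call per record."""
    import boto3  # imported lazily; only needed when actually sending

    kinesis = boto3.client("kinesis")
    for record in records:
        kinesis.put_record(
            StreamName=stream_name,
            Data=json.dumps(record).encode("utf-8"),
            PartitionKey=record["_id"],  # assumption: any well-spread key works
        )
```

Calling send(make_record(random.Random(), datetime.now()) for _ in range(100)) would push 100 test records to the stream.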


Amazon Redshift setup
Configure the materialized view to ingest data

1. Create an external schema to map the data from Kinesis to a schema.
CREATE EXTERNAL SCHEMA evdata
FROM KINESIS
IAM_ROLE 'arn:aws:iam::<111122223333>:role/myRedshiftRole';
Streaming ingestion for Kinesis Data Streams uses the IAM role defined in the CREATE EXTERNAL SCHEMA statement for making Kinesis Data Streams requests.


2. Create a materialized view to consume the stream data.
The following examples show both methods of defining materialized views to ingest the JSON source data.
Note that the following views check that each record is valid JSON before parsing it. Kinesis stream names are case sensitive and can contain both uppercase and lowercase letters. To use case-sensitive identifiers, set the configuration setting enable_case_sensitive_identifier to true at either the session or cluster level. For more information, see Names and identifiers and enable_case_sensitive_identifier.
First, store stream records in semi-structured SUPER format. In this example, the JSON source is stored in Redshift without converting to Redshift types.

The following two views are for reference only; the experiment did not actually use them.

CREATE MATERIALIZED VIEW my_view AUTO REFRESH YES AS
SELECT approximate_arrival_timestamp,
JSON_PARSE(kinesis_data) as Data
FROM schema_one.my_stream_name
WHERE CAN_JSON_PARSE(kinesis_data);

CREATE MATERIALIZED VIEW ev_station_data AS
    SELECT approximate_arrival_timestamp,
    partition_key,
    shard_id,
    sequence_number,
    json_parse(kinesis_data) as payload
    FROM evdata."ev_station_data" WHERE can_json_parse(kinesis_data);

In contrast, the following materialized view has a defined schema in Redshift. The materialized view is distributed on the UUID value from the stream and is sorted by the approximate_arrival_timestamp value.
CREATE MATERIALIZED VIEW ev_station_data_extract DISTKEY(6) sortkey(1) AUTO REFRESH YES AS
    SELECT refresh_time,
    approximate_arrival_timestamp,
    partition_key,
    shard_id,
    sequence_number,
    json_extract_path_text(from_varbyte(kinesis_data,'utf-8'),'_id',true)::character(36) as ID,
    json_extract_path_text(from_varbyte(kinesis_data,'utf-8'),'clusterID',true)::varchar(30) as clusterID,
    json_extract_path_text(from_varbyte(kinesis_data,'utf-8'),'connectionTime',true)::varchar(20) as connectionTime,
    json_extract_path_text(from_varbyte(kinesis_data,'utf-8'),'kWhDelivered',true)::DECIMAL(10,2) as kWhDelivered,
    json_extract_path_text(from_varbyte(kinesis_data,'utf-8'),'stationID',true)::DECIMAL(10,2) as stationID,
    json_extract_path_text(from_varbyte(kinesis_data,'utf-8'),'spaceID',true)::varchar(100) as spaceID,
    json_extract_path_text(from_varbyte(kinesis_data,'utf-8'),'timezone',true)::varchar(30) as timezone,
    json_extract_path_text(from_varbyte(kinesis_data,'utf-8'),'userID',true)::varchar(30) as userID
    FROM evdata."ev_station_data"
    WHERE LENGTH(kinesis_data) < 65355;
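
To make the view's per-record work concrete, here is a rough Python analogue of what it does to a single kinesis_data value: apply the length filter, decode the bytes as UTF-8, pull out each key, and cast it. It is a sketch for intuition only, not how Redshift implements it. Treating undecodable bytes like invalid JSON and mapping missing keys to None are simplifications of this sketch, and the lowercase output keys are chosen here for illustration.

```python
import json
from decimal import Decimal

MAX_BYTES = 65355  # same bound as the WHERE clause in the view


def extract(kinesis_data: bytes):
    """Return typed columns for one record, or None if the row is filtered out."""
    if len(kinesis_data) >= MAX_BYTES:
        return None  # dropped by WHERE LENGTH(kinesis_data) < 65355
    try:
        doc = json.loads(kinesis_data.decode("utf-8"))
    except ValueError:
        doc = {}  # like null_if_invalid = true: every column becomes None
    if not isinstance(doc, dict):
        doc = {}

    def text(key):
        value = doc.get(key)
        return None if value is None else str(value)

    def dec(key):
        value = doc.get(key)
        if value is None:
            return None
        return Decimal(value).quantize(Decimal("0.01"))  # like DECIMAL(10,2)

    return {
        "id": text("_id"),
        "clusterid": text("clusterID"),
        "connectiontime": text("connectionTime"),
        "kwhdelivered": dec("kWhDelivered"),
        "stationid": dec("stationID"),
        "spaceid": text("spaceID"),
        "timezone": text("timezone"),
        "userid": text("userID"),
    }
```

Note that stationID is cast to DECIMAL(10,2) in the view, so a value such as "421" comes out as 421.00.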

To turn on auto refresh, use AUTO REFRESH YES. The default behavior is manual refresh.
Metadata columns include the following:
Metadata column | Data type | Description
approximate_arrival_timestamp | timestamp without time zone | The approximate time that the record was inserted into the Kinesis stream
partition_key | varchar(256) | The key used by Kinesis to assign the record to a shard
shard_id | char(20) | The unique identifier of the shard within the stream from which the record was retrieved
sequence_number | varchar(128) | The unique identifier of the record from the Kinesis shard
refresh_time | timestamp without time zone | The time the refresh started
kinesis_data | varbyte | The record from the Kinesis stream


Refresh the view, which invokes Amazon Redshift to read from the stream and load data into the materialized view.
REFRESH MATERIALIZED VIEW ev_station_data;


Query data in the materialized view.
select * from ev_station_data;
Result:
approximate_arrival_timestamp partition_key shard_id sequence_number payload
2023-03-29 12:59:19.776 805097471 shardId-000000000002 49639333974848913368740266442939676013780992094388617250 {"_id":"f93c2d1a-3005-4fd6-8677-bfe585ff14cb","clusterID":"16","connectionTime":"2023-03-29 20:59:17","kWhDelivered":"284.00","stationID":"178","spaceID":"indexing-1","timezone":"America/Los_Angeles","userID":"205532"}
2023-03-29 12:59:19.776 7689325901 shardId-000000000002 49639333974848913368740266442940884939600606723563323426 {"_id":"341b1ea1-9ce9-4344-82c6-b28ca628f92c","clusterID":"7","connectionTime":"2023-03-29 20:59:17","kWhDelivered":"659.00","stationID":"283","spaceID":"Officer-2","timezone":"America/Los_Angeles","userID":"27695"}
2023-03-29 12:59:19.776 5391718530 shardId-000000000002 49639333974848913368740266442942093865420221352738029602 {"_id":"524a4b73-df12-4af0-bc82-55d4b3f1065d","clusterID":"11","connectionTime":"2023-03-29 20:59:17","kWhDelivered":"438.00","stationID":"40","spaceID":"Towels-20","timezone":"America/Los_Angeles","userID":"93873"}
2023-03-29 12:59:19.776 7199931921 shardId-000000000002 49639333974848913368740266442943302791239835981912735778 {"_id":"7c3d8fb3-15e6-4c1f-8cc9-691ed6b8f34a","clusterID":"41","connectionTime":"2023-03-29 20:59:17","kWhDelivered":"965.00","stationID":"333","spaceID":"incubate-4","timezone":"America/Los_Angeles","userID":"94817"}
2023-03-29 12:59:19.776 2773909401 shardId-000000000002 49639333974848913368740266442944511717059450611087441954 {"_id":"54de3369-324f-43df-8ee6-892f17aecd08","clusterID":"12","connectionTime":"2023-03-29 20:59:17","kWhDelivered":"567.00","stationID":"115","spaceID":"Outdoors-9","timezone":"America/Los_Angeles","userID":"473510"}
       


Query the stream

Query the refreshed materialized view to get usage statistics.
SELECT to_timestamp(connectionTime, 'YYYY-MM-DD HH24:MI:SS') as connectiontime
,SUM(kWhDelivered) AS Energy_Consumed 
,count(distinct userID) AS #Users
from ev_station_data_extract
group by  to_timestamp(connectionTime, 'YYYY-MM-DD HH24:MI:SS')
order by 1 desc;
Result:
connectiontime energy_consumed #users
2023-03-29 21:04:22+00 49897 100
2023-03-29 21:04:01+00 50838 100
2023-03-29 21:04:00+00 48724 100
2023-03-29 21:03:58+00 52645 100
...    
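
The aggregation in this query (group by connection time, sum the energy, count distinct users, newest first) can be checked by hand on a few rows. The helper below is a hypothetical sketch of the same logic in plain Python; it expects rows as (connectiontime, kwhdelivered, userid) tuples with the timestamp as a 'YYYY-MM-DD HH:MM:SS' string.

```python
from collections import defaultdict
from decimal import Decimal


def usage_stats(rows):
    """Return (connectiontime, energy_consumed, users) tuples, newest first."""
    energy = defaultdict(Decimal)  # SUM(kWhDelivered) per timestamp
    users = defaultdict(set)       # distinct userID values per timestamp
    for ts, kwh, user in rows:
        energy[ts] += kwh
        users[ts].add(user)
    # Timestamps in this string format sort chronologically as plain text.
    return [(ts, energy[ts], len(users[ts])) for ts in sorted(energy, reverse=True)]
```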


References

JSON_PARSE function

CAN_JSON_PARSE function


Category: data Tags: public
