AWS-Optimize Athena Query Performance

2019-02-04

Prologue

For anyone who owns a data set of some scale, one of the most pressing tasks is to gain insight from that data and turn it into value. Fortunately, mainstream public cloud vendors now provide data analytics tools out of the box. But some tasks are still left to you, for example orchestrating an ETL and data analytics pipeline that addresses your business requirements, and reducing the time spent on data analytics. So how do you solve these?

Hello, I am Leo Du, a builder on Google Cloud and AWS. Today, I will walk you through how I optimized the query performance of a data analytics pipeline built with AWS Glue, Athena, QuickSight, and other services.
*

Background

About a week ago, I implemented a data analytics and visualization pipeline to consume ALB access logs. It helped me understand what is happening on the ALB and learn how users behave on the Blog application. I have written an article on that topic; if you have not read it yet, please read it before continuing with this post.

In this article, I will focus on optimizing Athena query performance and on reducing the related cost.
*

Objective

The overall objective is to enhance query performance against the Athena database.

The question here is how to organize the process so that data flows from S3 into Athena in the most cost-efficient way. Meanwhile, the overall architecture should change as little as possible.

*

Solution

ALB generates access logs in CSV format. Previously, I used the data in that format as Athena's data source in the pipeline. But is that format the optimal one for serving Athena queries?

If it is not, I need a Glue ETL job to convert the log data into a query-optimized format. After that, a Glue Crawler will be used to build Data Catalog metadata on top of the new data.

Up to this point, we have sketched the overall architecture illustrated below. But one question is still open: which data format should be chosen as the source for Athena?

Glue jobs can transform data to a format that optimizes query performance in Athena. Data formats have a large impact on query performance and query cost in Athena. It is recommended to use the Parquet or ORC format for data queried by Athena. Glue supports writing to both of these formats, which makes it easier and faster to transform data into an optimal format for Athena.

Below is the final architecture.


*

Implementation

The implementation steps are explained in the same sequence as the ETL data flow. Here, I will list the important steps and highlight the places that need attention.

Build Data Catalog of Raw Data

The raw ALB access logs are stored in a location similar to this: "s3://<S3 Bucket Name>/<Prefix>/AWSLogs/<AWS Account ID>/elasticloadbalancing/".

Configure the Glue Crawler to build Apache Hive-compatible metadata for this log data.

If the crawl completes successfully, a new Glue Table appears in the Glue Database that you specified.

Nota bene
Each time data for a new partition is added, the crawling job should re-run to add the new partition information to metadata.

Because crawling the S3 objects incurs cost, I will use CloudWatch Events and Lambda, instead of the Crawler, to update the partition information of the raw and Parquet tables.
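As an illustration, a minimal Lambda handler of this kind might look like the sketch below. It registers the current day's partition on the raw table via an Athena DDL statement; the database, table, bucket, region, and output location are placeholders, and the real function would also handle the Parquet table.

    import datetime
    import boto3

    athena = boto3.client("athena")

    # Placeholders; the real database, table, bucket and region differ.
    DATABASE = "alb_logs_db"
    TABLE = "raw_alb_logs"
    LOCATION = "s3://<S3 Bucket Name>/<Prefix>/AWSLogs/<AWS Account ID>/elasticloadbalancing"
    REGION = "us-west-2"

    def handler(event, context):
        """Invoked daily by a CloudWatch Events rule shortly after 00:00 UTC."""
        today = datetime.datetime.utcnow()
        ddl = (
            f"ALTER TABLE {TABLE} ADD IF NOT EXISTS PARTITION "
            f"(region='{REGION}', year='{today:%Y}', month='{today:%m}', day='{today:%d}') "
            f"LOCATION '{LOCATION}/{REGION}/{today:%Y}/{today:%m}/{today:%d}/'"
        )
        athena.start_query_execution(
            QueryString=ddl,
            QueryExecutionContext={"Database": DATABASE},
            ResultConfiguration={"OutputLocation": "s3://<S3 Bucket Name>/athena-query-results/"},
        )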


Postscript
Glue Crawler costs $0.44 per Data Processing Unit-Hour in the Oregon region.
*

Update Glue Table Attributes

For some reason, the ALB access logs could not be automatically classified by the Glue Crawler. Therefore, after the crawl completes, the Glue Table has to be updated manually before it can be queried via Athena and used as the source of ETL jobs.

CSV files occasionally have quotes around the data values intended for each column, and they may include header values that are not part of the data to be analyzed. When you use AWS Glue to create a schema from such files, follow the guidance in this section.
AWS Glue uses LazySimpleSerDe as the serialization library, which is a good choice for type inference. However, if the CSV data contains quoted strings, edit the table definition and change the SerDe library to OpenCSVSerDe.
Specifically, update the following table attributes:
  • Classification: csv (ETL jobs will fail if the "classification" property is not specified)
  • Input format: org.apache.hadoop.mapred.TextInputFormat
  • Output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
  • Serde serialization lib: org.apache.hadoop.hive.serde2.OpenCSVSerde
  • Serde parameter escapeChar: \ (the backslash character)
  • Serde parameter quoteChar: " (the double-quote character)
  • Serde parameter separatorChar: a single blank space. Nota bene: in other words, the log entries are treated as separated by blanks.

A SerDe is a library that tells Athena and its data catalog how to handle the data: which format is used and how to parse it.
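These edits can be made in the Glue console; for reference, the sketch below shows one way to apply the same attributes programmatically with boto3. The database and table names are placeholders, and the list of response keys stripped before calling update_table reflects my assumption about which read-only fields the API rejects.

    import boto3

    glue = boto3.client("glue")

    # Placeholder names
    DATABASE = "alb_logs_db"
    TABLE = "raw_alb_logs"

    table = glue.get_table(DatabaseName=DATABASE, Name=TABLE)["Table"]

    # get_table returns read-only fields that update_table's TableInput does not accept
    for key in ("DatabaseName", "CreateTime", "UpdateTime", "CreatedBy",
                "IsRegisteredWithLakeFormation", "CatalogId", "VersionId"):
        table.pop(key, None)

    table.setdefault("Parameters", {})["classification"] = "csv"
    sd = table["StorageDescriptor"]
    sd["InputFormat"] = "org.apache.hadoop.mapred.TextInputFormat"
    sd["OutputFormat"] = "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"
    sd["SerdeInfo"] = {
        "SerializationLibrary": "org.apache.hadoop.hive.serde2.OpenCSVSerde",
        "Parameters": {"separatorChar": " ", "quoteChar": '"', "escapeChar": "\\"},
    }

    glue.update_table(DatabaseName=DATABASE, TableInput=table)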

Nota bene
If the SerDe is not specified properly, subsequent ETL jobs will fail to generate Parquet data in the correct format, even if that SerDe configuration lets Athena return the expected query results against the raw table. A bad example is shown below.
  • Classification: csv
  • Input format: org.apache.hadoop.mapred.TextInputFormat
  • Output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
  • Serde serialization lib: org.apache.hadoop.hive.serde2.RegexSerDe
  • Serde parameter serialization.format: 1
  • Serde parameter input.regex: ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*):([0-9]*) ([^ ]*)[:-]([0-9]*) ([-.0-9]*) ([-.0-9]*) ([-.0-9]*) (|[-0-9]*) (-|[-0-9]*) ([-0-9]*) ([-0-9]*) \"([^ ]*) ([^ ]*) (- |[^ ]*)\" \"([^\"]*)\" ([A-Z0-9-]+) ([A-Za-z0-9.-]*) ([^ ]*) \"([^\"]*)\" \"([^\"]*)\" \"([^\"]*)\" ([-.0-9]*) ([^ ]*) \"([^\"]*)\" \"([^\"]*)\"($| \"[^ ]*\")(.*)

The table schema should also be adjusted to reflect the real data fields. Based on the syntax of ALB access log entries (refer to Access Logs for Your Application Load Balancer), the schema should have the columns in the order shown below.
 1. type (string)
 2. time (string)
 3. elb (string)
 4. client_ip_and_port (string)
 5. target_ip_and_port (string)
 6. request_processing_time (double)
 7. target_processing_time (double)
 8. response_processing_time (double)
 9. elb_status_code (string)
10. target_status_code (string)
11. received_bytes (bigint)
12. sent_bytes (bigint)
13. request (string)
14. user_agent (string)
15. ssl_cipher (string)
16. ssl_protocol (string)
17. target_group_arn (string)
18. trace_id (string)
19. domain_name (string)
20. chosen_cert_arn (string)
21. matched_rule_priority (string)
22. request_creation_time (string)
23. actions_executed (string)
24. redirect_url (string)
25. lambda_error_reason (string)
26. new_field (string)

Modify recognized partition keys as shown below.
 1. region (string)
 2. year (string)
 3. month (string)
 4. day (string)
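For reference, the corrected table definition can also be expressed as an Athena DDL statement. The sketch below (submitted through boto3 only to keep the example runnable) mirrors the SerDe settings, columns, and partition keys above; the database name and output location are placeholders, and in this article the table is actually produced by the Crawler and then edited by hand rather than created this way.

    import boto3

    athena = boto3.client("athena")

    ddl = r"""
    CREATE EXTERNAL TABLE IF NOT EXISTS raw_alb_logs (
      type string, time string, elb string,
      client_ip_and_port string, target_ip_and_port string,
      request_processing_time double, target_processing_time double,
      response_processing_time double,
      elb_status_code string, target_status_code string,
      received_bytes bigint, sent_bytes bigint,
      request string, user_agent string,
      ssl_cipher string, ssl_protocol string,
      target_group_arn string, trace_id string,
      domain_name string, chosen_cert_arn string,
      matched_rule_priority string, request_creation_time string,
      actions_executed string, redirect_url string,
      lambda_error_reason string, new_field string
    )
    PARTITIONED BY (region string, year string, month string, day string)
    ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
    WITH SERDEPROPERTIES ('separatorChar' = ' ', 'quoteChar' = '"', 'escapeChar' = '\\')
    LOCATION 's3://<S3 Bucket Name>/<Prefix>/AWSLogs/<AWS Account ID>/elasticloadbalancing/'
    TBLPROPERTIES ('classification' = 'csv')
    """

    athena.start_query_execution(
        QueryString=ddl,
        QueryExecutionContext={"Database": "alb_logs_db"},
        ResultConfiguration={"OutputLocation": "s3://<S3 Bucket Name>/athena-query-results/"},
    )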
*

Update Glue Crawler Configuration

Update the Glue Crawler configuration to run daily, so that new partition information is added to the metadata after a new partition is created. The partitions are keyed by "year", "month", and "day", which implies that the partition information needs to be updated daily. Here I set the crawl to run at 00:05 UTC every day.

Postscript
The first log object of each day is stored in S3 shortly after 00:00 UTC.

Because the Crawler is set to run repeatedly, and we do not want the manually updated table attributes to be overwritten by the next Crawler run, configure the Crawler as follows (see the API sketch after the list):
  • Select "Ignore the change and don't update the table in the data catalog"
  • Check "Update all new and existing partitions with metadata from the table"
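For reference, the sketch below shows how the schedule and these two console choices might be expressed through the Glue API. The crawler name is a placeholder, and the mapping of the console options to UpdateBehavior "LOG" and the partition-inherit configuration is my assumption based on the Glue crawler configuration documentation.

    import json
    import boto3

    glue = boto3.client("glue")

    glue.update_crawler(
        Name="raw-alb-logs-crawler",      # placeholder name
        Schedule="cron(5 0 * * ? *)",     # daily at 00:05 UTC
        # Assumed equivalent of "Ignore the change and don't update the table"
        SchemaChangePolicy={"UpdateBehavior": "LOG", "DeleteBehavior": "LOG"},
        # Assumed equivalent of "Update all new and existing partitions with
        # metadata from the table"
        Configuration=json.dumps({
            "Version": 1.0,
            "CrawlerOutput": {"Partitions": {"AddOrUpdateBehavior": "InheritFromTable"}},
        }),
    )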
*

Perform Glue ETL tasks

Configure the Glue ETL job by going through the wizard; at the end, it generates ETL code for you to continue working on.

This ETL job extracts the raw data table created in the previous step, transforms the data from CSV into Parquet, and loads the result into a new S3 location.
[Glue ETL #1]

[Glue ETL #2]
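The generated snippets themselves are not reproduced here. As a rough orientation only, a wizard-generated Python (PySpark) script typically starts with boilerplate along the following lines; the database and table names are placeholders for my setup.

    import sys
    from awsglue.utils import getResolvedOptions
    from awsglue.context import GlueContext
    from awsglue.job import Job
    from pyspark.context import SparkContext

    args = getResolvedOptions(sys.argv, ["JOB_NAME"])
    sc = SparkContext()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    job = Job(glueContext)
    job.init(args["JOB_NAME"], args)

    # Extract: read the raw table registered by the Crawler (placeholder names);
    # transformation_ctx lets the job bookmark track what has been processed.
    raw_dyf = glueContext.create_dynamic_frame.from_catalog(
        database="alb_logs_db",
        table_name="raw_alb_logs",
        transformation_ctx="raw_dyf",
    )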

Before transforming the data format, I need to convert the DynamicFrame into a Spark DataFrame by converting DynamicRecords into DataFrame fields.
[Glue ETL #3]
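Continuing the earlier sketch (not the original snippet), this conversion is a single call:

    # DynamicFrame -> Spark DataFrame, so plain Spark SQL functions can be used next
    df = raw_dyf.toDF()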

Because I am most interested in the client_ip column, which to some extent identifies a unique viewer, I will split the "client_ip_and_port" column into "client_ip" and "client_port", split the "target_ip_and_port" column into "target_ip" and "target_port", and split the "request" column into three columns: "request_verb", "request_url", and "request_proto". After this step, it is much easier to run queries against the client_ip and request_url columns.
[Glue ETL #4]
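Continuing the sketch, the splitting can be done with Spark's split function; the exact expressions below are an assumption, since the original snippet is not shown.

    from pyspark.sql import functions as F

    # "ip:port" pairs and the "verb url protocol" request line become separate columns
    df = (
        df
        .withColumn("client_ip", F.split(F.col("client_ip_and_port"), ":").getItem(0))
        .withColumn("client_port", F.split(F.col("client_ip_and_port"), ":").getItem(1))
        .withColumn("target_ip", F.split(F.col("target_ip_and_port"), ":").getItem(0))
        .withColumn("target_port", F.split(F.col("target_ip_and_port"), ":").getItem(1))
        .withColumn("request_verb", F.split(F.col("request"), " ").getItem(0))
        .withColumn("request_url", F.split(F.col("request"), " ").getItem(1))
        .withColumn("request_proto", F.split(F.col("request"), " ").getItem(2))
    )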

Next, I will convert the DataFrame back into a DynamicFrame by converting DataFrame fields into DynamicRecord fields.
[Glue ETL #5]
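Again continuing the sketch, the conversion back is also a single call:

    from awsglue.dynamicframe import DynamicFrame

    # Spark DataFrame -> DynamicFrame, so Glue transforms and sinks can be applied
    transformed_dyf = DynamicFrame.fromDF(df, glueContext, "transformed_dyf")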

After data transformation is done, re-map the data columns from source to target.
[Glue ETL #6]
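In the generated script this re-mapping is done with ApplyMapping; the abbreviated sketch below lists only a handful of columns, and the source and target types are assumptions.

    from awsglue.transforms import ApplyMapping

    mapped_dyf = ApplyMapping.apply(
        frame=transformed_dyf,
        mappings=[
            ("type", "string", "type", "string"),
            ("time", "string", "time", "string"),
            ("client_ip", "string", "client_ip", "string"),
            ("client_port", "string", "client_port", "string"),
            ("request_verb", "string", "request_verb", "string"),
            ("request_url", "string", "request_url", "string"),
            ("elb_status_code", "string", "elb_status_code", "string"),
            ("user_agent", "string", "user_agent", "string"),
            ("region", "string", "region", "string"),
            ("year", "string", "year", "string"),
            ("month", "string", "month", "string"),
            ("day", "string", "day", "string"),
            # remaining columns would follow the schema listed earlier
        ],
        transformation_ctx="mapped_dyf",
    )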

In the end, write out the DynamicFrame with partitions created.
[Glue ETL #7]
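Continuing the sketch, writing Parquet with the same partition keys and committing the job bookmark might look like this; the target path placeholder matches the one used later for the Parquet Crawler.

    # Load: write Parquet, partitioned by region/year/month/day, to the target prefix
    glueContext.write_dynamic_frame.from_options(
        frame=mapped_dyf,
        connection_type="s3",
        connection_options={
            "path": "s3://<S3 Bucket Name>/<Prefix>/",
            "partitionKeys": ["region", "year", "month", "day"],
        },
        format="parquet",
        transformation_ctx="sink",
    )

    job.commit()  # persists the job bookmark state for the next run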
*

Schedule Glue ETL tasks

Add a trigger to fire the ETL job with job bookmarks enabled. Schedule it to run as needed, for example hourly or daily. Each day's first ETL run starts at 00:10 UTC.

Postscript
Glue tracks data that has already been processed during a previous run of an ETL job by persisting state information from the job run. This persisted state information is called a job bookmark.

With the ETL job scheduled and job bookmarks enabled, each run processes only the incremental data and stores the transformed output in the specified location.
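For reference, a scheduled trigger of this kind could be created through the Glue API roughly as sketched below; the trigger and job names are placeholders.

    import boto3

    glue = boto3.client("glue")

    # Start the ETL job daily at 00:10 UTC with job bookmarks enabled
    glue.create_trigger(
        Name="alb-logs-etl-daily",
        Type="SCHEDULED",
        Schedule="cron(10 0 * * ? *)",
        Actions=[{
            "JobName": "alb-logs-csv-to-parquet",
            "Arguments": {"--job-bookmark-option": "job-bookmark-enable"},
        }],
        StartOnCreation=True,
    )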

*

Build Data Catalog of Parquet Data

The ETL job stores the result data under the S3 path below; configure a Glue Crawler to build the metadata using that S3 path as its source.
s3://<S3 Bucket Name>/<Prefix>/

Configure the Glue Crawler for the Parquet data to run daily as well, so that new partition information is added to the metadata after a new partition is created. Again the partitions are keyed by "year", "month", and "day", so the partition information needs to be updated daily. Here I set the crawl to run at 00:25 UTC every day, right after each day's first ETL run is completed.

This time, the Glue Crawler could automatically recognize all the columns.

When the job has finished, a new table will be created for the Parquet data, with all settings correctly configured.

Nota bene
Each time the ETL task is executed (for example, scheduled ETL tasks), the crawling job should re-run to add the new partition information to metadata.


*

Verification

*
Compare the amount of data scanned on the CSV data set with the amount scanned on the Parquet data set.
[Verification #1]
The screenshot below shows the query run time and the amount of data scanned for the CSV data.


[Verification #2]
The screenshot below shows the query run time and the amount of data scanned for the Parquet data.
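Besides reading the numbers off the console, the same figures can be pulled from Athena's query statistics; a small sketch, with placeholder query execution IDs, is shown below.

    import boto3

    athena = boto3.client("athena")

    # Compare the statistics of the same query run against the CSV and Parquet tables
    for label, query_id in [("csv", "<csv query id>"), ("parquet", "<parquet query id>")]:
        stats = athena.get_query_execution(QueryExecutionId=query_id)["QueryExecution"]["Statistics"]
        print(label,
              stats["DataScannedInBytes"], "bytes scanned,",
              stats["EngineExecutionTimeInMillis"], "ms engine time")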

*

Conclusion

After performing the query optimization, the same query takes much less time and scans much less data.
*
Reference
Grok and custom pattern example

GitHub: aws-samples/aws-glue-samples

GitHub: awslabs/athena-glue-service-logs

Built-In Classifiers in AWS Glue

Build a Data Lake Foundation with AWS Glue and Amazon S3

Code Example: Data Preparation Using ResolveChoice, Lambda, and ApplyMapping

Supported SerDes and Data Formats

SerDe Reference

Work with partitioned data in AWS Glue
(This document uses Scala)

Best Practices When Using Athena with AWS Glue

Querying Application Load Balancer Logs
(This document describes how to directly query ALB logs via Athena.)

Top 10 Performance Tuning Tips for Amazon Athena
*

Category: big_data Tags: public
