AWS Glue, Athena, and QuickSight - Get Insights into New York City Taxi Records

December 19, 2018


Abbreviations
Abbreviation Full name
TLC Taxi and Limousine Commission
yellow New York City's medallion taxis
green street hail livery taxis
FHV for-hire vehicle

Data Analytics Services Architecture



Explain Sample Data
In each trip record dataset, one row represents a single trip made by a TLC-licensed vehicle.

Yellow
Trips made by New York City’s iconic yellow taxis have been recorded and provided to the TLC since 2009. Yellow taxis are traditionally hailed by signaling to a driver who is on duty and seeking a passenger (street hail), but now they may also be hailed using an e-hail app like Curb or Arro. Yellow taxis are the only vehicles permitted to respond to a street hail from a passenger in all five boroughs.

Records include fields capturing pick-up and drop-off dates/times, pick-up and drop-off locations, trip distances, itemized fares, rate types, payment types, and driver-reported passenger counts. The records were collected and provided to the NYC TLC by technology service providers. The trip data was not created by the TLC, and TLC cannot guarantee their accuracy.

Sample data
s3://serverless-analytics/glue-blog/yellow/yellow_tripdata_2016-01.csv

VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,RatecodeID,store_and_fwd_flag,dropoff_longitude,dropoff_latitude,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount
2,2016-01-01 00:00:00,2016-01-01 00:00:00,2,1.10,-73.990371704101563,40.734695434570313,1,N,-73.981842041015625,40.732406616210937,2,7.5,0.5,0.5,0,0,0.3,8.8
2,2016-01-01 00:00:00,2016-01-01 00:00:00,5,4.90,-73.980781555175781,40.729911804199219,1,N,-73.944473266601563,40.716678619384766,1,18,0.5,0.5,0,0,0.3,19.3
2,2016-01-01 00:00:00,2016-01-01 00:00:00,1,10.54,-73.984550476074219,40.6795654296875,1,N,-73.950271606445313,40.788925170898438,1,33,0.5,0.5,0,0,0.3,34.3
2,2016-01-01 00:00:00,2016-01-01 00:00:00,1,4.75,-73.99346923828125,40.718990325927734,1,N,-73.962242126464844,40.657333374023437,2,16.5,0,0.5,0,0,0.3,17.3
2,2016-01-01 00:00:00,2016-01-01 00:00:00,3,1.76,-73.960624694824219,40.781330108642578,1,N,-73.977264404296875,40.758514404296875,2,8,0,0.5,0,0,0.3,8.8
2,2016-01-01 00:00:00,2016-01-01 00:18:30,2,5.52,-73.980117797851563,40.743049621582031,1,N,-73.913490295410156,40.763141632080078,2,19,0.5,0.5,0,0,0.3,20.3
2,2016-01-01 00:00:00,2016-01-01 00:26:45,2,7.45,-73.994056701660156,40.719989776611328,1,N,-73.966361999511719,40.789871215820313,2,26,0.5,0.5,0,0,0.3,27.3
1,2016-01-01 00:00:01,2016-01-01 00:11:55,1,1.20,-73.979423522949219,40.744613647460938,1,N,-73.992034912109375,40.753944396972656,2,9,0.5,0.5,0,0,0.3,10.3
1,2016-01-01 00:00:02,2016-01-01 00:11:14,1,6.00,-73.947151184082031,40.791046142578125,1,N,-73.920768737792969,40.865577697753906,2,18,0.5,0.5,0,0,0.3,19.3
...

Data dictionary
Field Name Description
VendorID A code indicating the TPEP provider that provided the record.

1= Creative Mobile Technologies, LLC; 2= VeriFone Inc.
tpep_pickup_datetime The date and time when the meter was engaged.
tpep_dropoff_datetime The date and time when the meter was disengaged.
passenger_count The number of passengers in the vehicle. This is a driver-entered value.
trip_distance The elapsed trip distance in miles reported by the taximeter.
RateCodeID The final rate code in effect at the end of the trip.

1= Standard rate
2=JFK
3=Newark

4=Nassau or Westchester
5=Negotiated fare
6=Group ride
store_and_fwd_flag This flag indicates whether the trip record was held in vehicle memory before sending to the vendor, aka "store and forward," because the vehicle did not have a connection to the server.

Y= store and forward trip
N= not a store and forward trip
dropoff_longitude Longitude where the meter was disengaged.
dropoff_latitude Latitude where the meter was disengaged.
payment_type A numeric code signifying how the passenger paid for the trip.

1= Credit card
2= Cash
3= No charge
4= Dispute
5= Unknown
6= Voided trip
fare_amount The time-and-distance fare calculated by the meter.
extra Miscellaneous extras and surcharges. Currently, this only includes the $0.50 and $1 rush hour and overnight charges.
mta_tax $0.50 MTA tax that is automatically triggered based on the metered rate in use.
improvement_surcharge $0.30 improvement surcharge assessed on trips at the flag drop. The improvement surcharge began being levied in 2015.
tip_amount Tip amount – This field is automatically populated for credit card tips. Cash tips are not included.
tolls_amount Total amount of all tolls paid in trip.
total_amount The total amount charged to passengers. Does not include cash tips.
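As a quick sanity check on the dictionary above, the following minimal sketch parses the first sample row from yellow_tripdata_2016-01.csv and compares total_amount with the sum of the itemized charges. That the total equals this sum is an observation on the sample rows shown here, not a guarantee stated by the TLC; the helper name itemized_total is made up for illustration.

```python
import csv
import io

# Header and first data row copied from the yellow sample data above.
SAMPLE = (
    "VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,"
    "trip_distance,pickup_longitude,pickup_latitude,RatecodeID,"
    "store_and_fwd_flag,dropoff_longitude,dropoff_latitude,payment_type,"
    "fare_amount,extra,mta_tax,tip_amount,tolls_amount,"
    "improvement_surcharge,total_amount\n"
    "2,2016-01-01 00:00:00,2016-01-01 00:00:00,2,1.10,"
    "-73.990371704101563,40.734695434570313,1,N,"
    "-73.981842041015625,40.732406616210937,2,7.5,0.5,0.5,0,0,0.3,8.8\n"
)

# Code table taken from the payment_type entry of the data dictionary.
PAYMENT_TYPES = {
    1: "Credit card",
    2: "Cash",
    3: "No charge",
    4: "Dispute",
    5: "Unknown",
    6: "Voided trip",
}

# Itemized charge columns (assumption: these add up to total_amount).
CHARGE_FIELDS = [
    "fare_amount",
    "extra",
    "mta_tax",
    "tip_amount",
    "tolls_amount",
    "improvement_surcharge",
]


def itemized_total(row):
    """Sum the itemized charge columns of one trip record (hypothetical helper)."""
    return round(sum(float(row[field]) for field in CHARGE_FIELDS), 2)


first = next(csv.DictReader(io.StringIO(SAMPLE)))
payment = PAYMENT_TYPES[int(first["payment_type"])]
computed = itemized_total(first)
print(payment, computed, float(first["total_amount"]))
```

Rows where the two values differ would be worth a closer look before any fare analysis, since the records are provider-reported.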


Green
Green taxis, also known as boro taxis and street-hail liveries, were introduced in August of 2013 to improve taxi service and availability in the boroughs. Green taxis may respond to street hails, but only in the areas indicated in green on the map (i.e. above W 110 St/E 96th St in Manhattan and in the boroughs).

Records include fields capturing pick-up and drop-off dates/times, pick-up and drop-off locations, trip distances, itemized fares, rate types, payment types, and driver-reported passenger counts.

Sample data
s3://serverless-analytics/glue-blog/green/green_tripdata_2016-01.csv
VendorID,lpep_pickup_datetime,Lpep_dropoff_datetime,Store_and_fwd_flag,RateCodeID,Pickup_longitude,Pickup_latitude,Dropoff_longitude,Dropoff_latitude,Passenger_count,Trip_distance,Fare_amount,Extra,MTA_tax,Tip_amount,Tolls_amount,Ehail_fee,improvement_surcharge,Total_amount,Payment_type,Trip_type
2,2016-01-01 00:29:24,2016-01-01 00:39:36,N,1,-73.928642272949219,40.680610656738281,-73.924278259277344,40.698043823242188,1,1.46,8,0.5,0.5,1.86,0,,0.3,11.16,1,1
2,2016-01-01 00:19:39,2016-01-01 00:39:18,N,1,-73.952674865722656,40.723175048828125,-73.923919677734375,40.761379241943359,1,3.56,15.5,0.5,0.5,0,0,,0.3,16.8,2,1
2,2016-01-01 00:19:33,2016-01-01 00:39:48,N,1,-73.971611022949219,40.676105499267578,-74.013160705566406,40.646072387695313,1,3.79,16.5,0.5,0.5,4.45,0,,0.3,22.25,1,1
2,2016-01-01 00:22:12,2016-01-01 00:38:32,N,1,-73.989501953125,40.669578552246094,-74.000648498535156,40.689033508300781,1,3.01,13.5,0.5,0.5,0,0,,0.3,14.8,2,1
2,2016-01-01 00:24:01,2016-01-01 00:39:22,N,1,-73.964729309082031,40.682853698730469,-73.940719604492188,40.663013458251953,1,2.55,12,0.5,0.5,0,0,,0.3,13.3,2,1
2,2016-01-01 00:32:59,2016-01-01 00:39:35,N,1,-73.891143798828125,40.746456146240234,-73.867744445800781,40.742111206054688,1,1.37,7,0.5,0.5,0,0,,0.3,8.3,2,1
2,2016-01-01 00:34:42,2016-01-01 00:39:21,N,1,-73.896675109863281,40.746196746826172,-73.886192321777344,40.745689392089844,1,.57,5,0.5,0.5,0,0,,0.3,6.3,2,1
2,2016-01-01 00:31:23,2016-01-01 00:39:36,N,1,-73.953353881835937,40.803558349609375,-73.949150085449219,40.794120788574219,1,1.01,7,0.5,0.5,0,0,,0.3,8.3,2,1
...

Data dictionary
Field Name Description
VendorID A code indicating the LPEP provider that provided the record.

1= Creative Mobile Technologies, LLC;
2= VeriFone Inc.
lpep_pickup_datetime The date and time when the meter was engaged.
lpep_dropoff_datetime The date and time when the meter was disengaged.
Passenger_count The number of passengers in the vehicle. This is a driver-entered value.
Trip_distance The elapsed trip distance in miles reported by the taximeter.
Pickup_longitude Longitude where the meter was engaged.
Pickup_latitude Latitude where the meter was engaged.
RateCodeID The final rate code in effect at the end of the trip.

1= Standard rate
2=JFK
3=Newark
4=Nassau or Westchester
5=Negotiated fare
6=Group ride
Store_and_fwd_flag This flag indicates whether the trip record was held in vehicle memory before sending to the vendor, aka “store and forward,” because the vehicle did not have a connection to the server.

Y= store and forward trip
N= not a store and forward trip
Dropoff_longitude Longitude where the meter was timed off.
Dropoff_latitude Latitude where the meter was timed off.
Payment_type A numeric code signifying how the passenger paid for the trip.

1= Credit card
2= Cash
3= No charge
4= Dispute
5= Unknown
6= Voided trip
Fare_amount The time-and-distance fare calculated by the meter.
Extra Miscellaneous extras and surcharges. Currently, this only includes the $0.50 and $1 rush hour and overnight charges.
MTA_tax $0.50 MTA tax that is automatically triggered based on the metered rate in use.
improvement_surcharge $0.30 improvement surcharge assessed on hailed trips at the flag drop. The improvement surcharge began being levied in 2015.
Tip_amount Tip amount – This field is automatically populated for credit card tips. Cash tips are not included.
Tolls_amount Total amount of all tolls paid in trip. 
Total_amount The total amount charged to passengers. Does not include cash tips.
Trip_type A code indicating whether the trip was a street-hail or a dispatch that is automatically assigned based on the metered rate in use but can be altered by the driver.

1= Street-hail
2= Dispatch


FHV
FHV data includes trip data from high-volume for-hire vehicle bases (bases for companies dispatching 10,000+ trips per day, meaning Uber, Lyft, Via, and Juno), community livery bases, luxury limousine bases, and black car bases.

The TLC began receiving FHV trip data from bases in 2015, but the amount of information that has been provided has changed over time. In 2015, only the dispatching base number, pickup datetime, and the location of the pickup (see section on matching zone IDs below) were provided to the TLC. In summer of 2017, the TLC mandated that the companies provide the drop-off date/time and the drop-off location. In 2017, the TLC also started to receive information on shared rides, like those offered in services like Lyft Line and Uber Pool. A trip is only considered shared if it was reserved specially with one of these services. See note below for more information on shared rides. After the high volume license type was created in Feb 2019, a high-volume license number was added. This is an overall identifier for app companies who may have multiple base licenses.

Sample data
s3://serverless-analytics/glue-blog/fhv/fhv_tripdata_2016-01.csv
Dispatching_base_num,Pickup_date,locationID
B00001,2016-01-01 00:30:00,
B00001,2016-01-01 00:30:00,
B00001,2016-01-01 02:30:00,
B00001,2016-01-01 07:00:00,
B00001,2016-01-01 08:45:00,
B00001,2016-01-01 09:00:00,
...

Mapping of fields from the Glue tables blog_fhv, blog_green, and blog_yellow to the fields of the Athena table "canonicaldata" (DB: "nycitytaxianalysis").
Yellow | Green | FHV | Athena
VendorID | VendorID | | vendorid
tpep_pickup_datetime -> pickup_date | lpep_pickup_datetime -> pickup_date | Pickup_date | pickup_date
tpep_dropoff_datetime -> dropoff_date | Lpep_dropoff_datetime -> dropoff_date | | dropoff_date
dropoff_latitude | Dropoff_latitude | | dropoff_latitude
dropoff_longitude | Dropoff_longitude | | dropoff_longitude
extra | Extra | | extra
fare_amount | Fare_amount | | fare_amount
improvement_surcharge | improvement_surcharge | | improvement_surcharge
mta_tax | MTA_tax | | mta_tax
passenger_count | Passenger_count | | passenger_count
payment_type | Payment_type | | payment_type
pickup_latitude | Pickup_latitude | | pickup_latitude
pickup_longitude | Pickup_longitude | | pickup_longitude
RatecodeID | RateCodeID | | ratecodeid
store_and_fwd_flag | Store_and_fwd_flag | | store_and_fwd_flag
tip_amount | Tip_amount | | tip_amount
tolls_amount | Tolls_amount | | tolls_amount
total_amount | Total_amount | | total_amount
type ("yellow") | type ("green") | type ("fhv") | type
 | Ehail_fee (not created in the Athena table) | Dispatching_base_num (not created in the Athena table) |
 | Trip_type (not created in the Athena table) | locationID (not created in the Athena table) |
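The mapping above can be read as a simple normalization rule: lowercase the raw header name, rename the dataset-specific pickup and drop-off fields, and drop the fields that have no counterpart in the Athena table. The sketch below expresses that rule in plain Python, assuming nothing beyond the table; canonical_name, RENAMES, and DROPPED are illustrative names, not part of Glue or Athena.

```python
# Dataset-specific timestamp fields and their common (canonical) names.
RENAMES = {
    "tpep_pickup_datetime": "pickup_date",
    "tpep_dropoff_datetime": "dropoff_date",
    "lpep_pickup_datetime": "pickup_date",
    "lpep_dropoff_datetime": "dropoff_date",
}

# Source fields that are not created in the Athena table.
DROPPED = {"ehail_fee", "trip_type", "dispatching_base_num", "locationid"}


def canonical_name(raw_name):
    """Return the Athena field name for a raw CSV header, or None if dropped."""
    name = raw_name.strip().lower()
    if name in DROPPED:
        return None
    return RENAMES.get(name, name)


green_header = ["VendorID", "lpep_pickup_datetime", "Lpep_dropoff_datetime", "Ehail_fee"]
print([canonical_name(column) for column in green_header])
```

The "type" column does not come from any source header; it is added as a constant in each ETL job.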



Explain the data
The yellow and green taxi trip records include fields capturing pick-up and drop-off dates/times, pick-up and drop-off locations, trip distances, itemized fares, rate types, payment types, and driver-reported passenger counts. The data used in the attached datasets were collected and provided to the NYC TLC by technology providers authorized under the Taxicab & Livery Passenger Enhancement Programs (TPEP/LPEP). The trip data was not created by the TLC, and TLC makes no representations as to the accuracy of these data.

The FHV trip records include fields capturing the dispatching base license number and the pick-up date, time, and taxi zone location ID (shape file below). These records are generated from the FHV Trip Record submissions made by bases. Note: The TLC publishes base trip record data as submitted by the bases, and we cannot guarantee or confirm their accuracy or completeness. Therefore, this may not represent the total amount of trips dispatched by all TLC-licensed bases. The TLC performs routine reviews of the records and takes enforcement actions when necessary to ensure, to the extent possible, complete and accurate information.

From: TLC Trip Record Data

Data dictionary
Field Name Description
Dispatching_base_num The TLC Base License Number of the base that dispatched the trip
Pickup_datetime The date and time of the trip pick-up


Discover the data


Add a Glue database in Glue (in this case, "nycitytaxianalysis").
CLI
$ aws glue create-database --database-input "{\"Name\":\"nycitytaxianalysis\"}"

Add a Glue crawler named "NYCityTaxiCrawler". The crawler infers the schemas, structure, and other properties of the data.
  • Data store, choose "S3" (Choose a data store: S3).
  • Crawl data in: "Specified path in another account".
  • Include path: "s3://serverless-analytics/glue-blog".
  • Add another data store: "No".
  • Choose "Choose an existing IAM role", then choose "AWSGlueServiceRoleDefault".
  • Database: select the database created earlier (in this case, it is "nycitytaxianalysis").
  • Frequency: "On demand".
  • Prefix added to tables / Table name prefix - optional: "blog_".
  • Finish.

Run crawler.
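The console steps above can also be scripted. The following is a minimal sketch using boto3, assuming the us-west-2 region (inferred from the bucket names used later in these notes) and that the role "AWSGlueServiceRoleDefault" already exists. Omitting the Schedule parameter is what makes the crawler run on demand. The function names are illustrative.

```python
def crawler_request(role="AWSGlueServiceRoleDefault"):
    """Build the create_crawler arguments that mirror the console settings."""
    return {
        "Name": "NYCityTaxiCrawler",
        "Role": role,
        "DatabaseName": "nycitytaxianalysis",
        "Targets": {"S3Targets": [{"Path": "s3://serverless-analytics/glue-blog"}]},
        "TablePrefix": "blog_",
        # No "Schedule" key: the crawler only runs when started explicitly.
    }


def create_and_start_crawler(region_name="us-west-2"):
    """Create the crawler and start one run (needs AWS credentials)."""
    import boto3  # imported here so the request builder works without boto3

    glue = boto3.client("glue", region_name=region_name)
    request = crawler_request()
    glue.create_crawler(**request)
    glue.start_crawler(Name=request["Name"])


if __name__ == "__main__":
    print(crawler_request())
```

Calling create_and_start_crawler() performs the same two actions as finishing the wizard and choosing "Run crawler".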



Once crawling is complete, the console shows the following message:
Crawler "NYCityTaxiCrawler" completed and made the following changes: 3 tables created, 0 tables updated. See the tables created in database nycitytaxianalysis.

The Glue Data Catalog is shared by Glue and Athena. This means that once a database or table is created in Glue, it automatically appears in the Athena console. In this case, the tables are created by the Glue crawler.


Postscript
In a previous setup step, "Getting Started Using AWS Glue", we created the IAM role "AWSGlueServiceRoleDefault" for Glue. For more information, refer to AWS-Glue-Getting Started Using AWS Glue-1_Setting up IAM Permissions for AWS Glue.

Now there are 3 new tables that were created under the database "nycitytaxianalysis", namely, blog_fhv, blog_green, and blog_yellow.
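To confirm the crawler output without opening the console, the tables can be listed through the Glue API. The sketch below assumes a boto3 Glue client is passed in (for example boto3.client("glue")); list_table_names is an illustrative helper that follows NextToken pagination by hand.

```python
def list_table_names(glue, database):
    """Return the sorted table names in a Glue database, following pagination."""
    names = []
    token = None
    while True:
        kwargs = {"DatabaseName": database}
        if token:
            kwargs["NextToken"] = token
        page = glue.get_tables(**kwargs)
        names.extend(table["Name"] for table in page["TableList"])
        token = page.get("NextToken")
        if not token:
            return sorted(names)
```

With the crawler run described above, list_table_names(glue, "nycitytaxianalysis") should contain blog_fhv, blog_green, and blog_yellow.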


Looking into the "blog_yellow" table, the yellow dataset for January 2017 has about 8.7 million rows (the "recordCount" value under "Table properties" is 8717727).



Optimize queries and get data into a canonical form


Define ETL Job for Yellow Data
Go to ETL, Jobs
Create an ETL job (in this case, Name: "NYCityTaxiYellow").
IAM role: "AWSGlueServiceRoleDefault".

Nota bene
IAM role should have permissions to write to a new S3 location in a bucket that you own
(Optional) Specify a location for the script and tmp space on S3, or use default values.




Choose a data source: "blog_yellow".
Choose a data target: "Create tables in your data target".
Data store: "Amazon S3".
Format: "Parquet".
Target path: "s3://YOUR-S3-BUCKET/glue-blog/" (e.g. YOUR-S3-BUCKET is a value, for example, aws-glue-temporary-<AWS-Account-ID>-us-west-2).

These choices determine the output format and the S3 location that appear in the generated ETL script later.


Nota bene
Specify a new location (a new prefix location without any object) to store the results.


In the transform, rename "tpep_pickup_datetime" and "tpep_dropoff_datetime" to generic field names. The raw datasets name these fields differently: the yellow data uses "tpep_pickup_datetime" and "tpep_dropoff_datetime" (see the sample data "yellow_tripdata_2016-01.csv" above), while the green data uses "lpep_pickup_datetime" and "Lpep_dropoff_datetime" (see the sample data "green_tripdata_2016-01.csv" above), so a common name is needed. Also change the data types from string to timestamp.
Old Name Target Name Target Data Type
tpep_pickup_datetime pickup_date timestamp
tpep_dropoff_datetime dropoff_date timestamp
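In the generated script, these choices end up as ApplyMapping 4-tuples of the form (source name, source type, target name, target type). The plain-Python sketch below shows how the rename table translates into such tuples and what the string-to-timestamp change enables, using timestamps from the yellow sample data. build_mappings is a hypothetical helper, not a Glue API, and the column list is abbreviated.

```python
from datetime import datetime

# Rename table from above: old name -> (target name, target data type).
YELLOW_RENAMES = {
    "tpep_pickup_datetime": ("pickup_date", "timestamp"),
    "tpep_dropoff_datetime": ("dropoff_date", "timestamp"),
}


def build_mappings(columns, renames):
    """Turn (name, type) pairs into ApplyMapping-style 4-tuples."""
    mappings = []
    for name, source_type in columns:
        target_name, target_type = renames.get(name, (name, source_type))
        mappings.append((name, source_type, target_name, target_type))
    return mappings


# Abbreviated column list; the crawler-inferred schema has more fields.
columns = [
    ("vendorid", "long"),
    ("tpep_pickup_datetime", "string"),
    ("tpep_dropoff_datetime", "string"),
]
mappings = build_mappings(columns, YELLOW_RENAMES)

# Once the values are timestamps rather than strings, durations are simple arithmetic.
pickup = datetime.strptime("2016-01-01 00:00:00", "%Y-%m-%d %H:%M:%S")
dropoff = datetime.strptime("2016-01-01 00:18:30", "%Y-%m-%d %H:%M:%S")
trip_minutes = (dropoff - pickup).total_seconds() / 60
print(mappings[1], trip_minutes)
```

Columns that are not in the rename table pass through with their name and type unchanged.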


Click the "Save job and edit script" button.

Now Glue creates a script.

Add the following imports at the top of the script, after the existing imports.
from pyspark.sql.functions import lit
from awsglue.dynamicframe import DynamicFrame

Find the last call before the line that starts with the datasink.
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://aws-glue-temporary-123456789012-us-west-2/glue-blog/"}, format = "parquet", transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://aws-glue-temporary-###-us-west-2/glue-blog/"}, format = "parquet", transformation_ctx = "datasink4")
The frame passed to this call (dropnullfields3) is the DynamicFrame that is used to write out the data. Convert it into a DataFrame.
Add a field named "type" to every record that comes from this data source; the value of this field is "yellow" for every record.
##----------------------------------
#convert to a Spark DataFrame...
customDF = dropnullfields3.toDF()
 
#add a new column for "type"
customDF = customDF.withColumn("type", lit('yellow'))
 
# Convert back to a DynamicFrame for further processing.
customDynamicFrame = DynamicFrame.fromDF(customDF, glueContext, "customDF_df")
##----------------------------------

Insert the code snippet above before the following code:
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://aws-glue-temporary-123456789012-us-west-2/glue-blog/"}, format = "parquet", transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://aws-glue-temporary-###-us-west-2/glue-blog/"}, format = "parquet", transformation_ctx = "datasink4")

In the last datasink call, change the DynamicFrame to point to the new custom DynamicFrame created from the Spark DataFrame.
Change:
datasink4 = glueContext.write_dynamic_frame.from_options(frame = dropnullfields3, connection_type = "s3", connection_options = {"path": "s3://aws-glue-temporary-###-us-west-2/glue-blog/"}, format = "parquet", transformation_ctx = "datasink4")
To:
datasink4 = glueContext.write_dynamic_frame.from_options(frame = customDynamicFrame, connection_type = "s3", connection_options = {"path": "s3://aws-glue-temporary-###-us-west-2/glue-blog/"}, format = "parquet", transformation_ctx = "datasink4")

Full code:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import lit
from awsglue.dynamicframe import DynamicFrame

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "nycitytaxianalysis", table_name = "blog_yellow", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "nycitytaxianalysis", table_name = "blog_yellow", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("vendorid", "long", "vendorid", "long"), ("tpep_pickup_datetime", "string", "pickup_date", "timestamp"), ("tpep_dropoff_datetime", "string", "dropoff_date", "timestamp"), ("passenger_count", "long", "passenger_count", "long"), ("trip_distance", "double", "trip_distance", "double"), ("pickup_longitude", "double", "pickup_longitude", "double"), ("pickup_latitude", "double", "pickup_latitude", "double"), ("ratecodeid", "long", "ratecodeid", "long"), ("store_and_fwd_flag", "string", "store_and_fwd_flag", "string"), ("dropoff_longitude", "double", "dropoff_longitude", "double"), ("dropoff_latitude", "double", "dropoff_latitude", "double"), ("payment_type", "long", "payment_type", "long"), ("fare_amount", "double", "fare_amount", "double"), ("extra", "double", "extra", "double"), ("mta_tax", "double", "mta_tax", "double"), ("tip_amount", "double", "tip_amount", "double"), ("tolls_amount", "double", "tolls_amount", "double"), ("improvement_surcharge", "double", "improvement_surcharge", "double"), ("total_amount", "double", "total_amount", "double")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("vendorid", "long", "vendorid", "long"), ("tpep_pickup_datetime", "string", "pickup_date", "timestamp"), ("tpep_dropoff_datetime", "string", "dropoff_date", "timestamp"), ("passenger_count", "long", "passenger_count", "long"), ("trip_distance", "double", "trip_distance", "double"), ("pickup_longitude", "double", "pickup_longitude", "double"), ("pickup_latitude", "double", "pickup_latitude", "double"), ("ratecodeid", "long", "ratecodeid", "long"), ("store_and_fwd_flag", "string", "store_and_fwd_flag", "string"), ("dropoff_longitude", "double", "dropoff_longitude", "double"), ("dropoff_latitude", "double", "dropoff_latitude", "double"), ("payment_type", "long", "payment_type", "long"), ("fare_amount", "double", "fare_amount", "double"), ("extra", "double", "extra", "double"), ("mta_tax", "double", "mta_tax", "double"), ("tip_amount", "double", "tip_amount", "double"), ("tolls_amount", "double", "tolls_amount", "double"), ("improvement_surcharge", "double", "improvement_surcharge", "double"), ("total_amount", "double", "total_amount", "double")], transformation_ctx = "applymapping1")
## @type: ResolveChoice
## @args: [choice = "make_struct", transformation_ctx = "resolvechoice2"]
## @return: resolvechoice2
## @inputs: [frame = applymapping1]
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")
## @type: DropNullFields
## @args: [transformation_ctx = "dropnullfields3"]
## @return: dropnullfields3
## @inputs: [frame = resolvechoice2]
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")

##----------------------------------
#convert to a Spark DataFrame...
customDF = dropnullfields3.toDF()
 
#add a new column for "type"
customDF = customDF.withColumn("type", lit('yellow'))
 
# Convert back to a DynamicFrame for further processing.
customDynamicFrame = DynamicFrame.fromDF(customDF, glueContext, "customDF_df")
##----------------------------------

## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://aws-glue-temporary-123456789012-us-west-2/glue-blog/"}, format = "parquet", transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
datasink4 = glueContext.write_dynamic_frame.from_options(frame = customDynamicFrame, connection_type = "s3", connection_options = {"path": "s3://aws-glue-temporary-123456789012-us-west-2/glue-blog/"}, format = "parquet", transformation_ctx = "datasink4")
job.commit()

PS
For how the file format is converted from CSV to Parquet, refer to Format Options for ETL Inputs and Outputs in AWS Glue.

The PySpark SQL function lit() adds a new column to a DataFrame by assigning a literal or constant value. For more information, refer to PySpark lit() – Add Literal or Constant to DataFrame. Here it adds a new column "type" with the same value, "yellow", to every row handled by this ETL job.

Save.
Run the ETL (Run job).
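Running the job can be scripted as well. The sketch below starts a job run and polls until it reaches a terminal state, assuming a boto3 Glue client is passed in (for example boto3.client("glue")). The 30-second polling interval and the helper name run_job_and_wait are illustrative choices.

```python
import time

# Job run states after which polling can stop.
TERMINAL_STATES = {"SUCCEEDED", "FAILED", "STOPPED", "TIMEOUT", "ERROR"}


def run_job_and_wait(glue, job_name, poll_seconds=30, sleep=time.sleep):
    """Start a Glue job run and block until it finishes; return the final state."""
    run_id = glue.start_job_run(JobName=job_name)["JobRunId"]
    while True:
        run = glue.get_job_run(JobName=job_name, RunId=run_id)["JobRun"]
        state = run["JobRunState"]
        if state in TERMINAL_STATES:
            return state
        sleep(poll_seconds)
```

For this step the call would be run_job_and_wait(glue, "NYCityTaxiYellow"); any result other than "SUCCEEDED" means the Parquet output should not be queried yet.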

Go to the Athena console and create a table there for the new data format. This shows that you can interact with the Data Catalog through either Glue or Athena.

In the Athena query editor, make sure the nycitytaxianalysis database is selected. Then execute the following SQL statement, replacing the account ID in the S3 location with your own.
CREATE EXTERNAL TABLE IF NOT EXISTS nycitytaxianalysis.CanonicalData (
         VendorID INT,
         pickup_date TIMESTAMP,
         dropoff_date TIMESTAMP,
         passenger_count BIGINT,
         trip_distance DOUBLE,
         pickup_longitude DOUBLE,
         pickup_latitude DOUBLE,
         RatecodeID BIGINT,
         store_and_fwd_flag STRING,
         dropoff_longitude DOUBLE,
         dropoff_latitude DOUBLE,
         payment_type INT,
         fare_amount DOUBLE,
         extra DOUBLE,
         mta_tax DOUBLE,
         tip_amount DOUBLE,
         tolls_amount DOUBLE,
         improvement_surcharge DOUBLE,
         total_amount DOUBLE,
         type STRING 
) STORED AS PARQUET LOCATION 's3://aws-glue-temporary-123456789012-us-west-2/glue-blog/' TBLPROPERTIES ('classification'='parquet')

Nota bene
The S3 path ends with "/glue-blog/", corresponding to the script. YOUR-S3-BUCKET looks like aws-glue-temporary-###-us-west-2.
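Because the same S3 location has to match in the ETL scripts and in the table definition, it can help to derive it in one place. This is a small sketch assuming the aws-glue-temporary bucket naming pattern used in these notes; glue_temp_location is a made-up helper and the 12-digit check is only a basic guard against typos.

```python
def glue_temp_location(account_id, region="us-west-2", prefix="glue-blog/"):
    """Build the S3 location shared by the ETL sink and the Athena table."""
    if not (account_id.isdigit() and len(account_id) == 12):
        raise ValueError("AWS account IDs are 12 digits")
    return f"s3://aws-glue-temporary-{account_id}-{region}/{prefix}"


# 123456789012 is the placeholder account ID used throughout these notes.
print(glue_temp_location("123456789012"))
```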


The ETL job completes after about 6 minutes. Then run the following SQL query in Athena.
SELECT type, count(*) FROM "nycitytaxianalysis"."canonicaldata" group by type
Return:
   type    _col1
1  yellow  10906858
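The same check can be run programmatically. The sketch below submits the query through the Athena API, waits for it to finish, and turns the result rows into a dictionary. It assumes a boto3 Athena client is passed in (for example boto3.client("athena")) and that output_location is an S3 prefix you own for query results; both helper names are illustrative.

```python
import time

QUERY = 'SELECT type, count(*) FROM "nycitytaxianalysis"."canonicaldata" GROUP BY type'


def counts_by_type(results):
    """Convert a get_query_results response into {type: row count}."""
    rows = results["ResultSet"]["Rows"]
    counts = {}
    for row in rows[1:]:  # the first row holds the column names
        trip_type, count = (cell.get("VarCharValue") for cell in row["Data"])
        counts[trip_type] = int(count)
    return counts


def run_count_query(athena, output_location, poll_seconds=2, sleep=time.sleep):
    """Run the count query and return its parsed result (needs AWS credentials)."""
    execution_id = athena.start_query_execution(
        QueryString=QUERY,
        QueryExecutionContext={"Database": "nycitytaxianalysis"},
        ResultConfiguration={"OutputLocation": output_location},
    )["QueryExecutionId"]
    while True:
        execution = athena.get_query_execution(QueryExecutionId=execution_id)
        state = execution["QueryExecution"]["Status"]["State"]
        if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
            break
        sleep(poll_seconds)
    if state != "SUCCEEDED":
        raise RuntimeError(f"Athena query ended in state {state}")
    return counts_by_type(athena.get_query_results(QueryExecutionId=execution_id))
```

After the yellow job alone, the returned dictionary should have a single "yellow" key.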

Define ETL Job for Green Data
In the Glue console, create a new ETL job. Here we name it "NYCityTaxiGreen".
Choose "blog_green" as the source. This time, specify the existing table "canonicaldata" (created by the above step) in the data catalog as the data target. Choose the new pickup_date and dropoff_date fields for the target field types for the date variables, as shown below.

Change the data types to be timestamp rather than strings. The following table outlines the updates.
Old Name Target Name Target Data Type
lpep_pickup_datetime pickup_date timestamp
lpep_dropoff_datetime dropoff_date timestamp



Click the "Save job and edit script" button.

Insert the following two lines at the end of the top imports:
from pyspark.sql.functions import lit
from awsglue.dynamicframe import DynamicFrame

Insert the following lines before the last sink call. Replace <DYNAMIC_FRAME_NAME> with the name generated in the script; in this case it is "resolvechoice4".
Add a field named "type" to every record that comes from this data source; the value of this field is "green" for every record.
##----------------------------------
#convert to a Spark DataFrame...
customDF = <DYNAMIC_FRAME_NAME>.toDF()
 
#add a new column for "type"
customDF = customDF.withColumn("type", lit('green'))
 
# Convert back to a DynamicFrame for further processing.
customDynamicFrame = DynamicFrame.fromDF(customDF, glueContext, "customDF_df")
##----------------------------------

Then locate the sink call that follows this annotation:
## @type: DataSink
## @args: [database = "nycitytaxianalysis", table_name = "canonicaldata", transformation_ctx = "datasink5"]
## @return: datasink5
## @inputs: [frame = resolvechoice4]
Update it from:
datasink5 = glueContext.write_dynamic_frame.from_catalog(frame = resolvechoice4, database = "nycitytaxianalysis", table_name = "canonicaldata", transformation_ctx = "datasink5")
to:
datasink5 = glueContext.write_dynamic_frame.from_options(frame = customDynamicFrame, connection_type = "s3", connection_options = {"path": "s3://aws-glue-temporary-###-us-west-2/glue-blog/"}, format = "parquet", transformation_ctx = "datasink5")

Full code:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import lit
from awsglue.dynamicframe import DynamicFrame

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "nycitytaxianalysis", table_name = "blog_green", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "nycitytaxianalysis", table_name = "blog_green", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("vendorid", "long", "vendorid", "int"), ("lpep_pickup_datetime", "string", "pickup_date", "timestamp"), ("lpep_dropoff_datetime", "string", "dropoff_date", "timestamp"), ("store_and_fwd_flag", "string", "store_and_fwd_flag", "string"), ("ratecodeid", "long", "ratecodeid", "long"), ("pickup_longitude", "double", "pickup_longitude", "double"), ("pickup_latitude", "double", "pickup_latitude", "double"), ("dropoff_longitude", "double", "dropoff_longitude", "double"), ("dropoff_latitude", "double", "dropoff_latitude", "double"), ("passenger_count", "long", "passenger_count", "long"), ("trip_distance", "double", "trip_distance", "double"), ("fare_amount", "double", "fare_amount", "double"), ("extra", "double", "extra", "double"), ("mta_tax", "double", "mta_tax", "double"), ("tip_amount", "double", "tip_amount", "double"), ("tolls_amount", "double", "tolls_amount", "double"), ("improvement_surcharge", "double", "improvement_surcharge", "double"), ("total_amount", "double", "total_amount", "double"), ("payment_type", "long", "payment_type", "int")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("vendorid", "long", "vendorid", "int"), ("lpep_pickup_datetime", "string", "pickup_date", "timestamp"), ("lpep_dropoff_datetime", "string", "dropoff_date", "timestamp"), ("store_and_fwd_flag", "string", "store_and_fwd_flag", "string"), ("ratecodeid", "long", "ratecodeid", "long"), ("pickup_longitude", "double", "pickup_longitude", "double"), ("pickup_latitude", "double", "pickup_latitude", "double"), ("dropoff_longitude", "double", "dropoff_longitude", "double"), ("dropoff_latitude", "double", "dropoff_latitude", "double"), ("passenger_count", "long", "passenger_count", "long"), ("trip_distance", "double", "trip_distance", "double"), ("fare_amount", "double", "fare_amount", "double"), ("extra", "double", "extra", "double"), ("mta_tax", "double", "mta_tax", "double"), ("tip_amount", "double", "tip_amount", "double"), ("tolls_amount", "double", "tolls_amount", "double"), ("improvement_surcharge", "double", "improvement_surcharge", "double"), ("total_amount", "double", "total_amount", "double"), ("payment_type", "long", "payment_type", "int")], transformation_ctx = "applymapping1")
## @type: SelectFields
## @args: [paths = ["vendorid", "pickup_date", "dropoff_date", "passenger_count", "trip_distance", "pickup_longitude", "pickup_latitude", "ratecodeid", "store_and_fwd_flag", "dropoff_longitude", "dropoff_latitude", "payment_type", "fare_amount", "extra", "mta_tax", "tip_amount", "tolls_amount", "improvement_surcharge", "total_amount", "type"], transformation_ctx = "selectfields2"]
## @return: selectfields2
## @inputs: [frame = applymapping1]
selectfields2 = SelectFields.apply(frame = applymapping1, paths = ["vendorid", "pickup_date", "dropoff_date", "passenger_count", "trip_distance", "pickup_longitude", "pickup_latitude", "ratecodeid", "store_and_fwd_flag", "dropoff_longitude", "dropoff_latitude", "payment_type", "fare_amount", "extra", "mta_tax", "tip_amount", "tolls_amount", "improvement_surcharge", "total_amount", "type"], transformation_ctx = "selectfields2")
## @type: ResolveChoice
## @args: [choice = "MATCH_CATALOG", database = "nycitytaxianalysis", table_name = "canonicaldata", transformation_ctx = "resolvechoice3"]
## @return: resolvechoice3
## @inputs: [frame = selectfields2]
resolvechoice3 = ResolveChoice.apply(frame = selectfields2, choice = "MATCH_CATALOG", database = "nycitytaxianalysis", table_name = "canonicaldata", transformation_ctx = "resolvechoice3")
## @type: ResolveChoice
## @args: [choice = "make_struct", transformation_ctx = "resolvechoice4"]
## @return: resolvechoice4
## @inputs: [frame = resolvechoice3]
resolvechoice4 = ResolveChoice.apply(frame = resolvechoice3, choice = "make_struct", transformation_ctx = "resolvechoice4")

##----------------------------------
#convert to a Spark DataFrame...
customDF = resolvechoice4.toDF()
 
#add a new column for "type"
customDF = customDF.withColumn("type", lit('green'))
 
# Convert back to a DynamicFrame for further processing.
customDynamicFrame = DynamicFrame.fromDF(customDF, glueContext, "customDF_df")

## @type: DataSink
## @args: [database = "nycitytaxianalysis", table_name = "canonicaldata", transformation_ctx = "datasink5"]
## @return: datasink5
## @inputs: [frame = resolvechoice4]
datasink5 = glueContext.write_dynamic_frame.from_options(frame = customDynamicFrame, connection_type = "s3", connection_options = {"path": "s3://aws-glue-temporary-123456789012-us-west-2/glue-blog/"}, format = "parquet", transformation_ctx = "datasink5")
job.commit()

Run the same SQL query as the previous one, in Athena.
SELECT type, count(*) FROM "nycitytaxianalysis"."canonicaldata" group by type;
Return:
   type    _col1
1  yellow  10906858
2  green   12352144


Define ETL Job for FHV data
You have one more dataset to convert, so create a new ETL job. Name the job "NYCityTaxiFHV".
Choose "blog_fhv" as the source.
Specify the existing table "canonicaldata" in the data catalog as the data target.
This dataset is limited: it has only three fields, and you convert just one of them.
Choose "–" (dash) for the dispatcher (dispatching_base_num) and location (locationid), and map pickup_date to the canonical field pickup_date (at this point, nothing needs to be changed).
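The effect of this mapping can be pictured with a small pure-Python sketch. It is not Glue code: CANONICAL_FIELDS is a shortened, illustrative subset of the canonical columns, to_canonical is a hypothetical helper, and the sample row is invented. It only shows that an FHV record contributes a pickup_date, that the unmapped source fields are dropped, and that the remaining canonical fields stay empty. The "type" tag is the one the script adds later with lit('fhv').

```python
from datetime import datetime

# Shortened, illustrative subset of the canonical columns (assumption).
CANONICAL_FIELDS = [
    "vendorid", "pickup_date", "dropoff_date",
    "trip_distance", "total_amount", "type",
]


def to_canonical(fhv_row, trip_type="fhv"):
    """Map one FHV record onto the canonical layout (hypothetical helper).

    Only pickup_date is carried over; every other canonical field stays None.
    """
    record = {field: None for field in CANONICAL_FIELDS}
    record["pickup_date"] = datetime.strptime(
        fhv_row["pickup_date"], "%Y-%m-%d %H:%M:%S"
    )
    record["type"] = trip_type
    return record


# Invented sample row with the three FHV source fields.
sample = {
    "dispatching_base_num": "B00001",
    "pickup_date": "2016-01-01 00:30:00",
    "locationid": "1",
}
canonical = to_canonical(sample)
print(canonical)
```

Because most canonical fields are empty for FHV records, queries that filter or aggregate on fields such as trip_distance will not include them.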



Click the "Save job and edit script" button.

In the script, insert the following two lines at the end of the top imports:
from pyspark.sql.functions import lit
from awsglue.dynamicframe import DynamicFrame

Find the last call before the line that starts with datasink. This is the DynamicFrame that is used to write out the data.

Now convert that to a DataFrame. Replace <DYNAMIC_FRAME_NAME> with the name generated in the script; in this case it is "resolvechoice4".

Insert this code before the last sink call.
##----------------------------------
#convert to a Spark DataFrame...
customDF = <DYNAMIC_FRAME_NAME>.toDF()
 
#add a new column for "type"
customDF = customDF.withColumn("type", lit('fhv'))
 
# Convert back to a DynamicFrame for further processing.
customDynamicFrame = DynamicFrame.fromDF(customDF, glueContext, "customDF_df")
##----------------------------------
Insert the code above before this code block:
## @type: DataSink
## @args: [database = "nycitytaxianalysis", table_name = "canonicaldata", transformation_ctx = "datasink5"]
## @return: datasink5
## @inputs: [frame = resolvechoice4]
datasink5 = glueContext.write_dynamic_frame.from_catalog(frame = resolvechoice4, database = "nycitytaxianalysis", table_name = "canonicaldata", transformation_ctx = "datasink5")

Then change the following line from:
datasink5 = glueContext.write_dynamic_frame.from_catalog(frame = resolvechoice4, database = "nycitytaxianalysis", table_name = "canonicaldata", transformation_ctx = "datasink5")
to:
datasink5 = glueContext.write_dynamic_frame.from_options(frame = customDynamicFrame, connection_type = "s3", connection_options = {"path": "s3://aws-glue-temporary-###-us-west-2/glue-blog/"}, format = "parquet", transformation_ctx = "datasink5")
Full code:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import lit
from awsglue.dynamicframe import DynamicFrame

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "nycitytaxianalysis", table_name = "blog_fhv", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "nycitytaxianalysis", table_name = "blog_fhv", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("pickup_date", "string", "pickup_date", "timestamp")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("pickup_date", "string", "pickup_date", "timestamp")], transformation_ctx = "applymapping1")
## @type: SelectFields
## @args: [paths = ["vendorid", "pickup_date", "dropoff_date", "passenger_count", "trip_distance", "pickup_longitude", "pickup_latitude", "ratecodeid", "store_and_fwd_flag", "dropoff_longitude", "dropoff_latitude", "payment_type", "fare_amount", "extra", "mta_tax", "tip_amount", "tolls_amount", "improvement_surcharge", "total_amount", "type"], transformation_ctx = "selectfields2"]
## @return: selectfields2
## @inputs: [frame = applymapping1]
selectfields2 = SelectFields.apply(frame = applymapping1, paths = ["vendorid", "pickup_date", "dropoff_date", "passenger_count", "trip_distance", "pickup_longitude", "pickup_latitude", "ratecodeid", "store_and_fwd_flag", "dropoff_longitude", "dropoff_latitude", "payment_type", "fare_amount", "extra", "mta_tax", "tip_amount", "tolls_amount", "improvement_surcharge", "total_amount", "type"], transformation_ctx = "selectfields2")
## @type: ResolveChoice
## @args: [choice = "MATCH_CATALOG", database = "nycitytaxianalysis", table_name = "canonicaldata", transformation_ctx = "resolvechoice3"]
## @return: resolvechoice3
## @inputs: [frame = selectfields2]
resolvechoice3 = ResolveChoice.apply(frame = selectfields2, choice = "MATCH_CATALOG", database = "nycitytaxianalysis", table_name = "canonicaldata", transformation_ctx = "resolvechoice3")
## @type: ResolveChoice
## @args: [choice = "make_struct", transformation_ctx = "resolvechoice4"]
## @return: resolvechoice4
## @inputs: [frame = resolvechoice3]
resolvechoice4 = ResolveChoice.apply(frame = resolvechoice3, choice = "make_struct", transformation_ctx = "resolvechoice4")

##----------------------------------
#convert to a Spark DataFrame...
customDF = resolvechoice4.toDF()
 
#add a new column for "type"
customDF = customDF.withColumn("type", lit('fhv'))
 
# Convert back to a DynamicFrame for further processing.
customDynamicFrame = DynamicFrame.fromDF(customDF, glueContext, "customDF_df")
##----------------------------------

## @type: DataSink
## @args: [database = "nycitytaxianalysis", table_name = "canonicaldata", transformation_ctx = "datasink5"]
## @return: datasink5
## @inputs: [frame = resolvechoice4]
datasink5 = glueContext.write_dynamic_frame.from_options(frame = customDynamicFrame, connection_type = "s3", connection_options = {"path": "s3://aws-glue-temporary-123456789012-us-west-2/glue-blog/"}, format = "parquet", transformation_ctx = "datasink5")

job.commit()

Save the script, and then run the job.

After about 4 minutes, the ETL job should complete. Run the same SQL query as the previous one, in Athena.
SELECT type, count(*) FROM "nycitytaxianalysis"."canonicaldata" group by type;
Return:
   type    _col1
1  yellow  10906858
2  fhv     8659987
3  green   12352144

Run the following SQL query in Athena.
SELECT min(trip_distance) AS minDistance,
       max(trip_distance) AS maxDistance,
       min(total_amount) AS minTotal,
       max(total_amount) AS maxTotal
FROM "nycitytaxianalysis"."canonicaldata";
Return:
   minDistance  maxDistance  minTotal  maxTotal
1  0.0          8000010.0    -958.4    111271.65


Recall the data dictionary for the Yellow and Green datasets. The two relevant fields are repeated here:
Field Name     Description
trip_distance  The elapsed trip distance in miles reported by the taximeter.
total_amount   The total amount charged to passengers. Does not include cash tips.

Run the following SQL query in Athena.
SELECT TYPE,
       avg(trip_distance) AS avgDist,
       avg(total_amount/trip_distance) AS avgCostPerMile
FROM "nycitytaxianalysis"."canonicaldata"
WHERE trip_distance > 0
  AND total_amount > 0
GROUP BY TYPE;
Return:
   type    avgDist            avgCostPerMile
1  green   4.460172564911156  7.447596834344573
2  yellow  4.676942730048546  8.374197225225656
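The result has no fhv row, and the zero-distance and negative-total records seen in the min/max query no longer distort the averages. A minimal sketch of why, using Python's built-in sqlite3 module as a stand-in for Athena (an assumption, as are the invented rows): a comparison against NULL is never true in SQL, so rows without trip_distance or total_amount, which is presumably the case for the FHV records, are dropped by the WHERE clause together with the suspect rows.

```python
import sqlite3

# In-memory SQLite table standing in for the Athena table (assumption).
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE canonicaldata (type TEXT, trip_distance REAL, total_amount REAL)"
)

# Invented rows: two clean trips, two suspect trips, one FHV row without fare data.
conn.executemany(
    "INSERT INTO canonicaldata VALUES (?, ?, ?)",
    [
        ("yellow", 2.0, 10.0),
        ("yellow", 0.0, 3.5),   # zero distance, filtered out
        ("green", 4.0, 20.0),
        ("green", 1.5, -5.0),   # negative total, filtered out
        ("fhv", None, None),    # NULL values, filtered out
    ],
)

# Same shape as the Athena query above.
result = conn.execute(
    "SELECT type, avg(trip_distance), avg(total_amount / trip_distance) "
    "FROM canonicaldata "
    "WHERE trip_distance > 0 AND total_amount > 0 "
    "GROUP BY type ORDER BY type"
).fetchall()
print(result)
```

Only the two clean trips survive the filter, one per type, and the fhv group disappears entirely.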

Run the following SQL query in Athena.
SELECT TYPE, 
       avg(trip_distance) avgDistance, 
       avg(total_amount/trip_distance) avgCostPerMile, 
       avg(total_amount) avgCost,
       approx_percentile(total_amount, .99) percentile99
FROM "nycitytaxianalysis"."canonicaldata"
WHERE trip_distance > 0 and total_amount > 0
GROUP BY TYPE;
Return:
   TYPE    avgDistance         avgCostPerMile     avgCost             percentile99
1  green   4.460172564911154   7.44759683434457   13.847251500201278  69.99
2  yellow  4.6769427300485455  8.374197225225657  15.593192205161714  69.99
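Note that avgCostPerMile is not avgCost divided by avgDistance: it averages the per-trip ratio, so short trips with a high cost per mile pull it up. The sketch below recomputes the four aggregates in plain Python on two invented trips to make the difference visible. The function names are hypothetical, and the percentile uses an exact nearest-rank rule, whereas Athena's approx_percentile returns an approximation.

```python
import math


def nearest_rank_percentile(values, fraction):
    """Exact nearest-rank percentile (Athena's approx_percentile only approximates this)."""
    ordered = sorted(values)
    rank = max(1, math.ceil(fraction * len(ordered)))
    return ordered[rank - 1]


def summarize(trips):
    """trips: list of (trip_distance, total_amount) pairs, both already > 0."""
    n = len(trips)
    return {
        "avgDistance": sum(d for d, _ in trips) / n,
        # Average of per-trip ratios, as in avg(total_amount/trip_distance).
        "avgCostPerMile": sum(t / d for d, t in trips) / n,
        "avgCost": sum(t for _, t in trips) / n,
        "percentile99": nearest_rank_percentile([t for _, t in trips], 0.99),
    }


# Invented trips: one short ride that is expensive per mile, one long ride.
trips = [(0.5, 5.0), (10.0, 30.0)]
stats = summarize(trips)
ratio_of_averages = stats["avgCost"] / stats["avgDistance"]
print(stats)
print(ratio_of_averages)
```

Here the average of the ratios is 6.5 per mile, while the ratio of the averages is about 3.33. The same effect explains why the table above reports roughly 7 to 8 per mile even though avgCost divided by avgDistance is closer to 3.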


Visualizing the Data in QuickSight

Amazon QuickSight is a data visualization service you can use to analyze the data that you just combined. For more detailed instructions, see the Amazon QuickSight User Guide.
Log in to the QuickSight console for the Oregon region at https://us-west-2.quicksight.aws.amazon.com/sn/start.

Create a new dataset for NY taxi data
Create a new Athena data source. (Create a Dataset, FROM NEW DATA SOURCES: Athena)
Data source name: nyctlcdemo. For Athena workgroup, select "[primary]".


Point it to the canonical taxi data. (Database: nycitytaxianalysis; Tables: canonicaldata)

Click the Edit/Preview data button.
Add a new calculated field named HourOfDay.

 
extract('HH', {pickup_date})

Choose Save and Visualize.
Choose Save & Publish.
Choose Publish & Visualize.
Choose Interactive sheet.

You can now visualize this dataset and query through Athena the canonical dataset on S3 that has been converted by Glue.

Selecting HourOfDay shows how many taxi rides there are for each hour of the day. This is a calculated field derived from the timestamp in the raw dataset.
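What the HourOfDay field and the resulting chart compute can be sketched in plain Python. This is not QuickSight code: the timestamps are invented, and hour_of_day is a hypothetical helper that plays the role of extract('HH', {pickup_date}).

```python
from collections import Counter
from datetime import datetime

# Invented pickup timestamps in the same format as the sample data.
pickups = [
    "2016-01-01 00:05:00",
    "2016-01-01 00:45:10",
    "2016-01-01 17:30:00",
]


def hour_of_day(timestamp):
    """Return the hour (0-23) of a pickup timestamp string."""
    return datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S").hour


# Count rides per hour, which is what the chart shows for HourOfDay.
rides_per_hour = Counter(hour_of_day(ts) for ts in pickups)
print(sorted(rides_per_hour.items()))
```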


You can also deselect that and choose type to see the total counts again by type.


Now choose the pickup_date field in addition to the type field and notice that the plot type automatically changed. Because time is being used, it’s showing a time chart defaulting to the year.
Click "pickup_date" under "X axis". Under "Aggregate", choose "Day" to use a daily interval instead.


Expand the x axis to zoom out. You can clearly see a large drop in all rides on January 23, 2016.
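The daily aggregation behind this chart can be sketched in plain Python as well. The rows below are invented and only mimic the shape of the effect; they are not taken from the dataset. The sketch groups rides by (day, type), as the chart does with pickup_date and type, and then looks for the day with the fewest rides overall.

```python
from collections import Counter
from datetime import datetime

# Invented (pickup_date, type) rows; the real dataset has millions of them.
rides = [
    ("2016-01-22 08:00:00", "yellow"),
    ("2016-01-22 09:15:00", "green"),
    ("2016-01-22 18:40:00", "yellow"),
    ("2016-01-23 10:00:00", "yellow"),
    ("2016-01-24 08:30:00", "green"),
    ("2016-01-24 12:00:00", "yellow"),
    ("2016-01-24 19:20:00", "fhv"),
]


def day_of(timestamp):
    """Truncate a pickup timestamp string to its calendar day."""
    return datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S").date()


# One series per type, one point per day.
rides_per_day_and_type = Counter((day_of(ts), kind) for ts, kind in rides)

# Total rides per day across all types, and the quietest day.
rides_per_day = Counter(day_of(ts) for ts, _ in rides)
quietest_day = min(rides_per_day, key=rides_per_day.get)
print(quietest_day, rides_per_day[quietest_day])
```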


Summary

In this post, you went from data investigation to analyzing and visualizing a canonical dataset, without starting a single server. You started by crawling a dataset you didn’t know anything about, and the crawler told you the structure, columns, and record counts.
From there, you saw that all three datasets were in different formats but represented the same thing: New York City taxi rides. You then converted them into a canonical (or normalized) form that is easily queried through Athena and QuickSight, as well as through a wide range of other tools not covered in this post.


Reference
AWS Big Data Blog (Tag: AWS Glue)

New York City Taxi Records dataset

Harmonize, Query, and Visualize Data from Various Providers using AWS Glue, Amazon Athena, and Amazon QuickSight

Build a Data Lake Foundation with AWS Glue and Amazon S3 (This post can be used as a complement to the one above; it goes into more depth but is somewhat light on description.)

TLC Trip Records User Guide

Category: big_data Tags: public
