Convert PySpark to Glue Studio

2023-03-02


I wanted to convert the simple PySpark script below into an AWS Glue job. The script is based on a previous post, Spark - Map example (that post has not been published yet).

import collections
import os

from pyspark import SparkConf, SparkContext


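# Run Spark locally with a single worker thread
conf = SparkConf().setMaster("local").setAppName("RatingsHistogram")
sc = SparkContext(conf=conf)
# u.data is expected in an ml-100k folder next to this script
file = os.path.dirname(os.path.abspath(__file__)) + '/ml-100k/u.data'
lines = sc.textFile("file://"+file)

# The rating is the third whitespace-separated field of each line
ratings = lines.map(lambda x: x.split()[2])
# countByValue() returns a dict of {rating: count} to the driver
result = ratings.countByValue()
result_sorted = sorted(result.items())

sortedResults = collections.OrderedDict(result_sorted)
for key, value in sortedResults.items():
    print("%s %i" % (key, value))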

After some investigation, I found that wrapping this script and dropping it into Glue as-is is not a good fit. It is more suitable to use Glue's native transforms, as I will demonstrate below.

First, upload the source data u.data into an S3 bucket.
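The upload can also be scripted instead of done through the console. Below is a minimal sketch, assuming boto3 is installed and AWS credentials are configured; the helper name upload_source and the bucket name in the usage comment are placeholders of mine, not something from the Glue setup above. The client is passed in as an argument so the helper can be tried with a stand-in object.

```python
from pathlib import Path


def upload_source(s3_client, local_path, bucket, key="u.data"):
    """Upload the ratings file to S3 and return its s3:// URI.

    s3_client is expected to behave like a boto3 S3 client, i.e. to
    provide upload_file(Filename, Bucket, Key).
    """
    path = Path(local_path)
    if not path.is_file():
        raise FileNotFoundError(f"source file not found: {path}")
    s3_client.upload_file(str(path), bucket, key)
    return f"s3://{bucket}/{key}"


# Usage (hypothetical bucket name, requires boto3 and AWS credentials):
#   import boto3
#   uri = upload_source(boto3.client("s3"), "ml-100k/u.data", "my-glue-demo-bucket")
#   print(uri)
```

The returned URI is what would go into the "paths" option of the S3 source node.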

Orchestrate the ETL steps:


Source S3:

Output schema


Rename field:

Output schema


Group by and then Aggregate.

Output schema
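To make clear what the Rename Field and Aggregate nodes compute together, here is a rough local equivalent in plain Python. It is only a sketch for checking the logic on a few rows, not what Glue runs. The function name rating_histogram and the sample rows are illustrative assumptions; the rows just follow the tab-separated u.data layout, where the third field (col2 in Glue's zero-based naming) is the rating.

```python
from collections import Counter


def rating_histogram(lines):
    """Group rows by the third tab-separated field and count each group."""
    counts = Counter()
    for line in lines:
        fields = line.rstrip("\n").split("\t")
        if len(fields) < 3:
            continue  # skip blank or malformed rows
        counts[fields[2]] += 1  # fields[2] corresponds to col2, renamed to "rating"
    return dict(sorted(counts.items()))


# Illustrative rows: user id, item id, rating, timestamp
sample = [
    "196\t242\t3\t881250949",
    "186\t302\t3\t891717742",
    "22\t377\t1\t878887116",
    "244\t51\t2\t880606923",
]
print(rating_histogram(sample))  # {'1': 1, '2': 1, '3': 2}
```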


Target S3:

Output schema: same as above.

Save and run the job.

After the job finished, 36 output files appeared under the S3 bucket. Among them, the files below contain the results we actually need.
File 1 content:
rating,"count(rating)"
1,6110

File 2:
rating,"count(rating)"
2,11370

File 3:
rating,"count(rating)"
3,27145

File 4:
rating,"count(rating)"
4,34174

File 5:
rating,"count(rating)"
5,21201

Compared with the result of running the PySpark script in the previous post, the counts are the same. The difference is that the Glue job's result is distributed across multiple files in the S3 bucket.
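Since the result is spread over several files, it can be handy to stitch them back into one histogram for comparison. The sketch below assumes the output files have been downloaded from the result/ prefix into a local directory and that each one uses the rating,"count(rating)" header shown above. The function name merge_part_files is my own.

```python
import csv
from pathlib import Path


def merge_part_files(directory):
    """Combine per-partition CSV outputs into one sorted {rating: count} dict."""
    merged = {}
    for path in sorted(Path(directory).iterdir()):
        if not path.is_file():
            continue
        with path.open(newline="") as handle:
            # Empty or header-only files simply yield no rows
            for row in csv.DictReader(handle):
                rating = int(row["rating"])
                merged[rating] = merged.get(rating, 0) + int(row["count(rating)"])
    return dict(sorted(merged.items()))


# Usage (hypothetical local directory holding the downloaded files):
#   histogram = merge_part_files("result")
#   for rating, count in histogram.items():
#       print(rating, count)
```

As a sanity check, the five counts listed above add up to 100,000, which is the number of ratings in the ml-100k dataset.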



Appendix
The overall generated script:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql import functions as SqlFuncs


def sparkAggregate(
    glueContext, parentFrame, groups, aggs, transformation_ctx
) -> DynamicFrame:
    aggsFuncs = []
    for column, func in aggs:
        aggsFuncs.append(getattr(SqlFuncs, func)(column))
    result = (
        parentFrame.toDF().groupBy(*groups).agg(*aggsFuncs)
        if len(groups) > 0
        else parentFrame.toDF().agg(*aggsFuncs)
    )
    return DynamicFrame.fromDF(result, glueContext, transformation_ctx)


args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# Script generated for node Amazon S3
AmazonS3_node1677724421448 = glueContext.create_dynamic_frame.from_options(
    format_options={"quoteChar": '"', "withHeader": False, "separator": "\t"},
    connection_type="s3",
    format="csv",
    connection_options={
        "paths": ["s3://aws-glue-temporary-<111122223333>-us-west-2/u.data"],
        "recurse": True,
    },
    transformation_ctx="AmazonS3_node1677724421448",
)

# Script generated for node Rename Field
RenameField_node1677724837209 = RenameField.apply(
    frame=AmazonS3_node1677724421448,
    old_name="col2",
    new_name="rating",
    transformation_ctx="RenameField_node1677724837209",
)

# Script generated for node Aggregate
Aggregate_node1677724516125 = sparkAggregate(
    glueContext,
    parentFrame=RenameField_node1677724837209,
    groups=["rating"],
    aggs=[["rating", "count"]],
    transformation_ctx="Aggregate_node1677724516125",
)

# Script generated for node Amazon S3
AmazonS3_node1677724545988 = glueContext.write_dynamic_frame.from_options(
    frame=Aggregate_node1677724516125,
    connection_type="s3",
    format="csv",
    connection_options={
        "path": "s3://aws-glue-temporary-<111122223333>-us-west-2/result/",
        "partitionKeys": [],
    },
    transformation_ctx="AmazonS3_node1677724545988",
)

job.commit()


Category: data Tags: public
