AWS re:Invent 2020: Data pipelines with Amazon Managed Workflows for Apache Airflow

March 27, 2023

[12:40] Demo
Name: MWAA-Demo-1
Airflow version: 2.4.3 (Latest)
S3 Bucket: s3://sc-airflow
DAGs folder: s3://sc-airflow/dags
VPC & subnets
Web server access: select Public network (Internet accessible)
Security group(s): select Create new security group
Environment class: mw1.small

Click Create environment.
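
The same environment can also be created from the CLI. A rough sketch, assuming the execution role, subnets, and security group already exist (the ARNs and IDs below are placeholders):

% aws mwaa create-environment \
    --name MWAA-Demo-1 \
    --airflow-version 2.4.3 \
    --source-bucket-arn arn:aws:s3:::sc-airflow \
    --dag-s3-path dags \
    --execution-role-arn arn:aws:iam::123456789012:role/mwaa-demo-execution-role \
    --network-configuration SubnetIds=subnet-aaaa,subnet-bbbb,SecurityGroupIds=sg-cccc \
    --webserver-access-mode PUBLIC_ONLY \
    --environment-class mw1.small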

Airflow UI: 
https://us-west-2.console.aws.amazon.com/mwaa/home?region=us-west-2#environments/MWAA-Demo-1/sso
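
The console link above opens the Airflow UI through SSO; the environment's own webserver hostname can also be fetched with get-environment if needed:

% aws mwaa get-environment --name MWAA-Demo-1 \
    --query 'Environment.WebserverUrl' --output text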

% python3 -m pip install apache-airflow
% python3 -m pip install apache-airflow-providers-amazon

copy-s3-file-dag.py

from airflow import DAG

from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

from datetime import datetime

s3_bucket_name = 'sc-airflow'
s3_inkey = 'input.csv'
s3_outkey = 'output.csv'

def tranform_file_fn():
    # S3Hook with no arguments uses the default 'aws_default' connection,
    # which on MWAA resolves to the environment's execution role.
    hook = S3Hook()
    s3c = hook.get_conn()  # underlying boto3 S3 client
    obj = s3c.get_object(Bucket=s3_bucket_name, Key=s3_inkey)
    infile_str = obj['Body'].read().decode('utf-8')
    # Strip all double quotes and write the result back to S3.
    outfile_str = infile_str.replace('"', '')
    s3c.put_object(Bucket=s3_bucket_name, Key=s3_outkey, Body=outfile_str)

with DAG(dag_id='copy-s3-file',
         schedule_interval='@once',
         start_date=datetime(2020, 11, 24),
         catchup=False) as dag:

    # Block until input.csv shows up in the bucket.
    wait_for_file = S3KeySensor(
        task_id='wait_for_file',
        bucket_key=s3_inkey,
        bucket_name=s3_bucket_name
    )
    tranform_file = PythonOperator(
        task_id='tranform_file',
        python_callable=tranform_file_fn
    )

    wait_for_file >> tranform_file
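
Before uploading, the local apache-airflow install from earlier can parse-check the DAG (apache-airflow-providers-amazon supplies S3KeySensor and S3Hook). A minimal sketch, run from the directory holding copy-s3-file-dag.py:

from airflow.models import DagBag

# Parse the DAG files in the current directory and surface any import
# errors before the file reaches S3.
dagbag = DagBag(dag_folder='.', include_examples=False)
print('import errors:', dagbag.import_errors or 'none')
print('dags found:', list(dagbag.dag_ids))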

% aws s3 cp copy-s3-file-dag.py s3://sc-airflow/dags/
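
Confirm the upload with a listing; MWAA picks up new files in the dags folder automatically after a short delay:

% aws s3 ls s3://sc-airflow/dags/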



Add S3 access permissions to the MWAA execution role by attaching the AmazonS3FullAccess policy.
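
A CLI equivalent, with <mwaa-execution-role-name> standing in for the role MWAA created for this environment:

% aws iam attach-role-policy \
    --role-name <mwaa-execution-role-name> \
    --policy-arn arn:aws:iam::aws:policy/AmazonS3FullAccess

AmazonS3FullAccess is fine for a demo, but for real workloads scope the policy down to the sc-airflow bucket.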

In the Airflow UI, click copy-s3-file.
Click Trigger DAG.
Click wait_for_file.

Full log:
*** Reading remote log from Cloudwatch log_group: airflow-MWAA-Demo-1-Task log_stream: dag_id=copy-s3-file/run_id=manual__2023-03-28T05_57_14.842043+00_00/task_id=wait_for_file/attempt=1.log.
[2023-03-28, 05:57:21 UTC] {{taskinstance.py:1165}} INFO - Dependencies all met for <TaskInstance: copy-s3-file.wait_for_file manual__2023-03-28T05:57:14.842043+00:00 [queued]>
[2023-03-28, 05:57:21 UTC] {{taskinstance.py:1165}} INFO - Dependencies all met for <TaskInstance: copy-s3-file.wait_for_file manual__2023-03-28T05:57:14.842043+00:00 [queued]>
[2023-03-28, 05:57:21 UTC] {{taskinstance.py:1362}} INFO - 
--------------------------------------------------------------------------------
[2023-03-28, 05:57:21 UTC] {{taskinstance.py:1363}} INFO - Starting attempt 1 of 1
[2023-03-28, 05:57:21 UTC] {{taskinstance.py:1364}} INFO - 
--------------------------------------------------------------------------------
[2023-03-28, 05:57:21 UTC] {{taskinstance.py:1383}} INFO - Executing <Task(S3KeySensor): wait_for_file> on 2023-03-28 05:57:14.842043+00:00
[2023-03-28, 05:57:21 UTC] {{standard_task_runner.py:55}} INFO - Started process 228 to run task
[2023-03-28, 05:57:21 UTC] {{standard_task_runner.py:82}} INFO - Running: ['airflow', 'tasks', 'run', 'copy-s3-file', 'wait_for_file', 'manual__2023-03-28T05:57:14.842043+00:00', '--job-id', '5', '--raw', '--subdir', 'DAGS_FOLDER/copy-s3-file-dag.py', '--cfg-path', '/tmp/tmp4dp3x5wk']
[2023-03-28, 05:57:21 UTC] {{standard_task_runner.py:83}} INFO - Job 5: Subtask wait_for_file
[2023-03-28, 05:57:21 UTC] {{task_command.py:376}} INFO - Running <TaskInstance: copy-s3-file.wait_for_file manual__2023-03-28T05:57:14.842043+00:00 [running]> on host ip-10-0-10-245.us-west-2.compute.internal
[2023-03-28, 05:57:21 UTC] {{taskinstance.py:1590}} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=copy-s3-file
AIRFLOW_CTX_TASK_ID=wait_for_file
AIRFLOW_CTX_EXECUTION_DATE=2023-03-28T05:57:14.842043+00:00
AIRFLOW_CTX_TRY_NUMBER=1
AIRFLOW_CTX_DAG_RUN_ID=manual__2023-03-28T05:57:14.842043+00:00
[2023-03-28, 05:57:21 UTC] {{s3.py:98}} INFO - Poking for key : s3://sc-airflow/input.csv
[2023-03-28, 05:57:21 UTC] {{base.py:71}} INFO - Using connection ID 'aws_default' for task execution.
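
The sensor is now poking for s3://sc-airflow/input.csv every 60 seconds, the BaseSensorOperator default. The polling behavior is tunable through the standard sensor parameters; a sketch with illustrative values:

    wait_for_file = S3KeySensor(
        task_id='wait_for_file',
        bucket_key=s3_inkey,
        bucket_name=s3_bucket_name,
        poke_interval=30,     # seconds between pokes (default 60)
        timeout=60 * 60,      # fail the task if the key never appears within an hour
        mode='reschedule'     # release the worker slot between pokes
    )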


input.csv:
name,age
"Alice",20
"Bob",25
"Charlie",30

% aws s3 cp input.csv s3://sc-airflow/

Refresh the log:
*** Reading remote log from Cloudwatch log_group: airflow-MWAA-Demo-1-Task log_stream: dag_id=copy-s3-file/run_id=manual__2023-03-28T05_57_14.842043+00_00/task_id=wait_for_file/attempt=1.log.
[2023-03-28, 05:57:21 UTC] {{taskinstance.py:1165}} INFO - Dependencies all met for <TaskInstance: copy-s3-file.wait_for_file manual__2023-03-28T05:57:14.842043+00:00 [queued]>
[2023-03-28, 05:57:21 UTC] {{taskinstance.py:1165}} INFO - Dependencies all met for <TaskInstance: copy-s3-file.wait_for_file manual__2023-03-28T05:57:14.842043+00:00 [queued]>
[2023-03-28, 05:57:21 UTC] {{taskinstance.py:1362}} INFO - 
--------------------------------------------------------------------------------
[2023-03-28, 05:57:21 UTC] {{taskinstance.py:1363}} INFO - Starting attempt 1 of 1
[2023-03-28, 05:57:21 UTC] {{taskinstance.py:1364}} INFO - 
--------------------------------------------------------------------------------
[2023-03-28, 05:57:21 UTC] {{taskinstance.py:1383}} INFO - Executing <Task(S3KeySensor): wait_for_file> on 2023-03-28 05:57:14.842043+00:00
[2023-03-28, 05:57:21 UTC] {{standard_task_runner.py:55}} INFO - Started process 228 to run task
[2023-03-28, 05:57:21 UTC] {{standard_task_runner.py:82}} INFO - Running: ['airflow', 'tasks', 'run', 'copy-s3-file', 'wait_for_file', 'manual__2023-03-28T05:57:14.842043+00:00', '--job-id', '5', '--raw', '--subdir', 'DAGS_FOLDER/copy-s3-file-dag.py', '--cfg-path', '/tmp/tmp4dp3x5wk']
[2023-03-28, 05:57:21 UTC] {{standard_task_runner.py:83}} INFO - Job 5: Subtask wait_for_file
[2023-03-28, 05:57:21 UTC] {{task_command.py:376}} INFO - Running <TaskInstance: copy-s3-file.wait_for_file manual__2023-03-28T05:57:14.842043+00:00 [running]> on host ip-10-0-10-245.us-west-2.compute.internal
[2023-03-28, 05:57:21 UTC] {{taskinstance.py:1590}} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=copy-s3-file
AIRFLOW_CTX_TASK_ID=wait_for_file
AIRFLOW_CTX_EXECUTION_DATE=2023-03-28T05:57:14.842043+00:00
AIRFLOW_CTX_TRY_NUMBER=1
AIRFLOW_CTX_DAG_RUN_ID=manual__2023-03-28T05:57:14.842043+00:00
[2023-03-28, 05:57:21 UTC] {{s3.py:98}} INFO - Poking for key : s3://sc-airflow/input.csv
[2023-03-28, 05:57:21 UTC] {{base.py:71}} INFO - Using connection ID 'aws_default' for task execution.
[2023-03-28, 05:58:21 UTC] {{s3.py:98}} INFO - Poking for key : s3://sc-airflow/input.csv
[2023-03-28, 05:59:21 UTC] {{s3.py:98}} INFO - Poking for key : s3://sc-airflow/input.csv
[2023-03-28, 05:59:21 UTC] {{base.py:213}} INFO - Success criteria met. Exiting.
[2023-03-28, 05:59:21 UTC] {{taskinstance.py:1401}} INFO - Marking task as SUCCESS. dag_id=copy-s3-file, task_id=wait_for_file, execution_date=20230328T055714, start_date=20230328T055721, end_date=20230328T055921
[2023-03-28, 05:59:21 UTC] {{local_task_job.py:159}} INFO - Task exited with return code 0
[2023-03-28, 05:59:21 UTC] {{taskinstance.py:2623}} INFO - 0 downstream tasks scheduled from follow-on schedule check

Go to the Graph view.
Click tranform_file, then click Log.
*** Reading remote log from Cloudwatch log_group: airflow-MWAA-Demo-1-Task log_stream: dag_id=copy-s3-file/run_id=manual__2023-03-28T06_05_37.804978+00_00/task_id=tranform_file/attempt=1.log.
[2023-03-28, 06:05:51 UTC] {{taskinstance.py:1165}} INFO - Dependencies all met for <TaskInstance: copy-s3-file.tranform_file manual__2023-03-28T06:05:37.804978+00:00 [queued]>
[2023-03-28, 06:05:51 UTC] {{taskinstance.py:1165}} INFO - Dependencies all met for <TaskInstance: copy-s3-file.tranform_file manual__2023-03-28T06:05:37.804978+00:00 [queued]>
[2023-03-28, 06:05:51 UTC] {{taskinstance.py:1362}} INFO - 
--------------------------------------------------------------------------------
[2023-03-28, 06:05:51 UTC] {{taskinstance.py:1363}} INFO - Starting attempt 1 of 1
[2023-03-28, 06:05:51 UTC] {{taskinstance.py:1364}} INFO - 
--------------------------------------------------------------------------------
[2023-03-28, 06:05:51 UTC] {{taskinstance.py:1383}} INFO - Executing <Task(PythonOperator): tranform_file> on 2023-03-28 06:05:37.804978+00:00
[2023-03-28, 06:05:51 UTC] {{standard_task_runner.py:55}} INFO - Started process 283 to run task
[2023-03-28, 06:05:51 UTC] {{standard_task_runner.py:82}} INFO - Running: ['airflow', 'tasks', 'run', 'copy-s3-file', 'tranform_file', 'manual__2023-03-28T06:05:37.804978+00:00', '--job-id', '12', '--raw', '--subdir', 'DAGS_FOLDER/copy-s3-file-dag.py', '--cfg-path', '/tmp/tmpl6rb8vcl']
[2023-03-28, 06:05:51 UTC] {{standard_task_runner.py:83}} INFO - Job 12: Subtask tranform_file
[2023-03-28, 06:05:51 UTC] {{task_command.py:376}} INFO - Running <TaskInstance: copy-s3-file.tranform_file manual__2023-03-28T06:05:37.804978+00:00 [running]> on host ip-10-0-10-245.us-west-2.compute.internal
[2023-03-28, 06:05:51 UTC] {{taskinstance.py:1590}} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=copy-s3-file
AIRFLOW_CTX_TASK_ID=tranform_file
AIRFLOW_CTX_EXECUTION_DATE=2023-03-28T06:05:37.804978+00:00
AIRFLOW_CTX_TRY_NUMBER=1
AIRFLOW_CTX_DAG_RUN_ID=manual__2023-03-28T06:05:37.804978+00:00
[2023-03-28, 06:05:51 UTC] {{base.py:71}} INFO - Using connection ID 'aws_default' for task execution.
[2023-03-28, 06:05:51 UTC] {{python.py:177}} INFO - Done. Returned value was: None
[2023-03-28, 06:05:51 UTC] {{taskinstance.py:1401}} INFO - Marking task as SUCCESS. dag_id=copy-s3-file, task_id=tranform_file, execution_date=20230328T060537, start_date=20230328T060551, end_date=20230328T060551
[2023-03-28, 06:05:51 UTC] {{local_task_job.py:159}} INFO - Task exited with return code 0
[2023-03-28, 06:05:51 UTC] {{taskinstance.py:2623}} INFO - 0 downstream tasks scheduled from follow-on schedule check



output.csv:
name,age
Alice,20
Bob,25
Charlie,30
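
As an aside, tranform_file_fn drops down to the raw boto3 client; S3Hook's read_key and load_string helpers express the same transform more directly. A sketch with the same bucket and keys:

def tranform_file_fn():
    hook = S3Hook()
    # read_key returns the object body decoded as UTF-8
    data = hook.read_key(key=s3_inkey, bucket_name=s3_bucket_name)
    # replace=True overwrites output.csv if it already exists
    hook.load_string(data.replace('"', ''), key=s3_outkey,
                     bucket_name=s3_bucket_name, replace=True)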



[24:40] Comparing Amazon MWAA and AWS Step Functions


Category: AWS Tags: public
