AWS re:Invent 2020: Data pipelines with Amazon Managed Workflows for Apache Airflow
[12:40] Demo
Name: MWAA-Demo-1
Airflow version: 2.4.3 (Latest)
S3 Bucket: s3://sc-airflow
DAGs folder: s3://sc-airflow/dags
VPC & subnets
Web server access: select Public network (Internet accessible)
Security group(s): select Create new security group
Environment class: mw1.small
Create environment
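The same environment can also be created from the AWS CLI instead of the console. A minimal sketch, assuming the execution role, subnets, and security group already exist (the ARN and IDs below are placeholders; the console flow above creates them for you):

% aws mwaa create-environment \
    --name MWAA-Demo-1 \
    --airflow-version 2.4.3 \
    --source-bucket-arn arn:aws:s3:::sc-airflow \
    --dag-s3-path dags \
    --environment-class mw1.small \
    --webserver-access-mode PUBLIC_ONLY \
    --execution-role-arn arn:aws:iam::<account-id>:role/<mwaa-execution-role> \
    --network-configuration '{"SubnetIds":["subnet-aaa","subnet-bbb"],"SecurityGroupIds":["sg-ccc"]}'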
Airflow UI:
https://us-west-2.console.aws.amazon.com/mwaa/home?region=us-west-2#environments/MWAA-Demo-1/sso
% python3 -m pip install apache-airflow
% python3 -m pip install apache-airflow-providers-amazon
copy-s3-file-dag.py
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from datetime import datetime
from io import StringIO

s3_bucket_name = 'sc-airflow'
s3_inkey = 'input.csv'
s3_outkey = 'output.csv'

def tranform_file_fn():
    # Read input.csv from S3, strip the double quotes, and write the result back as output.csv.
    # S3Hook() with no connection id uses the default 'aws_default' connection,
    # which on MWAA maps to the environment's execution role.
    hook = S3Hook()
    s3c = hook.get_conn()
    obj = s3c.get_object(Bucket=s3_bucket_name, Key=s3_inkey)
    infileStr = obj['Body'].read().decode('utf-8')
    outfileStr = infileStr.replace('"', '')
    outfile = StringIO(outfileStr)
    s3c.put_object(Bucket=s3_bucket_name, Key=s3_outkey, Body=outfile.getvalue())

with DAG(dag_id='copy-s3-file',
         schedule_interval='@once',
         start_date=datetime(2020, 11, 24),
         catchup=False) as dag:

    # Block until input.csv shows up in the bucket
    wait_for_file = S3KeySensor(
        task_id='wait_for_file',
        bucket_key=s3_inkey,
        bucket_name=s3_bucket_name
    )

    # Then run the transform
    tranform_file = PythonOperator(
        task_id='tranform_file',
        python_callable=tranform_file_fn
    )

    wait_for_file >> tranform_file
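Before uploading, the DAG file can be parse-checked locally with the packages installed above; running the file simply imports it, so any syntax or import error surfaces immediately (a quick sanity check, not a full test of the tasks):

% python3 copy-s3-file-dag.py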
% aws s3 cp copy-s3-file-dag.py s3://sc-airflow/dags/
Grant the MWAA execution role access to S3 by attaching the AmazonS3FullAccess policy.
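If you prefer the CLI over the IAM console, the managed policy can be attached like this (the role name is a placeholder; AmazonS3FullAccess is broader than this demo needs, so a policy scoped to the sc-airflow bucket would be tighter in practice):

% aws iam attach-role-policy \
    --role-name <mwaa-execution-role-name> \
    --policy-arn arn:aws:iam::aws:policy/AmazonS3FullAccess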
In the Airflow UI, click copy-s3-file.
Click Trigger DAG.
Click wait_for_file.
Full log:
*** Reading remote log from Cloudwatch log_group: airflow-MWAA-Demo-1-Task log_stream: dag_id=copy-s3-file/run_id=manual__2023-03-28T05_57_14.842043+00_00/task_id=wait_for_file/attempt=1.log.
[2023-03-28, 05:57:21 UTC] {{taskinstance.py:1165}} INFO - Dependencies all met for <TaskInstance: copy-s3-file.wait_for_file manual__2023-03-28T05:57:14.842043+00:00 [queued]>
[2023-03-28, 05:57:21 UTC] {{taskinstance.py:1165}} INFO - Dependencies all met for <TaskInstance: copy-s3-file.wait_for_file manual__2023-03-28T05:57:14.842043+00:00 [queued]>
[2023-03-28, 05:57:21 UTC] {{taskinstance.py:1362}} INFO - --------------------------------------------------------------------------------
[2023-03-28, 05:57:21 UTC] {{taskinstance.py:1363}} INFO - Starting attempt 1 of 1
[2023-03-28, 05:57:21 UTC] {{taskinstance.py:1364}} INFO - --------------------------------------------------------------------------------
[2023-03-28, 05:57:21 UTC] {{taskinstance.py:1383}} INFO - Executing <Task(S3KeySensor): wait_for_file> on 2023-03-28 05:57:14.842043+00:00
[2023-03-28, 05:57:21 UTC] {{standard_task_runner.py:55}} INFO - Started process 228 to run task
[2023-03-28, 05:57:21 UTC] {{standard_task_runner.py:82}} INFO - Running: ['airflow', 'tasks', 'run', 'copy-s3-file', 'wait_for_file', 'manual__2023-03-28T05:57:14.842043+00:00', '--job-id', '5', '--raw', '--subdir', 'DAGS_FOLDER/copy-s3-file-dag.py', '--cfg-path', '/tmp/tmp4dp3x5wk']
[2023-03-28, 05:57:21 UTC] {{standard_task_runner.py:83}} INFO - Job 5: Subtask wait_for_file
[2023-03-28, 05:57:21 UTC] {{task_command.py:376}} INFO - Running <TaskInstance: copy-s3-file.wait_for_file manual__2023-03-28T05:57:14.842043+00:00 [running]> on host ip-10-0-10-245.us-west-2.compute.internal
[2023-03-28, 05:57:21 UTC] {{taskinstance.py:1590}} INFO - Exporting the following env vars: AIRFLOW_CTX_DAG_OWNER=airflow AIRFLOW_CTX_DAG_ID=copy-s3-file AIRFLOW_CTX_TASK_ID=wait_for_file AIRFLOW_CTX_EXECUTION_DATE=2023-03-28T05:57:14.842043+00:00 AIRFLOW_CTX_TRY_NUMBER=1 AIRFLOW_CTX_DAG_RUN_ID=manual__2023-03-28T05:57:14.842043+00:00
[2023-03-28, 05:57:21 UTC] {{s3.py:98}} INFO - Poking for key : s3://sc-airflow/input.csv
[2023-03-28, 05:57:21 UTC] {{base.py:71}} INFO - Using connection ID 'aws_default' for task execution.
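The same task logs can also be followed outside the Airflow UI, straight from CloudWatch (assuming AWS CLI v2, which provides the logs tail subcommand; the log group name is the one shown at the top of the output above):

% aws logs tail airflow-MWAA-Demo-1-Task --follow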
input.csv:
name,age
"Alice",20
"Bob",25
"Charlie",30
% aws s3 cp input.csv s3://sc-airflow/
Refresh the log:
*** Reading remote log from Cloudwatch log_group: airflow-MWAA-Demo-1-Task log_stream: dag_id=copy-s3-file/run_id=manual__2023-03-28T05_57_14.842043+00_00/task_id=wait_for_file/attempt=1.log.
[2023-03-28, 05:57:21 UTC] {{taskinstance.py:1165}} INFO - Dependencies all met for <TaskInstance: copy-s3-file.wait_for_file manual__2023-03-28T05:57:14.842043+00:00 [queued]>
[2023-03-28, 05:57:21 UTC] {{taskinstance.py:1165}} INFO - Dependencies all met for <TaskInstance: copy-s3-file.wait_for_file manual__2023-03-28T05:57:14.842043+00:00 [queued]>
[2023-03-28, 05:57:21 UTC] {{taskinstance.py:1362}} INFO - --------------------------------------------------------------------------------
[2023-03-28, 05:57:21 UTC] {{taskinstance.py:1363}} INFO - Starting attempt 1 of 1
[2023-03-28, 05:57:21 UTC] {{taskinstance.py:1364}} INFO - --------------------------------------------------------------------------------
[2023-03-28, 05:57:21 UTC] {{taskinstance.py:1383}} INFO - Executing <Task(S3KeySensor): wait_for_file> on 2023-03-28 05:57:14.842043+00:00
[2023-03-28, 05:57:21 UTC] {{standard_task_runner.py:55}} INFO - Started process 228 to run task
[2023-03-28, 05:57:21 UTC] {{standard_task_runner.py:82}} INFO - Running: ['airflow', 'tasks', 'run', 'copy-s3-file', 'wait_for_file', 'manual__2023-03-28T05:57:14.842043+00:00', '--job-id', '5', '--raw', '--subdir', 'DAGS_FOLDER/copy-s3-file-dag.py', '--cfg-path', '/tmp/tmp4dp3x5wk']
[2023-03-28, 05:57:21 UTC] {{standard_task_runner.py:83}} INFO - Job 5: Subtask wait_for_file
[2023-03-28, 05:57:21 UTC] {{task_command.py:376}} INFO - Running <TaskInstance: copy-s3-file.wait_for_file manual__2023-03-28T05:57:14.842043+00:00 [running]> on host ip-10-0-10-245.us-west-2.compute.internal
[2023-03-28, 05:57:21 UTC] {{taskinstance.py:1590}} INFO - Exporting the following env vars: AIRFLOW_CTX_DAG_OWNER=airflow AIRFLOW_CTX_DAG_ID=copy-s3-file AIRFLOW_CTX_TASK_ID=wait_for_file AIRFLOW_CTX_EXECUTION_DATE=2023-03-28T05:57:14.842043+00:00 AIRFLOW_CTX_TRY_NUMBER=1 AIRFLOW_CTX_DAG_RUN_ID=manual__2023-03-28T05:57:14.842043+00:00
[2023-03-28, 05:57:21 UTC] {{s3.py:98}} INFO - Poking for key : s3://sc-airflow/input.csv
[2023-03-28, 05:57:21 UTC] {{base.py:71}} INFO - Using connection ID 'aws_default' for task execution.
[2023-03-28, 05:58:21 UTC] {{s3.py:98}} INFO - Poking for key : s3://sc-airflow/input.csv
[2023-03-28, 05:59:21 UTC] {{s3.py:98}} INFO - Poking for key : s3://sc-airflow/input.csv
[2023-03-28, 05:59:21 UTC] {{base.py:213}} INFO - Success criteria met. Exiting.
[2023-03-28, 05:59:21 UTC] {{taskinstance.py:1401}} INFO - Marking task as SUCCESS. dag_id=copy-s3-file, task_id=wait_for_file, execution_date=20230328T055714, start_date=20230328T055721, end_date=20230328T055921
[2023-03-28, 05:59:21 UTC] {{local_task_job.py:159}} INFO - Task exited with return code 0
[2023-03-28, 05:59:21 UTC] {{taskinstance.py:2623}} INFO - 0 downstream tasks scheduled from follow-on schedule check
Open the Graph view.
Click tranform_file, then click Log:
*** Reading remote log from Cloudwatch log_group: airflow-MWAA-Demo-1-Task log_stream: dag_id=copy-s3-file/run_id=manual__2023-03-28T06_05_37.804978+00_00/task_id=tranform_file/attempt=1.log.
[2023-03-28, 06:05:51 UTC] {{taskinstance.py:1165}} INFO - Dependencies all met for <TaskInstance: copy-s3-file.tranform_file manual__2023-03-28T06:05:37.804978+00:00 [queued]>
[2023-03-28, 06:05:51 UTC] {{taskinstance.py:1165}} INFO - Dependencies all met for <TaskInstance: copy-s3-file.tranform_file manual__2023-03-28T06:05:37.804978+00:00 [queued]>
[2023-03-28, 06:05:51 UTC] {{taskinstance.py:1362}} INFO - --------------------------------------------------------------------------------
[2023-03-28, 06:05:51 UTC] {{taskinstance.py:1363}} INFO - Starting attempt 1 of 1
[2023-03-28, 06:05:51 UTC] {{taskinstance.py:1364}} INFO - --------------------------------------------------------------------------------
[2023-03-28, 06:05:51 UTC] {{taskinstance.py:1383}} INFO - Executing <Task(PythonOperator): tranform_file> on 2023-03-28 06:05:37.804978+00:00
[2023-03-28, 06:05:51 UTC] {{standard_task_runner.py:55}} INFO - Started process 283 to run task
[2023-03-28, 06:05:51 UTC] {{standard_task_runner.py:82}} INFO - Running: ['airflow', 'tasks', 'run', 'copy-s3-file', 'tranform_file', 'manual__2023-03-28T06:05:37.804978+00:00', '--job-id', '12', '--raw', '--subdir', 'DAGS_FOLDER/copy-s3-file-dag.py', '--cfg-path', '/tmp/tmpl6rb8vcl']
[2023-03-28, 06:05:51 UTC] {{standard_task_runner.py:83}} INFO - Job 12: Subtask tranform_file
[2023-03-28, 06:05:51 UTC] {{task_command.py:376}} INFO - Running <TaskInstance: copy-s3-file.tranform_file manual__2023-03-28T06:05:37.804978+00:00 [running]> on host ip-10-0-10-245.us-west-2.compute.internal
[2023-03-28, 06:05:51 UTC] {{taskinstance.py:1590}} INFO - Exporting the following env vars: AIRFLOW_CTX_DAG_OWNER=airflow AIRFLOW_CTX_DAG_ID=copy-s3-file AIRFLOW_CTX_TASK_ID=tranform_file AIRFLOW_CTX_EXECUTION_DATE=2023-03-28T06:05:37.804978+00:00 AIRFLOW_CTX_TRY_NUMBER=1 AIRFLOW_CTX_DAG_RUN_ID=manual__2023-03-28T06:05:37.804978+00:00
[2023-03-28, 06:05:51 UTC] {{base.py:71}} INFO - Using connection ID 'aws_default' for task execution.
[2023-03-28, 06:05:51 UTC] {{python.py:177}} INFO - Done. Returned value was: None
[2023-03-28, 06:05:51 UTC] {{taskinstance.py:1401}} INFO - Marking task as SUCCESS. dag_id=copy-s3-file, task_id=tranform_file, execution_date=20230328T060537, start_date=20230328T060551, end_date=20230328T060551
[2023-03-28, 06:05:51 UTC] {{local_task_job.py:159}} INFO - Task exited with return code 0
[2023-03-28, 06:05:51 UTC] {{taskinstance.py:2623}} INFO - 0 downstream tasks scheduled from follow-on schedule check
output.csv:
name,age
Alice,20
Bob,25
Charlie,30
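To verify the result from the command line, the transformed object can be streamed to stdout (with aws s3 cp, a destination of "-" writes to standard output); it should print the unquoted rows shown above:

% aws s3 cp s3://sc-airflow/output.csv -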
[24:40] Comparing Amazon MWAA and AWS Step Functions