Data engineering/Airflow

airflow dag작성 및 실행해보기

amelia-suyeon 2023. 8. 29. 21:35

필자는 bigquery 배치를 연결하기 전에 dag에 대해 자세히 알아보고, dag를 작성하여 실행 해보도록 하겠다.

 

dag란?

- Directed  acyclic graph 로 비선형 그래프이다.

- 하나의 작업 단위를 task라 부르고, 여러 개의 task가 합쳐져 dag를 구성한다.

- task 간에는 >> ,  << 기호를 사용하여 순서를 지정하는 것이 가능하다.

- pythonOperator, BashOperator와 같이 python이나 bash를 실행 할 수 있는 여러가지 operator가 존재한다.

 

Operator란? 

task를 구성하기 위한 Airflow class 이며, task를 어떻게 실행시킬지를 나타낸다. (일반적으로 operator를 대부분 사용한다)

 

Operator 구분 

  • action operator : 작업을 수행하거나 다른 시스템에 작업을 수행하도록 지시
  • Transfer Operator : 특정 시스템에서 다른 시스템으로 데이터를 이동

 

자주 사용되는 Operator의 종류 

구분 클래스 경로 내용
Dummy Operator -> Empty Operator airflow.operators.dummy 
-> airflow.operators.empty
아무 작업을 하지 않는 operator, 보통 시작과 종료를 나타내거나, 다른 작업을 그룹화 하는데 사용 (현재는 dummy 대신 empty를 사용한다 -> 공식 홈페이지 참고: https://airflow.apache.org/docs/apache-airflow/2.3.0/_api/airflow/operators/dummy/index.html)
Bash Operator airflow.operators.bash Bash shell 스크립트를 실행하는 Operator
리눅스 명령어 실행 & 프로그램 실행도 가능
python operator airflow.operators.python 파이썬 코드(.py)를 실행하기 위한 Operator
일반적으로 자동화 스크립트를 파이썬으로 작성하는 경우가 많음.
이러한 파이썬 코드들은 해당 operator를 통해 실행할 수 있음.

 

그 외 Operator도 존재하는데 다른 포스팅에서 다루도록 하겠다.

 

오늘은 Empty Operator와 Bash Operator 를 이용하여 dag를 작성해보고 실행해보도록 하겠다. 

 

operator_test.py
 
from airflow.operators.empty import EmptyOperator
from airflow.operators.bash import BashOperator
from airflow import DAG
from datetime import datetime

dag = DAG(
    dag_id = "testdag",  # dag의 고유 이름(webserver에 표시될 dag 이름)
    description= "EmptyOperator test", # 설명
    start_date = datetime(2023,8,29), # 해당 pipeline 실행 시작 시간
   tags=['empty_test'],
    schedule_interval=None, # 해당 pipeline 실행 주기(none 이면 한번만 실행 )
    catchup=False # 이전에 실행되지 않았던 dag를 실행할지 말지 결정
    )

task1 = BashOperator(task_id ="task2", bash_command="echo task1", dag=dag)

## EmptyOperator 예제 -> 더 이상 dummyoperator를 사용하지 않기 때문

start = EmptyOperator(task_id="start", dag=dag)
end = EmptyOperator(task_id="end", dag=dag)

start >> task1 >> end
 
 

 

위와 같이, dag를 생성하고, dag 파일에 넣어서 실행해보도록 하겠다.

 

파일을 생성하여 넣고, dag list를 조회 해보았을 때 아래와 같이 list에 있으면 등록 완료

 

 

이후, airflow db init 명령어를 통해 새로고침 해준다.

 

webserver에 접속한뒤 tag를 검색해서 들어갔을 때, 아래와 같이 있는지 확인한다.

 

 

DAG를 클릭해서 graph나 code, log 등을 본다.

 

이때, 위와 같이 내가 작성한 순서대로 작성되었고, success 되었다면 성공!

 

다음 포스팅에서는 다른 operator를 이용하여 다른 dag를 작성하고 test 해보도록 하겠다.