-
airflow를 빅쿼리에 연결 후, 빅쿼리 dataset 만들기Data engineering/Airflow 2023. 9. 12. 15:57
필자는 airflow를 빅쿼리에 연결하는데 시간소요가 많이 되었다... airflow의 잦은 릴리즈 이유로 ㅠㅠ (최근엔 한달에 한번씩 릴리즈...)
끝내 GCP에 연결되었고, 간단한 dag 작성을 통해 빅쿼리 dataset 생성에 성공하였다!
이것이 바로 4번 만에 성공한,,,(실제로는 20번 넘을듯) 빅쿼리 파이프 라인이다.
빅쿼리 연결 부분은 다른 포스팅에서 했지만, 버전의 문제인지 admin에서 connection만 한다고 되는 것이 아니였다.
그래서 기존의 빅쿼리 연결 클래스를 이용하여, 도전했다.
먼저, main에는 간단한 함수 하나를 만들어 준다.
main.pydef bigquery_test2():data = ctrl_getpublic().createdataset()return {'data': data}bigquery_test2()그리고, 클래스 내 createdataset() 함수는 데이터 셋을 간단하게 만드는 기능을 구현했다.
def createdataset(self):LoggerFactory._LOGGER.info("test")dataset ='airflow_test_3'try:dac = BigqueryDac()dac.create_dataset(dataset_id=dataset)except Exception as e:LoggerFactory._LOGGER.warning("test search error: {}".format(e))그리고, 이를 기반으로 DAG를 만들었다.
from datetime import datetime, timedeltafrom airflow import DAGfrom airflow.operators.bash import BashOperatorfrom airflow.operators.empty import EmptyOperatorimport pendulum
# local timezonelocal_tz = pendulum.timezone("Asia/Seoul")
default_args = {'owner': 'airflow','start_date': datetime(2023,9,11, tzinfo=local_tz),'retries': 0,'catchup':False}
with DAG(dag_id='bigquery_test_for_searching',description='search_bigquery_data',default_args=default_args,start_date=datetime(2023,9,11),schedule_interval=None,tags=['query','test','connect']) as dag:task1 = BashOperator(task_id ="task2",bash_command="echo task1",dag=dag)start = EmptyOperator(task_id="start", dag=dag)end = EmptyOperator(task_id="end", dag=dag)t1 = BashOperator(task_id='bigquery_search_test',bash_command='bash /home/sychung/airflow/start.sh ')start >> task1 >> t1 >> end데이터 셋을 만들기 전,후로 EmptyOperator를 만들어 주었다 -> 진행 상황을 보기 위해!
예전 버전에서는 dummpyOperator를 사용했지만, 현재는 Empty로 변경된 것 같았다.
그리고 실행 순서에 따라 기재하고 -> airflow db init -> airflow scheduler -> airflow webserver -p 8080 을 통해 웹 페이지에서 확인 가능!
빅쿼리에서도 확인 가능 (dataset이 생성되어 있다!)
3월 부터 시작했던 airflow를 드디어 조금 더 친해지게 되어서 기쁘다.
이제 공공 API를 이용한 데이터 파이프 라인도 airflow로 구현해보도록 하겠다!!!
'Data engineering > Airflow' 카테고리의 다른 글