-
airflow를 이용하여 bigquery에 공공 API 데이터 적재 파이프라인 생성기Data engineering/Airflow 2023. 9. 14. 14:24
드디어 필자는 airflow에 공공 API를 이용하여 빅쿼리에 적재하는 파이프라인을 성공시켰다!!!!!!!!
이어서 쿼리를 통해, 필요한 정보만 뽑아오는 작업을 통해 머신러닝이나, 시각화에 필요한 데이터들로 만들어 보겠다. (나중 예정)
먼저, 아주 간단한 파이프 라인을 그려보았다.
<pipeline 순서 및 설명>
1. 서울시 부동산 실거래가 정보에 대한 공공데이터에서 openAPI를 설정한다. (url, service key 등 설정 필요한 것들 미리 준비)
2.먼저, 빅쿼리에 컬럼과 타입만 지정한 빈테이블을 만들어 준다. 그리고 빅쿼리의 경우, insert 보다는 테이블을 삭제하고, 새로 적재하는 것이 더 낫기 때문에(비용 적인 측면), 삭제 로직을 넣는다.
3. 쿼리를 통한 삭제 로직이 이루어지지 않으면 -> 오류로 인해 중지되고, 로직이 이루어 진다면 다음으로 넘어간다.
4. 호출할 공공 데이터 API url를 호출한다. 왜냐하면, status = 200이어도, service 형식 -> json or xml 이 맞지 않으면 exception 되기 때문이다.
5. url 호출이 성공적이었다면, 테이블의 타입에 맞추어, 데이터를 전처리 한다.
예를 들어, int형 이지만, '' -> 즉 빈값이어서 string으로 인식하는 경우를 대비하여,
data['BEFORE_MT_RENT_CHRGE'] = None if res_json[i]['BEFORE_MT_RENT_CHRGE'] == '' else res_json[i]['BEFORE_MT_RENT_CHRGE']이런식으로, None 처리를 해준다.
6. 마지막으로 Airflow webserver에 들어가서 파이프라인이 실행되었는지 확인한다.
모두다 초록색으로 성공한 것을 볼 수 있다!
그리고, 이번에는 bashOperator가 아닌 PythonOperator을 이용하여, 각 함수를 구동하여 사용했더니 훨씬 편리했다!
DAG 내용은 다음과 같다.
from datetime import datetime, timedeltafrom airflow import DAGfrom airflow.operators.bash import BashOperatorfrom airflow.operators.empty import EmptyOperatorfrom airflow.operators.python import PythonOperator
import pendulumimport sysimport os
sys.path.append(os.path.dirname(os.path.abspath(os.path.dirname(__file__))))from app.controller.get_data import GetPublicData
# local timezonelocal_tz = pendulum.timezone("Asia/Seoul")
def _complete():print("complete")
default_args = {'owner': 'airflow','start_date': datetime(2023,9,11, tzinfo=local_tz),'retries': 0,'catchup':False}
with DAG(dag_id='bigquery_pipeline_test',description='public_api_pipeline',default_args=default_args,start_date=datetime(2023,9,11),schedule_interval=None,tags=['bigquery','batch','test']) as dag:start = EmptyOperator(task_id="start", dag=dag)delete_table = PythonOperator(task_id='delete_bigquery_table',python_callable=GetPublicData().delete_table,dag=dag)url_test = PythonOperator(task_id='test_for_url',python_callable=GetPublicData().status_test,dag=dag)store_public_data = PythonOperator(task_id = 'get_data_from_public',python_callable=GetPublicData().property,dag=dag)end = PythonOperator(task_id='end',python_callable=_complete,dag=dag)start >> delete_table >> url_test >> store_public_data >> end위의 scheduler_interval 부분을 '@daily' 로 맞추어 준다면, 일배치가 될 것!
다음 포스팅에서는 현재 파이프라인을 중심으로 추가 된 부분을 진행하도록 하겠다!
'Data engineering > Airflow' 카테고리의 다른 글