-
초기 데이터 적재 - Google Cloud Storage 업로드 하기Data engineering/Batch&Pipeline 2023. 7. 4. 14:27
필자는 개인 프로젝트로 GCP를 이용하여 샘플데이터를 로컬에서 Google Cloud Storage에 업로드 하는 파이프라인에 대해 기술하려고 한다.
초기 데이터가 잘 적재되었다면, 서버에 배포하여 배치 작업까지 개발 할 예정이다. (shell script를 이용 할 예정)
먼저, 구조는 아래와 같다.
python main.py args -> 와 같은 명령어를 통해 실행시킬 예정
그래서 main.py를 밖에 두고, 그 외는 project 폴더 속에 만들었다.
초기 적재라는 목적을 가진 py의 이름은 initial_data.py라고 만들었다.
그리고 main에서 호출하기 쉽도록, class로 만들어 주었다.
<그리고 아래 부분은 로깅 셋팅 및 상수를 정리해놓았다.>
# 초기 로깅 셋팅logger = logging.getLogger("info")errLogger = logging.getLogger("error")initial_data_Logger = logging.getLogger("initial_data")
GCS_BUCKET_NAME = str(os.environ.get("GCS_BUCKET_NAME"))GCS_DESTINATION = f"batch/data/"DIR_PATH = 'C:/test4'1. GCS_BUCKET_NAME : gcs 버킷 이름명 명시
2. GCS_DESTINATION : 지정된 gcs 버킷 내의 폴더 위치 (폴더명 및 패스가 없으면 명시한 대로 만들어 지니 걱정하지 말것)
3. DIR_PATH : 로컬에서 업로드 하려는 폴더 PATH
위와 같이 변수 준비가 끝났다면 본격 CLASS 에 대해 알아보자
# 초기 적재를 위한 CLASSclass Initial_Data():def __init__(self, type , status):self.type = typeself.status = statusself.preprocess_data()# 초기 전체 파일을 불러오는 함수def get_files(self):FOLDER_LIST = os.listdir(DIR_PATH)# DIR_PATH 같은 경우 -> 초기 다운로드 받은 전체 파일이 로컬에 있을 때, 그 파일이 있는 로컬 PATH를 기입FILE_LIST = []for i in range(len(FOLDER_LIST)):# 20230202 폴더 속에 20230202.CSV 형식의 파일이 존재하기 때문에 FOR 문 실행inside_file = os.listdir(f'{DIR_PATH}/{FOLDER_LIST[i]}')if "." not in FOLDER_LIST[i] :for file_name in inside_file:address = f'{DIR_PATH}/{FOLDER_LIST[i]}/{file_name}'FILE_LIST.append(address)else:address = f'{DIR_PATH}/{FOLDER_LIST[i]}'FILE_LIST.append(address)i+=1return FILE_LIST# 초기 적재 전체 파일을 하나씩 전처리 하는 함수def preprocess_data(self):headers = ['DATE','BIRTH_DATE',"REMOVE",'COM_CODE','MEMBER_NO','NAME','COMPANY','INCOME','GENDER']
# 폴더 내 모든 파일 업로드 할 때data_list = self.get_files()for i, x in enumerate(data_list):# 파일 읽고 전처리 구간org_FilePath = data_list[i]load = pd.read_csv(org_FilePath, names = headers, low_memory = False)data = load.drop(['REMOVE', 'COMPANY'], axis = 'columns')# 파일명 변경 구간 - custom 구간recentFilename = org_FilePath.split('/')[-1]
# DATAFRAME을 JSONL로 변경 후 GCS에 초기 적재data = utils.dataframe_to_jsonl(data)gcsupload = upload.Upload([])destFilepath = f"{GCS_DESTINATION}{recentFilename}"gcsupload.fn_string_upload(data, destFilepath, errLogger , self.status)<여기서 사용된 또 다른 CLASS 가 존재함>
-> upload.py 속에 있는 Upload 라는 클래스이며, gcs 업로드에 필요한 함수들을 묶어놓았다.
필자의 경우, Upload 클래스 중에서 fn_string_upload() 함수를 사용하였다.
### GCS String 업로드 부분 ####def fn_string_upload(self ,jsonlines_data , destinationName , typeLogger , status):self.upload_cnt += 1try :if status == "test" :gcs_project_id = GCS_PROJECT_IDgcs_bucket_name = GCS_BUCKET_NAMEgcs_key_file_name = GCS_KEY_FILE
print(status)storageClient = storage.Client.from_service_account_json(gcs_key_file_name, project=gcs_project_id)print(storageClient, " storageclient")bucket = storageClient.bucket(gcs_bucket_name)# GCS 에 파일 업로드 하기
blob = bucket.blob(destinationName)
blob.upload_from_string(jsonlines_data)# def fnCheckContentsInGcs(self, bucket, org_Contents_Rows, blobName , typeLogger):typeLogger.info("complete upload_from_string")cntBlob = bucket.get_blob(destinationName)blobRowCount = len(cntBlob.download_as_string().splitlines())typeLogger.info("org_data_count : " + str(jsonlines_data.count('\n')) + " upload_data_count : " + str(blobRowCount) )
if(jsonlines_data.count('\n') != blobRowCount) :typeLogger.info("not Equals --- org_data_count : " + str(jsonlines_data.count('\n')) + " upload_data_count : " + str(blobRowCount) )if(self.upload_cnt < 5) :if(self.upload_cnt > 1) :sleep(600)self.fn_string_upload(jsonlines_data , destinationName , typeLogger)else :return Trueexcept Exception as e:typeLogger.info(f"fnStringUpload Error Raise : {e}")if(self.upload_cnt < 5) :sleep(600)self.fn_string_upload(jsonlines_data , destinationName , typeLogger , status)return Falseclient를 이용하여, service key를 읽어와서 업로드를 진행하는 방식이며,
정합성 체크를 위해 로컬의 파일과 업로드된 파일의 rowcount를 측정하여 비교하는 로직을 넣었다.
-> 커스텀 해서 사용 가능함!
<주의 하면 좋은점>
- GCS 업로드를 할 때는 JSONLINE을 사용하는 것이 좋다. 왜냐하면 빅쿼리에 테이블을 생성 할 때, JSONLINES를 사용하기 때문이다 -> 즉 전체적인 파이프라인을 위해
업로드가 성공하였다면 아래와 같이 버킷에서 확인 할 수 있다!
'Data engineering > Batch&Pipeline' 카테고리의 다른 글
crontab 이용하여 공공 api 데이터를 빅쿼리에 적재 (0) 2023.08.21 배치 파일을 배포하기 위한 스크립트 파일 작성하기 (1) 2023.08.20