ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • 초기 데이터 적재 - Google Cloud Storage 업로드 하기
    Data engineering/Batch&Pipeline 2023. 7. 4. 14:27

    필자는 개인 프로젝트로 GCP를 이용하여 샘플데이터를 로컬에서 Google Cloud Storage에 업로드 하는 파이프라인에 대해 기술하려고 한다. 

     

    초기 데이터가 잘 적재되었다면, 서버에 배포하여 배치 작업까지 개발 할 예정이다. (shell script를 이용 할 예정)

     

    먼저, 구조는 아래와 같다.

     

    python main.py args  -> 와 같은 명령어를 통해 실행시킬 예정

    그래서 main.py를 밖에 두고, 그 외는 project 폴더 속에 만들었다. 

     

    초기 적재라는 목적을 가진 py의 이름은 initial_data.py라고 만들었다.

    그리고 main에서 호출하기 쉽도록, class로 만들어 주었다.

     

    <그리고 아래 부분은 로깅 셋팅 및 상수를 정리해놓았다.>

     
    # 초기 로깅 셋팅
    logger = logging.getLogger("info")
    errLogger = logging.getLogger("error")
    initial_data_Logger = logging.getLogger("initial_data")

    GCS_BUCKET_NAME = str(os.environ.get("GCS_BUCKET_NAME"))
    GCS_DESTINATION = f"batch/data/"
    DIR_PATH = 'C:/test4'  

    1. GCS_BUCKET_NAME : gcs 버킷 이름명 명시

    2. GCS_DESTINATION : 지정된 gcs 버킷 내의 폴더 위치 (폴더명 및 패스가 없으면 명시한 대로 만들어 지니 걱정하지 말것)

    3. DIR_PATH : 로컬에서 업로드 하려는 폴더 PATH  

     

    위와 같이 변수 준비가 끝났다면 본격 CLASS 에 대해 알아보자 

     

     
    #  초기 적재를 위한 CLASS
     
    class Initial_Data():
        def __init__(self, type , status):
            self.type = type
            self.status = status
            self.preprocess_data()
           
        # 초기 전체 파일을 불러오는 함수
        def get_files(self):
            FOLDER_LIST = os.listdir(DIR_PATH)
           
            # DIR_PATH 같은 경우 -> 초기 다운로드 받은 전체 파일이 로컬에 있을 때, 그 파일이 있는 로컬 PATH를 기입
           
            FILE_LIST = []
            for i in range(len(FOLDER_LIST)):
               
                # 20230202 폴더 속에 20230202.CSV 형식의 파일이 존재하기 때문에 FOR 문 실행
                inside_file = os.listdir(f'{DIR_PATH}/{FOLDER_LIST[i]}')
               
     
                if "." not in FOLDER_LIST[i] :
                    for file_name in inside_file:
                       
                        address = f'{DIR_PATH}/{FOLDER_LIST[i]}/{file_name}'
                       
                        FILE_LIST.append(address)
                else:
                    address = f'{DIR_PATH}/{FOLDER_LIST[i]}'
                    FILE_LIST.append(address)
                   
            i+=1
            return FILE_LIST
       
        # 초기 적재 전체 파일을 하나씩 전처리 하는 함수
        def preprocess_data(self):
            headers = ['DATE','BIRTH_DATE',"REMOVE",'COM_CODE','MEMBER_NO','NAME','COMPANY','INCOME','GENDER']

            # 폴더 내 모든 파일 업로드 할 때
            data_list = self.get_files()
           
            for i, x in enumerate(data_list):
               
                # 파일 읽고 전처리 구간
                org_FilePath = data_list[i]
                load = pd.read_csv(org_FilePath, names = headers, low_memory = False)
                data = load.drop(['REMOVE', 'COMPANY'], axis = 'columns')
               
                # 파일명 변경 구간 - custom 구간
                recentFilename = org_FilePath.split('/')[-1]

                # DATAFRAME을 JSONL로 변경 후 GCS에 초기 적재
                data = utils.dataframe_to_jsonl(data)    
                gcsupload = upload.Upload([])
                destFilepath = f"{GCS_DESTINATION}{recentFilename}"
                gcsupload.fn_string_upload(data, destFilepath, errLogger , self.status)
               

     

    <여기서 사용된 또 다른 CLASS 가 존재함> 

    -> upload.py 속에 있는 Upload 라는 클래스이며, gcs 업로드에 필요한 함수들을 묶어놓았다.

    필자의 경우, Upload 클래스 중에서 fn_string_upload() 함수를 사용하였다.

     

        ### GCS String 업로드 부분 ####
        def fn_string_upload(self ,jsonlines_data , destinationName , typeLogger , status):
            self.upload_cnt += 1
            try :
           
                if status == "test" :
                    gcs_project_id = GCS_PROJECT_ID
                    gcs_bucket_name = GCS_BUCKET_NAME
                    gcs_key_file_name = GCS_KEY_FILE

                print(status)
               
                storageClient = storage.Client.from_service_account_json(
                        gcs_key_file_name, project=gcs_project_id)
               
                print(storageClient, " storageclient")
               
                bucket = storageClient.bucket(gcs_bucket_name)
               
                # GCS 에 파일 업로드 하기

                blob = bucket.blob(destinationName)

                blob.upload_from_string(jsonlines_data)
           
                # def fnCheckContentsInGcs(self, bucket, org_Contents_Rows, blobName , typeLogger):
                typeLogger.info("complete upload_from_string")
               
                cntBlob = bucket.get_blob(destinationName)
                blobRowCount = len(cntBlob.download_as_string().splitlines())
                typeLogger.info("org_data_count : " + str(jsonlines_data.count('\n')) + " upload_data_count : " + str(blobRowCount) )

               
                if(jsonlines_data.count('\n') != blobRowCount) :
                    typeLogger.info("not Equals --- org_data_count : " + str(jsonlines_data.count('\n')) + " upload_data_count : " + str(blobRowCount) )
                    if(self.upload_cnt < 5) :
                        if(self.upload_cnt > 1) :
                            sleep(600)
                        self.fn_string_upload(jsonlines_data , destinationName , typeLogger)
                else :
                    return True
            except Exception as e:
                typeLogger.info(f"fnStringUpload Error Raise : {e}")
                if(self.upload_cnt < 5) :
                    sleep(600)
                    self.fn_string_upload(jsonlines_data , destinationName , typeLogger , status)
                return False
     

    client를 이용하여, service key를 읽어와서 업로드를 진행하는 방식이며,

    정합성 체크를 위해 로컬의 파일과 업로드된 파일의 rowcount를 측정하여 비교하는 로직을 넣었다.

    -> 커스텀 해서 사용 가능함! 

     

     

    <주의 하면 좋은점>

     

    - GCS 업로드를 할 때는 JSONLINE을 사용하는 것이 좋다. 왜냐하면 빅쿼리에 테이블을 생성 할 때, JSONLINES를 사용하기 때문이다 -> 즉 전체적인 파이프라인을 위해 

     

    업로드가 성공하였다면 아래와 같이 버킷에서 확인 할 수 있다!

     

     

     

Designed by Tistory.