ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [병렬 처리] python multithreading 도입기 (feat.multiprocess 차이)
    Backend/Python 2024. 2. 23. 15:28

    필자는 현재 프로젝트에서 배치작업을 했었는데, 이상치 검증 로직 때문에 배치 시간이 무려 1분이나 되는 것을 발견하였다! 

     

    사실은 1분 30초 정도였는데, 구역을 나누어 진행하여 1구역당 50초 정도 소요되었다.

    하지만, 적재되어 있는 데이터가 늘어날수록, 소요 시간이 더 늘어날 것 같았고, 10분 단위 대시보드 프로젝트에서는 1분 1초가 소중하기 때문에 병렬 처리를 도입하기로 하였다.

     

    먼저, 병렬처리를 하기에 앞서, 항상 언급되는 부분 Multithreading 과 Multiprocessing 에 관해 비교해보자 

    -> 필자도 항상 머리속에 있다고 생각하지만, 가끔 헷갈리는(?) 부분이라 같이 정리하고자 한다.

     

    그렇다면, 가장 큰 차이점은 무엇일까?

    바로 Thread 와 Process 이다.

    나아가 이것은 concurrent 와 paralle 와도 연결된다.

     

     

    공룡책에서도 다루었던 thread 와 process의 구조에 대해서 아래의 그림을 보자

     

    크게, CPU > Process > Thread 이렇게 속해 있다.

     

    • 1개의 CPU내에 1개의 Process가 있으며, 1개의 Process는 여러개의 Thread 를 가질 수 있다.
    • Process내의 Thread들은 서로 메모리를 공유한다.
    • 1개의 Process 가 여러개 thread 를 실행하는 것을 concurrent 실행이라 한다.
    • 여러 process를 동시에 실행하는 것을 parallel 실행이라 한다.

    위를 바탕으로 한마디로 정의하자면 

    multithreading = thread를 동시에 실행

    multiprocessing = process를 동시에 실행하는 것 

     

    그렇다면, 각각 어느 때에 써야할 까?

     

    1. CPU를 많이 할당하지 않는 상태에서 여러가지를 작업을 처리할 때

    -> 이런 경우에는 thread로 묶는다

    필자도 이와 비슷한 상황인데, 같은 쿼리를 300번 호출해야 할 때 매번 db와 contact 하지 않고 300번의 쿼리를 읽어 들인다. -> 즉, memory를 공유하는 multithreading이 빠르고 안전

     

    2. 복잡한 계산을 가지고 있는 작업을 여러개 진행할 때-> CPU 많이 할당 필요 

    -> 이런 경우는 multiprocessing으로 cpu & process를 여러대 사용해야 하기 때문에

    여러대의  프로세스에 할당하여 진행한다. -> Multiprocessing 진행 

     

    실제로 , multithreading이 유리한데 바로 비용의 측면이다.

    (좀 더 빠른 것도 multithreading이다.)

     

    따라서 필자는 multithreading을 적용하였다. 아래에서 부터는 어떻게 적용했는지 알아보자.

     

    [환경] 

    DB: MariaDB

    Language: Python3.10.8

    Module: concurrent.futures

     

    먼저, 동적 쿼리를 300번 넘게 호출하는 부분에서 multithreading을 사용하였다.

     

    기존의 로직은 아래와 같다.

     

    {2중 포문 -> 쿼리 호출 (이상치 필터) -> db 연결(select) -> 결과에 따른 필터 -> 결과를 [리스트] 담고 } *300번 후 한번에 insert

     

    변경된 로직은 아래와 같다.

     

    2중포문 -> 쿼리호출(이상치필터) *300번 호출과 동시에 multithreading 적용 -> 300개의 쿼리를 리스트에 담은 후 -> db연결(select) -> 300개의 결과 리스트에 바로 필터링 -> 최종 결과 리스트 insert 

     

    (색상을 칠한 부분이 주요 관점 포인트 인데, 필자는 이부분을 하나의 클래스로 만들었다 -> 왜냐하면 이와 같은 구조의 배치로직이 많기 때문이다.)

     

    실제 사용한  class는 아래와 같다.

     

    from datetime import datetime
    from common.connector import DBConnection
    from common.logger import LoggerFactory
    from loguru import logger
    from concurrent.futures import ThreadPoolExecutor
    
    
    class SettingParam():
        
        def __init__(self,region_mappings, col_names, table_1, table_2):
            self.region_mappings = region_mappings
            self.col_names = col_names
            self.table_1 = table_1
            self.table_2 = table_2
            
    class MultiThreading():
        
        def __init__(self):
            pass
        
        @staticmethod
        def run_query(query):
            exc = DBConnection()
            result = exc.select_query(query)
            return result
        
        @classmethod
        def ProcessQuery(cls, a:SettingParam):
            queries = []
            results =[]
            
            try: 
                for region_nm in a.region_mappings:
                    code = a.region_mappings.get(region_nm,[])
                    for one in code:
                        where_option_v1 = f"region = '{region_nm}' and area = '{one}'"
                        where_option_v2 = f"a.region = '{region_nm}' and a.area = '{one}'"
                        for element in a.col_names:
                            query = f"""
                                    with condition_dt as 
                                    (select *
                                    from (select save_dt, region, area, {element}
                                        from {a.table_1}
                                        where substr(save_dt,1,10) = "2024-02-22"
                                        and substr(save_dt,12,2) = "14") data 
                                    where substr(data.save_dt ,15,2) = '30'
                                    and {where_option_v1}
                                    group by  save_dt, region, area, {element}
                                    )
    
                                    -- min, max 값 포함
                                    , recent as (select col_name, min_range, max_range, save_dt, con_type
                                    from {a.table_2} 
                                    where (col_name ,save_dt ) 
                                    in (select col_name, max(save_dt) as max_date
                                        from {a.table_2}
                                        group by col_name)
                                    )
    
                                    select b.col_name, a.region, a.area, a.{element} as value, b.min_range, b.max_range, b.con_type,
                                            CASE WHEN a.{element} < b.min_range THEN 'under'
                                                WHEN a.{element} > b.max_range THEN 'over'
                                                WHEN a.{element} = 'zzz' then 'zzz'
                                                ELSE 'n' END AS alarm
                                    from condition_dt a, recent b 
                                    where {where_option_v2}  
                                    and b.col_name ='{element}' 
                                    """
                            queries.append(query)
                
                
                with ThreadPoolExecutor(max_workers=16) as executor:
                    results = executor.map(MultiThreading.run_query,queries)
                    return results
                
            except Exception as e:
                logger.error(e)
                return None

     

    설명을 덧붙이자면

     

    1. SettingParam() 클래스 같은 경우, 다른 함수에서 호출할 때, 편리하게 사용하기 위해 하나의 객체를 만드는 용도로 사용하였다.

     

    2. run_query()는 고정된 상태이기 때문에 staticmethod로 MultiThreading() 클래스 내에 존재하도록 하였다. 

     

    3. ProcessQuery() 함수에서는 동적인 쿼리가 작동하도록 만들었고, 대신에 1개의 쿼리문이 끝나는 지점에서 리스트에 쿼리를 넣는 방식으로 진행하였다 -> 리스트에 쿼리 300개 넘개 쌓기 위해서

     

    4. 그리고 대망의 with절을 보면 ThreadPoolExecutor가 등장하는데, 이 부분이 바로 multithreading을 진행하는 곳이다. worker 같은 경우 다른 블로그에서 보았을 때 무조건 많다고 빠른 것이 아니였기 때문에 16으로 두었다. 

     

    특이하게도, executor내에서 map을 사용하여 처리하려면 함수를 사용해야 한다. 그래서 run_query()라는 static method를 만들고 호출하여 사용하였다.

     

    내가 얻고자 하는 결과는 함수와 쿼리묶음(300개)를 map하여 executor에 실행해주면-> 300개의 쿼리를 16개의 worker가 병렬로 처리해준다.

     

    도입전과 후를 비교해보자.

     

    도입 전 

    구역을 나눔에도 불구하고 (배치를 2개로 나누어서 진행) 55초 가량 소요되었다.

     

    도입 후

     

    구역을 1개로 합쳤음에도 불구하고(2개의 배치를 다시 1개로) 21초로 줄어들었다.

     

    배포한 후에도 맘에 걸렸던 병렬처리를 이번 기회로 적용하게 되어서 좋았다.

    시간이 생간 김에, 리팩토링(클래스 화) 및 코드 정리도 해서 좀 더 가독성 코드를 만드는 것이 나의 목표!

     

     

Designed by Tistory.