ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • FastAPI에서 DB Connection-mysql 연결하기
    Backend/FastAPI 2023. 9. 21. 14:48

    필자는 mysql 설치 완료 후, FastAPI에서 mysql을 연결하여 사용하려 한다.

     

    먼저, 이 프로젝트의 구조는 아래와 같다.

     

     

    api : 메인 로직 작성 (mvc 패턴에서 controller 역활과 같다.), router(url 경로에 따라 포함하고 있다.)

     

    db : db 연결, db 세션 관리 

     

    model : db 컬럼 정의, 그 외 모델들을 정의

     

    middlewares : 토큰 및 accesscontrol 정의

     

    schemas: type 정의

     

    utils: 공통 함수 정의 

     

    __init__.py: 해당 폴더가 패키지의 일부 즉, 연결 된다는 것을 나타냄. 파일 자체에 작성되어 있는 것은 없음

     

     

    본격, DB 연결하기

     

    필자의 경우, main.py에 설정하였다. 

     

    1. main.py

     

     
     # main.py
     
      import logging

      import uvicorn
      from fastapi import FastAPI
      from fastapi.security import HTTPBearer
      from sqlalchemy import text
     
      from config import get_env
      from app.db.connection import db
     

      HTTP_BEARER = HTTPBearer(auto_error=False)


      def start_app():
          env = get_env()
          db.init_db(app=app, **env.dict())

          return app


      app = start_app()

      if __name__ == "__main__":
          uvicorn.run("main:start_app", host="0.0.0.0", port=8000, reload=True, factory=True)

     

    connection 모듈 속의 init_db() 함수 호출 -> db 연결 테스트 

     

    2. app/db/connection.py

     

     
     # connection.py
     
    from fastapi import FastAPI
    from sqlalchemy import create_engine
    from sqlalchemy.orm import sessionmaker


    class DBConnection:
        def __init__(self):
            self._engine = None
            self._session = None
            self._read_session = None

        def init_db(self, app: FastAPI, **kwargs):
            db_url = kwargs.get("DB_URL")
            print(db_url," db url")  # db 연결 되지 않을 때, 확인 하기 위함 
            pool_recycle = kwargs.get("DB_POOL_RECYCLE")
            db_echo = kwargs.get("DB_ECHO")
            pool_size = kwargs.get("DB_POOL_SIZE")
            max_overflow = kwargs.get("DB_MAX_OVERFLOW")
            self._engine = create_engine(
                    db_url,
                    echo=db_echo,
                    pool_recycle=pool_recycle,
                    pool_pre_ping=True,
                    pool_size=pool_size,
                    max_overflow=max_overflow,
                )
            self._session = sessionmaker(autocommit=False, autoflush=False, bind=self._engine)
            self.init_app_event(app=app)

        def init_app_event(self, app: FastAPI):

            @app.on_event("startup")
            def startup():
                self._engine.connect()
                print("DB connection success")

            @app.on_event("shutdown")
            def shutdown():
                self._session.close_all()
                self._engine.dispose()
                print("DB connection close")

        def session(self):
            db_session = self._session()
            try:
                yield db_session   # yield 를 이용하여, db 연결 성공 한 경우, 세션 시작 
            finally:
                db_session.close()  # db 세션이 시작 된 후, api 호출이 마무리 되면 db 세션을 닫아준다. 

        @property
        def engine(self):
            return self._engine


    db = DBConnection()

     

      create_egine : 인자값으로 db_url을 추가하면, db host에 db 연결을 생성한다.

     

    ▶  sessionmaker: 호출 되면 세션을 생성해준다.

     

        ▷ autocommit : api가 호출로, db 의 내용이 변경되는 경우 -> 자동 commit하며 변경할지에 대한 여부를 결정한다. False로 지정한 경우 ->  update, insert, delete 등으로 인해 내용 변경 되었을 때, 수동적으로 commit 을 해야 한다. 

     

        ▷ authoflush: 호출되면서 commit 되지 않은 부분의 내역을 삭제할지의 여부를 정하는 부분

     

        ▷ bind : 어떤 엔진을 통해 db 연결을 할지 결정하는 부분이다.  MySQL, PostgresSQL, MSSQL 등 여러 SQL의 DB URL중 어느 SQL 상품으로 연결을 진행할지 선택하는 부분이다. 현재는 개발,운영, 테스트 등의 환경만 분리했으나, SQL을 여러 종류를 쓰는 경우에는 경우에 맞게 나눠줘야 한다. 

     

     

    3. config.py 

     

     
     # config.py
     
    import os
    from os import path
    from platform import system
    from typing import Optional

    from pydantic import BaseSettings
     
    class Settings(BaseSettings):
        BASE_DIR: str = path.dirname((path.abspath(__file__)))
        LOCAL_MODE: bool = (
            True if system().lower().startswith("darwin") or system().lower().startswith("windows") else False
        )
     
        DB_URL: str = ""
        DB_POOL_RECYCLE: Optional[int] = 900
        DB_ECHO: Optional[bool] = True
        DB_POOL_SIZE: Optional[int] = 1
        DB_MAX_OVERFLOW: Optional[int] = 1
     
     
    class DevSettings(Settings):
       
        DB_URL =f"mysql+pymysql://{DB_USERNAME}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_DATABASE}"
        DB_POOL_SIZE = 5
        DB_MAX_OVERFLOW = 10

     

    def get_env():
        cfg_cls = dict(
            dev=DevSettings
        )
        env = cfg_cls[os.getenv("FASTAPI_ENV", "dev")]()

        return env


    settings = get_env()
     

     

    get_env() 에 들어있는 db 설정 클래스이다.

     

    필자는 mariadb를 지원하기 때문에 url에서 mysql+pymysql을 앞에 적어주고, db 접속 내용을 뒤에 추가로 기입하였다. 

     

    위는 사용하는 db에 따라 내용이 달라지며, db url 의 변경 -> create_engine 부분 또한 변경된다. 

     

    python main.py 를 통해 실행했을 때 아래와 같이 결과가 나온다면 연결된 것이다!

     

Designed by Tistory.