Backend/FastAPI

FastAPI에서 DB Connection-mysql 연결하기

amelia-suyeon 2023. 9. 21. 14:48

필자는 mysql 설치 완료 후, FastAPI에서 mysql을 연결하여 사용하려 한다.

 

먼저, 이 프로젝트의 구조는 아래와 같다.

 

 

api : 메인 로직 작성 (mvc 패턴에서 controller 역활과 같다.), router(url 경로에 따라 포함하고 있다.)

 

db : db 연결, db 세션 관리 

 

model : db 컬럼 정의, 그 외 모델들을 정의

 

middlewares : 토큰 및 accesscontrol 정의

 

schemas: type 정의

 

utils: 공통 함수 정의 

 

__init__.py: 해당 폴더가 패키지의 일부 즉, 연결 된다는 것을 나타냄. 파일 자체에 작성되어 있는 것은 없음

 

 

본격, DB 연결하기

 

필자의 경우, main.py에 설정하였다. 

 

1. main.py

 

 
 # main.py
 
  import logging

  import uvicorn
  from fastapi import FastAPI
  from fastapi.security import HTTPBearer
  from sqlalchemy import text
 
  from config import get_env
  from app.db.connection import db
 

  HTTP_BEARER = HTTPBearer(auto_error=False)


  def start_app():
      env = get_env()
      db.init_db(app=app, **env.dict())

      return app


  app = start_app()

  if __name__ == "__main__":
      uvicorn.run("main:start_app", host="0.0.0.0", port=8000, reload=True, factory=True)

 

connection 모듈 속의 init_db() 함수 호출 -> db 연결 테스트 

 

2. app/db/connection.py

 

 
 # connection.py
 
from fastapi import FastAPI
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker


class DBConnection:
    def __init__(self):
        self._engine = None
        self._session = None
        self._read_session = None

    def init_db(self, app: FastAPI, **kwargs):
        db_url = kwargs.get("DB_URL")
        print(db_url," db url")  # db 연결 되지 않을 때, 확인 하기 위함 
        pool_recycle = kwargs.get("DB_POOL_RECYCLE")
        db_echo = kwargs.get("DB_ECHO")
        pool_size = kwargs.get("DB_POOL_SIZE")
        max_overflow = kwargs.get("DB_MAX_OVERFLOW")
        self._engine = create_engine(
                db_url,
                echo=db_echo,
                pool_recycle=pool_recycle,
                pool_pre_ping=True,
                pool_size=pool_size,
                max_overflow=max_overflow,
            )
        self._session = sessionmaker(autocommit=False, autoflush=False, bind=self._engine)
        self.init_app_event(app=app)

    def init_app_event(self, app: FastAPI):

        @app.on_event("startup")
        def startup():
            self._engine.connect()
            print("DB connection success")

        @app.on_event("shutdown")
        def shutdown():
            self._session.close_all()
            self._engine.dispose()
            print("DB connection close")

    def session(self):
        db_session = self._session()
        try:
            yield db_session   # yield 를 이용하여, db 연결 성공 한 경우, 세션 시작 
        finally:
            db_session.close()  # db 세션이 시작 된 후, api 호출이 마무리 되면 db 세션을 닫아준다. 

    @property
    def engine(self):
        return self._engine


db = DBConnection()

 

  create_egine : 인자값으로 db_url을 추가하면, db host에 db 연결을 생성한다.

 

▶  sessionmaker: 호출 되면 세션을 생성해준다.

 

    ▷ autocommit : api가 호출로, db 의 내용이 변경되는 경우 -> 자동 commit하며 변경할지에 대한 여부를 결정한다. False로 지정한 경우 ->  update, insert, delete 등으로 인해 내용 변경 되었을 때, 수동적으로 commit 을 해야 한다. 

 

    ▷ authoflush: 호출되면서 commit 되지 않은 부분의 내역을 삭제할지의 여부를 정하는 부분

 

    ▷ bind : 어떤 엔진을 통해 db 연결을 할지 결정하는 부분이다.  MySQL, PostgresSQL, MSSQL 등 여러 SQL의 DB URL중 어느 SQL 상품으로 연결을 진행할지 선택하는 부분이다. 현재는 개발,운영, 테스트 등의 환경만 분리했으나, SQL을 여러 종류를 쓰는 경우에는 경우에 맞게 나눠줘야 한다. 

 

 

3. config.py 

 

 
 # config.py
 
import os
from os import path
from platform import system
from typing import Optional

from pydantic import BaseSettings
 
class Settings(BaseSettings):
    BASE_DIR: str = path.dirname((path.abspath(__file__)))
    LOCAL_MODE: bool = (
        True if system().lower().startswith("darwin") or system().lower().startswith("windows") else False
    )
 
    DB_URL: str = ""
    DB_POOL_RECYCLE: Optional[int] = 900
    DB_ECHO: Optional[bool] = True
    DB_POOL_SIZE: Optional[int] = 1
    DB_MAX_OVERFLOW: Optional[int] = 1
 
 
class DevSettings(Settings):
   
    DB_URL =f"mysql+pymysql://{DB_USERNAME}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_DATABASE}"
    DB_POOL_SIZE = 5
    DB_MAX_OVERFLOW = 10

 

def get_env():
    cfg_cls = dict(
        dev=DevSettings
    )
    env = cfg_cls[os.getenv("FASTAPI_ENV", "dev")]()

    return env


settings = get_env()
 

 

get_env() 에 들어있는 db 설정 클래스이다.

 

필자는 mariadb를 지원하기 때문에 url에서 mysql+pymysql을 앞에 적어주고, db 접속 내용을 뒤에 추가로 기입하였다. 

 

위는 사용하는 db에 따라 내용이 달라지며, db url 의 변경 -> create_engine 부분 또한 변경된다. 

 

python main.py 를 통해 실행했을 때 아래와 같이 결과가 나온다면 연결된 것이다!