필자는 mysql 설치 완료 후, FastAPI에서 mysql을 연결하여 사용하려 한다.
먼저, 이 프로젝트의 구조는 아래와 같다.
api : 메인 로직 작성 (mvc 패턴에서 controller 역활과 같다.), router(url 경로에 따라 포함하고 있다.)
db : db 연결, db 세션 관리
model : db 컬럼 정의, 그 외 모델들을 정의
middlewares : 토큰 및 accesscontrol 정의
schemas : type 정의
utils : 공통 함수 정의
__init__.py: 해당 폴더가 패키지의 일부 즉, 연결 된다는 것을 나타냄. 파일 자체에 작성되어 있는 것은 없음
본격, DB 연결하기
필자의 경우, main.py에 설정하였다.
1. main.py
# main.py
import logging
import uvicorn
from fastapi import FastAPI
from fastapi . security import HTTPBearer
from sqlalchemy import text
from config import get_env
from app . db . connection import db
HTTP_BEARER = HTTPBearer ( auto_error = False )
def start_app ():
env = get_env ()
db . init_db ( app = app , ** env . dict ())
return app
app = start_app ()
if __name__ == "__main__" :
uvicorn . run ( "main:start_app" , host = "0.0.0.0" , port = 8000 , reload = True , factory = True )
connection 모듈 속의 init_db() 함수 호출 -> db 연결 테스트
2. app/db/connection.py
# connection.py
from fastapi import FastAPI
from sqlalchemy import create_engine
from sqlalchemy . orm import sessionmaker
class DBConnection :
def __init__ ( self ):
self . _engine = None
self . _session = None
self . _read_session = None
def init_db ( self , app : FastAPI , ** kwargs ):
db_url = kwargs . get ( "DB_URL" )
print ( db_url , " db url" ) # db 연결 되지 않을 때, 확인 하기 위함
pool_recycle = kwargs . get ( "DB_POOL_RECYCLE" )
db_echo = kwargs . get ( "DB_ECHO" )
pool_size = kwargs . get ( "DB_POOL_SIZE" )
max_overflow = kwargs . get ( "DB_MAX_OVERFLOW" )
self . _engine = create_engine (
db_url ,
echo = db_echo ,
pool_recycle = pool_recycle ,
pool_pre_ping = True ,
pool_size = pool_size ,
max_overflow = max_overflow ,
)
self . _session = sessionmaker ( autocommit = False , autoflush = False , bind = self . _engine )
self . init_app_event ( app = app )
def init_app_event ( self , app : FastAPI ):
@ app . on_event ( "startup" )
def startup ():
self . _engine . connect ()
print ( "DB connection success" )
@ app . on_event ( "shutdown" )
def shutdown ():
self . _session . close_all ()
self . _engine . dispose ()
print ( "DB connection close" )
def session ( self ):
db_session = self . _session ()
try :
yield db_session # yield 를 이용하여, db 연결 성공 한 경우, 세션 시작
finally :
db_session . close () # db 세션이 시작 된 후, api 호출이 마무리 되면 db 세션을 닫아준다.
@ property
def engine ( self ):
return self . _engine
db = DBConnection ()
▶ create_egine : 인자값으로 db_url을 추가하면, db host에 db 연결을 생성한다.
▶ sessionmaker : 호출 되면 세션을 생성해준다.
▷ autocommit : api가 호출로, db 의 내용이 변경되는 경우 -> 자동 commit하며 변경할지에 대한 여부를 결정한다. False로 지정한 경우 -> update, insert, delete 등으로 인해 내용 변경 되었을 때, 수동적으로 commit 을 해야 한다.
▷ authoflush : 호출되면서 commit 되지 않은 부분의 내역을 삭제할지의 여부를 정하는 부분
▷ bind : 어떤 엔진을 통해 db 연결을 할지 결정하는 부분이다. MySQL, PostgresSQL, MSSQL 등 여러 SQL의 DB URL중 어느 SQL 상품으로 연결을 진행할지 선택하는 부분이다. 현재는 개발,운영, 테스트 등의 환경만 분리했으나, SQL을 여러 종류를 쓰는 경우에는 경우에 맞게 나눠줘야 한다.
3. config.py
# config.py
import os
from os import path
from platform import system
from typing import Optional
from pydantic import BaseSettings
class Settings ( BaseSettings ):
BASE_DIR : str = path . dirname (( path . abspath ( __file__ )))
LOCAL_MODE : bool = (
True if system (). lower (). startswith ( "darwin" ) or system (). lower (). startswith ( "windows" ) else False
)
DB_URL : str = ""
DB_POOL_RECYCLE : Optional [ int ] = 900
DB_ECHO : Optional [ bool ] = True
DB_POOL_SIZE : Optional [ int ] = 1
DB_MAX_OVERFLOW : Optional [ int ] = 1
class DevSettings ( Settings ):
DB_URL =f"mysql+pymysql:// {DB_USERNAME} : {DB_PASSWORD} @ {DB_HOST} : {DB_PORT} / {DB_DATABASE} "
DB_POOL_SIZE = 5
DB_MAX_OVERFLOW = 10
def get_env ():
cfg_cls = dict (
dev = DevSettings
)
env = cfg_cls [ os . getenv ( "FASTAPI_ENV" , "dev" )]()
return env
settings = get_env ()
get_env() 에 들어있는 db 설정 클래스이다.
필자는 mariadb를 지원하기 때문에 url에서 mysql+pymysql을 앞에 적어주고, db 접속 내용을 뒤에 추가로 기입하였다.
위는 사용하는 db에 따라 내용이 달라지며, db url 의 변경 -> create_engine 부분 또한 변경된다.
python main.py 를 통해 실행했을 때 아래와 같이 결과가 나온다면 연결된 것이다!