# db.py import os import logging from sqlalchemy import create_engine, event, exc, pool from sqlalchemy.orm import sessionmaker, scoped_session import yaml logger = logging.getLogger(__name__) # 프로젝트 루트 경로 설정 BASE_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), '..')) CONFIG_PATH = os.path.join(BASE_DIR, 'conf', 'config.yaml') def load_config(path=CONFIG_PATH): """설정 파일 로드 (환경변수 우선)""" try: # config.yaml 로드 with open(path, 'r', encoding='utf-8') as f: config = yaml.safe_load(f) if not config: raise ValueError(f"설정 파일이 비어있음: {path}") # 환경변수로 데이터베이스 설정 덮어쓰기 if os.getenv('DB_HOST'): config.setdefault('database', {}) config['database']['host'] = os.getenv('DB_HOST', config['database'].get('host')) config['database']['user'] = os.getenv('DB_USER', config['database'].get('user')) config['database']['password'] = os.getenv('DB_PASSWORD', config['database'].get('password')) config['database']['name'] = os.getenv('DB_NAME', config['database'].get('name')) return config except FileNotFoundError: logger.error(f"설정 파일을 찾을 수 없음: {path}") raise except yaml.YAMLError as e: logger.error(f"YAML 파싱 오류: {e}") raise config = load_config() db_cfg = config.get('database', {}) # DB URL 구성 db_url = ( f"mysql+pymysql://{db_cfg.get('user')}:" f"{db_cfg.get('password')}@{db_cfg.get('host')}/" f"{db_cfg.get('name')}?charset=utf8mb4" ) # MySQL 엔진 생성 (재연결 및 연결 풀 설정) engine = create_engine( db_url, poolclass=pool.QueuePool, pool_pre_ping=True, # 연결 전 핸들 확인 pool_recycle=3600, # 3600초(1시간)마다 재연결 pool_size=10, # 연결 풀 크기 max_overflow=20, # 추가 오버플로우 연결 수 echo=False, # SQL 출력 (디버그용) connect_args={ 'connect_timeout': 10, 'charset': 'utf8mb4' } ) # 연결 에러 발생 시 자동 재연결 @event.listens_for(pool.Pool, "connect") def receive_connect(dbapi_conn, connection_record): """DB 연결 성공 로그""" logger.debug("DB 연결 성공") @event.listens_for(pool.Pool, "checkout") def receive_checkout(dbapi_conn, connection_record, connection_proxy): """연결 풀에서 체크아웃할 때""" pass @event.listens_for(pool.Pool, "checkin") def receive_checkin(dbapi_conn, connection_record): """연결 풀로 반환할 때""" pass # 세션 팩토리 Session = sessionmaker(bind=engine) SessionLocal = scoped_session(Session) def get_engine(): """엔진 반환""" return engine def get_session(): """새로운 세션 반환""" session = Session() try: # 연결 테스트 session.execute('SELECT 1') except exc.DatabaseError as e: logger.error(f"DB 연결 실패: {e}") session.close() raise return session def get_scoped_session(): """스코프 세션 반환 (스레드 안전)""" return SessionLocal def close_session(): """세션 종료""" SessionLocal.remove() class DBSession: """컨텍스트 매니저를 사용한 세션 관리""" def __init__(self): self.session = None def __enter__(self): self.session = get_session() return self.session def __exit__(self, exc_type, exc_val, exc_tb): if self.session: if exc_type: self.session.rollback() logger.error(f"트랜잭션 롤백: {exc_type.__name__}: {exc_val}") else: self.session.commit() self.session.close()