# =================================================================== # core/database.py # FGTools 데이터베이스 연결 관리 모듈 # =================================================================== # SQLAlchemy를 사용하여 MySQL/MariaDB 연결을 관리합니다. # 연결 풀링, 자동 재연결, 컨텍스트 매니저를 지원합니다. # =================================================================== """ 데이터베이스 연결 관리 모듈 SQLAlchemy 기반의 MySQL/MariaDB 연결 관리를 제공합니다. 연결 풀링, 자동 재연결, 트랜잭션 관리를 지원합니다. 사용 예시: from core.database import DBSession, get_engine # 컨텍스트 매니저 사용 (권장) with DBSession() as session: result = session.execute(text("SELECT 1")) # 엔진 직접 사용 engine = get_engine() """ import logging from typing import Optional from contextlib import contextmanager from sqlalchemy import create_engine, text, event, exc from sqlalchemy.pool import QueuePool from sqlalchemy.orm import sessionmaker, scoped_session, Session from .config import get_config logger = logging.getLogger(__name__) # 엔진 및 세션 팩토리 캐시 _engine = None _session_factory = None _scoped_session = None def _build_db_url() -> str: """ 데이터베이스 연결 URL 생성 Returns: SQLAlchemy 연결 URL 문자열 """ config = get_config() db = config.database return ( f"mysql+pymysql://{db['user']}:{db['password']}" f"@{db['host']}/{db['name']}?charset={db['charset']}" ) def get_engine(): """ SQLAlchemy 엔진 반환 (싱글톤) 연결 풀 설정: - pool_size: 기본 연결 수 (10) - max_overflow: 추가 가능한 연결 수 (20) - pool_recycle: 연결 재활용 시간 (1시간) - pool_pre_ping: 연결 전 상태 확인 Returns: SQLAlchemy Engine 인스턴스 """ global _engine if _engine is None: db_url = _build_db_url() _engine = create_engine( db_url, poolclass=QueuePool, pool_pre_ping=True, # 연결 전 핸들 확인 (끊어진 연결 방지) pool_recycle=3600, # 1시간마다 연결 재생성 pool_size=10, # 기본 연결 풀 크기 max_overflow=20, # 추가 오버플로우 연결 수 echo=get_config().debug, # 디버그 모드에서만 SQL 출력 connect_args={ 'connect_timeout': 10, 'charset': 'utf8mb4' } ) # 연결 이벤트 리스너 등록 _setup_event_listeners() logger.info("데이터베이스 엔진 초기화 완료") return _engine def _setup_event_listeners(): """SQLAlchemy 이벤트 리스너 설정""" @event.listens_for(_engine, "connect") def on_connect(dbapi_conn, connection_record): """데이터베이스 연결 성공 시 호출""" logger.debug("DB 연결 성공") @event.listens_for(_engine, "checkout") def on_checkout(dbapi_conn, connection_record, connection_proxy): """연결 풀에서 연결을 가져올 때 호출""" logger.debug("DB 연결 체크아웃") @event.listens_for(_engine, "checkin") def on_checkin(dbapi_conn, connection_record): """연결을 풀에 반환할 때 호출""" logger.debug("DB 연결 체크인") def get_session_factory(): """ 세션 팩토리 반환 Returns: SQLAlchemy sessionmaker 인스턴스 """ global _session_factory if _session_factory is None: _session_factory = sessionmaker(bind=get_engine()) return _session_factory def get_scoped_session(): """ 스레드 안전한 스코프 세션 반환 멀티스레드 환경에서 각 스레드가 독립적인 세션을 사용하도록 보장합니다. Returns: SQLAlchemy scoped_session 인스턴스 """ global _scoped_session if _scoped_session is None: _scoped_session = scoped_session(get_session_factory()) return _scoped_session def get_session() -> Session: """ 새로운 데이터베이스 세션 생성 Returns: SQLAlchemy Session 인스턴스 Raises: exc.DatabaseError: 연결 실패 시 """ session = get_session_factory()() try: # 연결 테스트 session.execute(text('SELECT 1')) except exc.DatabaseError as e: logger.error(f"DB 연결 실패: {e}") session.close() raise return session def close_session(): """스코프 세션 종료 및 정리""" global _scoped_session if _scoped_session is not None: _scoped_session.remove() class DBSession: """ 데이터베이스 세션 컨텍스트 매니저 with 문을 사용하여 세션의 생성, 커밋/롤백, 종료를 자동으로 관리합니다. 사용 예시: with DBSession() as session: result = session.execute(text("SELECT * FROM users")) for row in result: print(row) Attributes: session: SQLAlchemy Session 인스턴스 """ def __init__(self, auto_commit: bool = True): """ Args: auto_commit: 정상 종료 시 자동 커밋 여부 (기본: True) """ self.session: Optional[Session] = None self.auto_commit = auto_commit def __enter__(self) -> Session: """세션 생성 및 반환""" self.session = get_session() return self.session def __exit__(self, exc_type, exc_val, exc_tb): """ 세션 종료 처리 예외 발생 시 롤백, 정상 종료 시 커밋(auto_commit=True인 경우) """ if self.session is not None: if exc_type is not None: # 예외 발생 시 롤백 self.session.rollback() logger.error(f"트랜잭션 롤백: {exc_type.__name__}: {exc_val}") elif self.auto_commit: # 정상 종료 시 커밋 try: self.session.commit() except Exception as e: self.session.rollback() logger.error(f"커밋 실패, 롤백 수행: {e}") raise self.session.close() # 예외 전파하지 않음 (False 반환) return False @contextmanager def db_transaction(): """ 데이터베이스 트랜잭션 컨텍스트 매니저 (함수형) DBSession 클래스와 동일한 기능을 함수형으로 제공합니다. 사용 예시: with db_transaction() as session: session.execute(text("INSERT INTO users (name) VALUES (:name)"), {"name": "test"}) Yields: SQLAlchemy Session 인스턴스 """ session = get_session() try: yield session session.commit() except Exception as e: session.rollback() logger.error(f"트랜잭션 실패: {e}") raise finally: session.close() def dispose_engine(): """ 엔진 및 연결 풀 정리 애플리케이션 종료 시 또는 연결 초기화가 필요할 때 호출합니다. """ global _engine, _session_factory, _scoped_session if _scoped_session is not None: _scoped_session.remove() _scoped_session = None if _engine is not None: _engine.dispose() _engine = None _session_factory = None logger.info("데이터베이스 엔진 정리 완료")