feat: initial commit - unified FGTools from static, weather, mattermost-noti

This commit is contained in:
2025-12-31 09:56:37 +09:00
commit 4ff5dba4b1
29 changed files with 5786 additions and 0 deletions

29
core/__init__.py Normal file
View File

@ -0,0 +1,29 @@
# ===================================================================
# core/__init__.py
# FGTools 핵심 모듈 패키지 초기화
# ===================================================================
# 공통으로 사용되는 핵심 모듈들을 제공합니다:
# - config: 환경설정 관리
# - database: 데이터베이스 연결 및 세션 관리
# - logging_utils: 로깅 유틸리티
# - http_client: HTTP 요청 재시도 클라이언트
# - message_sender: 다중 플랫폼 메시지 발송
# ===================================================================
from .config import Config, get_config
from .database import get_engine, get_session, DBSession
from .logging_utils import setup_logging, get_logger
from .http_client import create_retry_session
from .message_sender import MessageSender
__all__ = [
'Config',
'get_config',
'get_engine',
'get_session',
'DBSession',
'setup_logging',
'get_logger',
'create_retry_session',
'MessageSender',
]

267
core/config.py Normal file
View File

@ -0,0 +1,267 @@
# ===================================================================
# core/config.py
# FGTools 통합 설정 관리 모듈
# ===================================================================
# 환경변수(.env)를 기반으로 모든 설정을 통합 관리합니다.
# python-dotenv를 사용하여 .env 파일을 자동으로 로드합니다.
# ===================================================================
"""
FGTools 설정 관리 모듈
환경변수 기반의 통합 설정 관리를 제공합니다.
.env 파일에서 설정을 로드하고, 기본값 및 타입 변환을 지원합니다.
사용 예시:
from core.config import get_config
config = get_config()
db_host = config.database['host']
"""
import os
import sys
from typing import Any, Dict, List, Optional
from functools import lru_cache
# .env 파일 로드
try:
from dotenv import load_dotenv
load_dotenv()
except ImportError:
pass
class Config:
"""
환경변수 기반 설정 관리 클래스
모든 설정은 환경변수에서 로드되며, 기본값을 제공합니다.
민감한 정보는 .env 파일에 저장하고 .gitignore에 추가하세요.
"""
def __init__(self):
"""설정 초기화 및 환경변수 로드"""
self._load_all_configs()
def _get_env(self, key: str, default: str = '', required: bool = False) -> str:
"""
환경변수 조회
Args:
key: 환경변수 키
default: 기본값
required: 필수 여부 (True면 없을 시 에러)
Returns:
환경변수 값
Raises:
SystemExit: 필수 환경변수가 없을 경우
"""
value = os.getenv(key, default)
if required and not value:
print(f"[ERROR] 필수 환경변수가 설정되지 않았습니다: {key}")
sys.exit(1)
return value
def _get_bool(self, key: str, default: bool = False) -> bool:
"""환경변수를 불리언으로 변환"""
return self._get_env(key, str(default)).lower() in ('true', '1', 'yes')
def _get_int(self, key: str, default: int = 0) -> int:
"""환경변수를 정수로 변환"""
try:
return int(self._get_env(key, str(default)))
except ValueError:
return default
def _get_float(self, key: str, default: float = 0.0) -> float:
"""환경변수를 실수로 변환"""
try:
return float(self._get_env(key, str(default)))
except ValueError:
return default
def _get_list(self, key: str, default: str = '', separator: str = ',') -> List[str]:
"""환경변수를 리스트로 변환 (구분자로 분리)"""
value = self._get_env(key, default)
return [item.strip() for item in value.split(separator) if item.strip()]
def _get_int_list(self, key: str, default: str = '') -> List[int]:
"""환경변수를 정수 리스트로 변환"""
str_list = self._get_list(key, default)
result = []
for item in str_list:
try:
result.append(int(item))
except ValueError:
continue
return result
def _load_all_configs(self):
"""모든 설정 로드"""
# 공통 설정
self.debug = self._get_bool('DEBUG', False)
self.log_level = self._get_env('LOG_LEVEL', 'INFO')
self.max_workers = self._get_int('MAX_WORKERS', 4)
# 데이터베이스 설정
self.database = {
'host': self._get_env('DB_HOST', 'localhost'),
'user': self._get_env('DB_USER', 'firstgarden'),
'password': self._get_env('DB_PASSWORD', ''),
'name': self._get_env('DB_NAME', 'firstgarden'),
'charset': self._get_env('DB_CHARSET', 'utf8mb4'),
}
self.table_prefix = self._get_env('TABLE_PREFIX', 'fg_manager_static_')
# 공공데이터포털 API 설정
self.data_api = {
'service_key': self._get_env('DATA_API_SERVICE_KEY', ''),
'start_date': self._get_env('DATA_API_START_DATE', '20170101'),
'end_date': self._get_env('DATA_API_END_DATE', '20250701'),
'air_stations': self._get_list('AIR_STATION_NAMES', '운정'),
'weather_station_ids': self._get_int_list('WEATHER_STN_IDS', '99'),
}
# GA4 설정
self.ga4 = {
'api_token': self._get_env('GA4_API_TOKEN', ''),
'property_id': self._get_int('GA4_PROPERTY_ID', 0),
'service_account_file': self._get_env('GA4_SERVICE_ACCOUNT_FILE', './conf/service-account-credentials.json'),
'start_date': self._get_env('GA4_START_DATE', '20170101'),
'end_date': self._get_env('GA4_END_DATE', '20990731'),
'max_rows_per_request': self._get_int('GA4_MAX_ROWS_PER_REQUEST', 10000),
}
# POS 설정
self.pos = {
'visitor_categories': self._get_list('VISITOR_CATEGORIES', '입장료,티켓,기업제휴'),
}
# UPSolution 설정
self.upsolution = {
'id': self._get_env('UPSOLUTION_ID', ''),
'code': self._get_env('UPSOLUTION_CODE', ''),
'pw': self._get_env('UPSOLUTION_PW', ''),
}
# 예측 가중치 설정
self.forecast_weight = {
'visitor_multiplier': self._get_float('FORECAST_VISITOR_MULTIPLIER', 0.5),
'min_temp': self._get_float('FORECAST_WEIGHT_MIN_TEMP', 1.0),
'max_temp': self._get_float('FORECAST_WEIGHT_MAX_TEMP', 1.0),
'precipitation': self._get_float('FORECAST_WEIGHT_PRECIPITATION', 10.0),
'humidity': self._get_float('FORECAST_WEIGHT_HUMIDITY', 1.0),
'pm25': self._get_float('FORECAST_WEIGHT_PM25', 1.0),
'holiday': self._get_int('FORECAST_WEIGHT_HOLIDAY', 20),
}
self.force_update = self._get_bool('FORCE_UPDATE', False)
# Weather 서비스 설정
self.weather_service = {
'service_key': self._get_env('SERVICE_KEY', ''),
}
# FTP 설정
self.ftp = {
'host': self._get_env('FTP_HOST', ''),
'user': self._get_env('FTP_USER', ''),
'password': self._get_env('FTP_PASSWORD', ''),
'upload_dir': self._get_env('FTP_UPLOAD_DIR', ''),
}
# 게시판 설정
self.board = {
'id': self._get_env('BOARD_ID', ''),
'ca_name': self._get_env('BOARD_CA_NAME', ''),
'content': self._get_env('BOARD_CONTENT', ''),
'mb_id': self._get_env('BOARD_MB_ID', ''),
'nickname': self._get_env('BOARD_NICKNAME', ''),
}
# Mattermost 설정
self.mattermost = {
'url': self._get_env('MATTERMOST_URL', ''),
'token': self._get_env('MATTERMOST_TOKEN', ''),
'channel_id': self._get_env('MATTERMOST_CHANNEL_ID', ''),
'webhook_url': self._get_env('MATTERMOST_WEBHOOK_URL', ''),
}
# Telegram 설정
self.telegram = {
'bot_token': self._get_env('TELEGRAM_BOT_TOKEN', ''),
'chat_id': self._get_env('TELEGRAM_CHAT_ID', ''),
}
# Synology Chat 설정
self.synology = {
'chat_url': self._get_env('SYNOLOGY_CHAT_URL', ''),
'chat_token': self._get_env('SYNOLOGY_CHAT_TOKEN', ''),
}
# Notion 설정
self.notion = {
'api_secret': self._get_env('NOTION_API_SECRET', ''),
}
# Flask 설정
self.flask = {
'secret_key': self._get_env('FLASK_SECRET_KEY', 'dev-secret-key'),
'host': self._get_env('FLASK_HOST', '0.0.0.0'),
'port': self._get_int('FLASK_PORT', 5000),
}
def get(self, key: str, default: Any = None) -> Any:
"""
설정값 조회 (딕셔너리 스타일)
Args:
key: 설정 키 (점 표기법 지원, 예: 'database.host')
default: 기본값
Returns:
설정값
"""
keys = key.split('.')
value = self
for k in keys:
if hasattr(value, k):
value = getattr(value, k)
elif isinstance(value, dict) and k in value:
value = value[k]
else:
return default
return value
# 싱글톤 인스턴스
_config_instance: Optional[Config] = None
@lru_cache(maxsize=1)
def get_config() -> Config:
"""
설정 싱글톤 인스턴스 반환
처음 호출 시 Config 인스턴스를 생성하고 캐시합니다.
이후 호출에서는 캐시된 인스턴스를 반환합니다.
Returns:
Config 인스턴스
"""
return Config()
def reload_config() -> Config:
"""
설정 다시 로드 (캐시 무효화)
환경변수가 변경된 후 설정을 다시 로드할 때 사용합니다.
Returns:
새로운 Config 인스턴스
"""
get_config.cache_clear()
return get_config()

280
core/database.py Normal file
View File

@ -0,0 +1,280 @@
# ===================================================================
# core/database.py
# FGTools 데이터베이스 연결 관리 모듈
# ===================================================================
# SQLAlchemy를 사용하여 MySQL/MariaDB 연결을 관리합니다.
# 연결 풀링, 자동 재연결, 컨텍스트 매니저를 지원합니다.
# ===================================================================
"""
데이터베이스 연결 관리 모듈
SQLAlchemy 기반의 MySQL/MariaDB 연결 관리를 제공합니다.
연결 풀링, 자동 재연결, 트랜잭션 관리를 지원합니다.
사용 예시:
from core.database import DBSession, get_engine
# 컨텍스트 매니저 사용 (권장)
with DBSession() as session:
result = session.execute(text("SELECT 1"))
# 엔진 직접 사용
engine = get_engine()
"""
import logging
from typing import Optional
from contextlib import contextmanager
from sqlalchemy import create_engine, text, event, exc
from sqlalchemy.pool import QueuePool
from sqlalchemy.orm import sessionmaker, scoped_session, Session
from .config import get_config
logger = logging.getLogger(__name__)
# 엔진 및 세션 팩토리 캐시
_engine = None
_session_factory = None
_scoped_session = None
def _build_db_url() -> str:
"""
데이터베이스 연결 URL 생성
Returns:
SQLAlchemy 연결 URL 문자열
"""
config = get_config()
db = config.database
return (
f"mysql+pymysql://{db['user']}:{db['password']}"
f"@{db['host']}/{db['name']}?charset={db['charset']}"
)
def get_engine():
"""
SQLAlchemy 엔진 반환 (싱글톤)
연결 풀 설정:
- pool_size: 기본 연결 수 (10)
- max_overflow: 추가 가능한 연결 수 (20)
- pool_recycle: 연결 재활용 시간 (1시간)
- pool_pre_ping: 연결 전 상태 확인
Returns:
SQLAlchemy Engine 인스턴스
"""
global _engine
if _engine is None:
db_url = _build_db_url()
_engine = create_engine(
db_url,
poolclass=QueuePool,
pool_pre_ping=True, # 연결 전 핸들 확인 (끊어진 연결 방지)
pool_recycle=3600, # 1시간마다 연결 재생성
pool_size=10, # 기본 연결 풀 크기
max_overflow=20, # 추가 오버플로우 연결 수
echo=get_config().debug, # 디버그 모드에서만 SQL 출력
connect_args={
'connect_timeout': 10,
'charset': 'utf8mb4'
}
)
# 연결 이벤트 리스너 등록
_setup_event_listeners()
logger.info("데이터베이스 엔진 초기화 완료")
return _engine
def _setup_event_listeners():
"""SQLAlchemy 이벤트 리스너 설정"""
@event.listens_for(_engine, "connect")
def on_connect(dbapi_conn, connection_record):
"""데이터베이스 연결 성공 시 호출"""
logger.debug("DB 연결 성공")
@event.listens_for(_engine, "checkout")
def on_checkout(dbapi_conn, connection_record, connection_proxy):
"""연결 풀에서 연결을 가져올 때 호출"""
logger.debug("DB 연결 체크아웃")
@event.listens_for(_engine, "checkin")
def on_checkin(dbapi_conn, connection_record):
"""연결을 풀에 반환할 때 호출"""
logger.debug("DB 연결 체크인")
def get_session_factory():
"""
세션 팩토리 반환
Returns:
SQLAlchemy sessionmaker 인스턴스
"""
global _session_factory
if _session_factory is None:
_session_factory = sessionmaker(bind=get_engine())
return _session_factory
def get_scoped_session():
"""
스레드 안전한 스코프 세션 반환
멀티스레드 환경에서 각 스레드가 독립적인 세션을 사용하도록 보장합니다.
Returns:
SQLAlchemy scoped_session 인스턴스
"""
global _scoped_session
if _scoped_session is None:
_scoped_session = scoped_session(get_session_factory())
return _scoped_session
def get_session() -> Session:
"""
새로운 데이터베이스 세션 생성
Returns:
SQLAlchemy Session 인스턴스
Raises:
exc.DatabaseError: 연결 실패 시
"""
session = get_session_factory()()
try:
# 연결 테스트
session.execute(text('SELECT 1'))
except exc.DatabaseError as e:
logger.error(f"DB 연결 실패: {e}")
session.close()
raise
return session
def close_session():
"""스코프 세션 종료 및 정리"""
global _scoped_session
if _scoped_session is not None:
_scoped_session.remove()
class DBSession:
"""
데이터베이스 세션 컨텍스트 매니저
with 문을 사용하여 세션의 생성, 커밋/롤백, 종료를 자동으로 관리합니다.
사용 예시:
with DBSession() as session:
result = session.execute(text("SELECT * FROM users"))
for row in result:
print(row)
Attributes:
session: SQLAlchemy Session 인스턴스
"""
def __init__(self, auto_commit: bool = True):
"""
Args:
auto_commit: 정상 종료 시 자동 커밋 여부 (기본: True)
"""
self.session: Optional[Session] = None
self.auto_commit = auto_commit
def __enter__(self) -> Session:
"""세션 생성 및 반환"""
self.session = get_session()
return self.session
def __exit__(self, exc_type, exc_val, exc_tb):
"""
세션 종료 처리
예외 발생 시 롤백, 정상 종료 시 커밋(auto_commit=True인 경우)
"""
if self.session is not None:
if exc_type is not None:
# 예외 발생 시 롤백
self.session.rollback()
logger.error(f"트랜잭션 롤백: {exc_type.__name__}: {exc_val}")
elif self.auto_commit:
# 정상 종료 시 커밋
try:
self.session.commit()
except Exception as e:
self.session.rollback()
logger.error(f"커밋 실패, 롤백 수행: {e}")
raise
self.session.close()
# 예외 전파하지 않음 (False 반환)
return False
@contextmanager
def db_transaction():
"""
데이터베이스 트랜잭션 컨텍스트 매니저 (함수형)
DBSession 클래스와 동일한 기능을 함수형으로 제공합니다.
사용 예시:
with db_transaction() as session:
session.execute(text("INSERT INTO users (name) VALUES (:name)"), {"name": "test"})
Yields:
SQLAlchemy Session 인스턴스
"""
session = get_session()
try:
yield session
session.commit()
except Exception as e:
session.rollback()
logger.error(f"트랜잭션 실패: {e}")
raise
finally:
session.close()
def dispose_engine():
"""
엔진 및 연결 풀 정리
애플리케이션 종료 시 또는 연결 초기화가 필요할 때 호출합니다.
"""
global _engine, _session_factory, _scoped_session
if _scoped_session is not None:
_scoped_session.remove()
_scoped_session = None
if _engine is not None:
_engine.dispose()
_engine = None
_session_factory = None
logger.info("데이터베이스 엔진 정리 완료")

332
core/http_client.py Normal file
View File

@ -0,0 +1,332 @@
# ===================================================================
# core/http_client.py
# FGTools HTTP 클라이언트 유틸리티 모듈
# ===================================================================
# 자동 재시도, 타임아웃, 연결 풀링을 지원하는 HTTP 클라이언트입니다.
# requests 라이브러리 기반으로 안정적인 API 호출을 제공합니다.
# ===================================================================
"""
HTTP 클라이언트 유틸리티 모듈
자동 재시도, 지수 백오프, 타임아웃 설정을 지원하는
안정적인 HTTP 클라이언트를 제공합니다.
사용 예시:
from core.http_client import create_retry_session, get_json
# 재시도 세션 사용
session = create_retry_session(retries=3)
response = session.get("https://api.example.com/data")
# 간편 JSON 요청
data = get_json("https://api.example.com/data", params={"key": "value"})
"""
import time
import logging
from typing import Any, Dict, Optional, Callable
from functools import wraps
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from .logging_utils import get_logger
logger = get_logger(__name__)
# 기본 설정 상수
DEFAULT_TIMEOUT = 30
DEFAULT_RETRIES = 3
DEFAULT_BACKOFF_FACTOR = 0.5
RETRY_STATUS_CODES = [429, 500, 502, 503, 504]
def create_retry_session(
retries: int = DEFAULT_RETRIES,
backoff_factor: float = DEFAULT_BACKOFF_FACTOR,
status_forcelist: Optional[list] = None,
timeout: int = DEFAULT_TIMEOUT
) -> requests.Session:
"""
자동 재시도를 지원하는 requests 세션 생성
실패한 요청을 지수 백오프 방식으로 자동 재시도합니다.
연결 풀링을 통해 다수의 요청을 효율적으로 처리합니다.
Args:
retries: 최대 재시도 횟수
backoff_factor: 재시도 간격 배수 (0.5면 0.5, 1, 2초...)
status_forcelist: 재시도할 HTTP 상태 코드 목록
timeout: 요청 타임아웃 (초)
Returns:
설정된 requests.Session 인스턴스
사용 예시:
session = create_retry_session(retries=5, timeout=60)
response = session.get("https://api.example.com/data")
"""
if status_forcelist is None:
status_forcelist = RETRY_STATUS_CODES
session = requests.Session()
# 재시도 전략 설정
retry_strategy = Retry(
total=retries,
read=retries,
connect=retries,
backoff_factor=backoff_factor,
status_forcelist=status_forcelist,
allowed_methods=["HEAD", "GET", "OPTIONS", "POST", "PUT", "DELETE"],
raise_on_status=False
)
# HTTP/HTTPS 어댑터에 재시도 전략 적용
adapter = HTTPAdapter(
max_retries=retry_strategy,
pool_connections=10,
pool_maxsize=10
)
session.mount("http://", adapter)
session.mount("https://", adapter)
# 기본 타임아웃 설정 (request 메서드 래핑)
original_request = session.request
def request_with_timeout(*args, **kwargs):
kwargs.setdefault('timeout', timeout)
return original_request(*args, **kwargs)
session.request = request_with_timeout
return session
def retry_on_exception(
max_retries: int = DEFAULT_RETRIES,
delay: float = 1.0,
backoff: float = 2.0,
exceptions: tuple = (Exception,)
) -> Callable:
"""
함수 실행 실패 시 재시도하는 데코레이터
지정된 예외가 발생하면 지수 백오프 방식으로 재시도합니다.
Args:
max_retries: 최대 재시도 횟수
delay: 초기 재시도 지연 시간 (초)
backoff: 재시도마다 지연 시간 배수
exceptions: 재시도할 예외 타입들
Returns:
데코레이터 함수
사용 예시:
@retry_on_exception(max_retries=3, delay=1.0)
def fetch_data():
return requests.get("https://api.example.com").json()
"""
def decorator(func: Callable) -> Callable:
@wraps(func)
def wrapper(*args, **kwargs) -> Any:
last_exception = None
for attempt in range(max_retries):
try:
return func(*args, **kwargs)
except exceptions as e:
last_exception = e
if attempt < max_retries - 1:
wait_time = delay * (backoff ** attempt)
logger.warning(
f"{func.__name__} 재시도 {attempt + 1}/{max_retries} "
f"({wait_time:.1f}초 대기): {e}"
)
time.sleep(wait_time)
else:
logger.error(f"{func.__name__} 모든 재시도 실패: {e}")
raise last_exception
return wrapper
return decorator
def get_json(
url: str,
params: Optional[Dict] = None,
headers: Optional[Dict] = None,
timeout: int = DEFAULT_TIMEOUT,
retries: int = DEFAULT_RETRIES
) -> Optional[Dict]:
"""
JSON API 요청 간편 함수
GET 요청을 보내고 JSON 응답을 파싱하여 반환합니다.
실패 시 None을 반환하고 에러를 로깅합니다.
Args:
url: 요청 URL
params: 쿼리 파라미터
headers: 요청 헤더
timeout: 타임아웃 (초)
retries: 최대 재시도 횟수
Returns:
JSON 응답 딕셔너리 또는 None
"""
session = create_retry_session(retries=retries, timeout=timeout)
try:
response = session.get(url, params=params, headers=headers)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
logger.error(f"HTTP 요청 실패: {url} - {e}")
return None
except ValueError as e:
logger.error(f"JSON 파싱 실패: {url} - {e}")
return None
finally:
session.close()
def post_json(
url: str,
data: Optional[Dict] = None,
json_data: Optional[Dict] = None,
headers: Optional[Dict] = None,
timeout: int = DEFAULT_TIMEOUT,
retries: int = DEFAULT_RETRIES
) -> Optional[Dict]:
"""
JSON POST 요청 간편 함수
POST 요청을 보내고 JSON 응답을 파싱하여 반환합니다.
Args:
url: 요청 URL
data: form 데이터
json_data: JSON 데이터
headers: 요청 헤더
timeout: 타임아웃 (초)
retries: 최대 재시도 횟수
Returns:
JSON 응답 딕셔너리 또는 None
"""
session = create_retry_session(retries=retries, timeout=timeout)
try:
response = session.post(url, data=data, json=json_data, headers=headers)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as e:
logger.error(f"HTTP POST 요청 실패: {url} - {e}")
return None
except ValueError as e:
logger.error(f"JSON 파싱 실패: {url} - {e}")
return None
finally:
session.close()
class APIClient:
"""
API 클라이언트 기본 클래스
특정 API 서비스에 대한 클라이언트를 구현할 때 상속하여 사용합니다.
공통적인 인증, 베이스 URL, 에러 처리 로직을 제공합니다.
사용 예시:
class WeatherAPIClient(APIClient):
def __init__(self, api_key):
super().__init__("https://api.weather.go.kr")
self.api_key = api_key
def get_forecast(self, city):
return self.get("/forecast", params={"city": city, "key": self.api_key})
"""
def __init__(
self,
base_url: str,
default_headers: Optional[Dict] = None,
timeout: int = DEFAULT_TIMEOUT,
retries: int = DEFAULT_RETRIES
):
"""
Args:
base_url: API 기본 URL
default_headers: 기본 요청 헤더
timeout: 기본 타임아웃
retries: 기본 재시도 횟수
"""
self.base_url = base_url.rstrip('/')
self.default_headers = default_headers or {}
self.timeout = timeout
self.session = create_retry_session(retries=retries, timeout=timeout)
def _build_url(self, endpoint: str) -> str:
"""엔드포인트를 포함한 전체 URL 생성"""
if endpoint.startswith('http'):
return endpoint
return f"{self.base_url}/{endpoint.lstrip('/')}"
def _merge_headers(self, headers: Optional[Dict]) -> Dict:
"""기본 헤더와 요청 헤더 병합"""
merged = self.default_headers.copy()
if headers:
merged.update(headers)
return merged
def get(
self,
endpoint: str,
params: Optional[Dict] = None,
headers: Optional[Dict] = None
) -> Optional[requests.Response]:
"""GET 요청"""
url = self._build_url(endpoint)
merged_headers = self._merge_headers(headers)
try:
response = self.session.get(url, params=params, headers=merged_headers)
return response
except requests.exceptions.RequestException as e:
logger.error(f"GET 요청 실패: {url} - {e}")
return None
def post(
self,
endpoint: str,
data: Optional[Dict] = None,
json_data: Optional[Dict] = None,
headers: Optional[Dict] = None
) -> Optional[requests.Response]:
"""POST 요청"""
url = self._build_url(endpoint)
merged_headers = self._merge_headers(headers)
try:
response = self.session.post(
url, data=data, json=json_data, headers=merged_headers
)
return response
except requests.exceptions.RequestException as e:
logger.error(f"POST 요청 실패: {url} - {e}")
return None
def close(self):
"""세션 종료"""
self.session.close()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
return False

248
core/logging_utils.py Normal file
View File

@ -0,0 +1,248 @@
# ===================================================================
# core/logging_utils.py
# FGTools 로깅 유틸리티 모듈
# ===================================================================
# 일관된 로그 포맷과 설정을 제공하는 로깅 유틸리티입니다.
# 파일 및 콘솔 출력, 로그 레벨 설정을 지원합니다.
# ===================================================================
"""
로깅 유틸리티 모듈
일관된 로그 포맷과 핸들러 설정을 제공합니다.
콘솔 출력, 파일 저장, 로그 로테이션을 지원합니다.
사용 예시:
from core.logging_utils import get_logger, setup_logging
# 간단한 로거 사용
logger = get_logger(__name__)
logger.info("메시지")
# 상세 설정이 필요한 경우
logger = setup_logging("my_module", level="DEBUG", log_file="app.log")
"""
import os
import sys
import logging
from typing import Optional
from logging.handlers import RotatingFileHandler, TimedRotatingFileHandler
from .config import get_config
# 로그 포맷 상수
DEFAULT_FORMAT = '[%(asctime)s] %(name)s - %(levelname)s: %(message)s'
DETAILED_FORMAT = '[%(asctime)s] %(name)s (%(filename)s:%(lineno)d) - %(levelname)s: %(message)s'
DATE_FORMAT = '%Y-%m-%d %H:%M:%S'
# 로그 디렉토리
LOG_DIR = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'logs')
def _ensure_log_dir():
"""로그 디렉토리 생성"""
if not os.path.exists(LOG_DIR):
os.makedirs(LOG_DIR, exist_ok=True)
def setup_logging(
name: str,
level: Optional[str] = None,
log_file: Optional[str] = None,
log_format: str = DEFAULT_FORMAT,
max_bytes: int = 10 * 1024 * 1024, # 10MB
backup_count: int = 5,
console_output: bool = True
) -> logging.Logger:
"""
로거 설정 및 반환
일관된 포맷으로 로거를 설정합니다. 콘솔 출력과 파일 저장을
동시에 지원하며, 로그 로테이션을 자동으로 처리합니다.
Args:
name: 로거 이름 (보통 __name__ 사용)
level: 로그 레벨 (DEBUG, INFO, WARNING, ERROR, CRITICAL)
None이면 환경 설정에서 로드
log_file: 로그 파일명 (None이면 파일 저장 안 함)
log_format: 로그 메시지 포맷
max_bytes: 로그 파일 최대 크기 (로테이션 기준)
backup_count: 보관할 백업 파일 수
console_output: 콘솔 출력 여부
Returns:
설정된 Logger 인스턴스
"""
# 설정에서 기본 레벨 로드
if level is None:
try:
level = get_config().log_level
except Exception:
level = 'INFO'
logger = logging.getLogger(name)
# 이미 핸들러가 있으면 레벨만 설정하고 반환
if logger.handlers:
logger.setLevel(getattr(logging, level.upper(), logging.INFO))
return logger
# 로그 레벨 설정
log_level = getattr(logging, level.upper(), logging.INFO)
logger.setLevel(log_level)
# 포매터 생성
formatter = logging.Formatter(log_format, datefmt=DATE_FORMAT)
# 콘솔 핸들러 추가
if console_output:
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(log_level)
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
# 파일 핸들러 추가 (요청 시)
if log_file:
_ensure_log_dir()
file_path = os.path.join(LOG_DIR, log_file)
file_handler = RotatingFileHandler(
file_path,
maxBytes=max_bytes,
backupCount=backup_count,
encoding='utf-8'
)
file_handler.setLevel(log_level)
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
# 부모 로거로 전파 방지
logger.propagate = False
return logger
def get_logger(name: str) -> logging.Logger:
"""
간편 로거 반환
기본 설정으로 로거를 반환합니다. 이미 설정된 로거가 있으면 재사용합니다.
Args:
name: 로거 이름 (보통 __name__ 사용)
Returns:
Logger 인스턴스
"""
return setup_logging(name)
def setup_file_logger(
name: str,
log_file: str,
level: str = 'INFO',
rotation: str = 'size', # 'size' 또는 'time'
when: str = 'midnight', # rotation='time'일 때 사용
interval: int = 1, # rotation='time'일 때 사용
) -> logging.Logger:
"""
파일 전용 로거 설정
콘솔 출력 없이 파일에만 로그를 기록합니다.
크기 기반 또는 시간 기반 로테이션을 선택할 수 있습니다.
Args:
name: 로거 이름
log_file: 로그 파일명
level: 로그 레벨
rotation: 로테이션 방식 ('size' 또는 'time')
when: 시간 기반 로테이션 주기 (midnight, H, D, W0-W6)
interval: 로테이션 간격
Returns:
설정된 Logger 인스턴스
"""
_ensure_log_dir()
logger = logging.getLogger(name)
if logger.handlers:
return logger
log_level = getattr(logging, level.upper(), logging.INFO)
logger.setLevel(log_level)
formatter = logging.Formatter(DETAILED_FORMAT, datefmt=DATE_FORMAT)
file_path = os.path.join(LOG_DIR, log_file)
if rotation == 'time':
# 시간 기반 로테이션 (예: 매일 자정)
handler = TimedRotatingFileHandler(
file_path,
when=when,
interval=interval,
backupCount=30,
encoding='utf-8'
)
else:
# 크기 기반 로테이션
handler = RotatingFileHandler(
file_path,
maxBytes=10 * 1024 * 1024, # 10MB
backupCount=5,
encoding='utf-8'
)
handler.setLevel(log_level)
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.propagate = False
return logger
class LogContext:
"""
로그 컨텍스트 관리자
특정 작업의 시작과 종료를 자동으로 로깅합니다.
작업 소요 시간도 함께 기록됩니다.
사용 예시:
with LogContext(logger, "데이터 처리"):
process_data()
# 출력:
# [시작] 데이터 처리
# [완료] 데이터 처리 (소요시간: 1.23초)
"""
def __init__(self, logger: logging.Logger, task_name: str, level: int = logging.INFO):
"""
Args:
logger: Logger 인스턴스
task_name: 작업 이름
level: 로그 레벨
"""
self.logger = logger
self.task_name = task_name
self.level = level
self.start_time = None
def __enter__(self):
"""작업 시작 로깅"""
import time
self.start_time = time.time()
self.logger.log(self.level, f"[시작] {self.task_name}")
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""작업 종료 로깅"""
import time
elapsed = time.time() - self.start_time
if exc_type is not None:
self.logger.error(f"[실패] {self.task_name} - {exc_type.__name__}: {exc_val}")
else:
self.logger.log(self.level, f"[완료] {self.task_name} (소요시간: {elapsed:.2f}초)")
return False # 예외 전파

331
core/message_sender.py Normal file
View File

@ -0,0 +1,331 @@
# ===================================================================
# core/message_sender.py
# FGTools 다중 플랫폼 메시지 발송 모듈
# ===================================================================
# Mattermost, Telegram, Synology Chat 등 다양한 플랫폼으로
# 메시지를 발송하는 통합 클래스를 제공합니다.
# ===================================================================
"""
다중 플랫폼 메시지 발송 모듈
Mattermost, Telegram, Synology Chat 등 다양한 메시징 플랫폼으로
메시지를 발송하는 기능을 제공합니다.
사용 예시:
from core.message_sender import MessageSender, send_notification
# 클래스 직접 사용
sender = MessageSender.from_config()
sender.send("알림 메시지", platforms=['mattermost', 'telegram'])
# 간편 함수 사용
send_notification("알림 메시지", platforms=['mattermost'])
"""
import logging
from typing import List, Optional, Union
import requests
from .config import get_config
from .logging_utils import get_logger
logger = get_logger(__name__)
class MessageSender:
"""
다중 플랫폼 메시지 발송 클래스
Mattermost, Telegram, Synology Chat 등 여러 플랫폼으로
동시에 메시지를 발송할 수 있습니다.
Attributes:
mattermost_url: Mattermost 서버 URL
mattermost_token: Mattermost Bot 토큰
mattermost_channel_id: Mattermost 채널 ID
mattermost_webhook_url: Mattermost 웹훅 URL (선택)
telegram_bot_token: Telegram Bot 토큰
telegram_chat_id: Telegram 채팅 ID
synology_webhook_url: Synology Chat 웹훅 URL
"""
def __init__(
self,
mattermost_url: str = "",
mattermost_token: str = "",
mattermost_channel_id: str = "",
mattermost_webhook_url: str = "",
telegram_bot_token: str = "",
telegram_chat_id: str = "",
synology_webhook_url: str = ""
):
"""
메시지 발송자 초기화
Args:
mattermost_url: Mattermost 서버 URL (예: https://mattermost.example.com)
mattermost_token: Mattermost Bot 인증 토큰
mattermost_channel_id: 메시지를 보낼 채널 ID
mattermost_webhook_url: 웹훅 URL (웹훅 방식 사용 시)
telegram_bot_token: Telegram Bot API 토큰
telegram_chat_id: Telegram 채팅방 ID
synology_webhook_url: Synology Chat 웹훅 URL
"""
# Mattermost 설정
self.mattermost_url = mattermost_url.rstrip('/') if mattermost_url else ""
self.mattermost_token = mattermost_token
self.mattermost_channel_id = mattermost_channel_id
self.mattermost_webhook_url = mattermost_webhook_url
# Telegram 설정
self.telegram_bot_token = telegram_bot_token
self.telegram_chat_id = telegram_chat_id
# Synology Chat 설정
self.synology_webhook_url = synology_webhook_url
@classmethod
def from_config(cls) -> 'MessageSender':
"""
설정 파일에서 메시지 발송자 생성
환경변수에서 모든 메시징 설정을 로드하여 인스턴스를 생성합니다.
Returns:
설정이 적용된 MessageSender 인스턴스
"""
config = get_config()
return cls(
mattermost_url=config.mattermost.get('url', ''),
mattermost_token=config.mattermost.get('token', ''),
mattermost_channel_id=config.mattermost.get('channel_id', ''),
mattermost_webhook_url=config.mattermost.get('webhook_url', ''),
telegram_bot_token=config.telegram.get('bot_token', ''),
telegram_chat_id=config.telegram.get('chat_id', ''),
synology_webhook_url=config.synology.get('chat_url', ''),
)
def send(
self,
message: str,
platforms: Optional[Union[str, List[str]]] = None,
use_webhook: bool = False
) -> bool:
"""
지정된 플랫폼으로 메시지 발송
여러 플랫폼에 동시에 메시지를 보낼 수 있습니다.
Args:
message: 발송할 메시지 내용
platforms: 발송할 플랫폼 목록 (['mattermost', 'telegram', 'synology'])
None이면 모든 설정된 플랫폼으로 발송
use_webhook: Mattermost 웹훅 사용 여부 (API 대신)
Returns:
모든 발송 성공 시 True, 하나라도 실패 시 False
"""
# 플랫폼이 지정되지 않으면 설정된 모든 플랫폼으로 발송
if platforms is None:
platforms = self._get_configured_platforms()
if not platforms:
logger.warning("전송할 플랫폼이 지정되지 않았습니다.")
return False
# 문자열이면 리스트로 변환
if isinstance(platforms, str):
platforms = [platforms]
success = True
for platform in platforms:
platform_lower = platform.lower()
if platform_lower == "mattermost":
result = self._send_to_mattermost(message, use_webhook)
elif platform_lower == "telegram":
result = self._send_to_telegram(message)
elif platform_lower == "synology":
result = self._send_to_synology(message)
else:
logger.error(f"지원하지 않는 플랫폼: {platform}")
result = False
if not result:
success = False
return success
def _get_configured_platforms(self) -> List[str]:
"""설정된 플랫폼 목록 반환"""
platforms = []
if self.mattermost_url and (self.mattermost_token or self.mattermost_webhook_url):
platforms.append('mattermost')
if self.telegram_bot_token and self.telegram_chat_id:
platforms.append('telegram')
if self.synology_webhook_url:
platforms.append('synology')
return platforms
def _send_to_mattermost(self, message: str, use_webhook: bool = False) -> bool:
"""
Mattermost로 메시지 발송
웹훅 또는 API 방식으로 메시지를 발송합니다.
Args:
message: 발송할 메시지
use_webhook: 웹훅 사용 여부 (False면 API 사용)
Returns:
발송 성공 여부
"""
try:
if use_webhook and self.mattermost_webhook_url:
# 웹훅 방식
response = requests.post(
self.mattermost_webhook_url,
json={"text": message},
headers={"Content-Type": "application/json"},
timeout=10
)
else:
# API 방식
if not self._validate_mattermost_config():
return False
url = f"{self.mattermost_url}/api/v4/posts"
headers = {
"Authorization": f"Bearer {self.mattermost_token}",
"Content-Type": "application/json"
}
payload = {
"channel_id": self.mattermost_channel_id,
"message": message
}
response = requests.post(url, json=payload, headers=headers, timeout=10)
if response.status_code in [200, 201]:
logger.info("Mattermost 메시지 전송 완료")
return True
else:
logger.error(f"Mattermost 전송 실패: {response.status_code} - {response.text}")
return False
except requests.exceptions.Timeout:
logger.error("Mattermost 전송 타임아웃")
return False
except requests.exceptions.RequestException as e:
logger.error(f"Mattermost 전송 예외: {e}")
return False
def _validate_mattermost_config(self) -> bool:
"""Mattermost API 설정 검증"""
if not self.mattermost_url or not self.mattermost_url.startswith(('http://', 'https://')):
logger.error(f"Mattermost URL이 유효하지 않습니다: {self.mattermost_url}")
return False
if not self.mattermost_token:
logger.error("Mattermost 토큰이 설정되지 않았습니다.")
return False
if not self.mattermost_channel_id:
logger.error("Mattermost 채널 ID가 설정되지 않았습니다.")
return False
return True
def _send_to_telegram(self, message: str) -> bool:
"""
Telegram으로 메시지 발송
Args:
message: 발송할 메시지
Returns:
발송 성공 여부
"""
if not self.telegram_bot_token or not self.telegram_chat_id:
logger.error("Telegram 설정이 완료되지 않았습니다.")
return False
try:
url = f"https://api.telegram.org/bot{self.telegram_bot_token}/sendMessage"
payload = {
"chat_id": self.telegram_chat_id,
"text": message,
"parse_mode": "Markdown"
}
response = requests.post(url, data=payload, timeout=10)
if response.status_code == 200:
logger.info("Telegram 메시지 전송 완료")
return True
else:
logger.error(f"Telegram 전송 실패: {response.status_code} - {response.text}")
return False
except requests.exceptions.RequestException as e:
logger.error(f"Telegram 전송 예외: {e}")
return False
def _send_to_synology(self, message: str) -> bool:
"""
Synology Chat으로 메시지 발송
Args:
message: 발송할 메시지
Returns:
발송 성공 여부
"""
if not self.synology_webhook_url:
logger.error("Synology Chat 웹훅 URL이 설정되지 않았습니다.")
return False
try:
payload = {"text": message}
headers = {"Content-Type": "application/json"}
response = requests.post(
self.synology_webhook_url,
json=payload,
headers=headers,
timeout=10
)
if response.status_code == 200:
logger.info("Synology Chat 메시지 전송 완료")
return True
else:
logger.error(f"Synology Chat 전송 실패: {response.status_code} - {response.text}")
return False
except requests.exceptions.RequestException as e:
logger.error(f"Synology Chat 전송 예외: {e}")
return False
def send_notification(
message: str,
platforms: Optional[Union[str, List[str]]] = None,
use_webhook: bool = False
) -> bool:
"""
알림 메시지 발송 간편 함수
설정 파일에서 자동으로 설정을 로드하여 메시지를 발송합니다.
Args:
message: 발송할 메시지
platforms: 발송할 플랫폼 목록 (None이면 모든 설정된 플랫폼)
use_webhook: Mattermost 웹훅 사용 여부
Returns:
발송 성공 여부
사용 예시:
send_notification("서버 점검 알림", platforms=['mattermost'])
"""
sender = MessageSender.from_config()
return sender.send(message, platforms=platforms, use_webhook=use_webhook)