import sys
import os

# Project root (parent of this file's directory). It is appended to sys.path
# so the project-local `conf` package can be imported further down.
BASE_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
sys.path.append(BASE_DIR)

import yaml
import requests
import xml.etree.ElementTree as ET
from datetime import date, datetime, timedelta
from sqlalchemy import select, insert, delete

# Locate and load config.yaml.
CONFIG_PATH = os.path.join(BASE_DIR, 'conf', 'config.yaml')

with open(CONFIG_PATH, 'r', encoding='utf-8') as f:
    cfg = yaml.safe_load(f)

SERVICE_KEY = cfg['DATA_API']['serviceKey']

# Database access.
from conf import db, db_schema

holiday_table = db_schema.holiday


def fetch_holiday_api_xml(year):
    """Call the special-day information API (XML) for one year.

    Args:
        year: Year to query; it is sent as the ``solYear`` parameter.

    Returns:
        A ``(full_url, items)`` tuple. ``full_url`` is the final request URL
        reported by ``requests`` and ``items`` is a list of
        ``{'locdate': ..., 'dateName': ...}`` dicts, one per ``<item>`` that
        has both fields. When the request or the XML parsing fails the result
        is ``(None, [])``; when the response has no ``body/items`` node it is
        ``(full_url, [])``.
    """
    url = "http://apis.data.go.kr/B090041/openapi/service/SpcdeInfoService/getRestDeInfo"
    params = {
        'serviceKey': SERVICE_KEY,
        'solYear': str(year),
        'numOfRows': 100,
        'pageNo': 1,
        'type': 'xml',
    }

    # Only the network call and the XML parse can raise the handled errors,
    # so the try body is limited to those.
    try:
        resp = requests.get(url, params=params, timeout=10)
        resp.raise_for_status()
        full_url = resp.url
        xml_root = ET.fromstring(resp.content)
    except requests.exceptions.RequestException as e:
        print(f"[ERROR] {year}년 특일정보 API 요청 실패: {e}")
        return None, []
    except ET.ParseError as e:
        print(f"[ERROR] {year}년 XML 파싱 실패: {e}")
        return None, []

    body = xml_root.find('.//body')
    items_node = body.find('items') if body is not None else None
    if items_node is None:
        print(f"[WARN] {year}년 특일정보 없음 or 응답 형식 이상")
        return full_url, []

    items = []
    for item in items_node.findall('item'):
        locdate = item.findtext('locdate')
        date_name = item.findtext('dateName')
        # Entries missing either field are skipped.
        if locdate and date_name:
            items.append({'locdate': locdate, 'dateName': date_name})
    return full_url, items


def holiday_exists(year: int) -> bool:
    """Return True if the DB holds at least one holiday row for ``year``.

    The ``date`` column is compared against ``"<year>0101"`` and
    ``"<year>1231"``, i.e. it is treated as a ``YYYYMMDD`` string.
    """
    session = db.get_session()
    try:
        stmt = select(holiday_table).where(
            holiday_table.c.date.between(f"{year}0101", f"{year}1231")
        )
        return session.execute(stmt).first() is not None
    finally:
        session.close()


def insert_holidays(items) -> bool:
    """Store holiday entries in the DB, replacing any existing row per date.

    For every entry that has both ``locdate`` and ``dateName``, the existing
    rows with the same date are deleted and a fresh row is inserted. All
    entries are committed together; on any error the whole batch is rolled
    back and the error is printed.

    Args:
        items: Iterable of dicts with ``locdate`` and ``dateName`` keys.

    Returns:
        True when the transaction was committed, False when it was rolled
        back. (Earlier versions always returned None; callers that ignore the
        result are unaffected.)
    """
    session = db.get_session()
    try:
        for item in items:
            date_str = item.get('locdate')
            name = item.get('dateName')
            if date_str and name:
                # Delete-then-insert keeps one row per date, so if several
                # entries share a date the last one wins.
                session.execute(
                    delete(holiday_table).where(holiday_table.c.date == date_str)
                )
                session.execute(
                    insert(holiday_table).values(date=date_str, name=name)
                )
        session.commit()
        return True
    except Exception as e:
        session.rollback()
        print(f"[ERROR] 특일정보 DB 삽입 실패: {e}")
        return False
    finally:
        session.close()


def update_holidays_for_year(year: int) -> None:
    """Fetch the special-day data for ``year`` from the API and store it.

    The success message is printed only when the DB transaction actually
    committed; a failed insert has already been reported by
    ``insert_holidays``.
    """
    url, items = fetch_holiday_api_xml(year)

    if url:
        # NOTE(review): the URL contains the serviceKey query parameter, so
        # this prints the API key to stdout - confirm that is acceptable.
        print(f"📡 호출 URL: {url}")

    if not items:
        print(f"⚠️ {year}년 특일정보 없음 또는 실패")
        return

    if insert_holidays(items):
        print(f"✅ {year}년 특일 {len(items)}건 저장 완료")


def init_holidays() -> None:
    """Make sure holiday data from 2017 through the current year is stored.

    Past years are fetched only when the DB has no rows for them; the current
    year is refreshed on every call.
    """
    this_year = datetime.now().year

    for y in range(2017, this_year):
        if not holiday_exists(y):
            print(f"📅 {y}년 → 특일정보 없음 → API 호출")
            update_holidays_for_year(y)
        else:
            print(f"✅ {y}년 → 특일정보 이미 존재함")

    print(f"🔄 {this_year}년은 매번 최신 갱신")
    update_holidays_for_year(this_year)


def is_korean_holiday(dt: date) -> bool:
    """Return True if ``dt`` has a row in the holiday table."""
    session = db.get_session()
    try:
        date_str = dt.strftime("%Y%m%d")
        stmt = select(holiday_table).where(holiday_table.c.date == date_str)
        return session.execute(stmt).first() is not None
    finally:
        session.close()


def get_holiday_dates(start_date: date, end_date: date) -> set[date]:
    """Return the stored holidays between ``start_date`` and ``end_date``.

    Both bounds are passed to SQL ``BETWEEN`` as ``YYYYMMDD`` strings, and
    the stored strings are parsed back into ``date`` objects.
    """
    session = db.get_session()
    try:
        stmt = select(holiday_table.c.date).where(
            holiday_table.c.date.between(
                start_date.strftime("%Y%m%d"),
                end_date.strftime("%Y%m%d"),
            )
        )
        results = session.execute(stmt).scalars().all()
        return {datetime.strptime(d, "%Y%m%d").date() for d in results}
    finally:
        session.close()


def get_weekday_dates(start_date: date, end_date: date) -> set[date]:
    """Return the working days (Mon-Fri and not a stored holiday) in a range.

    Both ``start_date`` and ``end_date`` are included in the scan.
    """
    holiday_dates = get_holiday_dates(start_date, end_date)
    result = set()
    curr = start_date
    while curr <= end_date:
        # weekday(): Monday is 0 ... Friday is 4.
        if curr.weekday() < 5 and curr not in holiday_dates:
            result.add(curr)
        curr += timedelta(days=1)
    return result


if __name__ == "__main__":
    print("📌 휴일 테스트 시작")
    init_holidays()

    start = date(2025, 1, 1)
    end = date(2025, 12, 31)
    holidays = get_holiday_dates(start, end)
    print(f"🔍 {start} ~ {end} 사이 휴일 {len(holidays)}건")
    for d in sorted(holidays):
        print(" -", d)