# getweekdata.py import os, sys from datetime import datetime, timedelta import pandas as pd from sqlalchemy import select, and_ from tabulate import tabulate # pip install tabulate 필요 # 상위 폴더의 모듈 불러오기 sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) from conf.db import get_session from conf.db_schema import pos # === 검색 조건 변수 === TARGET_CA01 = '매표소' TARGET_CA03_LIST = ['입장료', '티켓', '기업제휴'] PRODUCT_NAMES = ['투어패스', '뮤지엄헤이'] def get_date_ranges(): today = datetime.today().date() # 이번 달 시작~끝 first_day_month = today.replace(day=1) next_month = first_day_month.replace(day=28) + timedelta(days=4) last_day_month = next_month - timedelta(days=next_month.day) # 지난 주 월~일 last_week_sun = today - timedelta(days=today.weekday() + 1) last_week_mon = last_week_sun - timedelta(days=6) return { 'this_month': (first_day_month, last_day_month), 'last_week': (last_week_mon, last_week_sun) } def fetch_data(session, start_date, end_date, product_keyword): stmt = select(pos).where( and_( pos.c.date.between(start_date, end_date), pos.c.ca01.contains(TARGET_CA01), pos.c.ca03.in_(TARGET_CA03_LIST), pos.c.name.contains(product_keyword) ) ) result = session.execute(stmt).fetchall() return pd.DataFrame([row._mapping for row in result]) if result else pd.DataFrame() def main(): date_ranges = get_date_ranges() session = get_session() today = datetime.today().date() year_start = today.replace(month=1, day=1) year_end = today.replace(month=12, day=31) # 수집용 딕셔너리 summary = { "금주": [], "금월": [], "누계": [] } # --- 금주 --- last_week_start, last_week_end = date_ranges['last_week'] week_vals = [] for product in PRODUCT_NAMES: df = fetch_data(session, last_week_start, last_week_end, product) week_vals.append(df['qty'].sum() if not df.empty else 0) summary["금주"] = week_vals + [sum(week_vals)] # --- 금월 --- this_month_start, this_month_end = date_ranges['this_month'] month_vals = [] for product in PRODUCT_NAMES: df = fetch_data(session, this_month_start, this_month_end, product) month_vals.append(df['qty'].sum() if not df.empty else 0) summary["금월"] = month_vals + [sum(month_vals)] # --- 누계 (2025년 전체) --- year_vals = [] for product in PRODUCT_NAMES: df = fetch_data(session, year_start, year_end, product) year_vals.append(df['qty'].sum() if not df.empty else 0) summary["누계"] = year_vals + [sum(year_vals)] session.close() # --- 출력 --- table_data = [] for key in ["금주", "금월", "누계"]: table_data.append([key] + summary[key]) headers = ["구분", "경기투어패스", "뮤지엄헤이", "계"] print(tabulate(table_data, headers=headers, tablefmt="simple", colalign=("center", "right", "right", "right"))) if __name__ == '__main__': main()