import os
import sys
import pandas as pd
import shutil
from datetime import datetime
from sqlalchemy import Table, MetaData
from sqlalchemy.dialects.mysql import insert as mysql_insert
from sqlalchemy.exc import SQLAlchemyError

# Make the parent directory importable so the project packages resolve.
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from lib.common import get_logger
from conf import db, db_schema  # provides get_engine, get_session

logger = get_logger("POS_UPS")

DATA_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), "../data"))
FINISH_DIR = os.path.join(DATA_DIR, "finish")
os.makedirs(FINISH_DIR, exist_ok=True)

# Columns left out of the ON DUPLICATE KEY UPDATE assignments
# (presumably the table's unique key -- confirm against the schema).
_KEY_COLUMNS = ('sale_date', 'shop_name', 'pos_no', 'bill_no', 'product_cd')

# Mapping of DB column -> Excel column for integer amounts that default to 0.
_AMOUNT_COLUMNS = (
    ("amt", "단가"),
    ("tot_sale_amt", "주문 금액"),
    ("dc_amt", "할인 금액"),
    ("dcm_sale_amt", "공급가액"),
    ("vat_amt", "세금"),
    ("net_amt", "결제 금액"),
    ("cash_receipt", "현금영수증"),
    ("card", "카드"),
)


def nan_to_none(value):
    """Return None when *value* is a pandas missing value, else *value*."""
    if pd.isna(value):
        return None
    return value


def _to_int(value, default=0):
    """Convert *value* to int, returning *default* for missing (NaN) cells.

    Blank Excel cells arrive as NaN, and ``int(float('nan'))`` raises
    ValueError, so they are mapped to the same default that is already
    used when the column is absent.
    """
    if pd.isna(value):
        return default
    return int(value)


def load_excel_data(filepath: str):
    """Read the Excel file at *filepath* into a DataFrame.

    The second sheet row is used as the header (``header=1``). Column
    names are stripped of surrounding whitespace, and rows missing any
    required column value are dropped.

    Raises:
        ValueError: if a required column is not present in the sheet.
    """
    df = pd.read_excel(filepath, header=1)  # row 2 is the header, data starts on row 3

    # Normalize column names; str() guards against non-string header cells
    # (e.g. numeric headers), which have no .strip().
    df.columns = [str(col).strip() for col in df.columns]

    # Check required columns.
    required_cols = ['영수증 번호', '품목명']
    for col in required_cols:
        if col not in df.columns:
            raise ValueError(f"필수 컬럼 누락: {col}")

    df = df.dropna(subset=required_cols)
    return df


def _row_to_record(row):
    """Build the column -> value dict for one DataFrame row.

    Raises whatever the conversions raise (KeyError for a missing
    mandatory column, ValueError/TypeError for unparsable values); the
    caller counts those rows as errors.
    """
    record = {
        "sale_date": pd.to_datetime(row["매출일시"]),
        "shop_name": str(row["매장명"]).strip(),
        "pos_no": str(row["포스"]).strip(),
        "bill_no": str(row["영수증 번호"]).strip(),
        "product_cd": str(row["품목"]).strip(),
        "product_name": str(row["품목명"]).strip(),
        # Quantity has no default: a missing or blank value is a row error.
        "qty": int(row["수량"]),
        "ca01": nan_to_none(row.get("대분류", None)),
        "ca02": nan_to_none(row.get("중분류", None)),
        "ca03": nan_to_none(row.get("소분류", None)),
        "barcode": nan_to_none(row.get("바코드", None)),
    }
    for db_col, excel_col in _AMOUNT_COLUMNS:
        record[db_col] = _to_int(row.get(excel_col, 0))
    return record


def process_file(filepath: str, engine, session, table):
    """Upsert every row of one Excel file into *table*, then archive the file.

    Each row is sent as a MySQL ``INSERT ... ON DUPLICATE KEY UPDATE``.
    A rowcount of 1 is counted as an insert and 2 as an update. Rows that
    fail are logged and counted, and processing continues. After all rows
    are executed the session is committed once and the file is moved into
    FINISH_DIR.

    Any exception outside the per-row handling is logged and the session
    is rolled back; nothing is re-raised.

    Args:
        filepath: path of the Excel file to load.
        engine: not used by this function; kept for interface compatibility.
        session: object providing execute/commit/rollback.
        table: SQLAlchemy Table that receives the rows.
    """
    try:
        df = load_excel_data(filepath)
        total = len(df)
        logger.info(f"[LOAD] {os.path.basename(filepath)} - {total}건")

        # Loop-invariant: the set of columns to overwrite on a duplicate key.
        # NOTE(review): this includes table columns that are not supplied in
        # the insert values -- confirm that is intended for this schema.
        update_names = [col.name for col in table.columns
                        if col.name not in _KEY_COLUMNS]

        inserted, updated, errors = 0, 0, 0

        # Count by position: the index labels from iterrows() have gaps
        # after dropna(), so they cannot be used for progress reporting.
        for position, (_, row) in enumerate(df.iterrows(), start=1):
            data = None  # initialized up front so the except branch can test it
            try:
                data = _row_to_record(row)

                stmt = mysql_insert(table).values(**data)
                update_cols = {name: stmt.inserted[name] for name in update_names}
                upsert_stmt = stmt.on_duplicate_key_update(update_cols)

                result = session.execute(upsert_stmt)
                if result.rowcount == 1:
                    inserted += 1
                elif result.rowcount == 2:
                    updated += 1
            except Exception as e:
                if data is not None:
                    logger.warning(f"[ERROR:ROW] {e} / 데이터: {data}")
                else:
                    logger.warning(f"[ERROR:ROW] {e} / 데이터가 생성되지 않음")
                errors += 1

            if position % 1000 == 0:
                logger.info(f"[PROGRESS] {position} / {total} 처리 중...")

        session.commit()
        logger.info(f"[DONE] 삽입: {inserted}, 업데이트: {updated}, 오류: {errors}")

        destination = os.path.join(FINISH_DIR, os.path.basename(filepath))
        shutil.move(filepath, destination)
        logger.info(f"[MOVE] 완료: {destination}")

    except Exception as e:
        logger.error(f"[FAIL] 파일 처리 중 오류 발생 - {e}")
        session.rollback()


def main():
    """Process every matching Excel file in DATA_DIR in sorted name order.

    Reflects the target table from the database, collects ``.xlsx`` files
    whose names start with the expected prefix, and hands each one to
    process_file. The session is closed when the run ends, including on
    failure.
    """
    engine = db.get_engine()
    session = db.get_session()
    try:
        metadata = MetaData()
        table = Table(
            db_schema.get_full_table_name("pos_ups_billdata"),
            metadata,
            autoload_with=engine
        )

        files = [os.path.join(DATA_DIR, f) for f in os.listdir(DATA_DIR)
                 if f.endswith(".xlsx") and f.startswith("영수증별 상세매출")]
        logger.info(f"[INFO] 처리할 파일 {len(files)}개")

        for path in sorted(files):
            logger.info(f"[START] {os.path.basename(path)}")
            process_file(path, engine, session, table)
    finally:
        # Release the connection held by the session.
        session.close()


if __name__ == "__main__":
    main()