diff --git a/lib/pos_update_upsolution.py b/lib/pos_update_upsolution.py new file mode 100644 index 0000000..d33883c --- /dev/null +++ b/lib/pos_update_upsolution.py @@ -0,0 +1,128 @@ +import os +import sys +import pandas as pd +import shutil +from datetime import datetime +from sqlalchemy import Table, MetaData +from sqlalchemy.dialects.mysql import insert as mysql_insert +from sqlalchemy.exc import SQLAlchemyError + +sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) + +from lib.common import get_logger +from conf import db, db_schema # get_engine, get_session 포함 + +logger = get_logger("POS_UPS") + +DATA_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), "../data")) +FINISH_DIR = os.path.join(DATA_DIR, "finish") +os.makedirs(FINISH_DIR, exist_ok=True) + + +def nan_to_none(value): + import pandas as pd + if pd.isna(value): + return None + return value + + +def load_excel_data(filepath: str): + df = pd.read_excel(filepath, header=1) # 2행이 header, 3행부터 데이터 + # 컬럼명 공백 제거 등 정리 + df.columns = [col.strip() for col in df.columns] + # 필수 컬럼 체크 + required_cols = ['영수증 번호', '품목명'] + for col in required_cols: + if col not in df.columns: + raise ValueError(f"필수 컬럼 누락: {col}") + df = df.dropna(subset=required_cols) + return df + + +def process_file(filepath: str, engine, session, table): + try: + # sale_date는 엑셀 내부 각 행별로 존재하므로, 파일명 기준 추출는 제외 + df = load_excel_data(filepath) + logger.info(f"[LOAD] {os.path.basename(filepath)} - {len(df)}건") + + inserted, updated, errors = 0, 0, 0 + + for _, row in df.iterrows(): + try: + data = { + "sale_date": pd.to_datetime(row["매출일시"]), + "shop_name": str(row.get("매장명", "")).strip(), + "pos_no": int(row.get("포스번호", 0)), + "bill_no": int(row.get("영수증 번호", 0)), + "product_cd": str(row.get("품목", "")).strip(), + "ca01": nan_to_none(row.get("대분류")), + "ca02": nan_to_none(row.get("중분류")), + "ca03": nan_to_none(row.get("소분류")), + "product_name": str(row.get("품목명", "")).strip(), + "barcode": nan_to_none(row.get("바코드")), + "amt": int(row.get("단가", 0)), + "qty": int(row.get("수량", 0)), + "tot_sale_amt": int(row.get("주문 금액", 0)), + "dc_amt": int(row.get("할인 금액", 0)), + "dcm_sale_amt": int(row.get("공급가액", 0)), + "net_amt": int(row.get("세금", 0)), + "vat_amt": int(row.get("부가세", 0)), + "cash_receipt": int(row.get("현금영수증", 0)), + "card": int(row.get("카드", 0)), + } + + stmt = mysql_insert(table).values(**data) + update_stmt = stmt.on_duplicate_key_update({ + col.name: stmt.inserted[col.name] + for col in table.columns + if col.name not in ['sale_date', 'shop_name', 'pos_no', 'bill_no', 'product_cd'] + }) + result = session.execute(update_stmt) + + # rowcount 1이면 insert, 2면 update (MySQL 특성상) + if result.rowcount == 1: + inserted += 1 + elif result.rowcount == 2: + updated += 1 + + except Exception as e: + logger.warning(f"[ERROR:ROW] {e}") + errors += 1 + + session.commit() + logger.info(f"[DONE] 삽입: {inserted}, 업데이트: {updated}, 오류: {errors}") + + shutil.move(filepath, os.path.join(FINISH_DIR, os.path.basename(filepath))) + logger.info(f"[MOVE] 완료: {os.path.join(FINISH_DIR, os.path.basename(filepath))}") + + except SQLAlchemyError as e: + logger.error(f"[FAIL] DB 처리 중 오류 발생 - 롤백: {e}") + session.rollback() + except Exception as e: + logger.error(f"[FAIL] 파일 처리 중 오류 발생 - {e}") + session.rollback() + + +def main(): + engine = db.get_engine() + session = db.get_session() + + metadata = MetaData() + table = Table( + db_schema.get_full_table_name("pos_ups_billdata"), + metadata, + autoload_with=engine + ) + + files = [os.path.join(DATA_DIR, f) for f in os.listdir(DATA_DIR) + if f.endswith(".xlsx") and f.startswith("영수증별 상세매출")] + + logger.info(f"[INFO] 처리할 파일 {len(files)}개") + + for file in sorted(files): + logger.info(f"[START] {os.path.basename(file)}") + process_file(file, engine, session, table) + + +if __name__ == "__main__": + main()