diff --git a/lib/pos_update_upsolution.py b/lib/pos_update_upsolution.py
index cab9656..29e6022 100644
--- a/lib/pos_update_upsolution.py
+++ b/lib/pos_update_upsolution.py
@@ -2,7 +2,8 @@ import os
 import sys
 import pandas as pd
 import shutil
-from datetime import datetime
+import threading
+from queue import Queue
 from sqlalchemy import Table, MetaData
 from sqlalchemy.dialects.mysql import insert as mysql_insert
 from sqlalchemy.exc import SQLAlchemyError
@@ -28,9 +29,7 @@ def nan_to_none(value):
 def load_excel_data(filepath: str):
     df = pd.read_excel(filepath, header=1)  # 2행이 header, 3행부터 데이터
-    # 컬럼명 공백 제거 등 정리
     df.columns = [col.strip() for col in df.columns]
-    # 필수 컬럼 체크
     required_cols = ['영수증 번호', '품목명']
     for col in required_cols:
         if col not in df.columns:
@@ -39,88 +38,88 @@
     return df
 
 
-def process_file(filepath: str, engine, session, table, batch_size=500):
-    try:
-        df = load_excel_data(filepath)
-        logger.info(f"[LOAD] {os.path.basename(filepath)} - {len(df)}건")
+def prepare_bulk_data(df):
+    bulk_data = []
+    for idx, row in df.iterrows():
+        try:
+            data = {
+                "sale_date": pd.to_datetime(row["매출일시"]),
+                "shop_name": str(row["매장명"]).strip(),
+                "pos_no": str(row["포스"]).strip(),
+                "bill_no": str(row["영수증 번호"]).strip(),
+                "product_cd": str(row["품목"]).strip(),
+                "product_name": str(row["품목명"]).strip(),
+                "qty": int(row["수량"]),
-        inserted, updated, errors = 0, 0, 0
-        batch_data = []
-
-        for idx, row in df.iterrows():
-            data = None
-            try:
-                data = {
-                    "sale_date": pd.to_datetime(row["매출일시"]),
-                    "shop_name": str(row["매장명"]).strip(),
-                    "pos_no": str(row["포스"]).strip(),
-                    "bill_no": str(row["영수증 번호"]).strip(),
-                    "product_cd": str(row["품목"]).strip(),
-                    "product_name": str(row["품목명"]).strip(),
-                    "qty": int(row["수량"]),
-
-                    "ca01": nan_to_none(row.get("대분류", None)),
-                    "ca02": nan_to_none(row.get("중분류", None)),
-                    "ca03": nan_to_none(row.get("소분류", None)),
-                    "barcode": nan_to_none(row.get("바코드", None)),
-                    "amt": int(row.get("단가", 0)),
-                    "tot_sale_amt": int(row.get("주문 금액", 0)),
-                    "dc_amt": int(row.get("할인 금액", 0)),
-                    "dcm_sale_amt": int(row.get("공급가액", 0)),
-                    "vat_amt": int(row.get("세금", 0)),
-                    "net_amt": int(row.get("결제 금액", 0)),
-                    "cash_receipt": int(row.get("현금영수증", 0)),
-                    "card": int(row.get("카드", 0)),
-                }
-                batch_data.append(data)
-
-            except Exception as e:
-                if data is not None:
-                    logger.warning(f"[ERROR:ROW] {e} / 데이터: {data}")
-                else:
-                    logger.warning(f"[ERROR:ROW] {e} / 데이터가 생성되지 않음")
-                errors += 1
-
-            # 배치 크기 도달시 DB에 한번에 처리
-            if len(batch_data) >= batch_size:
-                stmt = mysql_insert(table)
-                update_cols = {
-                    col.name: stmt.inserted[col.name]
-                    for col in table.columns
-                    if col.name not in ['sale_date', 'shop_name', 'pos_no', 'bill_no', 'product_cd']
-                }
-                upsert_stmt = stmt.on_duplicate_key_update(update_cols)
-                result = session.execute(upsert_stmt, batch_data)
-                session.commit()
-
-                # rowcount가 정확하지 않을 수 있으므로 임시로 inserted 개수만 처리
-                inserted += len(batch_data)
-                logger.info(f"[BATCH] {idx + 1} / {len(df)} 처리 중... (총 삽입: {inserted}, 오류: {errors})")
-                batch_data = []
-
-        # 남은 잔여 데이터 처리
-        if batch_data:
-            stmt = mysql_insert(table)
-            update_cols = {
-                col.name: stmt.inserted[col.name]
-                for col in table.columns
-                if col.name not in ['sale_date', 'shop_name', 'pos_no', 'bill_no', 'product_cd']
+                "ca01": nan_to_none(row.get("대분류", None)),
+                "ca02": nan_to_none(row.get("중분류", None)),
+                "ca03": nan_to_none(row.get("소분류", None)),
+                "barcode": nan_to_none(row.get("바코드", None)),
+                "amt": int(row.get("단가", 0)),
+                "tot_sale_amt": int(row.get("주문 금액", 0)),
+                "dc_amt": int(row.get("할인 금액", 0)),
+                "dcm_sale_amt": int(row.get("공급가액", 0)),
+                "vat_amt": int(row.get("세금", 0)),
+                "net_amt": int(row.get("결제 금액", 0)),
+                "cash_receipt": int(row.get("현금영수증", 0)),
+                "card": int(row.get("카드", 0)),
             }
-            upsert_stmt = stmt.on_duplicate_key_update(update_cols)
-            result = session.execute(upsert_stmt, batch_data)
-            session.commit()
+            bulk_data.append(data)
+        except Exception as e:
+            logger.warning(f"[ERROR:ROW] 데이터 생성 실패: {e} / 인덱스: {idx}")
+    return bulk_data
 
-            inserted += len(batch_data)
-            logger.info(f"[BATCH] 최종 {len(batch_data)}건 처리 완료 (총 삽입: {inserted}, 오류: {errors})")
-        logger.info(f"[DONE] 삽입: {inserted}, 오류: {errors}")
+
+def process_bulk_upsert(bulk_data, session, table):
+    if not bulk_data:
+        logger.info("[INFO] 삽입할 데이터가 없습니다.")
+        return
 
-        shutil.move(filepath, os.path.join(FINISH_DIR, os.path.basename(filepath)))
-        logger.info(f"[MOVE] 완료: {os.path.join(FINISH_DIR, os.path.basename(filepath))}")
+    insert_stmt = mysql_insert(table).values(bulk_data)
 
-    except Exception as e:
-        logger.error(f"[FAIL] 파일 처리 중 오류 발생 - {e}")
+    update_cols = {c.name: insert_stmt.inserted[c.name] for c in table.columns
+                   if c.name not in ['sale_date', 'shop_name', 'pos_no', 'bill_no', 'product_cd']}
+
+    upsert_stmt = insert_stmt.on_duplicate_key_update(update_cols)
+
+    try:
+        result = session.execute(upsert_stmt)
+        session.commit()
+        logger.info(f"[DONE] 총 {len(bulk_data)}건 처리 완료 (insert+update)")
+    except SQLAlchemyError as e:
+        session.rollback()
+        logger.error(f"[FAIL] bulk upsert 실패: {e}")
+        raise
+
+
+def file_reader(queue, files):
+    for filepath in files:
+        try:
+            logger.info(f"[READ] {os.path.basename(filepath)} 읽기 시작")
+            df = load_excel_data(filepath)
+            logger.info(f"[READ] {os.path.basename(filepath)} 읽기 완료 - {len(df)}건")
+            bulk_data = prepare_bulk_data(df)
+            queue.put((filepath, bulk_data))
+        except Exception as e:
+            logger.error(f"[FAIL] {os.path.basename(filepath)} 읽기 실패 - {e}")
+    queue.put(None)  # 종료 신호
+
+
+def db_writer(queue, session, table):
+    while True:
+        item = queue.get()
+        if item is None:
+            break
+        filepath, bulk_data = item
+        logger.info(f"[START] {os.path.basename(filepath)} DB 삽입 시작")
+        try:
+            process_bulk_upsert(bulk_data, session, table)
+            dest = os.path.join(FINISH_DIR, os.path.basename(filepath))
+            shutil.move(filepath, dest)
+            logger.info(f"[MOVE] 완료: {dest}")
+        except Exception as e:
+            logger.error(f"[FAIL] {os.path.basename(filepath)} DB 삽입 실패 - {e}")
 
 
 def main():
     engine = db.get_engine()
@@ -138,9 +137,16 @@ def main():
     logger.info(f"[INFO] 처리할 파일 {len(files)}개")
 
-    for file in sorted(files):
-        logger.info(f"[START] {os.path.basename(file)}")
-        process_file(file, engine, session, table)
+    queue = Queue(maxsize=2)  # 2개 정도 여유 있게
+
+    reader_thread = threading.Thread(target=file_reader, args=(queue, files))
+    writer_thread = threading.Thread(target=db_writer, args=(queue, session, table))
+
+    reader_thread.start()
+    writer_thread.start()
+
+    reader_thread.join()
+    writer_thread.join()
 
 
 if __name__ == "__main__":