import os import sys import pandas as pd import shutil import threading from queue import Queue from sqlalchemy import Table, MetaData from sqlalchemy.dialects.mysql import insert as mysql_insert from sqlalchemy.exc import SQLAlchemyError sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) from lib.common import get_logger from conf import db, db_schema # get_engine, get_session 포함 logger = get_logger("POS_UPS") DATA_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), "../data")) FINISH_DIR = os.path.join(DATA_DIR, "finish") os.makedirs(FINISH_DIR, exist_ok=True) def nan_to_none(value): import pandas as pd if pd.isna(value): return None return value def load_excel_data(filepath: str): df = pd.read_excel(filepath, header=1) # 2행이 header, 3행부터 데이터 df.columns = [col.strip() for col in df.columns] required_cols = ['영수증 번호', '품목명'] for col in required_cols: if col not in df.columns: raise ValueError(f"필수 컬럼 누락: {col}") df = df.dropna(subset=required_cols) return df def prepare_bulk_data(df): bulk_data = [] for idx, row in df.iterrows(): try: data = { "sale_date": pd.to_datetime(row["매출일시"]), "shop_name": str(row["매장명"]).strip(), "pos_no": str(row["포스"]).strip(), "bill_no": str(row["영수증 번호"]).strip(), "product_cd": str(row["품목"]).strip(), "product_name": str(row["품목명"]).strip(), "qty": int(row["수량"]), "ca01": nan_to_none(row.get("대분류", None)), "ca02": nan_to_none(row.get("중분류", None)), "ca03": nan_to_none(row.get("소분류", None)), "barcode": nan_to_none(row.get("바코드", None)), "amt": int(row.get("단가", 0)), "tot_sale_amt": int(row.get("주문 금액", 0)), "dc_amt": int(row.get("할인 금액", 0)), "dcm_sale_amt": int(row.get("공급가액", 0)), "vat_amt": int(row.get("세금", 0)), "net_amt": int(row.get("결제 금액", 0)), "cash_receipt": int(row.get("현금영수증", 0)), "card": int(row.get("카드", 0)), } bulk_data.append(data) except Exception as e: logger.warning(f"[ERROR:ROW] 데이터 생성 실패: {e} / 인덱스: {idx}") return bulk_data def process_bulk_upsert(bulk_data, session, table, batch_size=1000): """ 데이터 일괄 삽입/업데이트 Args: bulk_data: 삽입할 데이터 리스트 session: DB 세션 table: 대상 테이블 batch_size: 배치 크기 Returns: int: 삽입된 행 수 """ if not bulk_data: logger.info("[INFO] 삽입할 데이터가 없습니다.") return 0 total = len(bulk_data) inserted_total = 0 for start in range(0, total, batch_size): batch = bulk_data[start:start+batch_size] insert_stmt = mysql_insert(table).values(batch) update_cols = {c.name: insert_stmt.inserted[c.name] for c in table.columns if c.name not in ['sale_date', 'shop_name', 'pos_no', 'bill_no', 'product_cd']} upsert_stmt = insert_stmt.on_duplicate_key_update(update_cols) try: result = session.execute(upsert_stmt) session.commit() inserted_total += len(batch) logger.info(f"[PROGRESS] {inserted_total} / {total} 건 처리 완료") except SQLAlchemyError as e: session.rollback() logger.error(f"[FAIL] batch upsert 실패: {e}") raise logger.info(f"[DONE] 총 {total}건 처리 완료 (insert+update)") return inserted_total def file_reader(queue, files): """파일 읽기 스레드""" for filepath in files: try: logger.info(f"[READ] {os.path.basename(filepath)} 읽기 시작") df = load_excel_data(filepath) logger.info(f"[READ] {os.path.basename(filepath)} 읽기 완료 - {len(df)}건") bulk_data = prepare_bulk_data(df) queue.put((filepath, bulk_data)) except Exception as e: logger.error(f"[FAIL] {os.path.basename(filepath)} 읽기 실패 - {e}") queue.put(None) # 종료 신호 def db_writer(queue, session, table): """DB 쓰기 스레드""" while True: item = queue.get() if item is None: break filepath, bulk_data = item logger.info(f"[START] {os.path.basename(filepath)} DB 삽입 시작") try: process_bulk_upsert(bulk_data, session, table) dest = os.path.join(FINISH_DIR, os.path.basename(filepath)) shutil.move(filepath, dest) logger.info(f"[MOVE] 완료: {dest}") except Exception as e: logger.error(f"[FAIL] {os.path.basename(filepath)} DB 삽입 실패 - {e}") def process_upsolution_file(filepath): """ UPSOLUTION 파일을 처리하고 DB에 저장 웹 업로드 인터페이스에서 사용하는 함수 Args: filepath (str): 업로드된 파일 경로 Returns: dict: { 'success': bool, 'message': str, 'rows_inserted': int } """ try: logger.info(f"[WEB] UPSOLUTION 파일 처리 시작: {filepath}") # 파일 읽기 df = load_excel_data(filepath) logger.info(f"[WEB] 데이터 읽기 완료: {len(df)}행") # 데이터 준비 bulk_data = prepare_bulk_data(df) logger.info(f"[WEB] 데이터 준비 완료: {len(bulk_data)}행") # DB 세션 및 테이블 가져오기 session = db.get_session() engine = db.get_engine() metadata = MetaData() table = Table( db_schema.get_full_table_name("pos_ups_billdata"), metadata, autoload_with=engine ) # 데이터 삽입 inserted = process_bulk_upsert(bulk_data, session, table) session.close() logger.info(f"[WEB] UPSOLUTION 파일 처리 완료: {inserted}행 삽입") return { 'success': True, 'message': f'{inserted}행이 저장되었습니다.', 'rows_inserted': inserted } except Exception as e: logger.error(f"[WEB] UPSOLUTION 파일 처리 오류: {e}", exc_info=True) return { 'success': False, 'message': f'파일 처리 중 오류: {str(e)}', 'rows_inserted': 0 } def main(): engine = db.get_engine() session = db.get_session() metadata = MetaData() table = Table( db_schema.get_full_table_name("pos_ups_billdata"), metadata, autoload_with=engine ) files = [os.path.join(DATA_DIR, f) for f in os.listdir(DATA_DIR) if f.endswith(".xlsx") and f.startswith("영수증별 상세매출")] logger.info(f"[INFO] 처리할 파일 {len(files)}개") queue = Queue(maxsize=3) # 2개 정도 여유 있게 reader_thread = threading.Thread(target=file_reader, args=(queue, files)) writer_thread = threading.Thread(target=db_writer, args=(queue, session, table)) reader_thread.start() writer_thread.start() reader_thread.join() writer_thread.join() if __name__ == "__main__": main()