diff --git a/lib/pos_update_upsolution.py b/lib/pos_update_upsolution.py index b5e99d8..cab9656 100644 --- a/lib/pos_update_upsolution.py +++ b/lib/pos_update_upsolution.py @@ -39,15 +39,16 @@ def load_excel_data(filepath: str): return df -def process_file(filepath: str, engine, session, table): +def process_file(filepath: str, engine, session, table, batch_size=500): try: df = load_excel_data(filepath) logger.info(f"[LOAD] {os.path.basename(filepath)} - {len(df)}건") inserted, updated, errors = 0, 0, 0 + batch_data = [] for idx, row in df.iterrows(): - data = None # 미리 초기화 + data = None try: data = { "sale_date": pd.to_datetime(row["매출일시"]), @@ -71,19 +72,7 @@ def process_file(filepath: str, engine, session, table): "cash_receipt": int(row.get("현금영수증", 0)), "card": int(row.get("카드", 0)), } - - - stmt = mysql_insert(table).values(**data) - update_cols = {col.name: stmt.inserted[col.name] for col in table.columns - if col.name not in ['sale_date', 'shop_name', 'pos_no', 'bill_no', 'product_cd']} - upsert_stmt = stmt.on_duplicate_key_update(update_cols) - - result = session.execute(upsert_stmt) - - if result.rowcount == 1: - inserted += 1 - elif result.rowcount == 2: - updated += 1 + batch_data.append(data) except Exception as e: if data is not None: @@ -92,11 +81,39 @@ def process_file(filepath: str, engine, session, table): logger.warning(f"[ERROR:ROW] {e} / 데이터가 생성되지 않음") errors += 1 - if (idx + 1) % 1000 == 0: - logger.info(f"[PROGRESS] {idx + 1} / {len(df)} 처리 중...") + # 배치 크기 도달시 DB에 한번에 처리 + if len(batch_data) >= batch_size: + stmt = mysql_insert(table) + update_cols = { + col.name: stmt.inserted[col.name] + for col in table.columns + if col.name not in ['sale_date', 'shop_name', 'pos_no', 'bill_no', 'product_cd'] + } + upsert_stmt = stmt.on_duplicate_key_update(update_cols) + result = session.execute(upsert_stmt, batch_data) + session.commit() - session.commit() - logger.info(f"[DONE] 삽입: {inserted}, 업데이트: {updated}, 오류: {errors}") + # rowcount가 정확하지 않을 수 있으므로 임시로 inserted 개수만 처리 + inserted += len(batch_data) + logger.info(f"[BATCH] {idx + 1} / {len(df)} 처리 중... (총 삽입: {inserted}, 오류: {errors})") + batch_data = [] + + # 남은 잔여 데이터 처리 + if batch_data: + stmt = mysql_insert(table) + update_cols = { + col.name: stmt.inserted[col.name] + for col in table.columns + if col.name not in ['sale_date', 'shop_name', 'pos_no', 'bill_no', 'product_cd'] + } + upsert_stmt = stmt.on_duplicate_key_update(update_cols) + result = session.execute(upsert_stmt, batch_data) + session.commit() + + inserted += len(batch_data) + logger.info(f"[BATCH] 최종 {len(batch_data)}건 처리 완료 (총 삽입: {inserted}, 오류: {errors})") + + logger.info(f"[DONE] 삽입: {inserted}, 오류: {errors}") shutil.move(filepath, os.path.join(FINISH_DIR, os.path.basename(filepath))) logger.info(f"[MOVE] 완료: {os.path.join(FINISH_DIR, os.path.basename(filepath))}") @@ -105,7 +122,6 @@ def process_file(filepath: str, engine, session, table): logger.error(f"[FAIL] 파일 처리 중 오류 발생 - {e}") session.rollback() - def main(): engine = db.get_engine() session = db.get_session()