From cb3b1522170b962d0353812fa913a1d56bd969d3 Mon Sep 17 00:00:00 2001
From: KWON
Date: Tue, 29 Jul 2025 15:54:53 +0900
Subject: [PATCH] Switch data insertion to batches to improve processing speed
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 lib/pos_update_upsolution.py | 56 +++++++++++++++++++++++-------------
 1 file changed, 36 insertions(+), 20 deletions(-)

diff --git a/lib/pos_update_upsolution.py b/lib/pos_update_upsolution.py
index b5e99d8..cab9656 100644
--- a/lib/pos_update_upsolution.py
+++ b/lib/pos_update_upsolution.py
@@ -39,15 +39,16 @@ def load_excel_data(filepath: str):
     return df
 
 
-def process_file(filepath: str, engine, session, table):
+def process_file(filepath: str, engine, session, table, batch_size=500):
     try:
         df = load_excel_data(filepath)
         logger.info(f"[LOAD] {os.path.basename(filepath)} - {len(df)}건")
 
         inserted, updated, errors = 0, 0, 0
+        batch_data = []
 
         for idx, row in df.iterrows():
-            data = None  # 미리 초기화
+            data = None
             try:
                 data = {
                     "sale_date": pd.to_datetime(row["매출일시"]),
@@ -71,19 +72,7 @@ def process_file(filepath: str, engine, session, table):
                     "cash_receipt": int(row.get("현금영수증", 0)),
                     "card": int(row.get("카드", 0)),
                 }
-
-
-                stmt = mysql_insert(table).values(**data)
-                update_cols = {col.name: stmt.inserted[col.name] for col in table.columns
-                               if col.name not in ['sale_date', 'shop_name', 'pos_no', 'bill_no', 'product_cd']}
-                upsert_stmt = stmt.on_duplicate_key_update(update_cols)
-
-                result = session.execute(upsert_stmt)
-
-                if result.rowcount == 1:
-                    inserted += 1
-                elif result.rowcount == 2:
-                    updated += 1
+                batch_data.append(data)
 
             except Exception as e:
                 if data is not None:
@@ -92,11 +81,39 @@ def process_file(filepath: str, engine, session, table):
                     logger.warning(f"[ERROR:ROW] {e} / 데이터가 생성되지 않음")
                 errors += 1
 
-            if (idx + 1) % 1000 == 0:
-                logger.info(f"[PROGRESS] {idx + 1} / {len(df)} 처리 중...")
+            # 배치 크기 도달시 DB에 한번에 처리
+            if len(batch_data) >= batch_size:
+                stmt = mysql_insert(table)
+                update_cols = {
+                    col.name: stmt.inserted[col.name]
+                    for col in table.columns
+                    if col.name not in ['sale_date', 'shop_name', 'pos_no', 'bill_no', 'product_cd']
+                }
+                upsert_stmt = stmt.on_duplicate_key_update(update_cols)
+                result = session.execute(upsert_stmt, batch_data)
+                session.commit()
 
-        session.commit()
-        logger.info(f"[DONE] 삽입: {inserted}, 업데이트: {updated}, 오류: {errors}")
+                # rowcount가 정확하지 않을 수 있으므로 임시로 inserted 개수만 처리
+                inserted += len(batch_data)
+                logger.info(f"[BATCH] {idx + 1} / {len(df)} 처리 중... (총 삽입: {inserted}, 오류: {errors})")
+                batch_data = []
+
+        # 남은 잔여 데이터 처리
+        if batch_data:
+            stmt = mysql_insert(table)
+            update_cols = {
+                col.name: stmt.inserted[col.name]
+                for col in table.columns
+                if col.name not in ['sale_date', 'shop_name', 'pos_no', 'bill_no', 'product_cd']
+            }
+            upsert_stmt = stmt.on_duplicate_key_update(update_cols)
+            result = session.execute(upsert_stmt, batch_data)
+            session.commit()
+
+            inserted += len(batch_data)
+            logger.info(f"[BATCH] 최종 {len(batch_data)}건 처리 완료 (총 삽입: {inserted}, 오류: {errors})")
+
+        logger.info(f"[DONE] 삽입: {inserted}, 오류: {errors}")
 
         shutil.move(filepath, os.path.join(FINISH_DIR, os.path.basename(filepath)))
         logger.info(f"[MOVE] 완료: {os.path.join(FINISH_DIR, os.path.basename(filepath))}")
@@ -105,7 +122,6 @@ def process_file(filepath: str, engine, session, table):
         logger.error(f"[FAIL] 파일 처리 중 오류 발생 - {e}")
         session.rollback()
 
-
 def main():
     engine = db.get_engine()
     session = db.get_session()
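
For reference, here is a minimal, self-contained sketch of the batched INSERT ... ON DUPLICATE KEY UPDATE pattern this patch adopts. It assumes SQLAlchemy 1.4+ with a MySQL driver such as PyMySQL; the connection URL, the sales_demo table, and the upsert_batch helper are hypothetical placeholders for illustration, not code from this repository.

# Sketch only: batched upserts via SQLAlchemy's MySQL dialect.
# The DSN, table, and column names below are hypothetical examples.
from sqlalchemy import create_engine, MetaData, Table, Column, Integer
from sqlalchemy.dialects.mysql import insert as mysql_insert

engine = create_engine("mysql+pymysql://user:password@localhost/demo")  # hypothetical DSN
metadata = MetaData()

# Hypothetical table; sale_id is the unique key that triggers the duplicate path.
sales = Table(
    "sales_demo", metadata,
    Column("sale_id", Integer, primary_key=True),
    Column("qty", Integer),
    Column("amount", Integer),
)
metadata.create_all(engine)

def upsert_batch(conn, rows):
    # Build INSERT ... ON DUPLICATE KEY UPDATE once, then send the whole
    # batch in a single execute() call (executemany under the hood).
    stmt = mysql_insert(sales)
    update_cols = {
        c.name: stmt.inserted[c.name]   # VALUES(col) from the incoming row
        for c in sales.columns
        if c.name != "sale_id"          # never overwrite the key column(s)
    }
    conn.execute(stmt.on_duplicate_key_update(update_cols), rows)

batch_size = 500
rows = []
with engine.begin() as conn:            # one transaction; commits on success
    for i in range(1, 2001):            # stand-in for iterating the Excel rows
        rows.append({"sale_id": i, "qty": 1, "amount": i * 10})
        if len(rows) >= batch_size:     # flush a full batch
            upsert_batch(conn, rows)
            rows = []
    if rows:                            # flush the remainder
        upsert_batch(conn, rows)

Passing a list of parameter dicts to execute() runs the statement as an executemany, so cursor.rowcount no longer distinguishes 1 (inserted) from 2 (updated) per row; counting batch sizes instead, as the patch's comment notes, is the usual workaround.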