diff --git a/lib/pos_update_upsolution.py b/lib/pos_update_upsolution.py index 29e6022..7edc385 100644 --- a/lib/pos_update_upsolution.py +++ b/lib/pos_update_upsolution.py @@ -70,27 +70,32 @@ def prepare_bulk_data(df): return bulk_data -def process_bulk_upsert(bulk_data, session, table): +def process_bulk_upsert(bulk_data, session, table, batch_size=1000): if not bulk_data: logger.info("[INFO] 삽입할 데이터가 없습니다.") return - insert_stmt = mysql_insert(table).values(bulk_data) + total = len(bulk_data) + inserted_total = 0 - update_cols = {c.name: insert_stmt.inserted[c.name] for c in table.columns - if c.name not in ['sale_date', 'shop_name', 'pos_no', 'bill_no', 'product_cd']} + for start in range(0, total, batch_size): + batch = bulk_data[start:start+batch_size] + insert_stmt = mysql_insert(table).values(batch) + update_cols = {c.name: insert_stmt.inserted[c.name] for c in table.columns + if c.name not in ['sale_date', 'shop_name', 'pos_no', 'bill_no', 'product_cd']} + upsert_stmt = insert_stmt.on_duplicate_key_update(update_cols) - upsert_stmt = insert_stmt.on_duplicate_key_update(update_cols) - - try: - result = session.execute(upsert_stmt) - session.commit() - logger.info(f"[DONE] 총 {len(bulk_data)}건 처리 완료 (insert+update)") - except SQLAlchemyError as e: - session.rollback() - logger.error(f"[FAIL] bulk upsert 실패: {e}") - raise + try: + result = session.execute(upsert_stmt) + session.commit() + inserted_total += len(batch) + logger.info(f"[PROGRESS] {inserted_total} / {total} 건 처리 완료") + except SQLAlchemyError as e: + session.rollback() + logger.error(f"[FAIL] batch upsert 실패: {e}") + raise + logger.info(f"[DONE] 총 {total}건 처리 완료 (insert+update)") def file_reader(queue, files): for filepath in files: