From 7f371071f2c6c46dc778673b491b57297ad39700 Mon Sep 17 00:00:00 2001 From: KWON Date: Tue, 29 Jul 2025 16:16:14 +0900 Subject: [PATCH] =?UTF-8?q?=EB=B0=B0=EC=B9=98=20=EC=B2=98=EB=A6=AC=20?= =?UTF-8?q?=EB=B0=A9=EC=8B=9D=EC=9C=BC=EB=A1=9C=20=EB=B3=80=EA=B2=BD(?= =?UTF-8?q?=EC=B2=98=EB=A6=AC=20=EC=83=81=ED=83=9C=20=ED=99=95=EC=9D=B8?= =?UTF-8?q?=EC=9A=A9)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- lib/pos_update_upsolution.py | 33 +++++++++++++++++++-------------- 1 file changed, 19 insertions(+), 14 deletions(-) diff --git a/lib/pos_update_upsolution.py b/lib/pos_update_upsolution.py index 29e6022..7edc385 100644 --- a/lib/pos_update_upsolution.py +++ b/lib/pos_update_upsolution.py @@ -70,27 +70,32 @@ def prepare_bulk_data(df): return bulk_data -def process_bulk_upsert(bulk_data, session, table): +def process_bulk_upsert(bulk_data, session, table, batch_size=1000): if not bulk_data: logger.info("[INFO] 삽입할 데이터가 없습니다.") return - insert_stmt = mysql_insert(table).values(bulk_data) + total = len(bulk_data) + inserted_total = 0 - update_cols = {c.name: insert_stmt.inserted[c.name] for c in table.columns - if c.name not in ['sale_date', 'shop_name', 'pos_no', 'bill_no', 'product_cd']} + for start in range(0, total, batch_size): + batch = bulk_data[start:start+batch_size] + insert_stmt = mysql_insert(table).values(batch) + update_cols = {c.name: insert_stmt.inserted[c.name] for c in table.columns + if c.name not in ['sale_date', 'shop_name', 'pos_no', 'bill_no', 'product_cd']} + upsert_stmt = insert_stmt.on_duplicate_key_update(update_cols) - upsert_stmt = insert_stmt.on_duplicate_key_update(update_cols) - - try: - result = session.execute(upsert_stmt) - session.commit() - logger.info(f"[DONE] 총 {len(bulk_data)}건 처리 완료 (insert+update)") - except SQLAlchemyError as e: - session.rollback() - logger.error(f"[FAIL] bulk upsert 실패: {e}") - raise + try: + result = session.execute(upsert_stmt) + session.commit() + inserted_total += len(batch) + logger.info(f"[PROGRESS] {inserted_total} / {total} 건 처리 완료") + except SQLAlchemyError as e: + session.rollback() + logger.error(f"[FAIL] batch upsert 실패: {e}") + raise + logger.info(f"[DONE] 총 {total}건 처리 완료 (insert+update)") def file_reader(queue, files): for filepath in files: