배치 처리 방식으로 변경(처리 상태 확인용)

This commit is contained in:
2025-07-29 16:16:14 +09:00
parent d8945b35fe
commit 7f371071f2

View File

@ -70,27 +70,32 @@ def prepare_bulk_data(df):
return bulk_data return bulk_data
def process_bulk_upsert(bulk_data, session, table, batch_size: int = 1000) -> None:
    """Upsert ``bulk_data`` into ``table`` in batches, committing after each one.

    Each batch is sent as a single MySQL ``INSERT ... ON DUPLICATE KEY UPDATE``
    statement and committed on its own, so progress can be logged while a
    large load is running.

    Args:
        bulk_data: Sliceable sequence of row values accepted by
            ``Insert.values()`` (presumably a list of dicts produced by
            ``prepare_bulk_data`` -- confirm against the caller).
        session: SQLAlchemy session used to execute and commit each batch.
        table: SQLAlchemy ``Table`` to upsert into.
        batch_size: Maximum number of rows per statement; must be positive.

    Raises:
        ValueError: If ``batch_size`` is not positive.
        SQLAlchemyError: Re-raised after rolling back the failing batch.
            Batches committed before the failure stay committed.
    """
    if not bulk_data:
        logger.info("[INFO] 삽입할 데이터가 없습니다.")
        return

    # A negative step would make range() empty and the function would log
    # "[DONE]" without writing anything; zero would fail with an opaque
    # range() error. Reject both explicitly.
    if batch_size <= 0:
        raise ValueError(
            f"batch_size must be a positive integer, got {batch_size!r}"
        )

    # These columns are never overwritten on a duplicate-key hit
    # (presumably they form the table's unique key -- confirm against schema).
    key_cols = {'sale_date', 'shop_name', 'pos_no', 'bill_no', 'product_cd'}
    # Loop-invariant: the set of columns to refresh does not change per batch.
    update_names = [c.name for c in table.columns if c.name not in key_cols]

    total = len(bulk_data)
    inserted_total = 0
    for start in range(0, total, batch_size):
        batch = bulk_data[start:start + batch_size]
        insert_stmt = mysql_insert(table).values(batch)
        # ``inserted`` is bound to this specific statement, so the mapping
        # has to be rebuilt for every batch.
        update_cols = {name: insert_stmt.inserted[name] for name in update_names}
        upsert_stmt = insert_stmt.on_duplicate_key_update(update_cols)

        try:
            session.execute(upsert_stmt)
            session.commit()
        except SQLAlchemyError as e:
            session.rollback()
            logger.error(f"[FAIL] batch upsert 실패: {e}")
            raise

        # Counts rows submitted, not rows the database reports as changed.
        inserted_total += len(batch)
        logger.info(f"[PROGRESS] {inserted_total} / {total} 건 처리 완료")

    logger.info(f"[DONE] 총 {total}건 처리 완료 (insert+update)")
def file_reader(queue, files): def file_reader(queue, files):
for filepath in files: for filepath in files: