엑셀 파일이 여러개인 경우 다음 파일을 미리 읽어 지연 해소, row를 한개씩 삽입하던 방식에서 1,000개씩 삽입하는 방식으로 변경하여 처리속도 증가

This commit is contained in:
2025-07-29 16:12:21 +09:00
parent 9df9c73818
commit d8945b35fe

View File

@ -2,7 +2,8 @@ import os
import sys
import pandas as pd
import shutil
from datetime import datetime
import threading
from queue import Queue
from sqlalchemy import Table, MetaData
from sqlalchemy.dialects.mysql import insert as mysql_insert
from sqlalchemy.exc import SQLAlchemyError
@ -28,9 +29,7 @@ def nan_to_none(value):
def load_excel_data(filepath: str):
df = pd.read_excel(filepath, header=1) # 2행이 header, 3행부터 데이터
# 컬럼명 공백 제거 등 정리
df.columns = [col.strip() for col in df.columns]
# 필수 컬럼 체크
required_cols = ['영수증 번호', '품목명']
for col in required_cols:
if col not in df.columns:
@ -39,16 +38,9 @@ def load_excel_data(filepath: str):
return df
def process_file(filepath: str, engine, session, table, batch_size=500):
try:
df = load_excel_data(filepath)
logger.info(f"[LOAD] {os.path.basename(filepath)} - {len(df)}")
inserted, updated, errors = 0, 0, 0
batch_data = []
def prepare_bulk_data(df):
bulk_data = []
for idx, row in df.iterrows():
data = None
try:
data = {
"sale_date": pd.to_datetime(row["매출일시"]),
@ -72,55 +64,62 @@ def process_file(filepath: str, engine, session, table, batch_size=500):
"cash_receipt": int(row.get("현금영수증", 0)),
"card": int(row.get("카드", 0)),
}
batch_data.append(data)
bulk_data.append(data)
except Exception as e:
if data is not None:
logger.warning(f"[ERROR:ROW] {e} / 데이터: {data}")
else:
logger.warning(f"[ERROR:ROW] {e} / 데이터가 생성되지 않음")
errors += 1
logger.warning(f"[ERROR:ROW] 데이터 생성 실패: {e} / 인덱스: {idx}")
return bulk_data
# 배치 크기 도달시 DB에 한번에 처리
if len(batch_data) >= batch_size:
stmt = mysql_insert(table)
update_cols = {
col.name: stmt.inserted[col.name]
for col in table.columns
if col.name not in ['sale_date', 'shop_name', 'pos_no', 'bill_no', 'product_cd']
}
upsert_stmt = stmt.on_duplicate_key_update(update_cols)
result = session.execute(upsert_stmt, batch_data)
def process_bulk_upsert(bulk_data, session, table):
if not bulk_data:
logger.info("[INFO] 삽입할 데이터가 없습니다.")
return
insert_stmt = mysql_insert(table).values(bulk_data)
update_cols = {c.name: insert_stmt.inserted[c.name] for c in table.columns
if c.name not in ['sale_date', 'shop_name', 'pos_no', 'bill_no', 'product_cd']}
upsert_stmt = insert_stmt.on_duplicate_key_update(update_cols)
try:
result = session.execute(upsert_stmt)
session.commit()
# rowcount가 정확하지 않을 수 있으므로 임시로 inserted 개수만 처리
inserted += len(batch_data)
logger.info(f"[BATCH] {idx + 1} / {len(df)} 처리 중... (총 삽입: {inserted}, 오류: {errors})")
batch_data = []
# 남은 잔여 데이터 처리
if batch_data:
stmt = mysql_insert(table)
update_cols = {
col.name: stmt.inserted[col.name]
for col in table.columns
if col.name not in ['sale_date', 'shop_name', 'pos_no', 'bill_no', 'product_cd']
}
upsert_stmt = stmt.on_duplicate_key_update(update_cols)
result = session.execute(upsert_stmt, batch_data)
session.commit()
inserted += len(batch_data)
logger.info(f"[BATCH] 최종 {len(batch_data)}건 처리 완료 (총 삽입: {inserted}, 오류: {errors})")
logger.info(f"[DONE] 삽입: {inserted}, 오류: {errors}")
shutil.move(filepath, os.path.join(FINISH_DIR, os.path.basename(filepath)))
logger.info(f"[MOVE] 완료: {os.path.join(FINISH_DIR, os.path.basename(filepath))}")
except Exception as e:
logger.error(f"[FAIL] 파일 처리 중 오류 발생 - {e}")
logger.info(f"[DONE] 총 {len(bulk_data)}건 처리 완료 (insert+update)")
except SQLAlchemyError as e:
session.rollback()
logger.error(f"[FAIL] bulk upsert 실패: {e}")
raise
def file_reader(queue, files):
for filepath in files:
try:
logger.info(f"[READ] {os.path.basename(filepath)} 읽기 시작")
df = load_excel_data(filepath)
logger.info(f"[READ] {os.path.basename(filepath)} 읽기 완료 - {len(df)}")
bulk_data = prepare_bulk_data(df)
queue.put((filepath, bulk_data))
except Exception as e:
logger.error(f"[FAIL] {os.path.basename(filepath)} 읽기 실패 - {e}")
queue.put(None) # 종료 신호
def db_writer(queue, session, table):
while True:
item = queue.get()
if item is None:
break
filepath, bulk_data = item
logger.info(f"[START] {os.path.basename(filepath)} DB 삽입 시작")
try:
process_bulk_upsert(bulk_data, session, table)
dest = os.path.join(FINISH_DIR, os.path.basename(filepath))
shutil.move(filepath, dest)
logger.info(f"[MOVE] 완료: {dest}")
except Exception as e:
logger.error(f"[FAIL] {os.path.basename(filepath)} DB 삽입 실패 - {e}")
def main():
engine = db.get_engine()
@ -138,9 +137,16 @@ def main():
logger.info(f"[INFO] 처리할 파일 {len(files)}")
for file in sorted(files):
logger.info(f"[START] {os.path.basename(file)}")
process_file(file, engine, session, table)
queue = Queue(maxsize=2) # 2개 정도 여유 있게
reader_thread = threading.Thread(target=file_reader, args=(queue, files))
writer_thread = threading.Thread(target=db_writer, args=(queue, session, table))
reader_thread.start()
writer_thread.start()
reader_thread.join()
writer_thread.join()
if __name__ == "__main__":