From ac54673983c33b442096bdfe7d8e269cba1ae395 Mon Sep 17 00:00:00 2001 From: KWON Date: Tue, 29 Jul 2025 15:49:08 +0900 Subject: [PATCH] =?UTF-8?q?=EB=8D=B0=EC=9D=B4=ED=84=B0=EA=B0=80=20?= =?UTF-8?q?=EC=A0=95=EC=83=81=EC=A0=81=EC=9C=BC=EB=A1=9C=20=EB=93=A4?= =?UTF-8?q?=EC=96=B4=EA=B0=80=EC=A7=80=20=EC=95=8A=EB=8A=94=20=EB=B6=80?= =?UTF-8?q?=EB=B6=84=20=EC=88=98=EC=A0=95,=20=EC=98=81=EC=88=98=EC=A6=9D?= =?UTF-8?q?=EB=B2=88=ED=98=B8=EC=99=80=20=ED=92=88=EB=AA=85,=20=EC=88=98?= =?UTF-8?q?=EB=9F=89=EC=9D=B4=20=EB=8F=99=EC=9D=BC=ED=95=9C=20=EA=B2=BD?= =?UTF-8?q?=EC=9A=B0=20=EC=A4=91=EB=B3=B5=EA=B0=92=EC=9C=BC=EB=A1=9C=20?= =?UTF-8?q?=EC=9D=B8=EC=8B=9D=EC=8B=9C=ED=82=A4=EA=B3=A0=20=EB=8D=AE?= =?UTF-8?q?=EC=96=B4=EC=94=8C=EC=9A=B0=EB=8F=84=EB=A1=9D=20=EC=88=98?= =?UTF-8?q?=EC=A0=95=ED=95=A8(=EC=9D=BC=EB=B6=80=20=EB=8D=B0=EC=9D=B4?= =?UTF-8?q?=ED=84=B0=EA=B0=80=20=EC=A4=91=EB=B3=B5=20=EB=8D=B0=EC=9D=B4?= =?UTF-8?q?=ED=84=B0=EA=B0=80=20=EC=A1=B4=EC=9E=AC)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- conf/db_schema.py | 17 ++++++-- lib/pos_update_upsolution.py | 81 ++++++++++++++++-------------------- 2 files changed, 50 insertions(+), 48 deletions(-) diff --git a/conf/db_schema.py b/conf/db_schema.py index 6495ec6..4a0e1c5 100644 --- a/conf/db_schema.py +++ b/conf/db_schema.py @@ -1,7 +1,7 @@ # db_schema.py import os import yaml -from sqlalchemy import Table, Column, Date, Integer, String, Float, Text, MetaData, UniqueConstraint, DateTime, Time, PrimaryKeyConstraint +from sqlalchemy import Table, Column, Date, Integer, String, Float, Text, MetaData, UniqueConstraint, DateTime, Time, PrimaryKeyConstraint, Index from sqlalchemy.sql import func BASE_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), '..')) @@ -218,8 +218,8 @@ pos_ups_billdata = Table( get_full_table_name('pos_ups_billdata'), metadata, Column('sale_date', DateTime, nullable=False), Column('shop_name', String(100), nullable=False), - Column('pos_no', Integer, nullable=False), - Column('bill_no', Integer, nullable=False), + Column('pos_no', String(20), nullable=False), + Column('bill_no', String(20), nullable=False), Column('product_cd', String(20), nullable=False), Column('ca01', String(50)), Column('ca02', String(50)), @@ -235,9 +235,18 @@ pos_ups_billdata = Table( Column('vat_amt', Integer), Column('cash_receipt', Integer), Column('card', Integer), - PrimaryKeyConstraint('sale_date', 'shop_name', 'pos_no', 'bill_no', 'product_cd') + # PrimaryKeyConstraint 생략 + + mysql_engine='InnoDB', + mysql_charset='utf8mb4' ) +# 인덱스 추가 +Index('idx_sale_shop_pos_product', pos_ups_billdata.c.sale_date, pos_ups_billdata.c.shop_name, pos_ups_billdata.c.pos_no, pos_ups_billdata.c.product_cd) +Index('idx_category', pos_ups_billdata.c.ca01, pos_ups_billdata.c.ca02, pos_ups_billdata.c.ca03) +Index('idx_product_barcode', pos_ups_billdata.c.product_name, pos_ups_billdata.c.barcode) + + pos_shop_name = Table( get_full_table_name('pos_shop_name'), metadata, Column('shop_cd', String(20), primary_key=True, nullable=False), diff --git a/lib/pos_update_upsolution.py b/lib/pos_update_upsolution.py index 3d586e8..b5e99d8 100644 --- a/lib/pos_update_upsolution.py +++ b/lib/pos_update_upsolution.py @@ -38,76 +38,69 @@ def load_excel_data(filepath: str): df = df.dropna(subset=required_cols) return df -def process_file(filepath: str, engine, session, table): - batch_size = 1000 - inserted, updated, errors = 0, 0, 0 - count = 0 +def process_file(filepath: str, engine, session, table): try: df = load_excel_data(filepath) logger.info(f"[LOAD] {os.path.basename(filepath)} - {len(df)}건") - for _, row in df.iterrows(): - try: - def safe_int(val, default=0): - try: - return int(val) - except Exception: - return default + inserted, updated, errors = 0, 0, 0 + for idx, row in df.iterrows(): + data = None # 미리 초기화 + try: data = { "sale_date": pd.to_datetime(row["매출일시"]), - "shop_name": str(row.get("매장명", "")).strip(), - "pos_no": safe_int(row.get("포스번호")), - "bill_no": safe_int(row.get("영수증 번호")), - "product_cd": str(row.get("품목", "")).strip(), - "ca01": nan_to_none(row.get("대분류")), - "ca02": nan_to_none(row.get("중분류")), - "ca03": nan_to_none(row.get("소분류")), - "product_name": str(row.get("품목명", "")).strip(), - "barcode": nan_to_none(row.get("바코드")), - "amt": safe_int(row.get("단가")), - "qty": safe_int(row.get("수량")), - "tot_sale_amt": safe_int(row.get("주문 금액")), - "dc_amt": safe_int(row.get("할인 금액")), - "dcm_sale_amt": safe_int(row.get("공급가액")), - "net_amt": safe_int(row.get("세금")), - "vat_amt": safe_int(row.get("부가세")), - "cash_receipt": safe_int(row.get("현금영수증")), - "card": safe_int(row.get("카드")), + "shop_name": str(row["매장명"]).strip(), + "pos_no": str(row["포스"]).strip(), + "bill_no": str(row["영수증 번호"]).strip(), + "product_cd": str(row["품목"]).strip(), + "product_name": str(row["품목명"]).strip(), + "qty": int(row["수량"]), + + "ca01": nan_to_none(row.get("대분류", None)), + "ca02": nan_to_none(row.get("중분류", None)), + "ca03": nan_to_none(row.get("소분류", None)), + "barcode": nan_to_none(row.get("바코드", None)), + "amt": int(row.get("단가", 0)), + "tot_sale_amt": int(row.get("주문 금액", 0)), + "dc_amt": int(row.get("할인 금액", 0)), + "dcm_sale_amt": int(row.get("공급가액", 0)), + "vat_amt": int(row.get("세금", 0)), + "net_amt": int(row.get("결제 금액", 0)), + "cash_receipt": int(row.get("현금영수증", 0)), + "card": int(row.get("카드", 0)), } + stmt = mysql_insert(table).values(**data) - update_stmt = stmt.on_duplicate_key_update({ - col.name: stmt.inserted[col.name] - for col in table.columns - if col.name not in ['sale_date', 'shop_name', 'pos_no', 'bill_no', 'product_cd'] - }) - result = session.execute(update_stmt) + update_cols = {col.name: stmt.inserted[col.name] for col in table.columns + if col.name not in ['sale_date', 'shop_name', 'pos_no', 'bill_no', 'product_cd']} + upsert_stmt = stmt.on_duplicate_key_update(update_cols) + + result = session.execute(upsert_stmt) if result.rowcount == 1: inserted += 1 elif result.rowcount == 2: updated += 1 - count += 1 - if count % batch_size == 0: - session.commit() - logger.info(f"[COMMIT] {count}건 처리 완료") - except Exception as e: - logger.warning(f"[ERROR:ROW] {e}") + if data is not None: + logger.warning(f"[ERROR:ROW] {e} / 데이터: {data}") + else: + logger.warning(f"[ERROR:ROW] {e} / 데이터가 생성되지 않음") errors += 1 + if (idx + 1) % 1000 == 0: + logger.info(f"[PROGRESS] {idx + 1} / {len(df)} 처리 중...") + session.commit() logger.info(f"[DONE] 삽입: {inserted}, 업데이트: {updated}, 오류: {errors}") shutil.move(filepath, os.path.join(FINISH_DIR, os.path.basename(filepath))) logger.info(f"[MOVE] 완료: {os.path.join(FINISH_DIR, os.path.basename(filepath))}") - except SQLAlchemyError as e: - logger.error(f"[FAIL] DB 처리 중 오류 발생 - 롤백: {e}") - session.rollback() except Exception as e: logger.error(f"[FAIL] 파일 처리 중 오류 발생 - {e}") session.rollback()