Files
static/lib/pos_update_upsolution.py

148 lines
5.5 KiB
Python

import os
import sys
import pandas as pd
import shutil
from datetime import datetime
from sqlalchemy import Table, MetaData
from sqlalchemy.dialects.mysql import insert as mysql_insert
from sqlalchemy.exc import SQLAlchemyError
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from lib.common import get_logger
from conf import db, db_schema # get_engine, get_session 포함
logger = get_logger("POS_UPS")
DATA_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), "../data"))
FINISH_DIR = os.path.join(DATA_DIR, "finish")
os.makedirs(FINISH_DIR, exist_ok=True)
def nan_to_none(value):
import pandas as pd
if pd.isna(value):
return None
return value
def load_excel_data(filepath: str):
df = pd.read_excel(filepath, header=1) # 2행이 header, 3행부터 데이터
# 컬럼명 공백 제거 등 정리
df.columns = [col.strip() for col in df.columns]
# 필수 컬럼 체크
required_cols = ['영수증 번호', '품목명']
for col in required_cols:
if col not in df.columns:
raise ValueError(f"필수 컬럼 누락: {col}")
df = df.dropna(subset=required_cols)
return df
def process_file(filepath: str, engine, session, table, batch_size=500):
try:
df = load_excel_data(filepath)
logger.info(f"[LOAD] {os.path.basename(filepath)} - {len(df)}")
inserted, updated, errors = 0, 0, 0
batch_data = []
for idx, row in df.iterrows():
data = None
try:
data = {
"sale_date": pd.to_datetime(row["매출일시"]),
"shop_name": str(row["매장명"]).strip(),
"pos_no": str(row["포스"]).strip(),
"bill_no": str(row["영수증 번호"]).strip(),
"product_cd": str(row["품목"]).strip(),
"product_name": str(row["품목명"]).strip(),
"qty": int(row["수량"]),
"ca01": nan_to_none(row.get("대분류", None)),
"ca02": nan_to_none(row.get("중분류", None)),
"ca03": nan_to_none(row.get("소분류", None)),
"barcode": nan_to_none(row.get("바코드", None)),
"amt": int(row.get("단가", 0)),
"tot_sale_amt": int(row.get("주문 금액", 0)),
"dc_amt": int(row.get("할인 금액", 0)),
"dcm_sale_amt": int(row.get("공급가액", 0)),
"vat_amt": int(row.get("세금", 0)),
"net_amt": int(row.get("결제 금액", 0)),
"cash_receipt": int(row.get("현금영수증", 0)),
"card": int(row.get("카드", 0)),
}
batch_data.append(data)
except Exception as e:
if data is not None:
logger.warning(f"[ERROR:ROW] {e} / 데이터: {data}")
else:
logger.warning(f"[ERROR:ROW] {e} / 데이터가 생성되지 않음")
errors += 1
# 배치 크기 도달시 DB에 한번에 처리
if len(batch_data) >= batch_size:
stmt = mysql_insert(table)
update_cols = {
col.name: stmt.inserted[col.name]
for col in table.columns
if col.name not in ['sale_date', 'shop_name', 'pos_no', 'bill_no', 'product_cd']
}
upsert_stmt = stmt.on_duplicate_key_update(update_cols)
result = session.execute(upsert_stmt, batch_data)
session.commit()
# rowcount가 정확하지 않을 수 있으므로 임시로 inserted 개수만 처리
inserted += len(batch_data)
logger.info(f"[BATCH] {idx + 1} / {len(df)} 처리 중... (총 삽입: {inserted}, 오류: {errors})")
batch_data = []
# 남은 잔여 데이터 처리
if batch_data:
stmt = mysql_insert(table)
update_cols = {
col.name: stmt.inserted[col.name]
for col in table.columns
if col.name not in ['sale_date', 'shop_name', 'pos_no', 'bill_no', 'product_cd']
}
upsert_stmt = stmt.on_duplicate_key_update(update_cols)
result = session.execute(upsert_stmt, batch_data)
session.commit()
inserted += len(batch_data)
logger.info(f"[BATCH] 최종 {len(batch_data)}건 처리 완료 (총 삽입: {inserted}, 오류: {errors})")
logger.info(f"[DONE] 삽입: {inserted}, 오류: {errors}")
shutil.move(filepath, os.path.join(FINISH_DIR, os.path.basename(filepath)))
logger.info(f"[MOVE] 완료: {os.path.join(FINISH_DIR, os.path.basename(filepath))}")
except Exception as e:
logger.error(f"[FAIL] 파일 처리 중 오류 발생 - {e}")
session.rollback()
def main():
engine = db.get_engine()
session = db.get_session()
metadata = MetaData()
table = Table(
db_schema.get_full_table_name("pos_ups_billdata"),
metadata,
autoload_with=engine
)
files = [os.path.join(DATA_DIR, f) for f in os.listdir(DATA_DIR)
if f.endswith(".xlsx") and f.startswith("영수증별 상세매출")]
logger.info(f"[INFO] 처리할 파일 {len(files)}")
for file in sorted(files):
logger.info(f"[START] {os.path.basename(file)}")
process_file(file, engine, session, table)
if __name__ == "__main__":
main()