Files
static/lib/pos_update_upsolution.py
KWON 7121f250bc feat: Flask 애플리케이션 모듈화 및 웹 대시보드 구현
- Flask Blueprint 아키텍처로 전환 (dashboard, upload, backup, status)
- app.py 681줄  95줄로 축소 (86% 감소)
- HTML 템플릿 모듈화 (base.html + 기능별 templates)
- CSS/JS 파일 분리 (common + 기능별 파일)
- 대시보드 기능 추가 (통계, 주간 예보, 방문객 추이)
- 파일 업로드 웹 인터페이스 구현
- 백업/복구 관리 UI 구현
- Docker 배포 환경 개선
- .gitignore 업데이트 (uploads, backups, cache 등)
2025-12-26 17:31:37 +09:00

235 lines
7.5 KiB
Python

import os
import sys
import pandas as pd
import shutil
import threading
from queue import Queue
from sqlalchemy import Table, MetaData
from sqlalchemy.dialects.mysql import insert as mysql_insert
from sqlalchemy.exc import SQLAlchemyError
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from lib.common import get_logger
from conf import db, db_schema # get_engine, get_session 포함
logger = get_logger("POS_UPS")
DATA_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), "../data"))
FINISH_DIR = os.path.join(DATA_DIR, "finish")
os.makedirs(FINISH_DIR, exist_ok=True)
def nan_to_none(value):
import pandas as pd
if pd.isna(value):
return None
return value
def load_excel_data(filepath: str):
df = pd.read_excel(filepath, header=1) # 2행이 header, 3행부터 데이터
df.columns = [col.strip() for col in df.columns]
required_cols = ['영수증 번호', '품목명']
for col in required_cols:
if col not in df.columns:
raise ValueError(f"필수 컬럼 누락: {col}")
df = df.dropna(subset=required_cols)
return df
def prepare_bulk_data(df):
bulk_data = []
for idx, row in df.iterrows():
try:
data = {
"sale_date": pd.to_datetime(row["매출일시"]),
"shop_name": str(row["매장명"]).strip(),
"pos_no": str(row["포스"]).strip(),
"bill_no": str(row["영수증 번호"]).strip(),
"product_cd": str(row["품목"]).strip(),
"product_name": str(row["품목명"]).strip(),
"qty": int(row["수량"]),
"ca01": nan_to_none(row.get("대분류", None)),
"ca02": nan_to_none(row.get("중분류", None)),
"ca03": nan_to_none(row.get("소분류", None)),
"barcode": nan_to_none(row.get("바코드", None)),
"amt": int(row.get("단가", 0)),
"tot_sale_amt": int(row.get("주문 금액", 0)),
"dc_amt": int(row.get("할인 금액", 0)),
"dcm_sale_amt": int(row.get("공급가액", 0)),
"vat_amt": int(row.get("세금", 0)),
"net_amt": int(row.get("결제 금액", 0)),
"cash_receipt": int(row.get("현금영수증", 0)),
"card": int(row.get("카드", 0)),
}
bulk_data.append(data)
except Exception as e:
logger.warning(f"[ERROR:ROW] 데이터 생성 실패: {e} / 인덱스: {idx}")
return bulk_data
def process_bulk_upsert(bulk_data, session, table, batch_size=1000):
"""
데이터 일괄 삽입/업데이트
Args:
bulk_data: 삽입할 데이터 리스트
session: DB 세션
table: 대상 테이블
batch_size: 배치 크기
Returns:
int: 삽입된 행 수
"""
if not bulk_data:
logger.info("[INFO] 삽입할 데이터가 없습니다.")
return 0
total = len(bulk_data)
inserted_total = 0
for start in range(0, total, batch_size):
batch = bulk_data[start:start+batch_size]
insert_stmt = mysql_insert(table).values(batch)
update_cols = {c.name: insert_stmt.inserted[c.name] for c in table.columns
if c.name not in ['sale_date', 'shop_name', 'pos_no', 'bill_no', 'product_cd']}
upsert_stmt = insert_stmt.on_duplicate_key_update(update_cols)
try:
result = session.execute(upsert_stmt)
session.commit()
inserted_total += len(batch)
logger.info(f"[PROGRESS] {inserted_total} / {total} 건 처리 완료")
except SQLAlchemyError as e:
session.rollback()
logger.error(f"[FAIL] batch upsert 실패: {e}")
raise
logger.info(f"[DONE] 총 {total}건 처리 완료 (insert+update)")
return inserted_total
def file_reader(queue, files):
"""파일 읽기 스레드"""
for filepath in files:
try:
logger.info(f"[READ] {os.path.basename(filepath)} 읽기 시작")
df = load_excel_data(filepath)
logger.info(f"[READ] {os.path.basename(filepath)} 읽기 완료 - {len(df)}")
bulk_data = prepare_bulk_data(df)
queue.put((filepath, bulk_data))
except Exception as e:
logger.error(f"[FAIL] {os.path.basename(filepath)} 읽기 실패 - {e}")
queue.put(None) # 종료 신호
def db_writer(queue, session, table):
"""DB 쓰기 스레드"""
while True:
item = queue.get()
if item is None:
break
filepath, bulk_data = item
logger.info(f"[START] {os.path.basename(filepath)} DB 삽입 시작")
try:
process_bulk_upsert(bulk_data, session, table)
dest = os.path.join(FINISH_DIR, os.path.basename(filepath))
shutil.move(filepath, dest)
logger.info(f"[MOVE] 완료: {dest}")
except Exception as e:
logger.error(f"[FAIL] {os.path.basename(filepath)} DB 삽입 실패 - {e}")
def process_upsolution_file(filepath):
"""
UPSOLUTION 파일을 처리하고 DB에 저장
웹 업로드 인터페이스에서 사용하는 함수
Args:
filepath (str): 업로드된 파일 경로
Returns:
dict: {
'success': bool,
'message': str,
'rows_inserted': int
}
"""
try:
logger.info(f"[WEB] UPSOLUTION 파일 처리 시작: {filepath}")
# 파일 읽기
df = load_excel_data(filepath)
logger.info(f"[WEB] 데이터 읽기 완료: {len(df)}")
# 데이터 준비
bulk_data = prepare_bulk_data(df)
logger.info(f"[WEB] 데이터 준비 완료: {len(bulk_data)}")
# DB 세션 및 테이블 가져오기
session = db.get_session()
engine = db.get_engine()
metadata = MetaData()
table = Table(
db_schema.get_full_table_name("pos_ups_billdata"),
metadata,
autoload_with=engine
)
# 데이터 삽입
inserted = process_bulk_upsert(bulk_data, session, table)
session.close()
logger.info(f"[WEB] UPSOLUTION 파일 처리 완료: {inserted}행 삽입")
return {
'success': True,
'message': f'{inserted}행이 저장되었습니다.',
'rows_inserted': inserted
}
except Exception as e:
logger.error(f"[WEB] UPSOLUTION 파일 처리 오류: {e}", exc_info=True)
return {
'success': False,
'message': f'파일 처리 중 오류: {str(e)}',
'rows_inserted': 0
}
def main():
engine = db.get_engine()
session = db.get_session()
metadata = MetaData()
table = Table(
db_schema.get_full_table_name("pos_ups_billdata"),
metadata,
autoload_with=engine
)
files = [os.path.join(DATA_DIR, f) for f in os.listdir(DATA_DIR)
if f.endswith(".xlsx") and f.startswith("영수증별 상세매출")]
logger.info(f"[INFO] 처리할 파일 {len(files)}")
queue = Queue(maxsize=3) # 2개 정도 여유 있게
reader_thread = threading.Thread(target=file_reader, args=(queue, files))
writer_thread = threading.Thread(target=db_writer, args=(queue, session, table))
reader_thread.start()
writer_thread.start()
reader_thread.join()
writer_thread.join()
if __name__ == "__main__":
main()