""" 영수증별매출상세현황 엑셀파일을 기반으로 MariaDB에 데이터 업데이트 1. 파일은 ./data 폴더에 위치 (파일명: '영수증별매출상세현황*.xls[x]') 2. 중복된 데이터는 update 처리됨 (on duplicate key update) 3. 처리 후 파일 자동 삭제 (파일 삭제 로직은 필요시 추가 가능) """ import os import sys import re import pandas as pd from datetime import datetime from sqlalchemy.dialects.mysql import insert from sqlalchemy import select # 상위 경로를 sys.path에 추가해 프로젝트 내 모듈 임포트 가능하게 설정 sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) from conf import db, db_schema from lib.common import load_config # 설정 파일 로드 및 데이터 폴더 경로 설정 CONFIG = load_config() DATA_DIR = os.path.join(os.path.dirname(__file__), '../data') # 처리 대상 파일명 패턴: '영수증별매출상세현황'으로 시작하고 .xls 또는 .xlsx 확장자 FILE_PATTERN = re.compile(r"^영수증별매출상세현황.*\.xls[x]?$") # 엑셀 상단 A3셀 형식 예: "조회일자 : 2025-07-27 매장선택 : [V83728] 퍼스트(삐아또" HEADER_PATTERN = re.compile(r"조회일자\s*:\s*(\d{4}-\d{2}-\d{2})\s+매장선택\s*:\s*\[(\w+)]\s*(.+)") def extract_file_info(filepath: str): """ 엑셀 파일 상단에서 조회일자, 매장코드, 매장명을 추출한다. A3 셀 (2행 0열, 0부터 시작 기준) 데이터를 정규식으로 파싱. Args: filepath (str): 엑셀파일 경로 Returns: tuple: (sale_date: date, shop_cd: str, shop_name: str) Raises: ValueError: 정규식 매칭 실패 시 """ print(f"[INFO] {filepath} 상단 조회일자 및 매장 정보 추출 시작") df_head = pd.read_excel(filepath, header=None, nrows=5) first_row = df_head.iloc[2, 0] # 3행 A열 (0-based index) match = HEADER_PATTERN.search(str(first_row)) if not match: raise ValueError(f"[ERROR] 조회일자 및 매장 정보 추출 실패: {filepath}") sale_date = datetime.strptime(match.group(1), "%Y-%m-%d").date() shop_cd = match.group(2) shop_name = match.group(3).strip() print(f"[INFO] 추출된 조회일자: {sale_date}, 매장코드: {shop_cd}, 매장명: {shop_name}") return sale_date, shop_cd, shop_name def load_excel_data(filepath: str): """ 지정한 컬럼만 읽고, 헤더는 6번째 행(0-based index 5)으로 지정. '합계'라는 단어가 '포스번호' 컬럼에 있으면 그 행부터 제거한다. Args: filepath (str): 엑셀파일 경로 Returns: pd.DataFrame: 전처리된 데이터프레임 Raises: ValueError: 필수 컬럼 누락 시 """ print(f"[INFO] {filepath} 데이터 영역 로드 시작") usecols = [ "포스번호", "영수증번호", "구분", "테이블명", "최초주문", "결제시각", "상품코드", "바코드", "상품명", "수량", "총매출액", "ERP 매핑코드", "비고", "할인액", "할인구분", "실매출액", "가액", "부가세" ] # header=5 => 6번째 행이 컬럼명 df = pd.read_excel(filepath, header=5, dtype=str) # 컬럼명 좌우 공백 제거 df.columns = df.columns.str.strip() # '합계'인 행의 인덱스 찾기 및 제거 if '합계' in df['포스번호'].values: idx = df[df['포스번호'] == '합계'].index[0] df = df.loc[:idx-1] print(f"[INFO] '합계' 행 이후 데이터 제거: {idx}번째 행부터 제외") # 필수 컬럼 존재 여부 체크 if not set(usecols).issubset(df.columns): raise ValueError(f"[ERROR] 필수 컬럼 누락: 현재 컬럼 {df.columns.tolist()}") df = df[usecols] print(f"[INFO] {filepath} 데이터 영역 로드 완료, 데이터 건수: {len(df)}") return df def normalize_data(df: pd.DataFrame, sale_date, shop_cd): """ 컬럼명을 내부 규칙에 맞게 변경하고, 숫자 필드를 정수형으로 변환한다. 조회일자와 매장코드를 데이터프레임에 추가. Args: df (pd.DataFrame): 원본 데이터프레임 sale_date (date): 조회일자 shop_cd (str): 매장코드 Returns: pd.DataFrame: 정규화된 데이터프레임 """ print(f"[INFO] 데이터 정규화 시작") def to_int(x): try: return int(str(x).replace(",", "").strip()) except: return 0 df.rename(columns={ "포스번호": "pos_no", "영수증번호": "bill_no", "구분": "division", "테이블명": "table_no", "최초주문": "order_time", "결제시각": "pay_time", "상품코드": "product_cd", "바코드": "barcode", "상품명": "product_name", "수량": "qty", "총매출액": "tot_sale_amt", "ERP 매핑코드": "erp_cd", "비고": "remark", "할인액": "dc_amt", "할인구분": "dc_type", "실매출액": "dcm_sale_amt", "가액": "net_amt", "부가세": "vat_amt" }, inplace=True) df["sale_date"] = sale_date df["shop_cd"] = shop_cd # 숫자형 컬럼 정수 변환 int_fields = ["qty", "tot_sale_amt", "dc_amt", "dcm_sale_amt", "net_amt", "vat_amt"] for field in int_fields: df[field] = df[field].apply(to_int) # pos_no, bill_no는 반드시 int로 변환 df["pos_no"] = df["pos_no"].astype(int) df["bill_no"] = df["bill_no"].astype(int) print(f"[INFO] 데이터 정규화 완료") return df def upsert_data(df: pd.DataFrame, batch_size: int = 500) -> int: """ SQLAlchemy insert 구문을 사용하여 중복 PK 발생 시 update 처리 (on duplicate key update) 대량 데이터는 batch_size 단위로 나누어 처리 Args: df (pd.DataFrame): DB에 삽입할 데이터 batch_size (int): 한번에 처리할 데이터 건수 (기본 500) Returns: int: 영향 받은 총 행 수 """ print(f"[INFO] DB 저장 시작") df = df.where(pd.notnull(df), None) # NaN → None 변환 engine = db.get_engine() metadata = db_schema.metadata table = db_schema.pos_billdata total_affected = 0 with engine.connect() as conn: for start in range(0, len(df), batch_size): batch_df = df.iloc[start:start+batch_size] records = batch_df.to_dict(orient="records") insert_stmt = insert(table).values(records) update_fields = { col.name: insert_stmt.inserted[col.name] for col in table.columns if col.name not in table.primary_key.columns } upsert_stmt = insert_stmt.on_duplicate_key_update(update_fields) try: result = conn.execute(upsert_stmt) conn.commit() total_affected += result.rowcount print(f"[INFO] 배치 처리 완료: {start} ~ {start+len(records)-1} / 영향 행 수: {result.rowcount}") except Exception as e: print(f"[ERROR] 배치 처리 실패: {start} ~ {start+len(records)-1} / 오류: {e}") # 필요 시 raise 하거나 continue로 다음 배치 진행 가능 raise print(f"[INFO] DB 저장 전체 완료, 총 영향 행 수: {total_affected}") return total_affected def ensure_shop_exists(shop_cd, shop_name): """ 매장 정보 테이블에 매장코드가 없으면 신규 등록한다. Args: shop_cd (str): 매장 코드 shop_name (str): 매장 명 """ print(f"[INFO] 매장 존재 여부 확인: {shop_cd}") engine = db.get_engine() conn = engine.connect() shop_table = db_schema.pos_shop_name try: query = shop_table.select().where(shop_table.c.shop_cd == shop_cd) result = conn.execute(query).fetchone() if result is None: print(f"[INFO] 신규 매장 등록: {shop_cd} / {shop_name}") ins = shop_table.insert().values(shop_cd=shop_cd, shop_name=shop_name) conn.execute(ins) conn.commit() else: print(f"[INFO] 기존 매장 존재: {shop_cd}") except Exception as e: print(f"[ERROR] 매장 확인/등록 실패: {e}") raise finally: conn.close() def main(): """ 대상 데이터 파일 목록을 찾고, 파일별로 처리 진행한다. 처리 성공 시 저장 건수를 출력하고, 실패 시 오류 메시지 출력. """ files = [f for f in os.listdir(DATA_DIR) if FILE_PATTERN.match(f)] print(f"[INFO] 발견된 파일 {len(files)}개") for file in files: filepath = os.path.join(DATA_DIR, file) print(f"[INFO] 파일: {file} 처리 시작") try: sale_date, shop_cd, shop_name = extract_file_info(filepath) ensure_shop_exists(shop_cd, shop_name) raw_df = load_excel_data(filepath) df = normalize_data(raw_df, sale_date, shop_cd) affected = upsert_data(df) print(f"[DONE] 처리 완료: {file} / 저장 건수: {affected}") # 처리 완료 후 파일 삭제 (필요 시 활성화) # os.remove(filepath) # print(f"[INFO] 처리 완료 후 파일 삭제: {file}") except Exception as e: print(f"[ERROR] {file} 처리 실패: {e}") if __name__ == "__main__": main()