264 lines
9.4 KiB
Python
264 lines
9.4 KiB
Python
"""
|
|
영수증별매출상세현황 엑셀파일을 기반으로 MariaDB에 데이터 업데이트
|
|
|
|
1. 파일은 ./data 폴더에 위치 (파일명: '영수증별매출상세현황*.xls[x]')
|
|
2. 중복된 데이터는 update 처리됨 (on duplicate key update)
|
|
3. 처리 후 파일 자동 삭제 (파일 삭제 로직은 필요시 추가 가능)
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import re
|
|
import pandas as pd
|
|
from datetime import datetime
|
|
from sqlalchemy.dialects.mysql import insert
|
|
from sqlalchemy import select
|
|
|
|
# 상위 경로를 sys.path에 추가해 프로젝트 내 모듈 임포트 가능하게 설정
|
|
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
|
|
|
|
from conf import db, db_schema
|
|
from lib.common import load_config
|
|
|
|
# 설정 파일 로드 및 데이터 폴더 경로 설정
|
|
CONFIG = load_config()
|
|
DATA_DIR = os.path.join(os.path.dirname(__file__), '../data')
|
|
|
|
# 처리 대상 파일명 패턴: '영수증별매출상세현황'으로 시작하고 .xls 또는 .xlsx 확장자
|
|
FILE_PATTERN = re.compile(r"^영수증별매출상세현황.*\.xls[x]?$")
|
|
|
|
# 엑셀 상단 A3셀 형식 예: "조회일자 : 2025-07-27 매장선택 : [V83728] 퍼스트(삐아또"
|
|
HEADER_PATTERN = re.compile(r"조회일자\s*:\s*(\d{4}-\d{2}-\d{2})\s+매장선택\s*:\s*\[(\w+)]\s*(.+)")
|
|
|
|
def extract_file_info(filepath: str):
|
|
"""
|
|
엑셀 파일 상단에서 조회일자, 매장코드, 매장명을 추출한다.
|
|
A3 셀 (2행 0열, 0부터 시작 기준) 데이터를 정규식으로 파싱.
|
|
|
|
Args:
|
|
filepath (str): 엑셀파일 경로
|
|
|
|
Returns:
|
|
tuple: (sale_date: date, shop_cd: str, shop_name: str)
|
|
|
|
Raises:
|
|
ValueError: 정규식 매칭 실패 시
|
|
"""
|
|
print(f"[INFO] {filepath} 상단 조회일자 및 매장 정보 추출 시작")
|
|
df_head = pd.read_excel(filepath, header=None, nrows=5)
|
|
first_row = df_head.iloc[2, 0] # 3행 A열 (0-based index)
|
|
|
|
match = HEADER_PATTERN.search(str(first_row))
|
|
if not match:
|
|
raise ValueError(f"[ERROR] 조회일자 및 매장 정보 추출 실패: {filepath}")
|
|
|
|
sale_date = datetime.strptime(match.group(1), "%Y-%m-%d").date()
|
|
shop_cd = match.group(2)
|
|
shop_name = match.group(3).strip()
|
|
print(f"[INFO] 추출된 조회일자: {sale_date}, 매장코드: {shop_cd}, 매장명: {shop_name}")
|
|
return sale_date, shop_cd, shop_name
|
|
|
|
def load_excel_data(filepath: str):
|
|
"""
|
|
지정한 컬럼만 읽고, 헤더는 6번째 행(0-based index 5)으로 지정.
|
|
'합계'라는 단어가 '포스번호' 컬럼에 있으면 그 행부터 제거한다.
|
|
|
|
Args:
|
|
filepath (str): 엑셀파일 경로
|
|
|
|
Returns:
|
|
pd.DataFrame: 전처리된 데이터프레임
|
|
|
|
Raises:
|
|
ValueError: 필수 컬럼 누락 시
|
|
"""
|
|
print(f"[INFO] {filepath} 데이터 영역 로드 시작")
|
|
usecols = [
|
|
"포스번호", "영수증번호", "구분", "테이블명", "최초주문", "결제시각",
|
|
"상품코드", "바코드", "상품명", "수량", "총매출액", "ERP 매핑코드",
|
|
"비고", "할인액", "할인구분", "실매출액", "가액", "부가세"
|
|
]
|
|
# header=5 => 6번째 행이 컬럼명
|
|
df = pd.read_excel(filepath, header=5, dtype=str)
|
|
# 컬럼명 좌우 공백 제거
|
|
df.columns = df.columns.str.strip()
|
|
|
|
# '합계'인 행의 인덱스 찾기 및 제거
|
|
if '합계' in df['포스번호'].values:
|
|
idx = df[df['포스번호'] == '합계'].index[0]
|
|
df = df.loc[:idx-1]
|
|
print(f"[INFO] '합계' 행 이후 데이터 제거: {idx}번째 행부터 제외")
|
|
|
|
# 필수 컬럼 존재 여부 체크
|
|
if not set(usecols).issubset(df.columns):
|
|
raise ValueError(f"[ERROR] 필수 컬럼 누락: 현재 컬럼 {df.columns.tolist()}")
|
|
|
|
df = df[usecols]
|
|
print(f"[INFO] {filepath} 데이터 영역 로드 완료, 데이터 건수: {len(df)}")
|
|
return df
|
|
|
|
def normalize_data(df: pd.DataFrame, sale_date, shop_cd):
|
|
"""
|
|
컬럼명을 내부 규칙에 맞게 변경하고, 숫자 필드를 정수형으로 변환한다.
|
|
조회일자와 매장코드를 데이터프레임에 추가.
|
|
|
|
Args:
|
|
df (pd.DataFrame): 원본 데이터프레임
|
|
sale_date (date): 조회일자
|
|
shop_cd (str): 매장코드
|
|
|
|
Returns:
|
|
pd.DataFrame: 정규화된 데이터프레임
|
|
"""
|
|
print(f"[INFO] 데이터 정규화 시작")
|
|
def to_int(x):
|
|
try:
|
|
return int(str(x).replace(",", "").strip())
|
|
except:
|
|
return 0
|
|
|
|
df.rename(columns={
|
|
"포스번호": "pos_no",
|
|
"영수증번호": "bill_no",
|
|
"구분": "division",
|
|
"테이블명": "table_no",
|
|
"최초주문": "order_time",
|
|
"결제시각": "pay_time",
|
|
"상품코드": "product_cd",
|
|
"바코드": "barcode",
|
|
"상품명": "product_name",
|
|
"수량": "qty",
|
|
"총매출액": "tot_sale_amt",
|
|
"ERP 매핑코드": "erp_cd",
|
|
"비고": "remark",
|
|
"할인액": "dc_amt",
|
|
"할인구분": "dc_type",
|
|
"실매출액": "dcm_sale_amt",
|
|
"가액": "net_amt",
|
|
"부가세": "vat_amt"
|
|
}, inplace=True)
|
|
|
|
df["sale_date"] = sale_date
|
|
df["shop_cd"] = shop_cd
|
|
|
|
# 숫자형 컬럼 정수 변환
|
|
int_fields = ["qty", "tot_sale_amt", "dc_amt", "dcm_sale_amt", "net_amt", "vat_amt"]
|
|
for field in int_fields:
|
|
df[field] = df[field].apply(to_int)
|
|
|
|
# pos_no, bill_no는 반드시 int로 변환
|
|
df["pos_no"] = df["pos_no"].astype(int)
|
|
df["bill_no"] = df["bill_no"].astype(int)
|
|
|
|
print(f"[INFO] 데이터 정규화 완료")
|
|
return df
|
|
|
|
def upsert_data(df: pd.DataFrame, batch_size: int = 500) -> int:
|
|
"""
|
|
SQLAlchemy insert 구문을 사용하여
|
|
중복 PK 발생 시 update 처리 (on duplicate key update)
|
|
대량 데이터는 batch_size 단위로 나누어 처리
|
|
|
|
Args:
|
|
df (pd.DataFrame): DB에 삽입할 데이터
|
|
batch_size (int): 한번에 처리할 데이터 건수 (기본 500)
|
|
|
|
Returns:
|
|
int: 영향 받은 총 행 수
|
|
"""
|
|
print(f"[INFO] DB 저장 시작")
|
|
df = df.where(pd.notnull(df), None) # NaN → None 변환
|
|
|
|
engine = db.get_engine()
|
|
metadata = db_schema.metadata
|
|
table = db_schema.pos_billdata
|
|
total_affected = 0
|
|
|
|
with engine.connect() as conn:
|
|
for start in range(0, len(df), batch_size):
|
|
batch_df = df.iloc[start:start+batch_size]
|
|
records = batch_df.to_dict(orient="records")
|
|
insert_stmt = insert(table).values(records)
|
|
|
|
update_fields = {
|
|
col.name: insert_stmt.inserted[col.name]
|
|
for col in table.columns
|
|
if col.name not in table.primary_key.columns
|
|
}
|
|
upsert_stmt = insert_stmt.on_duplicate_key_update(update_fields)
|
|
|
|
try:
|
|
result = conn.execute(upsert_stmt)
|
|
conn.commit()
|
|
total_affected += result.rowcount
|
|
print(f"[INFO] 배치 처리 완료: {start} ~ {start+len(records)-1} / 영향 행 수: {result.rowcount}")
|
|
except Exception as e:
|
|
print(f"[ERROR] 배치 처리 실패: {start} ~ {start+len(records)-1} / 오류: {e}")
|
|
# 필요 시 raise 하거나 continue로 다음 배치 진행 가능
|
|
raise
|
|
|
|
print(f"[INFO] DB 저장 전체 완료, 총 영향 행 수: {total_affected}")
|
|
return total_affected
|
|
|
|
|
|
def ensure_shop_exists(shop_cd, shop_name):
|
|
"""
|
|
매장 정보 테이블에 매장코드가 없으면 신규 등록한다.
|
|
|
|
Args:
|
|
shop_cd (str): 매장 코드
|
|
shop_name (str): 매장 명
|
|
"""
|
|
print(f"[INFO] 매장 존재 여부 확인: {shop_cd}")
|
|
engine = db.get_engine()
|
|
conn = engine.connect()
|
|
shop_table = db_schema.pos_shop_name
|
|
|
|
try:
|
|
query = shop_table.select().where(shop_table.c.shop_cd == shop_cd)
|
|
result = conn.execute(query).fetchone()
|
|
if result is None:
|
|
print(f"[INFO] 신규 매장 등록: {shop_cd} / {shop_name}")
|
|
ins = shop_table.insert().values(shop_cd=shop_cd, shop_name=shop_name)
|
|
conn.execute(ins)
|
|
conn.commit()
|
|
else:
|
|
print(f"[INFO] 기존 매장 존재: {shop_cd}")
|
|
except Exception as e:
|
|
print(f"[ERROR] 매장 확인/등록 실패: {e}")
|
|
raise
|
|
finally:
|
|
conn.close()
|
|
|
|
def main():
|
|
"""
|
|
대상 데이터 파일 목록을 찾고, 파일별로 처리 진행한다.
|
|
처리 성공 시 저장 건수를 출력하고, 실패 시 오류 메시지 출력.
|
|
"""
|
|
files = [f for f in os.listdir(DATA_DIR) if FILE_PATTERN.match(f)]
|
|
print(f"[INFO] 발견된 파일 {len(files)}개")
|
|
|
|
for file in files:
|
|
filepath = os.path.join(DATA_DIR, file)
|
|
print(f"[INFO] 파일: {file} 처리 시작")
|
|
|
|
try:
|
|
sale_date, shop_cd, shop_name = extract_file_info(filepath)
|
|
ensure_shop_exists(shop_cd, shop_name)
|
|
|
|
raw_df = load_excel_data(filepath)
|
|
df = normalize_data(raw_df, sale_date, shop_cd)
|
|
|
|
affected = upsert_data(df)
|
|
print(f"[DONE] 처리 완료: {file} / 저장 건수: {affected}")
|
|
|
|
# 처리 완료 후 파일 삭제 (필요 시 활성화)
|
|
# os.remove(filepath)
|
|
# print(f"[INFO] 처리 완료 후 파일 삭제: {file}")
|
|
|
|
except Exception as e:
|
|
print(f"[ERROR] {file} 처리 실패: {e}")
|
|
|
|
if __name__ == "__main__":
|
|
main()
|