./data 폴더를 모니터랑 하고, 새 파일이 생기면 일치하는 파일 형식인지 찾은 후 데이터를 파싱해서 DB에 저장

This commit is contained in:
2025-07-28 16:17:24 +09:00
parent 1e275d2ac7
commit 29319cb12c
4 changed files with 321 additions and 244 deletions

View File

@ -1,266 +1,263 @@
"""
영수증별매출상세현황 엑셀파일을 기반으로 MariaDB에 데이터 업데이트
1. 파일은 ./data 폴더에 위치 (파일명: '영수증별매출상세현황*.xls[x]')
2. 중복된 데이터는 update 처리됨 (on duplicate key update)
3. 처리 후 파일 자동 삭제 (파일 삭제 로직은 필요시 추가 가능)
"""
import os
import sys
import re
import time
import logging
from datetime import datetime
import pandas as pd
import xlrd
from openpyxl import load_workbook
from sqlalchemy.dialects.mysql import insert as mysql_insert
from sqlalchemy.exc import IntegrityError
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from datetime import datetime
from sqlalchemy.dialects.mysql import insert
from sqlalchemy import select
# 프로젝트 루트 상위 경로 추가
# 상위 경로를 sys.path에 추가해 프로젝트 내 모듈 임포트 가능하게 설정
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from conf import db, db_schema
from lib.common import load_config
# 설정 파일 로드 및 데이터 폴더 경로 설정
CONFIG = load_config()
DATA_DIR = os.path.join(os.path.dirname(__file__), '../data')
FILE_PATTERN = re.compile(r"^영수증별매출상세현황.*\.xls[x]?$") # xls 또는 xlsx
logging.basicConfig(
level=logging.INFO,
format='[%(asctime)s] %(levelname)s: %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
)
# 처리 대상 파일명 패턴: '영수증별매출상세현황'으로 시작하고 .xls 또는 .xlsx 확장자
FILE_PATTERN = re.compile(r"^영수증별매출상세현황.*\.xls[x]?$")
def parse_header(filepath):
ext = os.path.splitext(filepath)[1].lower()
if ext == '.xlsx':
from openpyxl import load_workbook
wb = load_workbook(filepath, read_only=True, data_only=True)
ws = wb.active
cell_val = ws['A3'].value
elif ext == '.xls':
wb = xlrd.open_workbook(filepath)
sheet = wb.sheet_by_index(0)
cell_val = sheet.cell_value(2, 0) # 0-based row,col → A3 is row=2, col=0
else:
raise ValueError(f"지원하지 않는 확장자: {ext}")
# 엑셀 상단 A3셀 형식 예: "조회일자 : 2025-07-27 매장선택 : [V83728] 퍼스트(삐아또"
HEADER_PATTERN = re.compile(r"조회일자\s*:\s*(\d{4}-\d{2}-\d{2})\s+매장선택\s*:\s*\[(\w+)]\s*(.+)")
if not cell_val:
raise ValueError("A3 셀에 값이 없습니다.")
def extract_file_info(filepath: str):
"""
엑셀 파일 상단에서 조회일자, 매장코드, 매장명을 추출한다.
A3 셀 (2행 0열, 0부터 시작 기준) 데이터를 정규식으로 파싱.
date_match = re.search(r'조회일자\s*:\s*([\d\-]+)', cell_val)
shop_match = re.search(r'매장선택\s*:\s*\[(.*?)\]\s*(.+)', cell_val)
Args:
filepath (str): 엑셀파일 경로
if not date_match or not shop_match:
raise ValueError("A3 셀에서 날짜 또는 매장 정보 파싱 실패")
Returns:
tuple: (sale_date: date, shop_cd: str, shop_name: str)
sale_date = date_match.group(1)
shop_cd = shop_match.group(1)
shop_name = shop_match.group(2).strip()
Raises:
ValueError: 정규식 매칭 실패 시
"""
print(f"[INFO] {filepath} 상단 조회일자 및 매장 정보 추출 시작")
df_head = pd.read_excel(filepath, header=None, nrows=5)
first_row = df_head.iloc[2, 0] # 3행 A열 (0-based index)
match = HEADER_PATTERN.search(str(first_row))
if not match:
raise ValueError(f"[ERROR] 조회일자 및 매장 정보 추출 실패: {filepath}")
sale_date = datetime.strptime(match.group(1), "%Y-%m-%d").date()
shop_cd = match.group(2)
shop_name = match.group(3).strip()
print(f"[INFO] 추출된 조회일자: {sale_date}, 매장코드: {shop_cd}, 매장명: {shop_name}")
return sale_date, shop_cd, shop_name
def check_and_update_shop(shop_cd, shop_name):
conn = db.engine.connect()
try:
result = conn.execute(
"SELECT shop_name FROM fg_manager_pos_shop_name WHERE shop_cd = %s",
(shop_cd,)
).fetchone()
def load_excel_data(filepath: str):
"""
지정한 컬럼만 읽고, 헤더는 6번째 행(0-based index 5)으로 지정.
'합계'라는 단어가 '포스번호' 컬럼에 있으면 그 행부터 제거한다.
Args:
filepath (str): 엑셀파일 경로
Returns:
pd.DataFrame: 전처리된 데이터프레임
Raises:
ValueError: 필수 컬럼 누락 시
"""
print(f"[INFO] {filepath} 데이터 영역 로드 시작")
usecols = [
"포스번호", "영수증번호", "구분", "테이블명", "최초주문", "결제시각",
"상품코드", "바코드", "상품명", "수량", "총매출액", "ERP 매핑코드",
"비고", "할인액", "할인구분", "실매출액", "가액", "부가세"
]
# header=5 => 6번째 행이 컬럼명
df = pd.read_excel(filepath, header=5, dtype=str)
# 컬럼명 좌우 공백 제거
df.columns = df.columns.str.strip()
# '합계'인 행의 인덱스 찾기 및 제거
if '합계' in df['포스번호'].values:
idx = df[df['포스번호'] == '합계'].index[0]
df = df.loc[:idx-1]
print(f"[INFO] '합계' 행 이후 데이터 제거: {idx}번째 행부터 제외")
# 필수 컬럼 존재 여부 체크
if not set(usecols).issubset(df.columns):
raise ValueError(f"[ERROR] 필수 컬럼 누락: 현재 컬럼 {df.columns.tolist()}")
df = df[usecols]
print(f"[INFO] {filepath} 데이터 영역 로드 완료, 데이터 건수: {len(df)}")
return df
def normalize_data(df: pd.DataFrame, sale_date, shop_cd):
"""
컬럼명을 내부 규칙에 맞게 변경하고, 숫자 필드를 정수형으로 변환한다.
조회일자와 매장코드를 데이터프레임에 추가.
Args:
df (pd.DataFrame): 원본 데이터프레임
sale_date (date): 조회일자
shop_cd (str): 매장코드
Returns:
pd.DataFrame: 정규화된 데이터프레임
"""
print(f"[INFO] 데이터 정규화 시작")
def to_int(x):
try:
return int(str(x).replace(",", "").strip())
except:
return 0
df.rename(columns={
"포스번호": "pos_no",
"영수증번호": "bill_no",
"구분": "division",
"테이블명": "table_no",
"최초주문": "order_time",
"결제시각": "pay_time",
"상품코드": "product_cd",
"바코드": "barcode",
"상품명": "product_name",
"수량": "qty",
"총매출액": "tot_sale_amt",
"ERP 매핑코드": "erp_cd",
"비고": "remark",
"할인액": "dc_amt",
"할인구분": "dc_type",
"실매출액": "dcm_sale_amt",
"가액": "net_amt",
"부가세": "vat_amt"
}, inplace=True)
df["sale_date"] = sale_date
df["shop_cd"] = shop_cd
# 숫자형 컬럼 정수 변환
int_fields = ["qty", "tot_sale_amt", "dc_amt", "dcm_sale_amt", "net_amt", "vat_amt"]
for field in int_fields:
df[field] = df[field].apply(to_int)
# pos_no, bill_no는 반드시 int로 변환
df["pos_no"] = df["pos_no"].astype(int)
df["bill_no"] = df["bill_no"].astype(int)
print(f"[INFO] 데이터 정규화 완료")
return df
def upsert_data(df: pd.DataFrame, batch_size: int = 500) -> int:
"""
SQLAlchemy insert 구문을 사용하여
중복 PK 발생 시 update 처리 (on duplicate key update)
대량 데이터는 batch_size 단위로 나누어 처리
Args:
df (pd.DataFrame): DB에 삽입할 데이터
batch_size (int): 한번에 처리할 데이터 건수 (기본 500)
Returns:
int: 영향 받은 총 행 수
"""
print(f"[INFO] DB 저장 시작")
df = df.where(pd.notnull(df), None) # NaN → None 변환
engine = db.get_engine()
metadata = db_schema.metadata
table = db_schema.pos_billdata
total_affected = 0
with engine.connect() as conn:
for start in range(0, len(df), batch_size):
batch_df = df.iloc[start:start+batch_size]
records = batch_df.to_dict(orient="records")
insert_stmt = insert(table).values(records)
update_fields = {
col.name: insert_stmt.inserted[col.name]
for col in table.columns
if col.name not in table.primary_key.columns
}
upsert_stmt = insert_stmt.on_duplicate_key_update(update_fields)
try:
result = conn.execute(upsert_stmt)
conn.commit()
total_affected += result.rowcount
print(f"[INFO] 배치 처리 완료: {start} ~ {start+len(records)-1} / 영향 행 수: {result.rowcount}")
except Exception as e:
print(f"[ERROR] 배치 처리 실패: {start} ~ {start+len(records)-1} / 오류: {e}")
# 필요 시 raise 하거나 continue로 다음 배치 진행 가능
raise
print(f"[INFO] DB 저장 전체 완료, 총 영향 행 수: {total_affected}")
return total_affected
def ensure_shop_exists(shop_cd, shop_name):
"""
매장 정보 테이블에 매장코드가 없으면 신규 등록한다.
Args:
shop_cd (str): 매장 코드
shop_name (str): 매장 명
"""
print(f"[INFO] 매장 존재 여부 확인: {shop_cd}")
engine = db.get_engine()
conn = engine.connect()
shop_table = db_schema.pos_shop_name
try:
query = shop_table.select().where(shop_table.c.shop_cd == shop_cd)
result = conn.execute(query).fetchone()
if result is None:
conn.execute(
"INSERT INTO fg_manager_pos_shop_name (shop_cd, shop_name, used) VALUES (%s, %s, 1)",
(shop_cd, shop_name)
)
logging.info(f"매장코드 '{shop_cd}' 신규 등록: '{shop_name}'")
print(f"[INFO] 신규 매장 등록: {shop_cd} / {shop_name}")
ins = shop_table.insert().values(shop_cd=shop_cd, shop_name=shop_name)
conn.execute(ins)
conn.commit()
else:
existing_name = result[0]
if existing_name != shop_name:
print(f"매장코드 '{shop_cd}' 기존명: '{existing_name}', 새 이름: '{shop_name}'")
answer = input("매장명을 새 이름으로 변경하시겠습니까? (y/Enter=예, 그외=아니오): ").strip().lower()
if answer in ('', 'y', 'yes'):
conn.execute(
"UPDATE fg_manager_pos_shop_name SET shop_name = %s WHERE shop_cd = %s",
(shop_name, shop_cd)
)
logging.info(f"매장코드 '{shop_cd}' 매장명 변경: '{existing_name}' -> '{shop_name}'")
else:
logging.info(f"매장코드 '{shop_cd}' 매장명 변경 거부됨")
print(f"[INFO] 기존 매장 존재: {shop_cd}")
except Exception as e:
print(f"[ERROR] 매장 확인/등록 실패: {e}")
raise
finally:
conn.close()
def load_data(filepath, sale_date, shop_cd):
ext = os.path.splitext(filepath)[1].lower()
if ext == '.xls':
engine = 'xlrd'
elif ext == '.xlsx':
engine = 'openpyxl'
else:
raise ValueError(f"지원하지 않는 엑셀 확장자: {ext}")
df = pd.read_excel(filepath, header=6, engine=engine)
if '합계' in df.iloc[:, 0].values:
df = df[df.iloc[:, 0] != '합계']
rename_map = {
'포스번호': 'pos_no',
'영수증번호': 'bill_no',
'구분': 'division',
'테이블번호': 'table_no',
'최초주문': 'order_time',
'결제시각': 'pay_time',
'상품코드': 'product_cd',
'바코드': 'barcode',
'상품명': 'product_name',
'수량': 'qty',
'총매출액': 'tot_sale_amt',
'ERP매핑코드': 'erp_cd',
'비고': 'remark',
'할인액': 'dc_amt',
'할인구분': 'dc_type',
'실매출액': 'dcm_sale_amt',
'가액': 'net_amt',
'부가세': 'vat_amt',
}
df.rename(columns=rename_map, inplace=True)
df['sale_date'] = pd.to_datetime(sale_date).date()
df['shop_cd'] = shop_cd
int_cols = ['pos_no', 'bill_no', 'table_no', 'qty', 'tot_sale_amt', 'dc_amt', 'dcm_sale_amt', 'net_amt', 'vat_amt']
for col in int_cols:
if col in df.columns:
df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0).astype(int)
import datetime
time_cols = ['order_time', 'pay_time']
for col in time_cols:
if col in df.columns:
def to_time(val):
if pd.isna(val):
return None
if isinstance(val, datetime.time):
return val
if isinstance(val, datetime.datetime):
return val.time()
try:
return datetime.datetime.strptime(str(val), '%H:%M:%S').time()
except Exception:
return None
df[col] = df[col].apply(to_time)
return df
def insert_data_to_db(engine, table, df):
with engine.begin() as conn:
for idx, row in df.iterrows():
data = row.to_dict()
stmt = mysql_insert(table).values(**data)
update_cols = {
'division': data.get('division'),
'table_no': data.get('table_no'),
'order_time': data.get('order_time'),
'pay_time': data.get('pay_time'),
'barcode': data.get('barcode'),
'product_name': data.get('product_name'),
'qty': data.get('qty'),
'tot_sale_amt': data.get('tot_sale_amt'),
'erp_cd': data.get('erp_cd'),
'remark': data.get('remark'),
'dc_amt': data.get('dc_amt'),
'dc_type': data.get('dc_type'),
'dcm_sale_amt': data.get('dcm_sale_amt'),
'net_amt': data.get('net_amt'),
'vat_amt': data.get('vat_amt'),
}
stmt = stmt.on_duplicate_key_update(**update_cols)
try:
conn.execute(stmt)
except Exception as e:
logging.error(f"{idx+1}행 DB 입력 실패: {e}")
logging.info("DB 저장 완료")
class BillFileHandler(FileSystemEventHandler):
def __init__(self):
super().__init__()
self.processing_files = set()
def on_created(self, event):
if event.is_directory:
return
filename = os.path.basename(event.src_path)
if FILE_PATTERN.match(filename):
if filename in self.processing_files:
return
self.processing_files.add(filename)
logging.info(f"새 파일 감지: {filename}")
try:
time.sleep(3) # 파일 완전 생성 대기
sale_date, shop_cd, shop_name = parse_header(event.src_path)
logging.info(f"파일: {filename}, 날짜: {sale_date}, 매장코드: {shop_cd}, 매장명: {shop_name}")
check_and_update_shop(shop_cd, shop_name)
df = load_data(event.src_path, sale_date, shop_cd)
if df.empty:
logging.warning(f"데이터가 없습니다: {filename}")
else:
insert_data_to_db(db.engine, db_schema.pos_billdata, df)
logging.info(f"{filename} 처리 완료")
os.remove(event.src_path)
logging.info(f"파일 삭제 완료: {filename}")
except Exception as e:
logging.error(f"{filename} 처리 실패: {e}")
finally:
self.processing_files.discard(filename)
def process_existing_files():
def main():
"""
대상 데이터 파일 목록을 찾고, 파일별로 처리 진행한다.
처리 성공 시 저장 건수를 출력하고, 실패 시 오류 메시지 출력.
"""
files = [f for f in os.listdir(DATA_DIR) if FILE_PATTERN.match(f)]
if not files:
logging.info("처리할 기존 파일이 없습니다.")
return
logging.info(f"시작 시 발견된 처리 대상 파일 {len(files)}")
handler = BillFileHandler()
for filename in files:
filepath = os.path.join(DATA_DIR, filename)
if filename in handler.processing_files:
continue
handler.processing_files.add(filename)
logging.info(f"기존 파일 처리 시작: {filename}")
print(f"[INFO] 발견된 파일 {len(files)}")
for file in files:
filepath = os.path.join(DATA_DIR, file)
print(f"[INFO] 파일: {file} 처리 시작")
try:
sale_date, shop_cd, shop_name = parse_header(filepath)
logging.info(f"파일: {filename}, 날짜: {sale_date}, 매장코드: {shop_cd}, 매장명: {shop_name}")
sale_date, shop_cd, shop_name = extract_file_info(filepath)
ensure_shop_exists(shop_cd, shop_name)
check_and_update_shop(shop_cd, shop_name)
raw_df = load_excel_data(filepath)
df = normalize_data(raw_df, sale_date, shop_cd)
df = load_data(filepath, sale_date, shop_cd)
if df.empty:
logging.warning(f"데이터가 없습니다: {filename}")
else:
insert_data_to_db(db.engine, db_schema.pos_billdata, df)
logging.info(f"{filename} 처리 완료")
affected = upsert_data(df)
print(f"[DONE] 처리 완료: {file} / 저장 건수: {affected}")
# 처리 완료 후 파일 삭제 (필요 시 활성화)
# os.remove(filepath)
# print(f"[INFO] 처리 완료 후 파일 삭제: {file}")
os.remove(filepath)
logging.info(f"파일 삭제 완료: {filename}")
except Exception as e:
logging.error(f"{filename} 처리 실패: {e}")
finally:
handler.processing_files.discard(filename)
def monitor_folder():
process_existing_files() # 최초 실행 시 한번 검사
event_handler = BillFileHandler()
observer = Observer()
observer.schedule(event_handler, path=DATA_DIR, recursive=False)
observer.start()
logging.info(f"폴더 모니터링 시작: {DATA_DIR}")
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
observer.stop()
logging.info("폴더 모니터링 종료 요청됨")
observer.join()
print(f"[ERROR] {file} 처리 실패: {e}")
if __name__ == "__main__":
monitor_folder()
main()