import os import sys import re import time import logging from datetime import datetime import pandas as pd import xlrd from openpyxl import load_workbook from sqlalchemy.dialects.mysql import insert as mysql_insert from sqlalchemy.exc import IntegrityError from watchdog.observers import Observer from watchdog.events import FileSystemEventHandler # 프로젝트 루트 상위 경로 추가 sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) from conf import db, db_schema from lib.common import load_config DATA_DIR = os.path.join(os.path.dirname(__file__), '../data') FILE_PATTERN = re.compile(r"^영수증별매출상세현황.*\.xls[x]?$") # xls 또는 xlsx logging.basicConfig( level=logging.INFO, format='[%(asctime)s] %(levelname)s: %(message)s', datefmt='%Y-%m-%d %H:%M:%S', ) def parse_header(filepath): ext = os.path.splitext(filepath)[1].lower() if ext == '.xlsx': from openpyxl import load_workbook wb = load_workbook(filepath, read_only=True, data_only=True) ws = wb.active cell_val = ws['A3'].value elif ext == '.xls': wb = xlrd.open_workbook(filepath) sheet = wb.sheet_by_index(0) cell_val = sheet.cell_value(2, 0) # 0-based row,col → A3 is row=2, col=0 else: raise ValueError(f"지원하지 않는 확장자: {ext}") if not cell_val: raise ValueError("A3 셀에 값이 없습니다.") date_match = re.search(r'조회일자\s*:\s*([\d\-]+)', cell_val) shop_match = re.search(r'매장선택\s*:\s*\[(.*?)\]\s*(.+)', cell_val) if not date_match or not shop_match: raise ValueError("A3 셀에서 날짜 또는 매장 정보 파싱 실패") sale_date = date_match.group(1) shop_cd = shop_match.group(1) shop_name = shop_match.group(2).strip() return sale_date, shop_cd, shop_name def check_and_update_shop(shop_cd, shop_name): conn = db.engine.connect() try: result = conn.execute( "SELECT shop_name FROM fg_manager_pos_shop_name WHERE shop_cd = %s", (shop_cd,) ).fetchone() if result is None: conn.execute( "INSERT INTO fg_manager_pos_shop_name (shop_cd, shop_name, used) VALUES (%s, %s, 1)", (shop_cd, shop_name) ) logging.info(f"매장코드 '{shop_cd}' 신규 등록: '{shop_name}'") else: existing_name = result[0] if existing_name != shop_name: print(f"매장코드 '{shop_cd}' 기존명: '{existing_name}', 새 이름: '{shop_name}'") answer = input("매장명을 새 이름으로 변경하시겠습니까? (y/Enter=예, 그외=아니오): ").strip().lower() if answer in ('', 'y', 'yes'): conn.execute( "UPDATE fg_manager_pos_shop_name SET shop_name = %s WHERE shop_cd = %s", (shop_name, shop_cd) ) logging.info(f"매장코드 '{shop_cd}' 매장명 변경: '{existing_name}' -> '{shop_name}'") else: logging.info(f"매장코드 '{shop_cd}' 매장명 변경 거부됨") finally: conn.close() def load_data(filepath, sale_date, shop_cd): ext = os.path.splitext(filepath)[1].lower() if ext == '.xls': engine = 'xlrd' elif ext == '.xlsx': engine = 'openpyxl' else: raise ValueError(f"지원하지 않는 엑셀 확장자: {ext}") df = pd.read_excel(filepath, header=6, engine=engine) if '합계' in df.iloc[:, 0].values: df = df[df.iloc[:, 0] != '합계'] rename_map = { '포스번호': 'pos_no', '영수증번호': 'bill_no', '구분': 'division', '테이블번호': 'table_no', '최초주문': 'order_time', '결제시각': 'pay_time', '상품코드': 'product_cd', '바코드': 'barcode', '상품명': 'product_name', '수량': 'qty', '총매출액': 'tot_sale_amt', 'ERP매핑코드': 'erp_cd', '비고': 'remark', '할인액': 'dc_amt', '할인구분': 'dc_type', '실매출액': 'dcm_sale_amt', '가액': 'net_amt', '부가세': 'vat_amt', } df.rename(columns=rename_map, inplace=True) df['sale_date'] = pd.to_datetime(sale_date).date() df['shop_cd'] = shop_cd int_cols = ['pos_no', 'bill_no', 'table_no', 'qty', 'tot_sale_amt', 'dc_amt', 'dcm_sale_amt', 'net_amt', 'vat_amt'] for col in int_cols: if col in df.columns: df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0).astype(int) import datetime time_cols = ['order_time', 'pay_time'] for col in time_cols: if col in df.columns: def to_time(val): if pd.isna(val): return None if isinstance(val, datetime.time): return val if isinstance(val, datetime.datetime): return val.time() try: return datetime.datetime.strptime(str(val), '%H:%M:%S').time() except Exception: return None df[col] = df[col].apply(to_time) return df def insert_data_to_db(engine, table, df): with engine.begin() as conn: for idx, row in df.iterrows(): data = row.to_dict() stmt = mysql_insert(table).values(**data) update_cols = { 'division': data.get('division'), 'table_no': data.get('table_no'), 'order_time': data.get('order_time'), 'pay_time': data.get('pay_time'), 'barcode': data.get('barcode'), 'product_name': data.get('product_name'), 'qty': data.get('qty'), 'tot_sale_amt': data.get('tot_sale_amt'), 'erp_cd': data.get('erp_cd'), 'remark': data.get('remark'), 'dc_amt': data.get('dc_amt'), 'dc_type': data.get('dc_type'), 'dcm_sale_amt': data.get('dcm_sale_amt'), 'net_amt': data.get('net_amt'), 'vat_amt': data.get('vat_amt'), } stmt = stmt.on_duplicate_key_update(**update_cols) try: conn.execute(stmt) except Exception as e: logging.error(f"{idx+1}행 DB 입력 실패: {e}") logging.info("DB 저장 완료") class BillFileHandler(FileSystemEventHandler): def __init__(self): super().__init__() self.processing_files = set() def on_created(self, event): if event.is_directory: return filename = os.path.basename(event.src_path) if FILE_PATTERN.match(filename): if filename in self.processing_files: return self.processing_files.add(filename) logging.info(f"새 파일 감지: {filename}") try: time.sleep(3) # 파일 완전 생성 대기 sale_date, shop_cd, shop_name = parse_header(event.src_path) logging.info(f"파일: {filename}, 날짜: {sale_date}, 매장코드: {shop_cd}, 매장명: {shop_name}") check_and_update_shop(shop_cd, shop_name) df = load_data(event.src_path, sale_date, shop_cd) if df.empty: logging.warning(f"데이터가 없습니다: {filename}") else: insert_data_to_db(db.engine, db_schema.pos_billdata, df) logging.info(f"{filename} 처리 완료") os.remove(event.src_path) logging.info(f"파일 삭제 완료: {filename}") except Exception as e: logging.error(f"{filename} 처리 실패: {e}") finally: self.processing_files.discard(filename) def process_existing_files(): files = [f for f in os.listdir(DATA_DIR) if FILE_PATTERN.match(f)] if not files: logging.info("처리할 기존 파일이 없습니다.") return logging.info(f"시작 시 발견된 처리 대상 파일 {len(files)}개") handler = BillFileHandler() for filename in files: filepath = os.path.join(DATA_DIR, filename) if filename in handler.processing_files: continue handler.processing_files.add(filename) logging.info(f"기존 파일 처리 시작: {filename}") try: sale_date, shop_cd, shop_name = parse_header(filepath) logging.info(f"파일: {filename}, 날짜: {sale_date}, 매장코드: {shop_cd}, 매장명: {shop_name}") check_and_update_shop(shop_cd, shop_name) df = load_data(filepath, sale_date, shop_cd) if df.empty: logging.warning(f"데이터가 없습니다: {filename}") else: insert_data_to_db(db.engine, db_schema.pos_billdata, df) logging.info(f"{filename} 처리 완료") os.remove(filepath) logging.info(f"파일 삭제 완료: {filename}") except Exception as e: logging.error(f"{filename} 처리 실패: {e}") finally: handler.processing_files.discard(filename) def monitor_folder(): process_existing_files() # 최초 실행 시 한번 검사 event_handler = BillFileHandler() observer = Observer() observer.schedule(event_handler, path=DATA_DIR, recursive=False) observer.start() logging.info(f"폴더 모니터링 시작: {DATA_DIR}") try: while True: time.sleep(1) except KeyboardInterrupt: observer.stop() logging.info("폴더 모니터링 종료 요청됨") observer.join() if __name__ == "__main__": monitor_folder()