diff --git a/conf/db_schema.py b/conf/db_schema.py index 95e345e..704a1bd 100644 --- a/conf/db_schema.py +++ b/conf/db_schema.py @@ -1,7 +1,7 @@ # db_schema.py import os import yaml -from sqlalchemy import Table, Column, Date, Integer, String, Float, Text, MetaData, UniqueConstraint, DateTime +from sqlalchemy import Table, Column, Date, Integer, String, Float, Text, MetaData, UniqueConstraint, DateTime, Time, PrimaryKeyConstraint from sqlalchemy.sql import func BASE_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), '..')) @@ -188,3 +188,39 @@ holiday = Table( Column('updated_at', DateTime, server_default=func.now(), onupdate=func.now(), comment='수정일시'), comment='한국천문연구원 특일정보' ) + +pos_billdata = Table( + get_full_table_name('manager_static_pos_billdata'), metadata, + Column('sale_date', Date, nullable=False), + Column('shop_cd', String(20), nullable=False), + Column('pos_no', Integer, nullable=False), + Column('bill_no', Integer, nullable=False), + Column('product_cd', String(20), nullable=False), + Column('division', String(10)), + Column('table_no', Integer), + Column('order_time', Time), + Column('pay_time', Time), + Column('barcode', String(20)), + Column('product_name', String(100)), + Column('qty', Integer), + Column('tot_sale_amt', Integer), + Column('erp_cd', String(50)), + Column('remark', Text), + Column('dc_amt', Integer), + Column('dc_type', String(50)), + Column('dcm_sale_amt', Integer), + Column('net_amt', Integer), + Column('vat_amt', Integer), + PrimaryKeyConstraint('sale_date', 'shop_cd', 'pos_no', 'bill_no', 'product_cd') +) + +pos_shop_name = Table( + get_full_table_name('pos_shop_name'), metadata, + Column('shop_cd', String(20), primary_key=True, nullable=False), + Column('shop_name', String(100), nullable=False), + Column('used', Integer, nullable=False, default=1, comment='사용여부 (1=사용, 0=미사용)'), + Column('created_at', DateTime, server_default=func.current_timestamp(), comment='등록일시'), + Column('updated_at', DateTime, server_default=func.current_timestamp(), onupdate=func.current_timestamp(), comment='수정일시'), + mysql_engine='InnoDB', + mysql_charset='utf8mb4', +) diff --git a/lib/pos_update_bill.py b/lib/pos_update_bill.py new file mode 100644 index 0000000..d7f0446 --- /dev/null +++ b/lib/pos_update_bill.py @@ -0,0 +1,266 @@ +import os +import sys +import re +import time +import logging +from datetime import datetime +import pandas as pd +import xlrd +from openpyxl import load_workbook +from sqlalchemy.dialects.mysql import insert as mysql_insert +from sqlalchemy.exc import IntegrityError +from watchdog.observers import Observer +from watchdog.events import FileSystemEventHandler + +# 프로젝트 루트 상위 경로 추가 +sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) +from conf import db, db_schema +from lib.common import load_config + +DATA_DIR = os.path.join(os.path.dirname(__file__), '../data') +FILE_PATTERN = re.compile(r"^영수증별매출상세현황.*\.xls[x]?$") # xls 또는 xlsx + +logging.basicConfig( + level=logging.INFO, + format='[%(asctime)s] %(levelname)s: %(message)s', + datefmt='%Y-%m-%d %H:%M:%S', +) + +def parse_header(filepath): + ext = os.path.splitext(filepath)[1].lower() + if ext == '.xlsx': + from openpyxl import load_workbook + wb = load_workbook(filepath, read_only=True, data_only=True) + ws = wb.active + cell_val = ws['A3'].value + elif ext == '.xls': + wb = xlrd.open_workbook(filepath) + sheet = wb.sheet_by_index(0) + cell_val = sheet.cell_value(2, 0) # 0-based row,col → A3 is row=2, col=0 + else: + raise ValueError(f"지원하지 않는 확장자: {ext}") + + if not cell_val: + raise ValueError("A3 셀에 값이 없습니다.") + + date_match = re.search(r'조회일자\s*:\s*([\d\-]+)', cell_val) + shop_match = re.search(r'매장선택\s*:\s*\[(.*?)\]\s*(.+)', cell_val) + + if not date_match or not shop_match: + raise ValueError("A3 셀에서 날짜 또는 매장 정보 파싱 실패") + + sale_date = date_match.group(1) + shop_cd = shop_match.group(1) + shop_name = shop_match.group(2).strip() + + return sale_date, shop_cd, shop_name + +def check_and_update_shop(shop_cd, shop_name): + conn = db.engine.connect() + try: + result = conn.execute( + "SELECT shop_name FROM fg_manager_pos_shop_name WHERE shop_cd = %s", + (shop_cd,) + ).fetchone() + + if result is None: + conn.execute( + "INSERT INTO fg_manager_pos_shop_name (shop_cd, shop_name, used) VALUES (%s, %s, 1)", + (shop_cd, shop_name) + ) + logging.info(f"매장코드 '{shop_cd}' 신규 등록: '{shop_name}'") + else: + existing_name = result[0] + if existing_name != shop_name: + print(f"매장코드 '{shop_cd}' 기존명: '{existing_name}', 새 이름: '{shop_name}'") + answer = input("매장명을 새 이름으로 변경하시겠습니까? (y/Enter=예, 그외=아니오): ").strip().lower() + if answer in ('', 'y', 'yes'): + conn.execute( + "UPDATE fg_manager_pos_shop_name SET shop_name = %s WHERE shop_cd = %s", + (shop_name, shop_cd) + ) + logging.info(f"매장코드 '{shop_cd}' 매장명 변경: '{existing_name}' -> '{shop_name}'") + else: + logging.info(f"매장코드 '{shop_cd}' 매장명 변경 거부됨") + finally: + conn.close() + +def load_data(filepath, sale_date, shop_cd): + ext = os.path.splitext(filepath)[1].lower() + if ext == '.xls': + engine = 'xlrd' + elif ext == '.xlsx': + engine = 'openpyxl' + else: + raise ValueError(f"지원하지 않는 엑셀 확장자: {ext}") + + df = pd.read_excel(filepath, header=6, engine=engine) + + if '합계' in df.iloc[:, 0].values: + df = df[df.iloc[:, 0] != '합계'] + + rename_map = { + '포스번호': 'pos_no', + '영수증번호': 'bill_no', + '구분': 'division', + '테이블번호': 'table_no', + '최초주문': 'order_time', + '결제시각': 'pay_time', + '상품코드': 'product_cd', + '바코드': 'barcode', + '상품명': 'product_name', + '수량': 'qty', + '총매출액': 'tot_sale_amt', + 'ERP매핑코드': 'erp_cd', + '비고': 'remark', + '할인액': 'dc_amt', + '할인구분': 'dc_type', + '실매출액': 'dcm_sale_amt', + '가액': 'net_amt', + '부가세': 'vat_amt', + } + df.rename(columns=rename_map, inplace=True) + + df['sale_date'] = pd.to_datetime(sale_date).date() + df['shop_cd'] = shop_cd + + int_cols = ['pos_no', 'bill_no', 'table_no', 'qty', 'tot_sale_amt', 'dc_amt', 'dcm_sale_amt', 'net_amt', 'vat_amt'] + for col in int_cols: + if col in df.columns: + df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0).astype(int) + + import datetime + time_cols = ['order_time', 'pay_time'] + for col in time_cols: + if col in df.columns: + def to_time(val): + if pd.isna(val): + return None + if isinstance(val, datetime.time): + return val + if isinstance(val, datetime.datetime): + return val.time() + try: + return datetime.datetime.strptime(str(val), '%H:%M:%S').time() + except Exception: + return None + df[col] = df[col].apply(to_time) + + return df + +def insert_data_to_db(engine, table, df): + with engine.begin() as conn: + for idx, row in df.iterrows(): + data = row.to_dict() + stmt = mysql_insert(table).values(**data) + update_cols = { + 'division': data.get('division'), + 'table_no': data.get('table_no'), + 'order_time': data.get('order_time'), + 'pay_time': data.get('pay_time'), + 'barcode': data.get('barcode'), + 'product_name': data.get('product_name'), + 'qty': data.get('qty'), + 'tot_sale_amt': data.get('tot_sale_amt'), + 'erp_cd': data.get('erp_cd'), + 'remark': data.get('remark'), + 'dc_amt': data.get('dc_amt'), + 'dc_type': data.get('dc_type'), + 'dcm_sale_amt': data.get('dcm_sale_amt'), + 'net_amt': data.get('net_amt'), + 'vat_amt': data.get('vat_amt'), + } + stmt = stmt.on_duplicate_key_update(**update_cols) + try: + conn.execute(stmt) + except Exception as e: + logging.error(f"{idx+1}행 DB 입력 실패: {e}") + logging.info("DB 저장 완료") + +class BillFileHandler(FileSystemEventHandler): + def __init__(self): + super().__init__() + self.processing_files = set() + + def on_created(self, event): + if event.is_directory: + return + filename = os.path.basename(event.src_path) + if FILE_PATTERN.match(filename): + if filename in self.processing_files: + return + self.processing_files.add(filename) + logging.info(f"새 파일 감지: {filename}") + try: + time.sleep(3) # 파일 완전 생성 대기 + sale_date, shop_cd, shop_name = parse_header(event.src_path) + logging.info(f"파일: {filename}, 날짜: {sale_date}, 매장코드: {shop_cd}, 매장명: {shop_name}") + + check_and_update_shop(shop_cd, shop_name) + + df = load_data(event.src_path, sale_date, shop_cd) + if df.empty: + logging.warning(f"데이터가 없습니다: {filename}") + else: + insert_data_to_db(db.engine, db_schema.pos_billdata, df) + logging.info(f"{filename} 처리 완료") + + os.remove(event.src_path) + logging.info(f"파일 삭제 완료: {filename}") + except Exception as e: + logging.error(f"{filename} 처리 실패: {e}") + finally: + self.processing_files.discard(filename) + +def process_existing_files(): + files = [f for f in os.listdir(DATA_DIR) if FILE_PATTERN.match(f)] + if not files: + logging.info("처리할 기존 파일이 없습니다.") + return + logging.info(f"시작 시 발견된 처리 대상 파일 {len(files)}개") + handler = BillFileHandler() + for filename in files: + filepath = os.path.join(DATA_DIR, filename) + if filename in handler.processing_files: + continue + handler.processing_files.add(filename) + logging.info(f"기존 파일 처리 시작: {filename}") + try: + sale_date, shop_cd, shop_name = parse_header(filepath) + logging.info(f"파일: {filename}, 날짜: {sale_date}, 매장코드: {shop_cd}, 매장명: {shop_name}") + + check_and_update_shop(shop_cd, shop_name) + + df = load_data(filepath, sale_date, shop_cd) + if df.empty: + logging.warning(f"데이터가 없습니다: {filename}") + else: + insert_data_to_db(db.engine, db_schema.pos_billdata, df) + logging.info(f"{filename} 처리 완료") + + os.remove(filepath) + logging.info(f"파일 삭제 완료: {filename}") + except Exception as e: + logging.error(f"{filename} 처리 실패: {e}") + finally: + handler.processing_files.discard(filename) + +def monitor_folder(): + process_existing_files() # 최초 실행 시 한번 검사 + + event_handler = BillFileHandler() + observer = Observer() + observer.schedule(event_handler, path=DATA_DIR, recursive=False) + observer.start() + logging.info(f"폴더 모니터링 시작: {DATA_DIR}") + try: + while True: + time.sleep(1) + except KeyboardInterrupt: + observer.stop() + logging.info("폴더 모니터링 종료 요청됨") + observer.join() + + +if __name__ == "__main__": + monitor_folder() diff --git a/requirements.txt b/requirements.txt index 6b0461a..aed05ad 100644 --- a/requirements.txt +++ b/requirements.txt @@ -12,4 +12,5 @@ statsmodels scikit-learn customtkinter tkcalendar -tabulate \ No newline at end of file +tabulate +watchdog \ No newline at end of file