Files
static/lib/pos_update_bill.py

267 lines
9.7 KiB
Python

import os
import sys
import re
import time
import logging
from datetime import datetime
import pandas as pd
import xlrd
from openpyxl import load_workbook
from sqlalchemy.dialects.mysql import insert as mysql_insert
from sqlalchemy.exc import IntegrityError
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
# 프로젝트 루트 상위 경로 추가
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from conf import db, db_schema
from lib.common import load_config
# Folder that is scanned and watched for report workbooks, resolved relative
# to this module as <module dir>/../data.
DATA_DIR = os.path.join(os.path.dirname(__file__), '../data')

# File names must start with the report title and end in .xls or .xlsx.
FILE_PATTERN = re.compile(r"^영수증별매출상세현황.*\.xls[x]?$")

# Root logger configuration: INFO level with a timestamped message format.
logging.basicConfig(
    datefmt='%Y-%m-%d %H:%M:%S',
    format='[%(asctime)s] %(levelname)s: %(message)s',
    level=logging.INFO,
)
def parse_header(filepath):
    """Read the sale date and shop identity from cell A3 of a POS report.

    Cell A3 (active sheet for ``.xlsx``, first sheet for ``.xls``) is expected
    to contain ``조회일자 : <date>`` and ``매장선택 : [<shop code>] <shop name>``.

    Args:
        filepath: Path to the ``.xls`` or ``.xlsx`` workbook.

    Returns:
        Tuple ``(sale_date, shop_cd, shop_name)`` of strings taken from the
        header text; the shop name has surrounding whitespace removed.

    Raises:
        ValueError: Unsupported extension, empty or missing A3 cell, or header
            text that does not match the expected date/shop patterns.
    """
    ext = os.path.splitext(filepath)[1].lower()
    if ext == '.xlsx':
        from openpyxl import load_workbook
        wb = load_workbook(filepath, read_only=True, data_only=True)
        try:
            cell_val = wb.active['A3'].value
        finally:
            # A read-only workbook keeps the file open until close() is
            # called; an open handle prevents deleting the file on Windows.
            wb.close()
    elif ext == '.xls':
        wb = xlrd.open_workbook(filepath)
        sheet = wb.sheet_by_index(0)
        # xlrd indexes are 0-based: A3 is row 2, column 0. A sheet that is too
        # short is reported as an empty A3 instead of a bare IndexError.
        if sheet.nrows > 2 and sheet.ncols > 0:
            cell_val = sheet.cell_value(2, 0)
        else:
            cell_val = None
    else:
        raise ValueError(f"지원하지 않는 확장자: {ext}")

    if not cell_val:
        raise ValueError("A3 셀에 값이 없습니다.")
    # A numeric or date cell would make re.search raise TypeError; treating it
    # as text lets it fail through the normal "cannot parse" path below.
    header_text = cell_val if isinstance(cell_val, str) else str(cell_val)

    date_match = re.search(r'조회일자\s*:\s*([\d\-]+)', header_text)
    shop_match = re.search(r'매장선택\s*:\s*\[(.*?)\]\s*(.+)', header_text)
    if not date_match or not shop_match:
        raise ValueError("A3 셀에서 날짜 또는 매장 정보 파싱 실패")

    sale_date = date_match.group(1)
    shop_cd = shop_match.group(1)
    shop_name = shop_match.group(2).strip()
    return sale_date, shop_cd, shop_name
def check_and_update_shop(shop_cd, shop_name):
    """Register a new shop code, or offer to rename an existing one.

    Looks up ``shop_cd`` in ``fg_manager_pos_shop_name``. An unknown code is
    inserted with ``used = 1``. A known code whose stored name differs from
    ``shop_name`` triggers a console prompt; the name is updated only when the
    operator answers with Enter, ``y`` or ``yes``.

    NOTE: the prompt uses ``input()`` and therefore blocks the calling thread
    until an answer is typed.

    Args:
        shop_cd: Shop code parsed from the report header.
        shop_name: Shop name parsed from the report header.
    """
    # Local import: plain SQL strings must be wrapped in text() to be accepted
    # by Connection.execute on SQLAlchemy 2.x (it also works on 1.x).
    from sqlalchemy import text

    # Read on a short-lived connection so that no connection is held open
    # while waiting for the operator's answer below.
    with db.engine.connect() as conn:
        row = conn.execute(
            text("SELECT shop_name FROM fg_manager_pos_shop_name WHERE shop_cd = :shop_cd"),
            {"shop_cd": shop_cd},
        ).fetchone()

    if row is None:
        # engine.begin() commits on success instead of relying on autocommit.
        with db.engine.begin() as conn:
            conn.execute(
                text(
                    "INSERT INTO fg_manager_pos_shop_name (shop_cd, shop_name, used) "
                    "VALUES (:shop_cd, :shop_name, 1)"
                ),
                {"shop_cd": shop_cd, "shop_name": shop_name},
            )
        logging.info(f"매장코드 '{shop_cd}' 신규 등록: '{shop_name}'")
        return

    existing_name = row[0]
    if existing_name == shop_name:
        return

    print(f"매장코드 '{shop_cd}' 기존명: '{existing_name}', 새 이름: '{shop_name}'")
    answer = input("매장명을 새 이름으로 변경하시겠습니까? (y/Enter=예, 그외=아니오): ").strip().lower()
    if answer not in ('', 'y', 'yes'):
        logging.info(f"매장코드 '{shop_cd}' 매장명 변경 거부됨")
        return

    with db.engine.begin() as conn:
        conn.execute(
            text("UPDATE fg_manager_pos_shop_name SET shop_name = :shop_name WHERE shop_cd = :shop_cd"),
            {"shop_name": shop_name, "shop_cd": shop_cd},
        )
    logging.info(f"매장코드 '{shop_cd}' 매장명 변경: '{existing_name}' -> '{shop_name}'")
def load_data(filepath, sale_date, shop_cd):
    """Load the receipt detail rows of a POS report into a DataFrame.

    The sheet is read with row index 6 as the header row. Rows whose first
    column equals ``합계`` are dropped, the Korean column titles are renamed to
    their database column names, and ``sale_date`` / ``shop_cd`` columns are
    added with the same value on every row.

    Args:
        filepath: Path to the ``.xls`` or ``.xlsx`` workbook.
        sale_date: Date string accepted by ``pandas.to_datetime``.
        shop_cd: Shop code stored on every row.

    Returns:
        DataFrame with renamed columns. Numeric columns are 64-bit integers
        (unparseable or missing values become 0); ``order_time`` and
        ``pay_time`` hold ``datetime.time`` objects or ``None``.

    Raises:
        ValueError: If the file extension is neither ``.xls`` nor ``.xlsx``.
    """
    # Aliased so the name `datetime` is not rebound inside this function.
    import datetime as dt

    ext = os.path.splitext(filepath)[1].lower()
    engines = {'.xls': 'xlrd', '.xlsx': 'openpyxl'}
    if ext not in engines:
        raise ValueError(f"지원하지 않는 엑셀 확장자: {ext}")
    df = pd.read_excel(filepath, header=6, engine=engines[ext])

    # Drop the totals row. copy() detaches the result from the original frame
    # so the column assignments below do not write to a view of a slice.
    df = df[df.iloc[:, 0] != '합계'].copy()

    rename_map = {
        '포스번호': 'pos_no',
        '영수증번호': 'bill_no',
        '구분': 'division',
        '테이블번호': 'table_no',
        '최초주문': 'order_time',
        '결제시각': 'pay_time',
        '상품코드': 'product_cd',
        '바코드': 'barcode',
        '상품명': 'product_name',
        '수량': 'qty',
        '총매출액': 'tot_sale_amt',
        'ERP매핑코드': 'erp_cd',
        '비고': 'remark',
        '할인액': 'dc_amt',
        '할인구분': 'dc_type',
        '실매출액': 'dcm_sale_amt',
        '가액': 'net_amt',
        '부가세': 'vat_amt',
    }
    df = df.rename(columns=rename_map)
    df['sale_date'] = pd.to_datetime(sale_date).date()
    df['shop_cd'] = shop_cd

    int_cols = ['pos_no', 'bill_no', 'table_no', 'qty', 'tot_sale_amt',
                'dc_amt', 'dcm_sale_amt', 'net_amt', 'vat_amt']
    for col in int_cols:
        if col in df.columns:
            # Explicit int64: plain `int` can map to a 32-bit type on some
            # platforms, which is too narrow for long receipt numbers.
            df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0).astype('int64')

    def to_time(val):
        """Coerce one cell to datetime.time; None if missing or unparseable."""
        if pd.isna(val):
            return None
        if isinstance(val, dt.time):
            return val
        if isinstance(val, dt.datetime):
            return val.time()
        try:
            return dt.datetime.strptime(str(val), '%H:%M:%S').time()
        except Exception:
            return None

    for col in ('order_time', 'pay_time'):
        if col in df.columns:
            df[col] = df[col].apply(to_time)
    return df
# Columns refreshed by ON DUPLICATE KEY UPDATE when the row already exists.
_UPSERT_UPDATE_COLUMNS = (
    'division', 'table_no', 'order_time', 'pay_time', 'barcode',
    'product_name', 'qty', 'tot_sale_amt', 'erp_cd', 'remark',
    'dc_amt', 'dc_type', 'dcm_sale_amt', 'net_amt', 'vat_amt',
)


def _null_if_missing(value):
    """Return None for pandas missing markers (NaN/NaT/None), else the value."""
    if pd.api.types.is_scalar(value) and pd.isna(value):
        return None
    return value


def insert_data_to_db(engine, table, df):
    """Upsert every row of ``df`` into ``table`` inside one transaction.

    Each row is sent as ``INSERT ... ON DUPLICATE KEY UPDATE``. A row that
    fails is logged and skipped; the remaining rows are still attempted and
    the transaction is committed when the loop finishes (this assumes the
    backend keeps the transaction usable after a failed statement — TODO
    confirm for the deployed MySQL storage engine).

    Args:
        engine: SQLAlchemy engine providing ``begin()``.
        table: SQLAlchemy table to insert into.
        df: DataFrame whose column names are the target column names.

    Returns:
        Number of rows that failed to execute (0 when every row succeeded).
    """
    failed = 0
    with engine.begin() as conn:
        for idx, row in df.iterrows():
            # Empty Excel cells arrive as float NaN, which cannot be bound as
            # a MySQL value; bind them as NULL instead.
            data = {col: _null_if_missing(val) for col, val in row.items()}
            stmt = mysql_insert(table).values(**data)
            # Only refresh columns that are present in this frame, so a column
            # missing from the file does not overwrite stored data with NULL.
            update_cols = {
                col: data[col] for col in _UPSERT_UPDATE_COLUMNS if col in data
            }
            if update_cols:
                stmt = stmt.on_duplicate_key_update(**update_cols)
            try:
                conn.execute(stmt)
            except Exception as e:
                failed += 1
                logging.error(f"{idx+1}행 DB 입력 실패: {e}")
    if failed:
        logging.warning(f"DB 입력 실패 행 수: {failed}")
    logging.info("DB 저장 완료")
    return failed
def _process_bill_file(filepath):
    """Import one report workbook into the database and delete it afterwards.

    Runs header parsing, the shop lookup, data loading and the database
    upsert. Any exception is logged with its traceback and swallowed so that
    one bad file does not stop the caller; the file is left in place then.

    Args:
        filepath: Full path of the workbook to import.
    """
    filename = os.path.basename(filepath)
    try:
        sale_date, shop_cd, shop_name = parse_header(filepath)
        logging.info(f"파일: {filename}, 날짜: {sale_date}, 매장코드: {shop_cd}, 매장명: {shop_name}")
        check_and_update_shop(shop_cd, shop_name)
        df = load_data(filepath, sale_date, shop_cd)
        if df.empty:
            # NOTE(review): a workbook without data rows is kept on disk —
            # confirm this matches the intended behaviour.
            logging.warning(f"데이터가 없습니다: {filename}")
            return
        failed_rows = insert_data_to_db(db.engine, db_schema.pos_billdata, df)
        if failed_rows:
            # insert_data_to_db logs row-level failures instead of raising.
            # When it reports a failed-row count (truthy return value) the
            # workbook is kept for a retry; a None return counts as success.
            logging.warning(f"{filename}: 저장 실패 행 {failed_rows}개 - 파일을 삭제하지 않습니다")
            return
        logging.info(f"{filename} 처리 완료")
        os.remove(filepath)
        logging.info(f"파일 삭제 완료: {filename}")
    except Exception as e:
        # logging.exception keeps the traceback, unlike logging.error.
        logging.exception(f"{filename} 처리 실패: {e}")


class BillFileHandler(FileSystemEventHandler):
    """Watchdog handler that imports report workbooks as they are created."""

    def __init__(self):
        super().__init__()
        # Names of files currently being imported; guards against handling a
        # second creation event for a file that is still in progress.
        self.processing_files = set()

    def on_created(self, event):
        """Import a newly created file whose name matches FILE_PATTERN."""
        if event.is_directory:
            return
        filename = os.path.basename(event.src_path)
        if not FILE_PATTERN.match(filename):
            return
        if filename in self.processing_files:
            return
        self.processing_files.add(filename)
        logging.info(f"새 파일 감지: {filename}")
        try:
            time.sleep(3)  # give the writer time to finish creating the file
            _process_bill_file(event.src_path)
        finally:
            self.processing_files.discard(filename)


def process_existing_files():
    """Import every matching workbook already present in DATA_DIR."""
    files = sorted(
        f for f in os.listdir(DATA_DIR)
        if FILE_PATTERN.match(f) and os.path.isfile(os.path.join(DATA_DIR, f))
    )
    if not files:
        logging.info("처리할 기존 파일이 없습니다.")
        return
    logging.info(f"시작 시 발견된 처리 대상 파일 {len(files)}")
    for filename in files:
        logging.info(f"기존 파일 처리 시작: {filename}")
        _process_bill_file(os.path.join(DATA_DIR, filename))
def monitor_folder():
    """Import pending files, then watch DATA_DIR until Ctrl+C is pressed."""

    def _sleep_until_interrupted():
        # Idle in one-second steps; only KeyboardInterrupt ends the wait.
        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            return

    # One-off pass over files that were already there before watching began.
    process_existing_files()

    watcher = Observer()
    watcher.schedule(BillFileHandler(), path=DATA_DIR, recursive=False)
    watcher.start()
    logging.info(f"폴더 모니터링 시작: {DATA_DIR}")

    _sleep_until_interrupted()

    watcher.stop()
    logging.info("폴더 모니터링 종료 요청됨")
    watcher.join()
# Script entry point: start the folder monitor only when this file is run
# directly, not when it is imported.
if __name__ == "__main__":
    monitor_folder()