# POS Update ''' OK포스 > 매출관리 > 일자별 > 상품별 > 날짜 지정 > 조회줄수 5000으로 변경 > 엑셀 추출파일을 ./data에 복사 본 파일 실행하면 자동으로 mariadb의 DB에 삽입함. ''' import sys, os import pandas as pd from sqlalchemy.dialects.mysql import insert as mysql_insert from sqlalchemy.exc import IntegrityError sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) from conf import db, db_schema from lib.common import load_config CONFIG = load_config() DATA_DIR = os.path.join(os.path.dirname(__file__), '../data') def update_pos_table(engine, table, df): with engine.begin() as conn: for idx, row in df.iterrows(): data = row.to_dict() stmt = mysql_insert(table).values(**data) update_data = { 'qty': data['qty'], 'tot_amount': data['tot_amount'], 'tot_discount': data['tot_discount'], 'actual_amount': data['actual_amount'] } stmt = stmt.on_duplicate_key_update(**update_data) try: conn.execute(stmt) except IntegrityError as e: print(f"[ERROR] {idx + 1}행 삽입 실패: {e}") print("[DONE] 모든 데이터 삽입 완료") def process_file(filepath, table, engine): print(f"[INFO] 처리 시작: {filepath}") try: ext = os.path.splitext(filepath)[-1].lower() if ext == ".xls": df = pd.read_excel(filepath, header=5, engine="xlrd") elif ext == ".xlsx": df = pd.read_excel(filepath, header=5, engine="openpyxl") else: raise ValueError("지원하지 않는 파일 형식입니다.") df = df[df.iloc[:, 0] != '합계'] df.rename(columns={ '일자': 'date', '대분류': 'ca01', '중분류': 'ca02', '소분류': 'ca03', '상품코드': 'barcode', '상품명': 'name', '수량': 'qty', '총매출액': 'tot_amount', '총할인액': 'tot_discount', '실매출액': 'actual_amount' }, inplace=True) df.drop(columns=[col for col in ['idx'] if col in df.columns], inplace=True) df['date'] = pd.to_datetime(df['date']).dt.date df['barcode'] = df['barcode'].astype(int) df['qty'] = df['qty'].astype(int) df['tot_amount'] = df['tot_amount'].astype(int) df['tot_discount'] = df['tot_discount'].astype(int) df['actual_amount'] = df['actual_amount'].astype(int) if df.empty: print("[WARN] 데이터가 없습니다.") return False, 0 except Exception as e: print(f"[ERROR] 로드 실패: {e}") return False, 0 print(f"[INFO] 데이터 건수: {len(df)}") update_pos_table(engine, table, df) print(f"[INFO] 처리 완료: {filepath}") return True, len(df) def batch_process_files(table, engine): files = [f for f in os.listdir(DATA_DIR) if f.startswith("일자별 (상품별)") and f.endswith(('.xlsx', '.xls'))] if not files: print("[INFO] 처리할 파일이 없습니다.") return False print(f"[INFO] {len(files)}개의 파일을 찾았습니다.") total_rows = 0 deleted_files = 0 for fname in files: full_path = os.path.join(DATA_DIR, fname) success, count = process_file(full_path, table, engine) if success: total_rows += count try: os.remove(full_path) print(f"[INFO] 파일 삭제 완료: {fname}") deleted_files += 1 except Exception as e: print(f"[WARN] 파일 삭제 실패: {fname} / {e}") print(f"[INFO] 총 처리 데이터 건수: {total_rows}") print(f"[INFO] 삭제된 파일 수: {deleted_files}") return True def main(): engine = db.engine try: table = db_schema.pos except AttributeError: print("[ERROR] 'pos' 테이블이 db_schema에 정의되어 있지 않습니다.") return batch_done = batch_process_files(table, engine) if not batch_done: print("[INFO] 처리할 데이터가 없습니다.") if __name__ == "__main__": main()