Files
static/lib/pos_update_daily_product.py

129 lines
4.2 KiB
Python

# POS Update
'''
OK포스 > 매출관리 > 일자별 > 상품별 > 날짜 지정 > 조회줄수 5000으로 변경 > 엑셀
추출파일을 ./data에 복사
본 파일 실행하면 자동으로 mariadb의 DB에 삽입함.
'''
import sys, os
import pandas as pd
from sqlalchemy.dialects.mysql import insert as mysql_insert
from sqlalchemy.exc import IntegrityError
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from conf import db, db_schema
from lib.common import load_config
CONFIG = load_config()
DATA_DIR = os.path.join(os.path.dirname(__file__), '../data')
def update_pos_table(engine, table, df):
with engine.begin() as conn:
for idx, row in df.iterrows():
data = row.to_dict()
stmt = mysql_insert(table).values(**data)
update_data = {
'qty': data['qty'],
'tot_amount': data['tot_amount'],
'tot_discount': data['tot_discount'],
'actual_amount': data['actual_amount']
}
stmt = stmt.on_duplicate_key_update(**update_data)
try:
conn.execute(stmt)
except IntegrityError as e:
print(f"[ERROR] {idx + 1}행 삽입 실패: {e}")
print("[DONE] 모든 데이터 삽입 완료")
def process_file(filepath, table, engine):
print(f"[INFO] 처리 시작: {filepath}")
try:
ext = os.path.splitext(filepath)[-1].lower()
if ext == ".xls":
df = pd.read_excel(filepath, header=5, engine="xlrd")
elif ext == ".xlsx":
df = pd.read_excel(filepath, header=5, engine="openpyxl")
else:
raise ValueError("지원하지 않는 파일 형식입니다.")
df = df[df.iloc[:, 0] != '합계']
df.rename(columns={
'일자': 'date',
'대분류': 'ca01',
'중분류': 'ca02',
'소분류': 'ca03',
'상품코드': 'barcode',
'상품명': 'name',
'수량': 'qty',
'총매출액': 'tot_amount',
'총할인액': 'tot_discount',
'실매출액': 'actual_amount'
}, inplace=True)
df.drop(columns=[col for col in ['idx'] if col in df.columns], inplace=True)
df['date'] = pd.to_datetime(df['date']).dt.date
df['barcode'] = df['barcode'].astype(int)
df['qty'] = df['qty'].astype(int)
df['tot_amount'] = df['tot_amount'].astype(int)
df['tot_discount'] = df['tot_discount'].astype(int)
df['actual_amount'] = df['actual_amount'].astype(int)
if df.empty:
print("[WARN] 데이터가 없습니다.")
return False, 0
except Exception as e:
print(f"[ERROR] 로드 실패: {e}")
return False, 0
print(f"[INFO] 데이터 건수: {len(df)}")
update_pos_table(engine, table, df)
print(f"[INFO] 처리 완료: {filepath}")
return True, len(df)
def batch_process_files(table, engine):
files = [f for f in os.listdir(DATA_DIR) if f.startswith("일자별 (상품별)") and f.endswith(('.xlsx', '.xls'))]
if not files:
print("[INFO] 처리할 파일이 없습니다.")
return False
print(f"[INFO] {len(files)}개의 파일을 찾았습니다.")
total_rows = 0
deleted_files = 0
for fname in files:
full_path = os.path.join(DATA_DIR, fname)
success, count = process_file(full_path, table, engine)
if success:
total_rows += count
try:
os.remove(full_path)
print(f"[INFO] 파일 삭제 완료: {fname}")
deleted_files += 1
except Exception as e:
print(f"[WARN] 파일 삭제 실패: {fname} / {e}")
print(f"[INFO] 총 처리 데이터 건수: {total_rows}")
print(f"[INFO] 삭제된 파일 수: {deleted_files}")
return True
def main():
engine = db.engine
try:
table = db_schema.pos
except AttributeError:
print("[ERROR] 'pos' 테이블이 db_schema에 정의되어 있지 않습니다.")
return
batch_done = batch_process_files(table, engine)
if not batch_done:
print("[INFO] 처리할 데이터가 없습니다.")
if __name__ == "__main__":
main()