131 lines
4.3 KiB
Python
131 lines
4.3 KiB
Python
# POS Update
|
|
'''
|
|
OK포스 > 매출관리 > 일자별 > 상품별 > 날짜 지정 > 조회줄수 5000으로 변경 > 엑셀
|
|
추출파일을 ./data에 복사
|
|
본 파일 실행하면 자동으로 mariadb의 DB에 삽입함.
|
|
'''
|
|
|
|
import sys, os
|
|
import pandas as pd
|
|
from sqlalchemy.dialects.mysql import insert as mysql_insert
|
|
from sqlalchemy.exc import IntegrityError
|
|
|
|
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
|
|
from conf import db, db_schema
|
|
from lib.common import load_config
|
|
|
|
CONFIG = load_config()
|
|
DATA_DIR = os.path.join(os.path.dirname(__file__), '../data')
|
|
|
|
def update_pos_table(engine, table, df):
|
|
with engine.begin() as conn:
|
|
for idx, row in df.iterrows():
|
|
data = row.to_dict()
|
|
stmt = mysql_insert(table).values(**data)
|
|
|
|
update_data = {
|
|
'qty': data['qty'],
|
|
'tot_amount': data['tot_amount'],
|
|
'tot_discount': data['tot_discount'],
|
|
'actual_amount': data['actual_amount']
|
|
}
|
|
stmt = stmt.on_duplicate_key_update(**update_data)
|
|
|
|
try:
|
|
conn.execute(stmt)
|
|
except IntegrityError as e:
|
|
print(f"[ERROR] {idx + 1}행 삽입 실패: {e}")
|
|
|
|
print("[DONE] 모든 데이터 삽입 완료")
|
|
|
|
def process_file(filepath, table, engine):
|
|
print(f"[INFO] 처리 시작: {filepath}")
|
|
try:
|
|
ext = os.path.splitext(filepath)[-1].lower()
|
|
if ext == ".xls":
|
|
df = pd.read_excel(filepath, header=5, engine="xlrd")
|
|
elif ext == ".xlsx":
|
|
df = pd.read_excel(filepath, header=5, engine="openpyxl")
|
|
else:
|
|
raise ValueError("지원하지 않는 파일 형식입니다.")
|
|
|
|
df = df[df.iloc[:, 0] != '합계']
|
|
|
|
df.rename(columns={
|
|
'일자': 'date',
|
|
'대분류': 'ca01',
|
|
'중분류': 'ca02',
|
|
'소분류': 'ca03',
|
|
'상품코드': 'barcode',
|
|
'상품명': 'name',
|
|
'수량': 'qty',
|
|
'총매출액': 'tot_amount',
|
|
'총할인액': 'tot_discount',
|
|
'실매출액': 'actual_amount'
|
|
}, inplace=True)
|
|
|
|
df.drop(columns=[col for col in ['idx'] if col in df.columns], inplace=True)
|
|
|
|
df['date'] = pd.to_datetime(df['date']).dt.date
|
|
df['barcode'] = df['barcode'].astype(int)
|
|
df['qty'] = df['qty'].astype(int)
|
|
df['tot_amount'] = df['tot_amount'].astype(int)
|
|
df['tot_discount'] = df['tot_discount'].astype(int)
|
|
df['actual_amount'] = df['actual_amount'].astype(int)
|
|
|
|
if df.empty:
|
|
print("[WARN] 데이터가 없습니다.")
|
|
return False, 0
|
|
|
|
except Exception as e:
|
|
print(f"[ERROR] 로드 실패: {e}")
|
|
return False, 0
|
|
|
|
print(f"[INFO] 데이터 건수: {len(df)}")
|
|
update_pos_table(engine, table, df)
|
|
print(f"[INFO] 처리 완료: {filepath}")
|
|
return True, len(df)
|
|
|
|
def batch_process_files(table, engine):
|
|
files = [f for f in os.listdir(DATA_DIR) if f.startswith("일자별 (상품별)") and f.endswith(('.xlsx', '.xls'))]
|
|
if not files:
|
|
print("[INFO] 처리할 파일이 없습니다.")
|
|
return False
|
|
|
|
print(f"[INFO] {len(files)}개의 파일을 찾았습니다.")
|
|
total_rows = 0
|
|
# deleted_files = 0
|
|
|
|
for fname in files:
|
|
full_path = os.path.join(DATA_DIR, fname)
|
|
success, count = process_file(full_path, table, engine)
|
|
if success:
|
|
total_rows += count
|
|
"""
|
|
try:
|
|
os.remove(full_path)
|
|
print(f"[INFO] 파일 삭제 완료: {fname}")
|
|
deleted_files += 1
|
|
except Exception as e:
|
|
print(f"[WARN] 파일 삭제 실패: {fname} / {e}")
|
|
"""
|
|
|
|
print(f"[INFO] 총 처리 데이터 건수: {total_rows}")
|
|
# print(f"[INFO] 삭제된 파일 수: {deleted_files}")
|
|
return True
|
|
|
|
def main():
|
|
engine = db.engine
|
|
try:
|
|
table = db_schema.pos
|
|
except AttributeError:
|
|
print("[ERROR] 'pos' 테이블이 db_schema에 정의되어 있지 않습니다.")
|
|
return
|
|
|
|
batch_done = batch_process_files(table, engine)
|
|
if not batch_done:
|
|
print("[INFO] 처리할 데이터가 없습니다.")
|
|
|
|
if __name__ == "__main__":
|
|
main()
|