9 Commits

4 changed files with 216 additions and 1 deletions

View File

@ -1,7 +1,7 @@
# db_schema.py # db_schema.py
import os import os
import yaml import yaml
from sqlalchemy import Table, Column, Date, Integer, String, Float, Text, MetaData, UniqueConstraint, DateTime, Time, PrimaryKeyConstraint from sqlalchemy import Table, Column, Date, Integer, String, Float, Text, MetaData, UniqueConstraint, DateTime, Time, PrimaryKeyConstraint, Index
from sqlalchemy.sql import func from sqlalchemy.sql import func
BASE_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), '..')) BASE_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
@ -214,6 +214,39 @@ pos_billdata = Table(
PrimaryKeyConstraint('sale_date', 'shop_cd', 'pos_no', 'bill_no', 'product_cd') PrimaryKeyConstraint('sale_date', 'shop_cd', 'pos_no', 'bill_no', 'product_cd')
) )
pos_ups_billdata = Table(
get_full_table_name('pos_ups_billdata'), metadata,
Column('sale_date', DateTime, nullable=False),
Column('shop_name', String(100), nullable=False),
Column('pos_no', String(20), nullable=False),
Column('bill_no', String(20), nullable=False),
Column('product_cd', String(20), nullable=False),
Column('ca01', String(50)),
Column('ca02', String(50)),
Column('ca03', String(50)),
Column('product_name', String(100)),
Column('barcode', String(20)),
Column('amt', Integer),
Column('qty', Integer),
Column('tot_sale_amt', Integer),
Column('dc_amt', Integer),
Column('dcm_sale_amt', Integer),
Column('net_amt', Integer),
Column('vat_amt', Integer),
Column('cash_receipt', Integer),
Column('card', Integer),
# PrimaryKeyConstraint 생략
mysql_engine='InnoDB',
mysql_charset='utf8mb4'
)
# 인덱스 추가
Index('idx_sale_shop_pos_product', pos_ups_billdata.c.sale_date, pos_ups_billdata.c.shop_name, pos_ups_billdata.c.pos_no, pos_ups_billdata.c.product_cd)
Index('idx_category', pos_ups_billdata.c.ca01, pos_ups_billdata.c.ca02, pos_ups_billdata.c.ca03)
Index('idx_product_barcode', pos_ups_billdata.c.product_name, pos_ups_billdata.c.barcode)
pos_shop_name = Table( pos_shop_name = Table(
get_full_table_name('pos_shop_name'), metadata, get_full_table_name('pos_shop_name'), metadata,
Column('shop_cd', String(20), primary_key=True, nullable=False), Column('shop_cd', String(20), primary_key=True, nullable=False),

25
conf/install.sql Normal file
View File

@ -0,0 +1,25 @@
CREATE TABLE `fg_manager_static_pos_ups_billdata` (
`sale_date` DATETIME NOT NULL,
`shop_name` VARCHAR(100) NOT NULL,
`pos_no` VARCHAR(20) NOT NULL,
`bill_no` VARCHAR(20) NOT NULL,
`product_cd` VARCHAR(20) NOT NULL,
`ca01` VARCHAR(50),
`ca02` VARCHAR(50),
`ca03` VARCHAR(50),
`product_name` VARCHAR(100),
`barcode` VARCHAR(20),
`amt` INT,
`qty` INT,
`tot_sale_amt` INT,
`dc_amt` INT,
`dcm_sale_amt` INT,
`net_amt` INT,
`vat_amt` INT,
`cash_receipt` INT,
`card` INT,
PRIMARY KEY (`sale_date`, `shop_name`, `pos_no`, `bill_no`, `product_cd`, `qty`), -- 옵션: 복합 PK (원하지 않으면 제거)
KEY `idx_sale_shop_pos_product` (`sale_date`, `shop_name`, `pos_no`, `product_cd`),
KEY `idx_category` (`ca01`, `ca02`, `ca03`),
KEY `idx_product_barcode` (`product_name`, `barcode`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

View File

@ -1,6 +1,8 @@
# common.py # common.py
import os, yaml import os, yaml
import logging import logging
import time
import glob
def load_config(): def load_config():
""" """
@ -19,3 +21,11 @@ def get_logger(name):
logger.addHandler(handler) logger.addHandler(handler)
logger.setLevel(logging.INFO) logger.setLevel(logging.INFO)
return logger return logger
def wait_download_complete(download_dir, ext, timeout=60):
for _ in range(timeout):
files = glob.glob(os.path.join(download_dir, f"*.{ext.strip('.')}"))
if files:
return files[0]
time.sleep(1)
raise TimeoutError("다운로드 대기 시간 초과")

View File

@ -0,0 +1,147 @@
import os
import sys
import pandas as pd
import shutil
from datetime import datetime
from sqlalchemy import Table, MetaData
from sqlalchemy.dialects.mysql import insert as mysql_insert
from sqlalchemy.exc import SQLAlchemyError
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from lib.common import get_logger
from conf import db, db_schema # get_engine, get_session 포함
logger = get_logger("POS_UPS")
DATA_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), "../data"))
FINISH_DIR = os.path.join(DATA_DIR, "finish")
os.makedirs(FINISH_DIR, exist_ok=True)
def nan_to_none(value):
import pandas as pd
if pd.isna(value):
return None
return value
def load_excel_data(filepath: str):
df = pd.read_excel(filepath, header=1) # 2행이 header, 3행부터 데이터
# 컬럼명 공백 제거 등 정리
df.columns = [col.strip() for col in df.columns]
# 필수 컬럼 체크
required_cols = ['영수증 번호', '품목명']
for col in required_cols:
if col not in df.columns:
raise ValueError(f"필수 컬럼 누락: {col}")
df = df.dropna(subset=required_cols)
return df
def process_file(filepath: str, engine, session, table, batch_size=500):
try:
df = load_excel_data(filepath)
logger.info(f"[LOAD] {os.path.basename(filepath)} - {len(df)}")
inserted, updated, errors = 0, 0, 0
batch_data = []
for idx, row in df.iterrows():
data = None
try:
data = {
"sale_date": pd.to_datetime(row["매출일시"]),
"shop_name": str(row["매장명"]).strip(),
"pos_no": str(row["포스"]).strip(),
"bill_no": str(row["영수증 번호"]).strip(),
"product_cd": str(row["품목"]).strip(),
"product_name": str(row["품목명"]).strip(),
"qty": int(row["수량"]),
"ca01": nan_to_none(row.get("대분류", None)),
"ca02": nan_to_none(row.get("중분류", None)),
"ca03": nan_to_none(row.get("소분류", None)),
"barcode": nan_to_none(row.get("바코드", None)),
"amt": int(row.get("단가", 0)),
"tot_sale_amt": int(row.get("주문 금액", 0)),
"dc_amt": int(row.get("할인 금액", 0)),
"dcm_sale_amt": int(row.get("공급가액", 0)),
"vat_amt": int(row.get("세금", 0)),
"net_amt": int(row.get("결제 금액", 0)),
"cash_receipt": int(row.get("현금영수증", 0)),
"card": int(row.get("카드", 0)),
}
batch_data.append(data)
except Exception as e:
if data is not None:
logger.warning(f"[ERROR:ROW] {e} / 데이터: {data}")
else:
logger.warning(f"[ERROR:ROW] {e} / 데이터가 생성되지 않음")
errors += 1
# 배치 크기 도달시 DB에 한번에 처리
if len(batch_data) >= batch_size:
stmt = mysql_insert(table)
update_cols = {
col.name: stmt.inserted[col.name]
for col in table.columns
if col.name not in ['sale_date', 'shop_name', 'pos_no', 'bill_no', 'product_cd']
}
upsert_stmt = stmt.on_duplicate_key_update(update_cols)
result = session.execute(upsert_stmt, batch_data)
session.commit()
# rowcount가 정확하지 않을 수 있으므로 임시로 inserted 개수만 처리
inserted += len(batch_data)
logger.info(f"[BATCH] {idx + 1} / {len(df)} 처리 중... (총 삽입: {inserted}, 오류: {errors})")
batch_data = []
# 남은 잔여 데이터 처리
if batch_data:
stmt = mysql_insert(table)
update_cols = {
col.name: stmt.inserted[col.name]
for col in table.columns
if col.name not in ['sale_date', 'shop_name', 'pos_no', 'bill_no', 'product_cd']
}
upsert_stmt = stmt.on_duplicate_key_update(update_cols)
result = session.execute(upsert_stmt, batch_data)
session.commit()
inserted += len(batch_data)
logger.info(f"[BATCH] 최종 {len(batch_data)}건 처리 완료 (총 삽입: {inserted}, 오류: {errors})")
logger.info(f"[DONE] 삽입: {inserted}, 오류: {errors}")
shutil.move(filepath, os.path.join(FINISH_DIR, os.path.basename(filepath)))
logger.info(f"[MOVE] 완료: {os.path.join(FINISH_DIR, os.path.basename(filepath))}")
except Exception as e:
logger.error(f"[FAIL] 파일 처리 중 오류 발생 - {e}")
session.rollback()
def main():
engine = db.get_engine()
session = db.get_session()
metadata = MetaData()
table = Table(
db_schema.get_full_table_name("pos_ups_billdata"),
metadata,
autoload_with=engine
)
files = [os.path.join(DATA_DIR, f) for f in os.listdir(DATA_DIR)
if f.endswith(".xlsx") and f.startswith("영수증별 상세매출")]
logger.info(f"[INFO] 처리할 파일 {len(files)}")
for file in sorted(files):
logger.info(f"[START] {os.path.basename(file)}")
process_file(file, engine, session, table)
if __name__ == "__main__":
main()