Compare commits
9 Commits
Dockerfile
...
upsolution
| Author | SHA1 | Date | |
|---|---|---|---|
| ea70cbcf82 | |||
| 4e22744adf | |||
| c41bf82e58 | |||
| cb3b152217 | |||
| ac54673983 | |||
| bf44f13a51 | |||
| 9abc760d7b | |||
| 3a15b938f2 | |||
| 39046f20a5 |
@ -1,7 +1,7 @@
|
|||||||
# db_schema.py
|
# db_schema.py
|
||||||
import os
|
import os
|
||||||
import yaml
|
import yaml
|
||||||
from sqlalchemy import Table, Column, Date, Integer, String, Float, Text, MetaData, UniqueConstraint, DateTime, Time, PrimaryKeyConstraint
|
from sqlalchemy import Table, Column, Date, Integer, String, Float, Text, MetaData, UniqueConstraint, DateTime, Time, PrimaryKeyConstraint, Index
|
||||||
from sqlalchemy.sql import func
|
from sqlalchemy.sql import func
|
||||||
|
|
||||||
BASE_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
|
BASE_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
|
||||||
@ -214,6 +214,39 @@ pos_billdata = Table(
|
|||||||
PrimaryKeyConstraint('sale_date', 'shop_cd', 'pos_no', 'bill_no', 'product_cd')
|
PrimaryKeyConstraint('sale_date', 'shop_cd', 'pos_no', 'bill_no', 'product_cd')
|
||||||
)
|
)
|
||||||
|
|
||||||
|
pos_ups_billdata = Table(
|
||||||
|
get_full_table_name('pos_ups_billdata'), metadata,
|
||||||
|
Column('sale_date', DateTime, nullable=False),
|
||||||
|
Column('shop_name', String(100), nullable=False),
|
||||||
|
Column('pos_no', String(20), nullable=False),
|
||||||
|
Column('bill_no', String(20), nullable=False),
|
||||||
|
Column('product_cd', String(20), nullable=False),
|
||||||
|
Column('ca01', String(50)),
|
||||||
|
Column('ca02', String(50)),
|
||||||
|
Column('ca03', String(50)),
|
||||||
|
Column('product_name', String(100)),
|
||||||
|
Column('barcode', String(20)),
|
||||||
|
Column('amt', Integer),
|
||||||
|
Column('qty', Integer),
|
||||||
|
Column('tot_sale_amt', Integer),
|
||||||
|
Column('dc_amt', Integer),
|
||||||
|
Column('dcm_sale_amt', Integer),
|
||||||
|
Column('net_amt', Integer),
|
||||||
|
Column('vat_amt', Integer),
|
||||||
|
Column('cash_receipt', Integer),
|
||||||
|
Column('card', Integer),
|
||||||
|
# PrimaryKeyConstraint 생략
|
||||||
|
|
||||||
|
mysql_engine='InnoDB',
|
||||||
|
mysql_charset='utf8mb4'
|
||||||
|
)
|
||||||
|
|
||||||
|
# 인덱스 추가
|
||||||
|
Index('idx_sale_shop_pos_product', pos_ups_billdata.c.sale_date, pos_ups_billdata.c.shop_name, pos_ups_billdata.c.pos_no, pos_ups_billdata.c.product_cd)
|
||||||
|
Index('idx_category', pos_ups_billdata.c.ca01, pos_ups_billdata.c.ca02, pos_ups_billdata.c.ca03)
|
||||||
|
Index('idx_product_barcode', pos_ups_billdata.c.product_name, pos_ups_billdata.c.barcode)
|
||||||
|
|
||||||
|
|
||||||
pos_shop_name = Table(
|
pos_shop_name = Table(
|
||||||
get_full_table_name('pos_shop_name'), metadata,
|
get_full_table_name('pos_shop_name'), metadata,
|
||||||
Column('shop_cd', String(20), primary_key=True, nullable=False),
|
Column('shop_cd', String(20), primary_key=True, nullable=False),
|
||||||
|
|||||||
25
conf/install.sql
Normal file
25
conf/install.sql
Normal file
@ -0,0 +1,25 @@
|
|||||||
|
CREATE TABLE `fg_manager_static_pos_ups_billdata` (
|
||||||
|
`sale_date` DATETIME NOT NULL,
|
||||||
|
`shop_name` VARCHAR(100) NOT NULL,
|
||||||
|
`pos_no` VARCHAR(20) NOT NULL,
|
||||||
|
`bill_no` VARCHAR(20) NOT NULL,
|
||||||
|
`product_cd` VARCHAR(20) NOT NULL,
|
||||||
|
`ca01` VARCHAR(50),
|
||||||
|
`ca02` VARCHAR(50),
|
||||||
|
`ca03` VARCHAR(50),
|
||||||
|
`product_name` VARCHAR(100),
|
||||||
|
`barcode` VARCHAR(20),
|
||||||
|
`amt` INT,
|
||||||
|
`qty` INT,
|
||||||
|
`tot_sale_amt` INT,
|
||||||
|
`dc_amt` INT,
|
||||||
|
`dcm_sale_amt` INT,
|
||||||
|
`net_amt` INT,
|
||||||
|
`vat_amt` INT,
|
||||||
|
`cash_receipt` INT,
|
||||||
|
`card` INT,
|
||||||
|
PRIMARY KEY (`sale_date`, `shop_name`, `pos_no`, `bill_no`, `product_cd`, `qty`), -- 옵션: 복합 PK (원하지 않으면 제거)
|
||||||
|
KEY `idx_sale_shop_pos_product` (`sale_date`, `shop_name`, `pos_no`, `product_cd`),
|
||||||
|
KEY `idx_category` (`ca01`, `ca02`, `ca03`),
|
||||||
|
KEY `idx_product_barcode` (`product_name`, `barcode`)
|
||||||
|
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
|
||||||
@ -1,6 +1,8 @@
|
|||||||
# common.py
|
# common.py
|
||||||
import os, yaml
|
import os, yaml
|
||||||
import logging
|
import logging
|
||||||
|
import time
|
||||||
|
import glob
|
||||||
|
|
||||||
def load_config():
|
def load_config():
|
||||||
"""
|
"""
|
||||||
@ -19,3 +21,11 @@ def get_logger(name):
|
|||||||
logger.addHandler(handler)
|
logger.addHandler(handler)
|
||||||
logger.setLevel(logging.INFO)
|
logger.setLevel(logging.INFO)
|
||||||
return logger
|
return logger
|
||||||
|
|
||||||
|
def wait_download_complete(download_dir, ext, timeout=60):
|
||||||
|
for _ in range(timeout):
|
||||||
|
files = glob.glob(os.path.join(download_dir, f"*.{ext.strip('.')}"))
|
||||||
|
if files:
|
||||||
|
return files[0]
|
||||||
|
time.sleep(1)
|
||||||
|
raise TimeoutError("다운로드 대기 시간 초과")
|
||||||
147
lib/pos_update_upsolution.py
Normal file
147
lib/pos_update_upsolution.py
Normal file
@ -0,0 +1,147 @@
|
|||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import pandas as pd
|
||||||
|
import shutil
|
||||||
|
from datetime import datetime
|
||||||
|
from sqlalchemy import Table, MetaData
|
||||||
|
from sqlalchemy.dialects.mysql import insert as mysql_insert
|
||||||
|
from sqlalchemy.exc import SQLAlchemyError
|
||||||
|
|
||||||
|
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
|
||||||
|
|
||||||
|
from lib.common import get_logger
|
||||||
|
from conf import db, db_schema # get_engine, get_session 포함
|
||||||
|
|
||||||
|
logger = get_logger("POS_UPS")
|
||||||
|
|
||||||
|
DATA_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), "../data"))
|
||||||
|
FINISH_DIR = os.path.join(DATA_DIR, "finish")
|
||||||
|
os.makedirs(FINISH_DIR, exist_ok=True)
|
||||||
|
|
||||||
|
|
||||||
|
def nan_to_none(value):
|
||||||
|
import pandas as pd
|
||||||
|
if pd.isna(value):
|
||||||
|
return None
|
||||||
|
return value
|
||||||
|
|
||||||
|
|
||||||
|
def load_excel_data(filepath: str):
|
||||||
|
df = pd.read_excel(filepath, header=1) # 2행이 header, 3행부터 데이터
|
||||||
|
# 컬럼명 공백 제거 등 정리
|
||||||
|
df.columns = [col.strip() for col in df.columns]
|
||||||
|
# 필수 컬럼 체크
|
||||||
|
required_cols = ['영수증 번호', '품목명']
|
||||||
|
for col in required_cols:
|
||||||
|
if col not in df.columns:
|
||||||
|
raise ValueError(f"필수 컬럼 누락: {col}")
|
||||||
|
df = df.dropna(subset=required_cols)
|
||||||
|
return df
|
||||||
|
|
||||||
|
|
||||||
|
def process_file(filepath: str, engine, session, table, batch_size=500):
|
||||||
|
try:
|
||||||
|
df = load_excel_data(filepath)
|
||||||
|
logger.info(f"[LOAD] {os.path.basename(filepath)} - {len(df)}건")
|
||||||
|
|
||||||
|
inserted, updated, errors = 0, 0, 0
|
||||||
|
batch_data = []
|
||||||
|
|
||||||
|
for idx, row in df.iterrows():
|
||||||
|
data = None
|
||||||
|
try:
|
||||||
|
data = {
|
||||||
|
"sale_date": pd.to_datetime(row["매출일시"]),
|
||||||
|
"shop_name": str(row["매장명"]).strip(),
|
||||||
|
"pos_no": str(row["포스"]).strip(),
|
||||||
|
"bill_no": str(row["영수증 번호"]).strip(),
|
||||||
|
"product_cd": str(row["품목"]).strip(),
|
||||||
|
"product_name": str(row["품목명"]).strip(),
|
||||||
|
"qty": int(row["수량"]),
|
||||||
|
|
||||||
|
"ca01": nan_to_none(row.get("대분류", None)),
|
||||||
|
"ca02": nan_to_none(row.get("중분류", None)),
|
||||||
|
"ca03": nan_to_none(row.get("소분류", None)),
|
||||||
|
"barcode": nan_to_none(row.get("바코드", None)),
|
||||||
|
"amt": int(row.get("단가", 0)),
|
||||||
|
"tot_sale_amt": int(row.get("주문 금액", 0)),
|
||||||
|
"dc_amt": int(row.get("할인 금액", 0)),
|
||||||
|
"dcm_sale_amt": int(row.get("공급가액", 0)),
|
||||||
|
"vat_amt": int(row.get("세금", 0)),
|
||||||
|
"net_amt": int(row.get("결제 금액", 0)),
|
||||||
|
"cash_receipt": int(row.get("현금영수증", 0)),
|
||||||
|
"card": int(row.get("카드", 0)),
|
||||||
|
}
|
||||||
|
batch_data.append(data)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
if data is not None:
|
||||||
|
logger.warning(f"[ERROR:ROW] {e} / 데이터: {data}")
|
||||||
|
else:
|
||||||
|
logger.warning(f"[ERROR:ROW] {e} / 데이터가 생성되지 않음")
|
||||||
|
errors += 1
|
||||||
|
|
||||||
|
# 배치 크기 도달시 DB에 한번에 처리
|
||||||
|
if len(batch_data) >= batch_size:
|
||||||
|
stmt = mysql_insert(table)
|
||||||
|
update_cols = {
|
||||||
|
col.name: stmt.inserted[col.name]
|
||||||
|
for col in table.columns
|
||||||
|
if col.name not in ['sale_date', 'shop_name', 'pos_no', 'bill_no', 'product_cd']
|
||||||
|
}
|
||||||
|
upsert_stmt = stmt.on_duplicate_key_update(update_cols)
|
||||||
|
result = session.execute(upsert_stmt, batch_data)
|
||||||
|
session.commit()
|
||||||
|
|
||||||
|
# rowcount가 정확하지 않을 수 있으므로 임시로 inserted 개수만 처리
|
||||||
|
inserted += len(batch_data)
|
||||||
|
logger.info(f"[BATCH] {idx + 1} / {len(df)} 처리 중... (총 삽입: {inserted}, 오류: {errors})")
|
||||||
|
batch_data = []
|
||||||
|
|
||||||
|
# 남은 잔여 데이터 처리
|
||||||
|
if batch_data:
|
||||||
|
stmt = mysql_insert(table)
|
||||||
|
update_cols = {
|
||||||
|
col.name: stmt.inserted[col.name]
|
||||||
|
for col in table.columns
|
||||||
|
if col.name not in ['sale_date', 'shop_name', 'pos_no', 'bill_no', 'product_cd']
|
||||||
|
}
|
||||||
|
upsert_stmt = stmt.on_duplicate_key_update(update_cols)
|
||||||
|
result = session.execute(upsert_stmt, batch_data)
|
||||||
|
session.commit()
|
||||||
|
|
||||||
|
inserted += len(batch_data)
|
||||||
|
logger.info(f"[BATCH] 최종 {len(batch_data)}건 처리 완료 (총 삽입: {inserted}, 오류: {errors})")
|
||||||
|
|
||||||
|
logger.info(f"[DONE] 삽입: {inserted}, 오류: {errors}")
|
||||||
|
|
||||||
|
shutil.move(filepath, os.path.join(FINISH_DIR, os.path.basename(filepath)))
|
||||||
|
logger.info(f"[MOVE] 완료: {os.path.join(FINISH_DIR, os.path.basename(filepath))}")
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"[FAIL] 파일 처리 중 오류 발생 - {e}")
|
||||||
|
session.rollback()
|
||||||
|
|
||||||
|
def main():
|
||||||
|
engine = db.get_engine()
|
||||||
|
session = db.get_session()
|
||||||
|
|
||||||
|
metadata = MetaData()
|
||||||
|
table = Table(
|
||||||
|
db_schema.get_full_table_name("pos_ups_billdata"),
|
||||||
|
metadata,
|
||||||
|
autoload_with=engine
|
||||||
|
)
|
||||||
|
|
||||||
|
files = [os.path.join(DATA_DIR, f) for f in os.listdir(DATA_DIR)
|
||||||
|
if f.endswith(".xlsx") and f.startswith("영수증별 상세매출")]
|
||||||
|
|
||||||
|
logger.info(f"[INFO] 처리할 파일 {len(files)}개")
|
||||||
|
|
||||||
|
for file in sorted(files):
|
||||||
|
logger.info(f"[START] {os.path.basename(file)}")
|
||||||
|
process_file(file, engine, session, table)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
Reference in New Issue
Block a user