101 lines
2.8 KiB
Python
101 lines
2.8 KiB
Python
# test.py
|
|
|
|
import os
|
|
import csv
|
|
import sys
|
|
from datetime import datetime
|
|
from sqlalchemy import select, and_
|
|
|
|
# 경로 설정
|
|
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
|
|
from conf import db, db_schema
|
|
|
|
# CSV 파일명 설정
|
|
CSV_FILENAME = 'sample.csv' # <- 여기에 파일명을 입력
|
|
CSV_PATH = os.path.join(os.path.dirname(__file__), '../data', CSV_FILENAME)
|
|
|
|
# DB 설정
|
|
engine = db.engine
|
|
table = db_schema.fg_manager_static_pos
|
|
|
|
# 기본값
|
|
DEFAULT_VALUES = {
|
|
'ca01': '매표소',
|
|
'ca02': '기타',
|
|
'ca03': '입장료',
|
|
'barcode': 100000,
|
|
'name': '입장객',
|
|
'tot_amount': 0,
|
|
'tot_discount': 0,
|
|
'actual_amount': 0,
|
|
}
|
|
|
|
def load_csv(filepath):
|
|
rows = []
|
|
try:
|
|
with open(filepath, newline='', encoding='utf-8-sig') as csvfile: # utf-8-sig 로 BOM 제거
|
|
reader = csv.DictReader(csvfile)
|
|
for row in reader:
|
|
try:
|
|
date = datetime.strptime(row['date'], '%Y-%m-%d').date()
|
|
qty = int(float(row['qty'])) # 소수점 포함 숫자라도 정수로 변환
|
|
|
|
data = DEFAULT_VALUES.copy()
|
|
data['date'] = date
|
|
data['qty'] = qty
|
|
rows.append(data)
|
|
except Exception as e:
|
|
print(f"[WARN] 잘못된 행 건너뜀: {row} / 오류: {e}")
|
|
except FileNotFoundError:
|
|
print(f"[ERROR] 파일이 존재하지 않음: {filepath}")
|
|
sys.exit(1)
|
|
|
|
return rows
|
|
|
|
def check_existing(session, row):
|
|
stmt = select(table.c.idx).where(
|
|
and_(
|
|
table.c.date == row['date'],
|
|
table.c.ca01 == row['ca01'],
|
|
table.c.ca02 == row['ca02'],
|
|
table.c.ca03 == row['ca03'],
|
|
table.c.barcode == row['barcode'],
|
|
table.c.name == row['name']
|
|
)
|
|
)
|
|
return session.execute(stmt).scalar() is not None
|
|
|
|
def insert_rows(rows):
|
|
inserted = 0
|
|
session = db.get_session()
|
|
try:
|
|
for i, row in enumerate(rows, 1):
|
|
if check_existing(session, row):
|
|
print(f"[SKIP] {i}행: 이미 존재함 ({row['date']}, qty={row['qty']})")
|
|
continue
|
|
session.execute(table.insert().values(**row))
|
|
inserted += 1
|
|
session.commit()
|
|
except Exception as e:
|
|
session.rollback()
|
|
print(f"[ERROR] 삽입 중 오류 발생: {e}")
|
|
finally:
|
|
session.close()
|
|
|
|
return inserted
|
|
|
|
def main():
|
|
print(f"[INFO] CSV 파일 로드 중: {CSV_PATH}")
|
|
rows = load_csv(CSV_PATH)
|
|
print(f"[INFO] 총 데이터 건수: {len(rows)}")
|
|
|
|
if not rows:
|
|
print("[WARN] 삽입할 데이터가 없습니다.")
|
|
return
|
|
|
|
inserted = insert_rows(rows)
|
|
print(f"[DONE] DB 삽입 완료: {inserted}건 / 전체 {len(rows)}건 중")
|
|
|
|
if __name__ == "__main__":
|
|
main()
|