7 Commits

21 changed files with 1130 additions and 1093 deletions

View File

@ -20,34 +20,31 @@
## 폴더 구조 ## 폴더 구조
```bash ```bash
project-root/ project-root/
├── app/ # 🔹 웹 프론트엔드 및 Flask 서버 ├── app/ # 🔹 웹 프론트엔드 및 Flask 서버
│ ├── templates/ # HTML 템플릿 (Jinja2) │ ├── templates/ # HTML 템플릿 (Jinja2)
│ │ └── index.html │ │ └── index.html
│ ├── static/ # (선택) JS, CSS 파일 │ ├── static/ # (선택) JS, CSS 파일
│ └── app.py # Flask 애플리케이션 진입점 │ └── app.py # Flask 애플리케이션 진입점
├── build/ # 🔹 Docker 빌드 전용 디렉토리 ├── build/ # 🔹 Docker 빌드 전용 디렉토리
│ ├── Dockerfile # Ubuntu 22.04 기반 Dockerfile │ ├── Dockerfile # Ubuntu 22.04 기반 Dockerfile
│ ├── requirements.txt # Python 의존성 │ ├── requirements.txt # Python 의존성
│ └── (선택) run.sh / build.sh 등 실행 스크립트 │ └── (선택) run.sh / build.sh 등 실행 스크립트
├── conf/ # 🔹 설정 및 DB 정의 ├── conf/ # 🔹 설정 및 DB 정의
│ ├── config.yaml # 설정 파일 (DB 접속 등) │ ├── config.yaml # 설정 파일 (DB 접속 등)
│ ├── db.py # SQLAlchemy 연결 설정 │ ├── db.py # SQLAlchemy 연결 설정
│ └── db_schema.py # 테이블 정의 (SQLAlchemy metadata) │ └── db_schema.py # 테이블 정의 (SQLAlchemy metadata)
├── lib/ # 🔹 데이터 처리 및 백엔드 로직 ├── lib/ # 🔹 데이터 처리 및 백엔드 로직
│ ├── common.py # 중복 함수들을 처리 │ ├── pos_view_gui.py # 기존 Tkinter GUI (조회용)
│ ├── pos_view_gui.py # 기존 Tkinter GUI (조회용) │ ├── pos_update_gui.py # 기존 Tkinter GUI (업데이트용)
│ ├── pos_update_gui.py # 기존 Tkinter GUI (업데이트용) │ ├── air_quality.py # 대기환경 API 수집
│ ├── air_quality.py # 대기환경 API 수집 │ ├── ga4.py # GA4 수집 스크립트
── ga4.py # GA4 수집 스크립트 ── weather_asos.py # 기상청 ASOS 수집
│ ├── weather_asos.py # 기상청 ASOS 수집
│ ├── weekly_visitor_forecast.py # GA4 수집 스크립트 ├── data/ # 🔹 데이터 저장 및 엑셀 업로드 디렉토리
│ ├── weekly_visitor_forecast_prophet.py # GA4 수집 스크립트
│ └──
├── data/ # 🔹 데이터 저장 및 엑셀 업로드 디렉토리
│ └── (엑셀 파일들, 일자별 상품별 파일 등) │ └── (엑셀 파일들, 일자별 상품별 파일 등)
├── .gitignore
└── README.md └── .gitignore (선택)
``` ```

View File

@ -1,47 +0,0 @@
FROM python:3.10-slim
# Set the working directory
WORKDIR /app
# Install system packages: build toolchain, SQLite/SSL/compression/Tk dev
# headers (needed to build Python extensions), fetch tools, git and cron
RUN apt-get update && apt-get install -y --no-install-recommends \
build-essential \
gcc \
libsqlite3-dev \
libssl-dev \
libffi-dev \
libbz2-dev \
libreadline-dev \
libncurses5-dev \
libgdbm-dev \
liblzma-dev \
libtk8.6 \
tk8.6-dev \
tcl8.6-dev \
wget \
curl \
unzip \
git \
cron \
&& rm -rf /var/lib/apt/lists/*
# Install Python dependencies
COPY requirements.txt ./
RUN pip install --no-cache-dir -r requirements.txt
# Copy the whole application
COPY . .
# Flush Python stdout/stderr immediately so logs appear in `docker logs`
ENV PYTHONUNBUFFERED=1
# Register cron job: run daily_run.py every day at 11:00
RUN echo "0 11 * * * python /app/daily_run.py >> /var/log/cron.log 2>&1" > /etc/cron.d/daily-cron \
&& chmod 0644 /etc/cron.d/daily-cron \
&& crontab /etc/cron.d/daily-cron
# Create the log file up front so `tail` has something to follow
RUN touch /var/log/cron.log
# On container start: launch cron, run file_watch.py in the background,
# and tail the cron log to keep the foreground process (PID 1) alive
CMD cron && python lib/file_watch.py & tail -f /var/log/cron.log

View File

@ -18,16 +18,8 @@ db_cfg = config['database']
db_url = f"mysql+pymysql://{db_cfg['user']}:{db_cfg['password']}@{db_cfg['host']}/{db_cfg['name']}?charset=utf8mb4" db_url = f"mysql+pymysql://{db_cfg['user']}:{db_cfg['password']}@{db_cfg['host']}/{db_cfg['name']}?charset=utf8mb4"
# MySQL 연결이 끊겼을 때 자동 재시도 옵션 포함 # MySQL 연결이 끊겼을 때 자동 재시도 옵션 포함
engine = create_engine( engine = create_engine(db_url, pool_pre_ping=True)
db_url,
pool_pre_ping=True,
pool_recycle=3600, # 3600초 = 1시간
)
Session = sessionmaker(bind=engine) Session = sessionmaker(bind=engine)
def get_engine():
return engine
def get_session(): def get_session():
return Session() return Session()

View File

@ -1,7 +1,7 @@
# db_schema.py # db_schema.py
import os import os
import yaml import yaml
from sqlalchemy import Table, Column, Date, Integer, String, Float, Text, MetaData, UniqueConstraint, DateTime, Time, PrimaryKeyConstraint, Index from sqlalchemy import Table, Column, Date, Integer, String, Float, Text, MetaData, UniqueConstraint, DateTime
from sqlalchemy.sql import func from sqlalchemy.sql import func
BASE_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), '..')) BASE_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
@ -164,15 +164,6 @@ ga4 = Table(
mysql_charset='utf8mb4' mysql_charset='utf8mb4'
) )
holiday = Table(
get_full_table_name('holiday'), metadata,
Column('date', String(8), primary_key=True, comment='날짜 (YYYYMMDD)'),
Column('name', String(50), nullable=False, comment='휴일명'),
Column('created_at', DateTime, server_default=func.now(), comment='등록일시'),
Column('updated_at', DateTime, server_default=func.now(), onupdate=func.now(), comment='수정일시'),
comment='한국천문연구원 특일정보'
)
pos = Table( pos = Table(
get_full_table_name('pos'), metadata, get_full_table_name('pos'), metadata,
Column('idx', Integer, primary_key=True, autoincrement=True), Column('idx', Integer, primary_key=True, autoincrement=True),
@ -189,71 +180,11 @@ pos = Table(
UniqueConstraint('date', 'ca01', 'ca02', 'ca03', 'name', 'barcode', name='uniq_pos_composite') UniqueConstraint('date', 'ca01', 'ca02', 'ca03', 'name', 'barcode', name='uniq_pos_composite')
) )
pos_billdata = Table( holiday = Table(
get_full_table_name('pos_billdata'), metadata, get_full_table_name('holiday'), metadata,
Column('sale_date', Date, nullable=False), Column('date', String(8), primary_key=True, comment='날짜 (YYYYMMDD)'),
Column('shop_cd', String(20), nullable=False), Column('name', String(50), nullable=False, comment='휴일명'),
Column('pos_no', Integer, nullable=False), Column('created_at', DateTime, server_default=func.now(), comment='등록일시'),
Column('bill_no', Integer, nullable=False), Column('updated_at', DateTime, server_default=func.now(), onupdate=func.now(), comment='수정일시'),
Column('product_cd', String(20), nullable=False), comment='한국천문연구원 특일정보'
Column('division', String(10)),
Column('table_no', String(20)),
Column('order_time', Time),
Column('pay_time', Time),
Column('barcode', String(20)),
Column('product_name', String(100)),
Column('qty', Integer),
Column('tot_sale_amt', Integer),
Column('erp_cd', String(50)),
Column('remark', Text),
Column('dc_amt', Integer),
Column('dc_type', String(50)),
Column('dcm_sale_amt', Integer),
Column('net_amt', Integer),
Column('vat_amt', Integer),
PrimaryKeyConstraint('sale_date', 'shop_cd', 'pos_no', 'bill_no', 'product_cd')
)
pos_ups_billdata = Table(
get_full_table_name('pos_ups_billdata'), metadata,
Column('sale_date', DateTime, nullable=False),
Column('shop_name', String(100), nullable=False),
Column('pos_no', String(20), nullable=False),
Column('bill_no', String(20), nullable=False),
Column('product_cd', String(20), nullable=False),
Column('ca01', String(50)),
Column('ca02', String(50)),
Column('ca03', String(50)),
Column('product_name', String(100)),
Column('barcode', String(20)),
Column('amt', Integer),
Column('qty', Integer),
Column('tot_sale_amt', Integer),
Column('dc_amt', Integer),
Column('dcm_sale_amt', Integer),
Column('net_amt', Integer),
Column('vat_amt', Integer),
Column('cash_receipt', Integer),
Column('card', Integer),
# PrimaryKeyConstraint 생략
mysql_engine='InnoDB',
mysql_charset='utf8mb4'
)
# 인덱스 추가
Index('idx_sale_shop_pos_product', pos_ups_billdata.c.sale_date, pos_ups_billdata.c.shop_name, pos_ups_billdata.c.pos_no, pos_ups_billdata.c.product_cd)
Index('idx_category', pos_ups_billdata.c.ca01, pos_ups_billdata.c.ca02, pos_ups_billdata.c.ca03)
Index('idx_product_barcode', pos_ups_billdata.c.product_name, pos_ups_billdata.c.barcode)
pos_shop_name = Table(
get_full_table_name('pos_shop_name'), metadata,
Column('shop_cd', String(20), primary_key=True, nullable=False),
Column('shop_name', String(100), nullable=False),
Column('used', Integer, nullable=False, default=1, comment='사용여부 (1=사용, 0=미사용)'),
Column('created_at', DateTime, server_default=func.current_timestamp(), comment='등록일시'),
Column('updated_at', DateTime, server_default=func.current_timestamp(), onupdate=func.current_timestamp(), comment='수정일시'),
mysql_engine='InnoDB',
mysql_charset='utf8mb4',
) )

View File

@ -1,25 +0,0 @@
-- Receipt-level UPS POS sales detail rows (one row per receipt line item).
CREATE TABLE `fg_manager_static_pos_ups_billdata` (
`sale_date` DATETIME NOT NULL,
`shop_name` VARCHAR(100) NOT NULL,
`pos_no` VARCHAR(20) NOT NULL,
`bill_no` VARCHAR(20) NOT NULL,
`product_cd` VARCHAR(20) NOT NULL,
`ca01` VARCHAR(50),
`ca02` VARCHAR(50),
`ca03` VARCHAR(50),
`product_name` VARCHAR(100),
`barcode` VARCHAR(20),
`amt` INT,
`qty` INT,
`tot_sale_amt` INT,
`dc_amt` INT,
`dcm_sale_amt` INT,
`net_amt` INT,
`vat_amt` INT,
`cash_receipt` INT,
`card` INT,
PRIMARY KEY (`sale_date`, `shop_name`, `pos_no`, `bill_no`, `product_cd`, `qty`), -- optional: composite PK (remove if not wanted)
KEY `idx_sale_shop_pos_product` (`sale_date`, `shop_name`, `pos_no`, `product_cd`),
KEY `idx_category` (`ca01`, `ca02`, `ca03`),
KEY `idx_product_barcode` (`product_name`, `barcode`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

View File

@ -1,13 +0,0 @@
services:
  # Static data collector/watcher container.
  fg-static:
    container_name: fg-static
    build:
      context: .
      dockerfile: build/Dockerfile
    image: reg.firstgarden.co.kr/fg-static:latest
    volumes:
      # Drop Excel exports here; watched by lib/file_watch.py inside the container.
      - ./data:/app/data
      # Configuration (config.yaml etc.) mounted so it can change without a rebuild.
      - ./conf:/app/conf
    environment:
      - TZ=Asia/Seoul
    restart: unless-stopped

View File

@ -1,8 +1,5 @@
# common.py # common.py
import os, yaml import os, yaml
import logging
import time
import glob
def load_config(): def load_config():
""" """
@ -11,21 +8,3 @@ def load_config():
path = os.path.join(os.path.dirname(__file__), '..', 'conf', 'config.yaml') path = os.path.join(os.path.dirname(__file__), '..', 'conf', 'config.yaml')
with open(path, encoding='utf-8') as f: with open(path, encoding='utf-8') as f:
return yaml.safe_load(f) return yaml.safe_load(f)
def get_logger(name):
logger = logging.getLogger(name)
if not logger.handlers:
handler = logging.StreamHandler()
formatter = logging.Formatter('[%(asctime)s] %(levelname)s: %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(logging.INFO)
return logger
def wait_download_complete(download_dir, ext, timeout=60):
for _ in range(timeout):
files = glob.glob(os.path.join(download_dir, f"*.{ext.strip('.')}"))
if files:
return files[0]
time.sleep(1)
raise TimeoutError("다운로드 대기 시간 초과")

View File

@ -1,81 +0,0 @@
import time
import os
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import threading
import pos_update_bill
import pos_update_daily_product
# Directory watched for newly dropped POS export files.
DATA_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), '../data'))
# Only Excel exports are considered.
FILE_EXTENSIONS = ('.xls', '.xlsx')
# File-name prefixes deciding which importer handles a file.
BILL_PREFIX = "영수증별매출상세현황"
DAILY_PRODUCT_PREFIX = "일자별 (상품별)"
class NewFileHandler(FileSystemEventHandler):
    """Watchdog handler that dispatches newly created POS export files.

    Files whose name starts with BILL_PREFIX are routed to
    pos_update_bill, files starting with DAILY_PRODUCT_PREFIX to
    pos_update_daily_product. A lock plus an in-progress set prevents the
    same file from being processed twice concurrently.
    """

    def __init__(self):
        super().__init__()
        self._lock = threading.Lock()
        # File names currently being handled by a worker thread.
        self._processing_files = set()

    def on_created(self, event):
        """Filter creation events and hand matching files to a worker thread."""
        if event.is_directory:
            return
        filepath = event.src_path
        filename = os.path.basename(filepath)
        if not filename.endswith(FILE_EXTENSIONS):
            return
        # Only dispatch files matching a known importer prefix.
        # FIX: the log messages originally interpolated the file name; the
        # placeholder had been mangled to a literal "(unknown)" — restored
        # to {filename} (sibling {e} interpolations were intact).
        if filename.startswith(BILL_PREFIX) or filename.startswith(DAILY_PRODUCT_PREFIX):
            print(f"[WATCHER] 신규 파일 감지: {filename}")
            threading.Thread(target=self.process_file, args=(filepath, filename), daemon=True).start()

    def process_file(self, filepath, filename):
        """Run the matching importer for *filepath*, then delete the file.

        The file is removed only when the importer succeeded; on error the
        file is left in place so it can be retried.
        """
        with self._lock:
            if filename in self._processing_files:
                print(f"[WATCHER] {filename} 이미 처리 중")
                return
            self._processing_files.add(filename)
        try:
            time.sleep(3)  # give the writer time to finish the file
            print(f"[WATCHER] 파일 처리 시작: {filename}")
            if filename.startswith(BILL_PREFIX):
                pos_update_bill.main()
            elif filename.startswith(DAILY_PRODUCT_PREFIX):
                pos_update_daily_product.main()
            else:
                print(f"[WATCHER] 처리 대상이 아님: {filename}")
                return
        except Exception as e:
            print(f"[WATCHER] 처리 중 오류 발생: {filename} / {e}")
        else:
            try:
                os.remove(filepath)
                print(f"[WATCHER] 파일 처리 완료 및 삭제: {filename}")
            except Exception as e:
                print(f"[WATCHER] 파일 삭제 실패: {filename} / {e}")
        finally:
            with self._lock:
                self._processing_files.discard(filename)
def start_watching():
    """Watch DATA_DIR for new files until interrupted with Ctrl-C."""
    print(f"[WATCHER] '{DATA_DIR}' 폴더 감시 시작")
    observer = Observer()
    observer.schedule(NewFileHandler(), DATA_DIR, recursive=False)
    observer.start()
    try:
        # Keep the main thread alive; the observer works in the background.
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print("[WATCHER] 감시 종료 요청 수신, 종료 중...")
        observer.stop()
        observer.join()
if __name__ == "__main__":
    start_watching()

View File

@ -4,7 +4,7 @@ sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
import yaml import yaml
import requests import requests
import xml.etree.ElementTree as ET import xml.etree.ElementTree as ET
from datetime import date, datetime, timedelta from datetime import datetime, date
from sqlalchemy import select, insert, delete from sqlalchemy import select, insert, delete
# config.yaml 경로 및 로딩 # config.yaml 경로 및 로딩
@ -134,40 +134,8 @@ def is_korean_holiday(dt: date) -> bool:
finally: finally:
session.close() session.close()
def get_holiday_dates(start_date: date, end_date: date) -> set[date]:
"""특정 기간 내의 휴일 목록 반환"""
session = db.get_session()
try:
stmt = select(holiday_table.c.date).where(
holiday_table.c.date.between(start_date.strftime("%Y%m%d"), end_date.strftime("%Y%m%d"))
)
results = session.execute(stmt).scalars().all()
return set(datetime.strptime(d, "%Y%m%d").date() for d in results)
finally:
session.close()
def get_weekday_dates(start_date: date, end_date: date) -> set[date]:
"""특정 기간 중 평일(월~금 & 비휴일) 목록 반환"""
holiday_dates = get_holiday_dates(start_date, end_date)
result = set()
curr = start_date
while curr <= end_date:
if curr.weekday() < 5 and curr not in holiday_dates: # 월(0)~금(4)
result.add(curr)
curr += timedelta(days=1)
return result
if __name__ == "__main__": if __name__ == "__main__":
print("📌 휴일 테스트 시작") print("📌 특일정보 초기화 시작")
init_holidays() init_holidays()
print("✅ 특일정보 초기화 완료")
from datetime import date
start = date(2025, 1, 1)
end = date(2025, 12, 31)
holidays = get_holiday_dates(start, end)
print(f"🔍 {start} ~ {end} 사이 휴일 {len(holidays)}")
for d in sorted(holidays):
print(" -", d)

View File

@ -1,263 +0,0 @@
"""
영수증별매출상세현황 엑셀파일을 기반으로 MariaDB에 데이터 업데이트
1. 파일은 ./data 폴더에 위치 (파일명: '영수증별매출상세현황*.xls[x]')
2. 중복된 데이터는 update 처리됨 (on duplicate key update)
3. 처리 후 파일 자동 삭제 (파일 삭제 로직은 필요시 추가 가능)
"""
import os
import sys
import re
import pandas as pd
from datetime import datetime
from sqlalchemy.dialects.mysql import insert
from sqlalchemy import select
# 상위 경로를 sys.path에 추가해 프로젝트 내 모듈 임포트 가능하게 설정
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from conf import db, db_schema
from lib.common import load_config
# 설정 파일 로드 및 데이터 폴더 경로 설정
CONFIG = load_config()
DATA_DIR = os.path.join(os.path.dirname(__file__), '../data')
# 처리 대상 파일명 패턴: '영수증별매출상세현황'으로 시작하고 .xls 또는 .xlsx 확장자
FILE_PATTERN = re.compile(r"^영수증별매출상세현황.*\.xls[x]?$")
# 엑셀 상단 A3셀 형식 예: "조회일자 : 2025-07-27 매장선택 : [V83728] 퍼스트(삐아또"
HEADER_PATTERN = re.compile(r"조회일자\s*:\s*(\d{4}-\d{2}-\d{2})\s+매장선택\s*:\s*\[(\w+)]\s*(.+)")
def extract_file_info(filepath: str):
    """Parse the report date, shop code and shop name from the sheet header.

    Cell A3 (row index 2, column 0, 0-based) is matched against
    HEADER_PATTERN.

    Args:
        filepath (str): path to the Excel file
    Returns:
        tuple: (sale_date: date, shop_cd: str, shop_name: str)
    Raises:
        ValueError: when the header cell does not match the pattern
    """
    print(f"[INFO] {filepath} 상단 조회일자 및 매장 정보 추출 시작")
    header_df = pd.read_excel(filepath, header=None, nrows=5)
    header_cell = str(header_df.iloc[2, 0])  # row 3, column A
    parsed = HEADER_PATTERN.search(header_cell)
    if parsed is None:
        raise ValueError(f"[ERROR] 조회일자 및 매장 정보 추출 실패: {filepath}")
    sale_date = datetime.strptime(parsed.group(1), "%Y-%m-%d").date()
    shop_cd = parsed.group(2)
    shop_name = parsed.group(3).strip()
    print(f"[INFO] 추출된 조회일자: {sale_date}, 매장코드: {shop_cd}, 매장명: {shop_name}")
    return sale_date, shop_cd, shop_name
def load_excel_data(filepath: str):
    """Load the data region of the export, keeping only the known columns.

    The column header lives on the 6th row (0-based index 5). If a '합계'
    (total) row appears in the '포스번호' column, that row and everything
    after it is dropped.

    Args:
        filepath (str): path to the Excel file
    Returns:
        pd.DataFrame: cleaned frame restricted to the expected columns
    Raises:
        ValueError: when a required column is missing
    """
    print(f"[INFO] {filepath} 데이터 영역 로드 시작")
    wanted = [
        "포스번호", "영수증번호", "구분", "테이블명", "최초주문", "결제시각",
        "상품코드", "바코드", "상품명", "수량", "총매출액", "ERP 매핑코드",
        "비고", "할인액", "할인구분", "실매출액", "가액", "부가세"
    ]
    # header=5 => the 6th row holds the column names; dtype=str keeps
    # leading zeros and thousands separators intact for later parsing.
    frame = pd.read_excel(filepath, header=5, dtype=str)
    frame.columns = frame.columns.str.strip()
    if '합계' in frame['포스번호'].values:
        total_idx = frame[frame['포스번호'] == '합계'].index[0]
        frame = frame.loc[:total_idx - 1]
        print(f"[INFO] '합계' 행 이후 데이터 제거: {total_idx}번째 행부터 제외")
    if not set(wanted).issubset(frame.columns):
        raise ValueError(f"[ERROR] 필수 컬럼 누락: 현재 컬럼 {frame.columns.tolist()}")
    frame = frame[wanted]
    print(f"[INFO] {filepath} 데이터 영역 로드 완료, 데이터 건수: {len(frame)}")
    return frame
def normalize_data(df: pd.DataFrame, sale_date, shop_cd):
    """Rename columns to internal names and coerce numeric columns to int.

    Adds the report date and shop code as constant columns.
    NOTE: mutates *df* in place (inplace rename / column assignment) and
    also returns it.

    Args:
        df (pd.DataFrame): raw frame from load_excel_data
        sale_date (date): report date from the sheet header
        shop_cd (str): shop code from the sheet header
    Returns:
        pd.DataFrame: normalized frame
    """
    print(f"[INFO] 데이터 정규화 시작")
    def to_int(x):
        # Strip thousands separators; unparsable values become 0.
        try:
            return int(str(x).replace(",", "").strip())
        except (ValueError, TypeError):  # FIX: was a bare except, which also swallowed SystemExit/KeyboardInterrupt
            return 0
    df.rename(columns={
        "포스번호": "pos_no",
        "영수증번호": "bill_no",
        "구분": "division",
        "테이블명": "table_no",
        "최초주문": "order_time",
        "결제시각": "pay_time",
        "상품코드": "product_cd",
        "바코드": "barcode",
        "상품명": "product_name",
        "수량": "qty",
        "총매출액": "tot_sale_amt",
        "ERP 매핑코드": "erp_cd",
        "비고": "remark",
        "할인액": "dc_amt",
        "할인구분": "dc_type",
        "실매출액": "dcm_sale_amt",
        "가액": "net_amt",
        "부가세": "vat_amt"
    }, inplace=True)
    df["sale_date"] = sale_date
    df["shop_cd"] = shop_cd
    # Money/quantity columns: tolerant integer conversion.
    int_fields = ["qty", "tot_sale_amt", "dc_amt", "dcm_sale_amt", "net_amt", "vat_amt"]
    for field in int_fields:
        df[field] = df[field].apply(to_int)
    # pos_no/bill_no are key columns and must be real ints — a failure
    # here should raise, unlike the tolerant fields above.
    df["pos_no"] = df["pos_no"].astype(int)
    df["bill_no"] = df["bill_no"].astype(int)
    print(f"[INFO] 데이터 정규화 완료")
    return df
def upsert_data(df: pd.DataFrame, batch_size: int = 500) -> int:
    """
    Write the frame into pos_billdata with SQLAlchemy's MySQL insert,
    updating on duplicate primary keys (INSERT ... ON DUPLICATE KEY UPDATE).
    Large frames are written in batch_size chunks, each committed separately.
    Args:
        df (pd.DataFrame): data to insert
        batch_size (int): rows per batch (default 500)
    Returns:
        int: total number of affected rows
    """
    print(f"[INFO] DB 저장 시작")
    df = df.where(pd.notnull(df), None)  # NaN -> None so MySQL stores NULL
    engine = db.get_engine()
    metadata = db_schema.metadata
    table = db_schema.pos_billdata
    total_affected = 0
    with engine.connect() as conn:
        for start in range(0, len(df), batch_size):
            batch_df = df.iloc[start:start+batch_size]
            records = batch_df.to_dict(orient="records")
            insert_stmt = insert(table).values(records)
            # On duplicate key, overwrite every non-PK column with the new value.
            update_fields = {
                col.name: insert_stmt.inserted[col.name]
                for col in table.columns
                if col.name not in table.primary_key.columns
            }
            upsert_stmt = insert_stmt.on_duplicate_key_update(update_fields)
            try:
                result = conn.execute(upsert_stmt)
                conn.commit()
                total_affected += result.rowcount
                print(f"[INFO] 배치 처리 완료: {start} ~ {start+len(records)-1} / 영향 행 수: {result.rowcount}")
            except Exception as e:
                print(f"[ERROR] 배치 처리 실패: {start} ~ {start+len(records)-1} / 오류: {e}")
                # Re-raise so the caller sees the failure; switch to
                # `continue` here to skip the failed batch instead.
                raise
    print(f"[INFO] DB 저장 전체 완료, 총 영향 행 수: {total_affected}")
    return total_affected
def ensure_shop_exists(shop_cd, shop_name):
    """Register the shop in pos_shop_name if it is not present yet.

    Args:
        shop_cd (str): shop code
        shop_name (str): shop display name
    Raises:
        Exception: re-raised after logging when the lookup/insert fails
    """
    print(f"[INFO] 매장 존재 여부 확인: {shop_cd}")
    engine = db.get_engine()
    shop_table = db_schema.pos_shop_name
    # FIX: connection was opened/closed manually in try/finally; the
    # context manager guarantees the close even on error paths.
    with engine.connect() as conn:
        try:
            query = shop_table.select().where(shop_table.c.shop_cd == shop_cd)
            result = conn.execute(query).fetchone()
            if result is None:
                print(f"[INFO] 신규 매장 등록: {shop_cd} / {shop_name}")
                ins = shop_table.insert().values(shop_cd=shop_cd, shop_name=shop_name)
                conn.execute(ins)
                conn.commit()
            else:
                print(f"[INFO] 기존 매장 존재: {shop_cd}")
        except Exception as e:
            print(f"[ERROR] 매장 확인/등록 실패: {e}")
            raise
def main():
    """Find every matching export in DATA_DIR and import it.

    Logs the stored row count per file on success; a failure in one file
    is reported and does not stop the remaining files.
    """
    targets = [name for name in os.listdir(DATA_DIR) if FILE_PATTERN.match(name)]
    print(f"[INFO] 발견된 파일 {len(targets)}")
    for name in targets:
        path = os.path.join(DATA_DIR, name)
        print(f"[INFO] 파일: {name} 처리 시작")
        try:
            sale_date, shop_cd, shop_name = extract_file_info(path)
            ensure_shop_exists(shop_cd, shop_name)
            normalized = normalize_data(load_excel_data(path), sale_date, shop_cd)
            stored = upsert_data(normalized)
            print(f"[DONE] 처리 완료: {name} / 저장 건수: {stored}")
            # Delete after processing (enable when desired)
            # os.remove(path)
            # print(f"[INFO] 처리 완료 후 파일 삭제: {name}")
        except Exception as e:
            print(f"[ERROR] {name} 처리 실패: {e}")
if __name__ == "__main__":
    main()

View File

@ -1,16 +1,22 @@
# POS Update # POS Update
''' '''
포스 데이터를 추출한 엑셀파일을 업데이트
OK포스 > 매출관리 > 일자별 > 상품별 > 날짜 지정 > 조회줄수 5000으로 변경 > 엑셀 OK포스 > 매출관리 > 일자별 > 상품별 > 날짜 지정 > 조회줄수 5000으로 변경 > 엑셀
추출파일을 ./data에 복사 추출파일을 ./data에 복사
파일 실행하면 자동으로 mariadb의 DB에 삽입함. 파일 실행하면 자동으로 mariadb의 DB에 삽입함.
''' '''
import sys, os import sys, os
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
import tkinter as tk
import pandas as pd import pandas as pd
from tkinter import filedialog, messagebox
from sqlalchemy.dialects.mysql import insert as mysql_insert from sqlalchemy.dialects.mysql import insert as mysql_insert
from sqlalchemy.exc import IntegrityError from sqlalchemy.exc import IntegrityError
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from conf import db, db_schema from conf import db, db_schema
from lib.common import load_config from lib.common import load_config
@ -23,12 +29,14 @@ def update_pos_table(engine, table, df):
data = row.to_dict() data = row.to_dict()
stmt = mysql_insert(table).values(**data) stmt = mysql_insert(table).values(**data)
# insert ... on duplicate key update (복합 unique key 기준)
update_data = { update_data = {
'qty': data['qty'], 'qty': data['qty'],
'tot_amount': data['tot_amount'], 'tot_amount': data['tot_amount'],
'tot_discount': data['tot_discount'], 'tot_discount': data['tot_discount'],
'actual_amount': data['actual_amount'] 'actual_amount': data['actual_amount']
} }
stmt = stmt.on_duplicate_key_update(**update_data) stmt = stmt.on_duplicate_key_update(**update_data)
try: try:
@ -42,6 +50,7 @@ def process_file(filepath, table, engine):
print(f"[INFO] 처리 시작: {filepath}") print(f"[INFO] 처리 시작: {filepath}")
try: try:
ext = os.path.splitext(filepath)[-1].lower() ext = os.path.splitext(filepath)[-1].lower()
if ext == ".xls": if ext == ".xls":
df = pd.read_excel(filepath, header=5, engine="xlrd") df = pd.read_excel(filepath, header=5, engine="xlrd")
elif ext == ".xlsx": elif ext == ".xlsx":
@ -64,7 +73,8 @@ def process_file(filepath, table, engine):
'실매출액': 'actual_amount' '실매출액': 'actual_amount'
}, inplace=True) }, inplace=True)
df.drop(columns=[col for col in ['idx'] if col in df.columns], inplace=True) if 'idx' in df.columns:
df = df.drop(columns=['idx'])
df['date'] = pd.to_datetime(df['date']).dt.date df['date'] = pd.to_datetime(df['date']).dt.date
df['barcode'] = df['barcode'].astype(int) df['barcode'] = df['barcode'].astype(int)
@ -88,6 +98,7 @@ def process_file(filepath, table, engine):
def batch_process_files(table, engine): def batch_process_files(table, engine):
files = [f for f in os.listdir(DATA_DIR) if f.startswith("일자별 (상품별)") and f.endswith(('.xlsx', '.xls'))] files = [f for f in os.listdir(DATA_DIR) if f.startswith("일자별 (상품별)") and f.endswith(('.xlsx', '.xls'))]
if not files: if not files:
print("[INFO] 처리할 파일이 없습니다.") print("[INFO] 처리할 파일이 없습니다.")
return False return False
@ -103,15 +114,36 @@ def batch_process_files(table, engine):
total_rows += count total_rows += count
try: try:
os.remove(full_path) os.remove(full_path)
print(f"[INFO] 파일 삭제 완료: {fname}") print(f"[INFO] 처리 완료 후 파일 삭제: {fname}")
deleted_files += 1 deleted_files += 1
except Exception as e: except Exception as e:
print(f"[WARN] 파일 삭제 실패: {fname} / {e}") print(f"[WARN] 파일 삭제 실패: {fname}, {e}")
print(f"[INFO] 처리 데이터 건수: {total_rows}") print(f"[INFO] 처리된 전체 데이터 건수: {total_rows}")
print(f"[INFO] 삭제된 파일 수: {deleted_files}") print(f"[INFO] 삭제된 파일 수: {deleted_files}")
return True return True
def run_pos_update():
filepath = filedialog.askopenfilename(
filetypes=[("Excel Files", "*.xlsx *.xls")],
title="파일을 선택하세요"
)
if not filepath:
return
engine = db.engine
try:
table = db_schema.pos
except AttributeError:
messagebox.showerror("DB 오류", "'pos' 테이블이 db_schema에 정의되어 있지 않습니다.")
return
if messagebox.askyesno("확인", f"'{os.path.basename(filepath)}' 파일을 'pos' 테이블에 업로드 하시겠습니까?"):
success, count = process_file(filepath, table, engine)
if success:
print(f"[INFO] 수동 선택된 파일 처리 완료: {count}")
messagebox.showinfo("완료", f"DB 업데이트가 완료되었습니다.\n{count}건 처리됨.")
def main(): def main():
engine = db.engine engine = db.engine
try: try:
@ -122,7 +154,18 @@ def main():
batch_done = batch_process_files(table, engine) batch_done = batch_process_files(table, engine)
if not batch_done: if not batch_done:
print("[INFO] 처리할 데이터가 없습니다.") # GUI 시작
root = tk.Tk()
root.title("POS 데이터 업데이트")
root.geometry("300x150")
lbl = tk.Label(root, text="POS 데이터 업데이트")
lbl.pack(pady=20)
btn = tk.Button(root, text="데이터 선택 및 업데이트", command=run_pos_update)
btn.pack()
root.mainloop()
if __name__ == "__main__": if __name__ == "__main__":
main() main()

View File

@ -1,147 +0,0 @@
import os
import sys
import pandas as pd
import shutil
from datetime import datetime
from sqlalchemy import Table, MetaData
from sqlalchemy.dialects.mysql import insert as mysql_insert
from sqlalchemy.exc import SQLAlchemyError
# Add the project root to sys.path so sibling packages can be imported
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from lib.common import get_logger
from conf import db, db_schema  # provides get_engine / get_session
logger = get_logger("POS_UPS")
# Source directory for dropped exports; processed files move to finish/
DATA_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), "../data"))
FINISH_DIR = os.path.join(DATA_DIR, "finish")
os.makedirs(FINISH_DIR, exist_ok=True)
def nan_to_none(value):
    """Return None for NaN/NA values, otherwise the value unchanged.

    Used so missing spreadsheet cells become SQL NULLs instead of the
    float NaN pandas produces.
    """
    # FIX: removed the redundant per-call `import pandas as pd` — pandas
    # is already imported at module level.
    if pd.isna(value):
        return None
    return value
def load_excel_data(filepath: str):
    """Read the UPS POS export: header on row 2, data from row 3 onward.

    Column names are stripped of surrounding whitespace; rows missing a
    required column value are dropped.

    Raises:
        ValueError: when a required column is absent
    """
    frame = pd.read_excel(filepath, header=1)
    frame.columns = [name.strip() for name in frame.columns]
    required_cols = ['영수증 번호', '품목명']
    missing = [col for col in required_cols if col not in frame.columns]
    if missing:
        raise ValueError(f"필수 컬럼 누락: {missing[0]}")
    return frame.dropna(subset=required_cols)
def _upsert_batch(session, table, batch_data):
    """Upsert one batch via INSERT ... ON DUPLICATE KEY UPDATE and commit."""
    stmt = mysql_insert(table)
    # On duplicate key, overwrite every column that is not part of the PK.
    update_cols = {
        col.name: stmt.inserted[col.name]
        for col in table.columns
        if col.name not in ['sale_date', 'shop_name', 'pos_no', 'bill_no', 'product_cd']
    }
    session.execute(stmt.on_duplicate_key_update(update_cols), batch_data)
    session.commit()

def process_file(filepath: str, engine, session, table, batch_size=500):
    """Import one export file into *table* in batches of *batch_size*.

    Rows that fail conversion are counted and skipped. On success the
    file is moved to FINISH_DIR; on a batch/file-level error the session
    is rolled back and the file stays in place.
    FIX: the batch-flush code was duplicated (mid-loop and tail); it now
    lives in _upsert_batch. Unused locals `updated`/`result` removed.
    """
    try:
        df = load_excel_data(filepath)
        logger.info(f"[LOAD] {os.path.basename(filepath)} - {len(df)}")
        inserted, errors = 0, 0
        batch_data = []
        for idx, row in df.iterrows():
            data = None
            try:
                data = {
                    "sale_date": pd.to_datetime(row["매출일시"]),
                    "shop_name": str(row["매장명"]).strip(),
                    "pos_no": str(row["포스"]).strip(),
                    "bill_no": str(row["영수증 번호"]).strip(),
                    "product_cd": str(row["품목"]).strip(),
                    "product_name": str(row["품목명"]).strip(),
                    "qty": int(row["수량"]),
                    "ca01": nan_to_none(row.get("대분류", None)),
                    "ca02": nan_to_none(row.get("중분류", None)),
                    "ca03": nan_to_none(row.get("소분류", None)),
                    "barcode": nan_to_none(row.get("바코드", None)),
                    "amt": int(row.get("단가", 0)),
                    "tot_sale_amt": int(row.get("주문 금액", 0)),
                    "dc_amt": int(row.get("할인 금액", 0)),
                    "dcm_sale_amt": int(row.get("공급가액", 0)),
                    "vat_amt": int(row.get("세금", 0)),
                    "net_amt": int(row.get("결제 금액", 0)),
                    "cash_receipt": int(row.get("현금영수증", 0)),
                    "card": int(row.get("카드", 0)),
                }
                batch_data.append(data)
            except Exception as e:
                if data is not None:
                    logger.warning(f"[ERROR:ROW] {e} / 데이터: {data}")
                else:
                    logger.warning(f"[ERROR:ROW] {e} / 데이터가 생성되지 않음")
                errors += 1
            # Flush once the batch is full.
            if len(batch_data) >= batch_size:
                _upsert_batch(session, table, batch_data)
                # rowcount is unreliable for upserts; count submitted rows.
                inserted += len(batch_data)
                logger.info(f"[BATCH] {idx + 1} / {len(df)} 처리 중... (총 삽입: {inserted}, 오류: {errors})")
                batch_data = []
        # Flush the remainder.
        if batch_data:
            _upsert_batch(session, table, batch_data)
            inserted += len(batch_data)
            logger.info(f"[BATCH] 최종 {len(batch_data)}건 처리 완료 (총 삽입: {inserted}, 오류: {errors})")
        logger.info(f"[DONE] 삽입: {inserted}, 오류: {errors}")
        shutil.move(filepath, os.path.join(FINISH_DIR, os.path.basename(filepath)))
        logger.info(f"[MOVE] 완료: {os.path.join(FINISH_DIR, os.path.basename(filepath))}")
    except Exception as e:
        logger.error(f"[FAIL] 파일 처리 중 오류 발생 - {e}")
        session.rollback()
def main():
    """Reflect the target table from the DB and import every matching export."""
    engine = db.get_engine()
    session = db.get_session()
    # Reflect the live table definition instead of using the static schema.
    table = Table(
        db_schema.get_full_table_name("pos_ups_billdata"),
        MetaData(),
        autoload_with=engine,
    )
    candidates = [
        os.path.join(DATA_DIR, name)
        for name in os.listdir(DATA_DIR)
        if name.endswith(".xlsx") and name.startswith("영수증별 상세매출")
    ]
    logger.info(f"[INFO] 처리할 파일 {len(candidates)}")
    for path in sorted(candidates):
        logger.info(f"[START] {os.path.basename(path)}")
        process_file(path, engine, session, table)
if __name__ == "__main__":
    main()

View File

@ -9,13 +9,12 @@ from tkcalendar import DateEntry
from datetime import datetime, timedelta from datetime import datetime, timedelta
from sqlalchemy import select, func, between from sqlalchemy import select, func, between
from conf import db_schema, db from conf import db_schema, db
from lib import holiday # 휴일 기능
# Windows DPI Awareness 설정 # Windows DPI Awareness 설정 (윈도우 전용)
if sys.platform == "win32": if sys.platform == "win32":
import ctypes import ctypes
try: try:
ctypes.windll.shcore.SetProcessDpiAwareness(1) ctypes.windll.shcore.SetProcessDpiAwareness(1) # SYSTEM_AWARE = 1
except Exception: except Exception:
pass pass
@ -27,23 +26,30 @@ class PosViewGUI(ctk.CTk):
super().__init__() super().__init__()
self.title("POS 데이터 조회") self.title("POS 데이터 조회")
self.geometry("1100x700") self.geometry("900x500")
self.configure(fg_color="#f0f0f0") self.configure(fg_color="#f0f0f0") # 배경색 맞춤
ctk.set_appearance_mode("light") ctk.set_appearance_mode("light")
ctk.set_default_color_theme("blue") ctk.set_default_color_theme("blue")
# 폰트 세팅 - NanumGothic이 없으면 Arial 대체
try: try:
self.label_font = ("NanumGothic", 13) self.label_font = ("NanumGothic", 13)
except Exception: except Exception:
self.label_font = ("Arial", 13) self.label_font = ("Arial", 13)
# Treeview 스타일 설정 (ttk 스타일)
style = ttk.Style(self) style = ttk.Style(self)
style.theme_use('default') style.theme_use('default')
style.configure("Treeview", font=("NanumGothic", 12), rowheight=30) style.configure("Treeview",
style.configure("Treeview.Heading", font=("NanumGothic", 13, "bold")) font=("NanumGothic", 12),
rowheight=30) # 높이 조절로 글씨 깨짐 방지
style.configure("Treeview.Heading",
font=("NanumGothic", 13, "bold"))
# 날짜 필터 # --- 위젯 배치 ---
# 날짜 범위
ctk.CTkLabel(self, text="시작일:", anchor="w", font=self.label_font, fg_color="#f0f0f0")\ ctk.CTkLabel(self, text="시작일:", anchor="w", font=self.label_font, fg_color="#f0f0f0")\
.grid(row=0, column=0, padx=10, pady=5, sticky="e") .grid(row=0, column=0, padx=10, pady=5, sticky="e")
self.start_date_entry = DateEntry(self, width=12, background='darkblue', foreground='white') self.start_date_entry = DateEntry(self, width=12, background='darkblue', foreground='white')
@ -54,18 +60,6 @@ class PosViewGUI(ctk.CTk):
self.end_date_entry = DateEntry(self, width=12, background='darkblue', foreground='white') self.end_date_entry = DateEntry(self, width=12, background='darkblue', foreground='white')
self.end_date_entry.grid(row=0, column=3, padx=10, pady=5, sticky="w") self.end_date_entry.grid(row=0, column=3, padx=10, pady=5, sticky="w")
# 날짜유형 라디오버튼
self.date_filter_var = ctk.StringVar(value="전체")
ctk.CTkLabel(self, text="날짜유형:", font=self.label_font, fg_color="#f0f0f0")\
.grid(row=0, column=4, padx=(10, 0), pady=5, sticky="e")
ctk.CTkRadioButton(self, text="전체", variable=self.date_filter_var, value="전체")\
.grid(row=0, column=5, padx=2, pady=5, sticky="w")
ctk.CTkRadioButton(self, text="휴일", variable=self.date_filter_var, value="휴일")\
.grid(row=0, column=6, padx=2, pady=5, sticky="w")
ctk.CTkRadioButton(self, text="평일", variable=self.date_filter_var, value="평일")\
.grid(row=0, column=7, padx=2, pady=5, sticky="w")
# 대분류 # 대분류
ctk.CTkLabel(self, text="대분류 :", anchor="w", font=self.label_font, fg_color="#f0f0f0")\ ctk.CTkLabel(self, text="대분류 :", anchor="w", font=self.label_font, fg_color="#f0f0f0")\
.grid(row=1, column=0, padx=10, pady=5, sticky="e") .grid(row=1, column=0, padx=10, pady=5, sticky="e")
@ -88,9 +82,9 @@ class PosViewGUI(ctk.CTk):
# 조회 버튼 # 조회 버튼
self.search_btn = ctk.CTkButton(self, text="조회", command=self.search, self.search_btn = ctk.CTkButton(self, text="조회", command=self.search,
fg_color="#0d6efd", hover_color="#0b5ed7", text_color="white") fg_color="#0d6efd", hover_color="#0b5ed7", text_color="white")
self.search_btn.grid(row=3, column=0, columnspan=8, pady=10) self.search_btn.grid(row=3, column=0, columnspan=4, pady=10)
# 상품별 트리뷰 # 결과 Treeview
self.DISPLAY_COLUMNS = ['ca01', 'ca02', 'ca03', 'name', 'qty', 'tot_amount', 'tot_discount', 'actual_amount'] self.DISPLAY_COLUMNS = ['ca01', 'ca02', 'ca03', 'name', 'qty', 'tot_amount', 'tot_discount', 'actual_amount']
self.COLUMN_LABELS = { self.COLUMN_LABELS = {
'ca01': '대분류', 'ca01': '대분류',
@ -103,38 +97,28 @@ class PosViewGUI(ctk.CTk):
'actual_amount': '실매출액' 'actual_amount': '실매출액'
} }
self.tree = ttk.Treeview(self, columns=self.DISPLAY_COLUMNS, show='headings', height=12) self.tree = ttk.Treeview(self, columns=self.DISPLAY_COLUMNS, show='headings', height=15)
for col in self.DISPLAY_COLUMNS: for col in self.DISPLAY_COLUMNS:
self.tree.heading(col, text=self.COLUMN_LABELS[col]) self.tree.heading(col, text=self.COLUMN_LABELS[col])
self.tree.column(col, width=120, anchor='center') self.tree.column(col, width=120, anchor='center')
self.tree.grid(row=4, column=0, columnspan=8, padx=10, pady=10, sticky="nsew") self.tree.grid(row=4, column=0, columnspan=4, padx=10, pady=10, sticky="nsew")
# 날짜 요약 트리뷰
self.date_tree = ttk.Treeview(self, columns=['date', 'qty', 'tot_amount', 'actual_amount'], show='headings', height=6)
self.date_tree.heading('date', text='일자')
self.date_tree.heading('qty', text='수량합')
self.date_tree.heading('tot_amount', text='총매출합')
self.date_tree.heading('actual_amount', text='실매출합')
for col in ['date', 'qty', 'tot_amount', 'actual_amount']:
self.date_tree.column(col, width=150, anchor='center')
self.date_tree.grid(row=5, column=0, columnspan=8, padx=10, pady=(0, 10), sticky="nsew")
# 그리드 가중치 설정 (창 크기에 따라 트리뷰 확장)
self.grid_rowconfigure(4, weight=1) self.grid_rowconfigure(4, weight=1)
self.grid_rowconfigure(5, weight=1) for col_index in range(4):
for col_index in range(8):
self.grid_columnconfigure(col_index, weight=1) self.grid_columnconfigure(col_index, weight=1)
# 날짜 기본값 # 날짜 기본값 설정 (전날부터 7일 전까지)
end_date = datetime.today().date() - timedelta(days=1) end_date = datetime.today().date() - timedelta(days=1)
start_date = end_date - timedelta(days=6) start_date = end_date - timedelta(days=6)
self.start_date_entry.set_date(start_date) self.start_date_entry.set_date(start_date)
self.end_date_entry.set_date(end_date) self.end_date_entry.set_date(end_date)
# 초기 대분류, 소분류 콤보박스 값 불러오기
self.load_ca01_options() self.load_ca01_options()
def on_ca01_selected(self, value): def on_ca01_selected(self, value):
# print("대분류 선택됨:", value) 디버깅용
self.load_ca03_options() self.load_ca03_options()
def load_ca01_options(self): def load_ca01_options(self):
@ -164,7 +148,7 @@ class PosViewGUI(ctk.CTk):
result = conn.execute(stmt) result = conn.execute(stmt)
ca03_list = [row[0] for row in result.fetchall()] ca03_list = [row[0] for row in result.fetchall()]
self.ca03_combo.configure(values=['전체'] + ca03_list) self.ca03_combo.configure(values=['전체'] + ca03_list)
self.ca03_combo.set('전체') self.ca03_combo.set('전체') # 항상 기본값으로 초기화
def search(self): def search(self):
start_date = self.start_date_entry.get_date() start_date = self.start_date_entry.get_date()
@ -172,34 +156,8 @@ class PosViewGUI(ctk.CTk):
ca01_val = self.ca01_combo.get() ca01_val = self.ca01_combo.get()
ca03_val = self.ca03_combo.get() ca03_val = self.ca03_combo.get()
name_val = self.name_entry.get().strip() name_val = self.name_entry.get().strip()
date_filter = self.date_filter_var.get()
print("🔍 date_filter:", date_filter,
"| start:", start_date, "end:", end_date)
if date_filter == "휴일":
valid_dates = holiday.get_holiday_dates(start_date, end_date)
print("🚩 반환된 휴일 날짜 리스트:", valid_dates)
conditions = []
if date_filter == "전체":
conditions.append(between(pos_table.c.date, start_date, end_date))
else:
if date_filter == "휴일":
valid_dates = holiday.get_holiday_dates(start_date, end_date)
elif date_filter == "평일":
valid_dates = holiday.get_weekday_dates(start_date, end_date)
else:
valid_dates = set()
if not valid_dates:
messagebox.showinfo("알림", f"{date_filter}에 해당하는 데이터가 없습니다.")
self.tree.delete(*self.tree.get_children())
self.date_tree.delete(*self.date_tree.get_children())
return
conditions.append(pos_table.c.date.in_(valid_dates))
conditions = [between(pos_table.c.date, start_date, end_date)]
if ca01_val != '전체': if ca01_val != '전체':
conditions.append(pos_table.c.ca01 == ca01_val) conditions.append(pos_table.c.ca01 == ca01_val)
if ca03_val != '전체': if ca03_val != '전체':
@ -208,7 +166,6 @@ class PosViewGUI(ctk.CTk):
conditions.append(pos_table.c.name.like(f"%{name_val}%")) conditions.append(pos_table.c.name.like(f"%{name_val}%"))
with engine.connect() as conn: with engine.connect() as conn:
# 상품별
stmt = select( stmt = select(
pos_table.c.ca01, pos_table.c.ca01,
pos_table.c.ca02, pos_table.c.ca02,
@ -222,42 +179,11 @@ class PosViewGUI(ctk.CTk):
result = conn.execute(stmt).mappings().all() result = conn.execute(stmt).mappings().all()
# 날짜별 요약
date_stmt = select(
pos_table.c.date,
func.sum(pos_table.c.qty).label("qty"),
func.sum(pos_table.c.tot_amount).label("tot_amount"),
func.sum(pos_table.c.actual_amount).label("actual_amount")
).where(*conditions).group_by(pos_table.c.date).order_by(pos_table.c.date)
date_summary = conn.execute(date_stmt).mappings().all()
# 트리뷰 초기화
self.tree.delete(*self.tree.get_children()) self.tree.delete(*self.tree.get_children())
self.date_tree.delete(*self.date_tree.get_children())
# 상품별 출력
for row in result: for row in result:
values = tuple(row[col] for col in self.DISPLAY_COLUMNS) values = tuple(row[col] for col in self.DISPLAY_COLUMNS)
self.tree.insert('', 'end', values=values) self.tree.insert('', 'end', values=values)
# 날짜별 출력
total_qty = total_amount = total_actual = 0
for row in date_summary:
self.date_tree.insert('', 'end', values=(
row['date'].strftime("%Y-%m-%d"),
row['qty'],
row['tot_amount'],
row['actual_amount']
))
total_qty += row['qty']
total_amount += row['tot_amount']
total_actual += row['actual_amount']
# 총합계 추가
self.date_tree.insert('', 'end', values=("총합계", total_qty, total_amount, total_actual))
if __name__ == "__main__": if __name__ == "__main__":
try: try:
import tkcalendar import tkcalendar

View File

@ -0,0 +1,299 @@
import os
import sys
import re
import requests
from sqlalchemy import select, and_, func
from sqlalchemy.orm import Session
from prophet import Prophet
from statsmodels.tsa.arima.model import ARIMA
import numpy as np
import pandas as pd
from datetime import date, datetime, timedelta
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from conf import db, db_schema
from weather_forecast import get_weekly_precip
from lib.holiday import is_korean_holiday
from lib.common import load_config
# DB table objects (defined in conf/db_schema.py)
pos = db_schema.pos
ga4 = db_schema.ga4_by_date
weather = db_schema.weather
air = db_schema.air

# Load configuration: public-data API key, forecast weights, and the POS
# categories that count as "visitors".
config = load_config()
serviceKey = config['DATA_API']['serviceKey']
weight_cfg = config.get('FORECAST_WEIGHT', {})
VISITOR_CA = tuple(config['POS']['VISITOR_CA'])

# Per-regressor weights applied to features before fitting/predicting.
# Default 1.0 is neutral; values are tunable via conf/config.yaml.
visitor_forecast_multiplier = weight_cfg.get('visitor_forecast_multiplier', 1.0)
minTa_weight = weight_cfg.get('minTa', 1.0)
maxTa_weight = weight_cfg.get('maxTa', 1.0)
sumRn_weight = weight_cfg.get('sumRn', 1.0)
avgRhm_weight = weight_cfg.get('avgRhm', 1.0)
pm25_weight = weight_cfg.get('pm25', 1.0)
is_holiday_weight = weight_cfg.get('is_holiday', 1.0)
def get_date_range(start_date, end_date):
    """Return every calendar day from start_date through end_date (inclusive) as datetimes."""
    span = pd.date_range(start_date, end_date)
    return list(span.to_pydatetime())
def add_korean_holiday_feature(df):
    """Add an is_holiday column (1 = Korean holiday, 0 = regular day) derived from each row's date."""
    flags = [1 if is_korean_holiday(ts.date()) else 0 for ts in df['date']]
    df['is_holiday'] = flags
    return df
def fix_zero_visitors_weighted(df):
    """Replace zero visitor counts with the monthly mean for the same day type.

    Rows with pos_qty == 0 are treated as missing data: each is filled with
    the mean of the non-zero pos_qty values from the same calendar month and
    the same is_holiday flag.  If no such mean exists, the value stays 0.

    Accepts either a raw frame ('date'/'pos_qty' columns) or a Prophet frame
    ('ds'/'y' columns); when a 'y' column exists it is kept in sync with the
    repaired 'pos_qty'.

    Returns:
        A copy of the input frame with zeros repaired (pos_qty as float).

    Raises:
        ValueError: if the frame has no 'is_holiday' column.
    """
    df = df.copy()
    # Accept Prophet-style column names as aliases.
    if 'date' not in df.columns and 'ds' in df.columns:
        df['date'] = df['ds']
    if 'pos_qty' not in df.columns and 'y' in df.columns:
        df['pos_qty'] = df['y']
    if 'is_holiday' not in df.columns:
        raise ValueError("DataFrame에 'is_holiday' 컬럼이 필요합니다.")

    df['year_month'] = df['date'].dt.strftime('%Y-%m')
    monthly_means = df[df['pos_qty'] > 0].groupby(['year_month', 'is_holiday'])['pos_qty'].mean()

    # BUGFIX: operate on a float array.  The original copied the column's own
    # dtype, so on integer data the fractional monthly means were silently
    # truncated when assigned element-wise.
    arr = df['pos_qty'].to_numpy(dtype=float, copy=True)
    for i in range(len(arr)):
        if arr[i] == 0:
            ym = df.iloc[i]['year_month']
            holiday_flag = df.iloc[i]['is_holiday']
            mean_val = monthly_means.get((ym, holiday_flag), np.nan)
            arr[i] = 0 if np.isnan(mean_val) else mean_val
    df['pos_qty'] = arr

    if 'y' in df.columns:
        df['y'] = df['pos_qty']
    df.drop(columns=['year_month'], inplace=True)
    return df
def load_data(session, start_date, end_date):
    """Assemble the daily modeling frame from the POS, GA4, weather and air tables.

    One row per calendar day in [start_date, end_date]; days with no data in a
    given table default to 0 for that table's columns.  The returned frame has
    columns: date, pos_qty, activeUsers, minTa, maxTa, sumRn, avgRhm, pm25,
    is_holiday, weekday — with zero pos_qty days repaired by
    fix_zero_visitors_weighted().
    """
    dates = get_date_range(start_date, end_date)

    # Daily visitor quantity: ticket-office ('매표소') rows restricted to the
    # configured visitor categories.
    stmt_pos = select(
        pos.c.date,
        func.sum(pos.c.qty).label('pos_qty')
    ).where(
        and_(
            pos.c.date >= start_date,
            pos.c.date <= end_date,
            pos.c.ca01 == '매표소',
            pos.c.ca03.in_(VISITOR_CA)
        )
    ).group_by(pos.c.date)

    stmt_ga4 = select(ga4.c.date, ga4.c.activeUsers).where(
        and_(ga4.c.date >= start_date, ga4.c.date <= end_date)
    )

    # NOTE(review): stnId == 99 appears to be the fixed weather station used
    # throughout this project — confirm against conf/config.yaml.
    stmt_weather = select(
        weather.c.date,
        weather.c.minTa,
        weather.c.maxTa,
        weather.c.sumRn,
        weather.c.avgRhm
    ).where(
        and_(
            weather.c.date >= start_date,
            weather.c.date <= end_date,
            weather.c.stnId == 99
        )
    )

    # PM2.5 readings from the '운정' monitoring station only.
    stmt_air = select(air.c.date, air.c.pm25).where(
        and_(
            air.c.date >= start_date,
            air.c.date <= end_date,
            air.c.station == '운정'
        )
    )

    # Index each result set by date for O(1) per-day lookups below.
    pos_data = {row['date']: row['pos_qty'] for row in session.execute(stmt_pos).mappings().all()}
    ga4_data = {row['date']: row['activeUsers'] for row in session.execute(stmt_ga4).mappings().all()}
    weather_data = {row['date']: row for row in session.execute(stmt_weather).mappings().all()}
    air_data = {row['date']: row['pm25'] for row in session.execute(stmt_air).mappings().all()}

    records = []
    for d in dates:
        # DB keys are date objects; get_date_range yields datetimes.
        key = d.date() if isinstance(d, datetime) else d
        record = {
            'date': d,
            'pos_qty': pos_data.get(key, 0),
            'activeUsers': ga4_data.get(key, 0),
            'minTa': weather_data.get(key, {}).get('minTa', 0) if weather_data.get(key) else 0,
            'maxTa': weather_data.get(key, {}).get('maxTa', 0) if weather_data.get(key) else 0,
            'sumRn': weather_data.get(key, {}).get('sumRn', 0) if weather_data.get(key) else 0,
            'avgRhm': weather_data.get(key, {}).get('avgRhm', 0) if weather_data.get(key) else 0,
            'pm25': air_data.get(key, 0)
        }
        records.append(record)

    df = pd.DataFrame(records)
    df = add_korean_holiday_feature(df)
    df = fix_zero_visitors_weighted(df)
    df['weekday'] = df['date'].dt.weekday
    return df
def prepare_prophet_df(df):
    """Build the Prophet input frame: ds/y plus numeric regressor columns."""
    regressors = ['minTa', 'maxTa', 'sumRn', 'avgRhm', 'pm25']
    columns = {'ds': df['date'], 'y': df['pos_qty'].astype(float)}
    for name in regressors:
        columns[name] = df[name].astype(float)
    columns['is_holiday'] = df['is_holiday'].astype(int)
    return pd.DataFrame(columns)
def train_and_predict_prophet(prophet_df, forecast_days=7):
    """Fit a Prophet model with weather/holiday regressors and forecast visitors.

    Side effects: calls the weather-forecast API (via get_weekly_precip) and
    writes the forecast to data/prophet_result.csv.  Returns the full Prophet
    forecast frame with yhat / yhat_lower / yhat_upper scaled by
    visitor_forecast_multiplier and rounded to int.
    """
    prophet_df = prophet_df.copy()

    # Linearly interpolate missing regressor values from neighboring days.
    for col in ['minTa', 'maxTa', 'sumRn', 'avgRhm', 'pm25', 'is_holiday']:
        if col in prophet_df.columns:
            prophet_df[col] = prophet_df[col].interpolate(method='linear', limit_direction='both')

    # Any NaNs left after interpolation become 0.
    prophet_df.fillna({
        'minTa': 0,
        'maxTa': 0,
        'sumRn': 0,
        'avgRhm': 0,
        'pm25': 0,
        'is_holiday': 0
    }, inplace=True)

    # Apply the configured per-regressor weights.
    prophet_df['minTa'] *= minTa_weight
    prophet_df['maxTa'] *= maxTa_weight
    prophet_df['sumRn'] *= sumRn_weight
    prophet_df['avgRhm'] *= avgRhm_weight
    prophet_df['pm25'] *= pm25_weight
    prophet_df['is_holiday'] *= is_holiday_weight

    # Repair zero visitor days before fitting.
    prophet_df = fix_zero_visitors_weighted(prophet_df)

    # Define and fit the Prophet model with each feature as an extra regressor.
    m = Prophet(weekly_seasonality=True, yearly_seasonality=True, daily_seasonality=False)
    m.add_regressor('minTa')
    m.add_regressor('maxTa')
    m.add_regressor('sumRn')
    m.add_regressor('avgRhm')
    m.add_regressor('pm25')
    m.add_regressor('is_holiday')
    m.fit(prophet_df)

    future = m.make_future_dataframe(periods=forecast_days)

    # Fill future regressor values from the weekly weather forecast,
    # weighted the same way as the training data; days with no forecast get 0.
    weekly_precip = get_weekly_precip(serviceKey)
    sumRn_list, minTa_list, maxTa_list, avgRhm_list = [], [], [], []
    for dt in future['ds']:
        dt_str = dt.strftime('%Y%m%d')
        day_forecast = weekly_precip.get(dt_str, None)
        if day_forecast:
            sumRn_list.append(float(day_forecast.get('sumRn', 0)) * sumRn_weight)
            minTa_list.append(float(day_forecast.get('minTa', 0)) * minTa_weight)
            maxTa_list.append(float(day_forecast.get('maxTa', 0)) * maxTa_weight)
            avgRhm_list.append(float(day_forecast.get('avgRhm', 0)) * avgRhm_weight)
        else:
            sumRn_list.append(0)
            minTa_list.append(0)
            maxTa_list.append(0)
            avgRhm_list.append(0)

    future['sumRn'] = sumRn_list
    future['minTa'] = minTa_list
    future['maxTa'] = maxTa_list
    future['avgRhm'] = avgRhm_list

    # No pm25 forecast is available: carry the last known (already weighted)
    # value forward for all future days.
    last_known = prophet_df.iloc[-1]
    future['pm25'] = last_known['pm25'] * pm25_weight

    # Holiday flag for future days, weighted like the training column.
    future['is_holiday'] = future['ds'].apply(lambda d: 1 if is_korean_holiday(d.date()) else 0) * is_holiday_weight

    forecast = m.predict(future)

    # Scale predictions by the configured multiplier and round to whole visitors.
    forecast['yhat'] = (forecast['yhat'] * visitor_forecast_multiplier).round().astype(int)
    forecast['yhat_lower'] = (forecast['yhat_lower'] * visitor_forecast_multiplier).round().astype(int)
    forecast['yhat_upper'] = (forecast['yhat_upper'] * visitor_forecast_multiplier).round().astype(int)

    # Persist only today-and-later predictions to data/prophet_result.csv.
    output_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'data', 'prophet_result.csv'))
    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    df_to_save = forecast[['ds', 'yhat']].copy()
    df_to_save.columns = ['date', 'visitor_forecast']
    df_to_save['date'] = df_to_save['date'].dt.strftime("%Y-%m-%d")
    today_str = date.today().strftime("%Y-%m-%d")
    df_to_save = df_to_save[df_to_save['date'] >= today_str]
    df_to_save.to_csv(output_path, index=False)

    return forecast
def train_and_predict_arima(ts, forecast_days=7):
    """Fit an ARIMA(5,1,0) model on the series and forecast forecast_days steps ahead."""
    fitted = ARIMA(ts, order=(5, 1, 0)).fit()
    return fitted.forecast(steps=forecast_days)
def train_and_predict_rf(df, forecast_days=7):
    """Train a random forest on weekday/weather features and predict the next forecast_days."""
    from sklearn.ensemble import RandomForestRegressor

    feature_cols = ['weekday', 'minTa', 'maxTa', 'sumRn', 'avgRhm', 'pm25']
    history = df.copy()
    history['weekday'] = history['date'].dt.weekday

    regressor = RandomForestRegressor(n_estimators=100, random_state=42)
    regressor.fit(history[feature_cols], history['pos_qty'])

    # Future frame: weekday is known; weather regressors default to 0.
    first_future_day = history['date'].max() + timedelta(days=1)
    future_dates = pd.date_range(first_future_day, periods=forecast_days)
    future_df = pd.DataFrame({
        'date': future_dates,
        'weekday': future_dates.weekday,
        'minTa': 0,
        'maxTa': 0,
        'sumRn': 0,
        'avgRhm': 0,
        'pm25': 0
    })
    future_df['pos_qty'] = regressor.predict(future_df[feature_cols])
    return future_df
def main():
    """Run the weekly visitor forecast: load a year of history, fit Prophet, print results."""
    today = datetime.today().date()
    # Train on the trailing 365 days.
    start_date = today - timedelta(days=365)
    end_date = today

    with Session(db.engine) as session:
        df = load_data(session, start_date, end_date)

    prophet_df = prepare_prophet_df(df)
    forecast_days = 7
    forecast = train_and_predict_prophet(prophet_df, forecast_days)
    forecast['yhat'] = forecast['yhat'].round().astype(int)
    forecast['yhat_lower'] = forecast['yhat_lower'].round().astype(int)
    forecast['yhat_upper'] = forecast['yhat_upper'].round().astype(int)
    weekly_precip = get_weekly_precip(serviceKey)
    # Show only the tail (forecast horizon plus a few trailing history days).
    output_df = forecast[['ds', 'yhat', 'yhat_lower', 'yhat_upper']].tail(10).copy()
    output_df.columns = ['날짜', '예상 방문객', '하한', '상한']
    print("이번 주 강수 예보:")
    for dt_str, val in weekly_precip.items():
        print(f"{dt_str}: 강수량={val['sumRn']:.1f}mm, 최저기온={val['minTa']}, 최고기온={val['maxTa']}, 습도={val['avgRhm']:.1f}%")
    print("\n예측 방문객:")
    print(output_df.to_string(index=False))
if __name__ == '__main__':
main()

87
lib/visitor_update.py Normal file
View File

@ -0,0 +1,87 @@
# ./lib/visitor_update.py
import os
import sys
import pandas as pd
from datetime import datetime
# 프로젝트 루트 경로 추가
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from conf.db import get_session
from conf.db_schema import pos
from sqlalchemy import select
# Constants: fixed POS-record attributes applied to every imported visitor row
FILE_PATH = os.path.join(os.path.dirname(__file__), '..', 'data', 'visitor_raw.xlsx')
CA01 = '매표소'        # category level 1: ticket office
CA02 = 'POS'
CA03 = '입장료'        # category level 3: admission fee
BARCODE = 11111111     # sentinel barcode for aggregated visitor rows
DEFAULT_INT = 0        # amounts/discounts are unknown for visitor imports
def load_excel(filepath):
    """Read the visitor spreadsheet; its two columns become 'date' and 'qty'."""
    frame = pd.read_excel(filepath)
    frame.columns = ['date', 'qty']
    frame['date'] = pd.to_datetime(frame['date']).dt.date
    return frame
def get_existing_dates(session, dates):
    """Return the subset of *dates* that already have rows in the pos table."""
    query = select(pos.c.date).where(pos.c.date.in_(dates))
    return set(session.execute(query).scalars())
def insert_data(df):
    """Insert visitor rows into the pos table, skipping dates already present.

    Each Excel row becomes one pos record with the fixed category/barcode
    constants; only date and qty vary.  The whole batch is committed once and
    rolled back on any error.
    """
    session = get_session()
    try:
        all_dates = set(df['date'].unique())
        existing_dates = get_existing_dates(session, all_dates)

        # Drop rows whose date is already stored so re-runs are idempotent.
        if existing_dates:
            print(f"[INFO] 이미 존재하는 날짜는 건너뜁니다: {sorted(existing_dates)}")
            df = df[~df['date'].isin(existing_dates)]

        if df.empty:
            print("[INFO] 삽입할 신규 데이터가 없습니다.")
            return

        # Build all records first and insert them with a single executemany
        # round-trip instead of one INSERT statement per row.
        records = [
            {
                'date': row['date'],
                'ca01': CA01,
                'ca02': CA02,
                'ca03': CA03,
                'barcode': BARCODE,
                'name': '입장객',
                'qty': int(row['qty']),
                'tot_amount': DEFAULT_INT,
                'tot_discount': DEFAULT_INT,
                'actual_amount': DEFAULT_INT
            }
            for _, row in df.iterrows()
        ]
        session.execute(pos.insert(), records)
        session.commit()
        print(f"[INFO] {len(df)}건의 데이터가 성공적으로 삽입되었습니다.")
    except Exception as e:
        session.rollback()
        print(f"[ERROR] 데이터 저장 중 오류 발생: {e}")
    finally:
        session.close()
def main():
    """Entry point: verify the source spreadsheet exists, then load and insert it."""
    if not os.path.exists(FILE_PATH):
        print(f"[ERROR] 파일을 찾을 수 없습니다: {FILE_PATH}")
        return
    insert_data(load_excel(FILE_PATH))
if __name__ == "__main__":
main()

96
lib/weatherFileUpdate.py Normal file
View File

@ -0,0 +1,96 @@
# weatherFileUpdate.py
import os
import csv
import sys
from datetime import datetime
from sqlalchemy import select, and_
# 경로 설정
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from conf import db, db_schema
CSV_FILENAME = 'weather.csv'  # name of the input data file under ./data
CSV_PATH = os.path.join(os.path.dirname(__file__), '../data', CSV_FILENAME)

weather_table = db_schema.fg_manager_static_weather
STN_ID = 99  # fixed station id recorded with every imported row
def parse_float(value):
    """Convert value to float, mapping invalid input and NaN to 0.0."""
    try:
        result = float(value)
    except (ValueError, TypeError):
        return 0.0
    # NaN is the only float not equal to itself.
    if result != result:
        return 0.0
    return result
def load_csv(filepath):
    """Parse the weather CSV into row dicts ready for DB insertion.

    Rows that fail to parse are reported and skipped; a missing file aborts
    the program with exit code 1.
    """
    parsed = []
    try:
        with open(filepath, newline='', encoding='utf-8') as csvfile:
            for row in csv.DictReader(csvfile):
                try:
                    parsed.append({
                        'date': datetime.strptime(row['날짜'], '%Y-%m-%d').date(),
                        'stnId': STN_ID,
                        'minTa': parse_float(row.get('최저기온', 0)),
                        'maxTa': parse_float(row.get('최고기온', 0)),
                        'sumRn': parse_float(row.get('일강수량\n(mm)', 0)),
                        'avgWs': parse_float(row.get('평균풍속\n(m/s)', 0)),
                        'avgRhm': parse_float(row.get('습도', 0)),
                    })
                except Exception as e:
                    print(f"[WARN] 잘못된 행 건너뜀: {row} / 오류: {e}")
    except FileNotFoundError:
        print(f"[ERROR] 파일이 존재하지 않음: {filepath}")
        sys.exit(1)
    return parsed
def row_exists(session, date, stnId):
    """Return True if a weather row for (date, stnId) is already stored."""
    query = (
        select(weather_table.c.date)
        .where(weather_table.c.date == date)
        .where(weather_table.c.stnId == stnId)
    )
    return session.execute(query).scalar() is not None
def insert_rows(rows):
    """Insert rows not yet present in the table; return (inserted, skipped) counts."""
    inserted = 0
    skipped = 0
    session = db.get_session()
    try:
        for data in rows:
            if row_exists(session, data['date'], data['stnId']):
                skipped += 1
            else:
                session.execute(weather_table.insert().values(**data))
                inserted += 1
        session.commit()
    except Exception as e:
        session.rollback()
        print(f"[ERROR] DB 삽입 실패: {e}")
    finally:
        session.close()
    return inserted, skipped
def main():
    """Load the weather CSV and insert any rows not already stored in the DB."""
    print(f"[INFO] CSV 파일 로드: {CSV_PATH}")
    rows = load_csv(CSV_PATH)
    print(f"[INFO] 총 행 수: {len(rows)}")
    if not rows:
        print("[WARN] 삽입할 데이터가 없습니다.")
        return
    inserted, skipped = insert_rows(rows)
    print(f"[DONE] 삽입 완료: {inserted}건, 건너뜀: {skipped}")
if __name__ == "__main__":
main()

View File

@ -1,6 +1,12 @@
import requests import requests
import os
import json
from datetime import datetime, timedelta from datetime import datetime, timedelta
def valid_until_hours(cached, hours=2):
    """Return True while the cache entry's 'ts' timestamp is younger than *hours*."""
    age = datetime.now() - datetime.fromisoformat(cached['ts'])
    return age < timedelta(hours=hours)
def parse_precip(value): def parse_precip(value):
if value == '강수없음': if value == '강수없음':
return 0.0 return 0.0
@ -12,6 +18,27 @@ def parse_precip(value):
except: except:
return 0.0 return 0.0
def ensure_cache_dir():
    """Create (if needed) and return the absolute path of the data/cache directory."""
    here = os.path.dirname(__file__)
    cache_dir = os.path.abspath(os.path.join(here, '..', 'data', 'cache'))
    os.makedirs(cache_dir, exist_ok=True)
    return cache_dir
def get_cache_or_request(name, valid_until_fn, request_fn):
    """Return cached data for *name* if still valid, else call request_fn and cache the result.

    The cache is one JSON file per name per calendar day, storing a 'ts'
    timestamp and the 'data' payload; valid_until_fn decides freshness.
    """
    today = datetime.now().strftime("%Y%m%d")
    cache_file = os.path.join(ensure_cache_dir(), f"{name}_{today}.json")

    if os.path.exists(cache_file):
        with open(cache_file, 'r', encoding='utf-8') as f:
            entry = json.load(f)
        if valid_until_fn(entry):
            return entry['data']

    fresh = request_fn()
    payload = {'ts': datetime.now().isoformat(), 'data': fresh}
    with open(cache_file, 'w', encoding='utf-8') as f:
        json.dump(payload, f, ensure_ascii=False)
    return fresh
def get_latest_base_date_time(now=None): def get_latest_base_date_time(now=None):
if now is None: if now is None:
now = datetime.now() now = datetime.now()
@ -30,219 +57,228 @@ def get_latest_base_date_time(now=None):
return base_date, candidate return base_date, candidate
def get_daily_ultra_forecast(serviceKey): def get_daily_ultra_forecast(serviceKey):
base_date, base_time = get_latest_base_date_time() def request():
url = "http://apis.data.go.kr/1360000/VilageFcstInfoService_2.0/getUltraSrtFcst" base_date, base_time = get_latest_base_date_time()
params = { url = "http://apis.data.go.kr/1360000/VilageFcstInfoService_2.0/getUltraSrtFcst"
'serviceKey': serviceKey, params = {
'numOfRows': '1000', 'serviceKey': serviceKey,
'pageNo': '1', 'numOfRows': '1000',
'dataType': 'JSON', 'pageNo': '1',
'base_date': base_date, 'dataType': 'JSON',
'base_time': base_time, 'base_date': base_date,
'nx': '57', 'base_time': base_time,
'ny': '130' 'nx': '57',
} 'ny': '130'
try:
resp = requests.get(url, params=params, timeout=10)
resp.raise_for_status()
items = resp.json()['response']['body']['items']['item']
except Exception as e:
print(f"[ERROR] 초단기예보 호출 실패: {e}")
return {}
daily_data = {}
for item in items:
dt = item['fcstDate']
cat = item['category']
val = item['fcstValue']
if dt not in daily_data:
daily_data[dt] = {'sumRn': 0, 'minTa': [], 'maxTa': [], 'rhm': []}
if cat == 'RN1':
daily_data[dt]['sumRn'] += parse_precip(val)
elif cat == 'T3H':
try:
t = float(val)
daily_data[dt]['minTa'].append(t)
daily_data[dt]['maxTa'].append(t)
except:
pass
elif cat == 'REH':
try:
daily_data[dt]['rhm'].append(float(val))
except:
pass
result = {}
for dt, vals in daily_data.items():
minTa = min(vals['minTa']) if vals['minTa'] else 0
maxTa = max(vals['maxTa']) if vals['maxTa'] else 0
avgRhm = sum(vals['rhm']) / len(vals['rhm']) if vals['rhm'] else 0
sumRn = round(vals['sumRn'], 2)
result[dt] = {'sumRn': sumRn, 'minTa': minTa, 'maxTa': maxTa, 'avgRhm': avgRhm}
return result
def get_daily_vilage_forecast(serviceKey):
base_date, _ = get_latest_base_date_time()
url = "http://apis.data.go.kr/1360000/VilageFcstInfoService_2.0/getVilageFcst"
params = {
'serviceKey': serviceKey,
'numOfRows': '1000',
'pageNo': '1',
'dataType': 'JSON',
'base_date': base_date,
'base_time': '0200',
'nx': '57',
'ny': '130'
}
try:
resp = requests.get(url, params=params, timeout=10)
resp.raise_for_status()
items = resp.json()['response']['body']['items']['item']
except Exception as e:
print(f"[ERROR] 단기예보 호출 실패: {e}")
return {}
daily_data = {}
for item in items:
dt = item['fcstDate']
cat = item['category']
val = item['fcstValue']
if dt not in daily_data:
daily_data[dt] = {'sumRn': 0, 'minTa': [], 'maxTa': [], 'rhm': []}
if cat == 'RN1':
daily_data[dt]['sumRn'] += parse_precip(val)
elif cat == 'TMN':
try:
daily_data[dt]['minTa'].append(float(val))
except:
pass
elif cat == 'TMX':
try:
daily_data[dt]['maxTa'].append(float(val))
except:
pass
elif cat == 'REH':
try:
daily_data[dt]['rhm'].append(float(val))
except:
pass
result = {}
for dt, vals in daily_data.items():
minTa = min(vals['minTa']) if vals['minTa'] else 0
maxTa = max(vals['maxTa']) if vals['maxTa'] else 0
avgRhm = sum(vals['rhm']) / len(vals['rhm']) if vals['rhm'] else 0
sumRn = round(vals['sumRn'], 2)
result[dt] = {
'sumRn': sumRn,
'minTa': minTa,
'maxTa': maxTa,
'avgRhm': avgRhm
} }
return result
def get_midterm_forecast(serviceKey, regId='11B20305'):
# 중기 강수확률 예보
url = "http://apis.data.go.kr/1360000/MidFcstInfoService/getMidLandFcst"
# 발표 시각 계산: 06시 또는 18시만 존재
now = datetime.now()
if now.hour < 6:
tmFc = (now - timedelta(days=1)).strftime("%Y%m%d") + "1800"
elif now.hour < 18:
tmFc = now.strftime("%Y%m%d") + "0600"
else:
tmFc = now.strftime("%Y%m%d") + "1800"
params = {
'serviceKey': serviceKey,
'regId': regId,
'tmFc': tmFc,
'numOfRows': 10,
'pageNo': 1,
'dataType': 'JSON',
}
try:
resp = requests.get(url, params=params, timeout=10)
resp.raise_for_status()
data = resp.json()
items = data.get('response', {}).get('body', {}).get('items', {}).get('item', [])
if not items:
print(f"[ERROR] 중기예보 응답 item 없음. tmFc={tmFc}, regId={regId}")
return {}, {}
item = items[0] # 실제 예보 데이터
except Exception as e:
print(f"[ERROR] 중기예보 호출 실패: {e}")
return {}, {}
# 3~10일 후 강수확률 추출
precip_probs = {}
for day in range(3, 11):
key = f'rnSt{day}'
try: try:
precip_probs[day] = int(item.get(key, 0)) resp = requests.get(url, params=params, timeout=10)
except: resp.raise_for_status()
precip_probs[day] = 0 items = resp.json()['response']['body']['items']['item']
except Exception as e:
return precip_probs, item print(f"[ERROR] 초단기예보 호출 실패: {e}")
def get_midterm_temperature_forecast(serviceKey, regId='11B20305'): # 파주 코드
url = "http://apis.data.go.kr/1360000/MidFcstInfoService/getMidTa"
# 발표시각은 06:00 또는 18:00
now = datetime.now()
if now.hour < 6:
tmFc = (now - timedelta(days=1)).strftime("%Y%m%d") + "1800"
elif now.hour < 18:
tmFc = now.strftime("%Y%m%d") + "0600"
else:
tmFc = now.strftime("%Y%m%d") + "1800"
params = {
'serviceKey': serviceKey,
'regId': regId,
'tmFc': tmFc,
'pageNo': '1',
'numOfRows': '10',
'dataType': 'JSON'
}
try:
resp = requests.get(url, params=params, timeout=10)
resp.raise_for_status()
data = resp.json()
# 응답 검증
items = data.get("response", {}).get("body", {}).get("items", {}).get("item", [])
if not items:
print(f"[ERROR] 응답에 item 없음. tmFc={tmFc}, regId={regId}")
return {} return {}
item = items[0] daily_data = {}
for item in items:
dt = item['fcstDate']
cat = item['category']
val = item['fcstValue']
if dt not in daily_data:
daily_data[dt] = {'sumRn': 0, 'minTa': [], 'maxTa': [], 'rhm': []}
if cat == 'RN1':
daily_data[dt]['sumRn'] += parse_precip(val)
elif cat == 'T3H':
try:
t = float(val)
daily_data[dt]['minTa'].append(t)
daily_data[dt]['maxTa'].append(t)
except:
pass
elif cat == 'REH':
try:
daily_data[dt]['rhm'].append(float(val))
except:
pass
except Exception as e: result = {}
print(f"[ERROR] 중기기온예보 호출 실패: {e}") for dt, vals in daily_data.items():
return {} minTa = min(vals['minTa']) if vals['minTa'] else 0
maxTa = max(vals['maxTa']) if vals['maxTa'] else 0
temps = {} avgRhm = sum(vals['rhm']) / len(vals['rhm']) if vals['rhm'] else 0
for day in range(3, 11): sumRn = round(vals['sumRn'], 2)
min_key = f'taMin{day}' result[dt] = {
max_key = f'taMax{day}' 'sumRn': round(sumRn, 1),
try: 'minTa': round(minTa, 1),
temps[day] = { 'maxTa': round(maxTa, 1),
'min': int(item.get(min_key, 0)), 'avgRhm': round(avgRhm, 1)
'max': int(item.get(max_key, 0))
} }
except:
temps[day] = {'min': 0, 'max': 0}
return temps return result
return get_cache_or_request('ultra_forecast', lambda cached: valid_until_hours(cached, 2), request)
def get_daily_vilage_forecast(serviceKey):
def request():
base_date, _ = get_latest_base_date_time()
url = "http://apis.data.go.kr/1360000/VilageFcstInfoService_2.0/getVilageFcst"
params = {
'serviceKey': serviceKey,
'numOfRows': '1000',
'pageNo': '1',
'dataType': 'JSON',
'base_date': base_date,
'base_time': '0200',
'nx': '57',
'ny': '130'
}
try:
resp = requests.get(url, params=params, timeout=10)
resp.raise_for_status()
items = resp.json()['response']['body']['items']['item']
except Exception as e:
print(f"[ERROR] 단기예보 호출 실패: {e}")
return {}
daily_data = {}
for item in items:
dt = item['fcstDate']
cat = item['category']
val = item['fcstValue']
if dt not in daily_data:
daily_data[dt] = {'sumRn': 0, 'minTa': [], 'maxTa': [], 'rhm': []}
if cat == 'RN1':
daily_data[dt]['sumRn'] += parse_precip(val)
elif cat == 'TMN':
try:
daily_data[dt]['minTa'].append(float(val))
except:
pass
elif cat == 'TMX':
try:
daily_data[dt]['maxTa'].append(float(val))
except:
pass
elif cat == 'REH':
try:
daily_data[dt]['rhm'].append(float(val))
except:
pass
result = {}
for dt, vals in daily_data.items():
minTa = min(vals['minTa']) if vals['minTa'] else 0
maxTa = max(vals['maxTa']) if vals['maxTa'] else 0
avgRhm = sum(vals['rhm']) / len(vals['rhm']) if vals['rhm'] else 0
sumRn = round(vals['sumRn'], 2)
result[dt] = {
'sumRn': round(sumRn, 1),
'minTa': round(minTa, 1),
'maxTa': round(maxTa, 1),
'avgRhm': round(avgRhm, 1)
}
return result
return get_cache_or_request('vilage_forecast', lambda cached: valid_until_hours(cached, 6), request)
def get_midterm_forecast(serviceKey, regId='11B20305'):
def request():
url = "http://apis.data.go.kr/1360000/MidFcstInfoService/getMidLandFcst"
now = datetime.now()
if now.hour < 6:
tmFc = (now - timedelta(days=1)).strftime("%Y%m%d") + "1800"
elif now.hour < 18:
tmFc = now.strftime("%Y%m%d") + "0600"
else:
tmFc = now.strftime("%Y%m%d") + "1800"
params = {
'serviceKey': serviceKey,
'regId': regId,
'tmFc': tmFc,
'numOfRows': 10,
'pageNo': 1,
'dataType': 'JSON',
}
try:
resp = requests.get(url, params=params, timeout=10)
resp.raise_for_status()
data = resp.json()
items = data.get('response', {}).get('body', {}).get('items', {}).get('item', [])
if not items:
print(f"[ERROR] 중기예보 응답 item 없음. tmFc={tmFc}, regId={regId}")
return {}
item = items[0]
except Exception as e:
print(f"[ERROR] 중기예보 호출 실패: {e}")
return {}
precip_probs = {}
for day in range(3, 11):
key = f'rnSt{day}'
try:
precip_probs[day] = int(item.get(key, 0))
except:
precip_probs[day] = 0
return precip_probs
return get_cache_or_request('midterm_precip', lambda cached: valid_until_hours(cached, 12), request)
def get_midterm_temperature_forecast(serviceKey, regId='11B20305'):
def request():
url = "http://apis.data.go.kr/1360000/MidFcstInfoService/getMidTa"
now = datetime.now()
if now.hour < 6:
tmFc = (now - timedelta(days=1)).strftime("%Y%m%d") + "1800"
elif now.hour < 18:
tmFc = now.strftime("%Y%m%d") + "0600"
else:
tmFc = now.strftime("%Y%m%d") + "1800"
params = {
'serviceKey': serviceKey,
'regId': regId,
'tmFc': tmFc,
'pageNo': '1',
'numOfRows': '10',
'dataType': 'JSON'
}
try:
resp = requests.get(url, params=params, timeout=10)
resp.raise_for_status()
data = resp.json()
items = data.get("response", {}).get("body", {}).get("items", {}).get("item", [])
if not items:
print(f"[ERROR] 응답에 item 없음. tmFc={tmFc}, regId={regId}")
return {}
item = items[0]
except Exception as e:
print(f"[ERROR] 중기기온예보 호출 실패: {e}")
return {}
temps = {}
for day in range(3, 11):
min_key = f'taMin{day}'
max_key = f'taMax{day}'
min_val = item.get(min_key)
max_val = item.get(max_key)
try:
temps[day] = {
'min': int(min_val) if min_val is not None else None,
'max': int(max_val) if max_val is not None else None
}
except Exception:
temps[day] = {'min': None, 'max': None}
return temps
return get_cache_or_request('midterm_temp', lambda cached: valid_until_hours(cached, 12), request)
def get_weekly_precip(serviceKey): def get_weekly_precip(serviceKey):
from datetime import date from datetime import date
@ -251,7 +287,7 @@ def get_weekly_precip(serviceKey):
ultra = get_daily_ultra_forecast(serviceKey) ultra = get_daily_ultra_forecast(serviceKey)
short = get_daily_vilage_forecast(serviceKey) short = get_daily_vilage_forecast(serviceKey)
mid_precip, _ = get_midterm_forecast(serviceKey) mid_precip = get_midterm_forecast(serviceKey)
mid_temp = get_midterm_temperature_forecast(serviceKey) mid_temp = get_midterm_temperature_forecast(serviceKey)
results = {} results = {}
@ -267,34 +303,77 @@ def get_weekly_precip(serviceKey):
'avgRhm': 0 'avgRhm': 0
} }
# 강수량과 습도는 초단기예보 우선 반영
if dt_str in ultra: if dt_str in ultra:
results[dt_str]['sumRn'] = ultra[dt_str]['sumRn'] results[dt_str]['sumRn'] = ultra[dt_str]['sumRn']
results[dt_str]['avgRhm'] = ultra[dt_str]['avgRhm'] results[dt_str]['avgRhm'] = ultra[dt_str]['avgRhm']
# 최고/최저기온은 단기예보로만 덮어쓰기 (0이 아니면 덮어쓰기)
if dt_str in short: if dt_str in short:
if short[dt_str]['minTa'] != 0: if short[dt_str]['minTa'] != 0:
results[dt_str]['minTa'] = short[dt_str]['minTa'] results[dt_str]['minTa'] = short[dt_str]['minTa']
if short[dt_str]['maxTa'] != 0: if short[dt_str]['maxTa'] != 0:
results[dt_str]['maxTa'] = short[dt_str]['maxTa'] results[dt_str]['maxTa'] = short[dt_str]['maxTa']
# 중기예보 보정 (3일 이후부터) day_offset = (dt - today).days # 0부터 시작
day_idx = (dt - today).days + 1
if day_idx >= 3:
if day_idx in mid_precip:
mid_rain = mid_precip[day_idx] / 100 * 5.0
if results[dt_str]['sumRn'] < mid_rain:
results[dt_str]['sumRn'] = mid_rain
if day_idx in mid_temp:
# 단기예보로 이미 값이 있으면 건너뛰기
if results[dt_str]['minTa'] == 0:
results[dt_str]['minTa'] = mid_temp[day_idx]['min']
if results[dt_str]['maxTa'] == 0:
results[dt_str]['maxTa'] = mid_temp[day_idx]['max']
if day_offset >= 3:
# 중기예보 강수 우선 적용
if day_offset in mid_precip:
mid_rain = float(mid_precip[day_offset]) / 100 * 5.0
if mid_rain > results[dt_str]['sumRn']:
results[dt_str]['sumRn'] = mid_rain
# 중기예보 기온 적용: 단, None이거나 0이면 단기예보로 대체
key = str(day_offset)
if key in mid_temp:
mid_min = mid_temp[key]['min']
mid_max = mid_temp[key]['max']
if mid_min not in (None, 0):
results[dt_str]['minTa'] = mid_min
elif dt_str in short and short[dt_str]['minTa'] != 0:
results[dt_str]['minTa'] = short[dt_str]['minTa']
if mid_max not in (None, 0):
results[dt_str]['maxTa'] = mid_max
elif dt_str in short and short[dt_str]['maxTa'] != 0:
results[dt_str]['maxTa'] = short[dt_str]['maxTa']
# 중기 기온 적용 이후, 습도 보완
if results[dt_str]['avgRhm'] == 0 and dt_str in short and short[dt_str]['avgRhm'] != 0:
results[dt_str]['avgRhm'] = short[dt_str]['avgRhm']
results[dt_str] = {
'sumRn': round(results[dt_str]['sumRn'], 1),
'minTa': round(results[dt_str]['minTa'], 1),
'maxTa': round(results[dt_str]['maxTa'], 1),
'avgRhm': round(results[dt_str]['avgRhm'], 1),
}
return results return results
def print_weekly_precip_table(data_dict):
    """Pretty-print the weekly forecast dict as an aligned console table.

    Args:
        data_dict: mapping of date string -> dict with keys
            'sumRn', 'minTa', 'maxTa', 'avgRhm' (all floats).
    """
    title_row = f"{'날짜':<10} {'강수량(mm)':>10} {'최저기온(℃)':>12} {'최고기온(℃)':>12} {'평균습도(%)':>12}"
    print(title_row)
    print('-' * len(title_row))
    # One row per date, ascending by the date-string key.
    for day_key in sorted(data_dict):
        metrics = data_dict[day_key]
        print(f"{day_key:<10} {metrics['sumRn']:10.1f} {metrics['minTa']:12.1f} {metrics['maxTa']:12.1f} {metrics['avgRhm']:12.1f}")
if __name__ == '__main__': if __name__ == '__main__':
serviceKey = "mHrZoSnzVc+2S4dpCe3A1CgI9cAu1BRttqRdoEy9RGbnKAKyQT4sqcESDqqY3grgBGQMuLeEgWIS3Qxi8rcDVA==" import os, sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from lib.common import load_config
serviceKey = load_config()['DATA_API']['serviceKey']
data = get_weekly_precip(serviceKey)
print(get_weekly_precip(serviceKey)) print(get_weekly_precip(serviceKey))
print_weekly_precip_table(data)
print(get_daily_vilage_forecast(serviceKey))
print(get_midterm_temperature_forecast(serviceKey))

View File

@ -7,8 +7,8 @@ from collections import defaultdict
import pandas as pd import pandas as pd
from sqlalchemy import select, func from sqlalchemy import select, func
from weather_forecast import get_weekly_precip
from conf import db, db_schema from conf import db, db_schema
from lib.weather_forecast import get_weekly_precip
from lib.holiday import is_korean_holiday from lib.holiday import is_korean_holiday
from lib.common import load_config from lib.common import load_config
@ -22,12 +22,50 @@ pos = db_schema.pos
engine = db.engine engine = db.engine
def get_recent_dataframe(today=None) -> pd.DataFrame:
    """Build the recent two-week dataframe ending on the upcoming Sunday.

    Missing weather fields for today-and-later dates are backfilled from the
    weekly weather forecast, and Prophet visitor predictions are merged for
    dates from `today` onward.

    Args:
        today: anchor date (defaults to date.today()).

    Returns:
        DataFrame produced by build_dataframe() over the 14-day window.
    """
    today = today or date.today()
    # 14-day window ending on this week's Sunday.
    upcoming_sunday = today + timedelta(days=(6 - today.weekday()))
    window = [upcoming_sunday - timedelta(days=offset) for offset in reversed(range(14))]
    window_data = fetch_data_for_dates(window)

    # Backfill missing precipitation/temperature/humidity for future dates
    # from the weekly forecast.
    weekly_precip = get_weekly_precip(config['DATA_API']['serviceKey'])
    for day in window:
        if day < today:
            continue
        if day in window_data and '강수량' in window_data[day]:
            continue
        key = day.strftime('%Y%m%d')
        if key not in weekly_precip:
            continue
        forecast = weekly_precip[key]
        entry = window_data.setdefault(day, {})
        entry['강수량'] = round(float(forecast['sumRn']), 1)
        entry['최저기온'] = round(float(forecast['minTa']), 1)
        entry['최고기온'] = round(float(forecast['maxTa']), 1)
        entry['습도'] = round(float(forecast['avgRhm']), 1)

    # Merge Prophet visitor predictions for today and later.
    prophet_forecast = load_prophet_forecast()
    for day in window:
        stamp = pd.Timestamp(day)
        if day >= today and stamp in prophet_forecast.index:
            window_data.setdefault(day, {})['예상 방문자'] = round(float(prophet_forecast.loc[stamp]), 0)

    return build_dataframe(window, window_data, use_forecast_after=today)
def get_last_year_dataframe(today=None) -> pd.DataFrame:
    """Build the dataframe for last year's same-weekday 14-day window.

    Mirrors the current 14-day window (ending on the upcoming Sunday) onto
    the matching weekdays of the previous year.

    Args:
        today: anchor date (defaults to date.today()).

    Returns:
        DataFrame produced by build_dataframe() over last year's dates.
    """
    today = today or date.today()
    upcoming_sunday = today + timedelta(days=6 - today.weekday())
    this_year_window = [upcoming_sunday - timedelta(days=offset)
                        for offset in reversed(range(14))]
    last_year_window = get_last_year_same_weekdays(this_year_window)
    return build_dataframe(last_year_window, fetch_data_for_dates(last_year_window))
def get_recent_dates(today=None, days=14):
    """Return the last `days` dates ending at `today`, oldest first."""
    end = today or date.today()
    return [end - timedelta(days=back) for back in reversed(range(days))]
def get_this_week_dates(today=None): def get_this_week_dates(today=None):
today = today or date.today() today = today or date.today()
weekday = today.weekday() weekday = today.weekday()
@ -205,7 +243,7 @@ def main():
recent_dates = [sunday - timedelta(days=i) for i in reversed(range(14))] recent_dates = [sunday - timedelta(days=i) for i in reversed(range(14))]
prev_year_dates = get_last_year_same_weekdays(recent_dates) prev_year_dates = get_last_year_same_weekdays(recent_dates)
# 이번 주 예상 대상 (오늘부터 일요일까지) # 이번 주 예상 대상 (오늘부터 일요일까지 )
this_week_dates = [today + timedelta(days=i) for i in range(7 - weekday)] this_week_dates = [today + timedelta(days=i) for i in range(7 - weekday)]
# 데이터 조회 # 데이터 조회
@ -228,20 +266,8 @@ def main():
# prophet 예측 결과 불러오기 및 이번 주 예상 데이터에 병합 # prophet 예측 결과 불러오기 및 이번 주 예상 데이터에 병합
prophet_forecast = load_prophet_forecast() prophet_forecast = load_prophet_forecast()
for d in this_week_dates:
d_ts = pd.Timestamp(d)
has_forecast = d_ts in prophet_forecast.index
print(f"[DEBUG] 날짜 {d} (Timestamp {d_ts}) 예측 데이터 존재 여부: {has_forecast}")
if has_forecast:
if d not in forecast_data:
forecast_data[d] = {}
forecast_data[d]['예상 방문자'] = round(float(prophet_forecast.loc[d_ts]), 0)
else:
if d not in forecast_data:
forecast_data[d] = {}
forecast_data[d]['예상 방문자'] = None
# 최근 2주 데이터에 오늘 이후 날짜에 대해 예상 방문자 병합 # 최근 2주 데이터에 오늘 이후 날짜에 대해 예상 방문자 병합
for d in recent_dates: for d in recent_dates:
d_ts = pd.Timestamp(d) d_ts = pd.Timestamp(d)
if d >= today and d_ts in prophet_forecast.index: if d >= today and d_ts in prophet_forecast.index:
@ -264,6 +290,19 @@ def main():
print("\n📈 작년 동일 요일 데이터:") print("\n📈 작년 동일 요일 데이터:")
print(df_prev.to_string(index=False)) print(df_prev.to_string(index=False))
# 🔽 엑셀 파일로 저장
output_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'output'))
os.makedirs(output_dir, exist_ok=True)
recent_excel_path = os.path.join(output_dir, 'recent_visitors.xlsx')
prev_excel_path = os.path.join(output_dir, 'lastyear_visitors.xlsx')
df_recent.to_excel(recent_excel_path, index=False)
df_prev.to_excel(prev_excel_path, index=False)
print(f"\n📁 엑셀 파일 저장 완료:")
print(f" - 최근 2주: {recent_excel_path}")
print(f" - 작년 동일 요일: {prev_excel_path}")
if __name__ == "__main__": if __name__ == "__main__":
main() main()

View File

@ -1,4 +1,7 @@
#weekly_visitor_forecast_prophet.py # weekly_visitor_forecast_prophet.py
# 퍼스트가든 방문객 예측 프로그램
# prophet를 활용한 예측처리
import os, sys import os, sys
import re, requests import re, requests
from sqlalchemy import select, and_, func from sqlalchemy import select, and_, func
@ -13,8 +16,8 @@ from datetime import date, datetime, timedelta
# 경로 설정: 프로젝트 루트 conf 폴더 내 db 및 스키마 모듈 임포트 # 경로 설정: 프로젝트 루트 conf 폴더 내 db 및 스키마 모듈 임포트
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from conf import db, db_schema from conf import db, db_schema
from weather_forecast import get_weekly_precip # 변경된 날씨 예보 함수 임포트 from lib.weather_forecast import get_weekly_precip
from lib.holiday import is_korean_holiday # holiday.py의 DB 기반 휴일 판단 함수 from lib.holiday import is_korean_holiday
from lib.common import load_config from lib.common import load_config
# DB 테이블 객체 초기화 # DB 테이블 객체 초기화
@ -263,6 +266,31 @@ def train_and_predict_rf(df, forecast_days=7):
future_df['pos_qty'] = model.predict(future_df[['weekday', 'minTa', 'maxTa', 'sumRn', 'avgRhm', 'pm25']]) future_df['pos_qty'] = model.predict(future_df[['weekday', 'minTa', 'maxTa', 'sumRn', 'avgRhm', 'pm25']])
return future_df return future_df
# weekly_visitor_forecast_prophet.py 하단에 추가
def get_forecast_dict(forecast_days=3) -> dict:
    """Predict visitor counts for `forecast_days` days starting today.

    Trains a Prophet model on the last 365 days of data and returns a
    mapping like {'2025-07-11': 1020.0, '2025-07-12': 1103.0, ...}.

    Args:
        forecast_days: number of days (from today) to forecast.

    Returns:
        dict of 'YYYY-MM-DD' date string -> predicted visitor count (yhat).
    """
    today = datetime.today().date()
    window_start = today - timedelta(days=365)
    with Session(db.engine) as session:
        history = load_data(session, window_start, today)
    forecast = train_and_predict_prophet(prepare_prophet_df(history), forecast_days)
    # Keep only today-and-later predictions, keyed by ISO date string.
    upcoming = forecast.loc[forecast['ds'].dt.date >= today, ['ds', 'yhat']].copy()
    upcoming['ds'] = upcoming['ds'].dt.strftime('%Y-%m-%d')
    return dict(zip(upcoming['ds'], upcoming['yhat']))
def main(): def main():
today = datetime.today().date() today = datetime.today().date()
start_date = today - timedelta(days=365) start_date = today - timedelta(days=365)

View File

@ -0,0 +1,150 @@
import pandas as pd
from openpyxl import Workbook
from openpyxl.styles import Font, Alignment, Border, Side
from openpyxl.chart import LineChart, Reference
from openpyxl.chart.series import SeriesLabel
from datetime import date
import os
def generate_excel_report(today, recent_dates, prev_year_dates, recent_data, prev_year_data, filename="visitor_report.xlsx"):
    """Write a year-over-year visitor comparison report (with a line chart) to Excel.

    Lays out this year's 14-day window, last year's same-weekday window,
    day-over-year deltas/rates/temperature deviation, and a line chart
    comparing visitor counts, then saves the workbook to `filename`.

    Args:
        today: anchor date; columns on/after it get a thick border (forecast zone).
        recent_dates: the 14 dates of the current window, oldest first.
        prev_year_dates: matching same-weekday dates from last year.
        recent_data: per-date metric dict for this year.
        prev_year_data: per-date metric dict for last year.
        filename: output path for the .xlsx workbook.
    """
    # NOTE(review): these weekday labels appear empty in the source — possibly
    # stripped single Korean characters ('월'..'일'); confirm against the
    # original file before relying on the rendered labels.
    weekday_names = ['', '', '', '', '', '', '']
    wb = Workbook()
    ws = wb.active
    ws.title = "방문자 리포트"
    bold = Font(bold=True)
    center = Alignment(horizontal='center', vertical='center')  # NOTE(review): currently unused
    thick_border = Border(
        left=Side(style='thick'), right=Side(style='thick'),
        top=Side(style='thick'), bottom=Side(style='thick')
    )

    def fmt(d):
        # Column header label: month/day plus a weekday marker.
        return f"{d.month}{d.day}{weekday_names[d.weekday()]}"

    headers = ["구분"] + [fmt(d) for d in recent_dates]
    # Rows 1-24 are left blank so the chart anchored at A1 has room.
    # NOTE(review): 24 blank rows are appended before the headers, so the
    # header row actually lands on row 25 while data_start_row is 24 — the
    # merge/border/chart references below may be off by one; confirm the
    # intended layout against a generated workbook.
    ws.append([])
    for _ in range(23):
        ws.append([])
    data_start_row = 24
    ws.append(headers)
    # Legend column: merged cells labelling this year's and last year's sections.
    ws.merge_cells(start_row=data_start_row, start_column=1, end_row=data_start_row + 6, end_column=1)
    ws.merge_cells(start_row=data_start_row + 7, start_column=1, end_row=data_start_row + 13, end_column=1)
    ws.cell(row=data_start_row, column=1, value=f"{today.year}").font = bold
    ws.cell(row=data_start_row + 7, column=1, value=f"{today.year - 1}").font = bold

    def row(label, key, data, suffix="", fmt_func=None):
        # One spreadsheet row: label + one value per recent date (blank for 0/'').
        r = [label]
        for d in recent_dates:
            v = data.get(d, {}).get(key, "")
            if fmt_func:
                v = fmt_func(v)
            if v == 0 or v == '':
                r.append("")
            else:
                r.append(f"{v}{suffix}")
        return r

    # This year's visitor row: append the forecast in parentheses for future dates.
    merged_visitors = ["입장객수"]
    for d in recent_dates:
        actual = recent_data.get(d, {}).get("입장객 수", 0)
        forecast = recent_data.get(d, {}).get("예상 방문자", None)
        if d >= today and forecast:
            merged_visitors.append(f"{actual} ({int(forecast)})")
        else:
            merged_visitors.append(actual if actual else "")
    year_rows = [
        row("홈페이지", "웹 방문자 수", recent_data),
        merged_visitors,
        row("최저기온", "최저기온", recent_data),
        row("최고기온", "최고기온", recent_data),
        row("습도", "습도", recent_data, "%"),
        row("강수량", "강수량", recent_data),
        row("미세먼지지수", "미세먼지", recent_data),
    ]
    for r in year_rows:
        ws.append(r)

    # Last year's section, keyed by the matching same-weekday dates.
    def prev_row(label, key, suffix="", fmt_func=None):
        r = [label]
        for d in prev_year_dates:
            v = prev_year_data.get(d, {}).get(key, "")
            if fmt_func:
                v = fmt_func(v)
            if v == 0 or v == '':
                r.append("")
            else:
                r.append(f"{v}{suffix}")
        return r

    prev_rows = [
        prev_row("홈페이지", "웹 방문자 수"),
        prev_row("입장객수", "입장객 수"),
        prev_row("최저기온", "최저기온"),
        prev_row("최고기온", "최고기온"),
        prev_row("습도", "습도", "%"),
        prev_row("강수량", "강수량"),
        prev_row("미세먼지지수", "미세먼지"),
    ]
    for r in prev_rows:
        ws.append(r)

    # Year-over-year comparison rows: absolute delta, percent rate, max-temp deviation.
    diff = ["입장객 증감"]
    rate = ["입장객 변동률"]
    temp_dev = ["최고기온 편차"]
    for i, d in enumerate(recent_dates):
        cur = recent_data.get(d, {}).get("입장객 수", 0)
        prev = prev_year_data.get(prev_year_dates[i], {}).get("입장객 수", 0)
        if prev:
            diff.append(cur - prev)
            rate.append(f"{(cur - prev) / prev * 100:.1f}%")
        else:
            # No baseline last year: leave the comparison blank.
            diff.append("")
            rate.append("")
        t1 = recent_data.get(d, {}).get("최고기온")
        t2 = prev_year_data.get(prev_year_dates[i], {}).get("최고기온")
        temp_dev.append(round(t1 - t2, 1) if t1 is not None and t2 is not None else "")
    # Renamed loop variable (was `row`) so it no longer shadows the helper above.
    for comparison_row in [diff, rate, temp_dev]:
        ws.append(comparison_row)

    # Thick borders highlight the forecast (today-and-later) columns.
    for col, d in enumerate(recent_dates, start=2):
        if d >= today:
            for r in range(data_start_row + 1, data_start_row + 18):
                ws.cell(row=r, column=col).border = thick_border

    # Line chart: this year's visitors (incl. forecast) vs last year's.
    chart = LineChart()
    chart.title = "입장객 비교 (예상 포함 vs 작년)"
    chart.height = 10
    chart.width = 22
    chart.y_axis.title = ""
    chart.x_axis.title = "날짜"
    label_ref = Reference(ws, min_col=2, min_row=data_start_row, max_col=1 + len(recent_dates))
    this_year_ref = Reference(ws, min_col=2, min_row=data_start_row + 2, max_col=1 + len(recent_dates))
    last_year_ref = Reference(ws, min_col=2, min_row=data_start_row + 9, max_col=1 + len(recent_dates))
    chart.set_categories(label_ref)
    chart.add_data(this_year_ref, titles_from_data=False)
    chart.add_data(last_year_ref, titles_from_data=False)
    chart.series[0].tx = SeriesLabel(v="입장객수 (예상 포함)")
    chart.series[1].tx = SeriesLabel(v="작년 입장객수")
    chart.series[1].graphicalProperties.solidFill = "999999"
    ws.add_chart(chart, "A1")
    wb.save(filename)
    # Fixed: previously printed a literal "(unknown)" instead of the saved path.
    print(f"✅ 엑셀 저장 완료: {filename}")

View File

@ -13,4 +13,3 @@ scikit-learn
customtkinter customtkinter
tkcalendar tkcalendar
tabulate tabulate
watchdog