Files
static/lib/weekly_visitor_forecast_prophet.py

328 lines
12 KiB
Python

# weekly_visitor_forecast_prophet.py
# 퍼스트가든 방문객 예측 프로그램
# prophet를 활용한 예측처리
import os, sys
import re, requests
from sqlalchemy import select, and_, func
from sqlalchemy.orm import Session
from prophet import Prophet
from statsmodels.tsa.arima.model import ARIMA
from sklearn.ensemble import RandomForestRegressor
import numpy as np
import pandas as pd
from datetime import date, datetime, timedelta
# 경로 설정: 프로젝트 루트 conf 폴더 내 db 및 스키마 모듈 임포트
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from conf import db, db_schema
from lib.weather_forecast import get_weekly_precip
from lib.holiday import is_korean_holiday
from lib.common import load_config
# DB 테이블 객체 초기화
pos = db_schema.pos
ga4 = db_schema.ga4_by_date
weather = db_schema.weather
air = db_schema.air
# config 불러오기
config = load_config()
serviceKey = config['DATA_API']['serviceKey']
weight_cfg = config.get('FORECAST_WEIGHT', {})
VISITOR_CA = tuple(config['POS']['VISITOR_CA'])
visitor_forecast_multiplier = weight_cfg.get('visitor_forecast_multiplier', 1.0)
minTa_weight = weight_cfg.get('minTa', 1.0)
maxTa_weight = weight_cfg.get('maxTa', 1.0)
sumRn_weight = weight_cfg.get('sumRn', 1.0)
avgRhm_weight = weight_cfg.get('avgRhm', 1.0)
pm25_weight = weight_cfg.get('pm25', 1.0)
is_holiday_weight = weight_cfg.get('is_holiday', 1.0)
# --- 데이터 로딩 및 전처리 ---
def get_date_range(start_date, end_date):
return pd.date_range(start_date, end_date).to_pydatetime().tolist()
def add_korean_holiday_feature(df):
df['is_holiday'] = df['date'].apply(lambda d: 1 if is_korean_holiday(d.date()) else 0)
return df
def fix_zero_visitors_weighted(df):
df = df.copy()
if 'date' not in df.columns and 'ds' in df.columns:
df['date'] = df['ds']
if 'pos_qty' not in df.columns and 'y' in df.columns:
df['pos_qty'] = df['y']
if 'is_holiday' not in df.columns:
raise ValueError("DataFrame에 'is_holiday' 컬럼이 필요합니다.")
df['year_month'] = df['date'].dt.strftime('%Y-%m')
monthly_means = df[df['pos_qty'] > 0].groupby(['year_month', 'is_holiday'])['pos_qty'].mean()
arr = df['pos_qty'].values.copy()
for i in range(len(arr)):
if arr[i] == 0:
ym = df.iloc[i]['year_month']
holiday_flag = df.iloc[i]['is_holiday']
mean_val = monthly_means.get((ym, holiday_flag), np.nan)
arr[i] = 0 if np.isnan(mean_val) else mean_val
df['pos_qty'] = arr
if 'y' in df.columns:
df['y'] = df['pos_qty']
df.drop(columns=['year_month'], inplace=True)
return df
def load_data(session, start_date, end_date):
dates = get_date_range(start_date, end_date)
stmt_pos = select(
pos.c.date,
func.sum(pos.c.qty).label('pos_qty')
).where(
and_(
pos.c.date >= start_date,
pos.c.date <= end_date,
pos.c.ca01 == '매표소',
pos.c.ca03.in_(VISITOR_CA)
)
).group_by(pos.c.date)
pos_data = {row.date: row.pos_qty for row in session.execute(stmt_pos).fetchall()}
stmt_ga4 = select(ga4.c.date, ga4.c.activeUsers).where(
and_(ga4.c.date >= start_date, ga4.c.date <= end_date)
)
ga4_data = {row.date: row.activeUsers for row in session.execute(stmt_ga4).fetchall()}
stmt_weather = select(
weather.c.date,
weather.c.minTa,
weather.c.maxTa,
weather.c.sumRn,
weather.c.avgRhm
).where(
and_(
weather.c.date >= start_date,
weather.c.date <= end_date,
weather.c.stnId == 99
)
)
weather_data = {row.date: row for row in session.execute(stmt_weather).fetchall()}
stmt_air = select(air.c.date, air.c.pm25).where(
and_(
air.c.date >= start_date,
air.c.date <= end_date,
air.c.station == '운정'
)
)
air_data = {row.date: row.pm25 for row in session.execute(stmt_air).fetchall()}
records = []
for d in dates:
key = d.date() if isinstance(d, datetime) else d
record = {
'date': d,
'pos_qty': pos_data.get(key, 0),
'activeUsers': ga4_data.get(key, 0),
'minTa': getattr(weather_data.get(key), 'minTa', 0) if weather_data.get(key) else 0,
'maxTa': getattr(weather_data.get(key), 'maxTa', 0) if weather_data.get(key) else 0,
'sumRn': getattr(weather_data.get(key), 'sumRn', 0) if weather_data.get(key) else 0,
'avgRhm': getattr(weather_data.get(key), 'avgRhm', 0) if weather_data.get(key) else 0,
'pm25': air_data.get(key, 0)
}
records.append(record)
df = pd.DataFrame(records)
df = add_korean_holiday_feature(df)
df = fix_zero_visitors_weighted(df)
df['weekday'] = df['date'].dt.weekday
return df
def prepare_prophet_df(df):
prophet_df = pd.DataFrame({
'ds': df['date'],
'y': df['pos_qty'].astype(float),
'minTa': df['minTa'].astype(float),
'maxTa': df['maxTa'].astype(float),
'sumRn': df['sumRn'].astype(float),
'avgRhm': df['avgRhm'].astype(float),
'pm25': df['pm25'].astype(float),
'is_holiday': df['is_holiday'].astype(int)
})
return prophet_df
def train_and_predict_prophet(prophet_df, forecast_days=7):
# 가중치 적용 - 훈련 데이터의 기상/환경 변수 컬럼별 곱하기
prophet_df = prophet_df.copy()
prophet_df['minTa'] *= minTa_weight
prophet_df['maxTa'] *= maxTa_weight
prophet_df['sumRn'] *= sumRn_weight
prophet_df['avgRhm'] *= avgRhm_weight
prophet_df['pm25'] *= pm25_weight
prophet_df['is_holiday'] *= is_holiday_weight
# 기존 fix_zero_visitors_weighted 함수 호출 (필요 시)
prophet_df = fix_zero_visitors_weighted(prophet_df)
# 결측치 처리
prophet_df.fillna({
'minTa': 0,
'maxTa': 0,
'sumRn': 0,
'avgRhm': 0,
'pm25': 0,
'is_holiday': 0
}, inplace=True)
m = Prophet(weekly_seasonality=True, yearly_seasonality=True, daily_seasonality=False)
m.add_regressor('minTa')
m.add_regressor('maxTa')
m.add_regressor('sumRn')
m.add_regressor('avgRhm')
m.add_regressor('pm25')
m.add_regressor('is_holiday')
m.fit(prophet_df)
future = m.make_future_dataframe(periods=forecast_days)
# 미래 데이터에 날씨 예보 값 가져와서 가중치 적용
weekly_precip = get_weekly_precip(serviceKey)
sumRn_list = []
minTa_list = []
maxTa_list = []
avgRhm_list = []
for dt in future['ds']:
dt_str = dt.strftime('%Y%m%d')
day_forecast = weekly_precip.get(dt_str, None)
if day_forecast:
sumRn_list.append(float(day_forecast.get('sumRn', 0)) * sumRn_weight)
minTa_list.append(float(day_forecast.get('minTa', 0)) * minTa_weight)
maxTa_list.append(float(day_forecast.get('maxTa', 0)) * maxTa_weight)
avgRhm_list.append(float(day_forecast.get('avgRhm', 0)) * avgRhm_weight)
else:
sumRn_list.append(0)
minTa_list.append(0)
maxTa_list.append(0)
avgRhm_list.append(0)
future['sumRn'] = sumRn_list
future['minTa'] = minTa_list
future['maxTa'] = maxTa_list
future['avgRhm'] = avgRhm_list
# pm25는 과거 마지막 데이터 * 가중치 적용
last_known = prophet_df.iloc[-1]
future['pm25'] = last_known['pm25'] * pm25_weight
# 휴일 여부도 가중치 곱해서 적용
future['is_holiday'] = future['ds'].apply(lambda d: 1 if is_korean_holiday(d.date()) else 0) * is_holiday_weight
forecast = m.predict(future)
# 최종 방문객 예측에 multiplier 곱하기
forecast['yhat'] = (forecast['yhat'] * visitor_forecast_multiplier).round().astype(int)
forecast['yhat_lower'] = (forecast['yhat_lower'] * visitor_forecast_multiplier).round().astype(int)
forecast['yhat_upper'] = (forecast['yhat_upper'] * visitor_forecast_multiplier).round().astype(int)
# csv 저장 및 반환
output_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'data', 'prophet_result.csv'))
os.makedirs(os.path.dirname(output_path), exist_ok=True)
df_to_save = forecast[['ds', 'yhat']].copy()
df_to_save.columns = ['date', 'visitor_forecast']
df_to_save['date'] = df_to_save['date'].dt.strftime("%Y-%m-%d")
today_str = date.today().strftime("%Y-%m-%d")
df_to_save = df_to_save[df_to_save['date'] >= today_str]
df_to_save.to_csv(output_path, index=False)
return forecast
def train_and_predict_arima(ts, forecast_days=7):
model = ARIMA(ts, order=(5,1,0))
model_fit = model.fit()
forecast = model_fit.forecast(steps=forecast_days)
return forecast
def train_and_predict_rf(df, forecast_days=7):
from sklearn.ensemble import RandomForestRegressor
df = df.copy()
df['weekday'] = df['date'].dt.weekday
X = df[['weekday', 'minTa', 'maxTa', 'sumRn', 'avgRhm', 'pm25']]
y = df['pos_qty']
model = RandomForestRegressor(n_estimators=100, random_state=42)
model.fit(X, y)
future_dates = pd.date_range(df['date'].max() + timedelta(days=1), periods=forecast_days)
future_df = pd.DataFrame({
'date': future_dates,
'weekday': future_dates.weekday,
'minTa': 0,
'maxTa': 0,
'sumRn': 0,
'avgRhm': 0,
'pm25': 0
})
future_df['pos_qty'] = model.predict(future_df[['weekday', 'minTa', 'maxTa', 'sumRn', 'avgRhm', 'pm25']])
return future_df
# weekly_visitor_forecast_prophet.py 하단에 추가
def get_forecast_dict(forecast_days=3) -> dict:
"""
오늘 기준 forecast_days일 만큼 방문객 예측 데이터를 계산해
{'2025-07-11': 1020, '2025-07-12': 1103, ...} 형태로 반환
"""
today = datetime.today().date()
start_date = today - timedelta(days=365)
end_date = today
with Session(db.engine) as session:
df = load_data(session, start_date, end_date)
prophet_df = prepare_prophet_df(df)
forecast = train_and_predict_prophet(prophet_df, forecast_days)
result = (
forecast[forecast['ds'].dt.date >= today]
[['ds', 'yhat']]
.copy()
)
result['ds'] = result['ds'].dt.strftime('%Y-%m-%d')
return dict(result.values)
def main():
today = datetime.today().date()
start_date = today - timedelta(days=365)
end_date = today
with Session(db.engine) as session:
df = load_data(session, start_date, end_date)
prophet_df = prepare_prophet_df(df)
forecast_days = 7
forecast = train_and_predict_prophet(prophet_df, forecast_days)
# 예측 후 정수 변환
forecast['yhat'] = forecast['yhat'].round().astype(int)
forecast['yhat_lower'] = forecast['yhat_lower'].round().astype(int)
forecast['yhat_upper'] = forecast['yhat_upper'].round().astype(int)
# 강수량 정보 포함 출력 (오늘 이후는 날씨 예보 데이터로 덮음)
weekly_precip = get_weekly_precip(serviceKey)
# 최근 10일 예측 결과 출력
output_df = forecast[['ds', 'yhat', 'yhat_lower', 'yhat_upper']].tail(10).copy()
output_df.columns = ['날짜', '예상 방문객', '하한', '상한']
print("이번 주 강수 예보:")
for dt_str, val in weekly_precip.items():
print(f"{dt_str}: 강수량={val['sumRn']:.1f}mm, 최저기온={val['minTa']}, 최고기온={val['maxTa']}, 습도={val['avgRhm']:.1f}%")
print("\n예측 방문객:")
print(output_df.to_string(index=False))
if __name__ == '__main__':
main()