diff --git a/lib/prophet-ensemble_forecast.py b/lib/prophet-ensemble_forecast.py new file mode 100644 index 0000000..288f05a --- /dev/null +++ b/lib/prophet-ensemble_forecast.py @@ -0,0 +1,299 @@ +import os +import sys +import re +import requests +from sqlalchemy import select, and_, func +from sqlalchemy.orm import Session +from prophet import Prophet +from statsmodels.tsa.arima.model import ARIMA +import numpy as np +import pandas as pd +from datetime import date, datetime, timedelta + +sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) +from conf import db, db_schema +from weather_forecast import get_weekly_precip +from lib.holiday import is_korean_holiday +from lib.common import load_config + +# DB 테이블 객체 초기화 +pos = db_schema.pos +ga4 = db_schema.ga4_by_date +weather = db_schema.weather +air = db_schema.air + +# config 불러오기 +config = load_config() +serviceKey = config['DATA_API']['serviceKey'] +weight_cfg = config.get('FORECAST_WEIGHT', {}) + +VISITOR_CA = tuple(config['POS']['VISITOR_CA']) + +visitor_forecast_multiplier = weight_cfg.get('visitor_forecast_multiplier', 1.0) +minTa_weight = weight_cfg.get('minTa', 1.0) +maxTa_weight = weight_cfg.get('maxTa', 1.0) +sumRn_weight = weight_cfg.get('sumRn', 1.0) +avgRhm_weight = weight_cfg.get('avgRhm', 1.0) +pm25_weight = weight_cfg.get('pm25', 1.0) +is_holiday_weight = weight_cfg.get('is_holiday', 1.0) + +def get_date_range(start_date, end_date): + return pd.date_range(start_date, end_date).to_pydatetime().tolist() + +def add_korean_holiday_feature(df): + df['is_holiday'] = df['date'].apply(lambda d: 1 if is_korean_holiday(d.date()) else 0) + return df + +def fix_zero_visitors_weighted(df): + df = df.copy() + if 'date' not in df.columns and 'ds' in df.columns: + df['date'] = df['ds'] + if 'pos_qty' not in df.columns and 'y' in df.columns: + df['pos_qty'] = df['y'] + if 'is_holiday' not in df.columns: + raise ValueError("DataFrame에 'is_holiday' 컬럼이 필요합니다.") + df['year_month'] = df['date'].dt.strftime('%Y-%m') + monthly_means = df[df['pos_qty'] > 0].groupby(['year_month', 'is_holiday'])['pos_qty'].mean() + arr = df['pos_qty'].values.copy() + for i in range(len(arr)): + if arr[i] == 0: + ym = df.iloc[i]['year_month'] + holiday_flag = df.iloc[i]['is_holiday'] + mean_val = monthly_means.get((ym, holiday_flag), np.nan) + arr[i] = 0 if np.isnan(mean_val) else mean_val + df['pos_qty'] = arr + if 'y' in df.columns: + df['y'] = df['pos_qty'] + df.drop(columns=['year_month'], inplace=True) + return df + +def load_data(session, start_date, end_date): + dates = get_date_range(start_date, end_date) + + stmt_pos = select( + pos.c.date, + func.sum(pos.c.qty).label('pos_qty') + ).where( + and_( + pos.c.date >= start_date, + pos.c.date <= end_date, + pos.c.ca01 == '매표소', + pos.c.ca03.in_(VISITOR_CA) + ) + ).group_by(pos.c.date) + + stmt_ga4 = select(ga4.c.date, ga4.c.activeUsers).where( + and_(ga4.c.date >= start_date, ga4.c.date <= end_date) + ) + + stmt_weather = select( + weather.c.date, + weather.c.minTa, + weather.c.maxTa, + weather.c.sumRn, + weather.c.avgRhm + ).where( + and_( + weather.c.date >= start_date, + weather.c.date <= end_date, + weather.c.stnId == 99 + ) + ) + + stmt_air = select(air.c.date, air.c.pm25).where( + and_( + air.c.date >= start_date, + air.c.date <= end_date, + air.c.station == '운정' + ) + ) + + pos_data = {row['date']: row['pos_qty'] for row in session.execute(stmt_pos).mappings().all()} + ga4_data = {row['date']: row['activeUsers'] for row in session.execute(stmt_ga4).mappings().all()} + weather_data = {row['date']: row for row in session.execute(stmt_weather).mappings().all()} + air_data = {row['date']: row['pm25'] for row in session.execute(stmt_air).mappings().all()} + + records = [] + for d in dates: + key = d.date() if isinstance(d, datetime) else d + record = { + 'date': d, + 'pos_qty': pos_data.get(key, 0), + 'activeUsers': ga4_data.get(key, 0), + 'minTa': weather_data.get(key, {}).get('minTa', 0) if weather_data.get(key) else 0, + 'maxTa': weather_data.get(key, {}).get('maxTa', 0) if weather_data.get(key) else 0, + 'sumRn': weather_data.get(key, {}).get('sumRn', 0) if weather_data.get(key) else 0, + 'avgRhm': weather_data.get(key, {}).get('avgRhm', 0) if weather_data.get(key) else 0, + 'pm25': air_data.get(key, 0) + } + records.append(record) + + df = pd.DataFrame(records) + df = add_korean_holiday_feature(df) + df = fix_zero_visitors_weighted(df) + df['weekday'] = df['date'].dt.weekday + return df + +def prepare_prophet_df(df): + prophet_df = pd.DataFrame({ + 'ds': df['date'], + 'y': df['pos_qty'].astype(float), + 'minTa': df['minTa'].astype(float), + 'maxTa': df['maxTa'].astype(float), + 'sumRn': df['sumRn'].astype(float), + 'avgRhm': df['avgRhm'].astype(float), + 'pm25': df['pm25'].astype(float), + 'is_holiday': df['is_holiday'].astype(int) + }) + return prophet_df + +def train_and_predict_prophet(prophet_df, forecast_days=7): + prophet_df = prophet_df.copy() + + # 결측값을 전일과 다음날의 평균치로 선형 보간 처리 + for col in ['minTa', 'maxTa', 'sumRn', 'avgRhm', 'pm25', 'is_holiday']: + if col in prophet_df.columns: + prophet_df[col] = prophet_df[col].interpolate(method='linear', limit_direction='both') + + # 보간 후 남은 결측치는 0으로 처리 + prophet_df.fillna({ + 'minTa': 0, + 'maxTa': 0, + 'sumRn': 0, + 'avgRhm': 0, + 'pm25': 0, + 'is_holiday': 0 + }, inplace=True) + + # 가중치 적용 + prophet_df['minTa'] *= minTa_weight + prophet_df['maxTa'] *= maxTa_weight + prophet_df['sumRn'] *= sumRn_weight + prophet_df['avgRhm'] *= avgRhm_weight + prophet_df['pm25'] *= pm25_weight + prophet_df['is_holiday'] *= is_holiday_weight + + # 고정 0 방문객값 보정 + prophet_df = fix_zero_visitors_weighted(prophet_df) + + # Prophet 모델 정의 및 학습 + m = Prophet(weekly_seasonality=True, yearly_seasonality=True, daily_seasonality=False) + m.add_regressor('minTa') + m.add_regressor('maxTa') + m.add_regressor('sumRn') + m.add_regressor('avgRhm') + m.add_regressor('pm25') + m.add_regressor('is_holiday') + + m.fit(prophet_df) + future = m.make_future_dataframe(periods=forecast_days) + + # 미래 데이터에 날씨 예보값과 가중치 적용 + weekly_precip = get_weekly_precip(serviceKey) + + sumRn_list, minTa_list, maxTa_list, avgRhm_list = [], [], [], [] + for dt in future['ds']: + dt_str = dt.strftime('%Y%m%d') + day_forecast = weekly_precip.get(dt_str, None) + if day_forecast: + sumRn_list.append(float(day_forecast.get('sumRn', 0)) * sumRn_weight) + minTa_list.append(float(day_forecast.get('minTa', 0)) * minTa_weight) + maxTa_list.append(float(day_forecast.get('maxTa', 0)) * maxTa_weight) + avgRhm_list.append(float(day_forecast.get('avgRhm', 0)) * avgRhm_weight) + else: + sumRn_list.append(0) + minTa_list.append(0) + maxTa_list.append(0) + avgRhm_list.append(0) + + future['sumRn'] = sumRn_list + future['minTa'] = minTa_list + future['maxTa'] = maxTa_list + future['avgRhm'] = avgRhm_list + + # pm25는 마지막 과거 데이터값에 가중치 적용 + last_known = prophet_df.iloc[-1] + future['pm25'] = last_known['pm25'] * pm25_weight + + # 휴일 여부도 가중치 곱해서 적용 + future['is_holiday'] = future['ds'].apply(lambda d: 1 if is_korean_holiday(d.date()) else 0) * is_holiday_weight + + forecast = m.predict(future) + + # 방문객 예측값에 multiplier 적용 및 정수형 변환 + forecast['yhat'] = (forecast['yhat'] * visitor_forecast_multiplier).round().astype(int) + forecast['yhat_lower'] = (forecast['yhat_lower'] * visitor_forecast_multiplier).round().astype(int) + forecast['yhat_upper'] = (forecast['yhat_upper'] * visitor_forecast_multiplier).round().astype(int) + + # 결과 CSV 저장 + output_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'data', 'prophet_result.csv')) + os.makedirs(os.path.dirname(output_path), exist_ok=True) + + df_to_save = forecast[['ds', 'yhat']].copy() + df_to_save.columns = ['date', 'visitor_forecast'] + df_to_save['date'] = df_to_save['date'].dt.strftime("%Y-%m-%d") + + today_str = date.today().strftime("%Y-%m-%d") + df_to_save = df_to_save[df_to_save['date'] >= today_str] + df_to_save.to_csv(output_path, index=False) + + return forecast + +def train_and_predict_arima(ts, forecast_days=7): + model = ARIMA(ts, order=(5,1,0)) + model_fit = model.fit() + forecast = model_fit.forecast(steps=forecast_days) + return forecast + +def train_and_predict_rf(df, forecast_days=7): + from sklearn.ensemble import RandomForestRegressor + df = df.copy() + df['weekday'] = df['date'].dt.weekday + X = df[['weekday', 'minTa', 'maxTa', 'sumRn', 'avgRhm', 'pm25']] + y = df['pos_qty'] + model = RandomForestRegressor(n_estimators=100, random_state=42) + model.fit(X, y) + future_dates = pd.date_range(df['date'].max() + timedelta(days=1), periods=forecast_days) + future_df = pd.DataFrame({ + 'date': future_dates, + 'weekday': future_dates.weekday, + 'minTa': 0, + 'maxTa': 0, + 'sumRn': 0, + 'avgRhm': 0, + 'pm25': 0 + }) + future_df['pos_qty'] = model.predict(future_df[['weekday', 'minTa', 'maxTa', 'sumRn', 'avgRhm', 'pm25']]) + return future_df + +def main(): + today = datetime.today().date() + start_date = today - timedelta(days=365) + end_date = today + + with Session(db.engine) as session: + df = load_data(session, start_date, end_date) + + prophet_df = prepare_prophet_df(df) + forecast_days = 7 + + forecast = train_and_predict_prophet(prophet_df, forecast_days) + + forecast['yhat'] = forecast['yhat'].round().astype(int) + forecast['yhat_lower'] = forecast['yhat_lower'].round().astype(int) + forecast['yhat_upper'] = forecast['yhat_upper'].round().astype(int) + + weekly_precip = get_weekly_precip(serviceKey) + + output_df = forecast[['ds', 'yhat', 'yhat_lower', 'yhat_upper']].tail(10).copy() + output_df.columns = ['날짜', '예상 방문객', '하한', '상한'] + + print("이번 주 강수 예보:") + for dt_str, val in weekly_precip.items(): + print(f"{dt_str}: 강수량={val['sumRn']:.1f}mm, 최저기온={val['minTa']}, 최고기온={val['maxTa']}, 습도={val['avgRhm']:.1f}%") + + print("\n예측 방문객:") + print(output_df.to_string(index=False)) + +if __name__ == '__main__': + main()