# weekly_visitor_forecast.py
"""Print a two-week visitor report next to last year's comparable days.

Recovered from the file ``lib/weekly_visitor_forecast.py`` added by commit
be1e415 (author KWON, 2025-07-08; commit subject, translated: "existing
visitor lookup, previous-year data comparison, expected visitor
calculation").

The report window is the 14 days ending on this week's Sunday.  Days from
today onward are filled with the weather forecast and with the Prophet
forecast stored in ``data/prophet_result.csv`` when those are available.
"""
import os
import sys

# Make the project root importable so ``conf`` / ``lib`` resolve when this
# file is executed directly as a script.
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))

from datetime import date, timedelta, datetime
from collections import defaultdict
import pandas as pd
from sqlalchemy import select, func

from weather_forecast import get_weekly_precip
from conf import db, db_schema
from lib.holiday import is_korean_holiday
from lib.common import load_config

config = load_config()
visitor_ca_filter = config.get('POS', {}).get('VISITOR_CA', [])

ga4_by_date = db_schema.ga4_by_date
weather = db_schema.weather
air = db_schema.air
pos = db_schema.pos

engine = db.engine

# Korean weekday abbreviations indexed by ``date.weekday()`` (Monday == 0).
_WEEKDAY_LABELS = ['월', '화', '수', '목', '금', '토', '일']


def get_recent_dates(today=None, days=14):
    """Return the ``days`` consecutive dates ending on ``today``, oldest first.

    Args:
        today: Last date of the window; defaults to ``date.today()``.
        days: Number of dates to return.
    """
    today = today or date.today()
    return [today - timedelta(days=i) for i in reversed(range(days))]


def get_this_week_dates(today=None):
    """Return the dates from ``today`` through the Sunday of the same week.

    Args:
        today: First date of the result; defaults to ``date.today()``.
    """
    today = today or date.today()
    weekday = today.weekday()
    return [today + timedelta(days=i) for i in range(7 - weekday)]


def get_last_year_same_weekdays(dates):
    """Return, for each date, the date 52 weeks earlier (same weekday).

    Shifting by whole weeks keeps the weekday aligned, which is what the
    function name and the report label promise.  Replacing only the year
    would land on a different weekday and would raise ``ValueError`` for
    February 29.
    """
    return [d - timedelta(weeks=52) for d in dates]


def pm25_grade(value):
    """Map a PM2.5 value to its Korean grade label.

    Returns an empty string for ``None``.  The upper bounds 15 / 35 / 75 are
    inclusive; anything above 75 is the worst grade.
    """
    if value is None:
        return ''
    if value <= 15:
        return '좋음'
    elif value <= 35:
        return '보통'
    elif value <= 75:
        return '나쁨'
    else:
        return '매우나쁨'


def fetch_data_for_dates(date_list):
    """Collect per-day web, POS, weather and air-quality figures.

    Args:
        date_list: Dates to query; passed to SQL ``IN`` filters.

    Returns:
        ``defaultdict(dict)`` keyed by the date value returned by the
        database, each value holding whichever report columns had rows.
        Days without rows are simply absent.
    """
    session = db.get_session()
    data = defaultdict(dict)
    try:
        # GA4 activeUsers, summed per day.
        stmt = (
            select(ga4_by_date.c.date, func.sum(ga4_by_date.c.activeUsers))
            .where(ga4_by_date.c.date.in_(date_list))
            .group_by(ga4_by_date.c.date)
        )
        for d, val in session.execute(stmt):
            data[d]['웹 방문자 수'] = val

        # POS admissions: ticket-office rows in the configured categories.
        stmt = (
            select(pos.c.date, func.sum(pos.c.qty))
            .where(
                (pos.c.date.in_(date_list)) &
                (pos.c.ca01 == '매표소') &
                (pos.c.ca03.in_(visitor_ca_filter))
            )
            .group_by(pos.c.date)
        )
        for d, val in session.execute(stmt):
            data[d]['입장객 수'] = val

        # Weather: min/max temperature, mean humidity, total rainfall.
        stmt = (
            select(
                weather.c.date,
                func.min(weather.c.minTa),
                func.max(weather.c.maxTa),
                func.avg(weather.c.avgRhm),
                func.sum(weather.c.sumRn)
            )
            .where(weather.c.date.in_(date_list))
            .group_by(weather.c.date)
        )
        for d, min_ta, max_ta, rhm, rn in session.execute(stmt):
            # NULL aggregates are reported as 0.
            data[d]['최저기온'] = round(min_ta or 0, 1)
            data[d]['최고기온'] = round(max_ta or 0, 1)
            data[d]['습도'] = round(rhm or 0, 1)
            data[d]['강수량'] = round(rn or 0, 1)

        # Fine dust: daily mean PM2.5 converted to a grade label.
        stmt = (
            select(air.c.date, func.avg(air.c.pm25))
            .where(air.c.date.in_(date_list))
            .group_by(air.c.date)
        )
        for d, pm25 in session.execute(stmt):
            data[d]['미세먼지'] = pm25_grade(pm25)

    finally:
        session.close()

    return data


def load_prophet_forecast(file_path=None):
    """Load the saved Prophet forecast as a date-indexed Series.

    Args:
        file_path: CSV with ``date`` and ``visitor_forecast`` columns;
            defaults to ``../data/prophet_result.csv`` relative to this file.

    Returns:
        ``visitor_forecast`` indexed by ``Timestamp``.  Returns an empty float
        Series when the file is missing, lacks the columns, or cannot be
        parsed; the problem is printed rather than raised so the report can
        still be produced without a forecast.
    """
    if file_path is None:
        file_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'data', 'prophet_result.csv'))
    print(f"[DEBUG] Load prophet forecast from: {file_path}")

    if not os.path.exists(file_path):
        print(f"[ERROR] 파일이 존재하지 않습니다: {file_path}")
        return pd.Series(dtype=float)

    try:
        df = pd.read_csv(file_path)
        # Show the columns actually found, to help diagnose schema drift.
        print(f"[DEBUG] CSV columns: {df.columns.tolist()}")

        if 'date' not in df.columns or 'visitor_forecast' not in df.columns:
            print("[ERROR] 필요한 컬럼이 CSV에 없습니다.")
            return pd.Series(dtype=float)

        df['date'] = pd.to_datetime(df['date'])
        df.set_index('date', inplace=True)

        return df['visitor_forecast']
    except Exception as e:  # best-effort loader: report and continue
        print(f"[ERROR] Prophet 예측 결과 불러오기 실패: {e}")
        return pd.Series(dtype=float)


def build_dataframe(dates, data, use_forecast_after=None):
    """Build the report table, one row per date.

    Args:
        dates: Dates to include, in output order.
        data: Mapping ``date -> dict`` as returned by ``fetch_data_for_dates``.
        use_forecast_after: ``datetime.date`` or ``None``.  From this date on
            (inclusive), the admissions column shows the forecast value
            stored under the expected-visitors key when one is present.

    Returns:
        ``pandas.DataFrame`` with the Korean report columns.
    """
    records = []
    for d in dates:
        day = data.get(d, {})
        predicted = day.get('예상 방문자')
        use_forecast = (
            use_forecast_after is not None
            and d >= use_forecast_after
            and predicted is not None
        )
        visitors = int(predicted) if use_forecast else day.get('입장객 수', 0)

        records.append({
            '날짜': d.strftime('%Y-%m-%d'),
            '요일': _WEEKDAY_LABELS[d.weekday()],
            '공휴일': '✅' if is_korean_holiday(d) else '',
            '웹 방문자 수': day.get('웹 방문자 수', 0),
            '입장객 수': visitors,
            '최저기온': day.get('최저기온', ''),
            '최고기온': day.get('최고기온', ''),
            '습도': day.get('습도', ''),
            '강수량': day.get('강수량', ''),
            '미세먼지': day.get('미세먼지', ''),
        })
    return pd.DataFrame(records)


def main():
    """Print the recent two-week table and last year's comparison table."""
    today = date.today()

    # Sunday that closes the current week.
    sunday = today + timedelta(days=(6 - today.weekday()))

    # Two weeks ending on that Sunday, and the same weekdays a year earlier.
    recent_dates = get_recent_dates(sunday, 14)
    prev_year_dates = get_last_year_same_weekdays(recent_dates)

    # Today through Sunday: the days that need forecast values.  These are
    # exactly the entries of ``recent_dates`` that are not in the past.
    this_week_dates = get_this_week_dates(today)

    recent_data = fetch_data_for_dates(recent_dates)
    prev_year_data = fetch_data_for_dates(prev_year_dates)

    # Fill missing weather for today and later from the weather forecast.
    weekly_precip = get_weekly_precip(config['DATA_API']['serviceKey'])
    for d in this_week_dates:
        if '강수량' in recent_data.get(d, {}):
            continue
        day_forecast = weekly_precip.get(d.strftime('%Y%m%d')) if weekly_precip else None
        if day_forecast is None:
            continue
        day = recent_data.setdefault(d, {})
        day['강수량'] = round(float(day_forecast['sumRn']), 1)
        day['최저기온'] = round(float(day_forecast['minTa']), 1)
        day['최고기온'] = round(float(day_forecast['maxTa']), 1)
        day['습도'] = round(float(day_forecast['avgRhm']), 1)

    # Merge the Prophet forecast into today-and-later rows.
    prophet_forecast = load_prophet_forecast()
    for d in this_week_dates:
        d_ts = pd.Timestamp(d)
        has_forecast = d_ts in prophet_forecast.index
        print(f"[DEBUG] 날짜 {d} (Timestamp {d_ts}) 예측 데이터 존재 여부: {has_forecast}")
        if has_forecast:
            recent_data.setdefault(d, {})['예상 방문자'] = round(float(prophet_forecast.loc[d_ts]), 0)

    df_recent = build_dataframe(recent_dates, recent_data, use_forecast_after=today)
    df_prev = build_dataframe(prev_year_dates, prev_year_data)

    # Display settings so wide East Asian characters line up in the console.
    pd.set_option('display.unicode.east_asian_width', True)
    pd.set_option('display.max_columns', None)
    pd.set_option('display.width', 200)

    print("📊 최근 2주간 방문자 현황:")
    print(df_recent.to_string(index=False))

    print("\n📈 작년 동일 요일 데이터:")
    print(df_prev.to_string(index=False))


if __name__ == "__main__":
    main()
# weekly_visitor_forecast_prophet.py
"""Train a Prophet model on a year of daily admissions and save a forecast.

Recovered from the file ``lib/weekly_visitor_forecast_prophet.py`` added by
commit be1e415 (author KWON, 2025-07-08).

The model uses weather, PM2.5 and holiday regressors.  The forecast from
today onward is written to ``data/prophet_result.csv`` with the columns
``date`` and ``visitor_forecast``.
"""
import os
import sys
import re
import requests
from sqlalchemy import select, and_, func
from sqlalchemy.orm import Session
from prophet import Prophet
from statsmodels.tsa.arima.model import ARIMA
from sklearn.ensemble import RandomForestRegressor
import numpy as np
import pandas as pd
from datetime import date, datetime, timedelta

# Path setup: make the project root importable so the ``conf`` package
# (database and schema modules) and ``lib`` resolve when run as a script.
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from conf import db, db_schema
from weather_forecast import get_weekly_precip  # weather forecast lookup
from lib.holiday import is_korean_holiday  # holiday check from holiday.py
from lib.common import load_config

# Table objects.
pos = db_schema.pos
ga4 = db_schema.ga4_by_date
weather = db_schema.weather
air = db_schema.air

config = load_config()
serviceKey = config['DATA_API']['serviceKey']
VISITOR_CA = tuple(config['POS']['VISITOR_CA'])

# Regressors whose future values come from the weather forecast.
_WEATHER_REGRESSORS = ('sumRn', 'minTa', 'maxTa', 'avgRhm')
# Every extra regressor registered with Prophet, in registration order.
_ALL_REGRESSORS = ('minTa', 'maxTa', 'sumRn', 'avgRhm', 'pm25', 'is_holiday')


# --- Data loading and preprocessing ---

def get_date_range(start_date, end_date):
    """Return every day from ``start_date`` to ``end_date`` as ``datetime``."""
    return pd.date_range(start_date, end_date).to_pydatetime().tolist()


def add_korean_holiday_feature(df):
    """Add an ``is_holiday`` 0/1 column derived from ``df['date']``.

    The frame is modified in place and also returned.
    """
    df['is_holiday'] = df['date'].apply(lambda d: 1 if is_korean_holiday(d.date()) else 0)
    return df


def fix_zero_visitors_weighted(df):
    """Replace zero visitor counts with a comparable monthly mean.

    A zero day is replaced by the mean of the positive days that share its
    calendar month (``YYYY-MM``) and holiday flag; it stays 0 when no such
    days exist.

    Args:
        df: Frame with ``date`` (or ``ds``), ``pos_qty`` (or ``y``) and
            ``is_holiday`` columns.  The input is not modified.

    Returns:
        A copy with ``pos_qty`` (and ``y`` when present) corrected.
        ``pos_qty`` is returned as float so a fractional mean is not
        truncated into an integer column.

    Raises:
        ValueError: If ``is_holiday`` is missing.
    """
    df = df.copy()
    if 'date' not in df.columns and 'ds' in df.columns:
        df['date'] = df['ds']
    if 'pos_qty' not in df.columns and 'y' in df.columns:
        df['pos_qty'] = df['y']
    if 'is_holiday' not in df.columns:
        raise ValueError("DataFrame에 'is_holiday' 컬럼이 필요합니다.")

    # Work on plain arrays so the result does not depend on the frame's index.
    qty = df['pos_qty'].astype(float).to_numpy(copy=True)
    year_month = df['date'].dt.strftime('%Y-%m').to_numpy()
    holiday = df['is_holiday'].to_numpy()

    positive = qty > 0
    group_means = (
        pd.DataFrame({
            'year_month': year_month[positive],
            'is_holiday': holiday[positive],
            'pos_qty': qty[positive],
        })
        .groupby(['year_month', 'is_holiday'])['pos_qty']
        .mean()
        .to_dict()
    )
    for i in np.flatnonzero(qty == 0):
        qty[i] = group_means.get((year_month[i], holiday[i]), 0.0)

    df['pos_qty'] = qty
    if 'y' in df.columns:
        df['y'] = df['pos_qty']
    return df


def load_data(session, start_date, end_date):
    """Load one row per day of admissions, web users, weather and PM2.5.

    Args:
        session: SQLAlchemy session used for all queries.
        start_date: First day (inclusive).
        end_date: Last day (inclusive).

    Returns:
        DataFrame with ``date``, ``pos_qty``, ``activeUsers``, ``minTa``,
        ``maxTa``, ``sumRn``, ``avgRhm``, ``pm25``, ``is_holiday`` and
        ``weekday``.  Days without a source row get 0 for that source, and
        zero admissions are then smoothed by ``fix_zero_visitors_weighted``.
    """
    dates = get_date_range(start_date, end_date)

    stmt_pos = select(
        pos.c.date,
        func.sum(pos.c.qty).label('pos_qty')
    ).where(
        and_(
            pos.c.date >= start_date,
            pos.c.date <= end_date,
            pos.c.ca01 == '매표소',
            pos.c.ca03.in_(VISITOR_CA)
        )
    ).group_by(pos.c.date)
    pos_data = {row.date: row.pos_qty for row in session.execute(stmt_pos).fetchall()}

    # NOTE(review): not aggregated per date; if the table can hold several
    # rows per day the last one wins here - confirm against the schema.
    stmt_ga4 = select(ga4.c.date, ga4.c.activeUsers).where(
        and_(ga4.c.date >= start_date, ga4.c.date <= end_date)
    )
    ga4_data = {row.date: row.activeUsers for row in session.execute(stmt_ga4).fetchall()}

    stmt_weather = select(
        weather.c.date,
        weather.c.minTa,
        weather.c.maxTa,
        weather.c.sumRn,
        weather.c.avgRhm
    ).where(
        and_(
            weather.c.date >= start_date,
            weather.c.date <= end_date,
            weather.c.stnId == 99
        )
    )
    weather_data = {row.date: row for row in session.execute(stmt_weather).fetchall()}

    stmt_air = select(air.c.date, air.c.pm25).where(
        and_(
            air.c.date >= start_date,
            air.c.date <= end_date,
            air.c.station == '운정'
        )
    )
    air_data = {row.date: row.pm25 for row in session.execute(stmt_air).fetchall()}

    records = []
    for d in dates:
        # The lookup dicts are keyed by the queried ``date`` column, so reduce
        # the generated ``datetime`` to its ``date`` part for the lookup.
        key = d.date() if isinstance(d, datetime) else d
        weather_row = weather_data.get(key)
        records.append({
            'date': d,
            'pos_qty': pos_data.get(key, 0),
            'activeUsers': ga4_data.get(key, 0),
            'minTa': getattr(weather_row, 'minTa', 0) if weather_row else 0,
            'maxTa': getattr(weather_row, 'maxTa', 0) if weather_row else 0,
            'sumRn': getattr(weather_row, 'sumRn', 0) if weather_row else 0,
            'avgRhm': getattr(weather_row, 'avgRhm', 0) if weather_row else 0,
            'pm25': air_data.get(key, 0)
        })

    df = pd.DataFrame(records)
    df = add_korean_holiday_feature(df)
    df = fix_zero_visitors_weighted(df)
    df['weekday'] = df['date'].dt.weekday
    return df


def prepare_prophet_df(df):
    """Rename/cast the loaded frame to Prophet's ``ds`` / ``y`` layout."""
    prophet_df = pd.DataFrame({
        'ds': df['date'],
        'y': df['pos_qty'].astype(float),
        'minTa': df['minTa'].astype(float),
        'maxTa': df['maxTa'].astype(float),
        'sumRn': df['sumRn'].astype(float),
        'avgRhm': df['avgRhm'].astype(float),
        'pm25': df['pm25'].astype(float),
        'is_holiday': df['is_holiday'].astype(int)
    })
    return prophet_df


def _fill_future_regressors(future, history_df, weekly_precip):
    """Attach regressor columns to Prophet's future frame (in place).

    For each date the weather value comes from, in order of preference:
    the weather forecast, the value observed in the training data, and
    finally the last known training value (rainfall falls back to 0).
    PM2.5 uses the observed value for training dates and the last known
    value for the days beyond them.
    """
    observed = history_df.drop_duplicates('ds', keep='last').set_index('ds')
    # Assumes ``history_df`` is ordered by date so the last row is the most
    # recent observation.
    last_known = history_df.iloc[-1]
    fallback = {
        'sumRn': 0.0,
        'minTa': float(last_known['minTa']),
        'maxTa': float(last_known['maxTa']),
        'avgRhm': float(last_known['avgRhm']),
    }

    weather_values = {name: [] for name in _WEATHER_REGRESSORS}
    pm25_values = []
    for ds in future['ds']:
        day_forecast = weekly_precip.get(ds.strftime('%Y%m%d'), None)
        in_history = ds in observed.index
        for name in _WEATHER_REGRESSORS:
            if day_forecast:
                value = float(day_forecast.get(name, 0))
            elif in_history:
                value = float(observed.at[ds, name])
            else:
                value = fallback[name]
            weather_values[name].append(value)
        pm25_values.append(
            float(observed.at[ds, 'pm25']) if in_history else float(last_known['pm25'])
        )

    for name, values in weather_values.items():
        future[name] = values
    future['pm25'] = pm25_values
    future['is_holiday'] = future['ds'].apply(lambda d: 1 if is_korean_holiday(d.date()) else 0)
    return future


def train_and_predict_prophet(prophet_df, forecast_days=7, weekly_precip=None):
    """Fit Prophet with extra regressors, predict, and save the forecast CSV.

    Args:
        prophet_df: Frame from ``prepare_prophet_df``.
        forecast_days: Number of days to predict beyond the training data.
        weekly_precip: Optional weather forecast mapping
            ``'YYYYMMDD' -> {'sumRn', 'minTa', 'maxTa', 'avgRhm'}``; fetched
            with ``get_weekly_precip(serviceKey)`` when omitted.

    Returns:
        Prophet's forecast frame covering the training dates plus
        ``forecast_days`` future days.

    Side effects:
        Writes the rows dated today or later to ``../data/prophet_result.csv``
        with integer ``visitor_forecast`` values.
    """
    prophet_df = fix_zero_visitors_weighted(prophet_df)
    prophet_df = prophet_df.fillna({name: 0 for name in _ALL_REGRESSORS})

    m = Prophet(weekly_seasonality=True, yearly_seasonality=True, daily_seasonality=False)
    for name in _ALL_REGRESSORS:
        m.add_regressor(name)

    m.fit(prophet_df)
    # The future frame contains the training dates followed by the new days.
    future = m.make_future_dataframe(periods=forecast_days)

    if weekly_precip is None:
        # {'YYYYMMDD': {'sumRn': x, 'minTa': y, 'maxTa': z, 'avgRhm': w}, ...}
        weekly_precip = get_weekly_precip(serviceKey)

    future = _fill_future_regressors(future, prophet_df, weekly_precip)

    forecast = m.predict(future)

    # Persist the forecast for the weekly report script.
    output_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'data', 'prophet_result.csv'))
    os.makedirs(os.path.dirname(output_path), exist_ok=True)

    df_to_save = forecast[['ds', 'yhat']].copy()
    df_to_save.columns = ['date', 'visitor_forecast']
    df_to_save['date'] = df_to_save['date'].dt.strftime("%Y-%m-%d")

    # Keep only today and later; ISO date strings compare chronologically.
    today_str = date.today().strftime("%Y-%m-%d")
    df_to_save = df_to_save[df_to_save['date'] >= today_str].copy()

    df_to_save['visitor_forecast'] = df_to_save['visitor_forecast'].round().astype(int)

    df_to_save.to_csv(output_path, index=False)

    return forecast


def train_and_predict_arima(ts, forecast_days=7):
    """Fit an ARIMA(5, 1, 0) model on ``ts`` and forecast ``forecast_days`` steps."""
    model = ARIMA(ts, order=(5, 1, 0))
    model_fit = model.fit()
    forecast = model_fit.forecast(steps=forecast_days)
    return forecast


def train_and_predict_rf(df, forecast_days=7):
    """Fit a random forest on weekday and weather features and predict ahead.

    Args:
        df: Frame with ``date``, ``minTa``, ``maxTa``, ``sumRn``, ``avgRhm``,
            ``pm25`` and ``pos_qty``.
        forecast_days: Number of days after the last date to predict.

    Returns:
        Frame of future dates with the predicted ``pos_qty``.
    """
    feature_columns = ['weekday', 'minTa', 'maxTa', 'sumRn', 'avgRhm', 'pm25']
    df = df.copy()
    df['weekday'] = df['date'].dt.weekday
    model = RandomForestRegressor(n_estimators=100, random_state=42)
    model.fit(df[feature_columns], df['pos_qty'])

    future_dates = pd.date_range(df['date'].max() + timedelta(days=1), periods=forecast_days)
    # NOTE(review): future weather and PM2.5 are all placeholders of 0 here,
    # so only the weekday varies between predicted days.
    future_df = pd.DataFrame({
        'date': future_dates,
        'weekday': future_dates.weekday,
        'minTa': 0,
        'maxTa': 0,
        'sumRn': 0,
        'avgRhm': 0,
        'pm25': 0
    })
    future_df['pos_qty'] = model.predict(future_df[feature_columns])
    return future_df


def main():
    """Train on the last 365 days and print the forecast with the weather."""
    today = datetime.today().date()
    start_date = today - timedelta(days=365)
    end_date = today

    with Session(db.engine) as session:
        df = load_data(session, start_date, end_date)

    prophet_df = prepare_prophet_df(df)
    forecast_days = 7

    # Fetch the weather forecast once and use it for both model and printout.
    weekly_precip = get_weekly_precip(serviceKey)

    forecast = train_and_predict_prophet(prophet_df, forecast_days, weekly_precip=weekly_precip)

    # Whole visitors only.
    forecast['yhat'] = forecast['yhat'].round().astype(int)
    forecast['yhat_lower'] = forecast['yhat_lower'].round().astype(int)
    forecast['yhat_upper'] = forecast['yhat_upper'].round().astype(int)

    # Last 10 rows: the final training days followed by the predicted days.
    output_df = forecast[['ds', 'yhat', 'yhat_lower', 'yhat_upper']].tail(10).copy()
    output_df.columns = ['날짜', '예상 방문객', '하한', '상한']

    print("이번 주 강수 예보:")
    for dt_str, val in weekly_precip.items():
        # Convert before formatting: ``:.1f`` fails on string values.
        print(f"{dt_str}: 강수량={float(val['sumRn']):.1f}mm, 최저기온={val['minTa']}, 최고기온={val['maxTa']}, 습도={float(val['avgRhm']):.1f}%")

    print("\n예측 방문객:")
    print(output_df.to_string(index=False))


if __name__ == '__main__':
    main()