기존 방문객 조회, 전년도 데이터 비교, 예상 방문객 계산

This commit is contained in:
2025-07-08 14:37:28 +09:00
parent 1f054a4eae
commit be1e41528f
2 changed files with 520 additions and 0 deletions

View File

@ -0,0 +1,242 @@
# weekly_visitor_forecast.py
import os, sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from datetime import date, timedelta, datetime
from collections import defaultdict
import pandas as pd
from sqlalchemy import select, func
from weather_forecast import get_weekly_precip
from conf import db, db_schema
from lib.holiday import is_korean_holiday
from lib.common import load_config
# Project configuration, loaded once when the module is imported.
config = load_config()
# Values matched against pos.ca03 when counting admissions; empty list if the
# POS / VISITOR_CA keys are missing from the configuration.
visitor_ca_filter = config.get('POS', {}).get('VISITOR_CA', [])
# Table objects exposed by the project schema module (queried through `.c.<column>`).
ga4_by_date = db_schema.ga4_by_date
weather = db_schema.weather
air = db_schema.air
pos = db_schema.pos
# Engine handle from the project db module; the functions visible in this file
# obtain their sessions through db.get_session() instead.
engine = db.engine
def get_recent_dates(today=None, days=14):
    """Return ``days`` consecutive dates ending on ``today``, oldest first.

    Args:
        today: Last date of the window; the current local date is used when
            omitted.
        days: Number of dates to return.

    Returns:
        A list of ``datetime.date`` objects in ascending order.
    """
    end = today or date.today()
    # Count the offsets down so the list comes out oldest -> newest.
    return [end - timedelta(days=offset) for offset in range(days - 1, -1, -1)]
def get_this_week_dates(today=None):
    """Return the dates from ``today`` through the Sunday of the same week.

    Args:
        today: First date of the span; the current local date is used when
            omitted.

    Returns:
        A list of ``datetime.date`` objects, ``today`` first and Sunday last
        (a single element when ``today`` is itself a Sunday).
    """
    start = today or date.today()
    # weekday(): Monday == 0 ... Sunday == 6
    days_until_sunday = 6 - start.weekday()
    return [start + timedelta(days=ahead) for ahead in range(days_until_sunday + 1)]
def get_last_year_same_weekdays(dates):
    """Map each date to the matching weekday one year earlier.

    Every date is shifted back by exactly 52 weeks (364 days), so a Monday
    maps to a Monday. Replacing only the year would land on a different
    weekday and raises ``ValueError`` for February 29.

    Args:
        dates: Iterable of ``datetime.date`` (or ``datetime``) objects.

    Returns:
        A list with one shifted date per input date, in the same order.
    """
    one_year_back = timedelta(weeks=52)
    return [d - one_year_back for d in dates]
def pm25_grade(value):
    """Translate a PM2.5 reading into its Korean grade label.

    Args:
        value: PM2.5 reading, or ``None`` when no measurement exists.

    Returns:
        ``''`` for ``None``; otherwise the label of the first band whose
        inclusive upper bound is not exceeded (15, 35, 75), or the worst
        grade when the reading is above every bound.
    """
    if value is None:
        return ''
    # (inclusive upper bound, label), checked in ascending order
    bands = ((15, '좋음'), (35, '보통'), (75, '나쁨'))
    for upper_bound, label in bands:
        if value <= upper_bound:
            return label
    return '매우나쁨'
def fetch_data_for_dates(date_list):
    """Collect per-date web, admission, weather and air-quality figures.

    Runs four aggregate queries restricted to ``date_list`` and merges the
    results into one mapping keyed by the date value the database returns.

    Args:
        date_list: Dates to look up.

    Returns:
        ``defaultdict(dict)`` mapping each date that had at least one row to a
        dict that may contain '웹 방문자 수', '입장객 수', '최저기온', '최고기온',
        '습도', '강수량' and '미세먼지'.
    """
    result = defaultdict(dict)
    session = db.get_session()
    try:
        # GA4 active users, summed per date
        ga4_query = (
            select(ga4_by_date.c.date, func.sum(ga4_by_date.c.activeUsers))
            .where(ga4_by_date.c.date.in_(date_list))
            .group_by(ga4_by_date.c.date)
        )
        for day, active_users in session.execute(ga4_query):
            result[day]['웹 방문자 수'] = active_users

        # POS admissions: ticket-office rows in the configured visitor categories
        pos_query = (
            select(pos.c.date, func.sum(pos.c.qty))
            .where(pos.c.date.in_(date_list))
            .where(pos.c.ca01 == '매표소')
            .where(pos.c.ca03.in_(visitor_ca_filter))
            .group_by(pos.c.date)
        )
        for day, admissions in session.execute(pos_query):
            result[day]['입장객 수'] = admissions

        # Weather aggregates per date
        weather_query = (
            select(
                weather.c.date,
                func.min(weather.c.minTa),
                func.max(weather.c.maxTa),
                func.avg(weather.c.avgRhm),
                func.sum(weather.c.sumRn),
            )
            .where(weather.c.date.in_(date_list))
            .group_by(weather.c.date)
        )
        weather_labels = ('최저기온', '최고기온', '습도', '강수량')
        for day, min_ta, max_ta, humidity, rain in session.execute(weather_query):
            for label, measure in zip(weather_labels, (min_ta, max_ta, humidity, rain)):
                # NULL aggregates are reported as 0
                result[day][label] = round(measure or 0, 1)

        # Fine dust (pm25): daily average converted to a grade label
        air_query = (
            select(air.c.date, func.avg(air.c.pm25))
            .where(air.c.date.in_(date_list))
            .group_by(air.c.date)
        )
        for day, pm25_avg in session.execute(air_query):
            result[day]['미세먼지'] = pm25_grade(pm25_avg)
    finally:
        session.close()
    return result
def load_prophet_forecast(file_path=None):
    """Load the saved Prophet forecast as a date-indexed Series.

    Args:
        file_path: CSV to read. Defaults to ``../data/prophet_result.csv``
            relative to this file's directory.

    Returns:
        The ``visitor_forecast`` column indexed by parsed ``date`` values, or
        an empty float Series when the file is missing, lacks the required
        columns, or cannot be read (the problem is printed, not raised).
    """
    if file_path is None:
        project_root = os.path.join(os.path.dirname(__file__), '..')
        file_path = os.path.abspath(os.path.join(project_root, 'data', 'prophet_result.csv'))
    print(f"[DEBUG] Load prophet forecast from: {file_path}")

    if not os.path.exists(file_path):
        print(f"[ERROR] 파일이 존재하지 않습니다: {file_path}")
        return pd.Series(dtype=float)

    required_columns = {'date', 'visitor_forecast'}
    try:
        frame = pd.read_csv(file_path)
        # Show the column names to help diagnose malformed files
        print(f"[DEBUG] CSV columns: {frame.columns.tolist()}")
        if not required_columns.issubset(frame.columns):
            print("[ERROR] 필요한 컬럼이 CSV에 없습니다.")
            return pd.Series(dtype=float)
        frame['date'] = pd.to_datetime(frame['date'])
        return frame.set_index('date')['visitor_forecast']
    except Exception as e:
        # Best effort: callers treat an empty Series as "no forecast available"
        print(f"[ERROR] Prophet 예측 결과 불러오기 실패: {e}")
        return pd.Series(dtype=float)
def build_dataframe(dates, data, use_forecast_after=None):
    """Build the per-day report table for ``dates``.

    Args:
        dates: Dates to report, one output row each, in the given order.
        data: Mapping of date -> dict of collected values (see the keys read
            below); dates without an entry get default values.
        use_forecast_after: ``datetime.date`` or ``None``. From this date on
            (inclusive), the '예상 방문자' value replaces '입장객 수' whenever a
            forecast is present for the day.

    Returns:
        ``pandas.DataFrame`` with the columns 날짜, 요일, 공휴일, 웹 방문자 수,
        입장객 수, 최저기온, 최고기온, 습도, 강수량, 미세먼지.
    """
    # Indexed by date.weekday(): Monday == 0 ... Sunday == 6
    weekday_names = ('월', '화', '수', '목', '금', '토', '일')
    columns = [
        '날짜', '요일', '공휴일', '웹 방문자 수', '입장객 수',
        '최저기온', '최고기온', '습도', '강수량', '미세먼지',
    ]
    records = []
    for d in dates:
        day = data.get(d, {})
        predicted = day.get('예상 방문자')
        use_forecast = (
            use_forecast_after is not None
            and d >= use_forecast_after
            and predicted is not None
        )
        visitors = int(predicted) if use_forecast else day.get('입장객 수', 0)
        records.append({
            '날짜': d.strftime('%Y-%m-%d'),
            '요일': weekday_names[d.weekday()],
            # 'O' marks a public holiday; blank otherwise
            '공휴일': 'O' if is_korean_holiday(d) else '',
            '웹 방문자 수': day.get('웹 방문자 수', 0),
            '입장객 수': visitors,
            '최저기온': day.get('최저기온', ''),
            '최고기온': day.get('최고기온', ''),
            '습도': day.get('습도', ''),
            '강수량': day.get('강수량', ''),
            '미세먼지': day.get('미세먼지', ''),
        })
    # Passing the columns keeps the header stable even when `dates` is empty
    return pd.DataFrame(records, columns=columns)
def main():
    """Print the two-week visitor report and last year's comparison table.

    The two-week window ends on this week's Sunday. For today and later days
    in that window, missing weather values are taken from the weather
    forecast and admissions are replaced by the saved Prophet forecast when
    one exists.
    """
    today = date.today()
    # Sunday of the current week (end of the reporting window)
    days_until_sunday = 6 - today.weekday()
    sunday = today + timedelta(days=days_until_sunday)

    # Last two weeks up to and including Sunday, plus the comparison dates
    recent_dates = [sunday - timedelta(days=back) for back in range(13, -1, -1)]
    prev_year_dates = get_last_year_same_weekdays(recent_dates)
    # Forecast targets: today through Sunday
    this_week_dates = [today + timedelta(days=ahead) for ahead in range(days_until_sunday + 1)]

    # Fetch stored data
    recent_data = fetch_data_for_dates(recent_dates)
    prev_year_data = fetch_data_for_dates(prev_year_dates)
    forecast_data = fetch_data_for_dates(this_week_dates)

    # Fill missing weather for today onwards from the weather forecast
    weekly_precip = get_weekly_precip(load_config()['DATA_API']['serviceKey'])
    forecast_fields = (
        ('강수량', 'sumRn'),
        ('최저기온', 'minTa'),
        ('최고기온', 'maxTa'),
        ('습도', 'avgRhm'),
    )
    for day in recent_dates:
        if day < today or '강수량' in recent_data.get(day, {}):
            continue
        day_key = day.strftime('%Y%m%d')
        if day_key not in weekly_precip:
            continue
        entry = recent_data.setdefault(day, {})
        for label, source_key in forecast_fields:
            entry[label] = round(float(weekly_precip[day_key][source_key]), 1)

    # Merge the saved Prophet forecast into this week's data
    prophet_forecast = load_prophet_forecast()
    for day in this_week_dates:
        stamp = pd.Timestamp(day)
        has_forecast = stamp in prophet_forecast.index
        print(f"[DEBUG] 날짜 {day} (Timestamp {stamp}) 예측 데이터 존재 여부: {has_forecast}")
        entry = forecast_data.setdefault(day, {})
        if has_forecast:
            entry['예상 방문자'] = round(float(prophet_forecast.loc[stamp]), 0)
        else:
            entry['예상 방문자'] = None

    # Same merge for the days of the two-week window that are today or later
    for day in recent_dates:
        stamp = pd.Timestamp(day)
        if day < today or stamp not in prophet_forecast.index:
            continue
        entry = recent_data.setdefault(day, {})
        entry['예상 방문자'] = round(float(prophet_forecast.loc[stamp]), 0)

    # Build the tables
    df_recent = build_dataframe(recent_dates, recent_data, use_forecast_after=today)
    df_prev = build_dataframe(prev_year_dates, prev_year_data)

    # Display settings so wide Korean text lines up
    display_options = (
        ('display.unicode.east_asian_width', True),
        ('display.max_columns', None),
        ('display.width', 200),
    )
    for option_name, option_value in display_options:
        pd.set_option(option_name, option_value)

    print("📊 최근 2주간 방문자 현황:")
    print(df_recent.to_string(index=False))
    print("\n📈 작년 동일 요일 데이터:")
    print(df_prev.to_string(index=False))


if __name__ == "__main__":
    main()

View File

@ -0,0 +1,278 @@
#weekly_visitor_forecast_prophet.py
import os, sys
import re, requests
from sqlalchemy import select, and_, func
from sqlalchemy.orm import Session
from prophet import Prophet
from statsmodels.tsa.arima.model import ARIMA
from sklearn.ensemble import RandomForestRegressor
import numpy as np
import pandas as pd
from datetime import date, datetime, timedelta
# 경로 설정: 프로젝트 루트 conf 폴더 내 db 및 스키마 모듈 임포트
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from conf import db, db_schema
from weather_forecast import get_weekly_precip # 변경된 날씨 예보 함수 임포트
from lib.holiday import is_korean_holiday # holiday.py의 DB 기반 휴일 판단 함수
from lib.common import load_config
# Initialise DB table objects from the project schema module
pos = db_schema.pos
ga4 = db_schema.ga4_by_date
weather = db_schema.weather
air = db_schema.air
# Project configuration, loaded once when the module is imported
config = load_config()
# Key handed to get_weekly_precip() when the weather forecast is requested
serviceKey = config['DATA_API']['serviceKey']
# Values matched against pos.ca03 when counting admissions
VISITOR_CA = tuple(config['POS']['VISITOR_CA'])
# --- 데이터 로딩 및 전처리 ---
def get_date_range(start_date, end_date):
    """Return every day from ``start_date`` to ``end_date`` (inclusive).

    Args:
        start_date: First day of the range (anything ``pandas.date_range``
            accepts).
        end_date: Last day of the range.

    Returns:
        A list of ``datetime.datetime`` objects, one per day, ascending.
    """
    daily_index = pd.date_range(start_date, end_date)
    return list(daily_index.to_pydatetime())
def add_korean_holiday_feature(df):
    """Add an ``is_holiday`` column (1 or 0) derived from the ``date`` column.

    The frame is modified in place and also returned.

    Args:
        df: DataFrame whose ``date`` column holds datetime-like values that
            provide a ``.date()`` method.

    Returns:
        The same DataFrame with the ``is_holiday`` column set.
    """
    def _holiday_flag(moment):
        if is_korean_holiday(moment.date()):
            return 1
        return 0

    df['is_holiday'] = df['date'].apply(_holiday_flag)
    return df
def fix_zero_visitors_weighted(df):
    """Replace zero visitor counts with the matching monthly average.

    A day with ``pos_qty == 0`` is filled with the mean of the positive
    ``pos_qty`` values that share its year-month and ``is_holiday`` flag.
    When no such values exist the count stays 0. The input frame is not
    modified.

    Prophet-style frames are accepted too: ``ds`` / ``y`` are copied to
    ``date`` / ``pos_qty`` when those columns are absent, and ``y`` is
    updated with the filled values.

    Args:
        df: DataFrame with ``date`` (or ``ds``) as datetimes, ``pos_qty``
            (or ``y``) and ``is_holiday``.

    Returns:
        A copy of ``df`` whose ``pos_qty`` (and ``y``, if present) is a float
        column with the zeros filled in.

    Raises:
        ValueError: If the ``is_holiday`` column is missing.
    """
    df = df.copy()
    if 'date' not in df.columns and 'ds' in df.columns:
        df['date'] = df['ds']
    if 'pos_qty' not in df.columns and 'y' in df.columns:
        df['pos_qty'] = df['y']
    if 'is_holiday' not in df.columns:
        raise ValueError("DataFrame에 'is_holiday' 컬럼이 필요합니다.")

    df['year_month'] = df['date'].dt.strftime('%Y-%m')
    # {(year_month, is_holiday): mean of the positive counts in that group}
    monthly_means = (
        df[df['pos_qty'] > 0]
        .groupby(['year_month', 'is_holiday'])['pos_qty']
        .mean()
        .to_dict()
    )

    def _filled(qty, year_month, holiday_flag):
        if qty != 0:
            return qty
        mean_val = monthly_means.get((year_month, holiday_flag))
        return 0.0 if mean_val is None or pd.isna(mean_val) else mean_val

    # The result is a float column: writing the mean back into an integer
    # array would silently truncate it.
    df['pos_qty'] = pd.Series(
        [
            _filled(qty, ym, flag)
            for qty, ym, flag in zip(df['pos_qty'], df['year_month'], df['is_holiday'])
        ],
        index=df.index,
        dtype=float,
    )
    if 'y' in df.columns:
        df['y'] = df['pos_qty']
    return df.drop(columns=['year_month'])
def load_data(session, start_date, end_date):
    """Assemble the daily training frame for ``start_date``..``end_date``.

    Queries admissions, GA4 active users, weather (stnId 99) and pm25
    (station '운정') for the period and produces one row per calendar day,
    using 0 for any value that has no database row.

    Args:
        session: Open SQLAlchemy session used for all four queries.
        start_date: First day of the period (inclusive).
        end_date: Last day of the period (inclusive).

    Returns:
        DataFrame with the columns date, pos_qty, activeUsers, minTa, maxTa,
        sumRn, avgRhm, pm25, is_holiday and weekday. Zero admission counts
        are already replaced by monthly averages.
    """
    def _within_period(table):
        # Shared date-range criteria for every query
        return (table.c.date >= start_date, table.c.date <= end_date)

    def _weather_value(row, column):
        # 0 when the day has no weather row at all
        return getattr(row, column, 0) if row else 0

    pos_query = (
        select(pos.c.date, func.sum(pos.c.qty).label('pos_qty'))
        .where(
            and_(
                *_within_period(pos),
                pos.c.ca01 == '매표소',
                pos.c.ca03.in_(VISITOR_CA),
            )
        )
        .group_by(pos.c.date)
    )
    pos_data = {}
    for row in session.execute(pos_query).fetchall():
        pos_data[row.date] = row.pos_qty

    ga4_query = select(ga4.c.date, ga4.c.activeUsers).where(and_(*_within_period(ga4)))
    ga4_data = {}
    for row in session.execute(ga4_query).fetchall():
        ga4_data[row.date] = row.activeUsers

    weather_query = select(
        weather.c.date,
        weather.c.minTa,
        weather.c.maxTa,
        weather.c.sumRn,
        weather.c.avgRhm,
    ).where(and_(*_within_period(weather), weather.c.stnId == 99))
    weather_data = {}
    for row in session.execute(weather_query).fetchall():
        weather_data[row.date] = row

    air_query = select(air.c.date, air.c.pm25).where(
        and_(*_within_period(air), air.c.station == '운정')
    )
    air_data = {}
    for row in session.execute(air_query).fetchall():
        air_data[row.date] = row.pm25

    records = []
    for moment in get_date_range(start_date, end_date):
        # Lookups use a plain date even though the frame keeps the datetime
        key = moment.date() if isinstance(moment, datetime) else moment
        weather_row = weather_data.get(key)
        records.append({
            'date': moment,
            'pos_qty': pos_data.get(key, 0),
            'activeUsers': ga4_data.get(key, 0),
            'minTa': _weather_value(weather_row, 'minTa'),
            'maxTa': _weather_value(weather_row, 'maxTa'),
            'sumRn': _weather_value(weather_row, 'sumRn'),
            'avgRhm': _weather_value(weather_row, 'avgRhm'),
            'pm25': air_data.get(key, 0),
        })

    frame = pd.DataFrame(records)
    frame = add_korean_holiday_feature(frame)
    frame = fix_zero_visitors_weighted(frame)
    frame['weekday'] = frame['date'].dt.weekday
    return frame
def prepare_prophet_df(df):
    """Convert the daily frame into the column layout used for Prophet.

    Args:
        df: DataFrame with date, pos_qty, minTa, maxTa, sumRn, avgRhm, pm25
            and is_holiday columns.

    Returns:
        New DataFrame with ``ds`` (the dates), ``y`` (pos_qty as float), the
        five weather/air columns as float and ``is_holiday`` as int, in that
        order.
    """
    float_regressors = ('minTa', 'maxTa', 'sumRn', 'avgRhm', 'pm25')
    columns = {'ds': df['date'], 'y': df['pos_qty'].astype(float)}
    columns.update({name: df[name].astype(float) for name in float_regressors})
    columns['is_holiday'] = df['is_holiday'].astype(int)
    return pd.DataFrame(columns)
def train_and_predict_prophet(prophet_df, forecast_days=7):
    """Fit Prophet with weather/holiday regressors and forecast ahead.

    Regressor values for the prediction frame are chosen per day:
    * weather (sumRn, minTa, maxTa, avgRhm): the weather forecast when one
      exists for the day, otherwise the observed value from ``prophet_df``,
      otherwise 0;
    * pm25: the observed value, or the last known value for days beyond the
      training data;
    * is_holiday: computed with ``is_korean_holiday``.

    Historical rows therefore keep their observed inputs instead of being
    overwritten with zeros, so the in-sample part of the returned forecast
    is meaningful.

    Side effect: writes the rounded predictions for today and later to
    ``../data/prophet_result.csv`` (columns ``date``, ``visitor_forecast``).

    Args:
        prophet_df: Frame with ds, y, minTa, maxTa, sumRn, avgRhm, pm25 and
            is_holiday columns.
        forecast_days: Number of days to predict beyond the training data.

    Returns:
        The DataFrame returned by ``Prophet.predict`` (history plus future).
    """
    weather_columns = ['sumRn', 'minTa', 'maxTa', 'avgRhm']
    regressors = ['minTa', 'maxTa', 'sumRn', 'avgRhm', 'pm25', 'is_holiday']

    prophet_df = fix_zero_visitors_weighted(prophet_df)
    prophet_df = prophet_df.fillna({name: 0 for name in regressors})

    m = Prophet(weekly_seasonality=True, yearly_seasonality=True, daily_seasonality=False)
    for name in regressors:
        m.add_regressor(name)
    m.fit(prophet_df)

    future = m.make_future_dataframe(periods=forecast_days)
    # Bring the observed regressors along for the historical rows; rows past
    # the training data come out of the left join as NaN.
    observed = prophet_df[['ds', *weather_columns, 'pm25']].drop_duplicates(subset='ds')
    future = future.merge(observed, on='ds', how='left')

    # {'YYYYMMDD': {'sumRn': x, 'minTa': y, 'maxTa': z, 'avgRhm': w}, ...}
    weekly_precip = get_weekly_precip(serviceKey)
    day_keys = future['ds'].dt.strftime('%Y%m%d')
    for column in weather_columns:
        predicted = pd.Series(
            [
                float(weekly_precip[key].get(column, 0)) if weekly_precip.get(key) else np.nan
                for key in day_keys
            ],
            index=future.index,
            dtype=float,
        )
        # Priority: weather forecast -> observed value -> 0
        future[column] = predicted.fillna(future[column]).fillna(0).astype(float)

    # pm25 has no forecast source: reuse the last known value for future days
    future['pm25'] = future['pm25'].fillna(prophet_df.iloc[-1]['pm25'])
    future['is_holiday'] = future['ds'].apply(
        lambda d: 1 if is_korean_holiday(d.date()) else 0
    )

    forecast = m.predict(future)

    # Save the predictions for today onwards
    output_path = os.path.abspath(
        os.path.join(os.path.dirname(__file__), '..', 'data', 'prophet_result.csv')
    )
    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    df_to_save = forecast[['ds', 'yhat']].rename(
        columns={'ds': 'date', 'yhat': 'visitor_forecast'}
    )
    df_to_save['date'] = df_to_save['date'].dt.strftime("%Y-%m-%d")
    today_str = date.today().strftime("%Y-%m-%d")
    # ISO dates compare correctly as strings; copy() so the assignment below
    # does not write into a slice of the unfiltered frame.
    df_to_save = df_to_save[df_to_save['date'] >= today_str].copy()
    df_to_save['visitor_forecast'] = df_to_save['visitor_forecast'].round().astype(int)
    df_to_save.to_csv(output_path, index=False)
    return forecast
def train_and_predict_arima(ts, forecast_days=7):
    """Fit an ARIMA(5, 1, 0) model on ``ts`` and forecast ahead.

    Args:
        ts: Time series handed to ``ARIMA`` as the endogenous data.
        forecast_days: Number of steps to forecast.

    Returns:
        The result of ``forecast(steps=forecast_days)`` on the fitted model.
    """
    fitted = ARIMA(ts, order=(5, 1, 0)).fit()
    return fitted.forecast(steps=forecast_days)
def train_and_predict_rf(df, forecast_days=7):
    """Fit a random forest on weekday + weather features and predict ahead.

    The future feature rows carry the real weekday, but every weather/air
    feature is set to 0.

    Args:
        df: DataFrame with date, minTa, maxTa, sumRn, avgRhm, pm25 and
            pos_qty columns (not modified).
        forecast_days: Number of days to predict after the last date in
            ``df``.

    Returns:
        DataFrame with date, weekday, the five zeroed features and the
        predicted ``pos_qty``.
    """
    from sklearn.ensemble import RandomForestRegressor

    feature_names = ['weekday', 'minTa', 'maxTa', 'sumRn', 'avgRhm', 'pm25']

    history = df.copy()
    history['weekday'] = history['date'].dt.weekday
    regressor = RandomForestRegressor(n_estimators=100, random_state=42)
    regressor.fit(history[feature_names], history['pos_qty'])

    first_future_day = history['date'].max() + timedelta(days=1)
    horizon = pd.date_range(first_future_day, periods=forecast_days)
    future_df = pd.DataFrame({
        'date': horizon,
        'weekday': horizon.weekday,
        # No forecast inputs are available here, so these features are 0
        **dict.fromkeys(feature_names[1:], 0),
    })
    future_df['pos_qty'] = regressor.predict(future_df[feature_names])
    return future_df
def main():
    """Train on the last 365 days and print the 7-day visitor forecast.

    Loads the training data, runs the Prophet forecast (which also writes the
    result CSV), then prints the weather forecast followed by the last ten
    rows of the prediction.
    """
    end_date = datetime.today().date()
    start_date = end_date - timedelta(days=365)

    with Session(db.engine) as session:
        df = load_data(session, start_date, end_date)

    forecast_days = 7
    forecast = train_and_predict_prophet(prepare_prophet_df(df), forecast_days)

    # Round the predictions to whole visitors
    for column in ('yhat', 'yhat_lower', 'yhat_upper'):
        forecast[column] = forecast[column].round().astype(int)

    # Weather forecast shown alongside the prediction
    weekly_precip = get_weekly_precip(serviceKey)

    # Last 10 rows of the prediction, with Korean headers
    headers = {
        'ds': '날짜',
        'yhat': '예상 방문객',
        'yhat_lower': '하한',
        'yhat_upper': '상한',
    }
    output_df = forecast[list(headers)].tail(10).copy()
    output_df.columns = list(headers.values())

    print("이번 주 강수 예보:")
    for dt_str, val in weekly_precip.items():
        print(f"{dt_str}: 강수량={val['sumRn']:.1f}mm, 최저기온={val['minTa']}, 최고기온={val['maxTa']}, 습도={val['avgRhm']:.1f}%")
    print("\n예측 방문객:")
    print(output_df.to_string(index=False))


if __name__ == '__main__':
    main()