7 Commits
web ... lib

7 changed files with 1020 additions and 242 deletions

View File

@ -0,0 +1,299 @@
import os
import sys
import re
import requests
from sqlalchemy import select, and_, func
from sqlalchemy.orm import Session
from prophet import Prophet
from statsmodels.tsa.arima.model import ARIMA
import numpy as np
import pandas as pd
from datetime import date, datetime, timedelta
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from conf import db, db_schema
from weather_forecast import get_weekly_precip
from lib.holiday import is_korean_holiday
from lib.common import load_config
# Initialize DB table objects (handles taken from the project's db_schema module)
pos = db_schema.pos
ga4 = db_schema.ga4_by_date
weather = db_schema.weather
air = db_schema.air
# Load config
config = load_config()
# API key passed to get_weekly_precip() when fetching weather forecasts
serviceKey = config['DATA_API']['serviceKey']
# Optional section: when it is missing, every weight below falls back to 1.0
weight_cfg = config.get('FORECAST_WEIGHT', {})
# pos.ca03 values that are counted as visitors in load_data()
VISITOR_CA = tuple(config['POS']['VISITOR_CA'])
# Factor applied to the Prophet yhat / yhat_lower / yhat_upper columns
visitor_forecast_multiplier = weight_cfg.get('visitor_forecast_multiplier', 1.0)
# Per-regressor scale factors applied before fitting and predicting
minTa_weight = weight_cfg.get('minTa', 1.0)
maxTa_weight = weight_cfg.get('maxTa', 1.0)
sumRn_weight = weight_cfg.get('sumRn', 1.0)
avgRhm_weight = weight_cfg.get('avgRhm', 1.0)
pm25_weight = weight_cfg.get('pm25', 1.0)
is_holiday_weight = weight_cfg.get('is_holiday', 1.0)
def get_date_range(start_date, end_date):
    """Return every day from start_date through end_date (inclusive) as datetime objects."""
    daily_index = pd.date_range(start_date, end_date)
    as_datetimes = daily_index.to_pydatetime()
    return as_datetimes.tolist()
def add_korean_holiday_feature(df):
    """Add an integer 'is_holiday' column (1 or 0) derived from the 'date' column.

    The frame is modified in place and also returned for chaining.
    """
    def _holiday_flag(stamp):
        # is_korean_holiday() is called with a plain date, not a timestamp
        if is_korean_holiday(stamp.date()):
            return 1
        return 0

    df['is_holiday'] = df['date'].apply(_holiday_flag)
    return df
def fix_zero_visitors_weighted(df):
    """Replace zero visitor counts with the mean of comparable days.

    A "comparable day" shares the same calendar month ('%Y-%m') and the same
    'is_holiday' value; only days with a positive count contribute to the mean.
    A zero stays zero when no comparable positive day exists.

    Accepts either the raw layout ('date', 'pos_qty') or the Prophet layout
    ('ds', 'y'); in the latter case 'date'/'pos_qty' are added as aliases and
    'y' is kept in sync with the corrected values.

    Args:
        df: DataFrame with a datetime 'date' (or 'ds') column, a numeric
            'pos_qty' (or 'y') column and an 'is_holiday' column.

    Returns:
        A corrected copy; the input frame is not modified.

    Raises:
        ValueError: if the 'is_holiday' column is missing.
    """
    df = df.copy()
    if 'date' not in df.columns and 'ds' in df.columns:
        df['date'] = df['ds']
    if 'pos_qty' not in df.columns and 'y' in df.columns:
        df['pos_qty'] = df['y']
    if 'is_holiday' not in df.columns:
        raise ValueError("DataFrame에 'is_holiday' 컬럼이 필요합니다.")

    df['year_month'] = df['date'].dt.strftime('%Y-%m')
    monthly_means = (
        df[df['pos_qty'] > 0]
        .groupby(['year_month', 'is_holiday'])['pos_qty']
        .mean()
    )

    filled = df['pos_qty'].values.copy()
    # Writing a float mean into an integer array would silently truncate it
    # (12.5 -> 12), so integer counts are promoted to float before filling.
    if np.issubdtype(filled.dtype, np.integer):
        filled = filled.astype(float)

    # Iterate the two key columns directly instead of building a row Series
    # with df.iloc[i] for every day.
    for i, (ym, holiday_flag) in enumerate(zip(df['year_month'], df['is_holiday'])):
        if filled[i] == 0:
            mean_val = monthly_means.get((ym, holiday_flag), np.nan)
            filled[i] = 0 if np.isnan(mean_val) else mean_val

    df['pos_qty'] = filled
    if 'y' in df.columns:
        df['y'] = df['pos_qty']
    df.drop(columns=['year_month'], inplace=True)
    return df
def load_data(session, start_date, end_date):
    """Build one row per day between start_date and end_date from the DB tables.

    Combines ticket-office visitor quantities (pos), GA4 active users, weather
    for station 99 and PM2.5 for the '운정' station. Days without a row in a
    table get 0 for that table's values. Holiday flags, the zero-visitor
    correction and a 'weekday' column are added before returning.

    Args:
        session: SQLAlchemy session used to run the four SELECT statements.
        start_date: first day (inclusive).
        end_date: last day (inclusive).

    Returns:
        DataFrame with date, pos_qty, activeUsers, minTa, maxTa, sumRn,
        avgRhm, pm25, is_holiday and weekday columns.
    """
    def _rows(statement):
        return session.execute(statement).mappings().all()

    visitor_query = (
        select(pos.c.date, func.sum(pos.c.qty).label('pos_qty'))
        .where(
            and_(
                pos.c.date >= start_date,
                pos.c.date <= end_date,
                pos.c.ca01 == '매표소',
                pos.c.ca03.in_(VISITOR_CA),
            )
        )
        .group_by(pos.c.date)
    )
    ga4_query = select(ga4.c.date, ga4.c.activeUsers).where(
        and_(ga4.c.date >= start_date, ga4.c.date <= end_date)
    )
    weather_query = select(
        weather.c.date,
        weather.c.minTa,
        weather.c.maxTa,
        weather.c.sumRn,
        weather.c.avgRhm,
    ).where(
        and_(
            weather.c.date >= start_date,
            weather.c.date <= end_date,
            weather.c.stnId == 99,
        )
    )
    air_query = select(air.c.date, air.c.pm25).where(
        and_(
            air.c.date >= start_date,
            air.c.date <= end_date,
            air.c.station == '운정',
        )
    )

    # Index every result set by date for O(1) lookup while assembling rows.
    visitors_by_day = {r['date']: r['pos_qty'] for r in _rows(visitor_query)}
    users_by_day = {r['date']: r['activeUsers'] for r in _rows(ga4_query)}
    weather_by_day = {r['date']: r for r in _rows(weather_query)}
    pm25_by_day = {r['date']: r['pm25'] for r in _rows(air_query)}

    def _weather_value(day_key, field):
        # A missing (or empty) weather row yields 0 for every field.
        found = weather_by_day.get(day_key)
        return found.get(field, 0) if found else 0

    records = []
    for day in get_date_range(start_date, end_date):
        # DB rows are looked up by plain date; the frame keeps the datetime.
        day_key = day.date() if isinstance(day, datetime) else day
        records.append({
            'date': day,
            'pos_qty': visitors_by_day.get(day_key, 0),
            'activeUsers': users_by_day.get(day_key, 0),
            'minTa': _weather_value(day_key, 'minTa'),
            'maxTa': _weather_value(day_key, 'maxTa'),
            'sumRn': _weather_value(day_key, 'sumRn'),
            'avgRhm': _weather_value(day_key, 'avgRhm'),
            'pm25': pm25_by_day.get(day_key, 0),
        })

    frame = pd.DataFrame(records)
    frame = add_korean_holiday_feature(frame)
    frame = fix_zero_visitors_weighted(frame)
    frame['weekday'] = frame['date'].dt.weekday
    return frame
def prepare_prophet_df(df):
    """Project the daily frame onto the columns Prophet is trained on.

    'date' becomes 'ds', 'pos_qty' becomes 'y' (float); the weather and PM2.5
    regressors are cast to float and 'is_holiday' to int. Other input columns
    are dropped.
    """
    columns = {
        'ds': df['date'],
        'y': df['pos_qty'].astype(float),
    }
    for regressor in ('minTa', 'maxTa', 'sumRn', 'avgRhm', 'pm25'):
        columns[regressor] = df[regressor].astype(float)
    columns['is_holiday'] = df['is_holiday'].astype(int)
    return pd.DataFrame(columns)
def train_and_predict_prophet(prophet_df, forecast_days=7):
    """Fit Prophet with weather/holiday regressors and forecast visitor counts.

    Steps: interpolate and zero-fill the regressors, scale them by the
    configured weights, correct zero visitor days, fit the model, build the
    future frame from the weather forecast, predict, apply the visitor
    multiplier and write today-and-later predictions to data/prophet_result.csv.

    Args:
        prophet_df: frame from prepare_prophet_df() with ds, y, minTa, maxTa,
            sumRn, avgRhm, pm25 and is_holiday columns.
        forecast_days: number of days to predict beyond the training data.

    Returns:
        The Prophet forecast DataFrame; yhat, yhat_lower and yhat_upper are
        multiplied by visitor_forecast_multiplier and rounded to int.
    """
    regressors = ['minTa', 'maxTa', 'sumRn', 'avgRhm', 'pm25', 'is_holiday']
    prophet_df = prophet_df.copy()

    # Fill gaps by linear interpolation between neighbouring days
    for col in regressors:
        if col in prophet_df.columns:
            prophet_df[col] = prophet_df[col].interpolate(method='linear', limit_direction='both')
    # Anything still missing after interpolation becomes 0
    prophet_df.fillna({col: 0 for col in regressors}, inplace=True)

    # Apply the configured weights
    prophet_df['minTa'] *= minTa_weight
    prophet_df['maxTa'] *= maxTa_weight
    prophet_df['sumRn'] *= sumRn_weight
    prophet_df['avgRhm'] *= avgRhm_weight
    prophet_df['pm25'] *= pm25_weight
    prophet_df['is_holiday'] *= is_holiday_weight

    # Correct days whose visitor count is exactly 0
    prophet_df = fix_zero_visitors_weighted(prophet_df)

    # Define and fit the Prophet model
    m = Prophet(weekly_seasonality=True, yearly_seasonality=True, daily_seasonality=False)
    for col in regressors:
        m.add_regressor(col)
    m.fit(prophet_df)

    future = m.make_future_dataframe(periods=forecast_days)

    # Fill the future frame with weighted weather-forecast values.
    # NOTE(review): every row of `future` is filled here, and rows whose date
    # is not in the forecast get 0 for all four weather regressors. If `future`
    # also contains the training dates (Prophet's default, to be confirmed),
    # their in-sample predictions use 0 instead of the observed weather.
    weekly_precip = get_weekly_precip(serviceKey)
    forecast_weights = {
        'sumRn': sumRn_weight,
        'minTa': minTa_weight,
        'maxTa': maxTa_weight,
        'avgRhm': avgRhm_weight,
    }
    future_values = {col: [] for col in forecast_weights}
    for dt in future['ds']:
        day_forecast = weekly_precip.get(dt.strftime('%Y%m%d'), None)
        for col, weight in forecast_weights.items():
            if day_forecast:
                future_values[col].append(float(day_forecast.get(col, 0)) * weight)
            else:
                future_values[col].append(0)
    for col in forecast_weights:
        future[col] = future_values[col]

    # pm25 reuses the last known historical value. That value was already
    # multiplied by pm25_weight above, so it must not be weighted a second
    # time (the previous code scaled it by pm25_weight ** 2).
    last_known = prophet_df.iloc[-1]
    future['pm25'] = last_known['pm25']

    # Holiday flag, scaled by its weight like the training column
    future['is_holiday'] = future['ds'].apply(lambda d: 1 if is_korean_holiday(d.date()) else 0) * is_holiday_weight

    forecast = m.predict(future)

    # Apply the visitor multiplier and convert to integers
    for col in ('yhat', 'yhat_lower', 'yhat_upper'):
        forecast[col] = (forecast[col] * visitor_forecast_multiplier).round().astype(int)

    # Save today-and-later predictions as CSV
    output_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'data', 'prophet_result.csv'))
    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    df_to_save = forecast[['ds', 'yhat']].copy()
    df_to_save.columns = ['date', 'visitor_forecast']
    df_to_save['date'] = df_to_save['date'].dt.strftime("%Y-%m-%d")
    today_str = date.today().strftime("%Y-%m-%d")
    # ISO-formatted date strings sort chronologically, so a string compare is enough
    df_to_save = df_to_save[df_to_save['date'] >= today_str]
    df_to_save.to_csv(output_path, index=False)

    return forecast
def train_and_predict_arima(ts, forecast_days=7):
    """Fit an ARIMA(5, 1, 0) model on the series and forecast forecast_days steps ahead."""
    fitted = ARIMA(ts, order=(5, 1, 0)).fit()
    return fitted.forecast(steps=forecast_days)
def train_and_predict_rf(df, forecast_days=7):
    """Fit a random forest on weekday/weather features and predict future visitors.

    Future rows use the real weekday but 0 for every weather and PM2.5
    feature. Returns a frame with the future dates, features and the
    predicted 'pos_qty'.
    """
    from sklearn.ensemble import RandomForestRegressor

    feature_cols = ['weekday', 'minTa', 'maxTa', 'sumRn', 'avgRhm', 'pm25']

    history = df.copy()
    history['weekday'] = history['date'].dt.weekday

    forest = RandomForestRegressor(n_estimators=100, random_state=42)
    forest.fit(history[feature_cols], history['pos_qty'])

    first_future_day = history['date'].max() + timedelta(days=1)
    future_dates = pd.date_range(first_future_day, periods=forecast_days)
    future_df = pd.DataFrame({
        'date': future_dates,
        'weekday': future_dates.weekday,
        'minTa': 0,
        'maxTa': 0,
        'sumRn': 0,
        'avgRhm': 0,
        'pm25': 0
    })
    future_df['pos_qty'] = forest.predict(future_df[feature_cols])
    return future_df
def main():
    """Train on the last 365 days, forecast 7 days and print weather plus visitor forecasts."""
    today = datetime.today().date()
    one_year_ago = today - timedelta(days=365)

    with Session(db.engine) as session:
        df = load_data(session, one_year_ago, today)

    forecast = train_and_predict_prophet(prepare_prophet_df(df), 7)
    for col in ('yhat', 'yhat_lower', 'yhat_upper'):
        forecast[col] = forecast[col].round().astype(int)

    weekly_precip = get_weekly_precip(serviceKey)

    # Last 10 forecast rows, relabelled for display
    output_df = forecast[['ds', 'yhat', 'yhat_lower', 'yhat_upper']].tail(10).copy()
    output_df.columns = ['날짜', '예상 방문객', '하한', '상한']

    print("이번 주 강수 예보:")
    for dt_str, val in weekly_precip.items():
        print(f"{dt_str}: 강수량={val['sumRn']:.1f}mm, 최저기온={val['minTa']}, 최고기온={val['maxTa']}, 습도={val['avgRhm']:.1f}%")

    print("\n예측 방문객:")
    print(output_df.to_string(index=False))


if __name__ == '__main__':
    main()

87
lib/visitor_update.py Normal file
View File

@ -0,0 +1,87 @@
# ./lib/visitor_update.py
import os
import sys
import pandas as pd
from datetime import datetime
# 프로젝트 루트 경로 추가
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from conf.db import get_session
from conf.db_schema import pos
from sqlalchemy import select
# Constant definitions
# Excel workbook read by main(); located in the sibling data/ directory
FILE_PATH = os.path.join(os.path.dirname(__file__), '..', 'data', 'visitor_raw.xlsx')
# Fixed category / barcode values written into every row created by insert_data()
CA01 = '매표소'
CA02 = 'POS'
CA03 = '입장료'
BARCODE = 11111111
# Value stored in the amount columns (tot_amount, tot_discount, actual_amount)
DEFAULT_INT = 0
def load_excel(filepath):
    """Read the workbook and return a frame with 'date' (datetime.date) and 'qty' columns.

    The sheet's columns are renamed positionally to ['date', 'qty'].
    """
    sheet = pd.read_excel(filepath)
    sheet.columns = ['date', 'qty']
    parsed_dates = pd.to_datetime(sheet['date'])
    sheet['date'] = parsed_dates.dt.date
    return sheet
def get_existing_dates(session, dates):
    """Return the subset of `dates` that already has at least one row in the pos table."""
    # NOTE(review): the lookup matches on date only, not on ca01/ca03, so a
    # date counts as existing if any pos row has it. Confirm this is intended.
    query = select(pos.c.date).where(pos.c.date.in_(dates))
    found = session.execute(query).scalars().all()
    return {day for day in found}
def insert_data(df):
    """Insert one visitor row per new date into the pos table.

    Dates that already exist in the table are skipped. All inserts are
    committed together; on any error the transaction is rolled back and the
    error is printed. The session is always closed.
    """
    session = get_session()
    try:
        already_stored = get_existing_dates(session, set(df['date'].unique()))

        # Drop dates that are already stored
        if already_stored:
            print(f"[INFO] 이미 존재하는 날짜는 건너뜁니다: {sorted(already_stored)}")
            df = df[~df['date'].isin(already_stored)]

        if df.empty:
            print("[INFO] 삽입할 신규 데이터가 없습니다.")
            return

        for day, qty in zip(df['date'], df['qty']):
            statement = pos.insert().values(
                date=day,
                ca01=CA01,
                ca02=CA02,
                ca03=CA03,
                barcode=BARCODE,
                name='입장객',
                qty=int(qty),
                tot_amount=DEFAULT_INT,
                tot_discount=DEFAULT_INT,
                actual_amount=DEFAULT_INT,
            )
            session.execute(statement)

        session.commit()
        print(f"[INFO] {len(df)}건의 데이터가 성공적으로 삽입되었습니다.")
    except Exception as e:
        session.rollback()
        print(f"[ERROR] 데이터 저장 중 오류 발생: {e}")
    finally:
        session.close()
def main():
    """Load the visitor workbook and insert its rows, or report that the file is missing."""
    if os.path.exists(FILE_PATH):
        insert_data(load_excel(FILE_PATH))
    else:
        print(f"[ERROR] 파일을 찾을 수 없습니다: {FILE_PATH}")


if __name__ == "__main__":
    main()

96
lib/weatherFileUpdate.py Normal file
View File

@ -0,0 +1,96 @@
# weatherFileUpdate.py
import os
import csv
import sys
from datetime import datetime
from sqlalchemy import select, and_
# 경로 설정
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from conf import db, db_schema
CSV_FILENAME = 'weather.csv' # data file name
# Full path of the CSV, resolved relative to this module's directory
CSV_PATH = os.path.join(os.path.dirname(__file__), '../data', CSV_FILENAME)
# Table that receives the imported weather rows
weather_table = db_schema.fg_manager_static_weather
STN_ID = 99 # fixed stnId written into every imported row
def parse_float(value):
    """Convert value to float, returning 0.0 for unparsable input and for NaN."""
    try:
        number = float(value)
    except (ValueError, TypeError):
        return 0.0
    # NaN is the only float that is not equal to itself
    if number != number:
        return 0.0
    return number
def load_csv(filepath):
    """Parse the weather CSV into a list of row dicts ready for insertion.

    Each row needs a '날짜' column in YYYY-MM-DD format; numeric columns are
    converted with parse_float(), so missing or invalid numbers become 0.0.
    Rows that cannot be parsed are reported and skipped.

    Args:
        filepath: path of the CSV file to read.

    Returns:
        List of dicts with date, stnId, minTa, maxTa, sumRn, avgWs and avgRhm.

    Exits the process with status 1 when the file does not exist.
    """
    rows = []
    try:
        # 'utf-8-sig' reads plain UTF-8 and also strips a leading BOM. With
        # plain 'utf-8' a BOM would turn the first header into '\ufeff날짜'
        # and every row would be skipped with a KeyError.
        with open(filepath, newline='', encoding='utf-8-sig') as csvfile:
            reader = csv.DictReader(csvfile)
            for row in reader:
                try:
                    date = datetime.strptime(row['날짜'], '%Y-%m-%d').date()
                    data = {
                        'date': date,
                        'stnId': STN_ID,
                        'minTa': parse_float(row.get('최저기온', 0)),
                        'maxTa': parse_float(row.get('최고기온', 0)),
                        # These two header names contain a literal line break
                        'sumRn': parse_float(row.get('일강수량\n(mm)', 0)),
                        'avgWs': parse_float(row.get('평균풍속\n(m/s)', 0)),
                        'avgRhm': parse_float(row.get('습도', 0)),
                    }
                    rows.append(data)
                except Exception as e:
                    # Best effort: one bad row must not abort the whole import
                    print(f"[WARN] 잘못된 행 건너뜀: {row} / 오류: {e}")
    except FileNotFoundError:
        print(f"[ERROR] 파일이 존재하지 않음: {filepath}")
        sys.exit(1)
    return rows
def row_exists(session, date, stnId):
    """Return True when the weather table already has a row for this date and station."""
    same_day_and_station = and_(
        weather_table.c.date == date,
        weather_table.c.stnId == stnId,
    )
    query = select(weather_table.c.date).where(same_day_and_station)
    match = session.execute(query).scalar()
    return match is not None
def insert_rows(rows):
    """Insert weather rows that are not yet stored and report the counts.

    Rows whose (date, stnId) already exists are skipped. All inserts are
    committed in one transaction; if anything fails the transaction is rolled
    back and the error is printed.

    Args:
        rows: list of row dicts as produced by load_csv().

    Returns:
        Tuple (inserted, skipped). `inserted` is 0 when the transaction was
        rolled back, because none of the attempted inserts were persisted.
    """
    inserted = 0
    skipped = 0
    session = db.get_session()
    try:
        for row in rows:
            if row_exists(session, row['date'], row['stnId']):
                skipped += 1
                continue
            session.execute(weather_table.insert().values(**row))
            inserted += 1
        session.commit()
    except Exception as e:
        session.rollback()
        # The rollback discards every insert of this run; reporting the
        # attempted count would tell the caller rows were stored when they
        # were not.
        inserted = 0
        print(f"[ERROR] DB 삽입 실패: {e}")
    finally:
        session.close()
    return inserted, skipped
def main():
    """Load the weather CSV, insert the new rows and print a summary."""
    print(f"[INFO] CSV 파일 로드: {CSV_PATH}")
    rows = load_csv(CSV_PATH)
    print(f"[INFO] 총 행 수: {len(rows)}")

    if rows:
        inserted, skipped = insert_rows(rows)
        print(f"[DONE] 삽입 완료: {inserted}건, 건너뜀: {skipped}")
    else:
        print("[WARN] 삽입할 데이터가 없습니다.")


if __name__ == "__main__":
    main()

View File

@ -1,6 +1,12 @@
import requests
import os
import json
from datetime import datetime, timedelta
def valid_until_hours(cached, hours=2):
    """Return True while a cached payload is younger than `hours` hours.

    Args:
        cached: cache payload; its 'ts' entry is expected to hold an ISO-8601
            timestamp string.
        hours: maximum age in hours.

    Returns:
        True if the payload is still fresh. A payload whose 'ts' is missing or
        cannot be parsed is treated as expired (False) so the caller fetches
        fresh data instead of crashing on a damaged cache entry.
    """
    try:
        ts = datetime.fromisoformat(cached['ts'])
    except (KeyError, TypeError, ValueError):
        return False
    return datetime.now() - ts < timedelta(hours=hours)
def parse_precip(value):
if value == '강수없음':
return 0.0
@ -12,6 +18,27 @@ def parse_precip(value):
except:
return 0.0
def ensure_cache_dir():
    """Create ../data/cache relative to this module if needed and return its absolute path."""
    module_dir = os.path.dirname(__file__)
    relative_cache = os.path.join(module_dir, '..', 'data', 'cache')
    cache_dir = os.path.abspath(relative_cache)
    os.makedirs(cache_dir, exist_ok=True)
    return cache_dir
def get_cache_or_request(name, valid_until_fn, request_fn):
    """Return cached data for `name` when still valid, otherwise fetch and cache it.

    The cache is one JSON file per name and calendar day
    (<cache_dir>/<name>_<YYYYMMDD>.json) holding {'ts': <ISO time>, 'data': ...}.

    Args:
        name: cache key used in the file name.
        valid_until_fn: callable receiving the cached payload; returns True
            when the payload may still be used.
        request_fn: zero-argument callable that fetches fresh data.

    Returns:
        The cached 'data' value, or the result of request_fn().

    NOTE: JSON object keys are always strings, so a dict with non-string keys
    comes back with string keys on a cache hit but keeps its original keys
    right after a fresh request. Callers must cope with both.
    """
    cache_dir = ensure_cache_dir()
    today = datetime.now().strftime("%Y%m%d")
    cache_file = os.path.join(cache_dir, f"{name}_{today}.json")

    if os.path.exists(cache_file):
        try:
            with open(cache_file, 'r', encoding='utf-8') as f:
                cached = json.load(f)
            if valid_until_fn(cached):
                return cached['data']
        except (OSError, ValueError, KeyError, TypeError):
            # An unreadable, truncated or malformed cache file counts as a
            # miss; it is overwritten below once fresh data is available.
            pass

    data = request_fn()
    # The request functions return an empty result when the API call fails.
    # Caching that would keep serving "no data" until the entry expires, so
    # only non-empty results are written.
    if data:
        with open(cache_file, 'w', encoding='utf-8') as f:
            json.dump({'ts': datetime.now().isoformat(), 'data': data}, f, ensure_ascii=False)
    return data
def get_latest_base_date_time(now=None):
if now is None:
now = datetime.now()
@ -30,6 +57,7 @@ def get_latest_base_date_time(now=None):
return base_date, candidate
def get_daily_ultra_forecast(serviceKey):
def request():
base_date, base_time = get_latest_base_date_time()
url = "http://apis.data.go.kr/1360000/VilageFcstInfoService_2.0/getUltraSrtFcst"
params = {
@ -78,10 +106,19 @@ def get_daily_ultra_forecast(serviceKey):
maxTa = max(vals['maxTa']) if vals['maxTa'] else 0
avgRhm = sum(vals['rhm']) / len(vals['rhm']) if vals['rhm'] else 0
sumRn = round(vals['sumRn'], 2)
result[dt] = {'sumRn': sumRn, 'minTa': minTa, 'maxTa': maxTa, 'avgRhm': avgRhm}
result[dt] = {
'sumRn': round(sumRn, 1),
'minTa': round(minTa, 1),
'maxTa': round(maxTa, 1),
'avgRhm': round(avgRhm, 1)
}
return result
return get_cache_or_request('ultra_forecast', lambda cached: valid_until_hours(cached, 2), request)
def get_daily_vilage_forecast(serviceKey):
def request():
base_date, _ = get_latest_base_date_time()
url = "http://apis.data.go.kr/1360000/VilageFcstInfoService_2.0/getVilageFcst"
params = {
@ -135,19 +172,19 @@ def get_daily_vilage_forecast(serviceKey):
avgRhm = sum(vals['rhm']) / len(vals['rhm']) if vals['rhm'] else 0
sumRn = round(vals['sumRn'], 2)
result[dt] = {
'sumRn': sumRn,
'minTa': minTa,
'maxTa': maxTa,
'avgRhm': avgRhm
'sumRn': round(sumRn, 1),
'minTa': round(minTa, 1),
'maxTa': round(maxTa, 1),
'avgRhm': round(avgRhm, 1)
}
return result
return get_cache_or_request('vilage_forecast', lambda cached: valid_until_hours(cached, 6), request)
def get_midterm_forecast(serviceKey, regId='11B20305'):
# 중기 강수확률 예보
def request():
url = "http://apis.data.go.kr/1360000/MidFcstInfoService/getMidLandFcst"
# 발표 시각 계산: 06시 또는 18시만 존재
now = datetime.now()
if now.hour < 6:
tmFc = (now - timedelta(days=1)).strftime("%Y%m%d") + "1800"
@ -170,18 +207,14 @@ def get_midterm_forecast(serviceKey, regId='11B20305'):
resp.raise_for_status()
data = resp.json()
items = data.get('response', {}).get('body', {}).get('items', {}).get('item', [])
if not items:
print(f"[ERROR] 중기예보 응답 item 없음. tmFc={tmFc}, regId={regId}")
return {}, {}
item = items[0] # 실제 예보 데이터
return {}
item = items[0]
except Exception as e:
print(f"[ERROR] 중기예보 호출 실패: {e}")
return {}, {}
return {}
# 3~10일 후 강수확률 추출
precip_probs = {}
for day in range(3, 11):
key = f'rnSt{day}'
@ -190,12 +223,13 @@ def get_midterm_forecast(serviceKey, regId='11B20305'):
except:
precip_probs[day] = 0
return precip_probs, item
return precip_probs
def get_midterm_temperature_forecast(serviceKey, regId='11B20305'): # 파주 코드
return get_cache_or_request('midterm_precip', lambda cached: valid_until_hours(cached, 12), request)
def get_midterm_temperature_forecast(serviceKey, regId='11B20305'):
def request():
url = "http://apis.data.go.kr/1360000/MidFcstInfoService/getMidTa"
# 발표시각은 06:00 또는 18:00
now = datetime.now()
if now.hour < 6:
tmFc = (now - timedelta(days=1)).strftime("%Y%m%d") + "1800"
@ -217,15 +251,12 @@ def get_midterm_temperature_forecast(serviceKey, regId='11B20305'): # 파주
resp = requests.get(url, params=params, timeout=10)
resp.raise_for_status()
data = resp.json()
# 응답 검증
items = data.get("response", {}).get("body", {}).get("items", {}).get("item", [])
if not items:
print(f"[ERROR] 응답에 item 없음. tmFc={tmFc}, regId={regId}")
return {}
item = items[0]
except Exception as e:
print(f"[ERROR] 중기기온예보 호출 실패: {e}")
return {}
@ -234,16 +265,21 @@ def get_midterm_temperature_forecast(serviceKey, regId='11B20305'): # 파주
for day in range(3, 11):
min_key = f'taMin{day}'
max_key = f'taMax{day}'
min_val = item.get(min_key)
max_val = item.get(max_key)
try:
temps[day] = {
'min': int(item.get(min_key, 0)),
'max': int(item.get(max_key, 0))
'min': int(min_val) if min_val is not None else None,
'max': int(max_val) if max_val is not None else None
}
except:
temps[day] = {'min': 0, 'max': 0}
except Exception:
temps[day] = {'min': None, 'max': None}
return temps
return get_cache_or_request('midterm_temp', lambda cached: valid_until_hours(cached, 12), request)
def get_weekly_precip(serviceKey):
from datetime import date
today = date.today()
@ -251,7 +287,7 @@ def get_weekly_precip(serviceKey):
ultra = get_daily_ultra_forecast(serviceKey)
short = get_daily_vilage_forecast(serviceKey)
mid_precip, _ = get_midterm_forecast(serviceKey)
mid_precip = get_midterm_forecast(serviceKey)
mid_temp = get_midterm_temperature_forecast(serviceKey)
results = {}
@ -267,34 +303,77 @@ def get_weekly_precip(serviceKey):
'avgRhm': 0
}
# 강수량과 습도는 초단기예보 우선 반영
if dt_str in ultra:
results[dt_str]['sumRn'] = ultra[dt_str]['sumRn']
results[dt_str]['avgRhm'] = ultra[dt_str]['avgRhm']
# 최고/최저기온은 단기예보로만 덮어쓰기 (0이 아니면 덮어쓰기)
if dt_str in short:
if short[dt_str]['minTa'] != 0:
results[dt_str]['minTa'] = short[dt_str]['minTa']
if short[dt_str]['maxTa'] != 0:
results[dt_str]['maxTa'] = short[dt_str]['maxTa']
# 중기예보 보정 (3일 이후부터)
day_idx = (dt - today).days + 1
if day_idx >= 3:
if day_idx in mid_precip:
mid_rain = mid_precip[day_idx] / 100 * 5.0
if results[dt_str]['sumRn'] < mid_rain:
results[dt_str]['sumRn'] = mid_rain
if day_idx in mid_temp:
# 단기예보로 이미 값이 있으면 건너뛰기
if results[dt_str]['minTa'] == 0:
results[dt_str]['minTa'] = mid_temp[day_idx]['min']
if results[dt_str]['maxTa'] == 0:
results[dt_str]['maxTa'] = mid_temp[day_idx]['max']
day_offset = (dt - today).days # 0부터 시작
if day_offset >= 3:
# 중기예보 강수 우선 적용
if day_offset in mid_precip:
mid_rain = float(mid_precip[day_offset]) / 100 * 5.0
if mid_rain > results[dt_str]['sumRn']:
results[dt_str]['sumRn'] = mid_rain
# 중기예보 기온 적용: 단, None이거나 0이면 단기예보로 대체
key = str(day_offset)
if key in mid_temp:
mid_min = mid_temp[key]['min']
mid_max = mid_temp[key]['max']
if mid_min not in (None, 0):
results[dt_str]['minTa'] = mid_min
elif dt_str in short and short[dt_str]['minTa'] != 0:
results[dt_str]['minTa'] = short[dt_str]['minTa']
if mid_max not in (None, 0):
results[dt_str]['maxTa'] = mid_max
elif dt_str in short and short[dt_str]['maxTa'] != 0:
results[dt_str]['maxTa'] = short[dt_str]['maxTa']
# 중기 기온 적용 이후, 습도 보완
if results[dt_str]['avgRhm'] == 0 and dt_str in short and short[dt_str]['avgRhm'] != 0:
results[dt_str]['avgRhm'] = short[dt_str]['avgRhm']
results[dt_str] = {
'sumRn': round(results[dt_str]['sumRn'], 1),
'minTa': round(results[dt_str]['minTa'], 1),
'maxTa': round(results[dt_str]['maxTa'], 1),
'avgRhm': round(results[dt_str]['avgRhm'], 1),
}
return results
def print_weekly_precip_table(data_dict):
    """Print the per-day forecast values as a fixed-width table, ordered by date key."""
    # Header row followed by a rule of the same length
    header = f"{'날짜':<10} {'강수량(mm)':>10} {'최저기온(℃)':>12} {'최고기온(℃)':>12} {'평균습도(%)':>12}"
    print(header)
    print('-' * len(header))

    # One line per day, in ascending key order
    for day_key, day_values in sorted(data_dict.items()):
        rain = day_values['sumRn']
        low = day_values['minTa']
        high = day_values['maxTa']
        humidity = day_values['avgRhm']
        print(f"{day_key:<10} {rain:10.1f} {low:12.1f} {high:12.1f} {humidity:12.1f}")
if __name__ == '__main__':
    # Manual smoke test: fetch the forecasts and print them.
    # The API key is read from the project config; a hard-coded key was
    # assigned here and then immediately overwritten, so it has been removed
    # to keep the credential out of the source.
    import os
    import sys
    sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
    from lib.common import load_config
    serviceKey = load_config()['DATA_API']['serviceKey']

    # Fetch once and reuse the result for both the raw dump and the table
    data = get_weekly_precip(serviceKey)
    print(data)
    print_weekly_precip_table(data)
    print(get_daily_vilage_forecast(serviceKey))
    print(get_midterm_temperature_forecast(serviceKey))

View File

@ -7,8 +7,8 @@ from collections import defaultdict
import pandas as pd
from sqlalchemy import select, func
from weather_forecast import get_weekly_precip
from conf import db, db_schema
from lib.weather_forecast import get_weekly_precip
from lib.holiday import is_korean_holiday
from lib.common import load_config
@ -22,12 +22,50 @@ pos = db_schema.pos
engine = db.engine
def get_recent_dataframe(today=None) -> pd.DataFrame:
    """Build the 14-day frame that ends on the Sunday of the current week.

    Stored data is loaded for all 14 days. For today and later, days without
    a stored '강수량' value are filled from the weather forecast, and the
    Prophet visitor forecast is merged in as '예상 방문자'.
    """
    today = today or date.today()
    sunday = today + timedelta(days=6 - today.weekday())
    recent_dates = [sunday - timedelta(days=back) for back in range(13, -1, -1)]
    recent_data = fetch_data_for_dates(recent_dates)

    # Fill missing weather values from the forecast
    weekly_precip = get_weekly_precip(config['DATA_API']['serviceKey'])
    forecast_fields = (
        ('강수량', 'sumRn'),
        ('최저기온', 'minTa'),
        ('최고기온', 'maxTa'),
        ('습도', 'avgRhm'),
    )
    for d in recent_dates:
        if d < today:
            continue
        if d in recent_data and '강수량' in recent_data[d]:
            continue
        dt_str = d.strftime('%Y%m%d')
        if dt_str not in weekly_precip:
            continue
        recent_data[d] = recent_data.get(d, {})
        for label, field in forecast_fields:
            recent_data[d][label] = round(float(weekly_precip[dt_str][field]), 1)

    # Merge the Prophet predictions
    prophet_forecast = load_prophet_forecast()
    for d in recent_dates:
        stamp = pd.Timestamp(d)
        if d < today or stamp not in prophet_forecast.index:
            continue
        recent_data[d] = recent_data.get(d, {})
        recent_data[d]['예상 방문자'] = round(float(prophet_forecast.loc[stamp]), 0)

    return build_dataframe(recent_dates, recent_data, use_forecast_after=today)
def get_last_year_dataframe(today=None) -> pd.DataFrame:
    """Build the frame for last year's days matching the current 14-day window.

    The window is the 14 days ending on the Sunday of the current week;
    get_last_year_same_weekdays() maps it onto last year's dates.
    """
    today = today or date.today()
    sunday = today + timedelta(days=6 - today.weekday())
    recent_dates = [sunday - timedelta(days=back) for back in range(13, -1, -1)]

    prev_year_dates = get_last_year_same_weekdays(recent_dates)
    return build_dataframe(prev_year_dates, fetch_data_for_dates(prev_year_dates))
def get_recent_dates(today=None, days=14):
    """Return the last `days` dates ending with `today`, oldest first."""
    if not today:
        today = date.today()
    # Count the offset down from days-1 to 0 so the list ends on `today`
    return [today - timedelta(days=offset) for offset in range(days - 1, -1, -1)]
def get_this_week_dates(today=None):
today = today or date.today()
weekday = today.weekday()
@ -228,20 +266,8 @@ def main():
# prophet 예측 결과 불러오기 및 이번 주 예상 데이터에 병합
prophet_forecast = load_prophet_forecast()
for d in this_week_dates:
d_ts = pd.Timestamp(d)
has_forecast = d_ts in prophet_forecast.index
print(f"[DEBUG] 날짜 {d} (Timestamp {d_ts}) 예측 데이터 존재 여부: {has_forecast}")
if has_forecast:
if d not in forecast_data:
forecast_data[d] = {}
forecast_data[d]['예상 방문자'] = round(float(prophet_forecast.loc[d_ts]), 0)
else:
if d not in forecast_data:
forecast_data[d] = {}
forecast_data[d]['예상 방문자'] = None
# 최근 2주 데이터에 오늘 이후 날짜에 대해 예상 방문자 병합
# 최근 2주 데이터에 오늘 이후 날짜에 대해 예상 방문자 병합
for d in recent_dates:
d_ts = pd.Timestamp(d)
if d >= today and d_ts in prophet_forecast.index:
@ -264,6 +290,19 @@ def main():
print("\n📈 작년 동일 요일 데이터:")
print(df_prev.to_string(index=False))
# 🔽 엑셀 파일로 저장
output_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', 'output'))
os.makedirs(output_dir, exist_ok=True)
recent_excel_path = os.path.join(output_dir, 'recent_visitors.xlsx')
prev_excel_path = os.path.join(output_dir, 'lastyear_visitors.xlsx')
df_recent.to_excel(recent_excel_path, index=False)
df_prev.to_excel(prev_excel_path, index=False)
print(f"\n📁 엑셀 파일 저장 완료:")
print(f" - 최근 2주: {recent_excel_path}")
print(f" - 작년 동일 요일: {prev_excel_path}")
if __name__ == "__main__":
main()

View File

@ -1,4 +1,7 @@
# weekly_visitor_forecast_prophet.py
# 퍼스트가든 방문객 예측 프로그램
# prophet를 활용한 예측처리
import os, sys
import re, requests
from sqlalchemy import select, and_, func
@ -13,8 +16,8 @@ from datetime import date, datetime, timedelta
# 경로 설정: 프로젝트 루트 conf 폴더 내 db 및 스키마 모듈 임포트
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from conf import db, db_schema
from weather_forecast import get_weekly_precip # 변경된 날씨 예보 함수 임포트
from lib.holiday import is_korean_holiday # holiday.py의 DB 기반 휴일 판단 함수
from lib.weather_forecast import get_weekly_precip
from lib.holiday import is_korean_holiday
from lib.common import load_config
# DB 테이블 객체 초기화
@ -263,6 +266,31 @@ def train_and_predict_rf(df, forecast_days=7):
future_df['pos_qty'] = model.predict(future_df[['weekday', 'minTa', 'maxTa', 'sumRn', 'avgRhm', 'pm25']])
return future_df
# weekly_visitor_forecast_prophet.py 하단에 추가
def get_forecast_dict(forecast_days=3) -> dict:
    """
    Compute the visitor forecast for `forecast_days` days beyond today and
    return the rows dated today or later in the form
    {'2025-07-11': 1020, '2025-07-12': 1103, ...}.

    The model is trained on the last 365 days loaded from the database.
    """
    today = datetime.today().date()
    one_year_ago = today - timedelta(days=365)

    with Session(db.engine) as session:
        history = load_data(session, one_year_ago, today)

    forecast = train_and_predict_prophet(prepare_prophet_df(history), forecast_days)

    # Keep only today and later, then key the values by ISO date string
    is_upcoming = forecast['ds'].dt.date >= today
    upcoming = forecast.loc[is_upcoming, ['ds', 'yhat']].copy()
    upcoming['ds'] = upcoming['ds'].dt.strftime('%Y-%m-%d')
    return dict(upcoming.values)
def main():
today = datetime.today().date()
start_date = today - timedelta(days=365)

View File

@ -0,0 +1,150 @@
import pandas as pd
from openpyxl import Workbook
from openpyxl.styles import Font, Alignment, Border, Side
from openpyxl.chart import LineChart, Reference
from openpyxl.chart.series import SeriesLabel
from datetime import date
import os
def generate_excel_report(today, recent_dates, prev_year_dates, recent_data, prev_year_data, filename="visitor_report.xlsx"):
    """Write the visitor comparison report (table plus line chart) to an Excel file.

    The sheet holds a header row of dates, seven rows for this year, seven
    rows for last year and three comparison rows (visitor difference, visitor
    change rate, max-temperature deviation). Columns dated today or later get
    a thick border, and a line chart is anchored at cell A1.

    Args:
        today: reference date; dates >= today are treated as forecast days.
        recent_dates: this year's dates, one per column.
        prev_year_dates: last year's dates, matched to recent_dates by position.
        recent_data: dict mapping a date to a dict of this year's values.
        prev_year_data: dict mapping a date to a dict of last year's values.
        filename: path of the workbook to write.
    """
    # NOTE(review): all seven weekday labels are empty strings, so fmt() adds
    # nothing for the weekday. Characters may have been lost; confirm.
    weekday_names = ['', '', '', '', '', '', '']
    wb = Workbook()
    ws = wb.active
    ws.title = "방문자 리포트"
    bold = Font(bold=True)
    center = Alignment(horizontal='center', vertical='center')
    thick_border = Border(
        left=Side(style='thick'), right=Side(style='thick'),
        top=Side(style='thick'), bottom=Side(style='thick')
    )
    def fmt(d):
        # Column label: month, day and weekday label concatenated without separators
        return f"{d.month}{d.day}{weekday_names[d.weekday()]}"
    headers = ["구분"] + [fmt(d) for d in recent_dates]
    # Empty rows appended ahead of the table (1 + 23 calls); the chart is
    # later anchored at "A1".
    # NOTE(review): data_start_row is 24 while 24 empty rows are appended
    # first. Confirm the header really lands on row data_start_row.
    ws.append([])
    for _ in range(23):
        ws.append([])
    data_start_row = 24
    ws.append(headers)
    # Legend area: two merged 7-row cells in column 1 showing the two years
    ws.merge_cells(start_row=data_start_row, start_column=1, end_row=data_start_row + 6, end_column=1)
    ws.merge_cells(start_row=data_start_row + 7, start_column=1, end_row=data_start_row + 13, end_column=1)
    ws.cell(row=data_start_row, column=1, value=f"{today.year}").font = bold
    ws.cell(row=data_start_row + 7, column=1, value=f"{today.year - 1}").font = bold
    def row(label, key, data, suffix="", fmt_func=None):
        # Build one table row: the label followed by one cell per recent date.
        # Missing, empty or zero values become "", everything else a string.
        r = [label]
        for d in recent_dates:
            v = data.get(d, {}).get(key, "")
            if fmt_func:
                v = fmt_func(v)
            if v == 0 or v == '':
                r.append("")
            else:
                r.append(f"{v}{suffix}")
        return r
    # This year's visitors; forecast days show "actual (forecast)"
    merged_visitors = ["입장객수"]
    for d in recent_dates:
        actual = recent_data.get(d, {}).get("입장객 수", 0)
        forecast = recent_data.get(d, {}).get("예상 방문자", None)
        if d >= today and forecast:
            merged_visitors.append(f"{actual} ({int(forecast)})")
        else:
            merged_visitors.append(actual if actual else "")
    year_rows = [
        row("홈페이지", "웹 방문자 수", recent_data),
        merged_visitors,
        row("최저기온", "최저기온", recent_data),
        row("최고기온", "최고기온", recent_data),
        row("습도", "습도", recent_data, "%"),
        row("강수량", "강수량", recent_data),
        row("미세먼지지수", "미세먼지", recent_data),
    ]
    for r in year_rows:
        ws.append(r)
    # Last year's data
    def prev_row(label, key, suffix="", fmt_func=None):
        # Same as row(), but reads prev_year_data over prev_year_dates
        r = [label]
        for d in prev_year_dates:
            v = prev_year_data.get(d, {}).get(key, "")
            if fmt_func:
                v = fmt_func(v)
            if v == 0 or v == '':
                r.append("")
            else:
                r.append(f"{v}{suffix}")
        return r
    prev_rows = [
        prev_row("홈페이지", "웹 방문자 수"),
        prev_row("입장객수", "입장객 수"),
        prev_row("최저기온", "최저기온"),
        prev_row("최고기온", "최고기온"),
        prev_row("습도", "습도", "%"),
        prev_row("강수량", "강수량"),
        prev_row("미세먼지지수", "미세먼지"),
    ]
    for r in prev_rows:
        ws.append(r)
    # Year-over-year comparison
    diff = ["입장객 증감"]
    rate = ["입장객 변동률"]
    temp_dev = ["최고기온 편차"]
    for i, d in enumerate(recent_dates):
        cur = recent_data.get(d, {}).get("입장객 수", 0)
        prev = prev_year_data.get(prev_year_dates[i], {}).get("입장객 수", 0)
        # Only compare when last year's count is non-zero (avoids division by zero)
        if prev:
            diff.append(cur - prev)
            rate.append(f"{(cur - prev) / prev * 100:.1f}%")
        else:
            diff.append("")
            rate.append("")
        t1 = recent_data.get(d, {}).get("최고기온")
        t2 = prev_year_data.get(prev_year_dates[i], {}).get("최고기온")
        temp_dev.append(round(t1 - t2, 1) if t1 is not None and t2 is not None else "")
    # NOTE: the loop variable rebinds the local name `row`, shadowing the
    # row() helper above; the helper is not called after this point.
    for row in [diff, rate, temp_dev]:
        ws.append(row)
    # Thick border on columns dated today or later
    for col, d in enumerate(recent_dates, start=2):
        if d >= today:
            for r in range(data_start_row + 1, data_start_row + 18):
                ws.cell(row=r, column=col).border = thick_border
    # Chart
    chart = LineChart()
    chart.title = "입장객 비교 (예상 포함 vs 작년)"
    chart.height = 10
    chart.width = 22
    chart.y_axis.title = ""
    chart.x_axis.title = "날짜"
    # Each reference is a single row spanning the date columns
    # NOTE(review): the row offsets (+0 labels, +2 this year, +9 last year)
    # assume the header sits on row data_start_row. Verify against the
    # generated sheet.
    label_ref = Reference(ws, min_col=2, min_row=data_start_row, max_col=1 + len(recent_dates))
    this_year_ref = Reference(ws, min_col=2, min_row=data_start_row + 2, max_col=1 + len(recent_dates))
    last_year_ref = Reference(ws, min_col=2, min_row=data_start_row + 9, max_col=1 + len(recent_dates))
    chart.set_categories(label_ref)
    chart.add_data(this_year_ref, titles_from_data=False)
    chart.add_data(last_year_ref, titles_from_data=False)
    chart.series[0].tx = SeriesLabel(v="입장객수 (예상 포함)")
    chart.series[1].tx = SeriesLabel(v="작년 입장객수")
    chart.series[1].graphicalProperties.solidFill = "999999"
    ws.add_chart(chart, "A1")
    wb.save(filename)
    print(f"✅ 엑셀 저장 완료: {filename}")