562 lines
17 KiB
Python
562 lines
17 KiB
Python
# ===================================================================
|
|
# services/weather/forecast.py
|
|
# 기상청 예보 API 서비스 모듈
|
|
# ===================================================================
|
|
# 기상청 공공데이터포털 API를 통해 다양한 예보 데이터를 조회합니다:
|
|
# - 초단기예보: 향후 6시간 이내 예보
|
|
# - 단기예보: 향후 3일간 예보
|
|
# - 중기예보: 3~10일 후 예보
|
|
# ===================================================================
|
|
"""
|
|
기상청 예보 API 서비스 모듈
|
|
|
|
기상청 공공데이터포털 API를 통해 초단기, 단기, 중기 예보 데이터를 조회합니다.
|
|
|
|
사용 예시:
|
|
from services.weather.forecast import get_daily_vilage_forecast, get_midterm_forecast
|
|
|
|
# 단기 예보 조회
|
|
forecast = get_daily_vilage_forecast(service_key)
|
|
|
|
# 중기 예보 조회
|
|
precip_probs, raw_data = get_midterm_forecast(service_key)
|
|
"""
|
|
|
|
import re
|
|
from datetime import datetime, timedelta, date
|
|
from typing import Dict, List, Optional, Tuple, Any
|
|
|
|
import requests
|
|
|
|
from core.logging_utils import get_logger
|
|
from core.http_client import create_retry_session
|
|
from core.config import get_config
|
|
|
|
logger = get_logger(__name__)
|
|
|
|
# 기상청 API 기본 URL
|
|
VILAGE_FCST_URL = "http://apis.data.go.kr/1360000/VilageFcstInfoService_2.0"
|
|
MID_FCST_URL = "http://apis.data.go.kr/1360000/MidFcstInfoService"
|
|
|
|
# 기본 좌표 (파주 지역)
|
|
DEFAULT_NX = 57
|
|
DEFAULT_NY = 130
|
|
|
|
|
|
def parse_precip(value: Any) -> float:
|
|
"""
|
|
강수량 텍스트를 숫자(mm)로 변환
|
|
|
|
기상청 API에서 반환하는 강수량 텍스트를 파싱합니다.
|
|
|
|
Args:
|
|
value: 강수량 값 ('강수없음', '1mm 미만', '3.5mm' 등)
|
|
|
|
Returns:
|
|
강수량 (mm). 파싱 실패 시 0.0
|
|
|
|
Examples:
|
|
>>> parse_precip('강수없음')
|
|
0.0
|
|
>>> parse_precip('1mm 미만')
|
|
0.5
|
|
>>> parse_precip('3.5')
|
|
3.5
|
|
"""
|
|
if value == '강수없음':
|
|
return 0.0
|
|
elif '1mm 미만' in str(value):
|
|
return 0.5
|
|
else:
|
|
# 숫자 추출 시도
|
|
match = re.search(r"[\d.]+", str(value))
|
|
if match:
|
|
try:
|
|
return float(match.group())
|
|
except ValueError:
|
|
pass
|
|
try:
|
|
return float(value)
|
|
except (ValueError, TypeError):
|
|
return 0.0
|
|
|
|
|
|
def get_latest_base_datetime(now: Optional[datetime] = None) -> Tuple[str, str]:
|
|
"""
|
|
최신 발표 시각 계산
|
|
|
|
단기예보 API의 base_time은 특정 시간에만 발표됩니다.
|
|
현재 시간 기준 가장 최근 발표 시각을 계산합니다.
|
|
|
|
Args:
|
|
now: 기준 시간 (None이면 현재 시간)
|
|
|
|
Returns:
|
|
(base_date, base_time) 튜플 (YYYYMMDD, HHMM)
|
|
"""
|
|
if now is None:
|
|
now = datetime.now()
|
|
|
|
# 발표 시각 목록 (하루 8회)
|
|
base_times = ["0200", "0500", "0800", "1100", "1400", "1700", "2000", "2300"]
|
|
|
|
# 현재 시간에 맞는 가장 최근 발표 시각 찾기
|
|
candidate = None
|
|
for bt in reversed(base_times):
|
|
hour = int(bt[:2])
|
|
minute = int(bt[2:])
|
|
|
|
if (now.hour > hour) or (now.hour == hour and now.minute >= minute + 10):
|
|
candidate = bt
|
|
break
|
|
|
|
# 적합한 시각이 없으면 전날 마지막 발표 사용
|
|
if candidate is None:
|
|
candidate = "2300"
|
|
now -= timedelta(days=1)
|
|
|
|
base_date = now.strftime("%Y%m%d")
|
|
return base_date, candidate
|
|
|
|
|
|
def get_ultra_forecast(
|
|
service_key: str,
|
|
base_date: Optional[str] = None,
|
|
base_time: Optional[str] = None,
|
|
nx: int = DEFAULT_NX,
|
|
ny: int = DEFAULT_NY
|
|
) -> List[Dict]:
|
|
"""
|
|
초단기예보 조회 (향후 6시간)
|
|
|
|
기상청 초단기예보 API를 호출하여 원시 데이터를 반환합니다.
|
|
|
|
Args:
|
|
service_key: 공공데이터포털 API 키
|
|
base_date: 발표 날짜 (YYYYMMDD). None이면 현재 날짜
|
|
base_time: 발표 시각 (HHMM). None이면 자동 계산
|
|
nx: 격자 X 좌표
|
|
ny: 격자 Y 좌표
|
|
|
|
Returns:
|
|
예보 아이템 리스트 (기상청 API 원시 응답)
|
|
"""
|
|
if base_date is None or base_time is None:
|
|
base_date, base_time = get_latest_base_datetime()
|
|
|
|
url = f"{VILAGE_FCST_URL}/getUltraSrtFcst"
|
|
params = {
|
|
'serviceKey': service_key,
|
|
'numOfRows': '1000',
|
|
'pageNo': '1',
|
|
'dataType': 'JSON',
|
|
'base_date': base_date,
|
|
'base_time': base_time,
|
|
'nx': str(nx),
|
|
'ny': str(ny)
|
|
}
|
|
|
|
try:
|
|
session = create_retry_session(retries=3)
|
|
response = session.get(url, params=params, timeout=10)
|
|
response.raise_for_status()
|
|
|
|
data = response.json()
|
|
items = data.get('response', {}).get('body', {}).get('items', {}).get('item', [])
|
|
|
|
logger.debug(f"초단기예보 조회 완료: {len(items)}건")
|
|
return items
|
|
|
|
except Exception as e:
|
|
logger.error(f"초단기예보 조회 실패: {e}")
|
|
return []
|
|
|
|
|
|
def get_vilage_forecast(
|
|
service_key: str,
|
|
base_date: Optional[str] = None,
|
|
base_time: str = "0200",
|
|
nx: int = DEFAULT_NX,
|
|
ny: int = DEFAULT_NY
|
|
) -> List[Dict]:
|
|
"""
|
|
단기예보 조회 (향후 3일)
|
|
|
|
기상청 단기예보 API를 호출하여 원시 데이터를 반환합니다.
|
|
|
|
Args:
|
|
service_key: 공공데이터포털 API 키
|
|
base_date: 발표 날짜 (YYYYMMDD). None이면 현재 날짜
|
|
base_time: 발표 시각 (기본: 0200)
|
|
nx: 격자 X 좌표
|
|
ny: 격자 Y 좌표
|
|
|
|
Returns:
|
|
예보 아이템 리스트 (기상청 API 원시 응답)
|
|
"""
|
|
if base_date is None:
|
|
base_date = datetime.now().strftime("%Y%m%d")
|
|
|
|
url = f"{VILAGE_FCST_URL}/getVilageFcst"
|
|
params = {
|
|
'serviceKey': service_key,
|
|
'numOfRows': '1000',
|
|
'pageNo': '1',
|
|
'dataType': 'JSON',
|
|
'base_date': base_date,
|
|
'base_time': base_time,
|
|
'nx': str(nx),
|
|
'ny': str(ny)
|
|
}
|
|
|
|
try:
|
|
session = create_retry_session(retries=3)
|
|
response = session.get(url, params=params, timeout=10)
|
|
response.raise_for_status()
|
|
|
|
data = response.json()
|
|
items = data.get('response', {}).get('body', {}).get('items', {}).get('item', [])
|
|
|
|
logger.debug(f"단기예보 조회 완료: {len(items)}건")
|
|
return items
|
|
|
|
except Exception as e:
|
|
logger.error(f"단기예보 조회 실패: {e}")
|
|
return []
|
|
|
|
|
|
def get_daily_ultra_forecast(service_key: str) -> Dict[str, Dict]:
|
|
"""
|
|
초단기예보 일별 요약 데이터 조회
|
|
|
|
초단기예보 데이터를 날짜별로 집계하여 반환합니다.
|
|
|
|
Args:
|
|
service_key: 공공데이터포털 API 키
|
|
|
|
Returns:
|
|
날짜별 요약 딕셔너리
|
|
{날짜: {'sumRn': 강수량합계, 'minTa': 최저기온, 'maxTa': 최고기온, 'avgRhm': 평균습도}}
|
|
"""
|
|
base_date, base_time = get_latest_base_datetime()
|
|
items = get_ultra_forecast(service_key, base_date, base_time)
|
|
|
|
if not items:
|
|
return {}
|
|
|
|
# 날짜별 데이터 집계
|
|
daily_data: Dict[str, Dict] = {}
|
|
|
|
for item in items:
|
|
dt = item.get('fcstDate', '')
|
|
cat = item.get('category', '')
|
|
val = item.get('fcstValue', '')
|
|
|
|
if not dt:
|
|
continue
|
|
|
|
if dt not in daily_data:
|
|
daily_data[dt] = {'sumRn': 0, 'temps': [], 'rhm': []}
|
|
|
|
# 카테고리별 처리
|
|
if cat == 'RN1': # 1시간 강수량
|
|
daily_data[dt]['sumRn'] += parse_precip(val)
|
|
elif cat == 'T1H': # 기온
|
|
try:
|
|
daily_data[dt]['temps'].append(float(val))
|
|
except (ValueError, TypeError):
|
|
pass
|
|
elif cat == 'REH': # 습도
|
|
try:
|
|
daily_data[dt]['rhm'].append(float(val))
|
|
except (ValueError, TypeError):
|
|
pass
|
|
|
|
# 집계 결과 변환
|
|
result = {}
|
|
for dt, vals in daily_data.items():
|
|
temps = vals['temps']
|
|
rhm = vals['rhm']
|
|
|
|
result[dt] = {
|
|
'sumRn': round(vals['sumRn'], 2),
|
|
'minTa': min(temps) if temps else 0,
|
|
'maxTa': max(temps) if temps else 0,
|
|
'avgRhm': round(sum(rhm) / len(rhm), 1) if rhm else 0
|
|
}
|
|
|
|
return result
|
|
|
|
|
|
def get_daily_vilage_forecast(service_key: str) -> Dict[str, Dict]:
|
|
"""
|
|
단기예보 일별 요약 데이터 조회
|
|
|
|
단기예보 데이터를 날짜별로 집계하여 반환합니다.
|
|
|
|
Args:
|
|
service_key: 공공데이터포털 API 키
|
|
|
|
Returns:
|
|
날짜별 요약 딕셔너리
|
|
{날짜: {'sumRn': 강수량합계, 'minTa': 최저기온, 'maxTa': 최고기온, 'avgRhm': 평균습도}}
|
|
"""
|
|
base_date, _ = get_latest_base_datetime()
|
|
items = get_vilage_forecast(service_key, base_date)
|
|
|
|
if not items:
|
|
return {}
|
|
|
|
# 날짜별 데이터 집계
|
|
daily_data: Dict[str, Dict] = {}
|
|
|
|
for item in items:
|
|
dt = item.get('fcstDate', '')
|
|
cat = item.get('category', '')
|
|
val = item.get('fcstValue', '')
|
|
|
|
if not dt:
|
|
continue
|
|
|
|
if dt not in daily_data:
|
|
daily_data[dt] = {'sumRn': 0, 'minTa': [], 'maxTa': [], 'rhm': []}
|
|
|
|
# 카테고리별 처리
|
|
if cat == 'PCP': # 강수량
|
|
daily_data[dt]['sumRn'] += parse_precip(val)
|
|
elif cat == 'TMN': # 최저기온
|
|
try:
|
|
daily_data[dt]['minTa'].append(float(val))
|
|
except (ValueError, TypeError):
|
|
pass
|
|
elif cat == 'TMX': # 최고기온
|
|
try:
|
|
daily_data[dt]['maxTa'].append(float(val))
|
|
except (ValueError, TypeError):
|
|
pass
|
|
elif cat == 'REH': # 습도
|
|
try:
|
|
daily_data[dt]['rhm'].append(float(val))
|
|
except (ValueError, TypeError):
|
|
pass
|
|
|
|
# 집계 결과 변환
|
|
result = {}
|
|
for dt, vals in daily_data.items():
|
|
min_ta = vals['minTa']
|
|
max_ta = vals['maxTa']
|
|
rhm = vals['rhm']
|
|
|
|
result[dt] = {
|
|
'sumRn': round(vals['sumRn'], 2),
|
|
'minTa': min(min_ta) if min_ta else 0,
|
|
'maxTa': max(max_ta) if max_ta else 0,
|
|
'avgRhm': round(sum(rhm) / len(rhm), 1) if rhm else 0
|
|
}
|
|
|
|
return result
|
|
|
|
|
|
def get_midterm_forecast(
|
|
service_key: str,
|
|
reg_id: str = '11B20305'
|
|
) -> Tuple[Dict[int, int], Dict]:
|
|
"""
|
|
중기 강수확률 예보 조회 (3~10일 후)
|
|
|
|
Args:
|
|
service_key: 공공데이터포털 API 키
|
|
reg_id: 예보 지역 코드 (기본: 파주)
|
|
|
|
Returns:
|
|
(강수확률 딕셔너리, 원시 응답) 튜플
|
|
강수확률: {일수: 확률} (예: {3: 30, 4: 20, ...})
|
|
"""
|
|
url = f"{MID_FCST_URL}/getMidLandFcst"
|
|
|
|
# 발표 시각 계산 (06시 또는 18시)
|
|
now = datetime.now()
|
|
if now.hour < 6:
|
|
tm_fc = (now - timedelta(days=1)).strftime("%Y%m%d") + "1800"
|
|
elif now.hour < 18:
|
|
tm_fc = now.strftime("%Y%m%d") + "0600"
|
|
else:
|
|
tm_fc = now.strftime("%Y%m%d") + "1800"
|
|
|
|
params = {
|
|
'serviceKey': service_key,
|
|
'regId': reg_id,
|
|
'tmFc': tm_fc,
|
|
'numOfRows': '10',
|
|
'pageNo': '1',
|
|
'dataType': 'JSON',
|
|
}
|
|
|
|
try:
|
|
session = create_retry_session(retries=3)
|
|
response = session.get(url, params=params, timeout=10)
|
|
response.raise_for_status()
|
|
|
|
data = response.json()
|
|
items = data.get('response', {}).get('body', {}).get('items', {}).get('item', [])
|
|
|
|
if not items:
|
|
logger.warning(f"중기예보 응답 없음: tmFc={tm_fc}, regId={reg_id}")
|
|
return {}, {}
|
|
|
|
item = items[0]
|
|
|
|
# 3~10일 후 강수확률 추출
|
|
precip_probs = {}
|
|
for day in range(3, 11):
|
|
key = f'rnSt{day}'
|
|
try:
|
|
precip_probs[day] = int(item.get(key, 0))
|
|
except (ValueError, TypeError):
|
|
precip_probs[day] = 0
|
|
|
|
logger.debug(f"중기 강수확률 조회 완료: {precip_probs}")
|
|
return precip_probs, item
|
|
|
|
except Exception as e:
|
|
logger.error(f"중기예보 조회 실패: {e}")
|
|
return {}, {}
|
|
|
|
|
|
def get_midterm_temperature(
|
|
service_key: str,
|
|
reg_id: str = '11B20305'
|
|
) -> Dict[int, Dict[str, int]]:
|
|
"""
|
|
중기 기온 예보 조회 (3~10일 후)
|
|
|
|
Args:
|
|
service_key: 공공데이터포털 API 키
|
|
reg_id: 예보 지역 코드 (기본: 파주)
|
|
|
|
Returns:
|
|
일자별 기온 딕셔너리
|
|
{일수: {'min': 최저기온, 'max': 최고기온}}
|
|
"""
|
|
url = f"{MID_FCST_URL}/getMidTa"
|
|
|
|
# 발표 시각 계산
|
|
now = datetime.now()
|
|
if now.hour < 6:
|
|
tm_fc = (now - timedelta(days=1)).strftime("%Y%m%d") + "1800"
|
|
elif now.hour < 18:
|
|
tm_fc = now.strftime("%Y%m%d") + "0600"
|
|
else:
|
|
tm_fc = now.strftime("%Y%m%d") + "1800"
|
|
|
|
params = {
|
|
'serviceKey': service_key,
|
|
'regId': reg_id,
|
|
'tmFc': tm_fc,
|
|
'pageNo': '1',
|
|
'numOfRows': '10',
|
|
'dataType': 'JSON'
|
|
}
|
|
|
|
try:
|
|
session = create_retry_session(retries=3)
|
|
response = session.get(url, params=params, timeout=10)
|
|
response.raise_for_status()
|
|
|
|
data = response.json()
|
|
items = data.get('response', {}).get('body', {}).get('items', {}).get('item', [])
|
|
|
|
if not items:
|
|
logger.warning(f"중기기온예보 응답 없음: tmFc={tm_fc}, regId={reg_id}")
|
|
return {}
|
|
|
|
item = items[0]
|
|
|
|
# 3~10일 후 기온 추출
|
|
temps = {}
|
|
for day in range(3, 11):
|
|
min_key = f'taMin{day}'
|
|
max_key = f'taMax{day}'
|
|
try:
|
|
temps[day] = {
|
|
'min': int(item.get(min_key, 0)),
|
|
'max': int(item.get(max_key, 0))
|
|
}
|
|
except (ValueError, TypeError):
|
|
temps[day] = {'min': 0, 'max': 0}
|
|
|
|
logger.debug(f"중기 기온 조회 완료: {temps}")
|
|
return temps
|
|
|
|
except Exception as e:
|
|
logger.error(f"중기기온예보 조회 실패: {e}")
|
|
return {}
|
|
|
|
|
|
def get_weekly_precip(service_key: str) -> Dict[str, float]:
|
|
"""
|
|
금주 일요일까지의 일별 예상 강수량 조회
|
|
|
|
단기예보와 중기예보를 조합하여 금주 일요일까지의 강수량을 예측합니다.
|
|
|
|
Args:
|
|
service_key: 공공데이터포털 API 키
|
|
|
|
Returns:
|
|
날짜별 예상 강수량 딕셔너리 {YYYYMMDD: 강수량(mm)}
|
|
"""
|
|
today = date.today()
|
|
sunday = today + timedelta(days=(6 - today.weekday()))
|
|
|
|
result = {}
|
|
|
|
# 단기예보 데이터 (오늘~+2일)
|
|
vilage_data = get_daily_vilage_forecast(service_key)
|
|
for dt, vals in vilage_data.items():
|
|
dt_date = datetime.strptime(dt, "%Y%m%d").date()
|
|
if dt_date <= sunday:
|
|
result[dt] = vals['sumRn']
|
|
|
|
# 중기예보 강수확률 (3일 이후)
|
|
precip_probs, _ = get_midterm_forecast(service_key)
|
|
for day, prob in precip_probs.items():
|
|
target_date = today + timedelta(days=day)
|
|
if target_date <= sunday:
|
|
dt = target_date.strftime("%Y%m%d")
|
|
if dt not in result:
|
|
# 강수확률을 기반으로 예상 강수량 추정
|
|
# (간단한 휴리스틱: 확률 * 0.1mm)
|
|
result[dt] = round(prob * 0.1, 1)
|
|
|
|
return result
|
|
|
|
|
|
if __name__ == '__main__':
|
|
"""
|
|
기상청 예보 모듈 테스트
|
|
|
|
사용법:
|
|
python services/weather/forecast.py
|
|
"""
|
|
logger = get_logger(__name__)
|
|
logger.info("=== 기상청 예보 모듈 테스트 ===")
|
|
|
|
try:
|
|
config = get_config()
|
|
service_key = config.data_api['service_key'] or "TEST_KEY"
|
|
|
|
logger.info(f"설정 로드 완료")
|
|
logger.info(f"- 서비스 키: {service_key[:10] if service_key else 'NOT SET'}***")
|
|
|
|
logger.info("\n모듈 기능 확인:")
|
|
logger.info("- get_ultra_forecast: 초단기 예보 조회")
|
|
logger.info("- get_vilage_forecast: 동네 예보 조회")
|
|
logger.info("- get_midterm_forecast: 중기 예보 조회")
|
|
|
|
logger.info("\n✓ 기상청 예보 모듈 테스트 완료")
|
|
|
|
except Exception as e:
|
|
logger.error(f"기상청 예보 모듈 테스트 실패: {e}")
|
|
import traceback
|
|
logger.error(traceback.format_exc())
|