# =================================================================== # services/weather/asos.py # 기상청 ASOS 종관기상 데이터 서비스 모듈 # =================================================================== # 기상청 종관기상관측(ASOS) API를 통해 일별 기상 데이터를 조회합니다. # 과거 날씨 데이터 수집 및 DB 저장 기능을 제공합니다. # =================================================================== """ 기상청 ASOS 종관기상 데이터 서비스 모듈 기상청 공공데이터포털의 종관기상관측(ASOS) API를 통해 일별 기상 데이터(기온, 강수량, 습도 등)를 조회합니다. 사용 예시: from services.weather.asos import get_asos_weather, ASOSDataCollector # 단일 기간 조회 data = get_asos_weather(service_key, stn_id=99, start_dt='20240101', end_dt='20240131') # 수집기를 통한 자동 데이터 수집 collector = ASOSDataCollector(service_key) collector.collect_and_save([99]) """ import traceback from datetime import datetime, timedelta, date from typing import Dict, List, Optional, Generator, Tuple, Any from sqlalchemy import select, Table from sqlalchemy.dialects.mysql import insert as mysql_insert from sqlalchemy.engine import Connection from core.logging_utils import get_logger from core.http_client import create_retry_session from core.config import get_config logger = get_logger(__name__) # ASOS API URL ASOS_API_URL = "http://apis.data.go.kr/1360000/AsosDalyInfoService/getWthrDataList" # 시간 관련 컬럼 (음수 값을 null로 처리) HRMT_KEYS = [ "minTaHrmt", "maxTaHrmt", "mi10MaxRnHrmt", "hr1MaxRnHrmt", "maxInsWsHrmt", "maxWsHrmt", "minRhmHrmt", "maxPsHrmt", "minPsHrmt", "hr1MaxIcsrHrmt", "ddMefsHrmt", "ddMesHrmt" ] def fetch_date_range_chunks( start_dt: str, end_dt: str, chunk_days: int = 10 ) -> Generator[Tuple[str, str], None, None]: """ 날짜 범위를 청크 단위로 분할 대량의 데이터를 조회할 때 API 요청을 분할하여 처리합니다. Args: start_dt: 시작 날짜 (YYYYMMDD) end_dt: 종료 날짜 (YYYYMMDD) chunk_days: 청크 크기 (일 단위) Yields: (시작일, 종료일) 튜플 """ current_start = datetime.strptime(start_dt, "%Y%m%d") end_date = datetime.strptime(end_dt, "%Y%m%d") while current_start <= end_date: current_end = min(current_start + timedelta(days=chunk_days - 1), end_date) yield current_start.strftime("%Y%m%d"), current_end.strftime("%Y%m%d") current_start = current_end + timedelta(days=1) def get_asos_weather( service_key: str, stn_id: int, start_dt: str, end_dt: str, session = None ) -> List[Dict]: """ ASOS 종관기상 데이터 조회 기상청 ASOS API를 호출하여 일별 기상 데이터를 조회합니다. Args: service_key: 공공데이터포털 API 키 stn_id: 관측 지점 ID start_dt: 시작 날짜 (YYYYMMDD) end_dt: 종료 날짜 (YYYYMMDD) session: requests 세션 (재사용용) Returns: 일별 기상 데이터 리스트 데이터 항목: - tm: 일시 (YYYY-MM-DD) - avgTa: 평균 기온 (℃) - minTa: 최저 기온 (℃) - maxTa: 최고 기온 (℃) - sumRn: 일 강수량 (mm) - avgRhm: 평균 상대습도 (%) - avgWs: 평균 풍속 (m/s) 등 """ if session is None: session = create_retry_session(retries=3) params = { "serviceKey": service_key, "pageNo": "1", "numOfRows": "500", "dataType": "JSON", "dataCd": "ASOS", "dateCd": "DAY", "startDt": start_dt, "endDt": end_dt, "stnIds": str(stn_id), } headers = { "User-Agent": "Mozilla/5.0", "Accept": "application/json" } try: response = session.get(ASOS_API_URL, params=params, headers=headers, timeout=20) response.raise_for_status() data = response.json() items = data.get("response", {}).get("body", {}).get("items", {}).get("item", []) if items is None: return [] # 단일 항목인 경우 리스트로 변환 if isinstance(items, dict): items = [items] logger.debug(f"ASOS 데이터 조회 완료: 지점={stn_id}, 건수={len(items)}") return items except Exception as e: logger.error(f"ASOS API 요청 실패: {e}") traceback.print_exc() return [] class ASOSDataCollector: """ ASOS 데이터 자동 수집기 설정에 따라 ASOS 데이터를 자동으로 수집하고 DB에 저장합니다. 마지막 저장 일자를 확인하여 중복 없이 증분 수집합니다. Attributes: service_key: API 서비스 키 force_update: 기존 데이터 덮어쓰기 여부 debug: 디버그 모드 (True면 실제 저장 안 함) """ def __init__( self, service_key: Optional[str] = None, force_update: bool = False, debug: bool = False ): """ Args: service_key: API 키 (None이면 설정에서 로드) force_update: 기존 데이터 덮어쓰기 여부 debug: 디버그 모드 """ if service_key is None: config = get_config() service_key = config.data_api['service_key'] self.service_key = service_key self.force_update = force_update self.debug = debug self.session = create_retry_session(retries=3) def get_latest_date_from_db(self, conn: Connection, table: Table) -> Optional[date]: """ DB에서 가장 최근 저장된 날짜 조회 Args: conn: DB 연결 table: 대상 테이블 Returns: 최근 날짜 또는 None """ sel = select(table.c.date).order_by(table.c.date.desc()).limit(1) result = conn.execute(sel).fetchone() return result[0] if result else None def parse_item_to_record(self, item: Dict, table: Table) -> Optional[Dict]: """ API 응답 아이템을 DB 레코드로 변환 Args: item: API 응답 아이템 table: 대상 테이블 Returns: DB 레코드 딕셔너리 또는 None """ date_str = item.get("tm") if not date_str: return None try: record_date = datetime.strptime(date_str, "%Y-%m-%d").date() except ValueError: logger.warning(f"날짜 파싱 실패: {date_str}") return None data = {"date": record_date} for key in table.c.keys(): if key == "date": continue value = item.get(key) # 빈 값 처리 if value in ["", None, "-"]: data[key] = None continue try: # 시간 관련 컬럼 또는 특수 컬럼 처리 if key in HRMT_KEYS or key == "iscs": fval = float(value) data[key] = str(int(fval)) if fval >= 0 else None elif key == "stnId": data[key] = int(float(value)) else: data[key] = float(value) except (ValueError, TypeError): data[key] = None return data def save_items_to_db( self, items: List[Dict], conn: Connection, table: Table ) -> int: """ 데이터 항목들을 DB에 저장 Args: items: 저장할 데이터 리스트 conn: DB 연결 table: 대상 테이블 Returns: 저장된 레코드 수 """ saved_count = 0 for item in items: data = self.parse_item_to_record(item, table) if not data: continue record_date = data['date'] if self.debug: logger.debug(f"[DEBUG] {record_date} DB 저장 시도: {data}") continue try: if self.force_update: # UPSERT: 존재하면 업데이트, 없으면 삽입 stmt = mysql_insert(table).values(**data) stmt = stmt.on_duplicate_key_update(**data) conn.execute(stmt) logger.info(f"{record_date} 저장/업데이트 완료") else: # 중복 확인 후 삽입 sel = select(table.c.date).where(table.c.date == record_date) if conn.execute(sel).fetchone(): logger.debug(f"{record_date} 이미 존재, 저장 생략") continue conn.execute(table.insert().values(**data)) logger.info(f"{record_date} 저장 완료") saved_count += 1 except Exception as e: logger.error(f"저장 실패 ({record_date}): {e}") traceback.print_exc() raise return saved_count def collect_and_save( self, stn_ids: List[int], engine, table: Table, start_date: Optional[str] = None, end_date: Optional[str] = None, chunk_days: int = 1000 ) -> int: """ 데이터 수집 및 저장 실행 Args: stn_ids: 관측 지점 ID 리스트 engine: SQLAlchemy 엔진 table: 대상 테이블 start_date: 시작 날짜 (None이면 자동 계산) end_date: 종료 날짜 (None이면 자동 계산) chunk_days: 청크 크기 Returns: 총 저장된 레코드 수 """ now = datetime.now() today = now.date() # 종료일 계산 (오전 11시 이전이면 전전일, 이후면 전일) if end_date is None: if now.hour < 11: end_dt = (today - timedelta(days=2)).strftime("%Y%m%d") logger.info(f"오전 11시 이전, 전전일까지 조회: {end_dt}") else: end_dt = (today - timedelta(days=1)).strftime("%Y%m%d") logger.info(f"전일까지 조회: {end_dt}") else: end_dt = end_date total_saved = 0 with engine.begin() as conn: # 시작일 계산 if start_date is None: latest_date = self.get_latest_date_from_db(conn, table) if latest_date: start_dt = (latest_date + timedelta(days=1)).strftime("%Y%m%d") logger.info(f"마지막 저장일: {latest_date}, 시작일: {start_dt}") else: config = get_config() start_dt = config.data_api.get('start_date', '20170101') logger.info(f"저장된 데이터 없음, 기본 시작일: {start_dt}") else: start_dt = start_date # 날짜 검증 if start_dt > end_dt: logger.info("최신 데이터가 이미 존재합니다.") return 0 # 각 관측 지점별 데이터 수집 for stn_id in stn_ids: for chunk_start, chunk_end in fetch_date_range_chunks(start_dt, end_dt, chunk_days): logger.info(f"지점 {stn_id} 데이터 요청: {chunk_start} ~ {chunk_end}") items = get_asos_weather( self.service_key, stn_id, chunk_start, chunk_end, self.session ) if items: saved = self.save_items_to_db(items, conn, table) total_saved += saved else: logger.warning(f"지점 {stn_id} {chunk_start}~{chunk_end} 데이터 없음") logger.info(f"총 {total_saved}건 저장 완료") return total_saved if __name__ == '__main__': """ ASOS 모듈 테스트 사용법: python services/weather/asos.py """ logger.info("=== ASOS 모듈 테스트 ===") try: config = get_config() service_key = config.data_api['service_key'] or "TEST_KEY" logger.info(f"설정 로드 완료") logger.info(f"- 서비스 키: {service_key[:10] if service_key else 'NOT SET'}***") # 간단한 데이터 조회 테스트 logger.info("\n기본 설정 확인:") logger.info(f"- ASOS API URL: {ASOS_API_URL}") logger.info(f"- 시간별 컬럼 수: {len(HRMT_KEYS)}") logger.info("\n✓ ASOS 모듈 테스트 완료") except Exception as e: logger.error(f"ASOS 모듈 테스트 실패: {e}") logger.error(traceback.format_exc())