# =================================================================== # services/analytics/air_quality.py # 대기질 데이터 수집 서비스 모듈 # =================================================================== # 한국환경공단 API를 통해 대기질(미세먼지) 데이터를 수집합니다. # 측정소별 PM2.5, PM10, SO2, CO, NO2, O3 데이터를 저장합니다. # =================================================================== """ 대기질 데이터 수집 서비스 모듈 한국환경공단 공공데이터 API를 통해 대기질 데이터를 수집합니다. 측정소별 일평균 대기오염물질 농도를 조회할 수 있습니다. 사용 예시: from services.analytics.air_quality import AirQualityCollector, get_air_quality # 간단한 데이터 조회 data = get_air_quality(service_key, '운정', '20240101', '20240131') # 자동 데이터 수집 및 저장 collector = AirQualityCollector(config, engine, table) collector.run() """ import os import json import traceback from datetime import datetime, timedelta, date from typing import Dict, List, Optional, Any from sqlalchemy import select, func, and_, Table from sqlalchemy.dialects.mysql import insert as mysql_insert from sqlalchemy.exc import IntegrityError from sqlalchemy.engine import Engine, Connection from core.logging_utils import get_logger from core.http_client import create_retry_session from core.config import get_config logger = get_logger(__name__) # API URL AIR_QUALITY_API_URL = "http://apis.data.go.kr/B552584/ArpltnStatsSvc/getMsrstnAcctoRDyrg" def get_air_quality( service_key: str, station_name: str, start_date: str, end_date: str, num_of_rows: int = 100, page_no: int = 1 ) -> List[Dict]: """ 대기질 데이터 조회 한국환경공단 API를 호출하여 측정소별 대기질 데이터를 조회합니다. Args: service_key: 공공데이터포털 API 키 station_name: 측정소명 (예: '운정', '서울') start_date: 시작 날짜 (YYYYMMDD) end_date: 종료 날짜 (YYYYMMDD) num_of_rows: 페이지당 결과 수 page_no: 페이지 번호 Returns: 대기질 데이터 리스트 데이터 항목: - msurDt: 측정일 (YYYY-MM-DD) - pm25Value: 초미세먼지 농도 (㎍/㎥) - pm10Value: 미세먼지 농도 (㎍/㎥) - so2Value: 아황산가스 농도 (ppm) - coValue: 일산화탄소 농도 (ppm) - no2Value: 이산화질소 농도 (ppm) - o3Value: 오존 농도 (ppm) """ params = { 'serviceKey': service_key, 'returnType': 'json', 'numOfRows': str(num_of_rows), 'pageNo': str(page_no), 'inqBginDt': start_date, 'inqEndDt': end_date, 'msrstnName': station_name, } session = create_retry_session(retries=3) try: response = session.get(AIR_QUALITY_API_URL, params=params, timeout=20) response.raise_for_status() data = response.json() items = data.get('response', {}).get('body', {}).get('items', []) logger.debug(f"대기질 데이터 조회: {station_name}, {len(items)}건") return items if items else [] except Exception as e: logger.error(f"대기질 API 요청 실패: {e}") traceback.print_exc() return [] finally: session.close() class AirQualityCollector: """ 대기질 데이터 자동 수집기 설정에 따라 대기질 데이터를 자동으로 수집하고 DB에 저장합니다. Attributes: api_key: API 서비스 키 station_list: 측정소 목록 engine: SQLAlchemy 엔진 table: 대상 테이블 start_date: 수집 시작일 force_update: 강제 업데이트 여부 debug: 디버그 모드 """ # 캐시 파일 경로 CACHE_FILE = 'cache/air_num_rows.json' def __init__( self, engine: Engine, table: Table, api_key: Optional[str] = None, station_list: Optional[List[str]] = None, start_date: Optional[str] = None, force_update: bool = False, debug: bool = False ): """ Args: engine: SQLAlchemy 엔진 table: 대상 테이블 api_key: API 키 (None이면 설정에서 로드) station_list: 측정소 목록 (None이면 설정에서 로드) start_date: 수집 시작일 (YYYYMMDD) force_update: 기존 데이터 덮어쓰기 여부 debug: 디버그 모드 """ config = get_config() self.api_key = api_key or config.data_api.get('service_key', '') self.station_list = station_list or config.data_api.get('air_stations', ['운정']) if start_date: self.start_date = datetime.strptime(start_date, '%Y%m%d').date() else: self.start_date = datetime.strptime( config.data_api.get('start_date', '20170101'), '%Y%m%d' ).date() self.engine = engine self.table = table self.force_update = force_update self.debug = debug self.yesterday = (datetime.now() - timedelta(days=1)).date() self.session = create_retry_session(retries=3) # 캐시 로드 self._num_rows_cache = self._load_cache() def _load_cache(self) -> Dict: """캐시 파일 로드""" try: cache_path = os.path.join( os.path.dirname(os.path.dirname(os.path.dirname(__file__))), self.CACHE_FILE ) if os.path.exists(cache_path): with open(cache_path, 'r', encoding='utf-8') as f: return json.load(f) except Exception: pass return {} def _save_cache(self): """캐시 파일 저장""" try: cache_path = os.path.join( os.path.dirname(os.path.dirname(os.path.dirname(__file__))), self.CACHE_FILE ) os.makedirs(os.path.dirname(cache_path), exist_ok=True) with open(cache_path, 'w', encoding='utf-8') as f: json.dump(self._num_rows_cache, f, ensure_ascii=False, indent=2) except Exception as e: logger.warning(f"캐시 저장 실패: {e}") def get_latest_date(self, conn: Connection, station: str) -> Optional[date]: """ 특정 측정소의 가장 최근 저장 날짜 조회 Args: conn: DB 연결 station: 측정소명 Returns: 최근 날짜 또는 None """ try: stmt = select(func.max(self.table.c.date)).where( self.table.c.station == station ) result = conn.execute(stmt).scalar() return result except Exception as e: logger.error(f"최근 날짜 조회 실패: {e}") return None def parse_item_to_record(self, item: Dict, station: str) -> Optional[Dict]: """ API 응답 아이템을 DB 레코드로 변환 Args: item: API 응답 아이템 station: 측정소명 Returns: DB 레코드 딕셔너리 또는 None """ try: item_date = datetime.strptime(item['msurDt'], '%Y-%m-%d').date() except Exception as e: logger.warning(f"날짜 파싱 오류: {item.get('msurDt')} - {e}") return None def safe_float(val): """안전한 float 변환""" try: return float(val) if val else None except (ValueError, TypeError): return None return { 'date': item_date, 'station': station, 'pm25': safe_float(item.get('pm25Value')), 'pm10': safe_float(item.get('pm10Value')), 'so2': safe_float(item.get('so2Value')), 'co': safe_float(item.get('coValue')), 'no2': safe_float(item.get('no2Value')), 'o3': safe_float(item.get('o3Value')), } def save_items_to_db( self, items: List[Dict], conn: Connection, station: str ) -> int: """ 데이터 항목들을 DB에 저장 Args: items: 저장할 데이터 리스트 conn: DB 연결 station: 측정소명 Returns: 저장된 레코드 수 """ saved_count = 0 for item in items: data = self.parse_item_to_record(item, station) if not data: continue item_date = data['date'] if self.debug: logger.debug(f"[DEBUG] {item_date} [{station}] 저장 시도: {data}") continue try: if self.force_update: # UPSERT stmt = mysql_insert(self.table).values(**data) stmt = stmt.on_duplicate_key_update(**data) conn.execute(stmt) logger.info(f"{item_date} [{station}] 저장/업데이트 완료") else: # 중복 확인 후 삽입 sel = select(self.table.c.date).where( and_( self.table.c.date == item_date, self.table.c.station == station ) ) if conn.execute(sel).fetchone(): logger.debug(f"{item_date} [{station}] 이미 존재, 생략") continue conn.execute(self.table.insert().values(**data)) logger.info(f"{item_date} [{station}] 저장 완료") saved_count += 1 except IntegrityError as e: logger.error(f"중복 오류: {e}") except Exception as e: logger.error(f"저장 실패: {e}") traceback.print_exc() return saved_count def find_optimal_num_rows(self, station_name: str, date_str: str) -> int: """ 최적의 numOfRows 파라미터 값 탐색 API 서버 상태에 따라 최대 허용 rows 수가 다를 수 있어 적절한 값을 탐색합니다. Args: station_name: 측정소명 date_str: 날짜 (YYYYMMDD) Returns: 최적의 numOfRows 값 (100~1000) """ # 캐시 확인 cache_key = f"{station_name}:{date_str}" if cache_key in self._num_rows_cache: cached_val = int(self._num_rows_cache[cache_key]) logger.debug(f"캐시된 numOfRows 사용: {cached_val}") return cached_val # 점진적으로 감소하며 테스트 max_rows = 1000 min_rows = 100 while max_rows >= min_rows: try: params = { 'serviceKey': self.api_key, 'returnType': 'json', 'numOfRows': str(max_rows), 'pageNo': '1', 'inqBginDt': date_str, 'inqEndDt': date_str, 'msrstnName': station_name, } response = self.session.get(AIR_QUALITY_API_URL, params=params, timeout=20) response.raise_for_status() response.json() # JSON 파싱 테스트 # 성공 - 캐시에 저장 self._num_rows_cache[cache_key] = max_rows self._save_cache() logger.debug(f"numOfRows 최대값: {max_rows}") return max_rows except Exception as e: logger.warning(f"numOfRows={max_rows} 실패: {e}, 재시도...") max_rows -= 100 logger.warning("기본값 100 사용") return 100 def run(self) -> int: """ 데이터 수집 및 저장 실행 모든 측정소에 대해 데이터를 수집하고 DB에 저장합니다. Returns: 총 저장된 레코드 수 """ total_saved = 0 with self.engine.begin() as conn: for station_name in self.station_list: logger.info(f"측정소 처리 시작: {station_name}") # 시작일 결정 latest_date = self.get_latest_date(conn, station_name) if latest_date: start_date = latest_date + timedelta(days=1) else: start_date = self.start_date if start_date > self.yesterday: logger.info(f"{station_name}: 최신 데이터 존재 ({latest_date})") continue # 최적 numOfRows 탐색 optimal_rows = self.find_optimal_num_rows( station_name, start_date.strftime('%Y%m%d') ) # 청크 단위로 데이터 수집 current_start = start_date while current_start <= self.yesterday: current_end = min( current_start + timedelta(days=optimal_rows - 1), self.yesterday ) logger.info(f"{station_name}: {current_start} ~ {current_end} 수집") items = get_air_quality( self.api_key, station_name, current_start.strftime('%Y%m%d'), current_end.strftime('%Y%m%d'), num_of_rows=optimal_rows ) if items: saved = self.save_items_to_db(items, conn, station_name) total_saved += saved current_start = current_end + timedelta(days=1) logger.info(f"대기질 데이터 수집 완료: 총 {total_saved}건 저장") return total_saved if __name__ == '__main__': """ 대기질 데이터 수집 서비스 모듈 테스트 사용법: python services/analytics/air_quality.py """ logger.info("=== 대기질 데이터 수집 서비스 모듈 테스트 ===") try: config = get_config() service_key = config.data_api['service_key'] or "TEST_KEY" logger.info(f"설정 로드 완료") logger.info(f"- 서비스 키: {service_key[:10] if service_key else 'NOT SET'}***") logger.info("\n제공 기능:") logger.info("- get_air_quality: 대기질 데이터 조회") logger.info("- AirQualityCollector: 자동 데이터 수집 및 DB 저장") logger.info("\n측정 항목:") logger.info("- PM2.5: 초미세먼지") logger.info("- PM10: 미세먼지") logger.info("- SO2, CO, NO2, O3: 기타 오염물질") logger.info("\n✓ 대기질 데이터 수집 서비스 모듈 테스트 완료") except Exception as e: logger.error(f"대기질 모듈 테스트 실패: {e}") logger.error(traceback.format_exc())