import os
import sys
import json
import traceback
from datetime import datetime, timedelta

import requests
from sqlalchemy import select, func, and_
from sqlalchemy.dialects.mysql import insert as mysql_insert
from sqlalchemy.exc import IntegrityError

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from conf import db, db_schema
from requests_utils import make_requests_session

# Persistent cache of the largest working numOfRows per (station, date),
# so later runs can skip the probing loop in test_num_of_rows().
CACHE_FILE = os.path.join(os.path.dirname(__file__), '..', 'cache', 'air_num_rows.json')

# Daily air-quality endpoint (data.go.kr). Hoisted here because both
# fetch_air_quality_data_range() and test_num_of_rows() hit the same URL.
API_URL = "http://apis.data.go.kr/B552584/ArpltnStatsSvc/getMsrstnAcctoRDyrg"


def _to_float(value):
    """Convert an API string value to float.

    Returns None for None/empty values AND for non-numeric markers
    (this API commonly reports missing measurements as '-', which
    would make a bare float() raise ValueError).
    """
    if value in (None, ''):
        return None
    try:
        return float(value)
    except (TypeError, ValueError):
        return None


class AirQualityCollector:
    """Incrementally collects daily per-station air-quality data from the
    Korean public data API and stores it in a MySQL table.

    For each configured station the collector resumes from the day after
    the most recent row already stored, requests the range in chunks of
    up to `numOfRows` days, and inserts (or upserts when force_update is
    set) each daily measurement.
    """

    def __init__(self, config, engine, table):
        """Read API/DB settings from *config* and prepare the HTTP session.

        config : dict with DATA_API.serviceKey, DATA_API.air.station_name,
                 DATA_API.startDt ('%Y%m%d'), and optional debug/force_update.
        engine : SQLAlchemy engine used for all DB work.
        table  : SQLAlchemy Table with columns date, station, pm25, pm10,
                 so2, co, no2, o3 (as written to in save_data_to_db).
        """
        self.api_key = config['DATA_API']['serviceKey']
        self.station_list = config['DATA_API']['air']['station_name']
        self.start_date = datetime.strptime(config['DATA_API']['startDt'], '%Y%m%d').date()
        self.debug = config.get('debug', False)
        self.force_update = config.get('force_update', False)
        self.engine = engine
        self.table = table
        # Collect only up to yesterday — today's daily aggregate is incomplete.
        self.yesterday = (datetime.now() - timedelta(days=1)).date()
        self.session = make_requests_session()
        # Load the numOfRows cache; any load failure falls back to an empty cache.
        self._num_rows_cache = {}
        try:
            if os.path.exists(CACHE_FILE):
                with open(CACHE_FILE, 'r', encoding='utf-8') as f:
                    self._num_rows_cache = json.load(f)
        except Exception:
            self._num_rows_cache = {}

    def _save_num_rows_cache(self):
        """Persist the numOfRows cache to disk (best effort; warns on failure)."""
        try:
            os.makedirs(os.path.dirname(CACHE_FILE), exist_ok=True)
            with open(CACHE_FILE, 'w', encoding='utf-8') as f:
                json.dump(self._num_rows_cache, f, ensure_ascii=False, indent=2)
        except Exception as e:
            print(f"[WARN] num_rows 캐시 저장 실패: {e}")

    def get_latest_date(self, conn, station):
        """Return the most recent stored date for *station*.

        Returns None when the station has no rows yet, and also None when
        the query itself fails (the error is logged).
        """
        try:
            stmt = select(func.max(self.table.c.date)).where(self.table.c.station == station)
            result = conn.execute(stmt).scalar()
            if result is None:
                print(f"[INFO] {station}: 테이블에 데이터가 없습니다.")
            return result
        except Exception as e:
            print(f"[ERROR] 가장 최근 날짜 조회 실패: {e}")
            traceback.print_exc()
            return None

    def save_data_to_db(self, items, conn, station):
        """Insert (or upsert) the API *items* for *station* into the table.

        Rows with unparseable dates are skipped.  Numeric fields go through
        _to_float, so non-numeric markers such as '-' become NULL instead of
        raising ValueError (bug fix: the original called float() directly).
        In debug mode rows are printed but never written.
        """
        for item in items:
            try:
                item_date = datetime.strptime(item['msurDt'], '%Y-%m-%d').date()
            except Exception as e:
                print(f"[ERROR] 날짜 파싱 오류: {item.get('msurDt')} {e}")
                continue
            data = {
                'date': item_date,
                'station': station,
                'pm25': _to_float(item.get('pm25Value')),
                'pm10': _to_float(item.get('pm10Value')),
                'so2': _to_float(item.get('so2Value')),
                'co': _to_float(item.get('coValue')),
                'no2': _to_float(item.get('no2Value')),
                'o3': _to_float(item.get('o3Value')),
            }
            try:
                if self.debug:
                    # Debug mode: show what would be written, then skip the DB.
                    print(f"[DEBUG] {item_date} [{station}] DB 저장 시도: {data}")
                    continue
                if self.force_update:
                    # MySQL upsert: insert or overwrite an existing (date, station) row.
                    stmt = mysql_insert(self.table).values(**data)
                    on_dup_stmt = stmt.on_duplicate_key_update(**data)
                    conn.execute(on_dup_stmt)
                    print(f"[INFO] {item_date} [{station}] 저장/업데이트 완료 (강제업데이트)")
                else:
                    # Plain insert path: skip rows that already exist.
                    sel = select(self.table.c.date).where(
                        and_(self.table.c.date == item_date, self.table.c.station == station)
                    )
                    if conn.execute(sel).fetchone():
                        print(f"[INFO] {item_date} [{station}] 이미 존재. 저장 생략")
                        continue
                    conn.execute(self.table.insert().values(**data))
                    print(f"[INFO] {item_date} [{station}] 저장 완료")
            except IntegrityError as e:
                print(f"[ERROR] DB 중복 오류: {e}")
            except Exception as e:
                print(f"[ERROR] DB 저장 실패: {e}")
                traceback.print_exc()

    def fetch_air_quality_data_range(self, start_date_str, end_date_str, station_name,
                                     num_of_rows=100, page_no=1):
        """Request daily measurements for one station over a date range.

        start_date_str / end_date_str : 'YYYYMMDD' strings (inclusive range).
        Returns the API's item list, or [] on any request/parse failure
        (a preview of the response body is logged for diagnostics).
        """
        params = {
            'serviceKey': self.api_key,
            'returnType': 'json',
            'numOfRows': str(num_of_rows),
            'pageNo': str(page_no),
            'inqBginDt': start_date_str,
            'inqEndDt': end_date_str,
            'msrstnName': station_name,
        }
        resp = None
        try:
            resp = self.session.get(API_URL, params=params, timeout=20)
            resp.raise_for_status()
            data = resp.json()
            return data.get('response', {}).get('body', {}).get('items', [])
        except Exception as e:
            body_preview = None
            try:
                if resp is not None:
                    body_preview = resp.text[:1000]
            except Exception:
                body_preview = None
            print(f"[ERROR] 요청 실패: {e} status={getattr(resp, 'status_code', 'n/a')} body_preview={body_preview}")
            traceback.print_exc()
            return []

    def test_num_of_rows(self, station_name, date_str):
        """Probe the largest numOfRows the API accepts, from 1000 down to 100
        in steps of 100, caching the discovery per (station, date).

        Bug fix: in the original, the discovered value was separated from its
        ``return`` statement, so the method returned None on success — which
        then crashed run() in ``timedelta(days=optimal_num_rows - 1)``.
        Falls back to 100 when every probe fails.
        """
        # Reuse a cached probe result when available.
        try:
            cache_key = f"{station_name}:{date_str}"
            if cache_key in self._num_rows_cache:
                val = int(self._num_rows_cache[cache_key])
                print(f"[INFO] 캐시된 numOfRows 사용: {val} ({cache_key})")
                return val
        except Exception:
            pass
        max_rows = 1000
        min_rows = 100
        while max_rows >= min_rows:
            resp = None
            try:
                params = {
                    'serviceKey': self.api_key,
                    'returnType': 'json',
                    'numOfRows': str(max_rows),
                    'pageNo': '1',
                    'inqBginDt': date_str,
                    'inqEndDt': date_str,
                    'msrstnName': station_name,
                }
                resp = self.session.get(API_URL, params=params, timeout=20)
                resp.raise_for_status()
                resp.json()  # a JSON-parseable 2xx response means this numOfRows works
                print(f"[INFO] numOfRows 최대값 탐색 성공: {max_rows}")
                # Persist the discovery for future runs (best effort).
                try:
                    self._num_rows_cache[f"{station_name}:{date_str}"] = max_rows
                    self._save_num_rows_cache()
                except Exception:
                    pass
                return max_rows
            except Exception as e:
                body_preview = None
                try:
                    if resp is not None:
                        body_preview = resp.text[:500]
                except Exception:
                    body_preview = None
                print(f"[WARN] numOfRows={max_rows} 실패: {e}, body_preview={body_preview} 100 감소 후 재시도")
                max_rows -= 100
        print("[WARN] 최소 numOfRows 값(100)도 실패했습니다. 기본값 100 사용")
        return 100

    def run(self):
        """Collect data for every configured station, up to yesterday.

        Resumes each station from the day after its latest stored row,
        chunks the remaining range into numOfRows-day windows (the API
        returns one row per day per station), and writes each chunk.
        """
        with self.engine.begin() as conn:
            for station_name in self.station_list:
                print(f"\n[INFO] 측정소: {station_name}")
                latest_date = self.get_latest_date(conn, station_name)
                # Resume after the last stored day, never before the configured start.
                start_date = max(self.start_date, latest_date + timedelta(days=1)) if latest_date else self.start_date
                if start_date > self.yesterday:
                    print(f"[INFO] {station_name}: 최신 데이터가 이미 존재합니다. ({latest_date})")
                    continue
                optimal_num_rows = self.test_num_of_rows(station_name, start_date.strftime('%Y%m%d'))
                print(f"[INFO] 최적 numOfRows: {optimal_num_rows}")
                current_start = start_date
                while current_start <= self.yesterday:
                    # One API row per day => a numOfRows-day window fits one page.
                    current_end = min(current_start + timedelta(days=optimal_num_rows - 1), self.yesterday)
                    start_str = current_start.strftime('%Y%m%d')
                    end_str = current_end.strftime('%Y%m%d')
                    print(f"[INFO] {station_name}: {start_str} ~ {end_str} 데이터 요청 중...")
                    items = self.fetch_air_quality_data_range(start_str, end_str, station_name, optimal_num_rows)
                    if items:
                        self.save_data_to_db(items, conn, station_name)
                    else:
                        print(f"[INFO] {station_name}: {start_str} ~ {end_str} 데이터 없음")
                    current_start = current_end + timedelta(days=1)
        print("\n[INFO] 모든 측정소 데이터 처리 완료")


if __name__ == '__main__':
    config = db.load_config()
    collector = AirQualityCollector(config, db.engine, db_schema.air)
    collector.run()