diff --git a/lib/air_quality.py b/lib/air_quality.py index 393eccf..5dd7d75 100644 --- a/lib/air_quality.py +++ b/lib/air_quality.py @@ -1,22 +1,57 @@ +import sys, os +sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) + import requests from datetime import datetime, timedelta from sqlalchemy import select, func from sqlalchemy.exc import IntegrityError +from sqlalchemy.dialects.mysql import insert as mysql_insert + from lib import db from lib import db_schema -def fetch_air_quality_data(date_str, api_key, station_name='운정'): - """ - API에서 지정된 날짜의 대기환경 데이터를 가져옴 - """ + +def test_num_of_rows(api_key, station_name='운정', date_str=None): + if date_str is None: + date_str = (datetime.now() - timedelta(days=1)).strftime('%Y%m%d') + + max_rows = 500 + min_rows = 100 + + while max_rows >= min_rows: + try: + url = "http://apis.data.go.kr/B552584/ArpltnStatsSvc/getMsrstnAcctoRDyrg" + params = { + 'serviceKey': api_key, + 'returnType': 'json', + 'numOfRows': str(max_rows), + 'pageNo': '1', + 'inqBginDt': date_str, + 'inqEndDt': date_str, + 'msrstnName': station_name, + } + resp = requests.get(url, params=params, timeout=10) + resp.raise_for_status() + resp.json() + print(f"[INFO] numOfRows 최대값 탐색 성공: {max_rows}") + return max_rows + + except (requests.RequestException, ValueError) as e: + print(f"[WARN] numOfRows={max_rows} 에러 발생: {e}, 100 감소 후 재시도") + max_rows -= 100 + + print("[WARN] 최소 numOfRows 값(100)도 실패했습니다. 기본값 100 사용") + return 100 + +def fetch_air_quality_data_range(start_date_str, end_date_str, api_key, num_of_rows=100, station_name='운정', page_no=1): url = "http://apis.data.go.kr/B552584/ArpltnStatsSvc/getMsrstnAcctoRDyrg" params = { 'serviceKey': api_key, 'returnType': 'json', - 'numOfRows': '100', - 'pageNo': '1', - 'inqBginDt': date_str, - 'inqEndDt': date_str, + 'numOfRows': str(num_of_rows), + 'pageNo': str(page_no), + 'inqBginDt': start_date_str, + 'inqEndDt': end_date_str, 'msrstnName': station_name, } @@ -33,10 +68,7 @@ def fetch_air_quality_data(date_str, api_key, station_name='운정'): print(f"[ERROR] JSON 파싱 실패: {e}") return [] -def save_data_to_db(items, conn, table, debug=False): - """ - 대기환경 정보를 DB에 저장 - """ +def save_data_to_db(items, conn, table, force_update=False, debug=False): for item in items: try: item_date = datetime.strptime(item['msurDt'], '%Y-%m-%d').date() @@ -44,45 +76,40 @@ def save_data_to_db(items, conn, table, debug=False): print(f"[ERROR] 날짜 파싱 오류: {item.get('msurDt')} → {e}") continue - if debug: - print(f"[DEBUG] 처리 중: {item}") - - # 중복 확인 - sel = select(table.c.date).where(table.c.date == item_date) - try: - exists = conn.execute(sel).fetchone() - except Exception as e: - print(f"[ERROR] SELECT 오류: {e}") - continue - - if exists: - if debug: - print(f"[DEBUG] {item_date} 이미 존재. 저장 생략") - continue - - ins = table.insert().values( - date=item_date, - pm25=float(item.get('pm25Value')) if item.get('pm25Value') else None, - pm10=float(item.get('pm10Value')) if item.get('pm10Value') else None, - so2=float(item.get('so2Value')) if item.get('so2Value') else None, - co=float(item.get('coValue')) if item.get('coValue') else None, - no2=float(item.get('no2Value')) if item.get('no2Value') else None, - o3=float(item.get('o3Value')) if item.get('o3Value') else None, - ) + data = { + 'date': item_date, + 'pm25': float(item.get('pm25Value')) if item.get('pm25Value') else None, + 'pm10': float(item.get('pm10Value')) if item.get('pm10Value') else None, + 'so2': float(item.get('so2Value')) if item.get('so2Value') else None, + 'co': float(item.get('coValue')) if item.get('coValue') else None, + 'no2': float(item.get('no2Value')) if item.get('no2Value') else None, + 'o3': float(item.get('o3Value')) if item.get('o3Value') else None, + } try: - conn.execute(ins) - if debug: - print(f"[DEBUG] {item_date} 저장 완료") + if force_update: + stmt = mysql_insert(table).values(**data) + on_duplicate_stmt = stmt.on_duplicate_key_update(**data) + conn.execute(on_duplicate_stmt) + if debug: + print(f"[DEBUG] {item_date} 저장/업데이트 완료 (강제업데이트)") + else: + sel = select(table.c.date).where(table.c.date == item_date) + exists = conn.execute(sel).fetchone() + if exists: + if debug: + print(f"[DEBUG] {item_date} 이미 존재. 저장 생략") + continue + ins = table.insert().values(**data) + conn.execute(ins) + if debug: + print(f"[DEBUG] {item_date} 저장 완료") except IntegrityError as e: print(f"[ERROR] DB 중복 오류: {e}") except Exception as e: print(f"[ERROR] DB 저장 실패: {e}") def get_latest_date(conn, table): - """ - 테이블에서 가장 최근 날짜를 반환. 값이 없으면 None - """ try: sel = select(func.max(table.c.date)) result = conn.execute(sel).scalar() @@ -97,40 +124,47 @@ def main(): config = db.load_config() api_key = config['air_quality_api']['service_key'] debug = config.get('debug', False) + force_update = config.get('force_update', False) engine = db.engine table = db_schema.fg_manager_static_air with engine.connect() as conn: - latest_date = get_latest_date(conn, table) + trans = conn.begin() + try: + start_date = datetime.strptime("2017-01-01", "%Y-%m-%d").date() + end_date = (datetime.now() - timedelta(days=1)).date() - if latest_date is None: - start_date = datetime.strptime("2023-01-01", "%Y-%m-%d").date() - else: - start_date = latest_date + timedelta(days=1) - - end_date = (datetime.now() - timedelta(days=1)).date() - - if debug: - print(f"[DEBUG] 최신 DB 날짜: {latest_date}") - print(f"[DEBUG] 수집 시작일: {start_date}") - print(f"[DEBUG] 수집 종료일: {end_date}") - - current_date = start_date - while current_date <= end_date: - date_str = current_date.strftime('%Y%m%d') + optimal_num_rows = test_num_of_rows(api_key, date_str=start_date.strftime('%Y%m%d')) if debug: - print(f"[DEBUG] {date_str} 데이터 요청 중...") + print(f"[DEBUG] 최적 numOfRows 값: {optimal_num_rows}") - items = fetch_air_quality_data(date_str, api_key) + current_start = start_date + while current_start <= end_date: + current_end = current_start + timedelta(days=optimal_num_rows - 1) + if current_end > end_date: + current_end = end_date + + start_str = current_start.strftime('%Y%m%d') + end_str = current_end.strftime('%Y%m%d') - if items: - save_data_to_db(items, conn, table, debug=debug) - else: if debug: - print(f"[DEBUG] {date_str} 데이터 없음") + print(f"[DEBUG] {start_str} ~ {end_str} 데이터 요청 중...") - current_date += timedelta(days=1) + items = fetch_air_quality_data_range(start_str, end_str, api_key, num_of_rows=optimal_num_rows) + + if items: + save_data_to_db(items, conn, table, force_update=force_update, debug=debug) + else: + if debug: + print(f"[DEBUG] {start_str} ~ {end_str} 데이터 없음") + + current_start = current_end + timedelta(days=1) + + trans.commit() + except Exception as e: + trans.rollback() + print(f"[ERROR] 트랜잭션 처리 중 오류 발생: {e}") if __name__ == '__main__': main()