diff --git a/lib/air_quality.py b/lib/air_quality.py index 9d8a503..b4a80cf 100644 --- a/lib/air_quality.py +++ b/lib/air_quality.py @@ -1,170 +1,167 @@ -import sys, os -sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) - +import os, sys import requests from datetime import datetime, timedelta from sqlalchemy import select, func, and_ -from sqlalchemy.exc import IntegrityError from sqlalchemy.dialects.mysql import insert as mysql_insert +from sqlalchemy.exc import IntegrityError +sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) from conf import db, db_schema -def test_num_of_rows(api_key, station_name, date_str=None): - if date_str is None: - date_str = (datetime.now() - timedelta(days=1)).strftime('%Y%m%d') - max_rows = 1000 - min_rows = 100 +class AirQualityCollector: + def __init__(self, config, engine, table): + self.api_key = config['DATA_API']['serviceKey'] + self.station_list = config['DATA_API']['air']['station_name'] + self.start_date = datetime.strptime(config['DATA_API']['startDt'], '%Y%m%d').date() + self.debug = config.get('debug', False) + self.force_update = config.get('force_update', False) - while max_rows >= min_rows: + self.engine = engine + self.table = table + self.yesterday = (datetime.now() - timedelta(days=1)).date() + + def get_latest_date(self, conn, station): try: - url = "http://apis.data.go.kr/B552584/ArpltnStatsSvc/getMsrstnAcctoRDyrg" - params = { - 'serviceKey': api_key, - 'returnType': 'json', - 'numOfRows': str(max_rows), - 'pageNo': '1', - 'inqBginDt': date_str, - 'inqEndDt': date_str, - 'msrstnName': station_name, - } - resp = requests.get(url, params=params, timeout=10) - resp.raise_for_status() - resp.json() - print(f"[INFO] numOfRows 최대값 탐색 성공: {max_rows}") - return max_rows - except (requests.RequestException, ValueError) as e: - print(f"[WARN] numOfRows={max_rows} 에러 발생: {e}, 100 감소 후 재시도") - max_rows -= 100 - - print("[WARN] 최소 numOfRows 값(100)도 실패했습니다. 기본값 100 사용") - return 100 - -def fetch_air_quality_data_range(start_date_str, end_date_str, api_key, station_name, num_of_rows=100, page_no=1): - url = "http://apis.data.go.kr/B552584/ArpltnStatsSvc/getMsrstnAcctoRDyrg" - params = { - 'serviceKey': api_key, - 'returnType': 'json', - 'numOfRows': str(num_of_rows), - 'pageNo': str(page_no), - 'inqBginDt': start_date_str, - 'inqEndDt': end_date_str, - 'msrstnName': station_name, - } - - try: - resp = requests.get(url, params=params, timeout=10) - resp.raise_for_status() - data = resp.json() - items = data.get('response', {}).get('body', {}).get('items', []) - return items - except requests.RequestException as e: - print(f"[ERROR] 요청 실패: {e}") - return [] - except ValueError as e: - print(f"[ERROR] JSON 파싱 실패: {e}") - return [] - -def save_data_to_db(items, conn, table, station, force_update=False, debug=False): - for item in items: - try: - item_date = datetime.strptime(item['msurDt'], '%Y-%m-%d').date() + stmt = select(func.max(self.table.c.date)).where(self.table.c.station == station) + result = conn.execute(stmt).scalar() + if result is None: + print(f"[INFO] {station}: 테이블에 데이터가 없습니다.") + return result except Exception as e: - print(f"[ERROR] 날짜 파싱 오류: {item.get('msurDt')} → {e}") - continue + print(f"[ERROR] 가장 최근 날짜 조회 실패: {e}") + return None - data = { - 'date': item_date, - 'station': station, - 'pm25': float(item.get('pm25Value')) if item.get('pm25Value') else None, - 'pm10': float(item.get('pm10Value')) if item.get('pm10Value') else None, - 'so2': float(item.get('so2Value')) if item.get('so2Value') else None, - 'co': float(item.get('coValue')) if item.get('coValue') else None, - 'no2': float(item.get('no2Value')) if item.get('no2Value') else None, - 'o3': float(item.get('o3Value')) if item.get('o3Value') else None, + def save_data_to_db(self, items, conn, station): + for item in items: + try: + item_date = datetime.strptime(item['msurDt'], '%Y-%m-%d').date() + except Exception as e: + print(f"[ERROR] 날짜 파싱 오류: {item.get('msurDt')} → {e}") + continue + + data = { + 'date': item_date, + 'station': station, + 'pm25': float(item.get('pm25Value')) if item.get('pm25Value') else None, + 'pm10': float(item.get('pm10Value')) if item.get('pm10Value') else None, + 'so2': float(item.get('so2Value')) if item.get('so2Value') else None, + 'co': float(item.get('coValue')) if item.get('coValue') else None, + 'no2': float(item.get('no2Value')) if item.get('no2Value') else None, + 'o3': float(item.get('o3Value')) if item.get('o3Value') else None, + } + + try: + if self.debug: + print(f"[DEBUG] {item_date} [{station}] → DB 저장 시도: {data}") + continue + + if self.force_update: + stmt = mysql_insert(self.table).values(**data) + on_dup_stmt = stmt.on_duplicate_key_update(**data) + conn.execute(on_dup_stmt) + print(f"[INFO] {item_date} [{station}] 저장/업데이트 완료 (강제업데이트)") + else: + sel = select(self.table.c.date).where( + and_(self.table.c.date == item_date, self.table.c.station == station) + ) + exists = conn.execute(sel).fetchone() + if exists: + print(f"[INFO] {item_date} [{station}] 이미 존재. 저장 생략") + continue + conn.execute(self.table.insert().values(**data)) + print(f"[INFO] {item_date} [{station}] 저장 완료") + except IntegrityError as e: + print(f"[ERROR] DB 중복 오류: {e}") + except Exception as e: + print(f"[ERROR] DB 저장 실패: {e}") + + def fetch_air_quality_data_range(self, start_date_str, end_date_str, station_name, num_of_rows=100, page_no=1): + url = "http://apis.data.go.kr/B552584/ArpltnStatsSvc/getMsrstnAcctoRDyrg" + params = { + 'serviceKey': self.api_key, + 'returnType': 'json', + 'numOfRows': str(num_of_rows), + 'pageNo': str(page_no), + 'inqBginDt': start_date_str, + 'inqEndDt': end_date_str, + 'msrstnName': station_name, } try: - if debug: - print(f"[DEBUG] {item_date} [{station}] → DB 저장 시도: {data}") - continue + resp = requests.get(url, params=params, timeout=10) + resp.raise_for_status() + data = resp.json() + return data.get('response', {}).get('body', {}).get('items', []) + except requests.RequestException as e: + print(f"[ERROR] 요청 실패: {e}") + return [] + except ValueError as e: + print(f"[ERROR] JSON 파싱 실패: {e}") + return [] - if force_update: - stmt = mysql_insert(table).values(**data) - on_duplicate_stmt = stmt.on_duplicate_key_update(**data) - conn.execute(on_duplicate_stmt) - print(f"[INFO] {item_date} [{station}] 저장/업데이트 완료 (강제업데이트)") - else: - sel = select(table.c.date).where(and_(table.c.date == item_date, table.c.station == station)) - exists = conn.execute(sel).fetchone() - if exists: - print(f"[INFO] {item_date} [{station}] 이미 존재. 저장 생략") + def test_num_of_rows(self, station_name, date_str): + max_rows = 1000 + min_rows = 100 + + while max_rows >= min_rows: + try: + url = "http://apis.data.go.kr/B552584/ArpltnStatsSvc/getMsrstnAcctoRDyrg" + params = { + 'serviceKey': self.api_key, + 'returnType': 'json', + 'numOfRows': str(max_rows), + 'pageNo': '1', + 'inqBginDt': date_str, + 'inqEndDt': date_str, + 'msrstnName': station_name, + } + resp = requests.get(url, params=params, timeout=10) + resp.raise_for_status() + resp.json() + print(f"[INFO] numOfRows 최대값 탐색 성공: {max_rows}") + return max_rows + except Exception as e: + print(f"[WARN] numOfRows={max_rows} 실패: {e}, 100 감소 후 재시도") + max_rows -= 100 + + print("[WARN] 최소 numOfRows 값(100)도 실패했습니다. 기본값 100 사용") + return 100 + + def run(self): + with self.engine.begin() as conn: + for station_name in self.station_list: + print(f"\n[INFO] 측정소: {station_name}") + + latest_date = self.get_latest_date(conn, station_name) + start_date = max(self.start_date, latest_date + timedelta(days=1)) if latest_date else self.start_date + + if start_date > self.yesterday: + print(f"[INFO] {station_name}: 최신 데이터가 이미 존재합니다. ({latest_date})") continue - ins = table.insert().values(**data) - conn.execute(ins) - print(f"[INFO] {item_date} [{station}] 저장 완료") - except IntegrityError as e: - print(f"[ERROR] DB 중복 오류: {e}") - except Exception as e: - print(f"[ERROR] DB 저장 실패: {e}") -def get_latest_date(conn, table, station): - try: - sel = select(func.max(table.c.date)).where(table.c.station == station) - result = conn.execute(sel).scalar() - if result is None: - print(f"[INFO] {station}: 테이블에 데이터가 없습니다.") - return result - except Exception as e: - print(f"[ERROR] 가장 최근 날짜 조회 실패: {e}") - return None + optimal_num_rows = self.test_num_of_rows(station_name, start_date.strftime('%Y%m%d')) + print(f"[INFO] 최적 numOfRows: {optimal_num_rows}") -def main(): - config = db.load_config() - api_key = config['DATA_API']['serviceKey'] - station_list = config['DATA_API']['air']['station_name'] - debug = config.get('debug', False) - force_update = config.get('force_update', False) - config_start_date = datetime.strptime(config['DATA_API']['startDt'], '%Y%m%d').date() - yesterday = (datetime.now() - timedelta(days=1)).date() + current_start = start_date + while current_start <= self.yesterday: + current_end = min(current_start + timedelta(days=optimal_num_rows - 1), self.yesterday) + start_str = current_start.strftime('%Y%m%d') + end_str = current_end.strftime('%Y%m%d') - engine = db.engine - table = db_schema.air + print(f"[INFO] {station_name}: {start_str} ~ {end_str} 데이터 요청 중...") + items = self.fetch_air_quality_data_range(start_str, end_str, station_name, optimal_num_rows) - with engine.connect() as conn: - for station_name in station_list: - print(f"\n[INFO] 측정소: {station_name}") + if items: + self.save_data_to_db(items, conn, station_name) + else: + print(f"[INFO] {station_name}: {start_str} ~ {end_str} 데이터 없음") - latest_date = get_latest_date(conn, table, station_name) - if latest_date: - start_date = max(config_start_date, latest_date + timedelta(days=1)) - else: - start_date = config_start_date - - if start_date > yesterday: - print(f"[INFO] {station_name}: 최신 데이터가 이미 존재합니다. ({latest_date})") - continue - - optimal_num_rows = test_num_of_rows(api_key, station_name=station_name, date_str=start_date.strftime('%Y%m%d')) - print(f"[INFO] 최적 numOfRows: {optimal_num_rows}") - - current_start = start_date - while current_start <= yesterday: - current_end = min(current_start + timedelta(days=optimal_num_rows - 1), yesterday) - start_str = current_start.strftime('%Y%m%d') - end_str = current_end.strftime('%Y%m%d') - - print(f"[INFO] {station_name}: {start_str} ~ {end_str} 데이터 요청 중...") - items = fetch_air_quality_data_range(start_str, end_str, api_key, station_name, num_of_rows=optimal_num_rows) - - if items: - save_data_to_db(items, conn, table, station=station_name, force_update=force_update, debug=debug) - else: - print(f"[INFO] {station_name}: {start_str} ~ {end_str} 데이터 없음") - - current_start = current_end + timedelta(days=1) - - print("[INFO] 모든 측정소 데이터 처리 완료") + current_start = current_end + timedelta(days=1) + print("\n[INFO] 모든 측정소 데이터 처리 완료") + if __name__ == '__main__': - main() + config = db.load_config() + collector = AirQualityCollector(config, db.engine, db_schema.air) + collector.run()