From 4850cac24bc598d54cfff9b88a1fc8b628dec0f1 Mon Sep 17 00:00:00 2001 From: KWON Date: Fri, 4 Jul 2025 16:50:21 +0900 Subject: [PATCH] =?UTF-8?q?=EB=8C=80=EA=B8=B0=ED=99=98=EA=B2=BD=EC=A0=95?= =?UTF-8?q?=EB=B3=B4=20api=20=EB=B6=88=EB=9F=AC=EC=99=80=20DB=EC=97=90=20?= =?UTF-8?q?=EB=84=A3=EA=B8=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- lib/air_quality.py | 136 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 136 insertions(+) create mode 100644 lib/air_quality.py diff --git a/lib/air_quality.py b/lib/air_quality.py new file mode 100644 index 0000000..393eccf --- /dev/null +++ b/lib/air_quality.py @@ -0,0 +1,136 @@ +import requests +from datetime import datetime, timedelta +from sqlalchemy import select, func +from sqlalchemy.exc import IntegrityError +from lib import db +from lib import db_schema + +def fetch_air_quality_data(date_str, api_key, station_name='운정'): + """ + API에서 지정된 날짜의 대기환경 데이터를 가져옴 + """ + url = "http://apis.data.go.kr/B552584/ArpltnStatsSvc/getMsrstnAcctoRDyrg" + params = { + 'serviceKey': api_key, + 'returnType': 'json', + 'numOfRows': '100', + 'pageNo': '1', + 'inqBginDt': date_str, + 'inqEndDt': date_str, + 'msrstnName': station_name, + } + + try: + resp = requests.get(url, params=params, timeout=10) + resp.raise_for_status() + data = resp.json() + items = data.get('response', {}).get('body', {}).get('items', []) + return items + except requests.RequestException as e: + print(f"[ERROR] 요청 실패: {e}") + return [] + except ValueError as e: + print(f"[ERROR] JSON 파싱 실패: {e}") + return [] + +def save_data_to_db(items, conn, table, debug=False): + """ + 대기환경 정보를 DB에 저장 + """ + for item in items: + try: + item_date = datetime.strptime(item['msurDt'], '%Y-%m-%d').date() + except Exception as e: + print(f"[ERROR] 날짜 파싱 오류: {item.get('msurDt')} → {e}") + continue + + if debug: + print(f"[DEBUG] 처리 중: {item}") + + # 중복 확인 + sel = select(table.c.date).where(table.c.date == item_date) + try: + exists = conn.execute(sel).fetchone() + except Exception as e: + print(f"[ERROR] SELECT 오류: {e}") + continue + + if exists: + if debug: + print(f"[DEBUG] {item_date} 이미 존재. 저장 생략") + continue + + ins = table.insert().values( + date=item_date, + pm25=float(item.get('pm25Value')) if item.get('pm25Value') else None, + pm10=float(item.get('pm10Value')) if item.get('pm10Value') else None, + so2=float(item.get('so2Value')) if item.get('so2Value') else None, + co=float(item.get('coValue')) if item.get('coValue') else None, + no2=float(item.get('no2Value')) if item.get('no2Value') else None, + o3=float(item.get('o3Value')) if item.get('o3Value') else None, + ) + + try: + conn.execute(ins) + if debug: + print(f"[DEBUG] {item_date} 저장 완료") + except IntegrityError as e: + print(f"[ERROR] DB 중복 오류: {e}") + except Exception as e: + print(f"[ERROR] DB 저장 실패: {e}") + +def get_latest_date(conn, table): + """ + 테이블에서 가장 최근 날짜를 반환. 값이 없으면 None + """ + try: + sel = select(func.max(table.c.date)) + result = conn.execute(sel).scalar() + if result is None: + print("[INFO] 테이블에 데이터가 없습니다.") + return result + except Exception as e: + print(f"[ERROR] 가장 최근 날짜 조회 실패: {e}") + return None + +def main(): + config = db.load_config() + api_key = config['air_quality_api']['service_key'] + debug = config.get('debug', False) + + engine = db.engine + table = db_schema.fg_manager_static_air + + with engine.connect() as conn: + latest_date = get_latest_date(conn, table) + + if latest_date is None: + start_date = datetime.strptime("2023-01-01", "%Y-%m-%d").date() + else: + start_date = latest_date + timedelta(days=1) + + end_date = (datetime.now() - timedelta(days=1)).date() + + if debug: + print(f"[DEBUG] 최신 DB 날짜: {latest_date}") + print(f"[DEBUG] 수집 시작일: {start_date}") + print(f"[DEBUG] 수집 종료일: {end_date}") + + current_date = start_date + while current_date <= end_date: + date_str = current_date.strftime('%Y%m%d') + if debug: + print(f"[DEBUG] {date_str} 데이터 요청 중...") + + items = fetch_air_quality_data(date_str, api_key) + + if items: + save_data_to_db(items, conn, table, debug=debug) + else: + if debug: + print(f"[DEBUG] {date_str} 데이터 없음") + + current_date += timedelta(days=1) + +if __name__ == '__main__': + main()