From 113e5b8460038a43231cf547bdfd6e280abea6e2 Mon Sep 17 00:00:00 2001 From: KWON Date: Mon, 7 Jul 2025 13:35:27 +0900 Subject: [PATCH] =?UTF-8?q?=EA=B8=B0=EC=83=81=EC=A0=95=EB=B3=B4=20?= =?UTF-8?q?=EB=B6=80=EB=B6=84=EC=97=90=EC=84=9C=20'=EC=9A=B4=EC=A0=95'=20?= =?UTF-8?q?=EA=B8=B0=EB=B3=B8=EA=B0=92=EC=9D=B8=20=EB=B6=80=EB=B6=84?= =?UTF-8?q?=EC=9D=84=20config=EC=97=90=EC=84=9C=20=EC=A0=95=EC=9D=98?= =?UTF-8?q?=ED=95=98=EB=8F=84=EB=A1=9D=20=ED=95=98=EA=B3=A0,=20=ED=95=B4?= =?UTF-8?q?=EB=8B=B9=20=EA=B0=92=EC=9D=84=20=EB=94=95=EC=85=94=EB=84=88?= =?UTF-8?q?=EB=A6=AC=EB=A1=9C=20=EB=B0=9B=EC=95=84=EC=84=9C=20=EC=97=AC?= =?UTF-8?q?=EB=9F=AC=20=EC=A7=80=EC=97=AD=EC=A0=95=EB=B3=B4=EB=A5=BC=20?= =?UTF-8?q?=EC=97=85=EB=8D=B0=EC=9D=B4=ED=8A=B8=ED=95=A0=20=EC=88=98=20?= =?UTF-8?q?=EC=9E=88=EB=8F=84=EB=A1=9D=20=EC=88=98=EC=A0=95=ED=95=A8.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- lib/air_quality.py | 77 +++++++++++++++++++++++----------------------- 1 file changed, 38 insertions(+), 39 deletions(-) diff --git a/lib/air_quality.py b/lib/air_quality.py index 3b7ed2f..6d0fc27 100644 --- a/lib/air_quality.py +++ b/lib/air_quality.py @@ -3,13 +3,11 @@ sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) import requests from datetime import datetime, timedelta -from sqlalchemy import select, func +from sqlalchemy import select, func, and_ from sqlalchemy.exc import IntegrityError from sqlalchemy.dialects.mysql import insert as mysql_insert -from lib import db -from lib import db_schema - +from conf import db, db_schema def test_num_of_rows(api_key, station_name, date_str=None): if date_str is None: @@ -35,7 +33,6 @@ def test_num_of_rows(api_key, station_name, date_str=None): resp.json() print(f"[INFO] numOfRows 최대값 탐색 성공: {max_rows}") return max_rows - except (requests.RequestException, ValueError) as e: print(f"[WARN] numOfRows={max_rows} 에러 발생: {e}, 100 감소 후 재시도") max_rows -= 100 @@ -68,7 +65,7 @@ def fetch_air_quality_data_range(start_date_str, end_date_str, api_key, station_ print(f"[ERROR] JSON 파싱 실패: {e}") return [] -def save_data_to_db(items, conn, table, force_update=False, debug=False): +def save_data_to_db(items, conn, table, station, force_update=False, debug=False): for item in items: try: item_date = datetime.strptime(item['msurDt'], '%Y-%m-%d').date() @@ -78,6 +75,7 @@ def save_data_to_db(items, conn, table, force_update=False, debug=False): data = { 'date': item_date, + 'station': station, 'pm25': float(item.get('pm25Value')) if item.get('pm25Value') else None, 'pm10': float(item.get('pm10Value')) if item.get('pm10Value') else None, 'so2': float(item.get('so2Value')) if item.get('so2Value') else None, @@ -87,34 +85,35 @@ def save_data_to_db(items, conn, table, force_update=False, debug=False): } try: + if debug: + print(f"[DEBUG] {item_date} [{station}] → DB 저장 시도: {data}") + continue + if force_update: stmt = mysql_insert(table).values(**data) on_duplicate_stmt = stmt.on_duplicate_key_update(**data) conn.execute(on_duplicate_stmt) - if debug: - print(f"[DEBUG] {item_date} 저장/업데이트 완료 (강제업데이트)") + print(f"[INFO] {item_date} [{station}] 저장/업데이트 완료 (강제업데이트)") else: - sel = select(table.c.date).where(table.c.date == item_date) + sel = select(table.c.date).where(and_(table.c.date == item_date, table.c.station == station)) exists = conn.execute(sel).fetchone() if exists: - if debug: - print(f"[DEBUG] {item_date} 이미 존재. 저장 생략") + print(f"[INFO] {item_date} [{station}] 이미 존재. 저장 생략") continue ins = table.insert().values(**data) conn.execute(ins) - if debug: - print(f"[DEBUG] {item_date} 저장 완료") + print(f"[INFO] {item_date} [{station}] 저장 완료") except IntegrityError as e: print(f"[ERROR] DB 중복 오류: {e}") except Exception as e: print(f"[ERROR] DB 저장 실패: {e}") -def get_latest_date(conn, table): +def get_latest_date(conn, table, station): try: - sel = select(func.max(table.c.date)) + sel = select(func.max(table.c.date)).where(table.c.station == station) result = conn.execute(sel).scalar() if result is None: - print("[INFO] 테이블에 데이터가 없습니다.") + print(f"[INFO] {station}: 테이블에 데이터가 없습니다.") return result except Exception as e: print(f"[ERROR] 가장 최근 날짜 조회 실패: {e}") @@ -123,49 +122,49 @@ def get_latest_date(conn, table): def main(): config = db.load_config() api_key = config['DATA_API']['serviceKey'] - station_name = config['DATA_API']['air']['station_name'] + station_list = config['DATA_API']['air']['station_name'] debug = config.get('debug', False) force_update = config.get('force_update', False) + config_start_date = datetime.strptime(config['DATA_API']['startDt'], '%Y%m%d').date() + yesterday = (datetime.now() - timedelta(days=1)).date() engine = db.engine table = db_schema.fg_manager_static_air with engine.connect() as conn: - trans = conn.begin() - try: - start_date = datetime.strptime("2020-01-01", "%Y-%m-%d").date() - end_date = (datetime.now() - timedelta(days=1)).date() + for station_name in station_list: + print(f"\n[INFO] 측정소: {station_name}") - optimal_num_rows = test_num_of_rows(api_key, date_str=start_date.strftime('%Y%m%d')) - if debug: - print(f"[DEBUG] 최적 numOfRows 값: {optimal_num_rows}") + latest_date = get_latest_date(conn, table, station_name) + if latest_date: + start_date = max(config_start_date, latest_date + timedelta(days=1)) + else: + start_date = config_start_date + + if start_date > yesterday: + print(f"[INFO] {station_name}: 최신 데이터가 이미 존재합니다. ({latest_date})") + continue + + optimal_num_rows = test_num_of_rows(api_key, station_name=station_name, date_str=start_date.strftime('%Y%m%d')) + print(f"[INFO] 최적 numOfRows: {optimal_num_rows}") current_start = start_date - while current_start <= end_date: - current_end = current_start + timedelta(days=optimal_num_rows - 1) - if current_end > end_date: - current_end = end_date - + while current_start <= yesterday: + current_end = min(current_start + timedelta(days=optimal_num_rows - 1), yesterday) start_str = current_start.strftime('%Y%m%d') end_str = current_end.strftime('%Y%m%d') - if debug: - print(f"[DEBUG] {start_str} ~ {end_str} 데이터 요청 중...") - + print(f"[INFO] {station_name}: {start_str} ~ {end_str} 데이터 요청 중...") items = fetch_air_quality_data_range(start_str, end_str, api_key, station_name, num_of_rows=optimal_num_rows) if items: - save_data_to_db(items, conn, table, force_update=force_update, debug=debug) + save_data_to_db(items, conn, table, station=station_name, force_update=force_update, debug=debug) else: - if debug: - print(f"[DEBUG] {start_str} ~ {end_str} 데이터 없음") + print(f"[INFO] {station_name}: {start_str} ~ {end_str} 데이터 없음") current_start = current_end + timedelta(days=1) - trans.commit() - except Exception as e: - trans.rollback() - print(f"[ERROR] 트랜잭션 처리 중 오류 발생: {e}") + print("[INFO] 모든 측정소 데이터 처리 완료") if __name__ == '__main__': main()