From 1bb6817e06dae6ba9a6064ad0eab3b0b8bef5ad7 Mon Sep 17 00:00:00 2001 From: KWON Date: Mon, 7 Jul 2025 13:42:57 +0900 Subject: [PATCH] =?UTF-8?q?=EC=A2=85=EA=B4=80=EA=B8=B0=EC=83=81=EA=B4=80?= =?UTF-8?q?=EC=B8=A1=20=EC=97=85=EB=8D=B0=EC=9D=B4=ED=8A=B8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- lib/weather_asos.py | 38 ++++++++++++++++---------------------- 1 file changed, 16 insertions(+), 22 deletions(-) diff --git a/lib/weather_asos.py b/lib/weather_asos.py index 2804b09..a11f76c 100644 --- a/lib/weather_asos.py +++ b/lib/weather_asos.py @@ -2,12 +2,12 @@ import sys, os sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) import yaml -from datetime import datetime, timedelta, date import requests +from datetime import datetime, timedelta, date from sqlalchemy.dialects.mysql import insert as mysql_insert from sqlalchemy import select -from lib import db -from lib import db_schema + +from conf import db, db_schema CONFIG_PATH = os.path.join(os.path.dirname(__file__), "../conf/config.yaml") @@ -16,10 +16,6 @@ def load_config(): return yaml.safe_load(f) def fetch_data_range_chunks(start_dt, end_dt, chunk_days=10): - """ - YYYYMMDD 형식 날짜 문자열을 받아 - chunk_days 단위로 (start, end) 튜플 반복자 생성 - """ current_start = datetime.strptime(start_dt, "%Y%m%d") end_date = datetime.strptime(end_dt, "%Y%m%d") @@ -29,7 +25,7 @@ def fetch_data_range_chunks(start_dt, end_dt, chunk_days=10): current_start = current_end + timedelta(days=1) def fetch_asos_data(stn_id, start_dt, end_dt, service_key): - url = "http://apis.data.go.kr/1360000/AsosDalyInfoService/getWthrDataList" # HTTP로 변경 + url = "http://apis.data.go.kr/1360000/AsosDalyInfoService/getWthrDataList" params = { "serviceKey": service_key, @@ -101,28 +97,28 @@ def save_items_to_db(items, conn, table, force_update=False, debug=False): except (ValueError, TypeError): data[key] = None + if debug: + print(f"[DEBUG] {record_date} → DB 저장 시도: {data}") + continue + if force_update: stmt = mysql_insert(table).values(**data) stmt = stmt.on_duplicate_key_update(**data) conn.execute(stmt) - if debug: - print(f"[DEBUG] {record_date} 저장/업데이트 완료") + print(f"[INFO] {record_date} 저장/업데이트 완료") else: sel = select(table.c.date).where(table.c.date == record_date) if conn.execute(sel).fetchone(): - if debug: - print(f"[DEBUG] {record_date} 이미 존재, 저장 생략") + print(f"[INFO] {record_date} 이미 존재, 저장 생략") continue conn.execute(table.insert().values(**data)) - if debug: - print(f"[DEBUG] {record_date} 저장 완료") + print(f"[INFO] {record_date} 저장 완료") except Exception as e: print(f"[ERROR] 저장 실패: {e}") raise def get_latest_date_from_db(conn, table): - """DB에서 가장 최신 date 값을 조회""" sel = select(table.c.date).order_by(table.c.date.desc()).limit(1) result = conn.execute(sel).fetchone() if result: @@ -131,34 +127,32 @@ def get_latest_date_from_db(conn, table): def main(): config = load_config() - service_key = config["weather_api"]["serviceKey"] - stn_ids = config["weather_api"]["stnIds"] + service_key = config["DATA_API"]["serviceKey"] + stn_ids = config["DATA_API"]["weather"]["stnIds"] debug = config.get("debug", False) force_update = config.get("force_update", False) table = db_schema.fg_manager_static_weather engine = db.engine - # 현재 날짜 및 시간 판단 now = datetime.now() today = now.date() - # 오전 11시 이전이면 전전날까지, 아니면 전날까지 데이터 요청 if now.hour < 11: end_date = today - timedelta(days=2) else: end_date = today - timedelta(days=1) - default_start = datetime.strptime("20170101", "%Y%m%d").date() + config_start_date = datetime.strptime(config["DATA_API"]["startDt"], "%Y%m%d").date() chunk_days = 1000 with engine.begin() as conn: latest_date = get_latest_date_from_db(conn, table) if latest_date is None: - start_date = default_start + start_date = config_start_date else: - start_date = latest_date + timedelta(days=1) + start_date = max(config_start_date, latest_date + timedelta(days=1)) if start_date > end_date: if debug: