"""Fetch daily ASOS weather observations from the data.go.kr API and store them in MySQL.

Settings are read from ``../conf/config.yaml`` (relative to this file). The
script determines which date range is still missing from the ``weather``
table, downloads it per configured station in date chunks, and inserts the
rows through SQLAlchemy.
"""
import os
import sys
from datetime import date, datetime, timedelta

import requests
import yaml
from sqlalchemy import select
from sqlalchemy.dialects.mysql import insert as mysql_insert

# Make the parent directory importable so the project-local ``conf`` package
# can be found when this file is run directly as a script.
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))

from conf import db, db_schema  # noqa: E402  (requires the sys.path entry above)

CONFIG_PATH = os.path.join(os.path.dirname(__file__), "../conf/config.yaml")

ASOS_URL = "http://apis.data.go.kr/1360000/AsosDalyInfoService/getWthrDataList"

# Rows requested per API call. Only page 1 is ever requested, so a single
# request must not cover more days than this (the daily service returns one
# row per station per day).
ASOS_NUM_OF_ROWS = 500

# Columns stored as integer strings (negative values are treated as missing).
_HRMT_KEYS = frozenset({
    "minTaHrmt", "maxTaHrmt", "mi10MaxRnHrmt", "hr1MaxRnHrmt",
    "maxInsWsHrmt", "maxWsHrmt", "minRhmHrmt", "maxPsHrmt", "minPsHrmt",
    "hr1MaxIcsrHrmt", "ddMefsHrmt", "ddMesHrmt",
})


def load_config():
    """Load and return the parsed YAML configuration stored at CONFIG_PATH."""
    with open(CONFIG_PATH, encoding="utf-8") as f:
        return yaml.safe_load(f)


def fetch_data_range_chunks(start_dt, end_dt, chunk_days=10):
    """Yield consecutive, non-overlapping ``(chunk_start, chunk_end)`` date ranges.

    Args:
        start_dt: First day of the range as a ``YYYYMMDD`` string.
        end_dt: Last day of the range (inclusive) as a ``YYYYMMDD`` string.
        chunk_days: Maximum number of days per chunk; must be >= 1.

    Yields:
        ``(start, end)`` tuples of ``YYYYMMDD`` strings, both inclusive.
        Nothing is yielded when ``start_dt`` is after ``end_dt``.

    Raises:
        ValueError: If a date string is malformed or ``chunk_days`` < 1.
    """
    if chunk_days < 1:
        # With chunk_days <= 0 the cursor never advances and the loop spins forever.
        raise ValueError(f"chunk_days must be >= 1, got {chunk_days}")
    current_start = datetime.strptime(start_dt, "%Y%m%d")
    end_date = datetime.strptime(end_dt, "%Y%m%d")
    span = timedelta(days=chunk_days - 1)
    while current_start <= end_date:
        current_end = min(current_start + span, end_date)
        yield current_start.strftime("%Y%m%d"), current_end.strftime("%Y%m%d")
        current_start = current_end + timedelta(days=1)


def _get_dict(obj, key):
    """Return ``obj[key]`` when ``obj`` is a dict and that value is a dict, else ``{}``."""
    value = obj.get(key) if isinstance(obj, dict) else None
    return value if isinstance(value, dict) else {}


def fetch_asos_data(stn_id, start_dt, end_dt, service_key):
    """Fetch daily ASOS observations for one station over an inclusive date range.

    Only the first page (ASOS_NUM_OF_ROWS rows) is requested, so callers should
    keep the range at or below that many days; a warning is printed when the
    response reports more rows than were returned.

    Args:
        stn_id: Station id; sent as a string in ``stnIds``.
        start_dt: First day as a ``YYYYMMDD`` string.
        end_dt: Last day as a ``YYYYMMDD`` string.
        service_key: data.go.kr service key, passed through as ``serviceKey``.

    Returns:
        A list of item dicts taken from ``response.body.items.item``; an empty
        list when the response contains no items or the request or JSON
        decoding fails (the failure is printed).
    """
    params = {
        "serviceKey": service_key,
        "pageNo": "1",
        "numOfRows": str(ASOS_NUM_OF_ROWS),
        "dataType": "JSON",
        "dataCd": "ASOS",
        "dateCd": "DAY",
        "startDt": start_dt,
        "endDt": end_dt,
        "stnIds": str(stn_id),
    }
    headers = {
        "User-Agent": "Mozilla/5.0",
        "Accept": "application/json",
    }
    try:
        resp = requests.get(ASOS_URL, params=params, headers=headers, timeout=15)
        resp.raise_for_status()
        data = resp.json()
    except (requests.RequestException, ValueError) as e:
        # ValueError covers non-JSON bodies (e.g. an XML error page).
        print(f"[ERROR] API 요청 실패: {e}")
        return []

    # Walk the envelope defensively: "items" can be an empty string or absent
    # when there is no data, which must not raise.
    body = _get_dict(_get_dict(data, "response"), "body")
    items = _get_dict(body, "items").get("item")
    if not items:
        return []
    if isinstance(items, dict):
        # A single result may come back as an object instead of a list.
        items = [items]
    elif not isinstance(items, list):
        return []

    try:
        total = int(body.get("totalCount"))
    except (TypeError, ValueError):
        total = None
    if total is not None and total > len(items):
        print(
            f"[WARN] station {stn_id} {start_dt}~{end_dt}: API reports {total} rows "
            f"but only {len(items)} were returned (numOfRows={ASOS_NUM_OF_ROWS})"
        )
    return items


def _convert_value(key, value):
    """Convert one raw API field value into the value stored in DB column ``key``.

    - ``""``, ``None`` and ``"-"`` become ``None``.
    - Keys in ``_HRMT_KEYS`` and ``"iscs"`` become an integer string
      (``"1530.0"`` -> ``"1530"``); negative numbers become ``None``.
    - ``"stnId"`` becomes an ``int``.
    - Every other key becomes a ``float``.
    - Values that cannot be parsed become ``None``.
    """
    if value in ("", None, "-"):
        return None
    try:
        if key in _HRMT_KEYS or key == "iscs":
            # NOTE(review): if ``iscs`` is free text in this API (verify), float()
            # always fails here and the column is always stored as None. The same
            # applies to any other text column (e.g. a station name) in the table.
            fval = float(value)
            return None if fval < 0 else str(int(fval))
        if key == "stnId":
            return int(float(value))
        return float(value)
    except (ValueError, TypeError, OverflowError):
        return None


def save_items_to_db(items, conn, table, force_update=False, debug=False):
    """Convert API items into rows and insert them into ``table``.

    Each item's ``tm`` (``YYYY-MM-DD``) becomes the ``date`` column; every other
    column of ``table`` is filled from the item field with the same name via
    ``_convert_value``. Items without ``tm`` are skipped.

    Args:
        items: Item dicts as returned by ``fetch_asos_data``.
        conn: Open SQLAlchemy connection (inside the caller's transaction).
        table: SQLAlchemy ``Table`` to write to; must have a ``date`` column.
        force_update: When true, upsert with MySQL ``ON DUPLICATE KEY UPDATE``;
            otherwise skip any row whose ``date`` already exists.
        debug: When true, print each prepared row and write nothing (dry run).

    Raises:
        Exception: Any error while preparing or writing a row is printed and
            re-raised, so the caller's transaction is rolled back.
    """
    columns = [key for key in table.c.keys() if key != "date"]
    for item in items:
        try:
            date_str = item.get("tm")
            if not date_str:
                continue
            record_date = datetime.strptime(date_str, "%Y-%m-%d").date()
            data = {"date": record_date}
            for key in columns:
                data[key] = _convert_value(key, item.get(key))

            if debug:
                # Debug mode is a dry run: show the row but do not write it.
                print(f"[DEBUG] {record_date} → DB 저장 시도: {data}")
                continue

            if force_update:
                stmt = mysql_insert(table).values(**data)
                stmt = stmt.on_duplicate_key_update(**data)
                conn.execute(stmt)
                print(f"[INFO] {record_date} 저장/업데이트 완료")
            else:
                # NOTE(review): existence is checked by date only, not by stnId.
                # With several stations configured, only the first station's row
                # for a given date is kept — confirm this matches the table key.
                sel = select(table.c.date).where(table.c.date == record_date)
                if conn.execute(sel).fetchone():
                    print(f"[INFO] {record_date} 이미 존재, 저장 생략")
                    continue
                conn.execute(table.insert().values(**data))
                print(f"[INFO] {record_date} 저장 완료")
        except Exception as e:
            print(f"[ERROR] 저장 실패: {e}")
            raise


def get_latest_date_from_db(conn, table):
    """Return the greatest ``date`` value in ``table``, or ``None`` if it is empty.

    NOTE(review): this is the latest date across all stations, not per station.
    """
    sel = select(table.c.date).order_by(table.c.date.desc()).limit(1)
    result = conn.execute(sel).fetchone()
    if result:
        return result[0]
    return None


def main():
    """Download the missing date range for every configured station and store it."""
    config = load_config()
    api_conf = config["DATA_API"]
    service_key = api_conf["serviceKey"]
    stn_ids = api_conf["weather"]["stnIds"]
    debug = config.get("debug", False)
    force_update = config.get("force_update", False)
    table = db_schema.weather
    engine = db.engine

    # Before 11:00 the previous day's data is not requested yet, so stop two
    # days back; otherwise stop at yesterday.
    # NOTE(review): uses the host's local clock; the cutoff presumably assumes
    # KST — confirm the host timezone.
    now = datetime.now()
    today = now.date()
    lag_days = 2 if now.hour < 11 else 1
    end_date = today - timedelta(days=lag_days)
    config_start_date = datetime.strptime(api_conf["startDt"], "%Y%m%d").date()

    # Each request fetches a single page of ASOS_NUM_OF_ROWS rows (one row per
    # day), so a chunk must not span more days than that. Larger chunks would
    # silently drop every row past the first page.
    chunk_days = ASOS_NUM_OF_ROWS

    with engine.begin() as conn:
        latest_date = get_latest_date_from_db(conn, table)
        if latest_date is None:
            start_date = config_start_date
        else:
            start_date = max(config_start_date, latest_date + timedelta(days=1))

        if start_date > end_date:
            if debug:
                print("[INFO] 최신 데이터가 이미 존재하거나 요청할 데이터가 없습니다.")
            return

        start_dt = start_date.strftime("%Y%m%d")
        end_dt = end_date.strftime("%Y%m%d")

        for stn_id in stn_ids:
            for chunk_start, chunk_end in fetch_data_range_chunks(start_dt, end_dt, chunk_days):
                if debug:
                    print(f"[INFO] 지점 {stn_id} 데이터 요청 중: {chunk_start} ~ {chunk_end}")
                items = fetch_asos_data(stn_id, chunk_start, chunk_end, service_key)
                if items:
                    save_items_to_db(items, conn, table, force_update, debug)
                else:
                    print(f"[WARN] 지점 {stn_id} {chunk_start}~{chunk_end} 데이터 없음 또는 요청 실패")


if __name__ == "__main__":
    main()