From fde67fe331e3ddfcae80f64137ceb90ab0cf80bf Mon Sep 17 00:00:00 2001 From: KWON Date: Fri, 4 Jul 2025 18:00:40 +0900 Subject: [PATCH] =?UTF-8?q?DB=EC=97=90=EC=84=9C=20=EC=B5=9C=EC=A2=85=20?= =?UTF-8?q?=EB=82=A0=EC=A7=9C=EB=A5=BC=20=ED=99=95=EC=9D=B8=ED=95=98?= =?UTF-8?q?=EA=B3=A0,=20=ED=95=B4=EB=8B=B9=EC=9D=BC=20=EB=8B=A4=EC=9D=8C?= =?UTF-8?q?=EB=82=A0=EB=B6=80=ED=84=B0=20=EC=A0=84=EC=9D=BC=EA=B9=8C?= =?UTF-8?q?=EC=A7=80=20=EA=B8=B0=EC=83=81=EC=A0=95=EB=B3=B4=EB=A5=BC=20?= =?UTF-8?q?=EB=B0=9B=EC=95=84=EC=99=80=20=EC=A0=80=EC=9E=A5=ED=95=A8,=20?= =?UTF-8?q?=EB=A7=8C=EC=95=BD=20=EC=8B=A4=ED=96=89=EC=8B=9C=EA=B0=84?= =?UTF-8?q?=EC=9D=B4=20=EC=98=A4=EC=A0=84=2011=EC=8B=9C=20=EC=9D=B4?= =?UTF-8?q?=EC=A0=84=EC=9D=B4=EB=9D=BC=EB=A9=B4,=20=EC=A0=84=EC=A0=84?= =?UTF-8?q?=EB=82=A0=20=EB=8D=B0=EC=9D=B4=ED=84=B0=EB=A5=BC=20=EC=A0=80?= =?UTF-8?q?=EC=9E=A5=ED=95=A8.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- lib/weather_asos.py | 182 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 182 insertions(+) create mode 100644 lib/weather_asos.py diff --git a/lib/weather_asos.py b/lib/weather_asos.py new file mode 100644 index 0000000..2804b09 --- /dev/null +++ b/lib/weather_asos.py @@ -0,0 +1,182 @@ +import sys, os +sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) + +import yaml +from datetime import datetime, timedelta, date +import requests +from sqlalchemy.dialects.mysql import insert as mysql_insert +from sqlalchemy import select +from lib import db +from lib import db_schema + +CONFIG_PATH = os.path.join(os.path.dirname(__file__), "../conf/config.yaml") + +def load_config(): + with open(CONFIG_PATH, encoding="utf-8") as f: + return yaml.safe_load(f) + +def fetch_data_range_chunks(start_dt, end_dt, chunk_days=10): + """ + YYYYMMDD 형식 날짜 문자열을 받아 + chunk_days 단위로 (start, end) 튜플 반복자 생성 + """ + current_start = datetime.strptime(start_dt, "%Y%m%d") + end_date = datetime.strptime(end_dt, "%Y%m%d") + + while current_start <= end_date: + current_end = min(current_start + timedelta(days=chunk_days - 1), end_date) + yield current_start.strftime("%Y%m%d"), current_end.strftime("%Y%m%d") + current_start = current_end + timedelta(days=1) + +def fetch_asos_data(stn_id, start_dt, end_dt, service_key): + url = "http://apis.data.go.kr/1360000/AsosDalyInfoService/getWthrDataList" # HTTP로 변경 + + params = { + "serviceKey": service_key, + "pageNo": "1", + "numOfRows": "500", + "dataType": "JSON", + "dataCd": "ASOS", + "dateCd": "DAY", + "startDt": start_dt, + "endDt": end_dt, + "stnIds": str(stn_id), + } + + headers = { + "User-Agent": "Mozilla/5.0", + "Accept": "application/json" + } + + try: + resp = requests.get(url, params=params, headers=headers, timeout=15) + resp.raise_for_status() + data = resp.json() + items = data.get("response", {}).get("body", {}).get("items", {}).get("item", []) + + if items is None: + return [] + if isinstance(items, dict): + items = [items] + return items + + except Exception as e: + print(f"[ERROR] API 요청 실패: {e}") + return [] + +def save_items_to_db(items, conn, table, force_update=False, debug=False): + hrmt_keys = [ + "minTaHrmt", "maxTaHrmt", "mi10MaxRnHrmt", "hr1MaxRnHrmt", + "maxInsWsHrmt", "maxWsHrmt", "minRhmHrmt", "maxPsHrmt", + "minPsHrmt", "hr1MaxIcsrHrmt", "ddMefsHrmt", "ddMesHrmt" + ] + + for item in items: + try: + date_str = item.get("tm") + if not date_str: + continue + record_date = datetime.strptime(date_str, "%Y-%m-%d").date() + + data = {"date": record_date} + + for key in table.c.keys(): + if key == "date": + continue + value = item.get(key) + if value in ["", None, "-"]: + data[key] = None + else: + try: + if key in hrmt_keys or key == "iscs": + fval = float(value) + if fval < 0: + data[key] = None + else: + data[key] = str(int(fval)) + elif key == "stnId": + data[key] = int(float(value)) + else: + data[key] = float(value) + except (ValueError, TypeError): + data[key] = None + + if force_update: + stmt = mysql_insert(table).values(**data) + stmt = stmt.on_duplicate_key_update(**data) + conn.execute(stmt) + if debug: + print(f"[DEBUG] {record_date} 저장/업데이트 완료") + else: + sel = select(table.c.date).where(table.c.date == record_date) + if conn.execute(sel).fetchone(): + if debug: + print(f"[DEBUG] {record_date} 이미 존재, 저장 생략") + continue + conn.execute(table.insert().values(**data)) + if debug: + print(f"[DEBUG] {record_date} 저장 완료") + + except Exception as e: + print(f"[ERROR] 저장 실패: {e}") + raise + +def get_latest_date_from_db(conn, table): + """DB에서 가장 최신 date 값을 조회""" + sel = select(table.c.date).order_by(table.c.date.desc()).limit(1) + result = conn.execute(sel).fetchone() + if result: + return result[0] + return None + +def main(): + config = load_config() + service_key = config["weather_api"]["serviceKey"] + stn_ids = config["weather_api"]["stnIds"] + debug = config.get("debug", False) + force_update = config.get("force_update", False) + + table = db_schema.fg_manager_static_weather + engine = db.engine + + # 현재 날짜 및 시간 판단 + now = datetime.now() + today = now.date() + + # 오전 11시 이전이면 전전날까지, 아니면 전날까지 데이터 요청 + if now.hour < 11: + end_date = today - timedelta(days=2) + else: + end_date = today - timedelta(days=1) + + default_start = datetime.strptime("20170101", "%Y%m%d").date() + + chunk_days = 1000 + + with engine.begin() as conn: + latest_date = get_latest_date_from_db(conn, table) + if latest_date is None: + start_date = default_start + else: + start_date = latest_date + timedelta(days=1) + + if start_date > end_date: + if debug: + print("[INFO] 최신 데이터가 이미 존재하거나 요청할 데이터가 없습니다.") + return + + start_dt = start_date.strftime("%Y%m%d") + end_dt = end_date.strftime("%Y%m%d") + + for stn_id in stn_ids: + for chunk_start, chunk_end in fetch_data_range_chunks(start_dt, end_dt, chunk_days): + if debug: + print(f"[INFO] 지점 {stn_id} 데이터 요청 중: {chunk_start} ~ {chunk_end}") + items = fetch_asos_data(stn_id, chunk_start, chunk_end, service_key) + if items: + save_items_to_db(items, conn, table, force_update, debug) + else: + print(f"[WARN] 지점 {stn_id} {chunk_start}~{chunk_end} 데이터 없음 또는 요청 실패") + +if __name__ == "__main__": + main()