Files
static/lib/weather_asos.py

181 lines
6.3 KiB
Python

import sys, os
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
import yaml
import requests
from datetime import datetime, timedelta, date
from sqlalchemy.dialects.mysql import insert as mysql_insert
from sqlalchemy import select
from conf import db, db_schema
# Location of the YAML configuration file, built relative to this module's
# own directory (one level up, then conf/config.yaml). Read by load_config().
CONFIG_PATH = os.path.join(os.path.dirname(__file__), "../conf/config.yaml")
def load_config():
    """Parse the YAML file at ``CONFIG_PATH`` and return its contents.

    The file is opened as UTF-8 text and parsed with ``yaml.safe_load``;
    whatever object that call produces is returned unchanged.
    """
    with open(CONFIG_PATH, encoding="utf-8") as config_stream:
        parsed = yaml.safe_load(config_stream)
    return parsed
def fetch_data_range_chunks(start_dt, end_dt, chunk_days=10):
    """Split an inclusive date range into consecutive sub-ranges.

    Args:
        start_dt: First day of the range as a ``YYYYMMDD`` string.
        end_dt: Last day of the range (inclusive) as a ``YYYYMMDD`` string.
        chunk_days: Maximum number of days covered by one sub-range.
            Must be at least 1.

    Yields:
        ``(chunk_start, chunk_end)`` tuples of ``YYYYMMDD`` strings. Both
        ends are inclusive, chunks do not overlap, and the last chunk is
        clipped to ``end_dt``. Nothing is yielded when ``start_dt`` is
        later than ``end_dt``.

    Raises:
        ValueError: If ``chunk_days`` is smaller than 1, or if either date
            string does not match ``%Y%m%d``. Because this is a generator,
            the error surfaces on the first iteration, not at call time.
    """
    # A chunk size below 1 would make each chunk end before it starts and
    # leave the cursor stuck in place, so the loop would never terminate.
    if chunk_days < 1:
        raise ValueError(f"chunk_days must be >= 1, got {chunk_days!r}")

    current_start = datetime.strptime(start_dt, "%Y%m%d")
    end_date = datetime.strptime(end_dt, "%Y%m%d")
    # Both ends are inclusive, so a chunk of N days spans N - 1 day steps.
    span = timedelta(days=chunk_days - 1)
    one_day = timedelta(days=1)

    while current_start <= end_date:
        current_end = min(current_start + span, end_date)
        yield current_start.strftime("%Y%m%d"), current_end.strftime("%Y%m%d")
        current_start = current_end + one_day
def fetch_asos_data(stn_id, start_dt, end_dt, service_key):
    """Fetch daily ASOS records for one station from the data.go.kr API.

    A single page (page 1, up to 500 rows) is requested in JSON format for
    the given station and date range.

    Args:
        stn_id: Station identifier; sent as a string in ``stnIds``.
        start_dt: Range start, passed through as ``startDt``.
        end_dt: Range end, passed through as ``endDt``.
        service_key: API key, passed through as ``serviceKey``.

    Returns:
        A list of item dicts taken from ``response.body.items.item``. A
        single dict is wrapped in a list; a null value gives ``[]``. Any
        exception (network error, HTTP error status, unparsable or
        unexpectedly shaped body) is printed and also results in ``[]``.
    """
    endpoint = "http://apis.data.go.kr/1360000/AsosDalyInfoService/getWthrDataList"
    query = dict(
        serviceKey=service_key,
        pageNo="1",
        numOfRows="500",
        dataType="JSON",
        dataCd="ASOS",
        dateCd="DAY",
        startDt=start_dt,
        endDt=end_dt,
        stnIds=str(stn_id),
    )
    request_headers = {"User-Agent": "Mozilla/5.0", "Accept": "application/json"}

    # Path to the record list inside the JSON body, with the fallback used
    # when a level is absent.
    lookup_path = (("response", {}), ("body", {}), ("items", {}), ("item", []))

    try:
        response = requests.get(
            endpoint, params=query, headers=request_headers, timeout=15
        )
        response.raise_for_status()
        node = response.json()
        for field, fallback in lookup_path:
            node = node.get(field, fallback)
        if node is None:
            return []
        # A lone record may arrive as a bare dict instead of a list.
        return [node] if isinstance(node, dict) else node
    except Exception as err:
        print(f"[ERROR] API 요청 실패: {err}")
        return []
def save_items_to_db(items, conn, table, force_update=False, debug=False):
    """Convert API items to rows and write them to ``table``.

    For every item with a non-empty ``tm`` field (parsed as ``%Y-%m-%d``),
    one row is built with a ``date`` entry plus one entry per remaining
    column of ``table``. Values are looked up in the item by column name:

    * ``""``, ``None`` and ``"-"`` become ``None``;
    * the ``*Hrmt`` columns listed below and ``iscs`` are parsed as a
      number and stored as an integer string, or ``None`` when negative;
    * ``stnId`` is stored as an ``int``;
    * every other column is stored as a ``float``;
    * values that fail conversion with ``ValueError``/``TypeError``
      become ``None``.

    Args:
        items: Iterable of item dicts.
        conn: Connection used to execute the statements.
        table: Table object exposing ``c``, ``insert()`` and a ``date`` column.
        force_update: When true, issue a MySQL
            ``INSERT ... ON DUPLICATE KEY UPDATE``; otherwise insert only
            if no row with the same date exists yet.
        debug: When true, only print each row and write nothing.

    Raises:
        Exception: Any error raised while handling an item is printed and
            then re-raised.
    """
    # Columns stored as non-negative integer text rather than floats.
    int_text_columns = frozenset((
        "minTaHrmt", "maxTaHrmt", "mi10MaxRnHrmt", "hr1MaxRnHrmt",
        "maxInsWsHrmt", "maxWsHrmt", "minRhmHrmt", "maxPsHrmt",
        "minPsHrmt", "hr1MaxIcsrHrmt", "ddMefsHrmt", "ddMesHrmt",
        "iscs",
    ))

    def convert(column, raw):
        """Return the value to store for one column of one item."""
        if raw in ("", None, "-"):
            return None
        try:
            if column in int_text_columns:
                number = float(raw)
                return None if number < 0 else str(int(number))
            if column == "stnId":
                return int(float(raw))
            return float(raw)
        except (ValueError, TypeError):
            return None

    for entry in items:
        try:
            day_text = entry.get("tm")
            if not day_text:
                continue
            record_date = datetime.strptime(day_text, "%Y-%m-%d").date()

            row = {"date": record_date}
            row.update(
                (column, convert(column, entry.get(column)))
                for column in table.c.keys()
                if column != "date"
            )

            if debug:
                print(f"[DEBUG] {record_date} → DB 저장 시도: {row}")
                continue

            if not force_update:
                # Insert-only mode: skip dates that are already stored.
                lookup = select(table.c.date).where(table.c.date == record_date)
                already_stored = conn.execute(lookup).fetchone()
                if already_stored:
                    print(f"[INFO] {record_date} 이미 존재, 저장 생략")
                    continue
                conn.execute(table.insert().values(**row))
                print(f"[INFO] {record_date} 저장 완료")
                continue

            upsert = mysql_insert(table).values(**row)
            upsert = upsert.on_duplicate_key_update(**row)
            conn.execute(upsert)
            print(f"[INFO] {record_date} 저장/업데이트 완료")
        except Exception as err:
            print(f"[ERROR] 저장 실패: {err}")
            raise
def get_latest_date_from_db(conn, table):
    """Return the greatest ``date`` value stored in ``table``.

    Selects the ``date`` column ordered descending, limited to one row.

    Args:
        conn: Connection used to execute the query.
        table: Table object exposing a ``date`` column via ``table.c``.

    Returns:
        The first column of the fetched row, or ``None`` when no row is
        returned.
    """
    newest_first = select(table.c.date).order_by(table.c.date.desc()).limit(1)
    row = conn.execute(newest_first).fetchone()
    return row[0] if row else None
def main():
    """Fetch missing daily ASOS records and store them in the weather table.

    Steps:
        1. Load the YAML config (API key, station ids, start date, and the
           optional ``debug`` / ``force_update`` flags).
        2. Pick the last day to request: two days ago when run before
           11:00 local time, otherwise yesterday.
        3. Pick the first day to request: the configured start date, or the
           day after the newest date already stored, whichever is later.
        4. For every station, request the range in chunks and hand the
           returned items to ``save_items_to_db``.

    All database work runs inside one ``engine.begin()`` block.
    """
    config = load_config()
    service_key = config["DATA_API"]["serviceKey"]
    stn_ids = config["DATA_API"]["weather"]["stnIds"]
    debug = config.get("debug", False)
    force_update = config.get("force_update", False)
    table = db_schema.weather
    engine = db.engine

    now = datetime.now()
    today = now.date()
    if now.hour < 11:
        end_date = today - timedelta(days=2)
        print(f"[INFO] 오전 11시 이전에는 전전일 데이터가 가장 최근입니다. 최종 검색일자 {end_date}")
    else:
        end_date = today - timedelta(days=1)
        print(f"[INFO] 최종 검색일자 {end_date}")

    config_start_date = datetime.strptime(config["DATA_API"]["startDt"], "%Y%m%d").date()

    # fetch_asos_data requests a single page of at most 500 rows
    # (pageNo=1, numOfRows=500). Assuming one row per day per station, a
    # chunk longer than 500 days would lose every row past the first page,
    # and those days would never be re-requested because later runs resume
    # after the newest stored date. Keep the chunk within one page.
    chunk_days = 500

    with engine.begin() as conn:
        print("[INFO] DB 저장 최종 일자 점검")
        latest_date = get_latest_date_from_db(conn, table)
        print(f"[INFO] 최종 저장일 : {latest_date}")

        if latest_date is None:
            start_date = config_start_date
            print(f"[INFO] 최종 저장 일자가 존재하지 않아 기본 시작일자를 사용합니다. {start_date}")
        else:
            # Resume the day after the newest stored record, but never
            # before the configured start date.
            start_date = max(config_start_date, latest_date + timedelta(days=1))
            print(f"[INFO] 시작일자 : {start_date}")

        if start_date > end_date:
            print("[INFO] 최신 데이터가 이미 존재하거나 요청할 데이터가 없습니다.")
            return

        start_dt = start_date.strftime("%Y%m%d")
        end_dt = end_date.strftime("%Y%m%d")

        # NOTE(review): the resume date and the insert-only existence check
        # in save_items_to_db look at the date alone, not the station, so
        # with several station ids later stations may be skipped - confirm
        # against the table's key definition.
        for stn_id in stn_ids:
            for chunk_start, chunk_end in fetch_data_range_chunks(start_dt, end_dt, chunk_days):
                print(f"[INFO] 지점 {stn_id} 데이터 요청 중: {chunk_start} ~ {chunk_end}")
                items = fetch_asos_data(stn_id, chunk_start, chunk_end, service_key)
                if items:
                    save_items_to_db(items, conn, table, force_update, debug)
                else:
                    print(f"[WARN] 지점 {stn_id} {chunk_start}~{chunk_end} 데이터 없음 또는 요청 실패")
# Run the import job only when this file is executed directly as a script.
if __name__ == "__main__":
    main()