DB에서 최종 날짜를 확인하고, 해당일 다음날부터 전일까지 기상정보를 받아와 저장함, 만약 실행시간이 오전 11시 이전이라면, 전전날 데이터를 저장함.
This commit is contained in:
182
lib/weather_asos.py
Normal file
182
lib/weather_asos.py
Normal file
@ -0,0 +1,182 @@
|
||||
import sys, os
|
||||
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
|
||||
|
||||
import yaml
|
||||
from datetime import datetime, timedelta, date
|
||||
import requests
|
||||
from sqlalchemy.dialects.mysql import insert as mysql_insert
|
||||
from sqlalchemy import select
|
||||
from lib import db
|
||||
from lib import db_schema
|
||||
|
||||
CONFIG_PATH = os.path.join(os.path.dirname(__file__), "../conf/config.yaml")
|
||||
|
||||
def load_config():
|
||||
with open(CONFIG_PATH, encoding="utf-8") as f:
|
||||
return yaml.safe_load(f)
|
||||
|
||||
def fetch_data_range_chunks(start_dt, end_dt, chunk_days=10):
|
||||
"""
|
||||
YYYYMMDD 형식 날짜 문자열을 받아
|
||||
chunk_days 단위로 (start, end) 튜플 반복자 생성
|
||||
"""
|
||||
current_start = datetime.strptime(start_dt, "%Y%m%d")
|
||||
end_date = datetime.strptime(end_dt, "%Y%m%d")
|
||||
|
||||
while current_start <= end_date:
|
||||
current_end = min(current_start + timedelta(days=chunk_days - 1), end_date)
|
||||
yield current_start.strftime("%Y%m%d"), current_end.strftime("%Y%m%d")
|
||||
current_start = current_end + timedelta(days=1)
|
||||
|
||||
def fetch_asos_data(stn_id, start_dt, end_dt, service_key):
|
||||
url = "http://apis.data.go.kr/1360000/AsosDalyInfoService/getWthrDataList" # HTTP로 변경
|
||||
|
||||
params = {
|
||||
"serviceKey": service_key,
|
||||
"pageNo": "1",
|
||||
"numOfRows": "500",
|
||||
"dataType": "JSON",
|
||||
"dataCd": "ASOS",
|
||||
"dateCd": "DAY",
|
||||
"startDt": start_dt,
|
||||
"endDt": end_dt,
|
||||
"stnIds": str(stn_id),
|
||||
}
|
||||
|
||||
headers = {
|
||||
"User-Agent": "Mozilla/5.0",
|
||||
"Accept": "application/json"
|
||||
}
|
||||
|
||||
try:
|
||||
resp = requests.get(url, params=params, headers=headers, timeout=15)
|
||||
resp.raise_for_status()
|
||||
data = resp.json()
|
||||
items = data.get("response", {}).get("body", {}).get("items", {}).get("item", [])
|
||||
|
||||
if items is None:
|
||||
return []
|
||||
if isinstance(items, dict):
|
||||
items = [items]
|
||||
return items
|
||||
|
||||
except Exception as e:
|
||||
print(f"[ERROR] API 요청 실패: {e}")
|
||||
return []
|
||||
|
||||
def save_items_to_db(items, conn, table, force_update=False, debug=False):
|
||||
hrmt_keys = [
|
||||
"minTaHrmt", "maxTaHrmt", "mi10MaxRnHrmt", "hr1MaxRnHrmt",
|
||||
"maxInsWsHrmt", "maxWsHrmt", "minRhmHrmt", "maxPsHrmt",
|
||||
"minPsHrmt", "hr1MaxIcsrHrmt", "ddMefsHrmt", "ddMesHrmt"
|
||||
]
|
||||
|
||||
for item in items:
|
||||
try:
|
||||
date_str = item.get("tm")
|
||||
if not date_str:
|
||||
continue
|
||||
record_date = datetime.strptime(date_str, "%Y-%m-%d").date()
|
||||
|
||||
data = {"date": record_date}
|
||||
|
||||
for key in table.c.keys():
|
||||
if key == "date":
|
||||
continue
|
||||
value = item.get(key)
|
||||
if value in ["", None, "-"]:
|
||||
data[key] = None
|
||||
else:
|
||||
try:
|
||||
if key in hrmt_keys or key == "iscs":
|
||||
fval = float(value)
|
||||
if fval < 0:
|
||||
data[key] = None
|
||||
else:
|
||||
data[key] = str(int(fval))
|
||||
elif key == "stnId":
|
||||
data[key] = int(float(value))
|
||||
else:
|
||||
data[key] = float(value)
|
||||
except (ValueError, TypeError):
|
||||
data[key] = None
|
||||
|
||||
if force_update:
|
||||
stmt = mysql_insert(table).values(**data)
|
||||
stmt = stmt.on_duplicate_key_update(**data)
|
||||
conn.execute(stmt)
|
||||
if debug:
|
||||
print(f"[DEBUG] {record_date} 저장/업데이트 완료")
|
||||
else:
|
||||
sel = select(table.c.date).where(table.c.date == record_date)
|
||||
if conn.execute(sel).fetchone():
|
||||
if debug:
|
||||
print(f"[DEBUG] {record_date} 이미 존재, 저장 생략")
|
||||
continue
|
||||
conn.execute(table.insert().values(**data))
|
||||
if debug:
|
||||
print(f"[DEBUG] {record_date} 저장 완료")
|
||||
|
||||
except Exception as e:
|
||||
print(f"[ERROR] 저장 실패: {e}")
|
||||
raise
|
||||
|
||||
def get_latest_date_from_db(conn, table):
|
||||
"""DB에서 가장 최신 date 값을 조회"""
|
||||
sel = select(table.c.date).order_by(table.c.date.desc()).limit(1)
|
||||
result = conn.execute(sel).fetchone()
|
||||
if result:
|
||||
return result[0]
|
||||
return None
|
||||
|
||||
def main():
|
||||
config = load_config()
|
||||
service_key = config["weather_api"]["serviceKey"]
|
||||
stn_ids = config["weather_api"]["stnIds"]
|
||||
debug = config.get("debug", False)
|
||||
force_update = config.get("force_update", False)
|
||||
|
||||
table = db_schema.fg_manager_static_weather
|
||||
engine = db.engine
|
||||
|
||||
# 현재 날짜 및 시간 판단
|
||||
now = datetime.now()
|
||||
today = now.date()
|
||||
|
||||
# 오전 11시 이전이면 전전날까지, 아니면 전날까지 데이터 요청
|
||||
if now.hour < 11:
|
||||
end_date = today - timedelta(days=2)
|
||||
else:
|
||||
end_date = today - timedelta(days=1)
|
||||
|
||||
default_start = datetime.strptime("20170101", "%Y%m%d").date()
|
||||
|
||||
chunk_days = 1000
|
||||
|
||||
with engine.begin() as conn:
|
||||
latest_date = get_latest_date_from_db(conn, table)
|
||||
if latest_date is None:
|
||||
start_date = default_start
|
||||
else:
|
||||
start_date = latest_date + timedelta(days=1)
|
||||
|
||||
if start_date > end_date:
|
||||
if debug:
|
||||
print("[INFO] 최신 데이터가 이미 존재하거나 요청할 데이터가 없습니다.")
|
||||
return
|
||||
|
||||
start_dt = start_date.strftime("%Y%m%d")
|
||||
end_dt = end_date.strftime("%Y%m%d")
|
||||
|
||||
for stn_id in stn_ids:
|
||||
for chunk_start, chunk_end in fetch_data_range_chunks(start_dt, end_dt, chunk_days):
|
||||
if debug:
|
||||
print(f"[INFO] 지점 {stn_id} 데이터 요청 중: {chunk_start} ~ {chunk_end}")
|
||||
items = fetch_asos_data(stn_id, chunk_start, chunk_end, service_key)
|
||||
if items:
|
||||
save_items_to_db(items, conn, table, force_update, debug)
|
||||
else:
|
||||
print(f"[WARN] 지점 {stn_id} {chunk_start}~{chunk_end} 데이터 없음 또는 요청 실패")
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user