종관기상관측 업데이트
This commit is contained in:
@ -2,12 +2,12 @@ import sys, os
|
|||||||
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
|
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
|
||||||
|
|
||||||
import yaml
|
import yaml
|
||||||
from datetime import datetime, timedelta, date
|
|
||||||
import requests
|
import requests
|
||||||
|
from datetime import datetime, timedelta, date
|
||||||
from sqlalchemy.dialects.mysql import insert as mysql_insert
|
from sqlalchemy.dialects.mysql import insert as mysql_insert
|
||||||
from sqlalchemy import select
|
from sqlalchemy import select
|
||||||
from lib import db
|
|
||||||
from lib import db_schema
|
from conf import db, db_schema
|
||||||
|
|
||||||
CONFIG_PATH = os.path.join(os.path.dirname(__file__), "../conf/config.yaml")
|
CONFIG_PATH = os.path.join(os.path.dirname(__file__), "../conf/config.yaml")
|
||||||
|
|
||||||
@ -16,10 +16,6 @@ def load_config():
|
|||||||
return yaml.safe_load(f)
|
return yaml.safe_load(f)
|
||||||
|
|
||||||
def fetch_data_range_chunks(start_dt, end_dt, chunk_days=10):
|
def fetch_data_range_chunks(start_dt, end_dt, chunk_days=10):
|
||||||
"""
|
|
||||||
YYYYMMDD 형식 날짜 문자열을 받아
|
|
||||||
chunk_days 단위로 (start, end) 튜플 반복자 생성
|
|
||||||
"""
|
|
||||||
current_start = datetime.strptime(start_dt, "%Y%m%d")
|
current_start = datetime.strptime(start_dt, "%Y%m%d")
|
||||||
end_date = datetime.strptime(end_dt, "%Y%m%d")
|
end_date = datetime.strptime(end_dt, "%Y%m%d")
|
||||||
|
|
||||||
@ -29,7 +25,7 @@ def fetch_data_range_chunks(start_dt, end_dt, chunk_days=10):
|
|||||||
current_start = current_end + timedelta(days=1)
|
current_start = current_end + timedelta(days=1)
|
||||||
|
|
||||||
def fetch_asos_data(stn_id, start_dt, end_dt, service_key):
|
def fetch_asos_data(stn_id, start_dt, end_dt, service_key):
|
||||||
url = "http://apis.data.go.kr/1360000/AsosDalyInfoService/getWthrDataList" # HTTP로 변경
|
url = "http://apis.data.go.kr/1360000/AsosDalyInfoService/getWthrDataList"
|
||||||
|
|
||||||
params = {
|
params = {
|
||||||
"serviceKey": service_key,
|
"serviceKey": service_key,
|
||||||
@ -101,28 +97,28 @@ def save_items_to_db(items, conn, table, force_update=False, debug=False):
|
|||||||
except (ValueError, TypeError):
|
except (ValueError, TypeError):
|
||||||
data[key] = None
|
data[key] = None
|
||||||
|
|
||||||
|
if debug:
|
||||||
|
print(f"[DEBUG] {record_date} → DB 저장 시도: {data}")
|
||||||
|
continue
|
||||||
|
|
||||||
if force_update:
|
if force_update:
|
||||||
stmt = mysql_insert(table).values(**data)
|
stmt = mysql_insert(table).values(**data)
|
||||||
stmt = stmt.on_duplicate_key_update(**data)
|
stmt = stmt.on_duplicate_key_update(**data)
|
||||||
conn.execute(stmt)
|
conn.execute(stmt)
|
||||||
if debug:
|
print(f"[INFO] {record_date} 저장/업데이트 완료")
|
||||||
print(f"[DEBUG] {record_date} 저장/업데이트 완료")
|
|
||||||
else:
|
else:
|
||||||
sel = select(table.c.date).where(table.c.date == record_date)
|
sel = select(table.c.date).where(table.c.date == record_date)
|
||||||
if conn.execute(sel).fetchone():
|
if conn.execute(sel).fetchone():
|
||||||
if debug:
|
print(f"[INFO] {record_date} 이미 존재, 저장 생략")
|
||||||
print(f"[DEBUG] {record_date} 이미 존재, 저장 생략")
|
|
||||||
continue
|
continue
|
||||||
conn.execute(table.insert().values(**data))
|
conn.execute(table.insert().values(**data))
|
||||||
if debug:
|
print(f"[INFO] {record_date} 저장 완료")
|
||||||
print(f"[DEBUG] {record_date} 저장 완료")
|
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"[ERROR] 저장 실패: {e}")
|
print(f"[ERROR] 저장 실패: {e}")
|
||||||
raise
|
raise
|
||||||
|
|
||||||
def get_latest_date_from_db(conn, table):
|
def get_latest_date_from_db(conn, table):
|
||||||
"""DB에서 가장 최신 date 값을 조회"""
|
|
||||||
sel = select(table.c.date).order_by(table.c.date.desc()).limit(1)
|
sel = select(table.c.date).order_by(table.c.date.desc()).limit(1)
|
||||||
result = conn.execute(sel).fetchone()
|
result = conn.execute(sel).fetchone()
|
||||||
if result:
|
if result:
|
||||||
@ -131,34 +127,32 @@ def get_latest_date_from_db(conn, table):
|
|||||||
|
|
||||||
def main():
|
def main():
|
||||||
config = load_config()
|
config = load_config()
|
||||||
service_key = config["weather_api"]["serviceKey"]
|
service_key = config["DATA_API"]["serviceKey"]
|
||||||
stn_ids = config["weather_api"]["stnIds"]
|
stn_ids = config["DATA_API"]["weather"]["stnIds"]
|
||||||
debug = config.get("debug", False)
|
debug = config.get("debug", False)
|
||||||
force_update = config.get("force_update", False)
|
force_update = config.get("force_update", False)
|
||||||
|
|
||||||
table = db_schema.fg_manager_static_weather
|
table = db_schema.fg_manager_static_weather
|
||||||
engine = db.engine
|
engine = db.engine
|
||||||
|
|
||||||
# 현재 날짜 및 시간 판단
|
|
||||||
now = datetime.now()
|
now = datetime.now()
|
||||||
today = now.date()
|
today = now.date()
|
||||||
|
|
||||||
# 오전 11시 이전이면 전전날까지, 아니면 전날까지 데이터 요청
|
|
||||||
if now.hour < 11:
|
if now.hour < 11:
|
||||||
end_date = today - timedelta(days=2)
|
end_date = today - timedelta(days=2)
|
||||||
else:
|
else:
|
||||||
end_date = today - timedelta(days=1)
|
end_date = today - timedelta(days=1)
|
||||||
|
|
||||||
default_start = datetime.strptime("20170101", "%Y%m%d").date()
|
config_start_date = datetime.strptime(config["DATA_API"]["startDt"], "%Y%m%d").date()
|
||||||
|
|
||||||
chunk_days = 1000
|
chunk_days = 1000
|
||||||
|
|
||||||
with engine.begin() as conn:
|
with engine.begin() as conn:
|
||||||
latest_date = get_latest_date_from_db(conn, table)
|
latest_date = get_latest_date_from_db(conn, table)
|
||||||
if latest_date is None:
|
if latest_date is None:
|
||||||
start_date = default_start
|
start_date = config_start_date
|
||||||
else:
|
else:
|
||||||
start_date = latest_date + timedelta(days=1)
|
start_date = max(config_start_date, latest_date + timedelta(days=1))
|
||||||
|
|
||||||
if start_date > end_date:
|
if start_date > end_date:
|
||||||
if debug:
|
if debug:
|
||||||
|
|||||||
Reference in New Issue
Block a user