기상정보 부분에서 '운정' 기본값인 부분을 config에서 정의하도록 하고, 해당 값을 딕셔너리로 받아서 여러 지역정보를 업데이트할 수 있도록 수정함.
This commit is contained in:
@ -3,13 +3,11 @@ sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
|
|||||||
|
|
||||||
import requests
|
import requests
|
||||||
from datetime import datetime, timedelta
|
from datetime import datetime, timedelta
|
||||||
from sqlalchemy import select, func
|
from sqlalchemy import select, func, and_
|
||||||
from sqlalchemy.exc import IntegrityError
|
from sqlalchemy.exc import IntegrityError
|
||||||
from sqlalchemy.dialects.mysql import insert as mysql_insert
|
from sqlalchemy.dialects.mysql import insert as mysql_insert
|
||||||
|
|
||||||
from lib import db
|
from conf import db, db_schema
|
||||||
from lib import db_schema
|
|
||||||
|
|
||||||
|
|
||||||
def test_num_of_rows(api_key, station_name, date_str=None):
|
def test_num_of_rows(api_key, station_name, date_str=None):
|
||||||
if date_str is None:
|
if date_str is None:
|
||||||
@ -35,7 +33,6 @@ def test_num_of_rows(api_key, station_name, date_str=None):
|
|||||||
resp.json()
|
resp.json()
|
||||||
print(f"[INFO] numOfRows 최대값 탐색 성공: {max_rows}")
|
print(f"[INFO] numOfRows 최대값 탐색 성공: {max_rows}")
|
||||||
return max_rows
|
return max_rows
|
||||||
|
|
||||||
except (requests.RequestException, ValueError) as e:
|
except (requests.RequestException, ValueError) as e:
|
||||||
print(f"[WARN] numOfRows={max_rows} 에러 발생: {e}, 100 감소 후 재시도")
|
print(f"[WARN] numOfRows={max_rows} 에러 발생: {e}, 100 감소 후 재시도")
|
||||||
max_rows -= 100
|
max_rows -= 100
|
||||||
@ -68,7 +65,7 @@ def fetch_air_quality_data_range(start_date_str, end_date_str, api_key, station_
|
|||||||
print(f"[ERROR] JSON 파싱 실패: {e}")
|
print(f"[ERROR] JSON 파싱 실패: {e}")
|
||||||
return []
|
return []
|
||||||
|
|
||||||
def save_data_to_db(items, conn, table, force_update=False, debug=False):
|
def save_data_to_db(items, conn, table, station, force_update=False, debug=False):
|
||||||
for item in items:
|
for item in items:
|
||||||
try:
|
try:
|
||||||
item_date = datetime.strptime(item['msurDt'], '%Y-%m-%d').date()
|
item_date = datetime.strptime(item['msurDt'], '%Y-%m-%d').date()
|
||||||
@ -78,6 +75,7 @@ def save_data_to_db(items, conn, table, force_update=False, debug=False):
|
|||||||
|
|
||||||
data = {
|
data = {
|
||||||
'date': item_date,
|
'date': item_date,
|
||||||
|
'station': station,
|
||||||
'pm25': float(item.get('pm25Value')) if item.get('pm25Value') else None,
|
'pm25': float(item.get('pm25Value')) if item.get('pm25Value') else None,
|
||||||
'pm10': float(item.get('pm10Value')) if item.get('pm10Value') else None,
|
'pm10': float(item.get('pm10Value')) if item.get('pm10Value') else None,
|
||||||
'so2': float(item.get('so2Value')) if item.get('so2Value') else None,
|
'so2': float(item.get('so2Value')) if item.get('so2Value') else None,
|
||||||
@ -87,34 +85,35 @@ def save_data_to_db(items, conn, table, force_update=False, debug=False):
|
|||||||
}
|
}
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
if debug:
|
||||||
|
print(f"[DEBUG] {item_date} [{station}] → DB 저장 시도: {data}")
|
||||||
|
continue
|
||||||
|
|
||||||
if force_update:
|
if force_update:
|
||||||
stmt = mysql_insert(table).values(**data)
|
stmt = mysql_insert(table).values(**data)
|
||||||
on_duplicate_stmt = stmt.on_duplicate_key_update(**data)
|
on_duplicate_stmt = stmt.on_duplicate_key_update(**data)
|
||||||
conn.execute(on_duplicate_stmt)
|
conn.execute(on_duplicate_stmt)
|
||||||
if debug:
|
print(f"[INFO] {item_date} [{station}] 저장/업데이트 완료 (강제업데이트)")
|
||||||
print(f"[DEBUG] {item_date} 저장/업데이트 완료 (강제업데이트)")
|
|
||||||
else:
|
else:
|
||||||
sel = select(table.c.date).where(table.c.date == item_date)
|
sel = select(table.c.date).where(and_(table.c.date == item_date, table.c.station == station))
|
||||||
exists = conn.execute(sel).fetchone()
|
exists = conn.execute(sel).fetchone()
|
||||||
if exists:
|
if exists:
|
||||||
if debug:
|
print(f"[INFO] {item_date} [{station}] 이미 존재. 저장 생략")
|
||||||
print(f"[DEBUG] {item_date} 이미 존재. 저장 생략")
|
|
||||||
continue
|
continue
|
||||||
ins = table.insert().values(**data)
|
ins = table.insert().values(**data)
|
||||||
conn.execute(ins)
|
conn.execute(ins)
|
||||||
if debug:
|
print(f"[INFO] {item_date} [{station}] 저장 완료")
|
||||||
print(f"[DEBUG] {item_date} 저장 완료")
|
|
||||||
except IntegrityError as e:
|
except IntegrityError as e:
|
||||||
print(f"[ERROR] DB 중복 오류: {e}")
|
print(f"[ERROR] DB 중복 오류: {e}")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"[ERROR] DB 저장 실패: {e}")
|
print(f"[ERROR] DB 저장 실패: {e}")
|
||||||
|
|
||||||
def get_latest_date(conn, table):
|
def get_latest_date(conn, table, station):
|
||||||
try:
|
try:
|
||||||
sel = select(func.max(table.c.date))
|
sel = select(func.max(table.c.date)).where(table.c.station == station)
|
||||||
result = conn.execute(sel).scalar()
|
result = conn.execute(sel).scalar()
|
||||||
if result is None:
|
if result is None:
|
||||||
print("[INFO] 테이블에 데이터가 없습니다.")
|
print(f"[INFO] {station}: 테이블에 데이터가 없습니다.")
|
||||||
return result
|
return result
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"[ERROR] 가장 최근 날짜 조회 실패: {e}")
|
print(f"[ERROR] 가장 최근 날짜 조회 실패: {e}")
|
||||||
@ -123,49 +122,49 @@ def get_latest_date(conn, table):
|
|||||||
def main():
|
def main():
|
||||||
config = db.load_config()
|
config = db.load_config()
|
||||||
api_key = config['DATA_API']['serviceKey']
|
api_key = config['DATA_API']['serviceKey']
|
||||||
station_name = config['DATA_API']['air']['station_name']
|
station_list = config['DATA_API']['air']['station_name']
|
||||||
debug = config.get('debug', False)
|
debug = config.get('debug', False)
|
||||||
force_update = config.get('force_update', False)
|
force_update = config.get('force_update', False)
|
||||||
|
config_start_date = datetime.strptime(config['DATA_API']['startDt'], '%Y%m%d').date()
|
||||||
|
yesterday = (datetime.now() - timedelta(days=1)).date()
|
||||||
|
|
||||||
engine = db.engine
|
engine = db.engine
|
||||||
table = db_schema.fg_manager_static_air
|
table = db_schema.fg_manager_static_air
|
||||||
|
|
||||||
with engine.connect() as conn:
|
with engine.connect() as conn:
|
||||||
trans = conn.begin()
|
for station_name in station_list:
|
||||||
try:
|
print(f"\n[INFO] 측정소: {station_name}")
|
||||||
start_date = datetime.strptime("2020-01-01", "%Y-%m-%d").date()
|
|
||||||
end_date = (datetime.now() - timedelta(days=1)).date()
|
|
||||||
|
|
||||||
optimal_num_rows = test_num_of_rows(api_key, date_str=start_date.strftime('%Y%m%d'))
|
latest_date = get_latest_date(conn, table, station_name)
|
||||||
if debug:
|
if latest_date:
|
||||||
print(f"[DEBUG] 최적 numOfRows 값: {optimal_num_rows}")
|
start_date = max(config_start_date, latest_date + timedelta(days=1))
|
||||||
|
else:
|
||||||
|
start_date = config_start_date
|
||||||
|
|
||||||
|
if start_date > yesterday:
|
||||||
|
print(f"[INFO] {station_name}: 최신 데이터가 이미 존재합니다. ({latest_date})")
|
||||||
|
continue
|
||||||
|
|
||||||
|
optimal_num_rows = test_num_of_rows(api_key, station_name=station_name, date_str=start_date.strftime('%Y%m%d'))
|
||||||
|
print(f"[INFO] 최적 numOfRows: {optimal_num_rows}")
|
||||||
|
|
||||||
current_start = start_date
|
current_start = start_date
|
||||||
while current_start <= end_date:
|
while current_start <= yesterday:
|
||||||
current_end = current_start + timedelta(days=optimal_num_rows - 1)
|
current_end = min(current_start + timedelta(days=optimal_num_rows - 1), yesterday)
|
||||||
if current_end > end_date:
|
|
||||||
current_end = end_date
|
|
||||||
|
|
||||||
start_str = current_start.strftime('%Y%m%d')
|
start_str = current_start.strftime('%Y%m%d')
|
||||||
end_str = current_end.strftime('%Y%m%d')
|
end_str = current_end.strftime('%Y%m%d')
|
||||||
|
|
||||||
if debug:
|
print(f"[INFO] {station_name}: {start_str} ~ {end_str} 데이터 요청 중...")
|
||||||
print(f"[DEBUG] {start_str} ~ {end_str} 데이터 요청 중...")
|
|
||||||
|
|
||||||
items = fetch_air_quality_data_range(start_str, end_str, api_key, station_name, num_of_rows=optimal_num_rows)
|
items = fetch_air_quality_data_range(start_str, end_str, api_key, station_name, num_of_rows=optimal_num_rows)
|
||||||
|
|
||||||
if items:
|
if items:
|
||||||
save_data_to_db(items, conn, table, force_update=force_update, debug=debug)
|
save_data_to_db(items, conn, table, station=station_name, force_update=force_update, debug=debug)
|
||||||
else:
|
else:
|
||||||
if debug:
|
print(f"[INFO] {station_name}: {start_str} ~ {end_str} 데이터 없음")
|
||||||
print(f"[DEBUG] {start_str} ~ {end_str} 데이터 없음")
|
|
||||||
|
|
||||||
current_start = current_end + timedelta(days=1)
|
current_start = current_end + timedelta(days=1)
|
||||||
|
|
||||||
trans.commit()
|
print("[INFO] 모든 측정소 데이터 처리 완료")
|
||||||
except Exception as e:
|
|
||||||
trans.rollback()
|
|
||||||
print(f"[ERROR] 트랜잭션 처리 중 오류 발생: {e}")
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
main()
|
main()
|
||||||
|
|||||||
Reference in New Issue
Block a user