DB에서 최근 날짜를 찾아 해당 날짜 다음날부터 전날까지의 대기정보를 받아 DB에 저장함.

This commit is contained in:
2025-07-04 17:59:59 +09:00
parent 4850cac24b
commit 82dcb70cb6

View File

@ -1,22 +1,57 @@
import sys, os
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
import requests import requests
from datetime import datetime, timedelta from datetime import datetime, timedelta
from sqlalchemy import select, func from sqlalchemy import select, func
from sqlalchemy.exc import IntegrityError from sqlalchemy.exc import IntegrityError
from sqlalchemy.dialects.mysql import insert as mysql_insert
from lib import db from lib import db
from lib import db_schema from lib import db_schema
def fetch_air_quality_data(date_str, api_key, station_name='운정'):
""" def test_num_of_rows(api_key, station_name='운정', date_str=None):
API에서 지정된 날짜의 대기환경 데이터를 가져옴 if date_str is None:
""" date_str = (datetime.now() - timedelta(days=1)).strftime('%Y%m%d')
max_rows = 500
min_rows = 100
while max_rows >= min_rows:
try:
url = "http://apis.data.go.kr/B552584/ArpltnStatsSvc/getMsrstnAcctoRDyrg"
params = {
'serviceKey': api_key,
'returnType': 'json',
'numOfRows': str(max_rows),
'pageNo': '1',
'inqBginDt': date_str,
'inqEndDt': date_str,
'msrstnName': station_name,
}
resp = requests.get(url, params=params, timeout=10)
resp.raise_for_status()
resp.json()
print(f"[INFO] numOfRows 최대값 탐색 성공: {max_rows}")
return max_rows
except (requests.RequestException, ValueError) as e:
print(f"[WARN] numOfRows={max_rows} 에러 발생: {e}, 100 감소 후 재시도")
max_rows -= 100
print("[WARN] 최소 numOfRows 값(100)도 실패했습니다. 기본값 100 사용")
return 100
def fetch_air_quality_data_range(start_date_str, end_date_str, api_key, num_of_rows=100, station_name='운정', page_no=1):
url = "http://apis.data.go.kr/B552584/ArpltnStatsSvc/getMsrstnAcctoRDyrg" url = "http://apis.data.go.kr/B552584/ArpltnStatsSvc/getMsrstnAcctoRDyrg"
params = { params = {
'serviceKey': api_key, 'serviceKey': api_key,
'returnType': 'json', 'returnType': 'json',
'numOfRows': '100', 'numOfRows': str(num_of_rows),
'pageNo': '1', 'pageNo': str(page_no),
'inqBginDt': date_str, 'inqBginDt': start_date_str,
'inqEndDt': date_str, 'inqEndDt': end_date_str,
'msrstnName': station_name, 'msrstnName': station_name,
} }
@ -33,10 +68,7 @@ def fetch_air_quality_data(date_str, api_key, station_name='운정'):
print(f"[ERROR] JSON 파싱 실패: {e}") print(f"[ERROR] JSON 파싱 실패: {e}")
return [] return []
def save_data_to_db(items, conn, table, debug=False): def save_data_to_db(items, conn, table, force_update=False, debug=False):
"""
대기환경 정보를 DB에 저장
"""
for item in items: for item in items:
try: try:
item_date = datetime.strptime(item['msurDt'], '%Y-%m-%d').date() item_date = datetime.strptime(item['msurDt'], '%Y-%m-%d').date()
@ -44,45 +76,40 @@ def save_data_to_db(items, conn, table, debug=False):
print(f"[ERROR] 날짜 파싱 오류: {item.get('msurDt')}{e}") print(f"[ERROR] 날짜 파싱 오류: {item.get('msurDt')}{e}")
continue continue
if debug: data = {
print(f"[DEBUG] 처리 중: {item}") 'date': item_date,
'pm25': float(item.get('pm25Value')) if item.get('pm25Value') else None,
# 중복 확인 'pm10': float(item.get('pm10Value')) if item.get('pm10Value') else None,
sel = select(table.c.date).where(table.c.date == item_date) 'so2': float(item.get('so2Value')) if item.get('so2Value') else None,
try: 'co': float(item.get('coValue')) if item.get('coValue') else None,
exists = conn.execute(sel).fetchone() 'no2': float(item.get('no2Value')) if item.get('no2Value') else None,
except Exception as e: 'o3': float(item.get('o3Value')) if item.get('o3Value') else None,
print(f"[ERROR] SELECT 오류: {e}") }
continue
if exists:
if debug:
print(f"[DEBUG] {item_date} 이미 존재. 저장 생략")
continue
ins = table.insert().values(
date=item_date,
pm25=float(item.get('pm25Value')) if item.get('pm25Value') else None,
pm10=float(item.get('pm10Value')) if item.get('pm10Value') else None,
so2=float(item.get('so2Value')) if item.get('so2Value') else None,
co=float(item.get('coValue')) if item.get('coValue') else None,
no2=float(item.get('no2Value')) if item.get('no2Value') else None,
o3=float(item.get('o3Value')) if item.get('o3Value') else None,
)
try: try:
conn.execute(ins) if force_update:
if debug: stmt = mysql_insert(table).values(**data)
print(f"[DEBUG] {item_date} 저장 완료") on_duplicate_stmt = stmt.on_duplicate_key_update(**data)
conn.execute(on_duplicate_stmt)
if debug:
print(f"[DEBUG] {item_date} 저장/업데이트 완료 (강제업데이트)")
else:
sel = select(table.c.date).where(table.c.date == item_date)
exists = conn.execute(sel).fetchone()
if exists:
if debug:
print(f"[DEBUG] {item_date} 이미 존재. 저장 생략")
continue
ins = table.insert().values(**data)
conn.execute(ins)
if debug:
print(f"[DEBUG] {item_date} 저장 완료")
except IntegrityError as e: except IntegrityError as e:
print(f"[ERROR] DB 중복 오류: {e}") print(f"[ERROR] DB 중복 오류: {e}")
except Exception as e: except Exception as e:
print(f"[ERROR] DB 저장 실패: {e}") print(f"[ERROR] DB 저장 실패: {e}")
def get_latest_date(conn, table): def get_latest_date(conn, table):
"""
테이블에서 가장 최근 날짜를 반환. 값이 없으면 None
"""
try: try:
sel = select(func.max(table.c.date)) sel = select(func.max(table.c.date))
result = conn.execute(sel).scalar() result = conn.execute(sel).scalar()
@ -97,40 +124,47 @@ def main():
config = db.load_config() config = db.load_config()
api_key = config['air_quality_api']['service_key'] api_key = config['air_quality_api']['service_key']
debug = config.get('debug', False) debug = config.get('debug', False)
force_update = config.get('force_update', False)
engine = db.engine engine = db.engine
table = db_schema.fg_manager_static_air table = db_schema.fg_manager_static_air
with engine.connect() as conn: with engine.connect() as conn:
latest_date = get_latest_date(conn, table) trans = conn.begin()
try:
start_date = datetime.strptime("2017-01-01", "%Y-%m-%d").date()
end_date = (datetime.now() - timedelta(days=1)).date()
if latest_date is None: optimal_num_rows = test_num_of_rows(api_key, date_str=start_date.strftime('%Y%m%d'))
start_date = datetime.strptime("2023-01-01", "%Y-%m-%d").date()
else:
start_date = latest_date + timedelta(days=1)
end_date = (datetime.now() - timedelta(days=1)).date()
if debug:
print(f"[DEBUG] 최신 DB 날짜: {latest_date}")
print(f"[DEBUG] 수집 시작일: {start_date}")
print(f"[DEBUG] 수집 종료일: {end_date}")
current_date = start_date
while current_date <= end_date:
date_str = current_date.strftime('%Y%m%d')
if debug: if debug:
print(f"[DEBUG] {date_str} 데이터 요청 중...") print(f"[DEBUG] 최적 numOfRows 값: {optimal_num_rows}")
items = fetch_air_quality_data(date_str, api_key) current_start = start_date
while current_start <= end_date:
current_end = current_start + timedelta(days=optimal_num_rows - 1)
if current_end > end_date:
current_end = end_date
start_str = current_start.strftime('%Y%m%d')
end_str = current_end.strftime('%Y%m%d')
if items:
save_data_to_db(items, conn, table, debug=debug)
else:
if debug: if debug:
print(f"[DEBUG] {date_str} 데이터 없음") print(f"[DEBUG] {start_str} ~ {end_str} 데이터 요청 중...")
current_date += timedelta(days=1) items = fetch_air_quality_data_range(start_str, end_str, api_key, num_of_rows=optimal_num_rows)
if items:
save_data_to_db(items, conn, table, force_update=force_update, debug=debug)
else:
if debug:
print(f"[DEBUG] {start_str} ~ {end_str} 데이터 없음")
current_start = current_end + timedelta(days=1)
trans.commit()
except Exception as e:
trans.rollback()
print(f"[ERROR] 트랜잭션 처리 중 오류 발생: {e}")
if __name__ == '__main__': if __name__ == '__main__':
main() main()