137 lines
4.4 KiB
Python
137 lines
4.4 KiB
Python
import requests
|
|
from datetime import datetime, timedelta
|
|
from sqlalchemy import select, func
|
|
from sqlalchemy.exc import IntegrityError
|
|
from lib import db
|
|
from lib import db_schema
|
|
|
|
def fetch_air_quality_data(date_str, api_key, station_name='운정'):
|
|
"""
|
|
API에서 지정된 날짜의 대기환경 데이터를 가져옴
|
|
"""
|
|
url = "http://apis.data.go.kr/B552584/ArpltnStatsSvc/getMsrstnAcctoRDyrg"
|
|
params = {
|
|
'serviceKey': api_key,
|
|
'returnType': 'json',
|
|
'numOfRows': '100',
|
|
'pageNo': '1',
|
|
'inqBginDt': date_str,
|
|
'inqEndDt': date_str,
|
|
'msrstnName': station_name,
|
|
}
|
|
|
|
try:
|
|
resp = requests.get(url, params=params, timeout=10)
|
|
resp.raise_for_status()
|
|
data = resp.json()
|
|
items = data.get('response', {}).get('body', {}).get('items', [])
|
|
return items
|
|
except requests.RequestException as e:
|
|
print(f"[ERROR] 요청 실패: {e}")
|
|
return []
|
|
except ValueError as e:
|
|
print(f"[ERROR] JSON 파싱 실패: {e}")
|
|
return []
|
|
|
|
def save_data_to_db(items, conn, table, debug=False):
|
|
"""
|
|
대기환경 정보를 DB에 저장
|
|
"""
|
|
for item in items:
|
|
try:
|
|
item_date = datetime.strptime(item['msurDt'], '%Y-%m-%d').date()
|
|
except Exception as e:
|
|
print(f"[ERROR] 날짜 파싱 오류: {item.get('msurDt')} → {e}")
|
|
continue
|
|
|
|
if debug:
|
|
print(f"[DEBUG] 처리 중: {item}")
|
|
|
|
# 중복 확인
|
|
sel = select(table.c.date).where(table.c.date == item_date)
|
|
try:
|
|
exists = conn.execute(sel).fetchone()
|
|
except Exception as e:
|
|
print(f"[ERROR] SELECT 오류: {e}")
|
|
continue
|
|
|
|
if exists:
|
|
if debug:
|
|
print(f"[DEBUG] {item_date} 이미 존재. 저장 생략")
|
|
continue
|
|
|
|
ins = table.insert().values(
|
|
date=item_date,
|
|
pm25=float(item.get('pm25Value')) if item.get('pm25Value') else None,
|
|
pm10=float(item.get('pm10Value')) if item.get('pm10Value') else None,
|
|
so2=float(item.get('so2Value')) if item.get('so2Value') else None,
|
|
co=float(item.get('coValue')) if item.get('coValue') else None,
|
|
no2=float(item.get('no2Value')) if item.get('no2Value') else None,
|
|
o3=float(item.get('o3Value')) if item.get('o3Value') else None,
|
|
)
|
|
|
|
try:
|
|
conn.execute(ins)
|
|
if debug:
|
|
print(f"[DEBUG] {item_date} 저장 완료")
|
|
except IntegrityError as e:
|
|
print(f"[ERROR] DB 중복 오류: {e}")
|
|
except Exception as e:
|
|
print(f"[ERROR] DB 저장 실패: {e}")
|
|
|
|
def get_latest_date(conn, table):
|
|
"""
|
|
테이블에서 가장 최근 날짜를 반환. 값이 없으면 None
|
|
"""
|
|
try:
|
|
sel = select(func.max(table.c.date))
|
|
result = conn.execute(sel).scalar()
|
|
if result is None:
|
|
print("[INFO] 테이블에 데이터가 없습니다.")
|
|
return result
|
|
except Exception as e:
|
|
print(f"[ERROR] 가장 최근 날짜 조회 실패: {e}")
|
|
return None
|
|
|
|
def main():
|
|
config = db.load_config()
|
|
api_key = config['air_quality_api']['service_key']
|
|
debug = config.get('debug', False)
|
|
|
|
engine = db.engine
|
|
table = db_schema.fg_manager_static_air
|
|
|
|
with engine.connect() as conn:
|
|
latest_date = get_latest_date(conn, table)
|
|
|
|
if latest_date is None:
|
|
start_date = datetime.strptime("2023-01-01", "%Y-%m-%d").date()
|
|
else:
|
|
start_date = latest_date + timedelta(days=1)
|
|
|
|
end_date = (datetime.now() - timedelta(days=1)).date()
|
|
|
|
if debug:
|
|
print(f"[DEBUG] 최신 DB 날짜: {latest_date}")
|
|
print(f"[DEBUG] 수집 시작일: {start_date}")
|
|
print(f"[DEBUG] 수집 종료일: {end_date}")
|
|
|
|
current_date = start_date
|
|
while current_date <= end_date:
|
|
date_str = current_date.strftime('%Y%m%d')
|
|
if debug:
|
|
print(f"[DEBUG] {date_str} 데이터 요청 중...")
|
|
|
|
items = fetch_air_quality_data(date_str, api_key)
|
|
|
|
if items:
|
|
save_data_to_db(items, conn, table, debug=debug)
|
|
else:
|
|
if debug:
|
|
print(f"[DEBUG] {date_str} 데이터 없음")
|
|
|
|
current_date += timedelta(days=1)
|
|
|
|
if __name__ == '__main__':
|
|
main()
|