427 lines
14 KiB
Python
427 lines
14 KiB
Python
# ===================================================================
|
|
# services/analytics/air_quality.py
|
|
# 대기질 데이터 수집 서비스 모듈
|
|
# ===================================================================
|
|
# 한국환경공단 API를 통해 대기질(미세먼지) 데이터를 수집합니다.
|
|
# 측정소별 PM2.5, PM10, SO2, CO, NO2, O3 데이터를 저장합니다.
|
|
# ===================================================================
|
|
"""
|
|
대기질 데이터 수집 서비스 모듈
|
|
|
|
한국환경공단 공공데이터 API를 통해 대기질 데이터를 수집합니다.
|
|
측정소별 일평균 대기오염물질 농도를 조회할 수 있습니다.
|
|
|
|
사용 예시:
|
|
from services.analytics.air_quality import AirQualityCollector, get_air_quality
|
|
|
|
# 간단한 데이터 조회
|
|
data = get_air_quality(service_key, '운정', '20240101', '20240131')
|
|
|
|
# 자동 데이터 수집 및 저장
|
|
collector = AirQualityCollector(config, engine, table)
|
|
collector.run()
|
|
"""
|
|
|
|
import os
|
|
import json
|
|
import traceback
|
|
from datetime import datetime, timedelta, date
|
|
from typing import Dict, List, Optional, Any
|
|
|
|
from sqlalchemy import select, func, and_, Table
|
|
from sqlalchemy.dialects.mysql import insert as mysql_insert
|
|
from sqlalchemy.exc import IntegrityError
|
|
from sqlalchemy.engine import Engine, Connection
|
|
|
|
from core.logging_utils import get_logger
|
|
from core.http_client import create_retry_session
|
|
from core.config import get_config
|
|
|
|
logger = get_logger(__name__)
|
|
|
|
# API URL
|
|
AIR_QUALITY_API_URL = "http://apis.data.go.kr/B552584/ArpltnStatsSvc/getMsrstnAcctoRDyrg"
|
|
|
|
|
|
def get_air_quality(
|
|
service_key: str,
|
|
station_name: str,
|
|
start_date: str,
|
|
end_date: str,
|
|
num_of_rows: int = 100,
|
|
page_no: int = 1
|
|
) -> List[Dict]:
|
|
"""
|
|
대기질 데이터 조회
|
|
|
|
한국환경공단 API를 호출하여 측정소별 대기질 데이터를 조회합니다.
|
|
|
|
Args:
|
|
service_key: 공공데이터포털 API 키
|
|
station_name: 측정소명 (예: '운정', '서울')
|
|
start_date: 시작 날짜 (YYYYMMDD)
|
|
end_date: 종료 날짜 (YYYYMMDD)
|
|
num_of_rows: 페이지당 결과 수
|
|
page_no: 페이지 번호
|
|
|
|
Returns:
|
|
대기질 데이터 리스트
|
|
|
|
데이터 항목:
|
|
- msurDt: 측정일 (YYYY-MM-DD)
|
|
- pm25Value: 초미세먼지 농도 (㎍/㎥)
|
|
- pm10Value: 미세먼지 농도 (㎍/㎥)
|
|
- so2Value: 아황산가스 농도 (ppm)
|
|
- coValue: 일산화탄소 농도 (ppm)
|
|
- no2Value: 이산화질소 농도 (ppm)
|
|
- o3Value: 오존 농도 (ppm)
|
|
"""
|
|
params = {
|
|
'serviceKey': service_key,
|
|
'returnType': 'json',
|
|
'numOfRows': str(num_of_rows),
|
|
'pageNo': str(page_no),
|
|
'inqBginDt': start_date,
|
|
'inqEndDt': end_date,
|
|
'msrstnName': station_name,
|
|
}
|
|
|
|
session = create_retry_session(retries=3)
|
|
|
|
try:
|
|
response = session.get(AIR_QUALITY_API_URL, params=params, timeout=20)
|
|
response.raise_for_status()
|
|
data = response.json()
|
|
|
|
items = data.get('response', {}).get('body', {}).get('items', [])
|
|
logger.debug(f"대기질 데이터 조회: {station_name}, {len(items)}건")
|
|
return items if items else []
|
|
|
|
except Exception as e:
|
|
logger.error(f"대기질 API 요청 실패: {e}")
|
|
traceback.print_exc()
|
|
return []
|
|
finally:
|
|
session.close()
|
|
|
|
|
|
class AirQualityCollector:
|
|
"""
|
|
대기질 데이터 자동 수집기
|
|
|
|
설정에 따라 대기질 데이터를 자동으로 수집하고 DB에 저장합니다.
|
|
|
|
Attributes:
|
|
api_key: API 서비스 키
|
|
station_list: 측정소 목록
|
|
engine: SQLAlchemy 엔진
|
|
table: 대상 테이블
|
|
start_date: 수집 시작일
|
|
force_update: 강제 업데이트 여부
|
|
debug: 디버그 모드
|
|
"""
|
|
|
|
# 캐시 파일 경로
|
|
CACHE_FILE = 'cache/air_num_rows.json'
|
|
|
|
def __init__(
|
|
self,
|
|
engine: Engine,
|
|
table: Table,
|
|
api_key: Optional[str] = None,
|
|
station_list: Optional[List[str]] = None,
|
|
start_date: Optional[str] = None,
|
|
force_update: bool = False,
|
|
debug: bool = False
|
|
):
|
|
"""
|
|
Args:
|
|
engine: SQLAlchemy 엔진
|
|
table: 대상 테이블
|
|
api_key: API 키 (None이면 설정에서 로드)
|
|
station_list: 측정소 목록 (None이면 설정에서 로드)
|
|
start_date: 수집 시작일 (YYYYMMDD)
|
|
force_update: 기존 데이터 덮어쓰기 여부
|
|
debug: 디버그 모드
|
|
"""
|
|
config = get_config()
|
|
|
|
self.api_key = api_key or config.data_api.get('service_key', '')
|
|
self.station_list = station_list or config.data_api.get('air_stations', ['운정'])
|
|
|
|
if start_date:
|
|
self.start_date = datetime.strptime(start_date, '%Y%m%d').date()
|
|
else:
|
|
self.start_date = datetime.strptime(
|
|
config.data_api.get('start_date', '20170101'), '%Y%m%d'
|
|
).date()
|
|
|
|
self.engine = engine
|
|
self.table = table
|
|
self.force_update = force_update
|
|
self.debug = debug
|
|
|
|
self.yesterday = (datetime.now() - timedelta(days=1)).date()
|
|
self.session = create_retry_session(retries=3)
|
|
|
|
# 캐시 로드
|
|
self._num_rows_cache = self._load_cache()
|
|
|
|
def _load_cache(self) -> Dict:
|
|
"""캐시 파일 로드"""
|
|
try:
|
|
cache_path = os.path.join(
|
|
os.path.dirname(os.path.dirname(os.path.dirname(__file__))),
|
|
self.CACHE_FILE
|
|
)
|
|
if os.path.exists(cache_path):
|
|
with open(cache_path, 'r', encoding='utf-8') as f:
|
|
return json.load(f)
|
|
except Exception:
|
|
pass
|
|
return {}
|
|
|
|
def _save_cache(self):
|
|
"""캐시 파일 저장"""
|
|
try:
|
|
cache_path = os.path.join(
|
|
os.path.dirname(os.path.dirname(os.path.dirname(__file__))),
|
|
self.CACHE_FILE
|
|
)
|
|
os.makedirs(os.path.dirname(cache_path), exist_ok=True)
|
|
with open(cache_path, 'w', encoding='utf-8') as f:
|
|
json.dump(self._num_rows_cache, f, ensure_ascii=False, indent=2)
|
|
except Exception as e:
|
|
logger.warning(f"캐시 저장 실패: {e}")
|
|
|
|
def get_latest_date(self, conn: Connection, station: str) -> Optional[date]:
|
|
"""
|
|
특정 측정소의 가장 최근 저장 날짜 조회
|
|
|
|
Args:
|
|
conn: DB 연결
|
|
station: 측정소명
|
|
|
|
Returns:
|
|
최근 날짜 또는 None
|
|
"""
|
|
try:
|
|
stmt = select(func.max(self.table.c.date)).where(
|
|
self.table.c.station == station
|
|
)
|
|
result = conn.execute(stmt).scalar()
|
|
return result
|
|
except Exception as e:
|
|
logger.error(f"최근 날짜 조회 실패: {e}")
|
|
return None
|
|
|
|
def parse_item_to_record(self, item: Dict, station: str) -> Optional[Dict]:
|
|
"""
|
|
API 응답 아이템을 DB 레코드로 변환
|
|
|
|
Args:
|
|
item: API 응답 아이템
|
|
station: 측정소명
|
|
|
|
Returns:
|
|
DB 레코드 딕셔너리 또는 None
|
|
"""
|
|
try:
|
|
item_date = datetime.strptime(item['msurDt'], '%Y-%m-%d').date()
|
|
except Exception as e:
|
|
logger.warning(f"날짜 파싱 오류: {item.get('msurDt')} - {e}")
|
|
return None
|
|
|
|
def safe_float(val):
|
|
"""안전한 float 변환"""
|
|
try:
|
|
return float(val) if val else None
|
|
except (ValueError, TypeError):
|
|
return None
|
|
|
|
return {
|
|
'date': item_date,
|
|
'station': station,
|
|
'pm25': safe_float(item.get('pm25Value')),
|
|
'pm10': safe_float(item.get('pm10Value')),
|
|
'so2': safe_float(item.get('so2Value')),
|
|
'co': safe_float(item.get('coValue')),
|
|
'no2': safe_float(item.get('no2Value')),
|
|
'o3': safe_float(item.get('o3Value')),
|
|
}
|
|
|
|
def save_items_to_db(
|
|
self,
|
|
items: List[Dict],
|
|
conn: Connection,
|
|
station: str
|
|
) -> int:
|
|
"""
|
|
데이터 항목들을 DB에 저장
|
|
|
|
Args:
|
|
items: 저장할 데이터 리스트
|
|
conn: DB 연결
|
|
station: 측정소명
|
|
|
|
Returns:
|
|
저장된 레코드 수
|
|
"""
|
|
saved_count = 0
|
|
|
|
for item in items:
|
|
data = self.parse_item_to_record(item, station)
|
|
if not data:
|
|
continue
|
|
|
|
item_date = data['date']
|
|
|
|
if self.debug:
|
|
logger.debug(f"[DEBUG] {item_date} [{station}] 저장 시도: {data}")
|
|
continue
|
|
|
|
try:
|
|
if self.force_update:
|
|
# UPSERT
|
|
stmt = mysql_insert(self.table).values(**data)
|
|
stmt = stmt.on_duplicate_key_update(**data)
|
|
conn.execute(stmt)
|
|
logger.info(f"{item_date} [{station}] 저장/업데이트 완료")
|
|
else:
|
|
# 중복 확인 후 삽입
|
|
sel = select(self.table.c.date).where(
|
|
and_(
|
|
self.table.c.date == item_date,
|
|
self.table.c.station == station
|
|
)
|
|
)
|
|
if conn.execute(sel).fetchone():
|
|
logger.debug(f"{item_date} [{station}] 이미 존재, 생략")
|
|
continue
|
|
|
|
conn.execute(self.table.insert().values(**data))
|
|
logger.info(f"{item_date} [{station}] 저장 완료")
|
|
|
|
saved_count += 1
|
|
|
|
except IntegrityError as e:
|
|
logger.error(f"중복 오류: {e}")
|
|
except Exception as e:
|
|
logger.error(f"저장 실패: {e}")
|
|
traceback.print_exc()
|
|
|
|
return saved_count
|
|
|
|
def find_optimal_num_rows(self, station_name: str, date_str: str) -> int:
|
|
"""
|
|
최적의 numOfRows 파라미터 값 탐색
|
|
|
|
API 서버 상태에 따라 최대 허용 rows 수가 다를 수 있어
|
|
적절한 값을 탐색합니다.
|
|
|
|
Args:
|
|
station_name: 측정소명
|
|
date_str: 날짜 (YYYYMMDD)
|
|
|
|
Returns:
|
|
최적의 numOfRows 값 (100~1000)
|
|
"""
|
|
# 캐시 확인
|
|
cache_key = f"{station_name}:{date_str}"
|
|
if cache_key in self._num_rows_cache:
|
|
cached_val = int(self._num_rows_cache[cache_key])
|
|
logger.debug(f"캐시된 numOfRows 사용: {cached_val}")
|
|
return cached_val
|
|
|
|
# 점진적으로 감소하며 테스트
|
|
max_rows = 1000
|
|
min_rows = 100
|
|
|
|
while max_rows >= min_rows:
|
|
try:
|
|
params = {
|
|
'serviceKey': self.api_key,
|
|
'returnType': 'json',
|
|
'numOfRows': str(max_rows),
|
|
'pageNo': '1',
|
|
'inqBginDt': date_str,
|
|
'inqEndDt': date_str,
|
|
'msrstnName': station_name,
|
|
}
|
|
response = self.session.get(AIR_QUALITY_API_URL, params=params, timeout=20)
|
|
response.raise_for_status()
|
|
response.json() # JSON 파싱 테스트
|
|
|
|
# 성공 - 캐시에 저장
|
|
self._num_rows_cache[cache_key] = max_rows
|
|
self._save_cache()
|
|
|
|
logger.debug(f"numOfRows 최대값: {max_rows}")
|
|
return max_rows
|
|
|
|
except Exception as e:
|
|
logger.warning(f"numOfRows={max_rows} 실패: {e}, 재시도...")
|
|
max_rows -= 100
|
|
|
|
logger.warning("기본값 100 사용")
|
|
return 100
|
|
|
|
def run(self) -> int:
|
|
"""
|
|
데이터 수집 및 저장 실행
|
|
|
|
모든 측정소에 대해 데이터를 수집하고 DB에 저장합니다.
|
|
|
|
Returns:
|
|
총 저장된 레코드 수
|
|
"""
|
|
total_saved = 0
|
|
|
|
with self.engine.begin() as conn:
|
|
for station_name in self.station_list:
|
|
logger.info(f"측정소 처리 시작: {station_name}")
|
|
|
|
# 시작일 결정
|
|
latest_date = self.get_latest_date(conn, station_name)
|
|
if latest_date:
|
|
start_date = latest_date + timedelta(days=1)
|
|
else:
|
|
start_date = self.start_date
|
|
|
|
if start_date > self.yesterday:
|
|
logger.info(f"{station_name}: 최신 데이터 존재 ({latest_date})")
|
|
continue
|
|
|
|
# 최적 numOfRows 탐색
|
|
optimal_rows = self.find_optimal_num_rows(
|
|
station_name,
|
|
start_date.strftime('%Y%m%d')
|
|
)
|
|
|
|
# 청크 단위로 데이터 수집
|
|
current_start = start_date
|
|
while current_start <= self.yesterday:
|
|
current_end = min(
|
|
current_start + timedelta(days=optimal_rows - 1),
|
|
self.yesterday
|
|
)
|
|
|
|
logger.info(f"{station_name}: {current_start} ~ {current_end} 수집")
|
|
|
|
items = get_air_quality(
|
|
self.api_key,
|
|
station_name,
|
|
current_start.strftime('%Y%m%d'),
|
|
current_end.strftime('%Y%m%d'),
|
|
num_of_rows=optimal_rows
|
|
)
|
|
|
|
if items:
|
|
saved = self.save_items_to_db(items, conn, station_name)
|
|
total_saved += saved
|
|
|
|
current_start = current_end + timedelta(days=1)
|
|
|
|
logger.info(f"대기질 데이터 수집 완료: 총 {total_saved}건 저장")
|
|
return total_saved
|