Update air quality data collection and processing

2025-07-21 17:35:32 +09:00
parent 085abc38f3
commit 5e7556e673


@@ -1,170 +1,167 @@
import os, sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))

import requests
from datetime import datetime, timedelta
from sqlalchemy import select, func, and_
from sqlalchemy.dialects.mysql import insert as mysql_insert
from sqlalchemy.exc import IntegrityError

from conf import db, db_schema

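# Collects daily air-quality statistics (PM2.5, PM10, SO2, CO, NO2, O3) per measuring
# station from the ArpltnStatsSvc open API and stores them in the `air` table.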
class AirQualityCollector:
    def __init__(self, config, engine, table):
        self.api_key = config['DATA_API']['serviceKey']
        self.station_list = config['DATA_API']['air']['station_name']
        self.start_date = datetime.strptime(config['DATA_API']['startDt'], '%Y%m%d').date()
        self.debug = config.get('debug', False)
        self.force_update = config.get('force_update', False)
        self.engine = engine
        self.table = table
        self.yesterday = (datetime.now() - timedelta(days=1)).date()
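
    # Return the most recent stored date for `station`, or None when the table has
    # no rows for that station (or the query fails).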
    def get_latest_date(self, conn, station):
        try:
            stmt = select(func.max(self.table.c.date)).where(self.table.c.station == station)
            result = conn.execute(stmt).scalar()
            if result is None:
                print(f"[INFO] {station}: 테이블에 데이터가 없습니다.")
            return result
        except Exception as e:
            print(f"[ERROR] 가장 최근 날짜 조회 실패: {e}")
            return None
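
    # Parse each API item into a row and write it: in debug mode only log the row,
    # with force_update upsert via ON DUPLICATE KEY UPDATE, otherwise insert only
    # when no row exists yet for (date, station).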
    def save_data_to_db(self, items, conn, station):
        for item in items:
            try:
                item_date = datetime.strptime(item['msurDt'], '%Y-%m-%d').date()
            except Exception as e:
                print(f"[ERROR] 날짜 파싱 오류: {item.get('msurDt')} {e}")
                continue

            data = {
                'date': item_date,
                'station': station,
                'pm25': float(item.get('pm25Value')) if item.get('pm25Value') else None,
                'pm10': float(item.get('pm10Value')) if item.get('pm10Value') else None,
                'so2': float(item.get('so2Value')) if item.get('so2Value') else None,
                'co': float(item.get('coValue')) if item.get('coValue') else None,
                'no2': float(item.get('no2Value')) if item.get('no2Value') else None,
                'o3': float(item.get('o3Value')) if item.get('o3Value') else None,
            }
            try:
                if self.debug:
                    print(f"[DEBUG] {item_date} [{station}] → DB 저장 시도: {data}")
                    continue

                if self.force_update:
                    stmt = mysql_insert(self.table).values(**data)
                    on_dup_stmt = stmt.on_duplicate_key_update(**data)
                    conn.execute(on_dup_stmt)
                    print(f"[INFO] {item_date} [{station}] 저장/업데이트 완료 (강제업데이트)")
                else:
                    sel = select(self.table.c.date).where(
                        and_(self.table.c.date == item_date, self.table.c.station == station)
                    )
                    exists = conn.execute(sel).fetchone()
                    if exists:
                        print(f"[INFO] {item_date} [{station}] 이미 존재. 저장 생략")
                        continue
                    conn.execute(self.table.insert().values(**data))
                    print(f"[INFO] {item_date} [{station}] 저장 완료")
            except IntegrityError as e:
                print(f"[ERROR] DB 중복 오류: {e}")
            except Exception as e:
                print(f"[ERROR] DB 저장 실패: {e}")
    def fetch_air_quality_data_range(self, start_date_str, end_date_str, station_name, num_of_rows=100, page_no=1):
        url = "http://apis.data.go.kr/B552584/ArpltnStatsSvc/getMsrstnAcctoRDyrg"
        params = {
            'serviceKey': self.api_key,
            'returnType': 'json',
            'numOfRows': str(num_of_rows),
            'pageNo': str(page_no),
            'inqBginDt': start_date_str,
            'inqEndDt': end_date_str,
            'msrstnName': station_name,
        }
        try:
            resp = requests.get(url, params=params, timeout=10)
            resp.raise_for_status()
            data = resp.json()
            return data.get('response', {}).get('body', {}).get('items', [])
        except requests.RequestException as e:
            print(f"[ERROR] 요청 실패: {e}")
            return []
        except ValueError as e:
            print(f"[ERROR] JSON 파싱 실패: {e}")
            return []
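
    # Probe the API for the largest usable numOfRows: start at 1000 and step down
    # by 100 until a request for `date_str` succeeds, falling back to 100.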
    def test_num_of_rows(self, station_name, date_str):
        max_rows = 1000
        min_rows = 100

        while max_rows >= min_rows:
            try:
                url = "http://apis.data.go.kr/B552584/ArpltnStatsSvc/getMsrstnAcctoRDyrg"
                params = {
                    'serviceKey': self.api_key,
                    'returnType': 'json',
                    'numOfRows': str(max_rows),
                    'pageNo': '1',
                    'inqBginDt': date_str,
                    'inqEndDt': date_str,
                    'msrstnName': station_name,
                }
                resp = requests.get(url, params=params, timeout=10)
                resp.raise_for_status()
                resp.json()
                print(f"[INFO] numOfRows 최대값 탐색 성공: {max_rows}")
                return max_rows
            except Exception as e:
                print(f"[WARN] numOfRows={max_rows} 실패: {e}, 100 감소 후 재시도")
                max_rows -= 100

        print("[WARN] 최소 numOfRows 값(100)도 실패했습니다. 기본값 100 사용")
        return 100
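
    # For each station, resume from the day after the latest stored date (or the
    # configured start date) and walk forward to yesterday in windows of
    # `optimal_num_rows` days, fetching and saving each window.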
    def run(self):
        with self.engine.begin() as conn:
            for station_name in self.station_list:
                print(f"\n[INFO] 측정소: {station_name}")
                latest_date = self.get_latest_date(conn, station_name)
                start_date = max(self.start_date, latest_date + timedelta(days=1)) if latest_date else self.start_date

                if start_date > self.yesterday:
                    print(f"[INFO] {station_name}: 최신 데이터가 이미 존재합니다. ({latest_date})")
                    continue

                optimal_num_rows = self.test_num_of_rows(station_name, start_date.strftime('%Y%m%d'))
                print(f"[INFO] 최적 numOfRows: {optimal_num_rows}")

                current_start = start_date
                while current_start <= self.yesterday:
                    current_end = min(current_start + timedelta(days=optimal_num_rows - 1), self.yesterday)
                    start_str = current_start.strftime('%Y%m%d')
                    end_str = current_end.strftime('%Y%m%d')

                    print(f"[INFO] {station_name}: {start_str} ~ {end_str} 데이터 요청 중...")
                    items = self.fetch_air_quality_data_range(start_str, end_str, station_name, optimal_num_rows)
                    if items:
                        self.save_data_to_db(items, conn, station_name)
                    else:
                        print(f"[INFO] {station_name}: {start_str} ~ {end_str} 데이터 없음")
                    current_start = current_end + timedelta(days=1)

        print("\n[INFO] 모든 측정소 데이터 처리 완료")

if __name__ == '__main__':
    config = db.load_config()
    collector = AirQualityCollector(config, db.engine, db_schema.air)
    collector.run()
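
# Illustrative sketch (not the actual config file) of the keys this script reads via
# conf.db.load_config(); values below are placeholders:
#
#   {
#       'DATA_API': {
#           'serviceKey': '<public data portal service key>',
#           'startDt': '20200101',
#           'air': {'station_name': ['<station name>', '<station name>']},
#       },
#       'debug': False,
#       'force_update': False,
#   }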