Files
fg-auto/app/weather_capture.py

258 lines
9.3 KiB
Python

import logging
import os
import sys
import time
import sqlite3
import re
from datetime import datetime
from config import TODAY
from selenium_manager import SeleniumManager
# 로깅 설정
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
WEATHER_URL = 'https://www.weather.go.kr/w/weather/forecast/short-term.do#dong/4148026200/37.73208578534846/126.79463099866948'
OUTPUT_DIR = '/data'
OUTPUT_FILENAME = f'weather_capture_{TODAY}.png'
DB_PATH = '/data/weather.sqlite'
def parse_rainfall(value):
"""
강수량 텍스트를 숫자(mm)로 변환
- '-'는 0.0
- '~1'은 0.5
- 숫자만 있으면 float로 변환
"""
if not value or value.strip() == '-':
return 0.0
elif '~' in value: # '~1mm' 형태
return 0.5
else:
try:
return float(re.search(r'[\d.]+', value).group())
except (AttributeError, ValueError):
logger.warning(f"강수량 파싱 실패: {value}")
return 0.0
def extract_rainfall_from_page(driver):
"""
Selenium driver에서 시간별 강수량 데이터 추출
10시~21시(오후 9시) 데이터만 수집
Returns:
dict: {시간(int): 강수량(float)} 형태, 또는 None (실패 시)
"""
try:
logger.info("페이지에서 강수량 데이터 추출 시작...")
time.sleep(1) # 페이지 로드 대기
# 테이블에서 시간별 강수량 추출
# 기상청 웹사이트 구조에 맞게 조정 필요
rainfall_data = {}
# 방법 1: 테이블 행(tr) 순회
try:
rows = driver.find_elements("xpath", "//table//tr")
if not rows:
logger.warning("테이블 행을 찾을 수 없음, 대체 방법 시도...")
return extract_rainfall_alternative(driver)
for row in rows:
try:
# 각 행에서 시간과 강수량 추출
cells = row.find_elements("tag name", "td")
if len(cells) >= 2:
time_cell = cells[0].text.strip()
rain_cell = cells[1].text.strip()
# 시간 파싱 (HH:00 형태)
time_match = re.search(r'(\d{1,2}):?00?', time_cell)
if time_match:
hour = int(time_match.group(1))
if 10 <= hour <= 21: # 10시~21시(오후 9시)
rainfall = parse_rainfall(rain_cell)
rainfall_data[hour] = rainfall
logger.info(f" {hour:02d}:00 → {rainfall}mm")
except Exception as e:
logger.debug(f"행 파싱 중 오류: {e}")
continue
except Exception as e:
logger.warning(f"테이블 파싱 실패: {e}, 대체 방법 시도...")
return extract_rainfall_alternative(driver)
if rainfall_data:
total = sum(rainfall_data.values())
logger.info(f"총 강수량: {total:.1f}mm")
return rainfall_data
else:
logger.warning("추출된 강수량 데이터가 없음")
return None
except Exception as e:
logger.error(f"강수량 데이터 추출 중 오류: {type(e).__name__}: {e}", exc_info=True)
return None
def extract_rainfall_alternative(driver):
"""
대체 방법: span/div 엘리먼트에서 강수량 추출
"""
try:
logger.info("대체 방법으로 강수량 추출 시도...")
# 기상청 사이트의 실제 구조에 맞게 조정
rainfall_data = {}
# 시간 레이블 찾기
hour_elements = driver.find_elements("xpath", "//span[contains(text(), '')]")
for elem in hour_elements:
try:
text = elem.text
match = re.search(r'(\d{1,2})시', text)
if match:
hour = int(match.group(1))
if 10 <= hour <= 21:
# 시간 엘리먼트 다음 형제에서 강수량 찾기
parent = elem.find_element("xpath", "./ancestor::*[position()=3]")
rain_elem = parent.find_element("xpath", ".//span[last()]")
rainfall = parse_rainfall(rain_elem.text)
rainfall_data[hour] = rainfall
logger.info(f" {hour:02d}:00 → {rainfall}mm")
except Exception as e:
logger.debug(f"대체 방법 파싱 중 오류: {e}")
continue
return rainfall_data if rainfall_data else None
except Exception as e:
logger.error(f"대체 방법 실패: {e}")
return None
def save_rainfall_to_db(rainfall_data):
"""
추출한 강수량 데이터를 SQLite DB에 저장
Args:
rainfall_data: {시간(int): 강수량(float)} 딕셔너리
Returns:
bool: 성공 여부
"""
if not rainfall_data:
logger.warning("저장할 강수량 데이터가 없음")
return False
try:
os.makedirs(os.path.dirname(DB_PATH) or '/data', exist_ok=True)
conn = sqlite3.connect(DB_PATH)
curs = conn.cursor()
# 테이블 생성
curs.execute('''
CREATE TABLE IF NOT EXISTS rainfall_capture (
id INTEGER PRIMARY KEY AUTOINCREMENT,
date TEXT NOT NULL,
hour INTEGER NOT NULL,
rainfall REAL NOT NULL,
UNIQUE(date, hour)
)
''')
curs.execute('''
CREATE TABLE IF NOT EXISTS rainfall_summary (
id INTEGER PRIMARY KEY AUTOINCREMENT,
date TEXT NOT NULL UNIQUE,
total_rainfall REAL NOT NULL,
capture_time TEXT NOT NULL
)
''')
# 기존 데이터 삭제
curs.execute('DELETE FROM rainfall_capture WHERE date = ?', (TODAY,))
curs.execute('DELETE FROM rainfall_summary WHERE date = ?', (TODAY,))
# 시간별 강수량 저장
total_rainfall = 0.0
for hour in sorted(rainfall_data.keys()):
rainfall = rainfall_data[hour]
curs.execute(
'INSERT INTO rainfall_capture (date, hour, rainfall) VALUES (?, ?, ?)',
(TODAY, hour, rainfall)
)
total_rainfall += rainfall
logger.info(f"DB 저장: {TODAY} {hour:02d}:00 → {rainfall}mm")
# 합계 저장
capture_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
curs.execute(
'INSERT INTO rainfall_summary (date, total_rainfall, capture_time) VALUES (?, ?, ?)',
(TODAY, total_rainfall, capture_time)
)
conn.commit()
conn.close()
logger.info(f"[DB 저장 완료] {TODAY} 총 강수량: {total_rainfall:.1f}mm")
return True
except Exception as e:
logger.error(f"DB 저장 중 오류: {type(e).__name__}: {e}", exc_info=True)
return False
def capture_weather():
"""기상청 날씨 정보 캡처 및 강수량 데이터 추출"""
# 저장 경로 설정
output_path = os.path.join(OUTPUT_DIR, OUTPUT_FILENAME)
# 저장 디렉토리 생성 (없으면)
os.makedirs(OUTPUT_DIR, exist_ok=True)
manager = SeleniumManager()
try:
with manager.managed_driver():
logger.info(f"URL 접속: {WEATHER_URL}")
manager.driver.get(WEATHER_URL)
# 첫 번째 탭 클릭
logger.info("첫 번째 탭 클릭 시도...")
if not manager.click_with_retry(manager.WEATHER_SELECTORS['tab_button']):
logger.error("첫 번째 탭 클릭 실패")
return False
# 두 번째 항목 클릭
logger.info("두 번째 항목 클릭 시도...")
if not manager.click_with_retry(manager.WEATHER_SELECTORS['list_button']):
logger.error("두 번째 항목 클릭 실패")
return False
# 페이지 반영 대기
time.sleep(2)
# 강수량 데이터 추출
rainfall_data = extract_rainfall_from_page(manager.driver)
if rainfall_data:
save_rainfall_to_db(rainfall_data)
else:
logger.warning("강수량 데이터 추출 실패 (캡처는 진행)")
# 스크린샷 저장
logger.info(f"스크린샷 저장 시도: {output_path}")
if manager.take_element_screenshot(manager.WEATHER_SELECTORS['target_element'], output_path):
logger.info(f"[캡처 완료] 저장 위치: {output_path}")
return True
else:
logger.error("스크린샷 저장 실패")
return False
except Exception as e:
logger.error(f"[오류] 날씨 캡처 중 오류 발생: {type(e).__name__}: {e}", exc_info=True)
return False
if __name__ == '__main__':
success = capture_weather()
sys.exit(0 if success else 1)