작업 폴더 변경

This commit is contained in:
2025-06-30 14:32:52 +09:00
parent 97d804e801
commit ed56635eb9
6 changed files with 17 additions and 11 deletions

View File

@ -1,31 +0,0 @@
from datetime import datetime
TODAY = datetime.now().strftime('%Y%m%d')
FTP_CONFIG = {
'HOST' : 'FTP_HOST',
'USER' : 'FTP_USER',
'PASS' : 'FTP_PW',
'UPLOAD_DIR' : 'UPLOAD_DIR'
}
DB_CONFIG = {
'HOST' : 'HOST',
'USER' : 'DBUSER',
'DBNAME' : 'DBNAME',
'PASS' : 'DB_PW'
'CHARSET'='utf8mb4',
}
###
subject, content, file1, file2는 스크립트 진행중 설정함
###
MAIN = {
'board' : '게시판 ID',
'ca_name' : '카테고리가 있는경우 카테고리 이름, 없으면 비움',
'subject' : '',
'content' : '',
'mb_id' : '게시자 ID',
'nickname' : '닉네임',
'file1' : '',
'file2' : '',
}

View File

@ -1,203 +0,0 @@
# -*- coding: utf-8 -*-
"""
gnu_autoupload.py
기능:
1. Selenium을 이용해 날씨 정보를 캡처 (weather_capture.py 호출)
2. FTP를 이용해 이미지 업로드
3. 그누보드 DB에 게시글 및 첨부파일 정보 자동 등록
"""
import os
import sys
import time
import subprocess
import tempfile
import hashlib
from datetime import datetime
from PIL import Image
import pymysql
import ftputil
from config import DB_CONFIG, FTP_CONFIG, MAIN
from weather import get_precipitation_summary
# ---------------------------
# 이미지 캡처 함수
# ---------------------------
def capture_image(script_path, output_path, max_attempts=5):
for attempt in range(max_attempts):
print(f"[{datetime.now().strftime('%H:%M:%S')}] 이미지 캡처 시도 {attempt + 1}/{max_attempts}")
try:
subprocess.run([sys.executable, script_path], check=True)
except subprocess.CalledProcessError as e:
print(f"[오류] weather_capture.py 실행 실패: {e}")
if os.path.isfile(output_path):
print(f"[성공] 이미지가 정상적으로 캡처되었습니다: {output_path}")
return True
time.sleep(2)
print(f"[실패] {max_attempts}회 시도 후에도 이미지가 생성되지 않았습니다.")
return False
# ---------------------------
# 파일 관련 유틸 함수
# ---------------------------
def file_type(ext):
return {
'gif': '1', 'jpeg': '2', 'jpg': '2', 'png': '3', 'swf': '4',
'psd': '5', 'bmp': '6', 'tif': '7', 'tiff': '7', 'jpc': '9',
'jp2': '10', 'jpx': '11', 'jb2': '12', 'swc': '13', 'iff': '14',
'wbmp': '15', 'xbm': '16'
}.get(ext.lower(), '0')
def get_filename(filename):
ms = datetime.now().microsecond
encoded_name = filename.encode('utf-8')
return f'{ms}_{hashlib.sha1(encoded_name).hexdigest()}'
def file_upload(filename, bf_file):
try:
with ftputil.FTPHost(FTP_CONFIG['HOST'], FTP_CONFIG['USER'], FTP_CONFIG['PASS']) as fh:
fh.chdir(FTP_CONFIG['UPLOAD_DIR'])
fh.upload(filename, bf_file)
print(f"[업로드 완료] '{filename}''{bf_file}' 로 FTP 업로드 완료")
return True
except Exception as e:
print(f"[FTP 오류] {type(e).__name__}: {e}")
return False
# ---------------------------
# 게시글 작성 함수
# ---------------------------
def write_board(board, subject, content, mb_id, nickname, ca_name=None, file_list=None):
try:
conn = pymysql.connect(
host=DB_CONFIG['HOST'],
user=DB_CONFIG['USER'],
db=DB_CONFIG['DBNAME'],
password=DB_CONFIG['PASS'],
charset=DB_CONFIG['CHARSET']
)
curs = conn.cursor()
now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
curs.execute(f"SELECT wr_num FROM g5_write_{board}")
wr_num = str(int(curs.fetchone()[0]) - 1)
curs.execute(f"""INSERT INTO g5_write_{board} SET wr_num = {wr_num},
wr_reply = '', wr_comment = 0, ca_name = %s, wr_option = 'html1', wr_subject = %s,
wr_content = %s, wr_link1 = '', wr_link2 = '', wr_link1_hit = 0, wr_link2_hit = 0,
wr_hit = 1, wr_good = 0, wr_nogood = 0, mb_id = %s, wr_password = '',
wr_name = %s, wr_email = '', wr_homepage = '',
wr_datetime = %s, wr_last = %s,
wr_ip = '111.111.111.111',
wr_comment_reply = '', wr_facebook_user = '', wr_twitter_user = '',
wr_1 = '', wr_2 = '', wr_3 = '', wr_4 = '', wr_5 = '', wr_6 = '', wr_7 = '', wr_8 = '', wr_9 = '', wr_10 = ''
""",
(ca_name, subject, content, mb_id, nickname, now, now))
curs.execute(f"SELECT wr_id FROM g5_write_{board} ORDER BY wr_id DESC LIMIT 1")
wr_id = str(curs.fetchone()[0])
curs.execute(f"UPDATE g5_write_{board} SET wr_parent = {wr_id} WHERE wr_id = {wr_id}")
curs.execute(f"""INSERT INTO g5_board_new (bo_table, wr_id, wr_parent, bn_datetime, mb_id)
VALUES (%s, %s, %s, %s, %s)""", (board, wr_id, wr_id, now, mb_id))
curs.execute(f"SELECT bo_count_write FROM g5_board WHERE bo_table = %s", (board,))
bo_count_write = int(curs.fetchone()[0])
curs.execute(f"UPDATE g5_board SET bo_count_write = %s WHERE bo_table = %s",
(bo_count_write + 1, board))
file_count = 0
if file_list:
for idx, file in enumerate(file_list):
ext = os.path.splitext(file)[1].lstrip('.')
bf_file = f"{get_filename(file)}.{ext}"
if file_upload(file, bf_file):
img_type = file_type(ext)
width, height = (0, 0)
if img_type != '0':
with Image.open(file) as img:
width, height = img.size
size = os.path.getsize(file)
curs.execute(f"""INSERT INTO g5_board_file SET bo_table = %s, wr_id = %s, bf_no = %s,
bf_source = %s, bf_file = %s, bf_content = '', bf_download = 0,
bf_filesize = %s, bf_width = %s, bf_height = %s, bf_type = %s, bf_datetime = %s""",
(board, wr_id, idx, os.path.basename(file), bf_file,
size, width, height, img_type, now))
file_count += 1
else:
raise Exception(f"[FTP 오류] 파일 업로드 실패: {file}")
curs.execute(f"UPDATE g5_write_{board} SET wr_file = %s WHERE wr_id = %s", (file_count, wr_id))
conn.commit()
print("[성공] 게시글과 첨부파일 등록 완료")
except Exception as e:
conn.rollback()
if "[FTP 오류]" in str(e):
print(f"[FTP 오류] {e}")
else:
print(f"[DB 오류] {type(e).__name__}: {e}")
raise
finally:
conn.close()
# ---------------------------
# 메인 실행 함수
# ---------------------------
def main():
# 날씨 정보 문자열 얻기
weather_content = get_precipitation_summary()
# MAIN['content'] 업데이트
MAIN['content'] = weather_content
today = datetime.today().strftime('%Y%m%d')
script_dir = os.path.dirname(os.path.abspath(__file__))
capture_script = os.path.join(script_dir, 'weather_capture.py')
weather_filename = f'weather_capture_{today}.png'
weather_file = os.path.join(script_dir, weather_filename)
thumb_file = os.path.join(script_dir, 'thumb.jpg')
if not capture_image(capture_script, weather_file):
return
FTP_CONFIG['UPLOAD_DIR'] = f"/www/data/file/{MAIN['board']}/"
MAIN['subject'] = f"[{datetime.now().strftime('%Y-%m-%d')}] 날씨 정보"
MAIN['file1'] = thumb_file
MAIN['file2'] = weather_file
file_list = [MAIN['file1'], MAIN['file2']]
write_board(
board=MAIN['board'],
subject=MAIN['subject'],
content=MAIN['content'],
mb_id=MAIN['mb_id'],
nickname=MAIN['nickname'],
ca_name=MAIN['ca_name'],
file_list=file_list
)
try:
# weather_file만 삭제, thumb.jpg는 삭제하지 않음
if os.path.isfile(weather_file):
os.remove(weather_file)
print(f"[정리 완료] 캡처 이미지 삭제됨: {weather_file}")
except Exception as e:
print(f"[삭제 오류] {type(e).__name__}: {e}")
if __name__ == "__main__":
main()

View File

@ -1,3 +0,0 @@
#!/bin/bash
echo "run.sh 시작: $(date '+%Y-%m-%d %H:%M:%S')"
python3 /data/gnu_autoupload.py >> /proc/1/fd/1 2>&1

View File

@ -1,78 +0,0 @@
# weather.py
import requests
import json
import re
from datetime import datetime
from config import serviceKey, TODAY
def parse_precip(value):
if value == '강수없음':
return 0.0
elif '1mm 미만' in value:
return 0.5
else:
match = re.search(r"[\d.]+", value)
if match:
return float(match.group())
else:
return 0.0
def get_precipitation_summary(retry=True):
url = "http://apis.data.go.kr/1360000/VilageFcstInfoService_2.0/getVilageFcst"
params = {
'serviceKey': serviceKey,
'numOfRows': '1000',
'pageNo': '1',
'dataType': 'JSON',
'base_date': TODAY,
'base_time': '0800',
'nx': '57',
'ny': '130'
}
response = requests.get(url, params=params)
try:
data = response.json()
total_rainfall = 0.0
lines = [
'<div class="weatherinfo" style="max-width: 100%; overflow-x: auto; padding: 10px; box-sizing: border-box;">',
'<h3 style="font-size: 1.8em; text-align: center; margin: 20px 0;">[시간대별 예상 강수량]</h3>',
'<table style="border-collapse: collapse; width: 100%; max-width: 400px; margin: 0 auto; font-size: 1em;">',
'<thead>',
'<tr>',
'<th style="border: 1px solid #333; padding: 2px;background-color: #f0f0f0;">시간</th>',
'<th style="border: 1px solid #333; padding: 2px;background-color: #f0f0f0;">강수량</th>',
'</tr>',
'</thead>',
'<tbody>'
]
for item in data['response']['body']['items']['item']:
if item['category'] == 'PCP' and item['fcstDate'] == TODAY:
time = item['fcstTime']
if 900 < int(time) < 2300:
mm = parse_precip(item['fcstValue'])
time_str = f"{time[:2]}:{time[2:]}"
lines.append(f'<tr><td style="border: 1px solid #333; padding: 2px;text-align: center;">{time_str}</td><td style="border: 1px solid #333; padding: 2px;text-align: center;">{mm}mm</td></tr>')
total_rainfall += mm
lines.append(f'<tr><td colspan="2" style="border: 1px solid #333; padding: 2px;text-align: center; font-weight: bold;">영업시간 중 총 예상 강수량: {total_rainfall:.1f}mm</td></tr>')
lines.append('</tbody></table><p style="text-align:right; font-size: 0.8em;">08:00 파주 조리읍 기상청 단기예보 기준</div>')
return ''.join(lines)
except json.decoder.JSONDecodeError:
if retry:
print("JSON 디코드 오류 발생, 재시도 중...")
time.sleep(3) # 3초 대기
return get_precipitation_summary(retry=False)
else:
print("응답이 JSON 형식이 아닙니다.")
return ''
# 테스트용
if __name__ == "__main__":
print(get_precipitation_summary())

View File

@ -1,76 +0,0 @@
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import StaleElementReferenceException, TimeoutException
from datetime import datetime
import os
import time
import tempfile
from config import TODAY
# 크롬 옵션 설정
options = Options()
options.add_argument('--headless')
options.add_argument('--window-size=1802,1467')
options.add_argument('--no-sandbox')
options.add_argument('--disable-dev-shm-usage')
options.add_argument('--disable-gpu')
# 임시 사용자 데이터 디렉토리 생성 및 지정 (중복 실행 문제 방지)
temp_dir = tempfile.mkdtemp()
options.add_argument(f'--user-data-dir={temp_dir}')
driver = webdriver.Chrome(options=options)
try:
script_dir = os.path.dirname(os.path.abspath(__file__))
driver.get('https://www.weather.go.kr/w/weather/forecast/short-term.do#dong/4148026200/37.73208578534846/126.79463099866948')
wait = WebDriverWait(driver, 10)
# 첫 번째 탭 클릭 (안전하게 클릭 대기)
tab_button = wait.until(EC.element_to_be_clickable(
(By.XPATH, '//*[@id="digital-forecast"]/div[1]/div[3]/div[1]/div/div/a[2]')
))
tab_button.click()
# 두 번째 항목 클릭 - stale element 대비 최대 5회 재시도
list_button_xpath = '//*[@id="digital-forecast"]/div[1]/div[3]/ul/div[1]/a[2]'
for attempt in range(5):
try:
list_button = wait.until(EC.presence_of_element_located((By.XPATH, list_button_xpath)))
wait.until(EC.element_to_be_clickable((By.XPATH, list_button_xpath)))
list_button.click()
break
except StaleElementReferenceException:
print(f"시도 {attempt+1}: stale element 참조 오류 발생, 재시도 중...")
time.sleep(1)
except TimeoutException:
print(f"시도 {attempt+1}: 요소 대기 시간 초과, 재시도 중...")
time.sleep(1)
else:
print("두 번째 항목 클릭 실패. 스크립트 종료.")
driver.quit()
exit(1)
time.sleep(2) # 페이지 반영 대기
# 캡처 대상 요소 대기 후 찾기
target_element = wait.until(EC.presence_of_element_located(
(By.XPATH, '/html/body/div[2]/section/div/div[2]')
))
# 저장 경로 설정
save_path = os.path.join(script_dir, f'weather_capture_{TODAY}.png')
# 요소 스크린샷 저장
target_element.screenshot(save_path)
print(f'[캡처 완료] 저장 위치: {save_path}')
finally:
driver.quit()