262 lines
9.5 KiB
Python
262 lines
9.5 KiB
Python
# biz_crawler.py
|
|
import os, sys
|
|
import time
|
|
from datetime import datetime
|
|
from selenium.webdriver.common.by import By
|
|
from selenium.webdriver.support.ui import WebDriverWait
|
|
from selenium.webdriver.support import expected_conditions as EC
|
|
|
|
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
|
|
from conf.config import (
|
|
HEADLESS, BIZ_ID, NAVER_ID, NAVER_PW,
|
|
START_DATE as CFG_START, END_DATE as CFG_END,
|
|
COOKIE_FILE, DEBUG,
|
|
MESSAGE_PLATFORMS, MATTERMOST_URL, MATTERMOST_BOT_TOKEN, MATTERMOST_CHANNEL_ID
|
|
)
|
|
from lib.send_message import MessageSender
|
|
from lib.common import (
|
|
create_mobile_driver,
|
|
save_cookies,
|
|
load_cookies,
|
|
get_start_end_dates,
|
|
send_failure_message,
|
|
clean_html_text
|
|
)
|
|
|
|
def debug(msg):
    """Print *msg* prefixed with ``[DEBUG]`` when the DEBUG flag is truthy.

    Does nothing (and returns None) when DEBUG is falsy.
    """
    if not DEBUG:
        return
    print(f"[DEBUG] {msg}")
|
|
|
|
class NaverReviewCollector:
    """Collect SmartPlace reviews for every configured business and report them.

    Workflow (see ``run``): open a browser session using saved cookies (or an
    interactive login when no usable cookie file exists), visit the review page
    of each id in ``BIZ_ID``, keep the reviews whose written date falls inside
    ``[start_date, end_date]``, then send a summary message or a failure
    notice.
    """

    # Review-management page of a single business.
    REVIEWS_URL_TEMPLATE = "https://new.smartplace.naver.com/bizes/place/{biz_id}/reviews"

    # Upper bound on "open session -> collect" cycles in run(). A new cycle is
    # started each time the site reports that login is required; without a
    # bound a permanently rejected login would retry forever.
    MAX_SESSION_ATTEMPTS = 3

    def __init__(self, headless=HEADLESS):
        """Initialise counters and the collection date range.

        Args:
            headless: Forwarded to ``create_mobile_driver`` when the driver is
                created. ``run`` switches it to False whenever an interactive
                login is needed.
        """
        self.headless = headless
        self.driver = None
        self.total_reviews = 0
        # NOTE(review): assumed to be datetime.date values, because they are
        # compared against a ``date`` in _parse_review_item — confirm against
        # lib.common.get_start_end_dates.
        self.start_date, self.end_date = get_start_end_dates(DEBUG, CFG_START, CFG_END)
        # Maps a place label to the list of review dicts collected for it.
        self.reviews_by_place = {}

    def create_driver(self):
        """Create a fresh driver via ``create_mobile_driver`` and store it on ``self.driver``."""
        self.driver = create_mobile_driver(self.headless)

    def perform_login(self):
        """Log in through the review page of the first business and save cookies.

        Returns:
            True after the credentials were submitted and cookies were saved;
            False when ``BIZ_ID`` is empty or the login form could not be
            filled in. The result of the login itself is not verified here.
        """
        if not BIZ_ID:
            # Previously BIZ_ID[0] raised IndexError and leaked the driver.
            debug("BIZ_ID 가 비어 있어 로그인 페이지를 열 수 없음")
            return False

        wait = WebDriverWait(self.driver, 20)
        self.driver.get(self.REVIEWS_URL_TEMPLATE.format(biz_id=BIZ_ID[0]))
        time.sleep(2)

        # Best effort: press the first button inside #modal-root if the
        # element shows up; carry on when it does not.
        try:
            modal = wait.until(EC.presence_of_element_located((By.ID, "modal-root")))
            modal.find_element(By.XPATH, './/button').click()
        except Exception as exc:
            debug(f"모달 닫기 생략: {exc}")

        try:
            wait.until(EC.presence_of_element_located((By.ID, 'id'))).send_keys(NAVER_ID)
            self.driver.find_element(By.ID, 'pw').send_keys(NAVER_PW)
            self.driver.find_element(By.XPATH, '//button[@type="submit"]').click()
        except Exception as exc:
            debug(f"로그인 폼 입력 실패: {exc}")
            return False

        time.sleep(3)
        if "captcha" in self.driver.page_source.lower():
            # Blocks until the operator has solved the CAPTCHA in the browser.
            input("CAPTCHA 수동 입력 후 Enter: ")

        save_cookies(self.driver, COOKIE_FILE)
        return True

    def is_login_required(self):
        """Return True when the current page contains the login-required notice."""
        return "로그인이 필요한 기능" in self.driver.page_source

    def access_review_page(self, biz_id):
        """Open the review page of *biz_id* and return the text of the header selector.

        Returns:
            The stripped text of the first element whose class starts with
            ``Header_btn_select_`` (used by callers as the place name), or
            ``"알수없음"`` when it cannot be read within 10 seconds.
        """
        self.driver.get(self.REVIEWS_URL_TEMPLATE.format(biz_id=biz_id))
        time.sleep(2)
        try:
            el = WebDriverWait(self.driver, 10).until(
                EC.presence_of_element_located(
                    (By.XPATH, '//*[starts-with(@class, "Header_btn_select_")]')
                )
            )
            return el.text.strip()
        except Exception as exc:
            debug(f"장소명 추출 실패({biz_id}): {exc}")
            return "알수없음"

    def extract_written_date(self, spans, li):
        """Return the raw written-date text of a review item, or None.

        Args:
            spans: Label/value span elements of the review item.
            li: The review ``<li>`` element itself.

        Returns:
            The stripped text of the ``<time>`` element, or None when neither
            the "작성일" nor the "예약자" label is present or the lookup fails.
        """
        labels = [s.text.strip() for s in spans]
        try:
            if "작성일" in labels:
                # The value span directly follows its label span.
                idx = labels.index("작성일")
                return spans[idx + 1].find_element(By.TAG_NAME, "time").text.strip()
            if "예약자" in labels:
                # Items carrying this label keep the date at a fixed position.
                return li.find_element(By.XPATH, ".//div[3]/div[1]/span[2]/time").text.strip()
        except Exception as exc:
            debug(f"작성일 추출 실패: {exc}")
        return None

    def extract_review_text(self, li):
        """Return the review body of *li*, or ``"내용 없음"`` when there is none.

        The body link is looked up at ``./div[4]/a`` through ``./div[6]/a``; the
        first position that can be read decides the result.
        """
        for i in range(4, 7):
            try:
                el = li.find_element(By.XPATH, f"./div[{i}]/a")
                text = el.text.strip()
                return clean_html_text(el.get_attribute("innerHTML")) if text else "내용 없음"
            except Exception:
                # No usable link at this position; try the next one.
                continue
        return "내용 없음"

    @staticmethod
    def _parse_review_date(text):
        """Convert date text from the page into a ``datetime.date``.

        Everything from the first "(" on is dropped, ". " separators become
        "-" and remaining dots are removed before parsing as ``%Y-%m-%d``
        (presumably the page shows something like "2024. 5. 3.(금)" — verify).

        Raises:
            ValueError: When the normalised text is not a valid date.
        """
        normalized = text.split("(")[0].strip().replace(". ", "-").replace(".", "")
        return datetime.strptime(normalized, "%Y-%m-%d").date()

    def _parse_review_item(self, li):
        """Build the review dict for one ``<li>``, or return None to skip it.

        Skipped are banner items, items without a readable written date and
        items written outside ``[start_date, end_date]``. Element lookup or
        visit-date parsing errors propagate to the caller.
        """
        if "Review_banner__" in li.get_attribute("class"):
            return None

        author = li.find_element(By.XPATH, ".//div[1]/a[2]/div/span/span").text.strip()
        visit_text = li.find_element(By.XPATH, ".//div[2]/div[1]/span[2]/time").text.strip()
        visit_date = self._parse_review_date(visit_text).strftime("%Y-%m-%d")

        spans = li.find_elements(By.XPATH, ".//div[2]/div[2]/span")
        written_text = self.extract_written_date(spans, li)
        if not written_text:
            return None

        try:
            written_date = self._parse_review_date(written_text)
        except ValueError:
            return None

        if not (self.start_date <= written_date <= self.end_date):
            return None

        return {
            "작성자": author,
            "방문일": visit_date,
            "작성일": written_date,
            "내용": self.extract_review_text(li),
        }

    def extract_reviews(self):
        """Collect the in-range reviews shown on the currently loaded page.

        Returns:
            A list of dicts with the keys "작성자", "방문일" (``YYYY-MM-DD``
            string), "작성일" (``date``) and "내용". The list is empty when the
            review list does not appear within 10 seconds. Items that cannot
            be parsed are skipped.
        """
        reviews = []
        try:
            WebDriverWait(self.driver, 10).until(
                EC.presence_of_element_located(
                    (By.XPATH, "//ul[starts-with(@class, 'Review_columns_list')]")
                )
            )
            items = self.driver.find_elements(
                By.XPATH, "//ul[starts-with(@class, 'Review_columns_list')]/li"
            )
        except Exception as exc:
            debug(f"리뷰 목록 로딩 실패: {exc}")
            return reviews

        for li in items:
            try:
                review = self._parse_review_item(li)
            except Exception as exc:
                # One malformed item must not abort the whole page.
                debug(f"리뷰 항목 건너뜀: {exc}")
                continue
            if review is not None:
                reviews.append(review)
        return reviews

    def _build_sender(self):
        """Create a ``MessageSender`` from the configured Mattermost settings."""
        return MessageSender(
            mattermost_url=MATTERMOST_URL,
            mattermost_token=MATTERMOST_BOT_TOKEN,
            mattermost_channel_id=MATTERMOST_CHANNEL_ID,
        )

    def _build_report(self):
        """Render the collected reviews as the markdown text of the summary message."""
        today_str = datetime.today().strftime("%Y년 %m월 %d일")
        now_str = datetime.now().strftime("%H:%M:%S")
        lines = [
            f"##### {today_str} 네이버 리뷰 크롤링 결과",
            "",
            f"**수집 시간 :** {now_str}",
            f"**총 리뷰 수 :** {self.total_reviews}",
            "",
        ]

        # Per-place counts first, then the details.
        for place_name, reviews in self.reviews_by_place.items():
            lines.append(f"- {place_name}: {len(reviews)}건 ")
        lines.append("\n---\n")

        for idx, (place_name, reviews) in enumerate(self.reviews_by_place.items(), start=1):
            lines.append(f"**{idx}. {place_name}** ")
            lines.append("")
            if not reviews:
                lines.append("- 리뷰 없음\n")
            else:
                for r in reviews:
                    lines.append(f"- **작성일** : {r['작성일']} ")
                    lines.append(f" **방문일** : {r['방문일']} ")
                    lines.append(f" **작성자** : {r['작성자']} ")
                    lines.append(f" **내용** : {r['내용']}\n")
            # NOTE(review): the original indentation was lost; the separator is
            # assumed to be emitted once per place — confirm.
            lines.append("---")

        return "\n".join(lines)

    def send_to_message(self):
        """Send the summary of ``reviews_by_place`` to the configured platforms.

        Nothing is sent when ``MESSAGE_PLATFORMS`` is empty (a warning is
        printed) or when DEBUG is on (the message is printed via ``debug``).
        """
        message = self._build_report()

        if not MESSAGE_PLATFORMS:
            print("[WARN] 메시지 전송 플랫폼이 지정되지 않음. 미전송")
            debug(message)
            return

        sender = self._build_sender()

        if DEBUG:
            debug(f"메시지 플랫폼: {MESSAGE_PLATFORMS}")
            debug("디버그 모드: 메시지 전송 생략")
            debug(message)
        else:
            sender.send(message, platforms=MESSAGE_PLATFORMS, use_webhook=False)

    def _open_session(self):
        """Leave ``self.driver`` running with the saved cookies loaded.

        When no usable cookie file exists, an interactive login is performed in
        a non-headless browser; the cookies it saves are then loaded into a
        fresh driver.

        Returns:
            True when a driver with loaded cookies is ready, False when the
            login failed (the driver is already quit in that case).
        """
        while True:
            self.create_driver()
            self.driver.get("https://naver.com")
            time.sleep(1)

            if os.path.exists(COOKIE_FILE):
                try:
                    load_cookies(self.driver, COOKIE_FILE)
                    self.driver.get("https://naver.com")
                    time.sleep(1)
                except Exception as exc:
                    # Unusable cookie file: drop it and fall back to login.
                    debug(f"쿠키 로딩 실패: {exc}")
                    os.remove(COOKIE_FILE)
                    self.driver.quit()
                    self.headless = False
                    continue
                return True

            if self.headless:
                # Login may need manual CAPTCHA input, so restart with a
                # visible browser first.
                self.driver.quit()
                self.headless = False
                continue

            logged_in = self.perform_login()
            self.driver.quit()
            if not logged_in:
                return False
            # Loop again: the next driver picks up the cookies just saved.

    def _collect_all_places(self):
        """Collect the reviews of every id in ``BIZ_ID`` with the current driver.

        Results of a previous attempt are discarded first so that a retry after
        a session expiry does not count the same reviews twice.

        Returns:
            True when all places were visited, False when a page reported that
            login is required.
        """
        self.total_reviews = 0
        self.reviews_by_place = {}

        for biz_id in BIZ_ID:
            place_name = self.access_review_page(biz_id)
            print(f"\n=== [{place_name}({biz_id})] 리뷰 수집 시작 ===")
            if self.is_login_required():
                print("[WARN] 세션 만료 또는 쿠키 무효. 로그인 재진행")
                return False

            # Two businesses can resolve to the same label (e.g. "알수없음");
            # qualify the key so one result does not overwrite the other.
            key = place_name
            if key in self.reviews_by_place:
                key = f"{place_name}({biz_id})"

            try:
                reviews = self.extract_reviews()
                print(f"[RESULT] 리뷰 {len(reviews)}개 수집됨")
                self.total_reviews += len(reviews)
                self.reviews_by_place[key] = reviews
            except Exception as e:
                print(f"[ERROR] {biz_id} 처리 중 오류:", e)
                self.reviews_by_place[key] = []
        return True

    def run(self):
        """Run the whole crawl and send either the summary or a failure notice.

        A session is opened and all places are collected; when the site asks
        for login in the middle of the collection, the cookie file is removed
        and the cycle is repeated with a visible browser, at most
        ``MAX_SESSION_ATTEMPTS`` times in total. Returns None; returns early
        without sending anything when the login itself fails.
        """
        completed = False
        for _ in range(self.MAX_SESSION_ATTEMPTS):
            if not self._open_session():
                print("[ERROR] 로그인 실패. 수집을 중단합니다.")
                return

            try:
                completed = self._collect_all_places()
            finally:
                # Always release the browser, even when collection raised.
                self.driver.quit()

            if completed:
                break

            # Session expired: force a fresh interactive login next time.
            if os.path.exists(COOKIE_FILE):
                os.remove(COOKIE_FILE)
            self.headless = False

        if not completed:
            print(f"[ERROR] 로그인 재시도 {self.MAX_SESSION_ATTEMPTS}회 초과. 수집을 중단합니다.")
            # Do not report the partial results of an aborted attempt.
            self.total_reviews = 0
            self.reviews_by_place = {}

        if not self.reviews_by_place:
            send_failure_message(self._build_sender(), MESSAGE_PLATFORMS)
        else:
            self.send_to_message()
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: run one full crawl with the configured headless mode.
    NaverReviewCollector(headless=HEADLESS).run()
|