Files
naver-review-crawler/lib/send_message.py
2025-07-10 15:05:12 +09:00

115 lines
4.3 KiB
Python

import requests
class MessageSender:
    """Send a plain-text message to Mattermost, Synology Chat and/or Telegram.

    Each backend is configured independently through the constructor.
    Sending is best-effort: failures are printed and reported through a
    ``False`` return value instead of being raised to the caller.
    """

    def __init__(self,
                 mattermost_url: str = "", mattermost_token: str = "", mattermost_channel_id: str = "",
                 synology_webhook_url: str = "",
                 telegram_bot_token: str = "", telegram_chat_id: str = "",
                 timeout: float = 10.0):
        """Store the connection settings for every supported backend.

        :param mattermost_url: Mattermost base URL, or the full incoming
            webhook URL when messages are sent with ``use_webhook=True``.
            Trailing slashes are removed.
        :param mattermost_token: bearer token used for the Mattermost posts API.
        :param mattermost_channel_id: target channel for the Mattermost posts API.
        :param synology_webhook_url: Synology Chat webhook URL.
        :param telegram_bot_token: Telegram bot token.
        :param telegram_chat_id: Telegram chat to post into.
        :param timeout: seconds to wait for each HTTP request before giving
            up, so an unresponsive server cannot block the caller forever.
        """
        self.mattermost_url = mattermost_url.rstrip('/')
        self.mattermost_token = mattermost_token
        self.mattermost_channel_id = mattermost_channel_id
        self.synology_webhook_url = synology_webhook_url
        self.telegram_bot_token = telegram_bot_token
        self.telegram_chat_id = telegram_chat_id
        self.timeout = timeout

    def send(self, message: str, platforms=None, use_webhook: bool = False) -> bool:
        """Send ``message`` to every requested platform.

        :param message: text to send.
        :param platforms: a platform name or a list of names
            (e.g. ``['mattermost', 'telegram']``); matching is case-insensitive.
        :param use_webhook: post to Mattermost through its webhook URL
            instead of the posts API (Mattermost only).
        :return: ``True`` only if every requested platform accepted the
            message; ``False`` if none was requested or any of them failed.
        """
        if not platforms:
            print("[WARN] 전송할 플랫폼이 지정되지 않았습니다. 메시지를 보내지 않습니다.")
            return False
        if isinstance(platforms, str):
            platforms = [platforms]

        success = True
        for platform in platforms:
            # str() keeps a stray non-string entry (e.g. None) from raising;
            # it is reported as an unsupported platform instead.
            p = str(platform).lower()
            if p == "mattermost":
                result = self._send_to_mattermost(message, use_webhook)
            elif p == "synology":
                result = self._send_to_synology_chat(message)
            elif p == "telegram":
                result = self._send_to_telegram(message)
            else:
                print(f"[ERROR] 지원하지 않는 플랫폼입니다: {p}")
                result = False
            # Keep going after a failure so the remaining platforms still
            # receive the message.
            if not result:
                success = False
        return success

    def _is_configured(self, label: str, **settings) -> bool:
        """Return True when every given setting is non-empty, else report the missing ones."""
        missing = [name for name, value in settings.items() if not value]
        if missing:
            print(f"[ERROR] {label} 설정이 누락되었습니다: {', '.join(missing)}")
            return False
        return True

    def _post(self, label: str, url: str, ok_statuses=(200,), **kwargs) -> bool:
        """POST to ``url`` and report whether the response status is accepted.

        :param label: platform name used in the printed log lines.
        :param url: endpoint to post to.
        :param ok_statuses: HTTP status codes treated as success.
        :param kwargs: forwarded to ``requests.post`` (``json``, ``data``, ``headers``).
        :return: ``True`` on an accepted status, ``False`` on any other
            status or on any exception.
        """
        try:
            response = requests.post(url, timeout=self.timeout, **kwargs)
            if response.status_code not in ok_statuses:
                print(f"[ERROR] {label} 전송 실패: {response.status_code} {response.text}")
                return False
            print(f"[INFO] {label} 메시지 전송 완료")
            return True
        except Exception as e:
            # Deliberately broad: notification delivery is best-effort and
            # must never crash the caller.
            # NOTE(review): the exception text may contain the request URL,
            # which for Telegram embeds the bot token - confirm this output
            # is not written to shared logs.
            print(f"[ERROR] {label} 전송 예외: {e}")
            return False

    def _send_to_mattermost(self, message: str, use_webhook: bool) -> bool:
        """Send ``message`` to Mattermost through the webhook or the posts API."""
        if use_webhook:
            # In webhook mode mattermost_url is the complete webhook URL.
            if not self._is_configured("Mattermost", mattermost_url=self.mattermost_url):
                return False
            return self._post(
                "Mattermost",
                self.mattermost_url,
                ok_statuses=(200, 201),
                json={"text": message},
                headers={"Content-Type": "application/json"},
            )

        if not self._is_configured(
            "Mattermost",
            mattermost_url=self.mattermost_url,
            mattermost_token=self.mattermost_token,
            mattermost_channel_id=self.mattermost_channel_id,
        ):
            return False
        headers = {
            "Authorization": f"Bearer {self.mattermost_token}",
            "Content-Type": "application/json"
        }
        payload = {
            "channel_id": self.mattermost_channel_id,
            "message": message
        }
        return self._post(
            "Mattermost",
            f"{self.mattermost_url}/api/v4/posts",
            ok_statuses=(200, 201),
            json=payload,
            headers=headers,
        )

    def _send_to_synology_chat(self, message: str) -> bool:
        """Send ``message`` to the configured Synology Chat webhook."""
        if not self._is_configured("Synology Chat", synology_webhook_url=self.synology_webhook_url):
            return False
        return self._post(
            "Synology Chat",
            self.synology_webhook_url,
            json={"text": message},
            headers={"Content-Type": "application/json"},
        )

    def _send_to_telegram(self, message: str) -> bool:
        """Send ``message`` to the configured Telegram chat with Markdown parsing enabled."""
        if not self._is_configured(
            "Telegram",
            telegram_bot_token=self.telegram_bot_token,
            telegram_chat_id=self.telegram_chat_id,
        ):
            return False
        payload = {
            "chat_id": self.telegram_chat_id,
            "text": message,
            "parse_mode": "Markdown"
        }
        # Sent form-encoded (data=), unlike the JSON bodies used by the
        # other backends.
        return self._post(
            "Telegram",
            f"https://api.telegram.org/bot{self.telegram_bot_token}/sendMessage",
            data=payload,
        )