120 lines
4.6 KiB
Python
120 lines
4.6 KiB
Python
import requests
|
|
|
|
class MessageSender:
|
|
def __init__(self,
|
|
mattermost_url: str = "", mattermost_token: str = "", mattermost_channel_id: str = "",
|
|
synology_webhook_url: str = "",
|
|
telegram_bot_token: str = "", telegram_chat_id: str = ""):
|
|
self.mattermost_url = mattermost_url.rstrip('/')
|
|
self.mattermost_token = mattermost_token
|
|
self.mattermost_channel_id = mattermost_channel_id
|
|
self.synology_webhook_url = synology_webhook_url
|
|
self.telegram_bot_token = telegram_bot_token
|
|
self.telegram_chat_id = telegram_chat_id
|
|
|
|
def send(self, message: str, platforms=None, use_webhook: bool = False):
|
|
"""
|
|
메시지 전송
|
|
|
|
:param message: 전송할 메시지
|
|
:param platforms: 전송 플랫폼 리스트 (예: ['mattermost', 'telegram'])
|
|
:param use_webhook: mattermost에서 웹훅 사용 여부 (mattermost 전용)
|
|
"""
|
|
if not platforms:
|
|
print("[WARN] 전송할 플랫폼이 지정되지 않았습니다. 메시지를 보내지 않습니다.")
|
|
return False
|
|
|
|
if isinstance(platforms, str):
|
|
platforms = [platforms]
|
|
|
|
success = True
|
|
for platform in platforms:
|
|
p = platform.lower()
|
|
if p == "mattermost":
|
|
result = self._send_to_mattermost(message, use_webhook)
|
|
elif p == "synology":
|
|
result = self._send_to_synology_chat(message)
|
|
elif p == "telegram":
|
|
result = self._send_to_telegram(message)
|
|
else:
|
|
print(f"[ERROR] 지원하지 않는 플랫폼입니다: {p}")
|
|
result = False
|
|
if not result:
|
|
success = False
|
|
return success
|
|
|
|
|
|
def _send_to_mattermost(self, message: str, use_webhook: bool):
|
|
try:
|
|
# Mattermost URL 검증
|
|
if not self.mattermost_url or not self.mattermost_url.startswith(('http://', 'https://')):
|
|
print(f"[ERROR] Mattermost URL이 유효하지 않습니다: {self.mattermost_url}")
|
|
return False
|
|
|
|
if use_webhook:
|
|
response = requests.post(
|
|
self.mattermost_url,
|
|
json={"text": message},
|
|
headers={"Content-Type": "application/json"}
|
|
)
|
|
else:
|
|
url = f"{self.mattermost_url}/api/v4/posts"
|
|
headers = {
|
|
"Authorization": f"Bearer {self.mattermost_token}",
|
|
"Content-Type": "application/json"
|
|
}
|
|
payload = {
|
|
"channel_id": self.mattermost_channel_id,
|
|
"message": message
|
|
}
|
|
response = requests.post(url, json=payload, headers=headers)
|
|
|
|
if response.status_code not in [200, 201]:
|
|
print(f"[ERROR] Mattermost 전송 실패: {response.status_code} {response.text}")
|
|
return False
|
|
else:
|
|
print("[INFO] Mattermost 메시지 전송 완료")
|
|
return True
|
|
|
|
except Exception as e:
|
|
print(f"[ERROR] Mattermost 전송 예외: {e}")
|
|
return False
|
|
|
|
def _send_to_synology_chat(self, message: str):
|
|
try:
|
|
payload = {"text": message}
|
|
headers = {"Content-Type": "application/json"}
|
|
response = requests.post(self.synology_webhook_url, json=payload, headers=headers)
|
|
|
|
if response.status_code != 200:
|
|
print(f"[ERROR] Synology Chat 전송 실패: {response.status_code} {response.text}")
|
|
return False
|
|
else:
|
|
print("[INFO] Synology Chat 메시지 전송 완료")
|
|
return True
|
|
|
|
except Exception as e:
|
|
print(f"[ERROR] Synology Chat 전송 예외: {e}")
|
|
return False
|
|
|
|
def _send_to_telegram(self, message: str):
|
|
try:
|
|
url = f"https://api.telegram.org/bot{self.telegram_bot_token}/sendMessage"
|
|
payload = {
|
|
"chat_id": self.telegram_chat_id,
|
|
"text": message,
|
|
"parse_mode": "Markdown"
|
|
}
|
|
response = requests.post(url, data=payload)
|
|
|
|
if response.status_code != 200:
|
|
print(f"[ERROR] Telegram 전송 실패: {response.status_code} {response.text}")
|
|
return False
|
|
else:
|
|
print("[INFO] Telegram 메시지 전송 완료")
|
|
return True
|
|
|
|
except Exception as e:
|
|
print(f"[ERROR] Telegram 전송 예외: {e}")
|
|
return False
|