# send_message.py
import requests


class MessageSender:
    """Send plain-text notifications to Mattermost, Synology Chat and Telegram.

    Every platform is optional: only the platforms named in a ``send`` call
    are contacted, and a platform whose settings are empty is reported as a
    failure without any network request being made.
    """

    def __init__(self,
                 mattermost_url: str = "",
                 mattermost_token: str = "",
                 mattermost_channel_id: str = "",
                 synology_webhook_url: str = "",
                 telegram_bot_token: str = "",
                 telegram_chat_id: str = "",
                 timeout: float = 10.0):
        """Store the connection settings for each platform.

        :param mattermost_url: Mattermost base URL, or the full incoming
            webhook URL when ``send(..., use_webhook=True)`` is used.
            Trailing slashes are removed.
        :param mattermost_token: token sent as ``Authorization: Bearer ...``
            when posting through the Mattermost REST API.
        :param mattermost_channel_id: channel id used for REST API posts.
        :param synology_webhook_url: Synology Chat incoming webhook URL.
        :param telegram_bot_token: Telegram bot token.
        :param telegram_chat_id: Telegram chat id to post into.
        :param timeout: seconds to wait for each HTTP request; passed to
            ``requests.post`` so an unresponsive server cannot block the
            caller indefinitely.
        """
        # ``or ""`` keeps an explicit None from crashing on .rstrip().
        self.mattermost_url = (mattermost_url or "").rstrip('/')
        self.mattermost_token = mattermost_token
        self.mattermost_channel_id = mattermost_channel_id
        self.synology_webhook_url = synology_webhook_url
        self.telegram_bot_token = telegram_bot_token
        self.telegram_chat_id = telegram_chat_id
        self.timeout = timeout

    def send(self, message: str, platforms=None, use_webhook: bool = False):
        """Send ``message`` to every requested platform.

        All listed platforms are attempted even if an earlier one fails.

        :param message: text to send.
        :param platforms: a platform name or an iterable of names
            (``'mattermost'``, ``'synology'``, ``'telegram'``;
            case-insensitive).
        :param use_webhook: post to Mattermost through its incoming webhook
            instead of the REST API (Mattermost only).
        :return: ``True`` only if every requested platform succeeded;
            ``False`` if no platform was given, a name is unsupported, or
            any delivery failed.
        """
        if not platforms:
            print("[WARN] 전송할 플랫폼이 지정되지 않았습니다. 메시지를 보내지 않습니다.")
            return False

        if isinstance(platforms, str):
            platforms = [platforms]

        handlers = {
            "mattermost": lambda: self._send_to_mattermost(message, use_webhook),
            "synology": lambda: self._send_to_synology_chat(message),
            "telegram": lambda: self._send_to_telegram(message),
        }

        success = True
        for platform in platforms:
            # Non-string entries (None, numbers, ...) are reported as
            # unsupported instead of raising AttributeError on .lower().
            key = platform.lower() if isinstance(platform, str) else None
            handler = handlers.get(key)
            if handler is None:
                shown = platform if key is None else key
                print(f"[ERROR] 지원하지 않는 플랫폼입니다: {shown}")
                result = False
            else:
                result = handler()
            if not result:
                success = False
        return success

    def _is_configured(self, label: str, **settings) -> bool:
        """Return True if every named setting is non-empty, else log which are missing."""
        missing = [name for name, value in settings.items() if not value]
        if missing:
            print(f"[ERROR] {label} 설정이 누락되었습니다: {', '.join(missing)}")
            return False
        return True

    def _post(self, label: str, url: str, ok_statuses=(200,), **kwargs) -> bool:
        """POST to ``url`` and report whether the response status is accepted.

        :param label: platform name used in the log messages.
        :param url: request URL.
        :param ok_statuses: HTTP status codes treated as success.
        :param kwargs: extra keyword arguments forwarded to ``requests.post``.
        :return: ``True`` on an accepted status, ``False`` on any other
            status or on any exception.
        """
        try:
            response = requests.post(url, timeout=self.timeout, **kwargs)
        except Exception as e:
            # Deliberate best-effort boundary: a notification failure must
            # not propagate into the caller, so it is logged and reported.
            print(f"[ERROR] {label} 전송 예외: {e}")
            return False

        if response.status_code not in ok_statuses:
            print(f"[ERROR] {label} 전송 실패: {response.status_code} {response.text}")
            return False

        print(f"[INFO] {label} 메시지 전송 완료")
        return True

    def _send_to_mattermost(self, message: str, use_webhook: bool):
        """Post ``message`` to Mattermost via webhook or the ``/api/v4/posts`` endpoint.

        :return: ``True`` if Mattermost answered 200 or 201, else ``False``.
        """
        if use_webhook:
            # In webhook mode mattermost_url is itself the webhook endpoint.
            if not self._is_configured("Mattermost",
                                       mattermost_url=self.mattermost_url):
                return False
            return self._post(
                "Mattermost",
                self.mattermost_url,
                ok_statuses=(200, 201),
                json={"text": message},
                headers={"Content-Type": "application/json"},
            )

        if not self._is_configured(
                "Mattermost",
                mattermost_url=self.mattermost_url,
                mattermost_token=self.mattermost_token,
                mattermost_channel_id=self.mattermost_channel_id):
            return False

        headers = {
            "Authorization": f"Bearer {self.mattermost_token}",
            "Content-Type": "application/json"
        }
        payload = {
            "channel_id": self.mattermost_channel_id,
            "message": message
        }
        return self._post(
            "Mattermost",
            f"{self.mattermost_url}/api/v4/posts",
            ok_statuses=(200, 201),
            json=payload,
            headers=headers,
        )

    def _send_to_synology_chat(self, message: str):
        """Post ``message`` to the Synology Chat incoming webhook.

        :return: ``True`` if the webhook answered 200, else ``False``.
        """
        if not self._is_configured(
                "Synology Chat",
                synology_webhook_url=self.synology_webhook_url):
            return False

        # NOTE(review): Synology's incoming-webhook documentation appears to
        # describe a form field named "payload" carrying the JSON; this keeps
        # the original raw JSON body -- confirm against the target server.
        return self._post(
            "Synology Chat",
            self.synology_webhook_url,
            json={"text": message},
            headers={"Content-Type": "application/json"},
        )

    def _send_to_telegram(self, message: str):
        """Post ``message`` through the Telegram Bot API ``sendMessage`` method.

        :return: ``True`` if Telegram answered 200, else ``False``.
        """
        if not self._is_configured(
                "Telegram",
                telegram_bot_token=self.telegram_bot_token,
                telegram_chat_id=self.telegram_chat_id):
            return False

        # NOTE(review): with parse_mode "Markdown" Telegram may reject text
        # containing unbalanced Markdown characters -- confirm this is intended.
        payload = {
            "chat_id": self.telegram_chat_id,
            "text": message,
            "parse_mode": "Markdown"
        }
        return self._post(
            "Telegram",
            f"https://api.telegram.org/bot{self.telegram_bot_token}/sendMessage",
            data=payload,
        )