.env를 crontab에서 인식하지 못하는 문제 수정
This commit is contained in:
119
app/send_message.py
Normal file
119
app/send_message.py
Normal file
@ -0,0 +1,119 @@
|
||||
import requests
|
||||
|
||||
class MessageSender:
|
||||
def __init__(self,
|
||||
mattermost_url: str = "", mattermost_token: str = "", mattermost_channel_id: str = "",
|
||||
synology_webhook_url: str = "",
|
||||
telegram_bot_token: str = "", telegram_chat_id: str = ""):
|
||||
self.mattermost_url = mattermost_url.rstrip('/')
|
||||
self.mattermost_token = mattermost_token
|
||||
self.mattermost_channel_id = mattermost_channel_id
|
||||
self.synology_webhook_url = synology_webhook_url
|
||||
self.telegram_bot_token = telegram_bot_token
|
||||
self.telegram_chat_id = telegram_chat_id
|
||||
|
||||
def send(self, message: str, platforms=None, use_webhook: bool = False):
|
||||
"""
|
||||
메시지 전송
|
||||
|
||||
:param message: 전송할 메시지
|
||||
:param platforms: 전송 플랫폼 리스트 (예: ['mattermost', 'telegram'])
|
||||
:param use_webhook: mattermost에서 웹훅 사용 여부 (mattermost 전용)
|
||||
"""
|
||||
if not platforms:
|
||||
print("[WARN] 전송할 플랫폼이 지정되지 않았습니다. 메시지를 보내지 않습니다.")
|
||||
return False
|
||||
|
||||
if isinstance(platforms, str):
|
||||
platforms = [platforms]
|
||||
|
||||
success = True
|
||||
for platform in platforms:
|
||||
p = platform.lower()
|
||||
if p == "mattermost":
|
||||
result = self._send_to_mattermost(message, use_webhook)
|
||||
elif p == "synology":
|
||||
result = self._send_to_synology_chat(message)
|
||||
elif p == "telegram":
|
||||
result = self._send_to_telegram(message)
|
||||
else:
|
||||
print(f"[ERROR] 지원하지 않는 플랫폼입니다: {p}")
|
||||
result = False
|
||||
if not result:
|
||||
success = False
|
||||
return success
|
||||
|
||||
|
||||
def _send_to_mattermost(self, message: str, use_webhook: bool):
|
||||
try:
|
||||
# Mattermost URL 검증
|
||||
if not self.mattermost_url or not self.mattermost_url.startswith(('http://', 'https://')):
|
||||
print(f"[ERROR] Mattermost URL이 유효하지 않습니다: {self.mattermost_url}")
|
||||
return False
|
||||
|
||||
if use_webhook:
|
||||
response = requests.post(
|
||||
self.mattermost_url,
|
||||
json={"text": message},
|
||||
headers={"Content-Type": "application/json"}
|
||||
)
|
||||
else:
|
||||
url = f"{self.mattermost_url}/api/v4/posts"
|
||||
headers = {
|
||||
"Authorization": f"Bearer {self.mattermost_token}",
|
||||
"Content-Type": "application/json"
|
||||
}
|
||||
payload = {
|
||||
"channel_id": self.mattermost_channel_id,
|
||||
"message": message
|
||||
}
|
||||
response = requests.post(url, json=payload, headers=headers)
|
||||
|
||||
if response.status_code not in [200, 201]:
|
||||
print(f"[ERROR] Mattermost 전송 실패: {response.status_code} {response.text}")
|
||||
return False
|
||||
else:
|
||||
print("[INFO] Mattermost 메시지 전송 완료")
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
print(f"[ERROR] Mattermost 전송 예외: {e}")
|
||||
return False
|
||||
|
||||
def _send_to_synology_chat(self, message: str):
|
||||
try:
|
||||
payload = {"text": message}
|
||||
headers = {"Content-Type": "application/json"}
|
||||
response = requests.post(self.synology_webhook_url, json=payload, headers=headers)
|
||||
|
||||
if response.status_code != 200:
|
||||
print(f"[ERROR] Synology Chat 전송 실패: {response.status_code} {response.text}")
|
||||
return False
|
||||
else:
|
||||
print("[INFO] Synology Chat 메시지 전송 완료")
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
print(f"[ERROR] Synology Chat 전송 예외: {e}")
|
||||
return False
|
||||
|
||||
def _send_to_telegram(self, message: str):
|
||||
try:
|
||||
url = f"https://api.telegram.org/bot{self.telegram_bot_token}/sendMessage"
|
||||
payload = {
|
||||
"chat_id": self.telegram_chat_id,
|
||||
"text": message,
|
||||
"parse_mode": "Markdown"
|
||||
}
|
||||
response = requests.post(url, data=payload)
|
||||
|
||||
if response.status_code != 200:
|
||||
print(f"[ERROR] Telegram 전송 실패: {response.status_code} {response.text}")
|
||||
return False
|
||||
else:
|
||||
print("[INFO] Telegram 메시지 전송 완료")
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
print(f"[ERROR] Telegram 전송 예외: {e}")
|
||||
return False
|
||||
Reference in New Issue
Block a user