Python自动化大师第6篇:邮件与通知自动化
Python Automation Master Part 6: Email and Notification Automation
前言:通知自动化的必要性
自动化系统的完整性取决于通知功能。无论你创建了多么出色的自动化脚本,如果不能及时确认其结果,价值就会大打折扣。在服务器故障、批处理任务完成、每日报告发送等各种情况下,接收适当的通知可以大大提高工作效率。
在第6篇中,我们将学习如何使用Python通过各种渠道发送通知。从传统的邮件到Slack、Telegram、Discord,我们将涵盖实际工作中最常用的通知方法。
1. 使用smtplib发送邮件
使用Python内置库smtplib,无需额外安装即可发送邮件。
1.1 基本邮件发送
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
def send_simple_email(sender, password, recipient, subject, body):
"""发送简单文本邮件"""
# 创建消息对象
msg = MIMEMultipart()
msg['From'] = sender
msg['To'] = recipient
msg['Subject'] = subject
# 添加正文
msg.attach(MIMEText(body, 'plain', 'utf-8'))
try:
# 连接Gmail SMTP服务器
server = smtplib.SMTP('smtp.gmail.com', 587)
server.starttls() # TLS加密
server.login(sender, password)
# 发送邮件
server.send_message(msg)
server.quit()
print("邮件发送成功!")
return True
except Exception as e:
print(f"邮件发送失败: {e}")
return False
# 使用示例
send_simple_email(
sender='your_email@gmail.com',
password='应用专用密码', # 开启两步验证后使用应用专用密码
recipient='recipient@example.com',
subject='[自动化] 每日报告',
body='今天的任务已正常完成。'
)
2. Gmail/QQ邮箱SMTP设置
2.1 Gmail SMTP设置
要使用Gmail,首先需要生成应用专用密码。
- 进入Google账户设置
- 在安全标签页启用两步验证
- 生成应用专用密码(选择其他应用)
- 在代码中使用生成的16位密码
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
class GmailSender:
"""使用Gmail SMTP发送邮件的类"""
def __init__(self, email, app_password):
self.email = email
self.password = app_password
self.smtp_server = 'smtp.gmail.com'
self.smtp_port = 587
def send(self, to, subject, body, html=False):
"""发送邮件"""
msg = MIMEMultipart('alternative')
msg['From'] = self.email
msg['To'] = to if isinstance(to, str) else ', '.join(to)
msg['Subject'] = subject
# 文本或HTML正文
content_type = 'html' if html else 'plain'
msg.attach(MIMEText(body, content_type, 'utf-8'))
with smtplib.SMTP(self.smtp_server, self.smtp_port) as server:
server.starttls()
server.login(self.email, self.password)
# 发送给多个收件人
recipients = [to] if isinstance(to, str) else to
server.send_message(msg)
return True
# 使用示例
gmail = GmailSender('your_email@gmail.com', '应用专用密码')
gmail.send(
to='recipient@example.com',
subject='测试邮件',
body='这是Python发送的邮件。'
)
2.2 QQ邮箱SMTP设置
class QQMailSender:
"""使用QQ邮箱SMTP发送邮件的类"""
def __init__(self, email, auth_code):
self.email = email
self.password = auth_code # QQ邮箱授权码
self.smtp_server = 'smtp.qq.com'
self.smtp_port = 587
def send(self, to, subject, body, html=False):
"""发送邮件"""
msg = MIMEMultipart('alternative')
msg['From'] = self.email
msg['To'] = to if isinstance(to, str) else ', '.join(to)
msg['Subject'] = subject
content_type = 'html' if html else 'plain'
msg.attach(MIMEText(body, content_type, 'utf-8'))
with smtplib.SMTP(self.smtp_server, self.smtp_port) as server:
server.starttls()
server.login(self.email, self.password)
server.send_message(msg)
return True
# QQ邮箱设置:
# 1. QQ邮箱 > 设置 > 账户 > POP3/IMAP/SMTP服务
# 2. 开启SMTP服务
# 3. 生成授权码并在代码中使用
qq = QQMailSender('your_qq@qq.com', 'QQ邮箱授权码')
qq.send(
to='recipient@example.com',
subject='QQ邮箱测试',
body='这是通过QQ邮箱SMTP发送的邮件。'
)
3. 邮件正文编写(文本、HTML)
3.1 HTML格式邮件
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
import smtplib
def send_html_email(sender, password, recipient, subject, data):
"""发送HTML格式的报告邮件"""
# HTML模板
html_template = """
<!DOCTYPE html>
<html>
<head>
<style>
body {{ font-family: 'Microsoft YaHei', sans-serif; }}
.container {{ max-width: 600px; margin: 0 auto; padding: 20px; }}
.header {{ background: #2c3e50; color: white; padding: 20px; text-align: center; }}
.content {{ padding: 20px; background: #f9f9f9; }}
.table {{ width: 100%; border-collapse: collapse; margin: 20px 0; }}
.table th, .table td {{ border: 1px solid #ddd; padding: 12px; text-align: left; }}
.table th {{ background: #3498db; color: white; }}
.table tr:nth-child(even) {{ background: #f2f2f2; }}
.footer {{ text-align: center; padding: 20px; color: #666; font-size: 12px; }}
.status-ok {{ color: #27ae60; font-weight: bold; }}
.status-error {{ color: #e74c3c; font-weight: bold; }}
</style>
</head>
<body>
<div class="container">
<div class="header">
<h1>{title}</h1>
<p>{date}</p>
</div>
<div class="content">
<h2>摘要</h2>
<p>总处理数量: <strong>{total_count}</strong></p>
<p>成功: <span class="status-ok">{success_count}</span> / 失败: <span class="status-error">{fail_count}</span></p>
<h2>详细信息</h2>
<table class="table">
<tr>
<th>项目</th>
<th>状态</th>
<th>处理时间</th>
</tr>
{table_rows}
</table>
</div>
<div class="footer">
<p>此邮件为自动发送。</p>
<p>联系方式: admin@example.com</p>
</div>
</div>
</body>
</html>
"""
# 生成表格行
table_rows = ""
for item in data['items']:
status_class = 'status-ok' if item['status'] == '成功' else 'status-error'
table_rows += f"""
<tr>
<td>{item['name']}</td>
<td class="{status_class}">{item['status']}</td>
<td>{item['time']}</td>
</tr>
"""
# 完成HTML
html_content = html_template.format(
title=data['title'],
date=data['date'],
total_count=data['total_count'],
success_count=data['success_count'],
fail_count=data['fail_count'],
table_rows=table_rows
)
# 构建消息
msg = MIMEMultipart('alternative')
msg['From'] = sender
msg['To'] = recipient
msg['Subject'] = subject
# 文本版本(用于不支持HTML的客户端)
text_content = f"""
{data['title']}
日期: {data['date']}
总处理数量: {data['total_count']}
成功: {data['success_count']} / 失败: {data['fail_count']}
"""
msg.attach(MIMEText(text_content, 'plain', 'utf-8'))
msg.attach(MIMEText(html_content, 'html', 'utf-8'))
# 发送
with smtplib.SMTP('smtp.gmail.com', 587) as server:
server.starttls()
server.login(sender, password)
server.send_message(msg)
return True
# 使用示例
report_data = {
'title': '每日批处理任务报告',
'date': '2026-01-22',
'total_count': 5,
'success_count': 4,
'fail_count': 1,
'items': [
{'name': '数据收集', 'status': '成功', 'time': '2.3秒'},
{'name': '数据转换', 'status': '成功', 'time': '5.1秒'},
{'name': '数据加载', 'status': '成功', 'time': '3.7秒'},
{'name': '备份创建', 'status': '失败', 'time': '-'},
{'name': '通知发送', 'status': '成功', 'time': '0.5秒'}
]
}
send_html_email(
sender='your_email@gmail.com',
password='应用专用密码',
recipient='recipient@example.com',
subject='[报告] 每日批处理任务结果',
data=report_data
)
4. 发送附件
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.base import MIMEBase
from email.mime.application import MIMEApplication
from email import encoders
import smtplib
import os
def send_email_with_attachments(sender, password, recipient, subject, body, attachments):
"""发送带附件的邮件
Args:
attachments: 要附加的文件路径列表
"""
msg = MIMEMultipart()
msg['From'] = sender
msg['To'] = recipient
msg['Subject'] = subject
# 添加正文
msg.attach(MIMEText(body, 'plain', 'utf-8'))
# 添加附件
for file_path in attachments:
if not os.path.exists(file_path):
print(f"找不到文件: {file_path}")
continue
# 读取文件
with open(file_path, 'rb') as f:
file_data = f.read()
# 确定MIME类型
file_name = os.path.basename(file_path)
part = MIMEApplication(file_data, Name=file_name)
# 添加头信息
part['Content-Disposition'] = f'attachment; filename="{file_name}"'
msg.attach(part)
# 发送
with smtplib.SMTP('smtp.gmail.com', 587) as server:
server.starttls()
server.login(sender, password)
server.send_message(msg)
print(f"邮件发送完成({len(attachments)}个附件)")
return True
# 使用示例
send_email_with_attachments(
sender='your_email@gmail.com',
password='应用专用密码',
recipient='recipient@example.com',
subject='月度报告附件',
body='您好,\n\n请查收附件中的月度报告。\n\n谢谢。',
attachments=[
'reports/月度报告_202601.xlsx',
'reports/图表_202601.png'
]
)
5. 接收和解析邮件(imaplib)
import imaplib
import email
from email.header import decode_header
from datetime import datetime
class EmailReader:
"""使用IMAP接收和解析邮件的类"""
def __init__(self, email_addr, password, imap_server='imap.gmail.com'):
self.email = email_addr
self.password = password
self.imap_server = imap_server
self.mail = None
def connect(self):
"""连接IMAP服务器"""
self.mail = imaplib.IMAP4_SSL(self.imap_server)
self.mail.login(self.email, self.password)
return self
def disconnect(self):
"""断开连接"""
if self.mail:
self.mail.logout()
def get_folders(self):
"""获取文件夹列表"""
status, folders = self.mail.list()
return [f.decode().split(' "/" ')[-1].strip('"') for f in folders]
def search_emails(self, folder='INBOX', criteria='ALL', limit=10):
"""搜索邮件
criteria示例:
- 'ALL': 所有邮件
- 'UNSEEN': 未读邮件
- 'FROM "sender@example.com"': 特定发件人
- 'SUBJECT "关键词"': 主题包含关键词
- 'SINCE "01-Jan-2026"': 特定日期之后
"""
self.mail.select(folder)
status, messages = self.mail.search(None, criteria)
email_ids = messages[0].split()
# 按最新排序后应用limit
email_ids = email_ids[-limit:][::-1]
emails = []
for email_id in email_ids:
email_data = self._fetch_email(email_id)
if email_data:
emails.append(email_data)
return emails
def _fetch_email(self, email_id):
"""获取单个邮件"""
status, data = self.mail.fetch(email_id, '(RFC822)')
if status != 'OK':
return None
raw_email = data[0][1]
msg = email.message_from_bytes(raw_email)
# 解码头信息
subject = self._decode_header(msg['Subject'])
from_addr = self._decode_header(msg['From'])
date_str = msg['Date']
# 提取正文
body = self._get_body(msg)
# 附件信息
attachments = self._get_attachments(msg)
return {
'id': email_id.decode(),
'subject': subject,
'from': from_addr,
'date': date_str,
'body': body,
'attachments': attachments
}
def _decode_header(self, header):
"""解码头信息"""
if header is None:
return ""
decoded_parts = decode_header(header)
result = []
for content, encoding in decoded_parts:
if isinstance(content, bytes):
content = content.decode(encoding or 'utf-8', errors='replace')
result.append(content)
return ''.join(result)
def _get_body(self, msg):
"""提取正文"""
body = ""
if msg.is_multipart():
for part in msg.walk():
content_type = part.get_content_type()
content_disposition = str(part.get('Content-Disposition'))
if content_type == 'text/plain' and 'attachment' not in content_disposition:
payload = part.get_payload(decode=True)
charset = part.get_content_charset() or 'utf-8'
body = payload.decode(charset, errors='replace')
break
else:
payload = msg.get_payload(decode=True)
charset = msg.get_content_charset() or 'utf-8'
body = payload.decode(charset, errors='replace')
return body
def _get_attachments(self, msg):
"""提取附件信息"""
attachments = []
for part in msg.walk():
if part.get_content_maintype() == 'multipart':
continue
filename = part.get_filename()
if filename:
filename = self._decode_header(filename)
attachments.append({
'filename': filename,
'content_type': part.get_content_type(),
'size': len(part.get_payload(decode=True) or b'')
})
return attachments
# 使用示例
reader = EmailReader('your_email@gmail.com', '应用专用密码')
reader.connect()
# 查询未读邮件
unread_emails = reader.search_emails(criteria='UNSEEN', limit=5)
for mail in unread_emails:
print(f"主题: {mail['subject']}")
print(f"发件人: {mail['from']}")
print(f"日期: {mail['date']}")
print(f"附件: {len(mail['attachments'])}个")
print("-" * 50)
reader.disconnect()
6. 发送Slack通知(Webhook)
Slack Webhook无需特殊库,只需HTTP请求即可发送消息,非常方便。
6.1 生成Webhook URL
- 访问Slack App页面(api.slack.com/apps)
- 点击Create New App
- 选择From scratch,指定应用名称和工作区
- 启用Incoming Webhooks
- 点击Add New Webhook to Workspace
- 选择频道后复制Webhook URL
import requests
import json
from datetime import datetime
class SlackNotifier:
"""使用Slack Webhook的通知类"""
def __init__(self, webhook_url):
self.webhook_url = webhook_url
def send_simple(self, message):
"""发送简单文本消息"""
payload = {'text': message}
response = requests.post(
self.webhook_url,
data=json.dumps(payload),
headers={'Content-Type': 'application/json'}
)
return response.status_code == 200
def send_formatted(self, title, message, color='#36a64f', fields=None):
"""发送格式化消息(使用attachment)"""
attachment = {
'fallback': title,
'color': color,
'title': title,
'text': message,
'ts': datetime.now().timestamp()
}
if fields:
attachment['fields'] = [
{'title': k, 'value': str(v), 'short': True}
for k, v in fields.items()
]
payload = {'attachments': [attachment]}
response = requests.post(
self.webhook_url,
data=json.dumps(payload),
headers={'Content-Type': 'application/json'}
)
return response.status_code == 200
def send_alert(self, title, message, status='success'):
"""按状态发送通知"""
colors = {
'success': '#36a64f', # 绿色
'warning': '#ffcc00', # 黄色
'error': '#ff0000', # 红色
'info': '#3498db' # 蓝色
}
emojis = {
'success': ':white_check_mark:',
'warning': ':warning:',
'error': ':x:',
'info': ':information_source:'
}
blocks = [
{
'type': 'header',
'text': {
'type': 'plain_text',
'text': f"{emojis.get(status, '')} {title}"
}
},
{
'type': 'section',
'text': {
'type': 'mrkdwn',
'text': message
}
},
{
'type': 'context',
'elements': [
{
'type': 'mrkdwn',
'text': f"发生时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
}
]
}
]
payload = {'blocks': blocks}
response = requests.post(
self.webhook_url,
data=json.dumps(payload),
headers={'Content-Type': 'application/json'}
)
return response.status_code == 200
# 使用示例
slack = SlackNotifier('https://hooks.slack.com/services/xxx/yyy/zzz')
# 简单消息
slack.send_simple('批处理任务已完成。')
# 格式化消息
slack.send_formatted(
title='每日报告',
message='今日销售情况如下。',
color='#3498db',
fields={
'总订单': '150单',
'总销售额': '150,000元',
'新注册': '25人',
'退款数': '3单'
}
)
# 错误通知
slack.send_alert(
title='服务器错误检测',
message='*数据库连接失败*\n```ConnectionError: Unable to connect to database```',
status='error'
)
7. 创建Telegram Bot
7.1 创建和设置Bot
- 在Telegram中搜索@BotFather
- 输入/newbot命令
- 设置bot名称和username
- 保存获得的token
- 与创建的bot开始对话并获取chat_id
import requests
from datetime import datetime
class TelegramBot:
"""Telegram Bot通知类"""
def __init__(self, token):
self.token = token
self.base_url = f'https://api.telegram.org/bot{token}'
def get_updates(self):
"""获取最近消息(用于确认chat_id)"""
response = requests.get(f'{self.base_url}/getUpdates')
return response.json()
def send_message(self, chat_id, text, parse_mode='HTML'):
"""发送消息
parse_mode: 'HTML'或'Markdown'
"""
params = {
'chat_id': chat_id,
'text': text,
'parse_mode': parse_mode
}
response = requests.post(f'{self.base_url}/sendMessage', data=params)
return response.json()
def send_photo(self, chat_id, photo_path, caption=''):
"""发送图片"""
with open(photo_path, 'rb') as photo:
params = {'chat_id': chat_id, 'caption': caption}
files = {'photo': photo}
response = requests.post(
f'{self.base_url}/sendPhoto',
data=params,
files=files
)
return response.json()
def send_document(self, chat_id, file_path, caption=''):
"""发送文件"""
with open(file_path, 'rb') as doc:
params = {'chat_id': chat_id, 'caption': caption}
files = {'document': doc}
response = requests.post(
f'{self.base_url}/sendDocument',
data=params,
files=files
)
return response.json()
def send_alert(self, chat_id, title, message, status='info'):
"""发送格式化通知"""
emojis = {
'success': '✅',
'warning': '⚠️',
'error': '❌',
'info': 'ℹ️'
}
html_message = f"""
{emojis.get(status, '')} {title}
{message}
时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
"""
return self.send_message(chat_id, html_message)
def send_report(self, chat_id, title, data):
"""发送报告格式通知"""
lines = [f"📊 {title}\n"]
for key, value in data.items():
lines.append(f"• {key}: {value}")
lines.append(f"\n生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M')}")
return self.send_message(chat_id, '\n'.join(lines))
# 使用示例
bot = TelegramBot('YOUR_BOT_TOKEN')
# 确认chat_id(只需执行一次)
# 向bot发送消息后执行
# updates = bot.get_updates()
# print(updates)
chat_id = 'YOUR_CHAT_ID'
# 简单消息
bot.send_message(chat_id, '您好!这是Telegram Bot。')
# 通知
bot.send_alert(
chat_id,
'批处理任务完成',
'每日数据收集已正常完成。',
status='success'
)
# 报告
bot.send_report(
chat_id,
'每日销售情况',
{
'总订单': '150单',
'总销售额': '150,000元',
'平均客单价': '1,000元',
'新客户': '25人'
}
)
# 发送文件
bot.send_document(chat_id, 'reports/daily_report.xlsx', '每日报告文件')
8. Discord Webhook
import requests
import json
from datetime import datetime
class DiscordNotifier:
"""使用Discord Webhook的通知类"""
def __init__(self, webhook_url):
self.webhook_url = webhook_url
def send_simple(self, content):
"""发送简单消息"""
payload = {'content': content}
response = requests.post(
self.webhook_url,
data=json.dumps(payload),
headers={'Content-Type': 'application/json'}
)
return response.status_code in [200, 204]
def send_embed(self, title, description, color=0x3498db, fields=None, footer=None):
"""发送嵌入消息"""
embed = {
'title': title,
'description': description,
'color': color,
'timestamp': datetime.utcnow().isoformat()
}
if fields:
embed['fields'] = [
{'name': k, 'value': str(v), 'inline': True}
for k, v in fields.items()
]
if footer:
embed['footer'] = {'text': footer}
payload = {'embeds': [embed]}
response = requests.post(
self.webhook_url,
data=json.dumps(payload),
headers={'Content-Type': 'application/json'}
)
return response.status_code in [200, 204]
def send_alert(self, title, message, status='info'):
"""按状态发送通知"""
colors = {
'success': 0x2ecc71, # 绿色
'warning': 0xf39c12, # 橙色
'error': 0xe74c3c, # 红色
'info': 0x3498db # 蓝色
}
emojis = {
'success': ':white_check_mark:',
'warning': ':warning:',
'error': ':x:',
'info': ':information_source:'
}
embed = {
'title': f"{emojis.get(status, '')} {title}",
'description': message,
'color': colors.get(status, 0x3498db),
'timestamp': datetime.utcnow().isoformat(),
'footer': {'text': '自动化通知系统'}
}
payload = {'embeds': [embed]}
response = requests.post(
self.webhook_url,
data=json.dumps(payload),
headers={'Content-Type': 'application/json'}
)
return response.status_code in [200, 204]
# Discord Webhook URL生成:
# 1. 服务器设置 > 整合 > Webhook
# 2. 创建新Webhook
# 3. 复制Webhook URL
discord = DiscordNotifier('https://discord.com/api/webhooks/xxx/yyy')
# 简单消息
discord.send_simple('服务器状态正常!')
# 嵌入消息
discord.send_embed(
title='每日报告',
description='今日系统状况如下。',
color=0x3498db,
fields={
'CPU使用率': '45%',
'内存使用率': '62%',
'磁盘使用率': '78%',
'活动会话': '234个'
},
footer='监控系统'
)
# 错误通知
discord.send_alert(
title='数据库错误',
message='```\nConnectionError: Unable to connect to MySQL server\n```',
status='error'
)
9. 桌面通知(plyer)
# 安装plyer
# pip install plyer
from plyer import notification
import time
def send_desktop_notification(title, message, timeout=10, app_icon=None):
"""发送桌面通知
Args:
title: 通知标题
message: 通知内容
timeout: 通知显示时间(秒)
app_icon: 图标路径(.ico文件)
"""
notification.notify(
title=title,
message=message,
timeout=timeout,
app_icon=app_icon,
app_name='Python自动化'
)
# 使用示例
send_desktop_notification(
title='任务完成',
message='数据处理已完成。\n处理数量: 1,234条',
timeout=5
)
# 多步骤任务通知
def process_with_notifications():
"""通过桌面通知显示任务进度"""
send_desktop_notification('任务开始', '开始数据收集。')
time.sleep(2) # 模拟任务
send_desktop_notification('进行中', '数据转换中... (50%)')
time.sleep(2)
send_desktop_notification('完成', '所有任务已完成!')
# Windows Toast通知(更多功能)
try:
from win10toast import ToastNotifier
toaster = ToastNotifier()
def send_windows_toast(title, message, duration=5, icon_path=None):
"""Windows Toast通知"""
toaster.show_toast(
title,
message,
icon_path=icon_path,
duration=duration,
threaded=True # 异步执行
)
except ImportError:
pass # 非Windows或未安装win10toast
10. 实战:监控通知系统
让我们综合所学内容,创建一个服务器监控通知系统。
import psutil
import requests
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from datetime import datetime
import time
import json
import logging
# 日志设置
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('monitor.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
class NotificationManager:
"""统一通知管理类"""
def __init__(self, config):
self.config = config
self.last_alert_time = {}
def send_email(self, subject, body, html=False):
"""发送邮件"""
try:
cfg = self.config['email']
msg = MIMEMultipart('alternative')
msg['From'] = cfg['sender']
msg['To'] = cfg['recipient']
msg['Subject'] = subject
content_type = 'html' if html else 'plain'
msg.attach(MIMEText(body, content_type, 'utf-8'))
with smtplib.SMTP(cfg['smtp_server'], cfg['smtp_port']) as server:
server.starttls()
server.login(cfg['sender'], cfg['password'])
server.send_message(msg)
logger.info(f"邮件发送完成: {subject}")
return True
except Exception as e:
logger.error(f"邮件发送失败: {e}")
return False
def send_slack(self, message, status='info'):
"""Slack通知"""
try:
webhook_url = self.config['slack']['webhook_url']
colors = {
'success': '#36a64f',
'warning': '#ffcc00',
'error': '#ff0000',
'info': '#3498db'
}
payload = {
'attachments': [{
'color': colors.get(status, '#3498db'),
'text': message,
'ts': datetime.now().timestamp()
}]
}
response = requests.post(
webhook_url,
data=json.dumps(payload),
headers={'Content-Type': 'application/json'}
)
if response.status_code == 200:
logger.info(f"Slack通知发送完成")
return True
return False
except Exception as e:
logger.error(f"Slack通知失败: {e}")
return False
def send_telegram(self, message):
"""Telegram通知"""
try:
cfg = self.config['telegram']
url = f"https://api.telegram.org/bot{cfg['token']}/sendMessage"
params = {
'chat_id': cfg['chat_id'],
'text': message,
'parse_mode': 'HTML'
}
response = requests.post(url, data=params)
if response.status_code == 200:
logger.info("Telegram通知发送完成")
return True
return False
except Exception as e:
logger.error(f"Telegram通知失败: {e}")
return False
def send_all(self, title, message, status='info', html_body=None):
"""向所有渠道发送通知"""
# 邮件
if self.config.get('email', {}).get('enabled'):
self.send_email(title, html_body or message, html=bool(html_body))
# Slack
if self.config.get('slack', {}).get('enabled'):
self.send_slack(f"*{title}*\n{message}", status)
# Telegram
if self.config.get('telegram', {}).get('enabled'):
self.send_telegram(f"{title}\n\n{message}")
def can_alert(self, alert_type, cooldown_seconds=300):
"""检查通知冷却(防止重复通知)"""
now = datetime.now()
last_time = self.last_alert_time.get(alert_type)
if last_time and (now - last_time).seconds < cooldown_seconds:
return False
self.last_alert_time[alert_type] = now
return True
class SystemMonitor:
"""系统监控类"""
def __init__(self, notification_manager, thresholds):
self.notifier = notification_manager
self.thresholds = thresholds
def get_system_stats(self):
"""收集系统状态"""
return {
'cpu_percent': psutil.cpu_percent(interval=1),
'memory_percent': psutil.virtual_memory().percent,
'disk_percent': psutil.disk_usage('/').percent,
'network': psutil.net_io_counters(),
'timestamp': datetime.now()
}
def check_thresholds(self, stats):
"""检查阈值并发送通知"""
alerts = []
# CPU检查
if stats['cpu_percent'] > self.thresholds['cpu_critical']:
alerts.append(('CPU危险', f"CPU使用率: {stats['cpu_percent']}%", 'error'))
elif stats['cpu_percent'] > self.thresholds['cpu_warning']:
alerts.append(('CPU警告', f"CPU使用率: {stats['cpu_percent']}%", 'warning'))
# 内存检查
if stats['memory_percent'] > self.thresholds['memory_critical']:
alerts.append(('内存危险', f"内存使用率: {stats['memory_percent']}%", 'error'))
elif stats['memory_percent'] > self.thresholds['memory_warning']:
alerts.append(('内存警告', f"内存使用率: {stats['memory_percent']}%", 'warning'))
# 磁盘检查
if stats['disk_percent'] > self.thresholds['disk_critical']:
alerts.append(('磁盘危险', f"磁盘使用率: {stats['disk_percent']}%", 'error'))
elif stats['disk_percent'] > self.thresholds['disk_warning']:
alerts.append(('磁盘警告', f"磁盘使用率: {stats['disk_percent']}%", 'warning'))
return alerts
def send_daily_report(self, stats):
"""发送每日报告"""
title = f"[每日报告] 系统状况 - {datetime.now().strftime('%Y-%m-%d')}"
message = f"""
系统状况报告
- CPU使用率: {stats['cpu_percent']}%
- 内存使用率: {stats['memory_percent']}%
- 磁盘使用率: {stats['disk_percent']}%
报告生成时间: {stats['timestamp'].strftime('%Y-%m-%d %H:%M:%S')}
"""
self.notifier.send_all(title, message, 'info')
def run(self, interval=60):
"""运行监控"""
logger.info("监控开始")
last_report_date = None
while True:
try:
# 收集系统状态
stats = self.get_system_stats()
logger.info(f"CPU: {stats['cpu_percent']}%, MEM: {stats['memory_percent']}%, DISK: {stats['disk_percent']}%")
# 检查阈值
alerts = self.check_thresholds(stats)
for title, message, status in alerts:
if self.notifier.can_alert(title, cooldown_seconds=300):
self.notifier.send_all(title, message, status)
# 每日报告(上午9点)
now = datetime.now()
if now.hour == 9 and now.date() != last_report_date:
self.send_daily_report(stats)
last_report_date = now.date()
time.sleep(interval)
except KeyboardInterrupt:
logger.info("监控结束")
break
except Exception as e:
logger.error(f"监控错误: {e}")
time.sleep(interval)
# 配置示例
config = {
'email': {
'enabled': True,
'sender': 'your_email@gmail.com',
'password': '应用专用密码',
'recipient': 'admin@example.com',
'smtp_server': 'smtp.gmail.com',
'smtp_port': 587
},
'slack': {
'enabled': True,
'webhook_url': 'https://hooks.slack.com/services/xxx/yyy/zzz'
},
'telegram': {
'enabled': True,
'token': 'YOUR_BOT_TOKEN',
'chat_id': 'YOUR_CHAT_ID'
}
}
thresholds = {
'cpu_warning': 70,
'cpu_critical': 90,
'memory_warning': 80,
'memory_critical': 95,
'disk_warning': 85,
'disk_critical': 95
}
# 执行
if __name__ == "__main__":
notifier = NotificationManager(config)
monitor = SystemMonitor(notifier, thresholds)
monitor.run(interval=60) # 每60秒监控一次
总结
在本篇中,我们学习了使用Python进行各种通知自动化的方法。
- 邮件:使用smtplib发送文本/HTML邮件和附件
- Slack:通过Webhook方便地发送团队通知
- Telegram:通过Bot发送个人/群组通知
- Discord:通过Webhook发送社区通知
- 桌面:使用plyer发送本地通知
在实际工作中,可以根据情况选择合适的渠道,或组合使用多个渠道。紧急故障通知通常使用Telegram或Slack,定期报告则通过邮件发送。
通过本系列,我们学习了Python自动化的核心技术。文件处理、网页爬虫、数据库连接、Excel自动化以及通知系统,结合这些技术可以自动化各种工作。请将所学内容应用到实际工作中,从重复性任务中解放出来。