前言:通知自动化的必要性

自动化系统的完整性取决于通知功能。无论你创建了多么出色的自动化脚本,如果不能及时确认其结果,价值就会大打折扣。在服务器故障、批处理任务完成、每日报告发送等各种情况下,接收适当的通知可以大大提高工作效率。

在第6篇中,我们将学习如何使用Python通过各种渠道发送通知。从传统的邮件到Slack、Telegram、Discord,我们将涵盖实际工作中最常用的通知方法。

1. 使用smtplib发送邮件

使用Python内置库smtplib,无需额外安装即可发送邮件。

1.1 基本邮件发送

import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart

def send_simple_email(sender, password, recipient, subject, body):
    """发送简单文本邮件"""
    # 创建消息对象
    msg = MIMEMultipart()
    msg['From'] = sender
    msg['To'] = recipient
    msg['Subject'] = subject

    # 添加正文
    msg.attach(MIMEText(body, 'plain', 'utf-8'))

    try:
        # 连接Gmail SMTP服务器
        server = smtplib.SMTP('smtp.gmail.com', 587)
        server.starttls()  # TLS加密
        server.login(sender, password)

        # 发送邮件
        server.send_message(msg)
        server.quit()
        print("邮件发送成功!")
        return True
    except Exception as e:
        print(f"邮件发送失败: {e}")
        return False

# 使用示例
send_simple_email(
    sender='your_email@gmail.com',
    password='应用专用密码',  # 开启两步验证后使用应用专用密码
    recipient='recipient@example.com',
    subject='[自动化] 每日报告',
    body='今天的任务已正常完成。'
)

2. Gmail/QQ邮箱SMTP设置

2.1 Gmail SMTP设置

要使用Gmail,首先需要生成应用专用密码。

  1. 进入Google账户设置
  2. 在安全标签页启用两步验证
  3. 生成应用专用密码(选择其他应用)
  4. 在代码中使用生成的16位密码
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

class GmailSender:
    """使用Gmail SMTP发送邮件的类"""

    def __init__(self, email, app_password):
        self.email = email
        self.password = app_password
        self.smtp_server = 'smtp.gmail.com'
        self.smtp_port = 587

    def send(self, to, subject, body, html=False):
        """发送邮件"""
        msg = MIMEMultipart('alternative')
        msg['From'] = self.email
        msg['To'] = to if isinstance(to, str) else ', '.join(to)
        msg['Subject'] = subject

        # 文本或HTML正文
        content_type = 'html' if html else 'plain'
        msg.attach(MIMEText(body, content_type, 'utf-8'))

        with smtplib.SMTP(self.smtp_server, self.smtp_port) as server:
            server.starttls()
            server.login(self.email, self.password)

            # 发送给多个收件人
            recipients = [to] if isinstance(to, str) else to
            server.send_message(msg)

        return True

# 使用示例
gmail = GmailSender('your_email@gmail.com', '应用专用密码')
gmail.send(
    to='recipient@example.com',
    subject='测试邮件',
    body='这是Python发送的邮件。'
)

2.2 QQ邮箱SMTP设置

class QQMailSender:
    """使用QQ邮箱SMTP发送邮件的类"""

    def __init__(self, email, auth_code):
        self.email = email
        self.password = auth_code  # QQ邮箱授权码
        self.smtp_server = 'smtp.qq.com'
        self.smtp_port = 587

    def send(self, to, subject, body, html=False):
        """发送邮件"""
        msg = MIMEMultipart('alternative')
        msg['From'] = self.email
        msg['To'] = to if isinstance(to, str) else ', '.join(to)
        msg['Subject'] = subject

        content_type = 'html' if html else 'plain'
        msg.attach(MIMEText(body, content_type, 'utf-8'))

        with smtplib.SMTP(self.smtp_server, self.smtp_port) as server:
            server.starttls()
            server.login(self.email, self.password)
            server.send_message(msg)

        return True

# QQ邮箱设置:
# 1. QQ邮箱 > 设置 > 账户 > POP3/IMAP/SMTP服务
# 2. 开启SMTP服务
# 3. 生成授权码并在代码中使用

qq = QQMailSender('your_qq@qq.com', 'QQ邮箱授权码')
qq.send(
    to='recipient@example.com',
    subject='QQ邮箱测试',
    body='这是通过QQ邮箱SMTP发送的邮件。'
)

3. 邮件正文编写(文本、HTML)

3.1 HTML格式邮件

from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
import smtplib

def send_html_email(sender, password, recipient, subject, data):
    """发送HTML格式的报告邮件"""

    # HTML模板
    html_template = """
    <!DOCTYPE html>
    <html>
    <head>
        <style>
            body {{ font-family: 'Microsoft YaHei', sans-serif; }}
            .container {{ max-width: 600px; margin: 0 auto; padding: 20px; }}
            .header {{ background: #2c3e50; color: white; padding: 20px; text-align: center; }}
            .content {{ padding: 20px; background: #f9f9f9; }}
            .table {{ width: 100%; border-collapse: collapse; margin: 20px 0; }}
            .table th, .table td {{ border: 1px solid #ddd; padding: 12px; text-align: left; }}
            .table th {{ background: #3498db; color: white; }}
            .table tr:nth-child(even) {{ background: #f2f2f2; }}
            .footer {{ text-align: center; padding: 20px; color: #666; font-size: 12px; }}
            .status-ok {{ color: #27ae60; font-weight: bold; }}
            .status-error {{ color: #e74c3c; font-weight: bold; }}
        </style>
    </head>
    <body>
        <div class="container">
            <div class="header">
                <h1>{title}</h1>
                <p>{date}</p>
            </div>
            <div class="content">
                <h2>摘要</h2>
                <p>总处理数量: <strong>{total_count}</strong></p>
                <p>成功: <span class="status-ok">{success_count}</span> / 失败: <span class="status-error">{fail_count}</span></p>

                <h2>详细信息</h2>
                <table class="table">
                    <tr>
                        <th>项目</th>
                        <th>状态</th>
                        <th>处理时间</th>
                    </tr>
                    {table_rows}
                </table>
            </div>
            <div class="footer">
                <p>此邮件为自动发送。</p>
                <p>联系方式: admin@example.com</p>
            </div>
        </div>
    </body>
    </html>
    """

    # 生成表格行
    table_rows = ""
    for item in data['items']:
        status_class = 'status-ok' if item['status'] == '成功' else 'status-error'
        table_rows += f"""
        <tr>
            <td>{item['name']}</td>
            <td class="{status_class}">{item['status']}</td>
            <td>{item['time']}</td>
        </tr>
        """

    # 完成HTML
    html_content = html_template.format(
        title=data['title'],
        date=data['date'],
        total_count=data['total_count'],
        success_count=data['success_count'],
        fail_count=data['fail_count'],
        table_rows=table_rows
    )

    # 构建消息
    msg = MIMEMultipart('alternative')
    msg['From'] = sender
    msg['To'] = recipient
    msg['Subject'] = subject

    # 文本版本(用于不支持HTML的客户端)
    text_content = f"""
    {data['title']}
    日期: {data['date']}

    总处理数量: {data['total_count']}
    成功: {data['success_count']} / 失败: {data['fail_count']}
    """

    msg.attach(MIMEText(text_content, 'plain', 'utf-8'))
    msg.attach(MIMEText(html_content, 'html', 'utf-8'))

    # 发送
    with smtplib.SMTP('smtp.gmail.com', 587) as server:
        server.starttls()
        server.login(sender, password)
        server.send_message(msg)

    return True

# 使用示例
report_data = {
    'title': '每日批处理任务报告',
    'date': '2026-01-22',
    'total_count': 5,
    'success_count': 4,
    'fail_count': 1,
    'items': [
        {'name': '数据收集', 'status': '成功', 'time': '2.3秒'},
        {'name': '数据转换', 'status': '成功', 'time': '5.1秒'},
        {'name': '数据加载', 'status': '成功', 'time': '3.7秒'},
        {'name': '备份创建', 'status': '失败', 'time': '-'},
        {'name': '通知发送', 'status': '成功', 'time': '0.5秒'}
    ]
}

send_html_email(
    sender='your_email@gmail.com',
    password='应用专用密码',
    recipient='recipient@example.com',
    subject='[报告] 每日批处理任务结果',
    data=report_data
)

4. 发送附件

from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.base import MIMEBase
from email.mime.application import MIMEApplication
from email import encoders
import smtplib
import os

def send_email_with_attachments(sender, password, recipient, subject, body, attachments):
    """发送带附件的邮件

    Args:
        attachments: 要附加的文件路径列表
    """
    msg = MIMEMultipart()
    msg['From'] = sender
    msg['To'] = recipient
    msg['Subject'] = subject

    # 添加正文
    msg.attach(MIMEText(body, 'plain', 'utf-8'))

    # 添加附件
    for file_path in attachments:
        if not os.path.exists(file_path):
            print(f"找不到文件: {file_path}")
            continue

        # 读取文件
        with open(file_path, 'rb') as f:
            file_data = f.read()

        # 确定MIME类型
        file_name = os.path.basename(file_path)
        part = MIMEApplication(file_data, Name=file_name)

        # 添加头信息
        part['Content-Disposition'] = f'attachment; filename="{file_name}"'
        msg.attach(part)

    # 发送
    with smtplib.SMTP('smtp.gmail.com', 587) as server:
        server.starttls()
        server.login(sender, password)
        server.send_message(msg)

    print(f"邮件发送完成({len(attachments)}个附件)")
    return True

# 使用示例
send_email_with_attachments(
    sender='your_email@gmail.com',
    password='应用专用密码',
    recipient='recipient@example.com',
    subject='月度报告附件',
    body='您好,\n\n请查收附件中的月度报告。\n\n谢谢。',
    attachments=[
        'reports/月度报告_202601.xlsx',
        'reports/图表_202601.png'
    ]
)

5. 接收和解析邮件(imaplib)

import imaplib
import email
from email.header import decode_header
from datetime import datetime

class EmailReader:
    """使用IMAP接收和解析邮件的类"""

    def __init__(self, email_addr, password, imap_server='imap.gmail.com'):
        self.email = email_addr
        self.password = password
        self.imap_server = imap_server
        self.mail = None

    def connect(self):
        """连接IMAP服务器"""
        self.mail = imaplib.IMAP4_SSL(self.imap_server)
        self.mail.login(self.email, self.password)
        return self

    def disconnect(self):
        """断开连接"""
        if self.mail:
            self.mail.logout()

    def get_folders(self):
        """获取文件夹列表"""
        status, folders = self.mail.list()
        return [f.decode().split(' "/" ')[-1].strip('"') for f in folders]

    def search_emails(self, folder='INBOX', criteria='ALL', limit=10):
        """搜索邮件

        criteria示例:
        - 'ALL': 所有邮件
        - 'UNSEEN': 未读邮件
        - 'FROM "sender@example.com"': 特定发件人
        - 'SUBJECT "关键词"': 主题包含关键词
        - 'SINCE "01-Jan-2026"': 特定日期之后
        """
        self.mail.select(folder)
        status, messages = self.mail.search(None, criteria)

        email_ids = messages[0].split()
        # 按最新排序后应用limit
        email_ids = email_ids[-limit:][::-1]

        emails = []
        for email_id in email_ids:
            email_data = self._fetch_email(email_id)
            if email_data:
                emails.append(email_data)

        return emails

    def _fetch_email(self, email_id):
        """获取单个邮件"""
        status, data = self.mail.fetch(email_id, '(RFC822)')

        if status != 'OK':
            return None

        raw_email = data[0][1]
        msg = email.message_from_bytes(raw_email)

        # 解码头信息
        subject = self._decode_header(msg['Subject'])
        from_addr = self._decode_header(msg['From'])
        date_str = msg['Date']

        # 提取正文
        body = self._get_body(msg)

        # 附件信息
        attachments = self._get_attachments(msg)

        return {
            'id': email_id.decode(),
            'subject': subject,
            'from': from_addr,
            'date': date_str,
            'body': body,
            'attachments': attachments
        }

    def _decode_header(self, header):
        """解码头信息"""
        if header is None:
            return ""

        decoded_parts = decode_header(header)
        result = []
        for content, encoding in decoded_parts:
            if isinstance(content, bytes):
                content = content.decode(encoding or 'utf-8', errors='replace')
            result.append(content)
        return ''.join(result)

    def _get_body(self, msg):
        """提取正文"""
        body = ""

        if msg.is_multipart():
            for part in msg.walk():
                content_type = part.get_content_type()
                content_disposition = str(part.get('Content-Disposition'))

                if content_type == 'text/plain' and 'attachment' not in content_disposition:
                    payload = part.get_payload(decode=True)
                    charset = part.get_content_charset() or 'utf-8'
                    body = payload.decode(charset, errors='replace')
                    break
        else:
            payload = msg.get_payload(decode=True)
            charset = msg.get_content_charset() or 'utf-8'
            body = payload.decode(charset, errors='replace')

        return body

    def _get_attachments(self, msg):
        """提取附件信息"""
        attachments = []

        for part in msg.walk():
            if part.get_content_maintype() == 'multipart':
                continue

            filename = part.get_filename()
            if filename:
                filename = self._decode_header(filename)
                attachments.append({
                    'filename': filename,
                    'content_type': part.get_content_type(),
                    'size': len(part.get_payload(decode=True) or b'')
                })

        return attachments

# 使用示例
reader = EmailReader('your_email@gmail.com', '应用专用密码')
reader.connect()

# 查询未读邮件
unread_emails = reader.search_emails(criteria='UNSEEN', limit=5)
for mail in unread_emails:
    print(f"主题: {mail['subject']}")
    print(f"发件人: {mail['from']}")
    print(f"日期: {mail['date']}")
    print(f"附件: {len(mail['attachments'])}个")
    print("-" * 50)

reader.disconnect()

6. 发送Slack通知(Webhook)

Slack Webhook无需特殊库,只需HTTP请求即可发送消息,非常方便。

6.1 生成Webhook URL

  1. 访问Slack App页面(api.slack.com/apps)
  2. 点击Create New App
  3. 选择From scratch,指定应用名称和工作区
  4. 启用Incoming Webhooks
  5. 点击Add New Webhook to Workspace
  6. 选择频道后复制Webhook URL
import requests
import json
from datetime import datetime

class SlackNotifier:
    """使用Slack Webhook的通知类"""

    def __init__(self, webhook_url):
        self.webhook_url = webhook_url

    def send_simple(self, message):
        """发送简单文本消息"""
        payload = {'text': message}
        response = requests.post(
            self.webhook_url,
            data=json.dumps(payload),
            headers={'Content-Type': 'application/json'}
        )
        return response.status_code == 200

    def send_formatted(self, title, message, color='#36a64f', fields=None):
        """发送格式化消息(使用attachment)"""
        attachment = {
            'fallback': title,
            'color': color,
            'title': title,
            'text': message,
            'ts': datetime.now().timestamp()
        }

        if fields:
            attachment['fields'] = [
                {'title': k, 'value': str(v), 'short': True}
                for k, v in fields.items()
            ]

        payload = {'attachments': [attachment]}
        response = requests.post(
            self.webhook_url,
            data=json.dumps(payload),
            headers={'Content-Type': 'application/json'}
        )
        return response.status_code == 200

    def send_alert(self, title, message, status='success'):
        """按状态发送通知"""
        colors = {
            'success': '#36a64f',  # 绿色
            'warning': '#ffcc00',  # 黄色
            'error': '#ff0000',    # 红色
            'info': '#3498db'      # 蓝色
        }

        emojis = {
            'success': ':white_check_mark:',
            'warning': ':warning:',
            'error': ':x:',
            'info': ':information_source:'
        }

        blocks = [
            {
                'type': 'header',
                'text': {
                    'type': 'plain_text',
                    'text': f"{emojis.get(status, '')} {title}"
                }
            },
            {
                'type': 'section',
                'text': {
                    'type': 'mrkdwn',
                    'text': message
                }
            },
            {
                'type': 'context',
                'elements': [
                    {
                        'type': 'mrkdwn',
                        'text': f"发生时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
                    }
                ]
            }
        ]

        payload = {'blocks': blocks}
        response = requests.post(
            self.webhook_url,
            data=json.dumps(payload),
            headers={'Content-Type': 'application/json'}
        )
        return response.status_code == 200

# 使用示例
slack = SlackNotifier('https://hooks.slack.com/services/xxx/yyy/zzz')

# 简单消息
slack.send_simple('批处理任务已完成。')

# 格式化消息
slack.send_formatted(
    title='每日报告',
    message='今日销售情况如下。',
    color='#3498db',
    fields={
        '总订单': '150单',
        '总销售额': '150,000元',
        '新注册': '25人',
        '退款数': '3单'
    }
)

# 错误通知
slack.send_alert(
    title='服务器错误检测',
    message='*数据库连接失败*\n```ConnectionError: Unable to connect to database```',
    status='error'
)

7. 创建Telegram Bot

7.1 创建和设置Bot

  1. 在Telegram中搜索@BotFather
  2. 输入/newbot命令
  3. 设置bot名称和username
  4. 保存获得的token
  5. 与创建的bot开始对话并获取chat_id
import requests
from datetime import datetime

class TelegramBot:
    """Telegram Bot通知类"""

    def __init__(self, token):
        self.token = token
        self.base_url = f'https://api.telegram.org/bot{token}'

    def get_updates(self):
        """获取最近消息(用于确认chat_id)"""
        response = requests.get(f'{self.base_url}/getUpdates')
        return response.json()

    def send_message(self, chat_id, text, parse_mode='HTML'):
        """发送消息

        parse_mode: 'HTML'或'Markdown'
        """
        params = {
            'chat_id': chat_id,
            'text': text,
            'parse_mode': parse_mode
        }
        response = requests.post(f'{self.base_url}/sendMessage', data=params)
        return response.json()

    def send_photo(self, chat_id, photo_path, caption=''):
        """发送图片"""
        with open(photo_path, 'rb') as photo:
            params = {'chat_id': chat_id, 'caption': caption}
            files = {'photo': photo}
            response = requests.post(
                f'{self.base_url}/sendPhoto',
                data=params,
                files=files
            )
        return response.json()

    def send_document(self, chat_id, file_path, caption=''):
        """发送文件"""
        with open(file_path, 'rb') as doc:
            params = {'chat_id': chat_id, 'caption': caption}
            files = {'document': doc}
            response = requests.post(
                f'{self.base_url}/sendDocument',
                data=params,
                files=files
            )
        return response.json()

    def send_alert(self, chat_id, title, message, status='info'):
        """发送格式化通知"""
        emojis = {
            'success': '✅',
            'warning': '⚠️',
            'error': '❌',
            'info': 'ℹ️'
        }

        html_message = f"""
{emojis.get(status, '')} {title}

{message}

时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
"""
        return self.send_message(chat_id, html_message)

    def send_report(self, chat_id, title, data):
        """发送报告格式通知"""
        lines = [f"📊 {title}\n"]

        for key, value in data.items():
            lines.append(f"• {key}: {value}")

        lines.append(f"\n生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M')}")

        return self.send_message(chat_id, '\n'.join(lines))

# 使用示例
bot = TelegramBot('YOUR_BOT_TOKEN')

# 确认chat_id(只需执行一次)
# 向bot发送消息后执行
# updates = bot.get_updates()
# print(updates)

chat_id = 'YOUR_CHAT_ID'

# 简单消息
bot.send_message(chat_id, '您好!这是Telegram Bot。')

# 通知
bot.send_alert(
    chat_id,
    '批处理任务完成',
    '每日数据收集已正常完成。',
    status='success'
)

# 报告
bot.send_report(
    chat_id,
    '每日销售情况',
    {
        '总订单': '150单',
        '总销售额': '150,000元',
        '平均客单价': '1,000元',
        '新客户': '25人'
    }
)

# 发送文件
bot.send_document(chat_id, 'reports/daily_report.xlsx', '每日报告文件')

8. Discord Webhook

import requests
import json
from datetime import datetime

class DiscordNotifier:
    """使用Discord Webhook的通知类"""

    def __init__(self, webhook_url):
        self.webhook_url = webhook_url

    def send_simple(self, content):
        """发送简单消息"""
        payload = {'content': content}
        response = requests.post(
            self.webhook_url,
            data=json.dumps(payload),
            headers={'Content-Type': 'application/json'}
        )
        return response.status_code in [200, 204]

    def send_embed(self, title, description, color=0x3498db, fields=None, footer=None):
        """发送嵌入消息"""
        embed = {
            'title': title,
            'description': description,
            'color': color,
            'timestamp': datetime.utcnow().isoformat()
        }

        if fields:
            embed['fields'] = [
                {'name': k, 'value': str(v), 'inline': True}
                for k, v in fields.items()
            ]

        if footer:
            embed['footer'] = {'text': footer}

        payload = {'embeds': [embed]}
        response = requests.post(
            self.webhook_url,
            data=json.dumps(payload),
            headers={'Content-Type': 'application/json'}
        )
        return response.status_code in [200, 204]

    def send_alert(self, title, message, status='info'):
        """按状态发送通知"""
        colors = {
            'success': 0x2ecc71,  # 绿色
            'warning': 0xf39c12,  # 橙色
            'error': 0xe74c3c,    # 红色
            'info': 0x3498db      # 蓝色
        }

        emojis = {
            'success': ':white_check_mark:',
            'warning': ':warning:',
            'error': ':x:',
            'info': ':information_source:'
        }

        embed = {
            'title': f"{emojis.get(status, '')} {title}",
            'description': message,
            'color': colors.get(status, 0x3498db),
            'timestamp': datetime.utcnow().isoformat(),
            'footer': {'text': '自动化通知系统'}
        }

        payload = {'embeds': [embed]}
        response = requests.post(
            self.webhook_url,
            data=json.dumps(payload),
            headers={'Content-Type': 'application/json'}
        )
        return response.status_code in [200, 204]

# Discord Webhook URL生成:
# 1. 服务器设置 > 整合 > Webhook
# 2. 创建新Webhook
# 3. 复制Webhook URL

discord = DiscordNotifier('https://discord.com/api/webhooks/xxx/yyy')

# 简单消息
discord.send_simple('服务器状态正常!')

# 嵌入消息
discord.send_embed(
    title='每日报告',
    description='今日系统状况如下。',
    color=0x3498db,
    fields={
        'CPU使用率': '45%',
        '内存使用率': '62%',
        '磁盘使用率': '78%',
        '活动会话': '234个'
    },
    footer='监控系统'
)

# 错误通知
discord.send_alert(
    title='数据库错误',
    message='```\nConnectionError: Unable to connect to MySQL server\n```',
    status='error'
)

9. 桌面通知(plyer)

# 安装plyer
# pip install plyer

from plyer import notification
import time

def send_desktop_notification(title, message, timeout=10, app_icon=None):
    """发送桌面通知

    Args:
        title: 通知标题
        message: 通知内容
        timeout: 通知显示时间(秒)
        app_icon: 图标路径(.ico文件)
    """
    notification.notify(
        title=title,
        message=message,
        timeout=timeout,
        app_icon=app_icon,
        app_name='Python自动化'
    )

# 使用示例
send_desktop_notification(
    title='任务完成',
    message='数据处理已完成。\n处理数量: 1,234条',
    timeout=5
)

# 多步骤任务通知
def process_with_notifications():
    """通过桌面通知显示任务进度"""

    send_desktop_notification('任务开始', '开始数据收集。')
    time.sleep(2)  # 模拟任务

    send_desktop_notification('进行中', '数据转换中... (50%)')
    time.sleep(2)

    send_desktop_notification('完成', '所有任务已完成!')

# Windows Toast通知(更多功能)
try:
    from win10toast import ToastNotifier

    toaster = ToastNotifier()

    def send_windows_toast(title, message, duration=5, icon_path=None):
        """Windows Toast通知"""
        toaster.show_toast(
            title,
            message,
            icon_path=icon_path,
            duration=duration,
            threaded=True  # 异步执行
        )

except ImportError:
    pass  # 非Windows或未安装win10toast

10. 实战:监控通知系统

让我们综合所学内容,创建一个服务器监控通知系统。

import psutil
import requests
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from datetime import datetime
import time
import json
import logging

# 日志设置
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('monitor.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

class NotificationManager:
    """统一通知管理类"""

    def __init__(self, config):
        self.config = config
        self.last_alert_time = {}

    def send_email(self, subject, body, html=False):
        """发送邮件"""
        try:
            cfg = self.config['email']
            msg = MIMEMultipart('alternative')
            msg['From'] = cfg['sender']
            msg['To'] = cfg['recipient']
            msg['Subject'] = subject

            content_type = 'html' if html else 'plain'
            msg.attach(MIMEText(body, content_type, 'utf-8'))

            with smtplib.SMTP(cfg['smtp_server'], cfg['smtp_port']) as server:
                server.starttls()
                server.login(cfg['sender'], cfg['password'])
                server.send_message(msg)

            logger.info(f"邮件发送完成: {subject}")
            return True
        except Exception as e:
            logger.error(f"邮件发送失败: {e}")
            return False

    def send_slack(self, message, status='info'):
        """Slack通知"""
        try:
            webhook_url = self.config['slack']['webhook_url']

            colors = {
                'success': '#36a64f',
                'warning': '#ffcc00',
                'error': '#ff0000',
                'info': '#3498db'
            }

            payload = {
                'attachments': [{
                    'color': colors.get(status, '#3498db'),
                    'text': message,
                    'ts': datetime.now().timestamp()
                }]
            }

            response = requests.post(
                webhook_url,
                data=json.dumps(payload),
                headers={'Content-Type': 'application/json'}
            )

            if response.status_code == 200:
                logger.info(f"Slack通知发送完成")
                return True
            return False
        except Exception as e:
            logger.error(f"Slack通知失败: {e}")
            return False

    def send_telegram(self, message):
        """Telegram通知"""
        try:
            cfg = self.config['telegram']
            url = f"https://api.telegram.org/bot{cfg['token']}/sendMessage"
            params = {
                'chat_id': cfg['chat_id'],
                'text': message,
                'parse_mode': 'HTML'
            }
            response = requests.post(url, data=params)

            if response.status_code == 200:
                logger.info("Telegram通知发送完成")
                return True
            return False
        except Exception as e:
            logger.error(f"Telegram通知失败: {e}")
            return False

    def send_all(self, title, message, status='info', html_body=None):
        """向所有渠道发送通知"""
        # 邮件
        if self.config.get('email', {}).get('enabled'):
            self.send_email(title, html_body or message, html=bool(html_body))

        # Slack
        if self.config.get('slack', {}).get('enabled'):
            self.send_slack(f"*{title}*\n{message}", status)

        # Telegram
        if self.config.get('telegram', {}).get('enabled'):
            self.send_telegram(f"{title}\n\n{message}")

    def can_alert(self, alert_type, cooldown_seconds=300):
        """检查通知冷却(防止重复通知)"""
        now = datetime.now()
        last_time = self.last_alert_time.get(alert_type)

        if last_time and (now - last_time).seconds < cooldown_seconds:
            return False

        self.last_alert_time[alert_type] = now
        return True


class SystemMonitor:
    """系统监控类"""

    def __init__(self, notification_manager, thresholds):
        self.notifier = notification_manager
        self.thresholds = thresholds

    def get_system_stats(self):
        """收集系统状态"""
        return {
            'cpu_percent': psutil.cpu_percent(interval=1),
            'memory_percent': psutil.virtual_memory().percent,
            'disk_percent': psutil.disk_usage('/').percent,
            'network': psutil.net_io_counters(),
            'timestamp': datetime.now()
        }

    def check_thresholds(self, stats):
        """检查阈值并发送通知"""
        alerts = []

        # CPU检查
        if stats['cpu_percent'] > self.thresholds['cpu_critical']:
            alerts.append(('CPU危险', f"CPU使用率: {stats['cpu_percent']}%", 'error'))
        elif stats['cpu_percent'] > self.thresholds['cpu_warning']:
            alerts.append(('CPU警告', f"CPU使用率: {stats['cpu_percent']}%", 'warning'))

        # 内存检查
        if stats['memory_percent'] > self.thresholds['memory_critical']:
            alerts.append(('内存危险', f"内存使用率: {stats['memory_percent']}%", 'error'))
        elif stats['memory_percent'] > self.thresholds['memory_warning']:
            alerts.append(('内存警告', f"内存使用率: {stats['memory_percent']}%", 'warning'))

        # 磁盘检查
        if stats['disk_percent'] > self.thresholds['disk_critical']:
            alerts.append(('磁盘危险', f"磁盘使用率: {stats['disk_percent']}%", 'error'))
        elif stats['disk_percent'] > self.thresholds['disk_warning']:
            alerts.append(('磁盘警告', f"磁盘使用率: {stats['disk_percent']}%", 'warning'))

        return alerts

    def send_daily_report(self, stats):
        """发送每日报告"""
        title = f"[每日报告] 系统状况 - {datetime.now().strftime('%Y-%m-%d')}"

        message = f"""
系统状况报告

- CPU使用率: {stats['cpu_percent']}%
- 内存使用率: {stats['memory_percent']}%
- 磁盘使用率: {stats['disk_percent']}%

报告生成时间: {stats['timestamp'].strftime('%Y-%m-%d %H:%M:%S')}
"""

        self.notifier.send_all(title, message, 'info')

    def run(self, interval=60):
        """运行监控"""
        logger.info("监控开始")

        last_report_date = None

        while True:
            try:
                # 收集系统状态
                stats = self.get_system_stats()
                logger.info(f"CPU: {stats['cpu_percent']}%, MEM: {stats['memory_percent']}%, DISK: {stats['disk_percent']}%")

                # 检查阈值
                alerts = self.check_thresholds(stats)
                for title, message, status in alerts:
                    if self.notifier.can_alert(title, cooldown_seconds=300):
                        self.notifier.send_all(title, message, status)

                # 每日报告(上午9点)
                now = datetime.now()
                if now.hour == 9 and now.date() != last_report_date:
                    self.send_daily_report(stats)
                    last_report_date = now.date()

                time.sleep(interval)

            except KeyboardInterrupt:
                logger.info("监控结束")
                break
            except Exception as e:
                logger.error(f"监控错误: {e}")
                time.sleep(interval)


# 配置示例
config = {
    'email': {
        'enabled': True,
        'sender': 'your_email@gmail.com',
        'password': '应用专用密码',
        'recipient': 'admin@example.com',
        'smtp_server': 'smtp.gmail.com',
        'smtp_port': 587
    },
    'slack': {
        'enabled': True,
        'webhook_url': 'https://hooks.slack.com/services/xxx/yyy/zzz'
    },
    'telegram': {
        'enabled': True,
        'token': 'YOUR_BOT_TOKEN',
        'chat_id': 'YOUR_CHAT_ID'
    }
}

thresholds = {
    'cpu_warning': 70,
    'cpu_critical': 90,
    'memory_warning': 80,
    'memory_critical': 95,
    'disk_warning': 85,
    'disk_critical': 95
}

# 执行
if __name__ == "__main__":
    notifier = NotificationManager(config)
    monitor = SystemMonitor(notifier, thresholds)
    monitor.run(interval=60)  # 每60秒监控一次

总结

在本篇中,我们学习了使用Python进行各种通知自动化的方法。

  • 邮件:使用smtplib发送文本/HTML邮件和附件
  • Slack:通过Webhook方便地发送团队通知
  • Telegram:通过Bot发送个人/群组通知
  • Discord:通过Webhook发送社区通知
  • 桌面:使用plyer发送本地通知

在实际工作中,可以根据情况选择合适的渠道,或组合使用多个渠道。紧急故障通知通常使用Telegram或Slack,定期报告则通过邮件发送。

通过本系列,我们学习了Python自动化的核心技术。文件处理、网页爬虫、数据库连接、Excel自动化以及通知系统,结合这些技术可以自动化各种工作。请将所学内容应用到实际工作中,从重复性任务中解放出来。