引言:自动化的完成,定时调度

到目前为止学习的文件处理、网页爬虫、API集成等技术,真正变得有用的关键就是"定时调度"。按照指定时间自动执行的脚本可以完全替代手工操作,24小时不间断地执行任务。

在这最后一篇中,我们将学习Python中调度和执行任务的各种方法,以及构建稳定自动化系统所需的日志记录、错误处理、配置管理技术。最后,我们将构建一个综合本系列所学内容的实战项目。

1. 时间处理:time和datetime模块

1.1 time模块基础

import time

# 当前时间 (Unix时间戳)
timestamp = time.time()
print(f"当前时间戳: {timestamp}")  # 例: 1737529200.123456

# 程序暂停
print("等待3秒...")
time.sleep(3)
print("等待完成!")

# 执行时间测量
start_time = time.time()
# 执行操作
for i in range(1000000):
    pass
end_time = time.time()
print(f"执行时间: {end_time - start_time:.4f}秒")

# 更精确的时间测量
start = time.perf_counter()
# 执行操作
time.sleep(0.1)
end = time.perf_counter()
print(f"精确测量: {end - start:.6f}秒")

# 结构化时间
local_time = time.localtime()
print(f"当前时间: {local_time.tm_year}年{local_time.tm_mon}月{local_time.tm_mday}日")
print(f"时间: {local_time.tm_hour}:{local_time.tm_min}:{local_time.tm_sec}")

# 转换为字符串
formatted = time.strftime("%Y-%m-%d %H:%M:%S", local_time)
print(f"格式化时间: {formatted}")

1.2 datetime模块应用

from datetime import datetime, date, time, timedelta
import pytz  # pip install pytz

# 当前日期和时间
now = datetime.now()
print(f"当前: {now}")
print(f"日期: {now.date()}")
print(f"时间: {now.time()}")

# 创建特定日期/时间
specific_date = datetime(2026, 1, 22, 9, 30, 0)
print(f"特定时间: {specific_date}")

# 日期格式化
formatted = now.strftime("%Y年%m月%d日 %H时%M分%S秒")
print(f"中文格式: {formatted}")

# 字符串解析
date_str = "2026-01-22 14:30:00"
parsed = datetime.strptime(date_str, "%Y-%m-%d %H:%M:%S")
print(f"解析后日期: {parsed}")

# 日期运算
tomorrow = now + timedelta(days=1)
next_week = now + timedelta(weeks=1)
two_hours_later = now + timedelta(hours=2)

print(f"明天: {tomorrow.date()}")
print(f"下周: {next_week.date()}")
print(f"2小时后: {two_hours_later.time()}")

# 两个日期之间的差异
date1 = datetime(2026, 12, 31)
date2 = datetime.now()
diff = date1 - date2
print(f"距离2026年末还有{diff.days}天")

# 时区处理
kst = pytz.timezone('Asia/Seoul')
utc = pytz.UTC

now_kst = datetime.now(kst)
now_utc = datetime.now(utc)

print(f"韩国时间: {now_kst}")
print(f"UTC时间: {now_utc}")

# UTC转换为韩国时间
utc_time = datetime(2026, 1, 22, 0, 0, 0, tzinfo=utc)
korea_time = utc_time.astimezone(kst)
print(f"UTC 00:00 = 韩国 {korea_time.strftime('%H:%M')}")

1.3 实用日期工具函数

from datetime import datetime, timedelta
import calendar

def get_weekday_name(date_obj, lang='zh'):
    """返回星期名称"""
    weekdays_zh = ['星期一', '星期二', '星期三', '星期四', '星期五', '星期六', '星期日']
    weekdays_en = ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday']

    weekdays = weekdays_zh if lang == 'zh' else weekdays_en
    return weekdays[date_obj.weekday()]

def get_month_range(year, month):
    """返回该月的第一天和最后一天"""
    first_day = datetime(year, month, 1)
    last_day_num = calendar.monthrange(year, month)[1]
    last_day = datetime(year, month, last_day_num)
    return first_day, last_day

def get_business_days(start_date, end_date):
    """计算两个日期之间的工作日"""
    business_days = 0
    current = start_date

    while current <= end_date:
        if current.weekday() < 5:  # 周一至周五
            business_days += 1
        current += timedelta(days=1)

    return business_days

def is_business_hour(check_time=None, start_hour=9, end_hour=18):
    """检查是否为工作时间"""
    if check_time is None:
        check_time = datetime.now()

    # 周末检查
    if check_time.weekday() >= 5:
        return False

    # 时间检查
    hour = check_time.hour
    return start_hour <= hour < end_hour

# 使用示例
today = datetime.now()
print(f"今天是{get_weekday_name(today)}。")

first, last = get_month_range(2026, 1)
print(f"2026年1月: {first.date()} ~ {last.date()}")

days = get_business_days(datetime(2026, 1, 1), datetime(2026, 1, 31))
print(f"2026年1月工作日: {days}天")

print(f"现在是工作时间: {is_business_hour()}")

2. schedule库

2.1 基本用法

import schedule
import time

# pip install schedule

def job():
    print(f"[{time.strftime('%H:%M:%S')}] 任务执行!")

def morning_report():
    print("正在生成早间报告。")

def backup_data():
    print("正在备份数据。")

# 各种调度模式
schedule.every(10).seconds.do(job)  # 每10秒
schedule.every(5).minutes.do(job)   # 每5分钟
schedule.every().hour.do(job)        # 每小时
schedule.every().day.at("09:00").do(morning_report)  # 每天上午9点
schedule.every().monday.do(job)      # 每周一
schedule.every().wednesday.at("13:15").do(job)  # 每周三13:15

# 仅在特定时间执行
schedule.every().day.at("00:00").do(backup_data)  # 午夜备份

# 限制执行次数
schedule.every().second.do(job).tag('limited')

# 主循环
print("调度器启动...")
while True:
    schedule.run_pending()
    time.sleep(1)

2.2 高级schedule模式

import schedule
import time
from functools import partial

# 带参数的任务
def greet(name):
    print(f"你好,{name}!")

# 使用partial传递参数
schedule.every().day.at("08:00").do(partial(greet, "张三"))

# 使用lambda传递参数
schedule.every().day.at("08:30").do(lambda: greet("李四"))

# 使用标签管理任务
def data_sync():
    print("正在同步数据...")

def send_notification():
    print("正在发送通知...")

schedule.every(30).minutes.do(data_sync).tag('sync', 'database')
schedule.every().hour.do(send_notification).tag('notification')

# 取消特定标签的任务
# schedule.clear('sync')

# 查询特定标签的任务
sync_jobs = schedule.get_jobs('sync')
print(f"同步任务数量: {len(sync_jobs)}")

# 取消任务(仅执行一次)
def run_once():
    print("仅执行一次的任务")
    return schedule.CancelJob

schedule.every().second.do(run_once)

# 检查下次执行时间
next_run = schedule.next_run()
print(f"下次任务执行时间: {next_run}")

# 检查等待时间
idle_seconds = schedule.idle_seconds()
print(f"距离下次任务还有{idle_seconds}秒")

# 运行调度器
while True:
    schedule.run_pending()
    time.sleep(1)

2.3 包含异常处理的调度器

import schedule
import time
import traceback
from functools import wraps

def catch_exceptions(job_func):
    """捕获异常并记录的装饰器"""
    @wraps(job_func)
    def wrapper(*args, **kwargs):
        try:
            return job_func(*args, **kwargs)
        except Exception as e:
            print(f"任务执行中发生错误: {e}")
            traceback.print_exc()
            # 即使发生错误,调度器也会继续运行
            return None
    return wrapper

@catch_exceptions
def risky_job():
    """可能发生错误的任务"""
    import random
    if random.random() < 0.3:
        raise Exception("随机错误发生!")
    print("任务成功!")

schedule.every(5).seconds.do(risky_job)

# 安全的调度器循环
def run_scheduler():
    while True:
        try:
            schedule.run_pending()
        except Exception as e:
            print(f"调度器错误: {e}")
        time.sleep(1)

run_scheduler()

3. APScheduler库

3.1 APScheduler介绍及安装

# pip install apscheduler

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.triggers.date import DateTrigger
from datetime import datetime, timedelta

def my_job(message="默认消息"):
    print(f"[{datetime.now().strftime('%H:%M:%S')}] {message}")

# BlockingScheduler - 在主线程中运行(阻塞)
scheduler = BlockingScheduler()

# BackgroundScheduler - 在后台运行(非阻塞)
# scheduler = BackgroundScheduler()

# Interval触发器 - 按固定间隔执行
scheduler.add_job(
    my_job,
    IntervalTrigger(seconds=10),
    args=["每10秒执行"],
    id='interval_job'
)

# Cron触发器 - 使用cron表达式
scheduler.add_job(
    my_job,
    CronTrigger(hour=9, minute=0),
    args=["每天上午9点"],
    id='daily_job'
)

# Date触发器 - 在特定日期/时间执行一次
scheduler.add_job(
    my_job,
    DateTrigger(run_date=datetime.now() + timedelta(seconds=5)),
    args=["5秒后仅执行一次"],
    id='one_time_job'
)

# 启动调度器
try:
    print("APScheduler启动...")
    scheduler.start()
except KeyboardInterrupt:
    print("调度器终止")
    scheduler.shutdown()

3.2 Cron表达式应用

from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
from datetime import datetime

scheduler = BlockingScheduler()

def job(name):
    print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] {name} 执行")

# Cron表达式模式
# ┌───────────── 分 (0 - 59)
# │ ┌───────────── 时 (0 - 23)
# │ │ ┌───────────── 日 (1 - 31)
# │ │ │ ┌───────────── 月 (1 - 12)
# │ │ │ │ ┌───────────── 星期 (0 - 6, 0=星期一)
# │ │ │ │ │
# * * * * *

# 每分钟执行
scheduler.add_job(job, CronTrigger(minute='*'), args=["每分钟"])

# 每小时30分执行
scheduler.add_job(job, CronTrigger(minute=30), args=["每小时30分"])

# 每天上午9点、下午6点执行
scheduler.add_job(job, CronTrigger(hour='9,18', minute=0), args=["上午9点,下午6点"])

# 工作日上午9点执行(周一至周五)
scheduler.add_job(job, CronTrigger(day_of_week='mon-fri', hour=9, minute=0), args=["工作日9点"])

# 每月1日午夜执行
scheduler.add_job(job, CronTrigger(day=1, hour=0, minute=0), args=["每月1日"])

# 每周一上午8点30分
scheduler.add_job(job, CronTrigger(day_of_week='mon', hour=8, minute=30), args=["周一8:30"])

# 每15分钟执行
scheduler.add_job(job, CronTrigger(minute='*/15'), args=["每15分钟"])

# 工作时间(9-18点)每30分钟
scheduler.add_job(
    job,
    CronTrigger(hour='9-17', minute='0,30'),
    args=["工作时间每30分钟"]
)

# 以字符串形式传递cron表达式
scheduler.add_job(
    job,
    CronTrigger.from_crontab('0 9 * * 1-5'),  # 工作日9点
    args=["crontab格式"]
)

scheduler.start()

3.3 任务存储和事件监听器

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR
from datetime import datetime
import time

# 任务存储设置(使用SQLite)
jobstores = {
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}

# 执行器设置
executors = {
    'default': ThreadPoolExecutor(20),
    'processpool': ProcessPoolExecutor(5)
}

# 默认设置
job_defaults = {
    'coalesce': False,  # 是否合并错过的任务
    'max_instances': 3,  # 同时执行的最大实例数
    'misfire_grace_time': 60  # 错过任务的允许时间(秒)
}

scheduler = BackgroundScheduler(
    jobstores=jobstores,
    executors=executors,
    job_defaults=job_defaults,
    timezone='Asia/Seoul'
)

# 事件监听器
def job_executed_listener(event):
    """任务完成时调用"""
    print(f"任务完成: {event.job_id}")
    print(f"执行时间: {event.scheduled_run_time}")
    print(f"返回值: {event.retval}")

def job_error_listener(event):
    """任务出错时调用"""
    print(f"任务出错: {event.job_id}")
    print(f"异常: {event.exception}")
    print(f"堆栈跟踪: {event.traceback}")

scheduler.add_listener(job_executed_listener, EVENT_JOB_EXECUTED)
scheduler.add_listener(job_error_listener, EVENT_JOB_ERROR)

# 添加任务
def sample_job():
    print(f"任务执行: {datetime.now()}")
    return "完成"

scheduler.add_job(
    sample_job,
    'interval',
    seconds=10,
    id='sample_job',
    replace_existing=True
)

# 启动调度器
scheduler.start()

# 任务管理
print("已注册的任务:")
for job in scheduler.get_jobs():
    print(f"  - {job.id}: 下次执行 {job.next_run_time}")

# 暂停任务
# scheduler.pause_job('sample_job')

# 恢复任务
# scheduler.resume_job('sample_job')

# 修改任务
# scheduler.reschedule_job('sample_job', trigger='interval', seconds=30)

# 移除任务
# scheduler.remove_job('sample_job')

try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    scheduler.shutdown()

4. Windows任务计划程序集成

4.1 通过批处理文件运行Python脚本

@echo off
REM run_python_script.bat

REM 激活Python虚拟环境(可选)
call C:\path\to\venv\Scripts\activate.bat

REM 运行Python脚本
python C:\path\to\your_script.py

REM 记录日志
echo %date% %time% - Script executed >> C:\logs\execution.log

REM 停用虚拟环境
deactivate

4.2 使用Python控制Windows任务计划程序

import subprocess
import os

def create_scheduled_task(
    task_name,
    python_script,
    schedule_type="DAILY",
    start_time="09:00",
    python_path=None
):
    """在Windows任务计划程序中注册任务"""

    if python_path is None:
        python_path = os.sys.executable

    # 创建批处理文件
    batch_content = f'''@echo off
"{python_path}" "{python_script}"
'''
    batch_path = python_script.replace('.py', '.bat')
    with open(batch_path, 'w') as f:
        f.write(batch_content)

    # 使用schtasks命令注册任务
    cmd = [
        'schtasks', '/create',
        '/tn', task_name,          # 任务名称
        '/tr', batch_path,          # 要运行的程序
        '/sc', schedule_type,       # 调度类型(DAILY, WEEKLY, MONTHLY等)
        '/st', start_time,          # 开始时间
        '/f'                        # 覆盖现有任务
    ]

    try:
        result = subprocess.run(cmd, capture_output=True, text=True, shell=True)
        if result.returncode == 0:
            print(f"任务'{task_name}'已成功注册。")
        else:
            print(f"错误: {result.stderr}")
    except Exception as e:
        print(f"任务注册失败: {e}")

def delete_scheduled_task(task_name):
    """删除已注册的任务"""
    cmd = ['schtasks', '/delete', '/tn', task_name, '/f']

    try:
        result = subprocess.run(cmd, capture_output=True, text=True, shell=True)
        if result.returncode == 0:
            print(f"任务'{task_name}'已删除。")
        else:
            print(f"错误: {result.stderr}")
    except Exception as e:
        print(f"任务删除失败: {e}")

def list_scheduled_tasks(filter_name=None):
    """查询已注册的任务列表"""
    cmd = ['schtasks', '/query', '/fo', 'LIST']

    if filter_name:
        cmd.extend(['/tn', filter_name])

    try:
        result = subprocess.run(cmd, capture_output=True, text=True, shell=True)
        print(result.stdout)
    except Exception as e:
        print(f"列表查询失败: {e}")

# 使用示例
# create_scheduled_task(
#     "MyPythonTask",
#     r"C:\scripts\daily_report.py",
#     schedule_type="DAILY",
#     start_time="09:00"
# )

5. Linux cron集成

5.1 crontab基本用法

# 编辑crontab
crontab -e

# 查看crontab列表
crontab -l

# 删除crontab
crontab -r

# cron表达式
# ┌───────────── 分 (0 - 59)
# │ ┌───────────── 时 (0 - 23)
# │ │ ┌───────────── 日 (1 - 31)
# │ │ │ ┌───────────── 月 (1 - 12)
# │ │ │ │ ┌───────────── 星期 (0 - 7, 0和7都是星期日)
# │ │ │ │ │
# * * * * *  命令

# 示例:
# 每分钟执行
* * * * * /usr/bin/python3 /home/user/script.py

# 每天上午9点执行
0 9 * * * /usr/bin/python3 /home/user/daily_report.py

# 每周一上午8点30分
30 8 * * 1 /usr/bin/python3 /home/user/weekly_report.py

# 每5分钟执行
*/5 * * * * /usr/bin/python3 /home/user/check_status.py

# 工作日工作时间(9-18点)每小时执行
0 9-18 * * 1-5 /usr/bin/python3 /home/user/hourly_check.py

# 日志输出重定向
0 9 * * * /usr/bin/python3 /home/user/script.py >> /home/user/logs/cron.log 2>&1

5.2 使用Python管理crontab

# pip install python-crontab
from crontab import CronTab
import os

class CronManager:
    """crontab管理类"""

    def __init__(self, user=None):
        self.cron = CronTab(user=user or os.getlogin())

    def add_job(self, command, schedule, comment=None):
        """添加新任务"""
        job = self.cron.new(command=command, comment=comment)
        job.setall(schedule)
        self.cron.write()
        print(f"任务已添加: {comment or command}")

    def remove_job(self, comment):
        """通过注释查找并删除任务"""
        removed = self.cron.remove_all(comment=comment)
        self.cron.write()
        print(f"{removed}个任务已删除")

    def list_jobs(self):
        """列出所有任务"""
        for job in self.cron:
            print(f"{job.slices} - {job.command} ({job.comment})")

    def enable_job(self, comment):
        """启用任务"""
        for job in self.cron.find_comment(comment):
            job.enable()
        self.cron.write()

    def disable_job(self, comment):
        """禁用任务"""
        for job in self.cron.find_comment(comment):
            job.enable(False)
        self.cron.write()

# 使用示例(仅在Linux上运行)
# manager = CronManager()
#
# # 每天上午9点运行Python脚本
# manager.add_job(
#     '/usr/bin/python3 /home/user/daily_report.py',
#     '0 9 * * *',
#     comment='daily_report'
# )
#
# # 查看任务列表
# manager.list_jobs()

6. 日志记录(logging模块)

6.1 基本日志设置

import logging

# 基本设置
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)

# 创建日志记录器
logger = logging.getLogger(__name__)

# 按级别记录消息
logger.debug("调试消息 - 详细诊断信息")
logger.info("信息消息 - 正常运行确认")
logger.warning("警告消息 - 需要注意的情况")
logger.error("错误消息 - 功能失败")
logger.critical("严重错误 - 程序无法运行")

# 异常日志
try:
    result = 1 / 0
except Exception as e:
    logger.exception("异常发生!")  # 包含堆栈跟踪

6.2 文件日志和日志轮转

import logging
from logging.handlers import RotatingFileHandler, TimedRotatingFileHandler
from pathlib import Path

def setup_logger(name, log_dir="logs", level=logging.DEBUG):
    """日志记录器设置函数"""
    log_path = Path(log_dir)
    log_path.mkdir(parents=True, exist_ok=True)

    logger = logging.getLogger(name)
    logger.setLevel(level)

    # 格式化器
    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(funcName)s:%(lineno)d - %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S'
    )

    # 控制台处理器
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.INFO)
    console_handler.setFormatter(formatter)

    # 文件处理器(基于大小的轮转)
    file_handler = RotatingFileHandler(
        log_path / f"{name}.log",
        maxBytes=10*1024*1024,  # 10MB
        backupCount=5,
        encoding='utf-8'
    )
    file_handler.setLevel(logging.DEBUG)
    file_handler.setFormatter(formatter)

    # 错误专用文件处理器
    error_handler = RotatingFileHandler(
        log_path / f"{name}_error.log",
        maxBytes=10*1024*1024,
        backupCount=5,
        encoding='utf-8'
    )
    error_handler.setLevel(logging.ERROR)
    error_handler.setFormatter(formatter)

    # 基于时间的轮转(每天午夜)
    daily_handler = TimedRotatingFileHandler(
        log_path / f"{name}_daily.log",
        when='midnight',
        interval=1,
        backupCount=30,
        encoding='utf-8'
    )
    daily_handler.setLevel(logging.INFO)
    daily_handler.setFormatter(formatter)

    # 添加处理器
    logger.addHandler(console_handler)
    logger.addHandler(file_handler)
    logger.addHandler(error_handler)
    logger.addHandler(daily_handler)

    return logger

# 使用示例
logger = setup_logger("automation")

logger.debug("调试消息")
logger.info("信息消息")
logger.warning("警告消息")
logger.error("错误消息")

6.3 结构化日志(JSON日志)

import logging
import json
from datetime import datetime

class JSONFormatter(logging.Formatter):
    """JSON格式日志格式化器"""

    def format(self, record):
        log_data = {
            "timestamp": datetime.utcnow().isoformat() + "Z",
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "module": record.module,
            "function": record.funcName,
            "line": record.lineno
        }

        # 添加异常信息
        if record.exc_info:
            log_data["exception"] = self.formatException(record.exc_info)

        # 附加字段
        if hasattr(record, 'extra_data'):
            log_data["extra"] = record.extra_data

        return json.dumps(log_data, ensure_ascii=False)

class ContextLogger:
    """包含上下文信息的日志记录器"""

    def __init__(self, logger):
        self.logger = logger
        self.context = {}

    def set_context(self, **kwargs):
        """设置上下文"""
        self.context.update(kwargs)

    def clear_context(self):
        """清除上下文"""
        self.context = {}

    def _log(self, level, message, **kwargs):
        extra_data = {**self.context, **kwargs}
        record = self.logger.makeRecord(
            self.logger.name, level, "", 0, message, (), None
        )
        record.extra_data = extra_data
        self.logger.handle(record)

    def info(self, message, **kwargs):
        self._log(logging.INFO, message, **kwargs)

    def error(self, message, **kwargs):
        self._log(logging.ERROR, message, **kwargs)

# 设置
logger = logging.getLogger("json_logger")
handler = logging.FileHandler("app.json.log")
handler.setFormatter(JSONFormatter())
logger.addHandler(handler)
logger.setLevel(logging.DEBUG)

# 使用
ctx_logger = ContextLogger(logger)
ctx_logger.set_context(user_id="user123", session_id="sess456")
ctx_logger.info("用户登录", ip="192.168.1.1")
ctx_logger.info("数据查询", query="SELECT * FROM users")

7. 错误处理和重试逻辑

7.1 重试装饰器

import time
import logging
from functools import wraps
import random

logger = logging.getLogger(__name__)

def retry(max_attempts=3, delay=1, backoff=2, exceptions=(Exception,)):
    """
    重试装饰器

    Args:
        max_attempts: 最大尝试次数
        delay: 初始等待时间(秒)
        backoff: 等待时间增长倍数
        exceptions: 要重试的异常元组
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            current_delay = delay

            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions as e:
                    if attempt == max_attempts:
                        logger.error(f"{func.__name__} 最终失败: {e}")
                        raise

                    logger.warning(
                        f"{func.__name__} 失败 (尝试 {attempt}/{max_attempts}): {e}"
                    )
                    logger.info(f"{current_delay}秒后重试...")

                    time.sleep(current_delay)
                    current_delay *= backoff

            return None
        return wrapper
    return decorator

# 使用示例
@retry(max_attempts=3, delay=1, backoff=2, exceptions=(ConnectionError, TimeoutError))
def fetch_data(url):
    """网络请求(可能失败)"""
    if random.random() < 0.7:  # 70%概率失败
        raise ConnectionError("连接失败")
    return "数据"

# 测试
try:
    result = fetch_data("https://example.com")
    print(f"结果: {result}")
except ConnectionError:
    print("所有重试失败")

7.2 tenacity库应用

# pip install tenacity
from tenacity import (
    retry,
    stop_after_attempt,
    stop_after_delay,
    wait_fixed,
    wait_exponential,
    wait_random,
    retry_if_exception_type,
    before_sleep_log,
    after_log
)
import logging

logger = logging.getLogger(__name__)

# 3次尝试,固定2秒等待
@retry(
    stop=stop_after_attempt(3),
    wait=wait_fixed(2)
)
def simple_retry():
    print("尝试中...")
    raise Exception("失败!")

# 指数退避(1, 2, 4, 8... 秒)
@retry(
    stop=stop_after_attempt(5),
    wait=wait_exponential(multiplier=1, min=1, max=60)
)
def exponential_retry():
    print("尝试中...")
    raise Exception("失败!")

# 仅重试特定异常
@retry(
    retry=retry_if_exception_type((ConnectionError, TimeoutError)),
    stop=stop_after_attempt(3)
)
def selective_retry():
    raise ValueError("此异常不重试")

# 带日志
@retry(
    stop=stop_after_attempt(3),
    wait=wait_fixed(1),
    before_sleep=before_sleep_log(logger, logging.WARNING),
    after=after_log(logger, logging.INFO)
)
def logged_retry():
    raise Exception("失败!")

# 30秒超时
@retry(
    stop=stop_after_delay(30),
    wait=wait_fixed(2)
)
def timeout_retry():
    raise Exception("失败!")

# 复合条件
@retry(
    stop=(stop_after_attempt(5) | stop_after_delay(30)),
    wait=wait_exponential(multiplier=1, max=10) + wait_random(0, 2)
)
def complex_retry():
    raise Exception("失败!")

7.3 Circuit Breaker模式

import time
from enum import Enum
from threading import Lock

class CircuitState(Enum):
    CLOSED = "closed"      # 正常状态
    OPEN = "open"          # 断开状态
    HALF_OPEN = "half_open"  # 测试状态

class CircuitBreaker:
    """
    Circuit Breaker模式实现

    连续失败时,在一定时间内阻止请求,
    防止系统过载。
    """

    def __init__(
        self,
        failure_threshold=5,
        recovery_timeout=30,
        expected_exception=Exception
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.expected_exception = expected_exception

        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.last_failure_time = None
        self.lock = Lock()

    def __call__(self, func):
        def wrapper(*args, **kwargs):
            return self.call(func, *args, **kwargs)
        return wrapper

    def call(self, func, *args, **kwargs):
        with self.lock:
            if self.state == CircuitState.OPEN:
                if self._should_try_reset():
                    self.state = CircuitState.HALF_OPEN
                else:
                    raise CircuitBreakerOpen(
                        f"电路断开中。{self._time_until_reset()}秒后可重试"
                    )

        try:
            result = func(*args, **kwargs)
            self._on_success()
            return result
        except self.expected_exception as e:
            self._on_failure()
            raise

    def _on_success(self):
        with self.lock:
            self.failure_count = 0
            self.state = CircuitState.CLOSED

    def _on_failure(self):
        with self.lock:
            self.failure_count += 1
            self.last_failure_time = time.time()

            if self.failure_count >= self.failure_threshold:
                self.state = CircuitState.OPEN

    def _should_try_reset(self):
        return time.time() - self.last_failure_time >= self.recovery_timeout

    def _time_until_reset(self):
        elapsed = time.time() - self.last_failure_time
        return max(0, int(self.recovery_timeout - elapsed))

class CircuitBreakerOpen(Exception):
    pass

# 使用示例
circuit_breaker = CircuitBreaker(
    failure_threshold=3,
    recovery_timeout=10
)

@circuit_breaker
def unreliable_api():
    import random
    if random.random() < 0.8:
        raise ConnectionError("API连接失败")
    return "成功"

# 测试
for i in range(10):
    try:
        result = unreliable_api()
        print(f"尝试 {i+1}: {result}")
    except CircuitBreakerOpen as e:
        print(f"尝试 {i+1}: 电路断开 - {e}")
    except ConnectionError as e:
        print(f"尝试 {i+1}: 连接错误 - {e}")
    time.sleep(1)

8. 配置文件管理

8.1 使用configparser

import configparser
from pathlib import Path

# config.ini文件示例:
"""
[DEFAULT]
debug = false
log_level = INFO

[database]
host = localhost
port = 5432
name = mydb
user = admin
password = secret123

[api]
base_url = https://api.example.com
timeout = 30
max_retries = 3

[email]
smtp_server = smtp.gmail.com
smtp_port = 587
sender = noreply@example.com
"""

class Config:
    """配置文件管理类"""

    def __init__(self, config_path="config.ini"):
        self.config = configparser.ConfigParser()
        self.config_path = Path(config_path)

        if self.config_path.exists():
            self.config.read(config_path, encoding='utf-8')
        else:
            self._create_default_config()

    def _create_default_config(self):
        """创建默认配置文件"""
        self.config['DEFAULT'] = {
            'debug': 'false',
            'log_level': 'INFO'
        }
        self.config['database'] = {
            'host': 'localhost',
            'port': '5432',
            'name': 'mydb'
        }
        self.config['api'] = {
            'base_url': 'https://api.example.com',
            'timeout': '30'
        }
        self.save()

    def get(self, section, key, fallback=None):
        """获取配置值"""
        return self.config.get(section, key, fallback=fallback)

    def getint(self, section, key, fallback=0):
        """获取整数配置值"""
        return self.config.getint(section, key, fallback=fallback)

    def getboolean(self, section, key, fallback=False):
        """获取布尔配置值"""
        return self.config.getboolean(section, key, fallback=fallback)

    def set(self, section, key, value):
        """保存配置值"""
        if section not in self.config:
            self.config[section] = {}
        self.config[section][key] = str(value)

    def save(self):
        """保存配置文件"""
        with open(self.config_path, 'w', encoding='utf-8') as f:
            self.config.write(f)

    @property
    def database(self):
        """数据库配置字典"""
        return dict(self.config['database'])

    @property
    def api(self):
        """API配置字典"""
        return dict(self.config['api'])

# 使用示例
config = Config()
print(f"DB主机: {config.get('database', 'host')}")
print(f"API超时: {config.getint('api', 'timeout')}")
print(f"调试模式: {config.getboolean('DEFAULT', 'debug')}")

8.2 使用python-dotenv

# pip install python-dotenv
from dotenv import load_dotenv
import os
from pathlib import Path

# .env文件示例:
"""
# 数据库设置
DB_HOST=localhost
DB_PORT=5432
DB_NAME=mydb
DB_USER=admin
DB_PASSWORD=secret123

# API密钥
OPENAI_API_KEY=sk-xxxxxxxxxxxxx
NAVER_CLIENT_ID=your_client_id
NAVER_CLIENT_SECRET=your_secret

# 环境
ENVIRONMENT=development
DEBUG=true
"""

class EnvConfig:
    """基于环境变量的配置管理"""

    def __init__(self, env_file=".env"):
        # 加载.env文件
        env_path = Path(env_file)
        load_dotenv(env_path)

    @staticmethod
    def get(key, default=None):
        """获取环境变量"""
        return os.getenv(key, default)

    @staticmethod
    def get_bool(key, default=False):
        """布尔环境变量"""
        value = os.getenv(key, str(default)).lower()
        return value in ('true', '1', 'yes', 'on')

    @staticmethod
    def get_int(key, default=0):
        """整数环境变量"""
        try:
            return int(os.getenv(key, default))
        except ValueError:
            return default

    @staticmethod
    def get_list(key, default=None, separator=','):
        """列表环境变量"""
        value = os.getenv(key)
        if value is None:
            return default or []
        return [item.strip() for item in value.split(separator)]

    @property
    def is_debug(self):
        return self.get_bool('DEBUG')

    @property
    def environment(self):
        return self.get('ENVIRONMENT', 'development')

    @property
    def database_url(self):
        """生成数据库连接URL"""
        host = self.get('DB_HOST', 'localhost')
        port = self.get('DB_PORT', '5432')
        name = self.get('DB_NAME', 'mydb')
        user = self.get('DB_USER')
        password = self.get('DB_PASSWORD')

        if user and password:
            return f"postgresql://{user}:{password}@{host}:{port}/{name}"
        return f"postgresql://{host}:{port}/{name}"

# 按环境加载配置文件
def load_env_for_environment():
    """根据环境加载配置文件"""
    env = os.getenv('ENVIRONMENT', 'development')

    # 加载基本.env
    load_dotenv('.env')

    # 加载环境特定的.env(如果存在则覆盖)
    env_file = f'.env.{env}'
    if Path(env_file).exists():
        load_dotenv(env_file, override=True)

# 使用示例
config = EnvConfig()
print(f"环境: {config.environment}")
print(f"调试: {config.is_debug}")
print(f"DB URL: {config.database_url}")
print(f"OpenAI API Key: {config.get('OPENAI_API_KEY', 'Not set')[:20]}...")

9. 后台运行

9.1 守护进程(Linux)

import os
import sys
import time
import signal
import atexit
from pathlib import Path

class Daemon:
    """Unix守护进程"""

    def __init__(self, pidfile, stdin='/dev/null', stdout='/dev/null', stderr='/dev/null'):
        self.pidfile = pidfile
        self.stdin = stdin
        self.stdout = stdout
        self.stderr = stderr

    def daemonize(self):
        """守护进程化(双重fork)"""
        # 第一次fork
        try:
            pid = os.fork()
            if pid > 0:
                sys.exit(0)
        except OSError as e:
            sys.stderr.write(f"Fork #1 失败: {e}\n")
            sys.exit(1)

        # 成为会话领导者
        os.chdir("/")
        os.setsid()
        os.umask(0)

        # 第二次fork
        try:
            pid = os.fork()
            if pid > 0:
                sys.exit(0)
        except OSError as e:
            sys.stderr.write(f"Fork #2 失败: {e}\n")
            sys.exit(1)

        # 重定向标准输入输出
        sys.stdout.flush()
        sys.stderr.flush()

        with open(self.stdin, 'r') as si:
            os.dup2(si.fileno(), sys.stdin.fileno())
        with open(self.stdout, 'a+') as so:
            os.dup2(so.fileno(), sys.stdout.fileno())
        with open(self.stderr, 'a+') as se:
            os.dup2(se.fileno(), sys.stderr.fileno())

        # 创建PID文件
        atexit.register(self.delpid)
        pid = str(os.getpid())
        with open(self.pidfile, 'w+') as f:
            f.write(f"{pid}\n")

    def delpid(self):
        """删除PID文件"""
        if os.path.exists(self.pidfile):
            os.remove(self.pidfile)

    def start(self):
        """启动守护进程"""
        # 检查PID文件
        try:
            with open(self.pidfile, 'r') as f:
                pid = int(f.read().strip())
        except (IOError, ValueError):
            pid = None

        if pid:
            sys.stderr.write(f"PID文件 {self.pidfile} 已存在。守护进程是否已运行?\n")
            sys.exit(1)

        self.daemonize()
        self.run()

    def stop(self):
        """停止守护进程"""
        try:
            with open(self.pidfile, 'r') as f:
                pid = int(f.read().strip())
        except (IOError, ValueError):
            pid = None

        if not pid:
            sys.stderr.write("PID文件不存在。守护进程未运行。\n")
            return

        try:
            while True:
                os.kill(pid, signal.SIGTERM)
                time.sleep(0.1)
        except OSError as e:
            if str(e).find("No such process") > 0:
                if os.path.exists(self.pidfile):
                    os.remove(self.pidfile)
            else:
                print(str(e))
                sys.exit(1)

    def restart(self):
        """重启守护进程"""
        self.stop()
        self.start()

    def run(self):
        """主要工作(在子类中实现)"""
        raise NotImplementedError

# 使用示例
class MyDaemon(Daemon):
    def run(self):
        while True:
            # 执行实际工作
            with open('/tmp/daemon.log', 'a') as f:
                f.write(f"{time.strftime('%Y-%m-%d %H:%M:%S')} - 守护进程运行中\n")
            time.sleep(60)

# if __name__ == "__main__":
#     daemon = MyDaemon('/tmp/mydaemon.pid')
#     if len(sys.argv) == 2:
#         if sys.argv[1] == 'start':
#             daemon.start()
#         elif sys.argv[1] == 'stop':
#             daemon.stop()
#         elif sys.argv[1] == 'restart':
#             daemon.restart()

9.2 Windows服务

# pip install pywin32
import win32serviceutil
import win32service
import win32event
import servicemanager
import socket
import time
import logging

class MyWindowsService(win32serviceutil.ServiceFramework):
    """Windows服务类"""

    _svc_name_ = 'MyPythonService'
    _svc_display_name_ = 'My Python Service'
    _svc_description_ = '使用Python创建的Windows服务。'

    def __init__(self, args):
        win32serviceutil.ServiceFramework.__init__(self, args)
        self.stop_event = win32event.CreateEvent(None, 0, 0, None)
        self.running = True

        # 日志设置
        logging.basicConfig(
            filename='C:\\logs\\myservice.log',
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s'
        )

    def SvcStop(self):
        """停止服务"""
        self.ReportServiceStatus(win32service.SERVICE_STOP_PENDING)
        win32event.SetEvent(self.stop_event)
        self.running = False
        logging.info('服务停止请求')

    def SvcDoRun(self):
        """服务主循环"""
        servicemanager.LogMsg(
            servicemanager.EVENTLOG_INFORMATION_TYPE,
            servicemanager.PYS_SERVICE_STARTED,
            (self._svc_name_, '')
        )
        logging.info('服务启动')
        self.main()

    def main(self):
        """执行实际工作"""
        while self.running:
            # 在这里实现实际的工作逻辑
            logging.info('任务执行中...')
            time.sleep(60)  # 每分钟执行

if __name__ == '__main__':
    win32serviceutil.HandleCommandLine(MyWindowsService)

# 命令:
# python service.py install   # 安装服务
# python service.py start     # 启动服务
# python service.py stop      # 停止服务
# python service.py remove    # 删除服务

10. 实战综合项目:每日工作自动化系统

"""
每日工作自动化系统
- 新闻收集和摘要
- 天气信息查询
- 每日报告生成
- 邮件发送
"""

import os
import json
import logging
from datetime import datetime
from pathlib import Path
from dotenv import load_dotenv
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
import requests
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from functools import wraps
import time

# 加载环境变量
load_dotenv()

# 目录设置
BASE_DIR = Path(__file__).parent
LOG_DIR = BASE_DIR / "logs"
DATA_DIR = BASE_DIR / "data"
REPORT_DIR = BASE_DIR / "reports"

for dir_path in [LOG_DIR, DATA_DIR, REPORT_DIR]:
    dir_path.mkdir(parents=True, exist_ok=True)

# 日志设置
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(LOG_DIR / "automation.log", encoding='utf-8'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

def retry(max_attempts=3, delay=5):
    """重试装饰器"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if attempt == max_attempts - 1:
                        logger.error(f"{func.__name__} 最终失败: {e}")
                        raise
                    logger.warning(f"{func.__name__} 失败,重试 {attempt + 1}/{max_attempts}")
                    time.sleep(delay)
        return wrapper
    return decorator

class NewsCollector:
    """新闻收集器"""

    def __init__(self):
        self.client_id = os.getenv("NAVER_CLIENT_ID")
        self.client_secret = os.getenv("NAVER_CLIENT_SECRET")

    @retry(max_attempts=3)
    def collect(self, query, count=10):
        """Naver新闻搜索"""
        if not self.client_id or not self.client_secret:
            logger.warning("Naver API密钥未设置。")
            return []

        url = "https://openapi.naver.com/v1/search/news.json"
        headers = {
            "X-Naver-Client-Id": self.client_id,
            "X-Naver-Client-Secret": self.client_secret
        }
        params = {"query": query, "display": count, "sort": "date"}

        response = requests.get(url, headers=headers, params=params, timeout=10)
        response.raise_for_status()

        items = response.json().get("items", [])
        logger.info(f"新闻收集完成 {len(items)}条: {query}")
        return items

class WeatherCollector:
    """天气信息收集器"""

    def __init__(self):
        self.api_key = os.getenv("OPENWEATHERMAP_API_KEY")

    @retry(max_attempts=3)
    def collect(self, city="Seoul"):
        """天气信息查询"""
        if not self.api_key:
            logger.warning("OpenWeatherMap API密钥未设置。")
            return None

        url = "https://api.openweathermap.org/data/2.5/weather"
        params = {
            "q": city,
            "appid": self.api_key,
            "units": "metric",
            "lang": "zh_cn"
        }

        response = requests.get(url, params=params, timeout=10)
        response.raise_for_status()

        data = response.json()
        weather_info = {
            "city": city,
            "temperature": data["main"]["temp"],
            "humidity": data["main"]["humidity"],
            "description": data["weather"][0]["description"],
            "wind_speed": data["wind"]["speed"]
        }
        logger.info(f"天气信息收集完成: {city}")
        return weather_info

class ReportGenerator:
    """报告生成器"""

    def __init__(self):
        self.news_collector = NewsCollector()
        self.weather_collector = WeatherCollector()

    def generate_daily_report(self):
        """生成每日报告"""
        logger.info("开始生成每日报告")

        report_data = {
            "date": datetime.now().strftime("%Y-%m-%d"),
            "generated_at": datetime.now().isoformat(),
            "weather": None,
            "news": {}
        }

        # 天气信息
        try:
            report_data["weather"] = self.weather_collector.collect("Seoul")
        except Exception as e:
            logger.error(f"天气收集失败: {e}")

        # 新闻收集(关注关键词)
        keywords = ["人工智能", "Python", "IT"]
        for keyword in keywords:
            try:
                report_data["news"][keyword] = self.news_collector.collect(keyword, 5)
            except Exception as e:
                logger.error(f"新闻收集失败 ({keyword}): {e}")

        # 保存报告
        report_path = REPORT_DIR / f"report_{datetime.now().strftime('%Y%m%d')}.json"
        with open(report_path, 'w', encoding='utf-8') as f:
            json.dump(report_data, f, ensure_ascii=False, indent=2)

        logger.info(f"报告已保存: {report_path}")
        return report_data

    def format_report_html(self, report_data):
        """生成HTML格式报告"""
        html = f"""
        
        
        
            
            
        
        
            

每日工作报告 - {report_data['date']}

""" # 天气信息 if report_data.get('weather'): w = report_data['weather'] html += f"""

今日天气 ({w['city']})

温度: {w['temperature']}C / 湿度: {w['humidity']}%

天气: {w['description']} / 风速: {w['wind_speed']}m/s

""" # 新闻 for keyword, news_list in report_data.get('news', {}).items(): html += f"

{keyword}相关新闻

" for news in news_list[:5]: title = news.get('title', '').replace('', '').replace('', '') link = news.get('link', '#') pubDate = news.get('pubDate', '') html += f"""
{pubDate}
""" html += """

此报告由系统自动生成。

""" return html class EmailSender: """邮件发送器""" def __init__(self): self.smtp_server = os.getenv("SMTP_SERVER", "smtp.gmail.com") self.smtp_port = int(os.getenv("SMTP_PORT", 587)) self.sender_email = os.getenv("SENDER_EMAIL") self.sender_password = os.getenv("SENDER_PASSWORD") @retry(max_attempts=2) def send(self, to_email, subject, html_content): """发送邮件""" if not self.sender_email or not self.sender_password: logger.warning("邮件设置未完成。") return False msg = MIMEMultipart('alternative') msg['Subject'] = subject msg['From'] = self.sender_email msg['To'] = to_email html_part = MIMEText(html_content, 'html', 'utf-8') msg.attach(html_part) try: with smtplib.SMTP(self.smtp_server, self.smtp_port) as server: server.starttls() server.login(self.sender_email, self.sender_password) server.sendmail(self.sender_email, to_email, msg.as_string()) logger.info(f"邮件发送完成: {to_email}") return True except Exception as e: logger.error(f"邮件发送失败: {e}") raise class DailyAutomation: """每日自动化主类""" def __init__(self): self.report_generator = ReportGenerator() self.email_sender = EmailSender() self.scheduler = BlockingScheduler(timezone='Asia/Seoul') def morning_routine(self): """早间例程""" logger.info("=" * 50) logger.info("早间例程开始") try: # 生成报告 report_data = self.report_generator.generate_daily_report() # 转换为HTML html_content = self.report_generator.format_report_html(report_data) # 发送邮件 recipient = os.getenv("REPORT_RECIPIENT") if recipient: self.email_sender.send( recipient, f"[每日报告] {report_data['date']}", html_content ) logger.info("早间例程完成") except Exception as e: logger.error(f"早间例程失败: {e}") def evening_routine(self): """晚间例程""" logger.info("=" * 50) logger.info("晚间例程开始") try: # 日志清理、数据备份等 self._cleanup_old_files() logger.info("晚间例程完成") except Exception as e: logger.error(f"晚间例程失败: {e}") def _cleanup_old_files(self, days=30): """清理旧文件""" import time cutoff = time.time() - (days * 86400) for dir_path in [LOG_DIR, REPORT_DIR]: for file_path in dir_path.glob("*"): if file_path.stat().st_mtime < cutoff: file_path.unlink() logger.info(f"已删除旧文件: {file_path}") def setup_schedule(self): """设置调度""" # 每天上午8点早间例程 self.scheduler.add_job( self.morning_routine, CronTrigger(hour=8, minute=0), id='morning_routine', replace_existing=True ) # 每天下午6点晚间例程 self.scheduler.add_job( self.evening_routine, CronTrigger(hour=18, minute=0), id='evening_routine', replace_existing=True ) logger.info("调度设置完成") logger.info("已注册的任务:") for job in self.scheduler.get_jobs(): logger.info(f" - {job.id}: 下次执行 {job.next_run_time}") def run(self): """运行自动化系统""" logger.info("每日工作自动化系统启动") self.setup_schedule() try: self.scheduler.start() except KeyboardInterrupt: logger.info("系统终止请求") self.scheduler.shutdown() logger.info("系统终止完成") # 主程序 if __name__ == "__main__": import sys automation = DailyAutomation() if len(sys.argv) > 1: if sys.argv[1] == "test": # 测试运行 print("测试模式:立即执行早间例程") automation.morning_routine() elif sys.argv[1] == "schedule": # 运行调度器 automation.run() else: print("使用方法:") print(" python automation.py test # 测试运行") print(" python automation.py schedule # 启动调度器") print() print(".env文件示例:") print(" NAVER_CLIENT_ID=your_id") print(" NAVER_CLIENT_SECRET=your_secret") print(" OPENWEATHERMAP_API_KEY=your_key") print(" SMTP_SERVER=smtp.gmail.com") print(" SMTP_PORT=587") print(" SENDER_EMAIL=your@email.com") print(" SENDER_PASSWORD=your_app_password") print(" REPORT_RECIPIENT=recipient@email.com")

总结

本篇结束了Python自动化大师系列。在8篇文章中,我们学习了使用Python进行各种自动化的技术。

本系列涵盖的内容总结:

  • 1-2篇:Python基础和文件/文件夹自动化
  • 3-4篇:Excel自动化和网页爬虫
  • 5-6篇:邮件自动化和数据库集成
  • 7篇:API应用和数据收集
  • 8篇:定时调度和实战项目

自动化的核心是用代码替代重复性工作,从而节省时间、减少人为错误,让我们能够专注于更有价值的工作。结合本系列所学的技术,几乎可以自动化任何类型的工作。

在实际构建自动化系统时,请始终记住以下原则:

自动化系统构建核心原则
1. 渐进式构建:从小处开始,逐步扩展。
2. 完善日志:记录日志以便在出现问题时查找原因。
3. 错误处理:确保系统在遇到意外错误时不会停止。
4. 监控:定期检查系统是否正常运行。
5. 配置分离:将环境配置与代码分离。

Python自动化具有无限的可能性。希望您基于本系列所学内容构建自己的自动化系统,从重复性工作中解放出来,将时间投入到更有创意和价值的工作中。