Python自动化大师第8篇:定时调度与实战项目
Python Automation Master Part 8: Scheduling and Real-World Projects
引言:自动化的最后一块拼图——定时调度
要让前面学习的文件处理、网页爬虫、API集成等技术真正发挥作用,关键就在于"定时调度"。能够按指定时间自动执行的脚本可以完全替代手工操作,24小时不间断地执行任务。
在这最后一篇中,我们将学习Python中调度和执行任务的各种方法,以及构建稳定自动化系统所需的日志记录、错误处理、配置管理技术。最后,我们将构建一个综合本系列所学内容的实战项目。
1. 时间处理:time和datetime模块
1.1 time模块基础
import time
# Current time as a Unix timestamp (float seconds since the epoch)
timestamp = time.time()
print(f"当前时间戳: {timestamp}")  # e.g. 1737529200.123456

# Pause the program
print("等待3秒...")
time.sleep(3)
print("等待完成!")

# Elapsed time measured with the wall clock (time.time() can jump if the
# system clock is adjusted while the work runs)
start_time = time.time()
for i in range(1000000):  # the work being timed
    pass
end_time = time.time()
print(f"执行时间: {end_time - start_time:.4f}秒")

# perf_counter() is the high-resolution clock intended for measuring durations
start = time.perf_counter()
time.sleep(0.1)  # the work being timed
end = time.perf_counter()
print(f"精确测量: {end - start:.6f}秒")

# Broken-down local time (struct_time)
local_time = time.localtime()
print(f"当前时间: {local_time.tm_year}年{local_time.tm_mon}月{local_time.tm_mday}日")
# Zero-pad each field so 14:05:03 is not printed as "14:5:3"
print(f"时间: {local_time.tm_hour:02d}:{local_time.tm_min:02d}:{local_time.tm_sec:02d}")

# struct_time -> formatted string
formatted = time.strftime("%Y-%m-%d %H:%M:%S", local_time)
print(f"格式化时间: {formatted}")
1.2 datetime模块应用
# `time` is deliberately not imported from datetime: binding datetime.time here
# would shadow the `time` module that the surrounding examples rely on.
from datetime import datetime, date, timedelta
import pytz  # pip install pytz

# Current date and time (naive local time)
now = datetime.now()
print(f"当前: {now}")
print(f"日期: {now.date()}")
print(f"时间: {now.time()}")

# Build a specific date/time
specific_date = datetime(2026, 1, 22, 9, 30, 0)
print(f"特定时间: {specific_date}")

# Formatting with strftime
formatted = now.strftime("%Y年%m月%d日 %H时%M分%S秒")
print(f"中文格式: {formatted}")

# Parsing with strptime
date_str = "2026-01-22 14:30:00"
parsed = datetime.strptime(date_str, "%Y-%m-%d %H:%M:%S")
print(f"解析后日期: {parsed}")

# Date arithmetic with timedelta
tomorrow = now + timedelta(days=1)
next_week = now + timedelta(weeks=1)
two_hours_later = now + timedelta(hours=2)
print(f"明天: {tomorrow.date()}")
print(f"下周: {next_week.date()}")
print(f"2小时后: {two_hours_later.time()}")

# Difference between two dates. Subtract calendar dates, not datetimes:
# datetime(2026, 12, 31) - now loses the current day's remaining hours and
# reports one day too few.
date1 = date(2026, 12, 31)
date2 = now.date()
diff = date1 - date2
print(f"距离2026年末还有{diff.days}天")

# Time zones
kst = pytz.timezone('Asia/Seoul')
utc = pytz.UTC
now_kst = datetime.now(kst)
now_utc = datetime.now(utc)
print(f"韩国时间: {now_kst}")
print(f"UTC时间: {now_utc}")

# UTC -> Korea time. tzinfo=pytz.UTC is safe; for other pytz zones use
# zone.localize(naive_dt) rather than tzinfo=, which pytz documents as
# yielding a wrong (historical LMT) offset.
utc_time = datetime(2026, 1, 22, 0, 0, 0, tzinfo=utc)
korea_time = utc_time.astimezone(kst)
print(f"UTC 00:00 = 韩国 {korea_time.strftime('%H:%M')}")
1.3 实用日期工具函数
from datetime import datetime, timedelta
import calendar
def get_weekday_name(date_obj, lang='zh'):
    """Return the weekday name of *date_obj*.

    Chinese names are used when lang == 'zh'; any other value yields English.
    """
    if lang == 'zh':
        labels = ('星期一', '星期二', '星期三', '星期四', '星期五', '星期六', '星期日')
    else:
        labels = ('Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday')
    # weekday(): Monday == 0 ... Sunday == 6
    return labels[date_obj.weekday()]
def get_month_range(year, month):
    """Return (first_day, last_day) of the given month as midnight datetimes."""
    # monthrange() -> (weekday of day 1, number of days in the month)
    _, days_in_month = calendar.monthrange(year, month)
    return datetime(year, month, 1), datetime(year, month, days_in_month)
def get_business_days(start_date, end_date):
    """Count Monday-Friday days from start_date to end_date, both inclusive.

    Returns 0 when end_date is before start_date.
    """
    # Number of whole-day steps that still land on or before end_date
    span = (end_date - start_date).days
    return sum(
        1
        for offset in range(span + 1)
        if (start_date + timedelta(days=offset)).weekday() < 5  # 0-4 = Mon-Fri
    )
def is_business_hour(check_time=None, start_hour=9, end_hour=18):
    """Return True if check_time (default: now) is Mon-Fri within [start_hour, end_hour)."""
    moment = datetime.now() if check_time is None else check_time
    is_weekday = moment.weekday() < 5
    return is_weekday and start_hour <= moment.hour < end_hour
# Example usage of the date helpers
today = datetime.now()
print(f"今天是{get_weekday_name(today)}。")

first, last = get_month_range(2026, 1)
print(f"2026年1月: {first.date()} ~ {last.date()}")

jan_first, jan_last = datetime(2026, 1, 1), datetime(2026, 1, 31)
days = get_business_days(jan_first, jan_last)
print(f"2026年1月工作日: {days}天")

print(f"现在是工作时间: {is_business_hour()}")
2. schedule库
2.1 基本用法
import schedule
import time
# pip install schedule
def job():
    """Generic demo job: print a line stamped with the current HH:MM:SS."""
    stamp = time.strftime('%H:%M:%S')
    print(f"[{stamp}] 任务执行!")


def morning_report():
    """Placeholder for the daily morning report."""
    print("正在生成早间报告。")


def backup_data():
    """Placeholder for the nightly backup."""
    print("正在备份数据。")
# Various schedule patterns
schedule.every(10).seconds.do(job)                    # every 10 seconds
schedule.every(5).minutes.do(job)                     # every 5 minutes
schedule.every().hour.do(job)                         # every hour
schedule.every().day.at("09:00").do(morning_report)   # every day at 09:00
schedule.every().monday.do(job)                       # every Monday
schedule.every().wednesday.at("13:15").do(job)        # every Wednesday at 13:15

# Run only at a specific time of day
schedule.every().day.at("00:00").do(backup_data)      # midnight backup


# Limit the number of runs. schedule has no "run N times" option; a job
# unschedules itself by returning schedule.CancelJob, so wrap it with a counter.
def run_limited(func, max_runs):
    """Return a job callable that runs *func* and cancels itself after *max_runs* runs."""
    runs = 0

    def wrapper():
        nonlocal runs
        runs += 1
        func()
        if runs >= max_runs:
            return schedule.CancelJob
        return None

    return wrapper


schedule.every().second.do(run_limited(job, 5)).tag('limited')

# Main loop: check for due jobs once per second
print("调度器启动...")
while True:
    schedule.run_pending()
    time.sleep(1)
2.2 高级schedule模式
import schedule
import time
from functools import partial
def greet(name):
    """Job that needs an argument: print a greeting for *name*."""
    print(f"你好,{name}!")


# Bind job arguments either with functools.partial ...
schedule.every().day.at("08:00").do(partial(greet, "张三"))
# ... or with a zero-argument lambda
schedule.every().day.at("08:30").do(lambda: greet("李四"))


def data_sync():
    print("正在同步数据...")


def send_notification():
    print("正在发送通知...")


# Tags group jobs so they can be queried or cancelled together
schedule.every(30).minutes.do(data_sync).tag('sync', 'database')
schedule.every().hour.do(send_notification).tag('notification')

# To cancel every job tagged 'sync':
# schedule.clear('sync')

# Jobs currently carrying the 'sync' tag
sync_jobs = schedule.get_jobs('sync')
print(f"同步任务数量: {len(sync_jobs)}")


def run_once():
    """Returning CancelJob removes this job after its first run."""
    print("仅执行一次的任务")
    return schedule.CancelJob


schedule.every().second.do(run_once)

# When the next job is due ...
next_run = schedule.next_run()
print(f"下次任务执行时间: {next_run}")
# ... and how many seconds remain until then
idle_seconds = schedule.idle_seconds()
print(f"距离下次任务还有{idle_seconds}秒")

# Poll for due jobs forever
while True:
    schedule.run_pending()
    time.sleep(1)
2.3 包含异常处理的调度器
import schedule
import time
import traceback
from functools import wraps
def catch_exceptions(job_func):
    """Decorator: report any exception from *job_func* and return None instead of raising.

    Keeps a failing job from propagating its error out of the scheduler loop.
    """
    @wraps(job_func)
    def wrapper(*args, **kwargs):
        try:
            outcome = job_func(*args, **kwargs)
        except Exception as exc:
            print(f"任务执行中发生错误: {exc}")
            traceback.print_exc()  # full traceback goes to stderr
            return None
        return outcome
    return wrapper
@catch_exceptions
def risky_job():
    """Demo job that fails roughly 30% of the time (errors are caught by the decorator)."""
    import random
    failed = random.random() < 0.3
    if failed:
        raise Exception("随机错误发生!")
    print("任务成功!")
# Run the demo job every 5 seconds
schedule.every(5).seconds.do(risky_job)


def run_scheduler():
    """Poll forever; an exception escaping run_pending() is printed and polling continues."""
    while True:
        try:
            schedule.run_pending()
        except Exception as err:
            print(f"调度器错误: {err}")
        time.sleep(1)


run_scheduler()
3. APScheduler库
3.1 APScheduler介绍及安装
# pip install apscheduler
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.triggers.date import DateTrigger
from datetime import datetime, timedelta
def my_job(message="默认消息"):
    """Print *message* prefixed with the current HH:MM:SS time."""
    stamp = datetime.now().strftime('%H:%M:%S')
    print(f"[{stamp}] {message}")
# Choose one scheduler type:
#   BlockingScheduler   - runs in the main thread; start() blocks
#   BackgroundScheduler - runs in a background thread; start() returns at once
scheduler = BlockingScheduler()
# scheduler = BackgroundScheduler()

# IntervalTrigger: fixed period (every 10 seconds)
scheduler.add_job(my_job, IntervalTrigger(seconds=10), args=["每10秒执行"], id='interval_job')

# CronTrigger: calendar-based (every day at 09:00)
scheduler.add_job(my_job, CronTrigger(hour=9, minute=0), args=["每天上午9点"], id='daily_job')

# DateTrigger: a single run at a given moment (5 seconds from now)
scheduler.add_job(
    my_job,
    DateTrigger(run_date=datetime.now() + timedelta(seconds=5)),
    args=["5秒后仅执行一次"],
    id='one_time_job',
)

try:
    print("APScheduler启动...")
    scheduler.start()  # blocks until interrupted (BlockingScheduler)
except KeyboardInterrupt:
    print("调度器终止")
    scheduler.shutdown()
3.2 Cron表达式应用
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
from datetime import datetime
scheduler = BlockingScheduler()


def job(name):
    """Print a timestamped line naming the trigger that fired."""
    print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] {name} 执行")


# Classic crontab field layout (for reference):
# ┌───────────── minute (0 - 59)
# │ ┌───────────── hour (0 - 23)
# │ │ ┌───────────── day of month (1 - 31)
# │ │ │ ┌───────────── month (1 - 12)
# │ │ │ │ ┌───────────── day of week: classic cron 0/7 = Sunday, but
# │ │ │ │ │               APScheduler's CronTrigger uses 0 = Monday ... 6 = Sunday
# * * * * *
# Using weekday names ('mon', 'mon-fri') avoids the numbering mismatch.

# Every minute
scheduler.add_job(job, CronTrigger(minute='*'), args=["每分钟"])

# At minute 30 of every hour
scheduler.add_job(job, CronTrigger(minute=30), args=["每小时30分"])

# Every day at 09:00 and 18:00
scheduler.add_job(job, CronTrigger(hour='9,18', minute=0), args=["上午9点,下午6点"])

# Weekdays (Mon-Fri) at 09:00
scheduler.add_job(job, CronTrigger(day_of_week='mon-fri', hour=9, minute=0), args=["工作日9点"])

# Midnight on the 1st of every month
scheduler.add_job(job, CronTrigger(day=1, hour=0, minute=0), args=["每月1日"])

# Every Monday at 08:30
scheduler.add_job(job, CronTrigger(day_of_week='mon', hour=8, minute=30), args=["周一8:30"])

# Every 15 minutes
scheduler.add_job(job, CronTrigger(minute='*/15'), args=["每15分钟"])

# Working hours: every 30 minutes from 09:00 to 17:30
scheduler.add_job(
    job,
    CronTrigger(hour='9-17', minute='0,30'),
    args=["工作时间每30分钟"]
)

# crontab-style string. In APScheduler 3.x from_crontab() passes the
# day-of-week field to CronTrigger unchanged (0 = Monday), so a numeric
# '1-5' would mean Tuesday-Saturday; the name range is unambiguous.
scheduler.add_job(
    job,
    CronTrigger.from_crontab('0 9 * * mon-fri'),  # weekdays at 09:00
    args=["crontab格式"]
)

scheduler.start()
3.3 任务存储和事件监听器
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR
from datetime import datetime
import time
# Job store: persist jobs in a local SQLite database so they survive restarts
jobstores = dict(default=SQLAlchemyJobStore(url='sqlite:///jobs.sqlite'))

# Executors: a 20-thread pool by default, plus a 5-process pool named 'processpool'
executors = dict(
    default=ThreadPoolExecutor(20),
    processpool=ProcessPoolExecutor(5),
)

# Defaults applied to every job unless add_job() overrides them
job_defaults = dict(
    coalesce=False,          # do not merge missed runs into one
    max_instances=3,         # at most 3 concurrent runs of the same job
    misfire_grace_time=60,   # seconds a run may be late before it is considered missed
)

scheduler = BackgroundScheduler(
    jobstores=jobstores,
    executors=executors,
    job_defaults=job_defaults,
    timezone='Asia/Seoul',
)
# Event listeners registered on the scheduler
def job_executed_listener(event):
    """Report a successfully finished job: id, scheduled run time and return value."""
    for line in (
        f"任务完成: {event.job_id}",
        f"执行时间: {event.scheduled_run_time}",
        f"返回值: {event.retval}",
    ):
        print(line)


def job_error_listener(event):
    """Report a job that raised: id, exception and formatted traceback."""
    for line in (
        f"任务出错: {event.job_id}",
        f"异常: {event.exception}",
        f"堆栈跟踪: {event.traceback}",
    ):
        print(line)
scheduler.add_listener(job_executed_listener, EVENT_JOB_EXECUTED)
scheduler.add_listener(job_error_listener, EVENT_JOB_ERROR)


def sample_job():
    """Print the current time; the return value is shown by the executed-listener."""
    print(f"任务执行: {datetime.now()}")
    return "完成"


# Every 10 s. replace_existing=True overwrites a job with the same id that a
# previous run left in the persistent job store.
scheduler.add_job(sample_job, 'interval', seconds=10, id='sample_job', replace_existing=True)

scheduler.start()  # BackgroundScheduler: returns immediately

print("已注册的任务:")
for job in scheduler.get_jobs():
    print(f" - {job.id}: 下次执行 {job.next_run_time}")

# Job management API (uncomment to try):
# scheduler.pause_job('sample_job')                                        # pause
# scheduler.resume_job('sample_job')                                       # resume
# scheduler.reschedule_job('sample_job', trigger='interval', seconds=30)   # change trigger
# scheduler.remove_job('sample_job')                                       # remove

# Keep the main thread alive; Ctrl+C shuts the scheduler down
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    scheduler.shutdown()
4. Windows任务计划程序集成
4.1 通过批处理文件运行Python脚本
@echo off
REM run_python_script.bat
REM Activate the Python virtual environment (optional)
call C:\path\to\venv\Scripts\activate.bat
REM Run the Python script and remember its exit code
python C:\path\to\your_script.py
set "EXIT_CODE=%ERRORLEVEL%"
REM Log the run together with its exit code (0 = success)
echo %date% %time% - Script executed (exit code %EXIT_CODE%) >> C:\logs\execution.log
REM Deactivate the virtual environment; "call" returns control to this script
call deactivate
REM Propagate the script's exit code so Task Scheduler records failures
exit /b %EXIT_CODE%
4.2 使用Python控制Windows任务计划程序
import subprocess
import os
def create_scheduled_task(
    task_name,
    python_script,
    schedule_type="DAILY",
    start_time="09:00",
    python_path=None
):
    """Register a Windows Task Scheduler task that runs *python_script*.

    A ``.bat`` wrapper is written next to the script (same name, ``.bat``
    suffix) and ``schtasks /create`` is pointed at it.

    Args:
        task_name: Task name (``/tn``).
        python_script: Path of the Python script to run.
        schedule_type: ``/sc`` value such as DAILY, WEEKLY, MONTHLY.
        start_time: ``/st`` value, HH:MM.
        python_path: Interpreter to use; defaults to the current interpreter.

    Returns:
        True if schtasks reported success, False otherwise.
    """
    import sys
    from pathlib import Path

    if python_path is None:
        python_path = sys.executable

    # Swap only the file suffix. str.replace('.py', '.bat') also rewrote
    # '.py' inside directory names, and for a script without a '.py' suffix
    # it returned the script path itself, overwriting the script.
    batch_path = str(Path(python_script).with_suffix('.bat'))
    batch_content = f'''@echo off
"{python_path}" "{python_script}"
'''
    with open(batch_path, 'w') as f:
        f.write(batch_content)

    cmd = [
        'schtasks', '/create',
        '/tn', task_name,            # task name
        # schtasks parses /tr as a command line, so the path gets its own
        # quotes to survive spaces (e.g. "C:\Program Files\...").
        '/tr', f'"{batch_path}"',    # program to run
        '/sc', schedule_type,        # schedule type
        '/st', start_time,           # start time
        '/f'                         # overwrite an existing task
    ]
    try:
        # Argument list without shell=True: nothing is re-parsed by cmd.exe,
        # so characters such as & or | in the arguments cannot inject commands.
        result = subprocess.run(cmd, capture_output=True, text=True)
    except (OSError, ValueError) as e:
        print(f"任务注册失败: {e}")
        return False
    if result.returncode == 0:
        print(f"任务'{task_name}'已成功注册。")
        return True
    print(f"错误: {result.stderr}")
    return False
def delete_scheduled_task(task_name):
    """Delete the Task Scheduler task *task_name* (``schtasks /delete /f``).

    Returns:
        True if schtasks reported success, False otherwise.
    """
    cmd = ['schtasks', '/delete', '/tn', task_name, '/f']
    try:
        # No shell=True: the task name is passed as one argument and never
        # interpreted by cmd.exe.
        result = subprocess.run(cmd, capture_output=True, text=True)
    except (OSError, ValueError) as e:
        print(f"任务删除失败: {e}")
        return False
    if result.returncode == 0:
        print(f"任务'{task_name}'已删除。")
        return True
    print(f"错误: {result.stderr}")
    return False
def list_scheduled_tasks(filter_name=None):
    """Print ``schtasks /query /fo LIST`` output, optionally for a single task.

    Returns:
        The command's stdout on success, otherwise None.
    """
    cmd = ['schtasks', '/query', '/fo', 'LIST']
    if filter_name:
        cmd.extend(['/tn', filter_name])
    try:
        # No shell=True: filter_name cannot be interpreted by cmd.exe.
        result = subprocess.run(cmd, capture_output=True, text=True)
    except (OSError, ValueError) as e:
        print(f"列表查询失败: {e}")
        return None
    print(result.stdout)
    if result.returncode != 0:
        # e.g. unknown task name: the reason is only reported on stderr
        print(f"错误: {result.stderr}")
        return None
    return result.stdout
# 使用示例
# create_scheduled_task(
# "MyPythonTask",
# r"C:\scripts\daily_report.py",
# schedule_type="DAILY",
# start_time="09:00"
# )
5. Linux cron集成
5.1 crontab基本用法
# Edit the current user's crontab
crontab -e
# List the current user's crontab entries
crontab -l
# Remove the current user's crontab (all entries, not a single line)
crontab -r
# Cron expression fields
# ┌───────────── minute (0 - 59)
# │ ┌───────────── hour (0 - 23)
# │ │ ┌───────────── day of month (1 - 31)
# │ │ │ ┌───────────── month (1 - 12)
# │ │ │ │ ┌───────────── day of week (0 - 7, both 0 and 7 mean Sunday)
# │ │ │ │ │
# * * * * * command
# Examples:
# Every minute
* * * * * /usr/bin/python3 /home/user/script.py
# Every day at 09:00
0 9 * * * /usr/bin/python3 /home/user/daily_report.py
# Every Monday at 08:30
30 8 * * 1 /usr/bin/python3 /home/user/weekly_report.py
# Every 5 minutes
*/5 * * * * /usr/bin/python3 /home/user/check_status.py
# Mon-Fri, on the hour from 09:00 through 18:00 inclusive (10 runs per day)
0 9-18 * * 1-5 /usr/bin/python3 /home/user/hourly_check.py
# Append the script's stdout and stderr (2>&1) to a log file
0 9 * * * /usr/bin/python3 /home/user/script.py >> /home/user/logs/cron.log 2>&1
5.2 使用Python管理crontab
# pip install python-crontab
from crontab import CronTab
import os
class CronManager:
    """Manage one user's crontab through python-crontab's CronTab."""

    def __init__(self, user=None):
        """Open the crontab of *user*, defaulting to the login user.

        os.getlogin() raises OSError when the process has no controlling
        terminal (cron jobs, services, CI). In that case fall back to
        user=True, which python-crontab treats as "the current user".
        """
        if not user:
            try:
                user = os.getlogin()
            except OSError:
                user = True
        self.cron = CronTab(user=user)

    def add_job(self, command, schedule, comment=None):
        """Add *command* with the 5-field *schedule* and write the crontab.

        If job.setall() rejects the schedule, the half-created job is removed
        before re-raising. Otherwise it would stay in self.cron and be
        persisted by the next write() from any other method.
        """
        job = self.cron.new(command=command, comment=comment)
        try:
            job.setall(schedule)
        except Exception:
            self.cron.remove(job)
            raise
        self.cron.write()
        print(f"任务已添加: {comment or command}")

    def remove_job(self, comment):
        """Remove every job whose comment equals *comment*, then write the crontab."""
        removed = self.cron.remove_all(comment=comment)
        self.cron.write()
        print(f"{removed}个任务已删除")

    def list_jobs(self):
        """Print each job as '<schedule> - <command> (<comment>)'."""
        for job in self.cron:
            print(f"{job.slices} - {job.command} ({job.comment})")

    def enable_job(self, comment):
        """Enable (uncomment) all jobs with the given comment."""
        for job in self.cron.find_comment(comment):
            job.enable()
        self.cron.write()

    def disable_job(self, comment):
        """Disable (comment out) all jobs with the given comment."""
        for job in self.cron.find_comment(comment):
            job.enable(False)
        self.cron.write()
# 使用示例(仅在Linux上运行)
# manager = CronManager()
#
# # 每天上午9点运行Python脚本
# manager.add_job(
# '/usr/bin/python3 /home/user/daily_report.py',
# '0 9 * * *',
# comment='daily_report'
# )
#
# # 查看任务列表
# manager.list_jobs()
6. 日志记录(logging模块)
6.1 基本日志设置
import logging
# Root configuration: DEBUG and above, timestamped single-line format
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
    level=logging.DEBUG,
)

# Module-level logger named after this module
logger = logging.getLogger(__name__)

# One message per severity, lowest to highest
logger.debug("调试消息 - 详细诊断信息")
logger.info("信息消息 - 正常运行确认")
logger.warning("警告消息 - 需要注意的情况")
logger.error("错误消息 - 功能失败")
logger.critical("严重错误 - 程序无法运行")

# logger.exception() logs at ERROR level and appends the active traceback
try:
    result = 1 / 0
except Exception:
    logger.exception("异常发生!")
6.2 文件日志和日志轮转
import logging
from logging.handlers import RotatingFileHandler, TimedRotatingFileHandler
from pathlib import Path
def setup_logger(name, log_dir="logs", level=logging.DEBUG):
    """Return logger *name* configured with console and file handlers.

    Handlers attached on the first call for a given name:
      - console (stderr): INFO and above
      - ``<name>.log``: rotated at 10 MB, 5 backups, DEBUG and above
      - ``<name>_error.log``: rotated at 10 MB, 5 backups, ERROR and above
      - ``<name>_daily.log``: rotated at midnight, 30 backups, INFO and above

    Later calls with the same name only update the logger level and return
    the existing logger; the handlers (and log_dir) of the first call stay.

    Args:
        name: Logger name passed to logging.getLogger().
        log_dir: Directory for the log files; created if missing.
        level: Level set on the logger itself.

    Returns:
        The configured logging.Logger.
    """
    log_path = Path(log_dir)
    log_path.mkdir(parents=True, exist_ok=True)

    logger = logging.getLogger(name)
    logger.setLevel(level)

    # getLogger() returns the same object for the same name; attaching the
    # handlers again would write every record once per setup_logger() call.
    if logger.handlers:
        return logger

    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(funcName)s:%(lineno)d - %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S'
    )

    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.INFO)

    # Size-based rotation
    file_handler = RotatingFileHandler(
        log_path / f"{name}.log",
        maxBytes=10 * 1024 * 1024,  # 10MB
        backupCount=5,
        encoding='utf-8'
    )
    file_handler.setLevel(logging.DEBUG)

    # Errors only, in a separate file
    error_handler = RotatingFileHandler(
        log_path / f"{name}_error.log",
        maxBytes=10 * 1024 * 1024,
        backupCount=5,
        encoding='utf-8'
    )
    error_handler.setLevel(logging.ERROR)

    # Time-based rotation (every midnight)
    daily_handler = TimedRotatingFileHandler(
        log_path / f"{name}_daily.log",
        when='midnight',
        interval=1,
        backupCount=30,
        encoding='utf-8'
    )
    daily_handler.setLevel(logging.INFO)

    for handler in (console_handler, file_handler, error_handler, daily_handler):
        handler.setFormatter(formatter)
        logger.addHandler(handler)

    return logger
# Example: log one message at each of four levels
logger = setup_logger("automation")
for emit, text in (
    (logger.debug, "调试消息"),
    (logger.info, "信息消息"),
    (logger.warning, "警告消息"),
    (logger.error, "错误消息"),
):
    emit(text)
6.3 结构化日志(JSON日志)
import logging
import json
from datetime import datetime
class JSONFormatter(logging.Formatter):
    """Render each LogRecord as a single-line JSON object."""

    def format(self, record):
        from datetime import timezone

        # Stamp the moment the record was created (record.created), in UTC.
        # datetime.utcnow() stamped the formatting time instead and is
        # deprecated since Python 3.12. The "...Z" text format is unchanged.
        created = datetime.fromtimestamp(record.created, tz=timezone.utc)
        log_data = {
            "timestamp": created.replace(tzinfo=None).isoformat() + "Z",
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "module": record.module,
            "function": record.funcName,
            "line": record.lineno
        }

        # Formatted traceback when the record carries exception info
        if record.exc_info:
            log_data["exception"] = self.formatException(record.exc_info)

        # Structured fields attached by the caller (see ContextLogger)
        if hasattr(record, 'extra_data'):
            log_data["extra"] = record.extra_data

        # ensure_ascii=False keeps Chinese text readable in the log file
        return json.dumps(log_data, ensure_ascii=False)
class ContextLogger:
    """Wrap a Logger and attach a persistent context dict to each record as ``record.extra_data``."""

    def __init__(self, logger):
        self.logger = logger
        self.context = {}

    def set_context(self, **kwargs):
        """Add or update context fields included in every subsequent record."""
        self.context.update(kwargs)

    def clear_context(self):
        """Drop all context fields."""
        self.context = {}

    def _log(self, level, message, **kwargs):
        # Per-call fields override context fields with the same name
        extra_data = {**self.context, **kwargs}
        # Logger.log() honours the logger's level (makeRecord() + handle()
        # bypassed it, so info() was emitted even on a WARNING logger) and
        # records the real module/function/line. stacklevel=3 skips _log()
        # and the info()/error() wrapper to point at the caller.
        self.logger.log(level, message, extra={"extra_data": extra_data}, stacklevel=3)

    def debug(self, message, **kwargs):
        self._log(logging.DEBUG, message, **kwargs)

    def info(self, message, **kwargs):
        self._log(logging.INFO, message, **kwargs)

    def warning(self, message, **kwargs):
        self._log(logging.WARNING, message, **kwargs)

    def error(self, message, **kwargs):
        self._log(logging.ERROR, message, **kwargs)
# Setup
logger = logging.getLogger("json_logger")
# Explicit UTF-8: JSONFormatter emits non-ASCII text (ensure_ascii=False), and
# FileHandler otherwise uses the locale encoding, which may not encode it.
handler = logging.FileHandler("app.json.log", encoding="utf-8")
handler.setFormatter(JSONFormatter())
logger.addHandler(handler)
logger.setLevel(logging.DEBUG)

# Usage: context fields are merged into every record's "extra"
ctx_logger = ContextLogger(logger)
ctx_logger.set_context(user_id="user123", session_id="sess456")
ctx_logger.info("用户登录", ip="192.168.1.1")
ctx_logger.info("数据查询", query="SELECT * FROM users")
7. 错误处理和重试逻辑
7.1 重试装饰器
import time
import logging
from functools import wraps
import random
logger = logging.getLogger(__name__)


def retry(max_attempts=3, delay=1, backoff=2, exceptions=(Exception,)):
    """Decorator factory: retry a function with exponential backoff.

    Args:
        max_attempts: Total number of calls to make; must be >= 1.
        delay: Seconds to wait after the first failure.
        backoff: Multiplier applied to the wait after each further failure.
        exceptions: Exception types that trigger a retry; any other
            exception propagates immediately.

    Returns:
        A decorator. The wrapped function returns the first successful result
        or re-raises the last exception once all attempts are used up.

    Raises:
        ValueError: If max_attempts < 1. Previously the wrapped function was
            then never called and silently returned None.
    """
    if max_attempts < 1:
        raise ValueError(f"max_attempts must be >= 1, got {max_attempts}")

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            current_delay = delay
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions as e:
                    if attempt == max_attempts:
                        logger.error(f"{func.__name__} 最终失败: {e}")
                        raise
                    logger.warning(
                        f"{func.__name__} 失败 (尝试 {attempt}/{max_attempts}): {e}"
                    )
                    logger.info(f"{current_delay}秒后重试...")
                    time.sleep(current_delay)
                    current_delay *= backoff
        return wrapper
    return decorator
# Example: a simulated request that fails ~70% of the time. Only
# ConnectionError/TimeoutError are retried, waiting 1 s and then 2 s.
@retry(max_attempts=3, delay=1, backoff=2, exceptions=(ConnectionError, TimeoutError))
def fetch_data(url):
    """Simulated network request (may fail)."""
    failed = random.random() < 0.7
    if failed:
        raise ConnectionError("连接失败")
    return "数据"


# Try it
try:
    result = fetch_data("https://example.com")
except ConnectionError:
    print("所有重试失败")
else:
    print(f"结果: {result}")
7.2 tenacity库应用
# pip install tenacity
from tenacity import (
retry,
stop_after_attempt,
stop_after_delay,
wait_fixed,
wait_exponential,
wait_random,
retry_if_exception_type,
before_sleep_log,
after_log
)
import logging
logger = logging.getLogger(__name__)


# Up to 3 attempts with a fixed 2 s pause
@retry(wait=wait_fixed(2), stop=stop_after_attempt(3))
def simple_retry():
    print("尝试中...")
    raise Exception("失败!")


# Exponential backoff: 1, 2, 4, 8... seconds, clamped to [1, 60]
@retry(wait=wait_exponential(multiplier=1, min=1, max=60), stop=stop_after_attempt(5))
def exponential_retry():
    print("尝试中...")
    raise Exception("失败!")


# Retry only ConnectionError/TimeoutError; this ValueError is raised immediately
@retry(stop=stop_after_attempt(3), retry=retry_if_exception_type((ConnectionError, TimeoutError)))
def selective_retry():
    raise ValueError("此异常不重试")


# Log a WARNING before each sleep and an INFO after each attempt
@retry(
    after=after_log(logger, logging.INFO),
    before_sleep=before_sleep_log(logger, logging.WARNING),
    wait=wait_fixed(1),
    stop=stop_after_attempt(3),
)
def logged_retry():
    raise Exception("失败!")


# Give up once 30 s have elapsed since the first attempt
@retry(wait=wait_fixed(2), stop=stop_after_delay(30))
def timeout_retry():
    raise Exception("失败!")


# Stop after 5 attempts OR 30 s (whichever first); exponential wait plus 0-2 s jitter
@retry(
    wait=wait_exponential(multiplier=1, max=10) + wait_random(0, 2),
    stop=(stop_after_attempt(5) | stop_after_delay(30)),
)
def complex_retry():
    raise Exception("失败!")
7.3 Circuit Breaker模式
import time
from enum import Enum
from threading import Lock
class CircuitState(Enum):
    """States of a circuit breaker."""
    CLOSED = "closed"        # normal: calls pass through
    OPEN = "open"            # tripped: calls are rejected
    HALF_OPEN = "half_open"  # probing: the next call decides CLOSED vs OPEN


class CircuitBreaker:
    """Circuit Breaker pattern.

    After failure_threshold consecutive failures (exceptions of type
    expected_exception) the circuit opens and calls are rejected with
    CircuitBreakerOpen for recovery_timeout seconds. The first call after
    that runs as a probe (HALF_OPEN): success closes the circuit, failure
    opens it again. Other exception types propagate without being counted.

    Usable as a decorator (``@breaker``) or via ``breaker.call(func, ...)``.
    """

    def __init__(
        self,
        failure_threshold=5,
        recovery_timeout=30,
        expected_exception=Exception
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.expected_exception = expected_exception

        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.last_failure_time = None
        self.lock = Lock()  # guards state/failure_count/last_failure_time

    def __call__(self, func):
        """Decorate *func*; functools.wraps keeps its name and docstring."""
        from functools import wraps

        @wraps(func)
        def wrapper(*args, **kwargs):
            return self.call(func, *args, **kwargs)
        return wrapper

    def call(self, func, *args, **kwargs):
        """Call func through the breaker.

        Raises:
            CircuitBreakerOpen: if the circuit is open and the recovery
                timeout has not elapsed yet.
        """
        with self.lock:
            if self.state == CircuitState.OPEN:
                if self._should_try_reset():
                    self.state = CircuitState.HALF_OPEN
                else:
                    raise CircuitBreakerOpen(
                        f"电路断开中。{self._time_until_reset()}秒后可重试"
                    )

        # func runs outside the lock so slow calls do not serialize callers
        try:
            result = func(*args, **kwargs)
        except self.expected_exception:
            self._on_failure()
            raise
        self._on_success()
        return result

    def _on_success(self):
        with self.lock:
            self.failure_count = 0
            self.state = CircuitState.CLOSED

    def _on_failure(self):
        with self.lock:
            self.failure_count += 1
            self.last_failure_time = time.time()
            # A failed HALF_OPEN probe re-opens immediately
            if (self.state == CircuitState.HALF_OPEN
                    or self.failure_count >= self.failure_threshold):
                self.state = CircuitState.OPEN

    def _should_try_reset(self):
        return time.time() - self.last_failure_time >= self.recovery_timeout

    def _time_until_reset(self):
        elapsed = time.time() - self.last_failure_time
        return max(0, int(self.recovery_timeout - elapsed))


class CircuitBreakerOpen(Exception):
    """Raised instead of calling the function while the circuit is open."""
    pass
# Example: trip after 3 consecutive failures, allow a probe after 10 s
circuit_breaker = CircuitBreaker(failure_threshold=3, recovery_timeout=10)


@circuit_breaker
def unreliable_api():
    """Simulated API that fails about 80% of the time."""
    import random
    if random.random() < 0.8:
        raise ConnectionError("API连接失败")
    return "成功"


# Call it 10 times, one second apart, reporting each outcome
for i in range(10):
    try:
        result = unreliable_api()
    except CircuitBreakerOpen as e:
        print(f"尝试 {i+1}: 电路断开 - {e}")
    except ConnectionError as e:
        print(f"尝试 {i+1}: 连接错误 - {e}")
    else:
        print(f"尝试 {i+1}: {result}")
    time.sleep(1)
8. 配置文件管理
8.1 使用configparser
import configparser
from pathlib import Path
# Example config.ini contents. This bare string literal is evaluated and
# discarded at runtime; it only documents the expected file layout.
# NOTE(review): keep real passwords out of config files under version control.
"""
[DEFAULT]
debug = false
log_level = INFO
[database]
host = localhost
port = 5432
name = mydb
user = admin
password = secret123
[api]
base_url = https://api.example.com
timeout = 30
max_retries = 3
[email]
smtp_server = smtp.gmail.com
smtp_port = 587
sender = noreply@example.com
"""
class Config:
    """INI-file backed settings. A default file is written when none exists."""

    def __init__(self, config_path="config.ini"):
        self.config = configparser.ConfigParser()
        self.config_path = Path(config_path)
        if not self.config_path.exists():
            self._create_default_config()
            return
        self.config.read(config_path, encoding='utf-8')

    def _create_default_config(self):
        """Populate built-in defaults and write them to config_path."""
        defaults = {
            'DEFAULT': {'debug': 'false', 'log_level': 'INFO'},
            'database': {'host': 'localhost', 'port': '5432', 'name': 'mydb'},
            'api': {'base_url': 'https://api.example.com', 'timeout': '30'},
        }
        for section_name, values in defaults.items():
            self.config[section_name] = values
        self.save()

    def get(self, section, key, fallback=None):
        """String value of section/key, or fallback if absent."""
        return self.config.get(section, key, fallback=fallback)

    def getint(self, section, key, fallback=0):
        """Integer value of section/key, or fallback if absent."""
        return self.config.getint(section, key, fallback=fallback)

    def getboolean(self, section, key, fallback=False):
        """Boolean value of section/key (true/false, yes/no, on/off, 1/0), or fallback."""
        return self.config.getboolean(section, key, fallback=fallback)

    def set(self, section, key, value):
        """Store str(value) under section/key in memory; call save() to persist."""
        if section not in self.config:
            self.config[section] = {}
        self.config[section][key] = str(value)

    def save(self):
        """Write the current settings to config_path as UTF-8."""
        with open(self.config_path, 'w', encoding='utf-8') as fh:
            self.config.write(fh)

    @property
    def database(self):
        """[database] section as a dict (DEFAULT keys included, as configparser merges them)."""
        return dict(self.config['database'])

    @property
    def api(self):
        """[api] section as a dict (DEFAULT keys included)."""
        return dict(self.config['api'])
# Example: read typed values from the INI file
config = Config()
db_host = config.get('database', 'host')
print(f"DB主机: {db_host}")
api_timeout = config.getint('api', 'timeout')
print(f"API超时: {api_timeout}")
debug_mode = config.getboolean('DEFAULT', 'debug')
print(f"调试模式: {debug_mode}")
8.2 使用python-dotenv
# pip install python-dotenv
from dotenv import load_dotenv
import os
from pathlib import Path
# Example .env contents. This bare string literal is evaluated and discarded
# at runtime; it only documents the expected KEY=VALUE layout.
# NOTE(review): .env holds secrets - keep it out of version control.
"""
# 数据库设置
DB_HOST=localhost
DB_PORT=5432
DB_NAME=mydb
DB_USER=admin
DB_PASSWORD=secret123
# API密钥
OPENAI_API_KEY=sk-xxxxxxxxxxxxx
NAVER_CLIENT_ID=your_client_id
NAVER_CLIENT_SECRET=your_secret
# 环境
ENVIRONMENT=development
DEBUG=true
"""
class EnvConfig:
    """Configuration read from environment variables, optionally seeded from a .env file."""

    def __init__(self, env_file=".env"):
        # Load KEY=VALUE pairs from env_file into os.environ. With
        # load_dotenv's default (override=False) existing variables win.
        env_path = Path(env_file)
        load_dotenv(env_path)

    @staticmethod
    def get(key, default=None):
        """Raw string value of *key*, or *default* when unset."""
        return os.getenv(key, default)

    @staticmethod
    def get_bool(key, default=False):
        """True for 'true', '1', 'yes', 'on' (case-insensitive), else False."""
        value = os.getenv(key, str(default)).lower()
        return value in ('true', '1', 'yes', 'on')

    @staticmethod
    def get_int(key, default=0):
        """Integer value of *key*; *default* when unset or not an integer.

        TypeError is caught as well, so default=None returns None instead of
        failing on int(None).
        """
        try:
            return int(os.getenv(key, default))
        except (TypeError, ValueError):
            return default

    @staticmethod
    def get_list(key, default=None, separator=','):
        """Split *key* on *separator* and strip items; default (or []) when unset."""
        value = os.getenv(key)
        if value is None:
            return default or []
        return [item.strip() for item in value.split(separator)]

    @property
    def is_debug(self):
        return self.get_bool('DEBUG')

    @property
    def environment(self):
        return self.get('ENVIRONMENT', 'development')

    @property
    def database_url(self):
        """PostgreSQL URL built from the DB_* variables.

        User name and password are percent-encoded so characters such as
        '@', ':' or '/' cannot break the URL structure.
        """
        from urllib.parse import quote_plus

        host = self.get('DB_HOST', 'localhost')
        port = self.get('DB_PORT', '5432')
        name = self.get('DB_NAME', 'mydb')
        user = self.get('DB_USER')
        password = self.get('DB_PASSWORD')

        if user and password:
            return (
                f"postgresql://{quote_plus(user)}:{quote_plus(password)}"
                f"@{host}:{port}/{name}"
            )
        return f"postgresql://{host}:{port}/{name}"
# Load configuration files per environment
def load_env_for_environment():
    """Load `.env`, then overlay `.env.<ENVIRONMENT>` if that file exists.

    ENVIRONMENT is read only after `.env` has been loaded. It can therefore
    come from the process environment or from `.env` itself; previously a
    value set only in `.env` was ignored and 'development' was always used.
    The process environment still takes precedence over `.env` because
    load_dotenv() does not override existing variables by default.
    """
    load_dotenv('.env')

    env = os.getenv('ENVIRONMENT', 'development')

    # Environment-specific values override the base file
    env_file = f'.env.{env}'
    if Path(env_file).exists():
        load_dotenv(env_file, override=True)
# Example usage. Credentials are masked: printing the DB password or most
# of the API key (the old [:20] slice) leaks secrets into terminals and logs.
from urllib.parse import urlsplit

config = EnvConfig()
print(f"环境: {config.environment}")
print(f"调试: {config.is_debug}")

db_url = config.database_url
db_password = urlsplit(db_url).password
if db_password:
    db_url = db_url.replace(f":{db_password}@", ":****@", 1)
print(f"DB URL: {db_url}")

api_key = config.get('OPENAI_API_KEY')
print(f"OpenAI API Key: {api_key[:4] + '...' if api_key else 'Not set'}")
9. 后台运行
9.1 守护进程(Linux)
import os
import sys
import time
import signal
import atexit
from pathlib import Path
class Daemon:
    """Generic Unix daemon using the double-fork technique.

    Subclasses implement run(). start()/stop()/restart() manage the process
    through a PID file.
    """

    def __init__(self, pidfile, stdin='/dev/null', stdout='/dev/null', stderr='/dev/null'):
        # daemonize() chdirs to "/", so relative paths given here would later
        # resolve against "/" rather than the caller's working directory.
        self.pidfile = os.path.abspath(pidfile)
        self.stdin = os.path.abspath(stdin)
        self.stdout = os.path.abspath(stdout)
        self.stderr = os.path.abspath(stderr)

    def daemonize(self):
        """Detach from the terminal (double fork) and write the PID file."""
        # First fork: the parent exits
        try:
            pid = os.fork()
            if pid > 0:
                sys.exit(0)
        except OSError as e:
            sys.stderr.write(f"Fork #1 失败: {e}\n")
            sys.exit(1)

        # New session, root working directory, no inherited umask
        os.chdir("/")
        os.setsid()
        os.umask(0)

        # Second fork: the session leader exits
        try:
            pid = os.fork()
            if pid > 0:
                sys.exit(0)
        except OSError as e:
            sys.stderr.write(f"Fork #2 失败: {e}\n")
            sys.exit(1)

        # Redirect the standard file descriptors
        sys.stdout.flush()
        sys.stderr.flush()
        with open(self.stdin, 'r') as si:
            os.dup2(si.fileno(), sys.stdin.fileno())
        with open(self.stdout, 'a+') as so:
            os.dup2(so.fileno(), sys.stdout.fileno())
        with open(self.stderr, 'a+') as se:
            os.dup2(se.fileno(), sys.stderr.fileno())

        # Write the PID file; remove it again at interpreter exit
        atexit.register(self.delpid)
        pid = str(os.getpid())
        with open(self.pidfile, 'w+') as f:
            f.write(f"{pid}\n")

    def delpid(self):
        """Remove the PID file if it exists."""
        if os.path.exists(self.pidfile):
            os.remove(self.pidfile)

    def _read_pid(self):
        """PID stored in the PID file, or None if missing/unreadable."""
        try:
            with open(self.pidfile, 'r') as f:
                return int(f.read().strip())
        except (OSError, ValueError):
            return None

    @staticmethod
    def _is_running(pid):
        """True if a process with *pid* exists (signal 0 only probes)."""
        try:
            os.kill(pid, 0)
        except ProcessLookupError:
            return False
        except PermissionError:
            return True  # exists but belongs to another user
        return True

    def start(self):
        """Start the daemon unless a live process owns the PID file."""
        pid = self._read_pid()
        if pid and self._is_running(pid):
            sys.stderr.write(f"PID文件 {self.pidfile} 已存在。守护进程是否已运行?\n")
            sys.exit(1)
        if pid:
            # Stale PID file left behind by a crashed daemon
            self.delpid()

        self.daemonize()
        self.run()

    def stop(self):
        """Send SIGTERM until the process is gone, then remove the PID file."""
        pid = self._read_pid()
        if not pid:
            sys.stderr.write("PID文件不存在。守护进程未运行。\n")
            return

        try:
            while True:
                os.kill(pid, signal.SIGTERM)
                time.sleep(0.1)
        except ProcessLookupError:
            # ESRCH: the process has exited (replaces matching the message text)
            self.delpid()
        except OSError as e:
            print(str(e))
            sys.exit(1)

    def restart(self):
        """Stop, then start the daemon."""
        self.stop()
        self.start()

    def run(self):
        """Main work loop; must be implemented by subclasses."""
        raise NotImplementedError
# Usage example
class MyDaemon(Daemon):
    """Demo daemon: appends a heartbeat line to /tmp/daemon.log once a minute."""

    def run(self):
        while True:
            with open('/tmp/daemon.log', 'a') as log_file:
                stamp = time.strftime('%Y-%m-%d %H:%M:%S')
                log_file.write(f"{stamp} - 守护进程运行中\n")
            time.sleep(60)

# Command-line entry point (uncomment to use):
# if __name__ == "__main__":
#     daemon = MyDaemon('/tmp/mydaemon.pid')
#     if len(sys.argv) == 2:
#         if sys.argv[1] == 'start':
#             daemon.start()
#         elif sys.argv[1] == 'stop':
#             daemon.stop()
#         elif sys.argv[1] == 'restart':
#             daemon.restart()
9.2 Windows服务
# pip install pywin32
import win32serviceutil
import win32service
import win32event
import servicemanager
import socket
import time
import logging
class MyWindowsService(win32serviceutil.ServiceFramework):
    """Minimal Windows service that logs a line every minute until stopped."""

    _svc_name_ = 'MyPythonService'
    _svc_display_name_ = 'My Python Service'
    _svc_description_ = '使用Python创建的Windows服务。'

    def __init__(self, args):
        win32serviceutil.ServiceFramework.__init__(self, args)
        # Auto-reset, initially non-signalled event; SvcStop sets it to wake main()
        self.stop_event = win32event.CreateEvent(None, 0, 0, None)
        self.running = True

        # Services have no console, so log to a file.
        # NOTE(review): the C:\logs directory must already exist.
        logging.basicConfig(
            filename='C:\\logs\\myservice.log',
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s'
        )

    def SvcStop(self):
        """Called by the service control manager to stop the service."""
        self.ReportServiceStatus(win32service.SERVICE_STOP_PENDING)
        win32event.SetEvent(self.stop_event)
        self.running = False
        logging.info('服务停止请求')

    def SvcDoRun(self):
        """Service entry point: log the start and run the work loop."""
        servicemanager.LogMsg(
            servicemanager.EVENTLOG_INFORMATION_TYPE,
            servicemanager.PYS_SERVICE_STARTED,
            (self._svc_name_, '')
        )
        logging.info('服务启动')
        self.main()

    def main(self):
        """Do the periodic work until SvcStop signals stop_event."""
        while self.running:
            # Put the actual work here
            logging.info('任务执行中...')
            # Wait up to 60 s, but wake as soon as SvcStop sets the event.
            # time.sleep(60) forced a stop request to wait out the sleep.
            rc = win32event.WaitForSingleObject(self.stop_event, 60 * 1000)
            if rc == win32event.WAIT_OBJECT_0:
                break


if __name__ == '__main__':
    win32serviceutil.HandleCommandLine(MyWindowsService)

# Commands:
# python service.py install  # install the service
# python service.py start    # start the service
# python service.py stop     # stop the service
# python service.py remove   # uninstall the service
10. 实战综合项目:每日工作自动化系统
"""
每日工作自动化系统
- 新闻收集和摘要
- 天气信息查询
- 每日报告生成
- 邮件发送
"""
import os
import json
import logging
from datetime import datetime
from pathlib import Path
from dotenv import load_dotenv
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
import requests
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from functools import wraps
import time
# Pull API keys and SMTP settings from .env into os.environ
load_dotenv()

# Working directories live next to this file and are created up front
BASE_DIR = Path(__file__).parent
LOG_DIR, DATA_DIR, REPORT_DIR = (BASE_DIR / sub for sub in ("logs", "data", "reports"))
for dir_path in (LOG_DIR, DATA_DIR, REPORT_DIR):
    dir_path.mkdir(parents=True, exist_ok=True)

# Root logging: INFO and above, to logs/automation.log (UTF-8) and the console
_log_handlers = [
    logging.FileHandler(LOG_DIR / "automation.log", encoding='utf-8'),
    logging.StreamHandler(),
]
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=_log_handlers,
)
logger = logging.getLogger(__name__)


def retry(max_attempts=3, delay=5, exceptions=(Exception,)):
    """Decorator factory: retry a function with a fixed pause between attempts.

    Args:
        max_attempts: Total number of calls to make; must be >= 1.
        delay: Seconds to sleep between attempts.
        exceptions: Exception types that trigger a retry (default: any
            Exception); other exceptions propagate immediately.

    Returns:
        A decorator. The wrapped function returns the first successful result
        or re-raises the last exception after the final attempt.

    Raises:
        ValueError: If max_attempts < 1. Previously the wrapped function was
            then never called and silently returned None.
    """
    if max_attempts < 1:
        raise ValueError(f"max_attempts must be >= 1, got {max_attempts}")

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions as e:
                    if attempt == max_attempts:
                        logger.error(f"{func.__name__} 最终失败: {e}")
                        raise
                    # Include the error so transient vs. permanent failures
                    # can be told apart in the log
                    logger.warning(f"{func.__name__} 失败,重试 {attempt}/{max_attempts}: {e}")
                    time.sleep(delay)
        return wrapper
    return decorator
class NewsCollector:
    """Fetches news items from the Naver news search API."""

    def __init__(self):
        # Credentials come from the environment; either may be None
        self.client_id = os.getenv("NAVER_CLIENT_ID")
        self.client_secret = os.getenv("NAVER_CLIENT_SECRET")

    @retry(max_attempts=3)
    def collect(self, query, count=10):
        """Return up to *count* items for *query* (sort=date); [] when credentials are missing."""
        if not (self.client_id and self.client_secret):
            logger.warning("Naver API密钥未设置。")
            return []

        response = requests.get(
            "https://openapi.naver.com/v1/search/news.json",
            headers={
                "X-Naver-Client-Id": self.client_id,
                "X-Naver-Client-Secret": self.client_secret,
            },
            params={"query": query, "display": count, "sort": "date"},
            timeout=10,
        )
        response.raise_for_status()

        items = response.json().get("items", [])
        logger.info(f"新闻收集完成 {len(items)}条: {query}")
        return items
class WeatherCollector:
    """Fetches current weather from the OpenWeatherMap API."""

    def __init__(self):
        self.api_key = os.getenv("OPENWEATHERMAP_API_KEY")

    @retry(max_attempts=3)
    def collect(self, city="Seoul"):
        """Return a summary dict for *city* (metric units), or None when no API key is set."""
        if not self.api_key:
            logger.warning("OpenWeatherMap API密钥未设置。")
            return None

        query = {"q": city, "appid": self.api_key, "units": "metric", "lang": "zh_cn"}
        response = requests.get(
            "https://api.openweathermap.org/data/2.5/weather",
            params=query,
            timeout=10,
        )
        response.raise_for_status()
        payload = response.json()

        main = payload["main"]
        weather_info = {
            "city": city,
            "temperature": main["temp"],
            "humidity": main["humidity"],
            "description": payload["weather"][0]["description"],
            "wind_speed": payload["wind"]["speed"],
        }
        logger.info(f"天气信息收集完成: {city}")
        return weather_info
class ReportGenerator:
    """Build the daily report (weather + keyword news), save it as JSON, and render HTML."""

    def __init__(self):
        self.news_collector = NewsCollector()
        self.weather_collector = WeatherCollector()

    def generate_daily_report(self, city="Seoul", keywords=("人工智能", "Python", "IT")):
        """Collect weather and news, write ``report_YYYYMMDD.json`` to REPORT_DIR.

        Args:
            city: City passed to the weather collector.
            keywords: Search keywords; up to 5 news items are collected for each.

        Returns:
            The report dict: ``date``, ``generated_at``, ``weather`` (dict or
            None) and ``news`` (keyword -> list of items).

        Each source is best-effort: a collector failure is logged and the
        report is still written with whatever was collected.
        """
        logger.info("开始生成每日报告")
        # One timestamp for everything, so the date, generated_at and the file
        # name always agree even if the run straddles midnight.
        now = datetime.now()
        report_data = {
            "date": now.strftime("%Y-%m-%d"),
            "generated_at": now.isoformat(),
            "weather": None,
            "news": {},
        }

        # Weather
        try:
            report_data["weather"] = self.weather_collector.collect(city)
        except Exception as e:
            logger.error(f"天气收集失败: {e}")

        # News for each keyword of interest
        for keyword in keywords:
            try:
                report_data["news"][keyword] = self.news_collector.collect(keyword, 5)
            except Exception as e:
                logger.error(f"新闻收集失败 ({keyword}): {e}")

        # Save the report (mkdir is a no-op when the directory already exists).
        REPORT_DIR.mkdir(parents=True, exist_ok=True)
        report_path = REPORT_DIR / f"report_{now.strftime('%Y%m%d')}.json"
        with open(report_path, "w", encoding="utf-8") as f:
            json.dump(report_data, f, ensure_ascii=False, indent=2)
        logger.info(f"报告已保存: {report_path}")
        return report_data

    def format_report_html(self, report_data):
        """Render *report_data* as an HTML document for the e-mail body.

        Args:
            report_data: Dict shaped like the one returned by
                generate_daily_report(): ``date``, ``weather`` (dict or None)
                and ``news`` (keyword -> list of news items).

        Returns:
            The HTML as a string. At most 5 news items are rendered per keyword.

        Every value that comes from an external API is HTML-escaped before it
        is embedded, and only http(s) links are emitted.
        """
        from html import escape

        def esc(value):
            return escape(str(value))

        parts = [
            "<html><body>",
            f"<h1>每日工作报告 - {esc(report_data.get('date', ''))}</h1>",
        ]

        # Weather
        weather = report_data.get("weather")
        if weather:
            parts.append(f"<h2>今日天气 ({esc(weather['city'])})</h2>")
            parts.append(
                f"<p>温度: {esc(weather['temperature'])}°C / "
                f"湿度: {esc(weather['humidity'])}%<br>"
                f"天气: {esc(weather['description'])} / "
                f"风速: {esc(weather['wind_speed'])}m/s</p>"
            )

        # News
        for keyword, news_list in (report_data.get("news") or {}).items():
            parts.append(f"<h2>{esc(keyword)}相关新闻</h2>")
            parts.append("<ul>")
            parts.extend(self._news_item_html(news) for news in (news_list or [])[:5])
            parts.append("</ul>")

        parts.append("<hr><p>此报告由系统自动生成。</p>")
        parts.append("</body></html>")
        return "\n".join(parts)

    @staticmethod
    def _news_item_html(news):
        """Render one news item as an escaped ``<li>`` containing a link and its date."""
        from html import escape, unescape

        # Naver titles may contain <b>…</b> highlight tags and HTML entities:
        # drop the tags, decode the entities, then escape the plain text.
        raw_title = (news.get("title") or "").replace("<b>", "").replace("</b>", "")
        title = escape(unescape(raw_title))
        link = str(news.get("link") or "#")
        # Never emit non-http(s) schemes (e.g. javascript:) taken from external data.
        if not link.startswith(("http://", "https://")):
            link = "#"
        pub_date = escape(str(news.get("pubDate") or ""))
        return f'<li><a href="{escape(link)}">{title}</a> <small>{pub_date}</small></li>'
class EmailSender:
    """Send HTML e-mail through an SMTP server configured via environment variables.

    Reads SMTP_SERVER (default smtp.gmail.com), SMTP_PORT (default 587),
    SENDER_EMAIL and SENDER_PASSWORD once, at construction time.
    """

    def __init__(self):
        self.smtp_server = os.getenv("SMTP_SERVER", "smtp.gmail.com")
        self.smtp_port = int(os.getenv("SMTP_PORT", 587))
        self.sender_email = os.getenv("SENDER_EMAIL")
        self.sender_password = os.getenv("SENDER_PASSWORD")

    @retry(max_attempts=2)
    def send(self, to_email, subject, html_content, timeout=30):
        """Send *html_content* as an HTML e-mail to *to_email*.

        Args:
            to_email: Recipient address.
            subject: Subject line.
            html_content: HTML body.
            timeout: Socket timeout in seconds. Without it a hung server
                would block the calling scheduler job indefinitely.

        Returns:
            True on success; False (without connecting) when the sender
            credentials are not configured.

        Raises:
            Any SMTP/socket/TLS error, after it is logged and the retry
            decorator's attempts are exhausted.

        Port 465 uses implicit TLS (SMTP_SSL); any other port uses STARTTLS.
        Both paths verify the server certificate.
        """
        import ssl

        if not self.sender_email or not self.sender_password:
            logger.warning("邮件设置未完成。")
            return False

        msg = MIMEMultipart('alternative')
        msg['Subject'] = subject
        msg['From'] = self.sender_email
        msg['To'] = to_email
        msg.attach(MIMEText(html_content, 'html', 'utf-8'))

        # Without an explicit context, smtplib does not verify the server
        # certificate; the default context enables hostname/cert checks.
        context = ssl.create_default_context()
        use_implicit_tls = self.smtp_port == 465
        try:
            if use_implicit_tls:
                server = smtplib.SMTP_SSL(
                    self.smtp_server, self.smtp_port, timeout=timeout, context=context
                )
            else:
                server = smtplib.SMTP(self.smtp_server, self.smtp_port, timeout=timeout)
            with server:
                if not use_implicit_tls:
                    server.starttls(context=context)
                server.login(self.sender_email, self.sender_password)
                server.sendmail(self.sender_email, to_email, msg.as_string())
            logger.info(f"邮件发送完成: {to_email}")
            return True
        except Exception as e:
            logger.error(f"邮件发送失败: {e}")
            raise
class DailyAutomation:
    """Wire report generation, e-mail delivery and housekeeping into a blocking scheduler."""

    def __init__(self):
        self.report_generator = ReportGenerator()
        self.email_sender = EmailSender()
        self.scheduler = BlockingScheduler(timezone='Asia/Seoul')

    def morning_routine(self):
        """Generate today's report and e-mail it to REPORT_RECIPIENT, if set.

        Never raises: any failure is logged with its traceback so that the
        scheduler keeps running.
        """
        logger.info("=" * 50)
        logger.info("早间例程开始")
        try:
            report_data = self.report_generator.generate_daily_report()
            html_content = self.report_generator.format_report_html(report_data)

            recipient = os.getenv("REPORT_RECIPIENT")
            if recipient:
                self.email_sender.send(
                    recipient,
                    f"[每日报告] {report_data['date']}",
                    html_content,
                )
            else:
                logger.warning("REPORT_RECIPIENT未设置,跳过邮件发送。")
            logger.info("早间例程完成")
        except Exception as e:
            logger.exception(f"早间例程失败: {e}")

    def evening_routine(self):
        """Run end-of-day housekeeping (currently: delete old files). Never raises."""
        logger.info("=" * 50)
        logger.info("晚间例程开始")
        try:
            self._cleanup_old_files()
            logger.info("晚间例程完成")
        except Exception as e:
            logger.exception(f"晚间例程失败: {e}")

    def _cleanup_old_files(self, days=30):
        """Delete regular files in LOG_DIR and REPORT_DIR older than *days* days.

        Age is judged by modification time. Subdirectories are skipped, and a
        file that cannot be removed is logged and skipped, so one bad entry
        does not abort the rest of the cleanup.
        """
        import time

        cutoff = time.time() - days * 86400
        for dir_path in (LOG_DIR, REPORT_DIR):
            for file_path in dir_path.glob("*"):
                try:
                    if not file_path.is_file() or file_path.stat().st_mtime >= cutoff:
                        continue
                    file_path.unlink()
                except OSError as e:
                    logger.warning(f"旧文件删除失败: {file_path} ({e})")
                    continue
                logger.info(f"已删除旧文件: {file_path}")

    def setup_schedule(self):
        """Register the morning (08:00) and evening (18:00) jobs in the scheduler's timezone."""
        from datetime import datetime

        tz = self.scheduler.timezone

        # A CronTrigger built directly (rather than via add_job('cron', ...))
        # does not inherit the scheduler's timezone in APScheduler 3.x; without
        # timezone= it would fire at 08:00/18:00 in the host's local zone.
        # Morning routine every day at 08:00
        self.scheduler.add_job(
            self.morning_routine,
            CronTrigger(hour=8, minute=0, timezone=tz),
            id='morning_routine',
            replace_existing=True,
        )
        # Evening routine every day at 18:00
        self.scheduler.add_job(
            self.evening_routine,
            CronTrigger(hour=18, minute=0, timezone=tz),
            id='evening_routine',
            replace_existing=True,
        )

        logger.info("调度设置完成")
        logger.info("已注册的任务:")
        now = datetime.now(tz)
        for job in self.scheduler.get_jobs():
            # Before start() the jobs are still pending and have no
            # next_run_time attribute yet, so ask the trigger directly.
            next_run = job.trigger.get_next_fire_time(None, now)
            logger.info(f" - {job.id}: 下次执行 {next_run}")

    def run(self):
        """Register the jobs and block in the scheduler until Ctrl+C."""
        logger.info("每日工作自动化系统启动")
        self.setup_schedule()
        try:
            self.scheduler.start()
        except KeyboardInterrupt:
            logger.info("系统终止请求")
            self.scheduler.shutdown()
            logger.info("系统终止完成")
# Main program: command-line entry point.
def _print_usage():
    """Print command-line usage and an example .env file."""
    print("使用方法:")
    print(" python automation.py test # 测试运行")
    print(" python automation.py schedule # 启动调度器")
    print()
    print(".env文件示例:")
    print(" NAVER_CLIENT_ID=your_id")
    print(" NAVER_CLIENT_SECRET=your_secret")
    print(" OPENWEATHERMAP_API_KEY=your_key")
    print(" SMTP_SERVER=smtp.gmail.com")
    print(" SMTP_PORT=587")
    print(" SENDER_EMAIL=your@email.com")
    print(" SENDER_PASSWORD=your_app_password")
    print(" REPORT_RECIPIENT=recipient@email.com")


def main(argv):
    """Dispatch the sub-command in *argv* (``sys.argv``-style, argv[0] = program).

    ``test`` runs the morning routine once, immediately. ``schedule`` starts
    the blocking scheduler. With no argument, or an unknown one, usage is
    printed. DailyAutomation is only constructed when it is actually needed.

    Returns:
        Process exit status: 0 on success or when called without arguments,
        1 for an unknown sub-command.
    """
    command = argv[1] if len(argv) > 1 else None
    if command == "test":
        print("测试模式:立即执行早间例程")
        DailyAutomation().morning_routine()
        return 0
    if command == "schedule":
        DailyAutomation().run()
        return 0
    _print_usage()
    return 0 if command is None else 1


if __name__ == "__main__":
    import sys

    sys.exit(main(sys.argv))
总结
本篇是Python自动化大师系列的最后一篇。在这8篇文章中,我们学习了使用Python实现各类自动化的技术。
本系列涵盖的内容总结:
- 第1-2篇:Python基础和文件/文件夹自动化
- 第3-4篇:Excel自动化和网页爬虫
- 第5-6篇:邮件自动化和数据库集成
- 第7篇:API应用和数据收集
- 第8篇:定时调度和实战项目
自动化的核心是用代码替代重复性工作,从而节省时间、减少人为错误,让我们能够专注于更有价值的工作。结合本系列所学的技术,日常工作中的大多数重复性任务都可以实现自动化。
在实际构建自动化系统时,请始终记住以下原则:
自动化系统构建核心原则
1. 渐进式构建:从小处开始,逐步扩展。
2. 完善日志:记录日志以便在出现问题时查找原因。
3. 错误处理:确保系统在遇到意外错误时不会停止。
4. 监控:定期检查系统是否正常运行。
5. 配置分离:将环境配置与代码分离。
Python自动化具有无限的可能性。希望您基于本系列所学内容构建自己的自动化系统,从重复性工作中解放出来,将时间投入到更有创意和价值的工作中。