Python自动化大师第7篇:API应用与数据采集
Python Automation Master Part 7: API Integration and Data Collection
前言:API开启的数据世界
现代Web服务的核心是API(应用程序编程接口)。通过API,我们可以以编程方式收集和利用天气信息、股票行情、新闻、社交媒体数据等各种信息。使用Python的requests库,可以非常简单地处理这些API调用。
在本篇中,我们将从REST API的基本概念开始,系统地学习各种认证方式、实战中的API应用方法以及高效的数据采集策略。
1. API基础:理解REST API
1.1 什么是API?
API是使不同软件之间能够通信的接口。用餐厅来比喻的话,API就像是顾客(客户端)和厨房(服务器)之间的服务员。当顾客点餐时,服务员将订单传递给厨房,菜做好后再端给顾客。
# API请求的基本结构
import requests
# Step 1: send the request.
response = requests.get("https://api.example.com/data")

# Step 2: handle the response. Any status other than 200 is reported as an error.
if response.status_code != 200:
    print(f"发生错误: {response.status_code}")
else:
    data = response.json()
    print(data)
1.2 REST API的核心概念
REST(Representational State Transfer,表述性状态转移)是目前Web API设计中最广泛采用的架构风格。
# HTTP方法的用途
"""
GET : 查询数据 (Read)
POST : 创建数据 (Create)
PUT : 完整修改数据 (Update)
PATCH : 部分修改数据 (Partial Update)
DELETE : 删除数据 (Delete)
"""
import requests
base_url = "https://api.example.com"
users_url = f"{base_url}/users"  # collection endpoint
user_url = f"{users_url}/1"  # endpoint of the single user with id 1

# GET: read data
response = requests.get(users_url)

# POST: create data
new_user = dict(name="张三", email="zhang@example.com")
response = requests.post(users_url, json=new_user)

# PUT: modify data
updated_user = dict(name="张三", email="newemail@example.com")
response = requests.put(user_url, json=updated_user)

# DELETE: remove data
response = requests.delete(user_url)
1.3 理解HTTP状态码
# 主要HTTP状态码
# 2xx: success
_SUCCESS_CODES = {
    200: "OK - 请求成功",
    201: "Created - 创建成功",
    204: "No Content - 成功(无响应内容)",
}

# 3xx: redirection
_REDIRECT_CODES = {
    301: "Moved Permanently - 永久移动",
    302: "Found - 临时移动",
}

# 4xx: client errors
_CLIENT_ERROR_CODES = {
    400: "Bad Request - 错误请求",
    401: "Unauthorized - 需要认证",
    403: "Forbidden - 禁止访问",
    404: "Not Found - 资源不存在",
    429: "Too Many Requests - 请求次数超限",
}

# 5xx: server errors
_SERVER_ERROR_CODES = {
    500: "Internal Server Error - 服务器内部错误",
    502: "Bad Gateway - 网关错误",
    503: "Service Unavailable - 服务不可用",
}

# Lookup table of the most common HTTP status codes and their descriptions.
status_codes = {
    **_SUCCESS_CODES,
    **_REDIRECT_CODES,
    **_CLIENT_ERROR_CODES,
    **_SERVER_ERROR_CODES,
}
def handle_response(response):
    """Print a message for the response's status class and return its payload.

    Args:
        response: An object exposing ``status_code``, ``content`` and ``json()``
            (for example a ``requests.Response``).

    Returns:
        The decoded JSON body for a 2xx response that has content; ``None`` for
        an empty 2xx response and for every other status code.
    """
    code = response.status_code
    family = code // 100  # 2 -> 2xx, 4 -> 4xx, 5 -> 5xx

    if family == 2:
        print(f"成功: {status_codes.get(code, '未知成功码')}")
        if response.content:
            return response.json()
        return None

    if family == 4:
        print(f"客户端错误: {status_codes.get(code, '未知错误')}")
    elif family == 5:
        print(f"服务器错误: {status_codes.get(code, '未知服务器错误')}")

    # Every other status class (1xx, 3xx, ...) is ignored silently.
    return None
2. requests库完全指南
2.1 基本用法
import requests
# 安装: pip install requests
# Basic GET request
response = requests.get("https://api.github.com/users/octocat")
print(response.status_code)  # 200
print(response.headers)  # response headers
print(response.text)  # body as text
print(response.json())  # body parsed from JSON into a dict

# Passing URL query parameters
search_url = "https://api.github.com/search/repositories"
params = dict(q="python", sort="stars", order="desc")
response = requests.get(search_url, params=params)
# Resulting URL: https://api.github.com/search/repositories?q=python&sort=stars&order=desc
2.2 设置请求头和超时
import requests
# Custom request headers
headers = {
    "User-Agent": "MyApp/1.0",
    "Accept": "application/json",
    "Content-Type": "application/json",
}

# (connect timeout, read timeout) in seconds
timeout = (5, 30)

try:
    response = requests.get(
        "https://api.example.com/data", headers=headers, timeout=timeout
    )
# Handlers are ordered from most specific to most general.
except requests.exceptions.Timeout:
    print("请求超时。")
except requests.exceptions.ConnectionError:
    print("连接失败。")
except requests.exceptions.RequestException as e:
    print(f"请求过程中发生错误: {e}")
2.3 使用Session
import requests
# A Session reuses the underlying connection, which improves performance.
session = requests.Session()

# Default headers applied to every request made through this session
session.headers.update({"User-Agent": "MyApp/1.0", "Accept": "application/json"})

# Use the session when sending several requests to the same host
profile_url = "https://api.github.com/users/octocat"
urls = [profile_url, f"{profile_url}/repos", f"{profile_url}/followers"]
for url in urls:
    response = session.get(url)
    print(f"{url}: {response.status_code}")

# Close the session explicitly
session.close()

# Alternatively (recommended), let a context manager close it
with requests.Session() as session:
    session.headers.update({"User-Agent": "MyApp/1.0"})
    response = session.get("https://api.github.com/users/octocat")
3. API认证方式
3.1 API Key认证
import requests
api_key = "your_api_key_here"
endpoint = "https://api.example.com/data"

# Option 1: send the key as a URL query parameter
response = requests.get(endpoint, params={"api_key": api_key})

# Option 2: send the key in a custom request header
headers = {"X-API-Key": api_key}
response = requests.get(endpoint, headers=headers)

# Option 3: send the key in the Authorization header
headers = {"Authorization": f"Api-Key {api_key}"}
response = requests.get(endpoint, headers=headers)
3.2 Bearer Token认证(OAuth 2.0)
import requests
# Authenticate with a Bearer token
access_token = "your_access_token_here"
headers = {
    "Authorization": "Bearer " + access_token,
    "Content-Type": "application/json",
}
response = requests.get("https://api.example.com/protected/resource", headers=headers)
# OAuth 2.0 令牌获取示例
def get_oauth_token(client_id, client_secret, token_url):
    """Fetch an access token with the OAuth 2.0 client-credentials grant.

    Args:
        client_id: Client identifier sent in the form body.
        client_secret: Client secret sent in the form body.
        token_url: URL of the token endpoint that receives the POST.

    Returns:
        The ``access_token`` field of the JSON response, or ``None`` when the
        response carries no such field.

    Raises:
        Exception: If the endpoint answers with any status other than 200.
    """
    form = {
        "grant_type": "client_credentials",
        "client_id": client_id,
        "client_secret": client_secret,
    }
    response = requests.post(token_url, data=form)

    if response.status_code != 200:
        raise Exception(f"令牌获取失败: {response.text}")

    return response.json().get("access_token")
# 使用示例
# token = get_oauth_token("my_client_id", "my_secret", "https://auth.example.com/token")
3.3 Basic认证
import requests
from requests.auth import HTTPBasicAuth
endpoint = "https://api.example.com/data"

# Option 1: pass an explicit HTTPBasicAuth object via ``auth``
credentials = HTTPBasicAuth("username", "password")
response = requests.get(endpoint, auth=credentials)

# Option 2: the (username, password) tuple shorthand
response = requests.get(endpoint, auth=("username", "password"))
4. JSON响应处理
4.1 解析JSON数据
import requests
import json
response = requests.get("https://api.github.com/users/octocat")
data = response.json()

# Direct indexing requires the key to exist; .get() falls back to a default.
login = data["login"]
display_name = data.get("name", "无")
print(f"用户名: {login}")
print(f"姓名: {display_name}")
# 处理嵌套JSON
def safe_get(data, *keys, default=None):
    """Safely read a value from nested dicts and lists.

    Args:
        data: Root object, typically a dict or list parsed from JSON.
        *keys: Path to follow, one step per key: dict keys for dicts,
            integer indexes for lists.
        default: Value returned as soon as any step cannot be resolved.

    Returns:
        The value found at the end of the path, or ``default`` when a key is
        missing, an index is out of range, or an intermediate value is neither
        a dict nor a list indexable by the given key.
    """
    current = data
    for key in keys:
        if isinstance(current, dict):
            # Stop at the first missing key. Substituting ``default`` and
            # carrying on would let the remaining keys be looked up inside
            # ``default`` itself when it is a dict or list.
            if key not in current:
                return default
            current = current[key]
        elif isinstance(current, list) and isinstance(key, int):
            try:
                current = current[key]
            except IndexError:
                return default
        else:
            return default
    return current
# 使用示例
contact = {"type": "email", "value": "zhang@example.com"}
profile = {"name": "张三", "contacts": [contact]}
nested_data = {"user": {"profile": profile}}

# Walk the dict keys, then a list index, then a dict key again.
name = safe_get(nested_data, "user", "profile", "name")
email = safe_get(nested_data, "user", "profile", "contacts", 0, "value")
print(f"姓名: {name}, 邮箱: {email}")
4.2 保存和加载JSON数据
import json
from pathlib import Path
def save_json(data, filepath, indent=2, ensure_ascii=False):
    """Serialize ``data`` as JSON into ``filepath`` (UTF-8).

    Missing parent directories are created first, and a confirmation line is
    printed after the file has been written.

    Args:
        data: JSON-serializable object.
        filepath: Destination path (str or Path).
        indent: Indentation width passed to ``json.dump``.
        ensure_ascii: Passed to ``json.dump``; ``False`` keeps non-ASCII text
            readable in the file.
    """
    target = Path(filepath)
    target.parent.mkdir(parents=True, exist_ok=True)

    with target.open('w', encoding='utf-8') as out:
        json.dump(data, out, indent=indent, ensure_ascii=ensure_ascii)

    print(f"保存完成: {target}")
def load_json(filepath):
    """Read the UTF-8 file at ``filepath`` and return its parsed JSON content."""
    with open(filepath, encoding='utf-8') as source:
        content = source.read()
    return json.loads(content)
# 使用示例
api_data = dict(name="张三", age=30, city="北京")
json_path = "data/user_info.json"

# Round trip: write the dict to disk, then read it back.
save_json(api_data, json_path)
loaded_data = load_json(json_path)
5. 公共数据API应用
5.1 天气API使用
import requests
from datetime import datetime
class WeatherAPI:
    """Client for the OpenWeatherMap current-weather endpoint."""

    # Display suffixes (temperature, wind speed) for each value of the API's
    # ``units`` parameter. The API reports Kelvin and m/s for "standard",
    # Celsius and m/s for "metric", Fahrenheit and miles/hour for "imperial".
    _UNIT_LABELS = {
        "standard": ("K", "m/s"),
        "metric": ("°C", "m/s"),
        "imperial": ("°F", "mph"),
    }

    def __init__(self, api_key):
        """Store the API key and the base URL used for all requests."""
        self.api_key = api_key
        self.base_url = "https://api.openweathermap.org/data/2.5"

    def get_current_weather(self, city, units="metric", lang="zh_cn"):
        """Fetch the current weather for ``city``.

        Args:
            city: City name sent as the ``q`` query parameter.
            units: Unit system requested from the API
                ("standard", "metric" or "imperial").
            lang: Language code for the weather description.

        Returns:
            The dict built by ``_parse_weather_data`` when the API answers with
            status 200, otherwise ``None``.
        """
        url = f"{self.base_url}/weather"
        params = {
            "q": city,
            "appid": self.api_key,
            "units": units,
            "lang": lang,
        }
        response = requests.get(url, params=params)
        if response.status_code != 200:
            return None
        # Pass the requested unit system along so the labels match the numbers.
        return self._parse_weather_data(response.json(), units=units)

    def _parse_weather_data(self, data, units="metric"):
        """Convert a raw API payload into a flat, display-ready dict.

        Args:
            data: Decoded JSON payload; must contain ``name``, ``main``
                (``temp``, ``feels_like``, ``humidity``), ``weather[0]``
                (``description``) and ``wind`` (``speed``).
            units: Unit system the payload was requested in. It selects the
                temperature and wind-speed suffixes; an unrecognized value
                falls back to the metric labels.

        Returns:
            A dict with the city, temperature, feels-like temperature,
            humidity, description and wind speed as formatted strings.
        """
        temp_unit, wind_unit = self._UNIT_LABELS.get(
            units, self._UNIT_LABELS["metric"]
        )
        main = data["main"]
        return {
            "城市": data["name"],
            "温度": f"{main['temp']}{temp_unit}",
            "体感温度": f"{main['feels_like']}{temp_unit}",
            "湿度": f"{main['humidity']}%",
            "天气": data["weather"][0]["description"],
            "风速": f"{data['wind']['speed']} {wind_unit}",
        }
# 使用示例
# weather = WeatherAPI("your_api_key")
# result = weather.get_current_weather("Beijing")
# print(result)
5.2 高德地图API
import requests
class AmapAPI:
    """Client for three AMap (Gaode) REST endpoints: geocode, regeo and place search."""

    def __init__(self, api_key):
        """Store the API key that is sent with every request."""
        self.api_key = api_key

    def _call(self, url, **params):
        """GET ``url`` with the API key plus ``params``; return the decoded JSON body."""
        query = {"key": self.api_key}
        query.update(params)
        return requests.get(url, params=query).json()

    def geocode(self, address):
        """Convert an address into coordinates."""
        return self._call("https://restapi.amap.com/v3/geocode/geo", address=address)

    def reverse_geocode(self, location):
        """Convert coordinates into an address.

        Args:
            location: Coordinates in the form "longitude,latitude".
        """
        return self._call(
            "https://restapi.amap.com/v3/geocode/regeo", location=location
        )

    def search_poi(self, keywords, city, types=None):
        """Search points of interest by keyword, restricted to ``city``.

        Args:
            keywords: Search keywords.
            city: City to search in; ``citylimit`` is always sent as "true".
            types: Optional POI type filter; sent only when truthy.
        """
        extra = {"types": types} if types else {}
        return self._call(
            "https://restapi.amap.com/v3/place/text",
            keywords=keywords,
            city=city,
            citylimit="true",
            **extra,
        )
# 使用示例
# amap = AmapAPI("your_api_key")
# result = amap.geocode("北京市朝阳区")
# poi_result = amap.search_poi("餐厅", "北京")
6. OpenAI API集成
6.1 ChatGPT API基本使用
import requests
class OpenAIClient:
    """Small OpenAI REST client covering chat completions and image generation."""

    def __init__(self, api_key):
        """Store the key and prepare the headers reused by every request."""
        self.api_key = api_key
        self.base_url = "https://api.openai.com/v1"
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        }

    def _post_json(self, path, payload):
        """POST ``payload`` to ``base_url + path`` and return the decoded JSON body.

        Raises:
            Exception: If the API answers with any status other than 200.
        """
        response = requests.post(
            f"{self.base_url}{path}", headers=self.headers, json=payload
        )
        if response.status_code != 200:
            raise Exception(f"API错误: {response.text}")
        return response.json()

    def chat_completion(self, messages, model="gpt-4o", temperature=0.7, max_tokens=1000):
        """Call the chat-completions endpoint and return the first choice's text.

        Args:
            messages: List of ``{"role": ..., "content": ...}`` dicts.
            model: Model name sent in the payload.
            temperature: Sampling temperature sent in the payload.
            max_tokens: Token limit sent in the payload.

        Raises:
            Exception: If the API answers with any status other than 200.
        """
        payload = {
            "model": model,
            "messages": messages,
            "temperature": temperature,
            "max_tokens": max_tokens,
        }
        body = self._post_json("/chat/completions", payload)
        return body['choices'][0]['message']['content']

    def simple_chat(self, user_message, system_prompt=None):
        """Send one user message, optionally preceded by a system prompt."""
        messages = [{"role": "user", "content": user_message}]
        if system_prompt:
            # The system message has to come before the user message.
            messages.insert(0, {"role": "system", "content": system_prompt})
        return self.chat_completion(messages)

    def generate_image(self, prompt, size="1024x1024", n=1):
        """Generate an image with the "dall-e-3" model and return the first URL.

        Only the URL of the first returned image is handed back, whatever
        value ``n`` has.

        Raises:
            Exception: If the API answers with any status other than 200.
        """
        payload = {
            "model": "dall-e-3",
            "prompt": prompt,
            "size": size,
            "n": n,
        }
        body = self._post_json("/images/generations", payload)
        return body['data'][0]['url']
# 使用示例
# openai = OpenAIClient("your_api_key")
# response = openai.simple_chat(
# "请告诉我Python网页爬虫的方法",
# system_prompt="你是Python专家。请配合代码示例进行说明。"
# )
# print(response)
6.2 流式响应处理
import requests
import json
def stream_chat_completion(api_key, messages, model="gpt-4o"):
    """Request a chat completion in streaming mode and echo it as it arrives.

    Each ``data: ...`` line of the response is decoded as JSON, and the text
    fragment in ``choices[0].delta.content`` is printed immediately and
    collected.

    Args:
        api_key: Key sent as a Bearer token.
        messages: List of ``{"role": ..., "content": ...}`` dicts.
        model: Model name sent in the payload.

    Returns:
        The concatenation of all received text fragments.
    """
    url = "https://api.openai.com/v1/chat/completions"
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }
    payload = {
        "model": model,
        "messages": messages,
        "stream": True,
    }

    pieces = []
    # With stream=True the connection stays checked out until the body is
    # fully consumed or the response is closed. The context manager releases
    # it even when reading stops early at [DONE] or an exception is raised.
    with requests.post(url, headers=headers, json=payload, stream=True) as response:
        for raw_line in response.iter_lines():
            if not raw_line:
                continue
            line = raw_line.decode('utf-8')
            if not line.startswith('data: '):
                continue

            data = line[6:]  # strip the 'data: ' prefix
            if data == '[DONE]':
                break

            try:
                chunk = json.loads(data)
            except json.JSONDecodeError:
                continue

            # Skip chunks that carry no choices instead of failing on [0].
            choices = chunk.get('choices') or []
            if not choices:
                continue

            content = (choices[0].get('delta') or {}).get('content', '')
            if content:
                print(content, end='', flush=True)
                pieces.append(content)

    print()  # finish the streamed line
    return "".join(pieces)
# 使用示例
# messages = [{"role": "user", "content": "请告诉我Python的5个优点"}]
# response = stream_chat_completion("your_api_key", messages)
7. 分页和速率限制
7.1 分页处理
import requests
import time
class PaginatedAPIClient:
    """API client that walks paginated endpoints and gathers every item."""

    def __init__(self, base_url, headers=None):
        """Store the base URL and the headers sent with every request."""
        self.base_url = base_url
        self.headers = headers if headers else {}

    def _get_json(self, endpoint, params):
        """GET ``base_url + endpoint`` with ``params`` and decode the JSON body."""
        response = requests.get(
            f"{self.base_url}{endpoint}", headers=self.headers, params=params
        )
        return response.json()

    def get_all_pages_offset(self, endpoint, page_size=100, max_pages=None):
        """Collect all items from a ``page`` / ``per_page`` paginated endpoint.

        Fetching stops on an empty page, on a page shorter than ``page_size``,
        or once ``max_pages`` pages have been read (when ``max_pages`` is set).

        Returns:
            A list with the items of every fetched page.
        """
        collected = []
        page = 1
        while True:
            batch = self._get_json(endpoint, {"page": page, "per_page": page_size})
            if not batch:  # empty response: nothing left
                break
            collected.extend(batch)

            # A short page is the last one; otherwise honor the page cap.
            if len(batch) < page_size or (max_pages and page >= max_pages):
                break

            page += 1
            time.sleep(0.5)  # pause between pages to stay under rate limits
        return collected

    def get_all_pages_cursor(self, endpoint, cursor_field="cursor"):
        """Collect all items from a cursor-paginated endpoint.

        Each response is read as a dict with its items under ``"data"`` and
        the next cursor under ``"next_cursor"``; a missing or falsy cursor
        ends the loop.

        Returns:
            A list with the items of every fetched page.
        """
        collected = []
        cursor = None
        while True:
            params = {cursor_field: cursor} if cursor else {}
            result = self._get_json(endpoint, params)
            collected.extend(result.get("data", []))

            cursor = result.get("next_cursor")
            if not cursor:
                return collected
            time.sleep(0.5)
7.2 速率限制处理
import requests
import time
from functools import wraps
class RateLimiter:
    """Space out calls so that at most ``calls_per_minute`` happen per minute."""

    def __init__(self, calls_per_minute=60):
        """Derive the minimum interval between two calls from the allowed rate."""
        self.calls_per_minute = calls_per_minute
        self.min_interval = 60.0 / calls_per_minute
        self.last_call_time = 0

    def wait(self):
        """Sleep for whatever remains of the minimum interval, then record the call time."""
        remaining = self.min_interval - (time.time() - self.last_call_time)
        if remaining > 0:
            time.sleep(remaining)
        self.last_call_time = time.time()
def rate_limited(calls_per_minute=60):
    """Build a decorator that throttles a function to ``calls_per_minute``.

    Before each call the wrapper sleeps for whatever remains of the minimum
    interval since the previous call, then forwards the call unchanged.
    """
    min_interval = 60.0 / calls_per_minute
    # Timestamp of the most recent call, kept in the enclosing scope so it
    # survives between invocations of the wrapper.
    last_call = 0

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            nonlocal last_call
            remaining = min_interval - (time.time() - last_call)
            if remaining > 0:
                time.sleep(remaining)
            last_call = time.time()
            return func(*args, **kwargs)

        return wrapper

    return decorator
def request_with_retry(url, max_retries=3, backoff_factor=2, **kwargs):
    """GET ``url``, retrying on HTTP 429 and on request errors.

    Args:
        url: URL to fetch.
        max_retries: Maximum number of attempts.
        backoff_factor: Base of the exponential wait after a failed attempt;
            the wait is ``backoff_factor ** attempt`` seconds.
        **kwargs: Forwarded to ``requests.get``.

    Returns:
        The response of the first attempt that is neither rate-limited nor an
        HTTP error, or ``None`` when the attempts run out on 429 responses.

    Raises:
        requests.exceptions.RequestException: The error of the final attempt,
            including the ``HTTPError`` raised by ``raise_for_status``.
    """
    for attempt in range(max_retries):
        is_last_attempt = attempt == max_retries - 1
        try:
            response = requests.get(url, **kwargs)

            # Rate limit exceeded (429)
            if response.status_code == 429:
                if is_last_attempt:
                    # No attempt is left, so waiting would only delay the result.
                    break
                try:
                    retry_after = max(0, int(response.headers.get("Retry-After", 60)))
                except (TypeError, ValueError):
                    # Retry-After may also be an HTTP-date; use the default delay then.
                    retry_after = 60
                print(f"速率限制超限。{retry_after}秒后重试...")
                time.sleep(retry_after)
                continue

            response.raise_for_status()
            return response
        except requests.exceptions.RequestException:
            if is_last_attempt:
                raise
            wait_time = backoff_factor ** attempt
            print(f"请求失败。{wait_time}秒后重试... ({attempt + 1}/{max_retries})")
            time.sleep(wait_time)
    return None
# 使用示例
@rate_limited(calls_per_minute=30)
def fetch_data(url):
    """GET ``url``, throttled to 30 calls per minute by the decorator."""
    response = requests.get(url)
    return response
8. API响应缓存
import requests
import hashlib
import json
import time
from pathlib import Path
class CachedAPIClient:
    """GET client that caches JSON responses as files for a limited time."""

    def __init__(self, cache_dir="api_cache", cache_ttl=3600):
        """Create the cache directory if needed and store the settings.

        Args:
            cache_dir: Directory that holds the cache files.
            cache_ttl: Lifetime of a cache entry in seconds.
        """
        self.cache_ttl = cache_ttl
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(parents=True, exist_ok=True)

    def _get_cache_key(self, url, params=None):
        """Return the MD5 hex digest of the URL plus the key-sorted JSON of ``params``."""
        serialized = json.dumps(params or {}, sort_keys=True)
        return hashlib.md5((url + serialized).encode()).hexdigest()

    def _get_cache_path(self, cache_key):
        """Return the path of the cache file belonging to ``cache_key``."""
        return self.cache_dir / f"{cache_key}.json"

    def _is_cache_valid(self, cache_path):
        """Tell whether the cache file exists and is younger than ``cache_ttl``."""
        if cache_path.exists():
            return time.time() - cache_path.stat().st_mtime < self.cache_ttl
        return False

    def get(self, url, params=None, force_refresh=False, **kwargs):
        """GET ``url`` and return its JSON body, served from the cache when possible.

        Args:
            url: URL to fetch.
            params: Query parameters; they are part of the cache key.
            force_refresh: When true, skip the cache lookup and call the API.
            **kwargs: Forwarded to ``requests.get``.

        Returns:
            The decoded JSON data, read from the cache file or freshly fetched.
        """
        cache_key = self._get_cache_key(url, params)
        cache_path = self._get_cache_path(cache_key)

        # Serve from the cache unless a refresh was requested or the entry expired.
        use_cache = not force_refresh and self._is_cache_valid(cache_path)
        if use_cache:
            print(f"使用缓存: {cache_key[:8]}...")
            with cache_path.open('r', encoding='utf-8') as cached:
                return json.load(cached)

        print(f"调用API: {url}")
        data = requests.get(url, params=params, **kwargs).json()

        # Store the fresh result for later calls.
        with cache_path.open('w', encoding='utf-8') as out:
            json.dump(data, out, ensure_ascii=False, indent=2)
        return data

    def clear_cache(self, older_than=None):
        """Delete cache files and print how many were removed.

        Args:
            older_than: When truthy, files younger than this many seconds are
                kept; otherwise every cache file is deleted.
        """
        removed = 0
        for cache_file in self.cache_dir.glob("*.json"):
            if older_than and time.time() - cache_file.stat().st_mtime < older_than:
                continue
            cache_file.unlink()
            removed += 1
        print(f"已删除{removed}个缓存文件")
# 基于内存的缓存(简单版本)
from functools import lru_cache
@lru_cache(maxsize=100)
def cached_api_call(url):
    """GET ``url`` and return its JSON body, memoized per URL by the LRU cache (100 entries)."""
    return requests.get(url).json()
9. 实战项目:天气/股票数据采集器
import requests
import json
from datetime import datetime, timedelta
from pathlib import Path
import time
class DataCollector:
    """Collect weather data and store it as timestamped JSON files.

    Only weather collection is implemented here. Every collection run is
    written to ``collected_data/<data_type>_<YYYYmmdd_HHMMSS>.json`` and can
    be read back through ``get_historical_data`` and ``generate_report``.
    """

    # Timestamp layout used in the names of the saved files.
    _TIMESTAMP_FORMAT = "%Y%m%d_%H%M%S"

    def __init__(self, config_path="config.json"):
        """Load the configuration and make sure the data directory exists."""
        self.config = self._load_config(config_path)
        self.data_dir = Path("collected_data")
        self.data_dir.mkdir(parents=True, exist_ok=True)

    def _load_config(self, config_path):
        """Return the parsed JSON config, or an empty dict if the file is missing."""
        if Path(config_path).exists():
            # Read as UTF-8 explicitly, like every other file in this class,
            # instead of relying on the platform's default encoding.
            with open(config_path, 'r', encoding='utf-8') as f:
                return json.load(f)
        return {}

    def collect_weather(self, cities):
        """Collect the current weather for several cities and save the result.

        Args:
            cities: Iterable of city names sent to the OpenWeatherMap API.

        Returns:
            A list with one dict per successfully fetched city, or an empty
            list when no API key is configured.
        """
        weather_data = []
        api_key = self.config.get("openweathermap_api_key")

        if not api_key:
            print("OpenWeatherMap API密钥未设置。")
            return []

        for city in cities:
            # Best effort: one failing city must not abort the whole batch.
            try:
                url = "https://api.openweathermap.org/data/2.5/weather"
                params = {
                    "q": city,
                    "appid": api_key,
                    "units": "metric",
                    "lang": "zh_cn",
                }
                response = requests.get(url, params=params, timeout=10)

                if response.status_code == 200:
                    data = response.json()
                    weather_info = {
                        "city": city,
                        "temperature": data["main"]["temp"],
                        "humidity": data["main"]["humidity"],
                        "description": data["weather"][0]["description"],
                        "wind_speed": data["wind"]["speed"],
                        "collected_at": datetime.now().isoformat(),
                    }
                    weather_data.append(weather_info)
                    print(f"{city}: {weather_info['temperature']}°C, {weather_info['description']}")
                else:
                    print(f"{city}: 数据采集失败 ({response.status_code})")

                time.sleep(1)  # pause between cities to respect the rate limit
            except Exception as e:
                print(f"{city}: 发生错误 - {e}")

        # Persist the collected data
        self._save_data(weather_data, "weather")
        return weather_data

    def _save_data(self, data, data_type):
        """Write ``data`` to a new timestamped JSON file; do nothing if it is empty."""
        if not data:
            return

        timestamp = datetime.now().strftime(self._TIMESTAMP_FORMAT)
        filepath = self.data_dir / f"{data_type}_{timestamp}.json"

        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=2)

        print(f"数据已保存: {filepath}")

    def generate_report(self):
        """Build a text report from the weather data of the last day."""
        weather_data = self.get_historical_data("weather", days=1)

        report = []
        report.append("=" * 50)
        report.append(f"数据采集报告 - {datetime.now().strftime('%Y-%m-%d %H:%M')}")
        report.append("=" * 50)

        if weather_data:
            report.append("\n[天气数据]")
            for item in weather_data:
                report.append(f" - {item['city']}: {item['temperature']}°C, {item['description']}")

        report.append("\n" + "=" * 50)
        return "\n".join(report)

    @classmethod
    def _parse_file_time(cls, stamp):
        """Parse the timestamp part of a file name; return ``None`` if unparseable.

        The full ``YYYYmmdd_HHMMSS`` form written by ``_save_data`` is tried
        first. A name that only starts with a valid ``YYYYmmdd`` date is still
        accepted and treated as midnight of that day.
        """
        try:
            return datetime.strptime(stamp[:15], cls._TIMESTAMP_FORMAT)
        except ValueError:
            pass
        try:
            return datetime.strptime(stamp[:8], "%Y%m%d")
        except ValueError:
            return None

    def get_historical_data(self, data_type, days=7):
        """Return the records of all files saved within the last ``days`` days.

        Args:
            data_type: File-name prefix of the data set, e.g. "weather".
            days: Size of the look-back window in days.

        Returns:
            A list with the records of every matching file, oldest file first.
            Files with an unparseable name or invalid JSON are skipped.
        """
        cutoff_date = datetime.now() - timedelta(days=days)
        prefix = f"{data_type}_"
        historical_data = []

        # Timestamped names sort chronologically, so sorting fixes the order.
        for filepath in sorted(self.data_dir.glob(f"{prefix}*.json")):
            # Compare the full save time against the cutoff. Using only the
            # date would place every file at midnight and drop files that
            # are still inside the window.
            file_time = self._parse_file_time(filepath.stem[len(prefix):])
            if file_time is None or file_time < cutoff_date:
                continue
            try:
                with open(filepath, 'r', encoding='utf-8') as f:
                    data = json.load(f)
            except ValueError:
                # json.JSONDecodeError is a ValueError: skip corrupt files.
                continue
            historical_data.extend(data)

        return historical_data
# 使用示例
if __name__ == "__main__":
    # Cities to collect weather data for
    cities = ["Beijing", "Shanghai", "Guangzhou", "Shenzhen", "Hangzhou"]
    collector = DataCollector()

    # Collect weather data
    # collector.collect_weather(cities)

    # Generate the report
    # print(collector.generate_report())

    for message in (
        "数据采集器已准备就绪。",
        "请在config.json文件中设置API密钥后使用。",
    ):
        print(message)
总结
在本篇中,我们学习了使用Python进行API集成和数据采集的核心概念,内容涵盖REST API的基本原理、各种认证方式、实际服务的API应用方法以及高效的数据采集策略。
使用API可以获得比网页爬虫更稳定和结构化的数据。此外,大多数官方API都提供清晰的文档和一致的响应格式,因此维护也更加容易。
下一篇我们将综合所学内容,构建定时调度和实战自动化项目。让我们创建一个在指定时间自动采集数据、生成报告并发送通知的完整自动化系统。
核心要点
1. REST API使用HTTP方法(GET、POST、PUT、PATCH、DELETE)操作资源。
2. 使用requests库可以方便地调用API。
3. 理解并灵活运用API Key、Bearer Token、OAuth等各种认证方式。
4. 分页和速率限制是大量数据采集时的必要考虑事项。
5. 利用缓存可以减少API调用次数并提高响应速度。