前言:API开启的数据世界

现代Web服务的核心是API(应用程序编程接口)。通过API,我们可以以编程方式收集和利用天气信息、股票行情、新闻、社交媒体数据等各种信息。使用Python的requests库,可以非常简单地处理这些API调用。

在本篇中,我们将从REST API的基本概念开始,系统地学习各种认证方式、实战API应用法以及高效的数据采集策略。

1. API基础:理解REST API

1.1 什么是API?

API是使不同软件之间能够通信的接口。用餐厅来比喻的话,API就像是顾客(客户端)和厨房(服务器)之间的服务员。当顾客点餐时,服务员将订单传递给厨房,菜做好后再端给顾客。

# API请求的基本结构
import requests

# 1. 发送请求(Request)
response = requests.get("https://api.example.com/data")

# 2. 接收响应(Response)
if response.status_code == 200:
    data = response.json()
    print(data)
else:
    print(f"发生错误: {response.status_code}")

1.2 REST API的核心概念

REST(Representational State Transfer)是Web API设计的标准架构。

# HTTP方法的用途
"""
GET    : 查询数据 (Read)
POST   : 创建数据 (Create)
PUT    : 完整修改数据 (Update)
PATCH  : 部分修改数据 (Partial Update)
DELETE : 删除数据 (Delete)
"""

import requests

base_url = "https://api.example.com"

# GET - 查询数据
response = requests.get(f"{base_url}/users")

# POST - 创建数据
new_user = {"name": "张三", "email": "zhang@example.com"}
response = requests.post(f"{base_url}/users", json=new_user)

# PUT - 修改数据
updated_user = {"name": "张三", "email": "newemail@example.com"}
response = requests.put(f"{base_url}/users/1", json=updated_user)

# DELETE - 删除数据
response = requests.delete(f"{base_url}/users/1")

1.3 理解HTTP状态码

# 主要HTTP状态码
status_codes = {
    # 2xx: 成功
    200: "OK - 请求成功",
    201: "Created - 创建成功",
    204: "No Content - 成功(无响应内容)",

    # 3xx: 重定向
    301: "Moved Permanently - 永久移动",
    302: "Found - 临时移动",

    # 4xx: 客户端错误
    400: "Bad Request - 错误请求",
    401: "Unauthorized - 需要认证",
    403: "Forbidden - 禁止访问",
    404: "Not Found - 资源不存在",
    429: "Too Many Requests - 请求次数超限",

    # 5xx: 服务器错误
    500: "Internal Server Error - 服务器内部错误",
    502: "Bad Gateway - 网关错误",
    503: "Service Unavailable - 服务不可用"
}

def handle_response(response):
    """根据响应状态码进行处理"""
    code = response.status_code

    if 200 <= code < 300:
        print(f"成功: {status_codes.get(code, '未知成功码')}")
        return response.json() if response.content else None
    elif 400 <= code < 500:
        print(f"客户端错误: {status_codes.get(code, '未知错误')}")
        return None
    elif 500 <= code < 600:
        print(f"服务器错误: {status_codes.get(code, '未知服务器错误')}")
        return None

2. requests库完全指南

2.1 基本用法

import requests

# 安装: pip install requests

# GET请求基础
response = requests.get("https://api.github.com/users/octocat")
print(response.status_code)  # 200
print(response.headers)       # 响应头
print(response.text)          # 文本形式的响应
print(response.json())        # JSON解析后的字典

# 传递URL参数
params = {
    "q": "python",
    "sort": "stars",
    "order": "desc"
}
response = requests.get(
    "https://api.github.com/search/repositories",
    params=params
)
# 实际请求URL: https://api.github.com/search/repositories?q=python&sort=stars&order=desc

2.2 设置请求头和超时

import requests

# 自定义请求头设置
headers = {
    "User-Agent": "MyApp/1.0",
    "Accept": "application/json",
    "Content-Type": "application/json"
}

# 超时设置(连接超时、读取超时)
try:
    response = requests.get(
        "https://api.example.com/data",
        headers=headers,
        timeout=(5, 30)  # 连接: 5秒, 读取: 30秒
    )
except requests.exceptions.Timeout:
    print("请求超时。")
except requests.exceptions.ConnectionError:
    print("连接失败。")
except requests.exceptions.RequestException as e:
    print(f"请求过程中发生错误: {e}")

2.3 使用Session

import requests

# 使用session可以重用连接,提高性能
session = requests.Session()

# 为session应用默认设置
session.headers.update({
    "User-Agent": "MyApp/1.0",
    "Accept": "application/json"
})

# 向同一主机发送多个请求时使用session
urls = [
    "https://api.github.com/users/octocat",
    "https://api.github.com/users/octocat/repos",
    "https://api.github.com/users/octocat/followers"
]

for url in urls:
    response = session.get(url)
    print(f"{url}: {response.status_code}")

# 关闭session
session.close()

# 或者使用上下文管理器(推荐)
with requests.Session() as session:
    session.headers.update({"User-Agent": "MyApp/1.0"})
    response = session.get("https://api.github.com/users/octocat")

3. API认证方式

3.1 API Key认证

import requests

# 方法1: 通过URL参数传递
api_key = "your_api_key_here"
response = requests.get(
    "https://api.example.com/data",
    params={"api_key": api_key}
)

# 方法2: 通过请求头传递
headers = {"X-API-Key": api_key}
response = requests.get(
    "https://api.example.com/data",
    headers=headers
)

# 方法3: 通过Authorization头传递
headers = {"Authorization": f"Api-Key {api_key}"}
response = requests.get(
    "https://api.example.com/data",
    headers=headers
)

3.2 Bearer Token认证(OAuth 2.0)

import requests

# 使用Bearer Token
access_token = "your_access_token_here"

headers = {
    "Authorization": f"Bearer {access_token}",
    "Content-Type": "application/json"
}

response = requests.get(
    "https://api.example.com/protected/resource",
    headers=headers
)

# OAuth 2.0 令牌获取示例
def get_oauth_token(client_id, client_secret, token_url):
    """通过OAuth 2.0 Client Credentials方式获取令牌"""
    data = {
        "grant_type": "client_credentials",
        "client_id": client_id,
        "client_secret": client_secret
    }

    response = requests.post(token_url, data=data)

    if response.status_code == 200:
        token_data = response.json()
        return token_data.get("access_token")
    else:
        raise Exception(f"令牌获取失败: {response.text}")

# 使用示例
# token = get_oauth_token("my_client_id", "my_secret", "https://auth.example.com/token")

3.3 Basic认证

import requests
from requests.auth import HTTPBasicAuth

# 方法1: 使用auth参数
response = requests.get(
    "https://api.example.com/data",
    auth=HTTPBasicAuth("username", "password")
)

# 方法2: 使用元组简化
response = requests.get(
    "https://api.example.com/data",
    auth=("username", "password")
)

4. JSON响应处理

4.1 解析JSON数据

import requests
import json

response = requests.get("https://api.github.com/users/octocat")
data = response.json()

# 访问数据
print(f"用户名: {data['login']}")
print(f"姓名: {data.get('name', '无')}")  # 安全访问

# 处理嵌套JSON
def safe_get(data, *keys, default=None):
    """从嵌套字典中安全地获取值"""
    for key in keys:
        if isinstance(data, dict):
            data = data.get(key, default)
        elif isinstance(data, list) and isinstance(key, int):
            try:
                data = data[key]
            except IndexError:
                return default
        else:
            return default
    return data

# 使用示例
nested_data = {
    "user": {
        "profile": {
            "name": "张三",
            "contacts": [
                {"type": "email", "value": "zhang@example.com"}
            ]
        }
    }
}

name = safe_get(nested_data, "user", "profile", "name")
email = safe_get(nested_data, "user", "profile", "contacts", 0, "value")
print(f"姓名: {name}, 邮箱: {email}")

4.2 保存和加载JSON数据

import json
from pathlib import Path

def save_json(data, filepath, indent=2, ensure_ascii=False):
    """将JSON数据保存到文件"""
    filepath = Path(filepath)
    filepath.parent.mkdir(parents=True, exist_ok=True)

    with open(filepath, 'w', encoding='utf-8') as f:
        json.dump(data, f, indent=indent, ensure_ascii=ensure_ascii)

    print(f"保存完成: {filepath}")

def load_json(filepath):
    """从JSON文件加载数据"""
    with open(filepath, 'r', encoding='utf-8') as f:
        return json.load(f)

# 使用示例
api_data = {"name": "张三", "age": 30, "city": "北京"}
save_json(api_data, "data/user_info.json")
loaded_data = load_json("data/user_info.json")

5. 公共数据API应用

5.1 天气API使用

import requests
from datetime import datetime

class WeatherAPI:
    """天气API客户端"""

    def __init__(self, api_key):
        self.api_key = api_key
        self.base_url = "https://api.openweathermap.org/data/2.5"

    def get_current_weather(self, city, units="metric", lang="zh_cn"):
        """获取当前天气"""
        url = f"{self.base_url}/weather"

        params = {
            "q": city,
            "appid": self.api_key,
            "units": units,
            "lang": lang
        }

        response = requests.get(url, params=params)

        if response.status_code == 200:
            return self._parse_weather_data(response.json())
        else:
            return None

    def _parse_weather_data(self, data):
        """解析天气数据"""
        return {
            "城市": data["name"],
            "温度": f"{data['main']['temp']}°C",
            "体感温度": f"{data['main']['feels_like']}°C",
            "湿度": f"{data['main']['humidity']}%",
            "天气": data["weather"][0]["description"],
            "风速": f"{data['wind']['speed']} m/s"
        }

# 使用示例
# weather = WeatherAPI("your_api_key")
# result = weather.get_current_weather("Beijing")
# print(result)

5.2 高德地图API

import requests

class AmapAPI:
    """高德地图API客户端"""

    def __init__(self, api_key):
        self.api_key = api_key

    def geocode(self, address):
        """地址转坐标"""
        url = "https://restapi.amap.com/v3/geocode/geo"
        params = {
            "key": self.api_key,
            "address": address
        }

        response = requests.get(url, params=params)
        return response.json()

    def reverse_geocode(self, location):
        """坐标转地址"""
        url = "https://restapi.amap.com/v3/geocode/regeo"
        params = {
            "key": self.api_key,
            "location": location  # 格式: "经度,纬度"
        }

        response = requests.get(url, params=params)
        return response.json()

    def search_poi(self, keywords, city, types=None):
        """搜索POI"""
        url = "https://restapi.amap.com/v3/place/text"
        params = {
            "key": self.api_key,
            "keywords": keywords,
            "city": city,
            "citylimit": "true"
        }
        if types:
            params["types"] = types

        response = requests.get(url, params=params)
        return response.json()

# 使用示例
# amap = AmapAPI("your_api_key")
# result = amap.geocode("北京市朝阳区")
# poi_result = amap.search_poi("餐厅", "北京")

6. OpenAI API集成

6.1 ChatGPT API基本使用

import requests

class OpenAIClient:
    """OpenAI API客户端"""

    def __init__(self, api_key):
        self.api_key = api_key
        self.base_url = "https://api.openai.com/v1"
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }

    def chat_completion(self, messages, model="gpt-4o", temperature=0.7, max_tokens=1000):
        """调用聊天完成API"""
        url = f"{self.base_url}/chat/completions"

        payload = {
            "model": model,
            "messages": messages,
            "temperature": temperature,
            "max_tokens": max_tokens
        }

        response = requests.post(url, headers=self.headers, json=payload)

        if response.status_code == 200:
            return response.json()['choices'][0]['message']['content']
        else:
            raise Exception(f"API错误: {response.text}")

    def simple_chat(self, user_message, system_prompt=None):
        """简单聊天"""
        messages = []

        if system_prompt:
            messages.append({"role": "system", "content": system_prompt})

        messages.append({"role": "user", "content": user_message})

        return self.chat_completion(messages)

    def generate_image(self, prompt, size="1024x1024", n=1):
        """DALL-E图像生成"""
        url = f"{self.base_url}/images/generations"

        payload = {
            "model": "dall-e-3",
            "prompt": prompt,
            "size": size,
            "n": n
        }

        response = requests.post(url, headers=self.headers, json=payload)

        if response.status_code == 200:
            return response.json()['data'][0]['url']
        else:
            raise Exception(f"API错误: {response.text}")

# 使用示例
# openai = OpenAIClient("your_api_key")
# response = openai.simple_chat(
#     "请告诉我Python网页爬虫的方法",
#     system_prompt="你是Python专家。请配合代码示例进行说明。"
# )
# print(response)

6.2 流式响应处理

import requests
import json

def stream_chat_completion(api_key, messages, model="gpt-4o"):
    """以流式方式接收响应"""
    url = "https://api.openai.com/v1/chat/completions"

    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json"
    }

    payload = {
        "model": model,
        "messages": messages,
        "stream": True
    }

    response = requests.post(url, headers=headers, json=payload, stream=True)

    full_response = ""

    for line in response.iter_lines():
        if line:
            line = line.decode('utf-8')
            if line.startswith('data: '):
                data = line[6:]  # 移除'data: '

                if data == '[DONE]':
                    break

                try:
                    chunk = json.loads(data)
                    content = chunk['choices'][0]['delta'].get('content', '')
                    if content:
                        print(content, end='', flush=True)
                        full_response += content
                except json.JSONDecodeError:
                    continue

    print()  # 换行
    return full_response

# 使用示例
# messages = [{"role": "user", "content": "请告诉我Python的5个优点"}]
# response = stream_chat_completion("your_api_key", messages)

7. 分页和速率限制

7.1 分页处理

import requests
import time

class PaginatedAPIClient:
    """支持分页的API客户端"""

    def __init__(self, base_url, headers=None):
        self.base_url = base_url
        self.headers = headers or {}

    def get_all_pages_offset(self, endpoint, page_size=100, max_pages=None):
        """偏移量分页(page, per_page方式)"""
        all_data = []
        page = 1

        while True:
            params = {
                "page": page,
                "per_page": page_size
            }

            response = requests.get(
                f"{self.base_url}{endpoint}",
                headers=self.headers,
                params=params
            )

            data = response.json()

            if not data:  # 空响应则结束
                break

            all_data.extend(data)

            if len(data) < page_size:  # 最后一页
                break

            if max_pages and page >= max_pages:
                break

            page += 1
            time.sleep(0.5)  # 防止速率限制

        return all_data

    def get_all_pages_cursor(self, endpoint, cursor_field="cursor"):
        """游标分页"""
        all_data = []
        cursor = None

        while True:
            params = {}
            if cursor:
                params[cursor_field] = cursor

            response = requests.get(
                f"{self.base_url}{endpoint}",
                headers=self.headers,
                params=params
            )

            result = response.json()
            data = result.get("data", [])
            all_data.extend(data)

            # 没有下一个游标则结束
            cursor = result.get("next_cursor")
            if not cursor:
                break

            time.sleep(0.5)

        return all_data

7.2 速率限制处理

import requests
import time
from functools import wraps

class RateLimiter:
    """速率限制处理类"""

    def __init__(self, calls_per_minute=60):
        self.calls_per_minute = calls_per_minute
        self.min_interval = 60.0 / calls_per_minute
        self.last_call_time = 0

    def wait(self):
        """如有必要则等待"""
        elapsed = time.time() - self.last_call_time
        if elapsed < self.min_interval:
            time.sleep(self.min_interval - elapsed)
        self.last_call_time = time.time()

def rate_limited(calls_per_minute=60):
    """速率限制装饰器"""
    min_interval = 60.0 / calls_per_minute
    last_call = [0]  # 使用可变对象包装

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            elapsed = time.time() - last_call[0]
            if elapsed < min_interval:
                time.sleep(min_interval - elapsed)
            last_call[0] = time.time()
            return func(*args, **kwargs)
        return wrapper
    return decorator

def request_with_retry(url, max_retries=3, backoff_factor=2, **kwargs):
    """带重试逻辑的请求"""
    for attempt in range(max_retries):
        try:
            response = requests.get(url, **kwargs)

            # 速率限制超限(429)处理
            if response.status_code == 429:
                retry_after = int(response.headers.get("Retry-After", 60))
                print(f"速率限制超限。{retry_after}秒后重试...")
                time.sleep(retry_after)
                continue

            response.raise_for_status()
            return response

        except requests.exceptions.RequestException as e:
            if attempt == max_retries - 1:
                raise

            wait_time = backoff_factor ** attempt
            print(f"请求失败。{wait_time}秒后重试... ({attempt + 1}/{max_retries})")
            time.sleep(wait_time)

    return None

# 使用示例
@rate_limited(calls_per_minute=30)
def fetch_data(url):
    return requests.get(url)

8. API响应缓存

import requests
import hashlib
import json
import time
from pathlib import Path

class CachedAPIClient:
    """带缓存功能的API客户端"""

    def __init__(self, cache_dir="api_cache", cache_ttl=3600):
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        self.cache_ttl = cache_ttl  # 缓存有效时间(秒)

    def _get_cache_key(self, url, params=None):
        """生成缓存键"""
        key_data = url + json.dumps(params or {}, sort_keys=True)
        return hashlib.md5(key_data.encode()).hexdigest()

    def _get_cache_path(self, cache_key):
        """缓存文件路径"""
        return self.cache_dir / f"{cache_key}.json"

    def _is_cache_valid(self, cache_path):
        """检查缓存有效性"""
        if not cache_path.exists():
            return False

        cache_age = time.time() - cache_path.stat().st_mtime
        return cache_age < self.cache_ttl

    def get(self, url, params=None, force_refresh=False, **kwargs):
        """使用缓存的GET请求"""
        cache_key = self._get_cache_key(url, params)
        cache_path = self._get_cache_path(cache_key)

        # 检查缓存
        if not force_refresh and self._is_cache_valid(cache_path):
            print(f"使用缓存: {cache_key[:8]}...")
            with open(cache_path, 'r', encoding='utf-8') as f:
                return json.load(f)

        # 调用API
        print(f"调用API: {url}")
        response = requests.get(url, params=params, **kwargs)
        data = response.json()

        # 保存缓存
        with open(cache_path, 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=2)

        return data

    def clear_cache(self, older_than=None):
        """清除缓存"""
        count = 0
        for cache_file in self.cache_dir.glob("*.json"):
            if older_than:
                age = time.time() - cache_file.stat().st_mtime
                if age < older_than:
                    continue

            cache_file.unlink()
            count += 1

        print(f"已删除{count}个缓存文件")

# 基于内存的缓存(简单版本)
from functools import lru_cache

@lru_cache(maxsize=100)
def cached_api_call(url):
    """使用LRU缓存的API调用"""
    response = requests.get(url)
    return response.json()

9. 实战项目:天气/股票数据采集器

import requests
import json
from datetime import datetime, timedelta
from pathlib import Path
import time

class DataCollector:
    """天气和股票数据采集器"""

    def __init__(self, config_path="config.json"):
        self.config = self._load_config(config_path)
        self.data_dir = Path("collected_data")
        self.data_dir.mkdir(parents=True, exist_ok=True)

    def _load_config(self, config_path):
        """加载配置文件"""
        if Path(config_path).exists():
            with open(config_path, 'r') as f:
                return json.load(f)
        return {}

    def collect_weather(self, cities):
        """采集多个城市的天气数据"""
        weather_data = []
        api_key = self.config.get("openweathermap_api_key")

        if not api_key:
            print("OpenWeatherMap API密钥未设置。")
            return []

        for city in cities:
            try:
                url = "https://api.openweathermap.org/data/2.5/weather"
                params = {
                    "q": city,
                    "appid": api_key,
                    "units": "metric",
                    "lang": "zh_cn"
                }

                response = requests.get(url, params=params, timeout=10)

                if response.status_code == 200:
                    data = response.json()
                    weather_info = {
                        "city": city,
                        "temperature": data["main"]["temp"],
                        "humidity": data["main"]["humidity"],
                        "description": data["weather"][0]["description"],
                        "wind_speed": data["wind"]["speed"],
                        "collected_at": datetime.now().isoformat()
                    }
                    weather_data.append(weather_info)
                    print(f"{city}: {weather_info['temperature']}°C, {weather_info['description']}")
                else:
                    print(f"{city}: 数据采集失败 ({response.status_code})")

                time.sleep(1)  # 速率限制

            except Exception as e:
                print(f"{city}: 发生错误 - {e}")

        # 保存数据
        self._save_data(weather_data, "weather")
        return weather_data

    def _save_data(self, data, data_type):
        """保存数据"""
        if not data:
            return

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"{data_type}_{timestamp}.json"
        filepath = self.data_dir / filename

        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=2)

        print(f"数据已保存: {filepath}")

    def generate_report(self):
        """生成采集数据报告"""
        weather_data = self.get_historical_data("weather", days=1)

        report = []
        report.append("=" * 50)
        report.append(f"数据采集报告 - {datetime.now().strftime('%Y-%m-%d %H:%M')}")
        report.append("=" * 50)

        if weather_data:
            report.append("\n[天气数据]")
            for item in weather_data:
                report.append(f"  - {item['city']}: {item['temperature']}°C, {item['description']}")

        report.append("\n" + "=" * 50)

        return "\n".join(report)

    def get_historical_data(self, data_type, days=7):
        """查询历史数据"""
        cutoff_date = datetime.now() - timedelta(days=days)
        historical_data = []

        for filepath in self.data_dir.glob(f"{data_type}_*.json"):
            filename = filepath.stem
            date_str = filename.replace(f"{data_type}_", "")[:8]

            try:
                file_date = datetime.strptime(date_str, "%Y%m%d")
                if file_date >= cutoff_date:
                    with open(filepath, 'r', encoding='utf-8') as f:
                        data = json.load(f)
                        historical_data.extend(data)
            except ValueError:
                continue

        return historical_data

# 使用示例
if __name__ == "__main__":
    collector = DataCollector()

    # 采集天气数据
    cities = ["Beijing", "Shanghai", "Guangzhou", "Shenzhen", "Hangzhou"]
    # collector.collect_weather(cities)

    # 生成报告
    # print(collector.generate_report())

    print("数据采集器已准备就绪。")
    print("请在config.json文件中设置API密钥后使用。")

总结

在本篇中,我们学习了使用Python进行API集成和数据采集的核心概念。从REST API的基本原理到各种认证方式、实际服务API应用法以及高效的数据采集策略。

使用API可以获得比网页爬虫更稳定和结构化的数据。此外,大多数官方API都提供清晰的文档和一致的响应格式,因此维护也更加容易。

下一篇我们将综合所学内容,构建定时调度和实战自动化项目。让我们创建一个在指定时间自动采集数据、生成报告并发送通知的完整自动化系统。

核心要点
1. REST API使用HTTP方法(GET、POST、PUT、DELETE)操作资源。
2. 使用requests库可以方便地调用API。
3. 理解并活用API Key、Bearer Token、OAuth等各种认证方式。
4. 分页和速率限制是大量数据采集时的必要考虑事项。
5. 利用缓存可以减少API调用次数并提高响应速度。