Python Automation Master Part 7: API Integration and Data Collection
Unlocking the Power of APIs for Automated Data Gathering
Introduction: APIs Open the World of Data
APIs (Application Programming Interfaces) sit at the core of modern web services. Through them, we can programmatically collect and use information such as weather data, stock prices, news, and social media posts. Python's requests library keeps these API calls simple.
In this part, we'll work through the basic concepts of REST APIs, the common authentication methods, practical API usage, and efficient data collection strategies.
1. API Basics: Understanding REST APIs
1.1 What is an API?
An API is an interface that lets different pieces of software communicate. Think of it as a waiter in a restaurant, acting as an intermediary between the customer (client) and the kitchen (server). When a customer orders from the menu, the waiter delivers the order to the kitchen and, when the food is ready, brings it back to the customer.
# Basic structure of API requests
import requests
# 1. Send request
response = requests.get("https://api.example.com/data")
# 2. Receive response
if response.status_code == 200:
    data = response.json()
    print(data)
else:
    print(f"Error occurred: {response.status_code}")
1.2 Core Concepts of REST API
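REST (Representational State Transfer) is an architectural style that most web APIs follow. It treats data as resources identified by URLs and maps operations on them to standard HTTP methods.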
# HTTP methods by purpose
"""
GET : Retrieve data (Read)
POST : Create data (Create)
PUT : Update entire data (Update)
PATCH : Partial update (Partial Update)
DELETE : Delete data (Delete)
"""
import requests
base_url = "https://api.example.com"
# GET - Retrieve data
response = requests.get(f"{base_url}/users")
# POST - Create data
new_user = {"name": "John Doe", "email": "john@example.com"}
response = requests.post(f"{base_url}/users", json=new_user)
# PUT - Update data
updated_user = {"name": "John Doe", "email": "newemail@example.com"}
response = requests.put(f"{base_url}/users/1", json=updated_user)
# DELETE - Delete data
response = requests.delete(f"{base_url}/users/1")
1.3 Understanding HTTP Status Codes
# Major HTTP status codes
status_codes = {
# 2xx: Success
200: "OK - Request successful",
201: "Created - Creation successful",
204: "No Content - Success (no response body)",
# 3xx: Redirection
301: "Moved Permanently - Permanent redirect",
302: "Found - Temporary redirect",
# 4xx: Client errors
400: "Bad Request - Invalid request",
401: "Unauthorized - Authentication required",
403: "Forbidden - Access denied",
404: "Not Found - Resource not found",
429: "Too Many Requests - Rate limit exceeded",
# 5xx: Server errors
500: "Internal Server Error - Server internal error",
502: "Bad Gateway - Gateway error",
503: "Service Unavailable - Service unavailable"
}
def handle_response(response):
    """Handle response based on status code"""
    code = response.status_code
    if 200 <= code < 300:
        print(f"Success: {status_codes.get(code, 'Unknown success code')}")
        return response.json() if response.content else None
    elif 400 <= code < 500:
        print(f"Client error: {status_codes.get(code, 'Unknown error')}")
        return None
    elif 500 <= code < 600:
        print(f"Server error: {status_codes.get(code, 'Unknown server error')}")
        return None
    # 1xx/3xx and anything else: nothing to parse
    print(f"Unhandled status: {code}")
    return None
2. Complete Guide to the requests Library
2.1 Basic Usage
import requests
# Installation: pip install requests
# Basic GET request
response = requests.get("https://api.github.com/users/octocat")
print(response.status_code) # 200
print(response.headers) # Response headers
print(response.text) # Response as text
print(response.json()) # Parsed JSON dictionary
# Passing URL parameters
params = {
"q": "python",
"sort": "stars",
"order": "desc"
}
response = requests.get(
"https://api.github.com/search/repositories",
params=params
)
# Actual request URL: https://api.github.com/search/repositories?q=python&sort=stars&order=desc
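To see what the params argument does to the URL, the query string can be reproduced with the standard library alone. This is a minimal sketch using the same parameters as above; build_url is a hypothetical helper, not part of requests, which performs a comparable encoding internally.

```python
from urllib.parse import urlencode

def build_url(base, params):
    """Append URL-encoded query parameters to a base URL (hypothetical helper)."""
    if not params:
        return base
    return f"{base}?{urlencode(params)}"

params = {"q": "python", "sort": "stars", "order": "desc"}
url = build_url("https://api.github.com/search/repositories", params)
print(url)

# Special characters are percent-encoded and spaces become '+'
print(build_url("https://api.example.com/search", {"q": "web scraping & APIs"}))
```

Passing a dictionary instead of concatenating strings by hand avoids broken URLs when a value contains characters such as & or spaces.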
2.2 Headers and Timeout Settings
import requests
# Custom header settings
headers = {
"User-Agent": "MyApp/1.0",
"Accept": "application/json",
"Content-Type": "application/json"
}
# Timeout settings (connection timeout, read timeout)
try:
    response = requests.get(
        "https://api.example.com/data",
        headers=headers,
        timeout=(5, 30)  # Connection: 5s, Read: 30s
    )
except requests.exceptions.Timeout:
    print("Request timed out.")
except requests.exceptions.ConnectionError:
    print("Connection failed.")
except requests.exceptions.RequestException as e:
    print(f"Request error: {e}")
2.3 Using Sessions
import requests
# Using sessions improves performance by reusing connections
session = requests.Session()
# Apply default settings to session
session.headers.update({
"User-Agent": "MyApp/1.0",
"Accept": "application/json"
})
# Use session for multiple requests to the same host
urls = [
"https://api.github.com/users/octocat",
"https://api.github.com/users/octocat/repos",
"https://api.github.com/users/octocat/followers"
]
for url in urls:
    response = session.get(url)
    print(f"{url}: {response.status_code}")
# Close session
session.close()
# Or use context manager (recommended)
with requests.Session() as session:
    session.headers.update({"User-Agent": "MyApp/1.0"})
    response = session.get("https://api.github.com/users/octocat")
3. API Authentication Methods
3.1 API Key Authentication
import requests
# Method 1: Pass as URL parameter
api_key = "your_api_key_here"
response = requests.get(
"https://api.example.com/data",
params={"api_key": api_key}
)
# Method 2: Pass in header
headers = {"X-API-Key": api_key}
response = requests.get(
"https://api.example.com/data",
headers=headers
)
# Method 3: Pass in Authorization header
headers = {"Authorization": f"Api-Key {api_key}"}
response = requests.get(
"https://api.example.com/data",
headers=headers
)
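The examples above hardcode the key for brevity. In practice it is safer to keep keys out of source code, for instance in an environment variable. The following is a minimal sketch of that idea; the variable name EXAMPLE_API_KEY, the header name, and the helper api_key_headers are assumptions chosen for illustration, so check which header your API actually expects.

```python
import os

def api_key_headers(env_var="EXAMPLE_API_KEY", header_name="X-API-Key"):
    """Build an auth header from an environment variable (names are assumptions)."""
    api_key = os.environ.get(env_var)
    if not api_key:
        raise RuntimeError(f"Set the {env_var} environment variable first")
    return {header_name: api_key}

# Demo only: real code would set the variable in the shell, not in the script
os.environ.setdefault("EXAMPLE_API_KEY", "demo-key")
headers = api_key_headers()
print(sorted(headers))  # print header names only; avoid logging the key itself
```

The resulting dictionary can be passed as headers=... to requests.get, exactly like Method 2 above.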
3.2 Bearer Token Authentication (OAuth 2.0)
import requests
# Using Bearer Token
access_token = "your_access_token_here"
headers = {
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json"
}
response = requests.get(
"https://api.example.com/protected/resource",
headers=headers
)
# OAuth 2.0 token issuance example
def get_oauth_token(client_id, client_secret, token_url):
    """Get token using OAuth 2.0 Client Credentials method"""
    data = {
        "grant_type": "client_credentials",
        "client_id": client_id,
        "client_secret": client_secret
    }
    response = requests.post(token_url, data=data)
    if response.status_code == 200:
        token_data = response.json()
        return token_data.get("access_token")
    else:
        raise Exception(f"Token issuance failed: {response.text}")
# Usage
# token = get_oauth_token("my_client_id", "my_secret", "https://auth.example.com/token")
3.3 Basic Authentication
import requests
from requests.auth import HTTPBasicAuth
# Method 1: Using auth parameter
response = requests.get(
"https://api.example.com/data",
auth=HTTPBasicAuth("username", "password")
)
# Method 2: Simple tuple form
response = requests.get(
"https://api.example.com/data",
auth=("username", "password")
)
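Both forms above end up sending the same Authorization header. As a rough illustration of what goes over the wire, the header can be built by hand with the standard library. basic_auth_header is a hypothetical helper written for this sketch; for plain ASCII credentials it should produce the same value requests sends.

```python
import base64

def basic_auth_header(username, password):
    """Return the Authorization header value for HTTP Basic authentication."""
    credentials = f"{username}:{password}".encode("utf-8")
    return "Basic " + base64.b64encode(credentials).decode("ascii")

header = basic_auth_header("username", "password")
print(header)

# Base64 is an encoding, not encryption: anyone can reverse it
decoded = base64.b64decode(header.split(" ", 1)[1]).decode("utf-8")
print(decoded)
```

Because the credentials are trivially decodable, Basic authentication should only be used over HTTPS.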
4. JSON Response Handling
4.1 JSON Data Parsing
import requests
import json
response = requests.get("https://api.github.com/users/octocat")
data = response.json()
# Access data
print(f"Username: {data['login']}")
print(f"Name: {data.get('name', 'None')}") # Safe access
# Handling nested JSON
def safe_get(data, *keys, default=None):
    """Safely get value from nested dictionary"""
    for key in keys:
        if isinstance(data, dict):
            if key not in data:
                return default  # Stop at the first missing key
            data = data[key]
        elif isinstance(data, list) and isinstance(key, int):
            try:
                data = data[key]
            except IndexError:
                return default
        else:
            return default
    return data
# Usage example
nested_data = {
"user": {
"profile": {
"name": "John Doe",
"contacts": [
{"type": "email", "value": "john@example.com"}
]
}
}
}
name = safe_get(nested_data, "user", "profile", "name")
email = safe_get(nested_data, "user", "profile", "contacts", 0, "value")
print(f"Name: {name}, Email: {email}")
4.2 Saving and Loading JSON Data
import json
from pathlib import Path
def save_json(data, filepath, indent=2, ensure_ascii=False):
    """Save JSON data to file"""
    filepath = Path(filepath)
    filepath.parent.mkdir(parents=True, exist_ok=True)
    with open(filepath, 'w', encoding='utf-8') as f:
        json.dump(data, f, indent=indent, ensure_ascii=ensure_ascii)
    print(f"Saved: {filepath}")
def load_json(filepath):
    """Load data from JSON file"""
    with open(filepath, 'r', encoding='utf-8') as f:
        return json.load(f)
# Usage example
api_data = {"name": "John Doe", "age": 30, "city": "New York"}
save_json(api_data, "data/user_info.json")
loaded_data = load_json("data/user_info.json")
5. Public Data APIs
5.1 Weather API (OpenWeatherMap)
import requests
class WeatherAPI:
    """OpenWeatherMap API Client"""
    def __init__(self, api_key):
        self.api_key = api_key
        self.base_url = "https://api.openweathermap.org/data/2.5"
    def get_current_weather(self, city, units="metric"):
        """Get current weather for a city"""
        url = f"{self.base_url}/weather"
        params = {
            "q": city,
            "appid": self.api_key,
            "units": units
        }
        response = requests.get(url, params=params)
        if response.status_code == 200:
            return self._parse_weather(response.json())
        return None
    def _parse_weather(self, data):
        """Parse weather data"""
        return {
            "city": data["name"],
            "temperature": data["main"]["temp"],
            "feels_like": data["main"]["feels_like"],
            "humidity": data["main"]["humidity"],
            "description": data["weather"][0]["description"],
            "wind_speed": data["wind"]["speed"]
        }
    def get_forecast(self, city, units="metric"):
        """Get 5-day forecast"""
        url = f"{self.base_url}/forecast"
        params = {
            "q": city,
            "appid": self.api_key,
            "units": units
        }
        response = requests.get(url, params=params)
        return response.json() if response.status_code == 200 else None
# Usage example
# weather = WeatherAPI("your_api_key")
# current = weather.get_current_weather("London")
# forecast = weather.get_forecast("New York")
5.2 News API
import requests
from datetime import datetime, timedelta
class NewsAPI:
    """NewsAPI Client"""
    def __init__(self, api_key):
        self.api_key = api_key
        self.base_url = "https://newsapi.org/v2"
        self.headers = {"X-Api-Key": api_key}
    def get_top_headlines(self, country="us", category=None, page_size=10):
        """Get top headlines"""
        url = f"{self.base_url}/top-headlines"
        params = {
            "country": country,
            "pageSize": page_size
        }
        if category:
            params["category"] = category  # business, technology, sports, etc.
        response = requests.get(url, headers=self.headers, params=params)
        return response.json() if response.status_code == 200 else None
    def search_news(self, query, from_date=None, to_date=None, sort_by="publishedAt"):
        """Search news articles"""
        url = f"{self.base_url}/everything"
        params = {
            "q": query,
            "sortBy": sort_by,  # relevancy, popularity, publishedAt
            "language": "en"
        }
        if from_date:
            params["from"] = from_date
        if to_date:
            params["to"] = to_date
        response = requests.get(url, headers=self.headers, params=params)
        return response.json() if response.status_code == 200 else None
# Usage example
# news = NewsAPI("your_api_key")
# headlines = news.get_top_headlines(country="us", category="technology")
# search_results = news.search_news("artificial intelligence")
6. Google and Twitter API Integration
6.1 Google Custom Search API
import requests
class GoogleSearchAPI:
    """Google Custom Search API Client"""
    def __init__(self, api_key, search_engine_id):
        self.api_key = api_key
        self.search_engine_id = search_engine_id
        self.base_url = "https://www.googleapis.com/customsearch/v1"
    def search(self, query, num=10, start=1, search_type=None):
        """Search the web"""
        params = {
            "key": self.api_key,
            "cx": self.search_engine_id,
            "q": query,
            "num": num,
            "start": start
        }
        if search_type:
            params["searchType"] = search_type  # "image" for image search
        response = requests.get(self.base_url, params=params)
        return response.json() if response.status_code == 200 else None
    def search_images(self, query, num=10):
        """Search images"""
        return self.search(query, num=num, search_type="image")
# Usage example
# google = GoogleSearchAPI("your_api_key", "your_search_engine_id")
# results = google.search("Python programming")
# images = google.search_images("landscape wallpaper")
6.2 Twitter/X API
import requests
class TwitterAPI:
    """Twitter API v2 Client"""
    def __init__(self, bearer_token):
        self.bearer_token = bearer_token
        self.base_url = "https://api.twitter.com/2"
        self.headers = {
            "Authorization": f"Bearer {bearer_token}"
        }
    def search_recent_tweets(self, query, max_results=10):
        """Search recent tweets"""
        url = f"{self.base_url}/tweets/search/recent"
        params = {
            "query": query,
            "max_results": max_results,
            "tweet.fields": "created_at,author_id,public_metrics"
        }
        response = requests.get(url, headers=self.headers, params=params)
        return response.json() if response.status_code == 200 else None
    def get_user(self, username):
        """Get user information"""
        url = f"{self.base_url}/users/by/username/{username}"
        params = {
            "user.fields": "description,public_metrics,created_at"
        }
        response = requests.get(url, headers=self.headers, params=params)
        return response.json() if response.status_code == 200 else None
    def get_user_tweets(self, user_id, max_results=10):
        """Get user's tweets"""
        url = f"{self.base_url}/users/{user_id}/tweets"
        params = {
            "max_results": max_results,
            "tweet.fields": "created_at,public_metrics"
        }
        response = requests.get(url, headers=self.headers, params=params)
        return response.json() if response.status_code == 200 else None
# Usage example
# twitter = TwitterAPI("your_bearer_token")
# tweets = twitter.search_recent_tweets("Python programming")
# user = twitter.get_user("elonmusk")
7. OpenAI API Integration
7.1 Basic ChatGPT API Usage
import requests
class OpenAIClient:
    """OpenAI API Client"""
    def __init__(self, api_key):
        self.api_key = api_key
        self.base_url = "https://api.openai.com/v1"
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }
    def chat_completion(self, messages, model="gpt-4o", temperature=0.7, max_tokens=1000):
        """Call Chat Completion API"""
        url = f"{self.base_url}/chat/completions"
        payload = {
            "model": model,
            "messages": messages,
            "temperature": temperature,
            "max_tokens": max_tokens
        }
        response = requests.post(url, headers=self.headers, json=payload)
        if response.status_code == 200:
            return response.json()['choices'][0]['message']['content']
        else:
            raise Exception(f"API error: {response.text}")
    def simple_chat(self, user_message, system_prompt=None):
        """Simple chat"""
        messages = []
        if system_prompt:
            messages.append({"role": "system", "content": system_prompt})
        messages.append({"role": "user", "content": user_message})
        return self.chat_completion(messages)
    def generate_image(self, prompt, size="1024x1024", n=1):
        """DALL-E image generation (returns the URL of the first image)"""
        url = f"{self.base_url}/images/generations"
        payload = {
            "model": "dall-e-3",
            "prompt": prompt,
            "size": size,
            "n": n  # dall-e-3 accepts only n=1
        }
        response = requests.post(url, headers=self.headers, json=payload)
        if response.status_code == 200:
            return response.json()['data'][0]['url']
        else:
            raise Exception(f"API error: {response.text}")
# Usage example
# openai = OpenAIClient("your_api_key")
# response = openai.simple_chat(
# "How do I do web scraping with Python?",
# system_prompt="You are a Python expert. Explain with code examples."
# )
# print(response)
7.2 Streaming Response Handling
import requests
import json
def stream_chat_completion(api_key, messages, model="gpt-4o"):
    """Receive response via streaming"""
    url = "https://api.openai.com/v1/chat/completions"
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json"
    }
    payload = {
        "model": model,
        "messages": messages,
        "stream": True
    }
    response = requests.post(url, headers=headers, json=payload, stream=True)
    response.raise_for_status()  # Fail loudly instead of returning an empty string
    full_response = ""
    for line in response.iter_lines():
        if line:
            line = line.decode('utf-8')
            if line.startswith('data: '):
                data = line[6:]  # Remove 'data: '
                if data == '[DONE]':
                    break
                try:
                    chunk = json.loads(data)
                    content = chunk['choices'][0]['delta'].get('content', '')
                    if content:
                        print(content, end='', flush=True)
                        full_response += content
                except json.JSONDecodeError:
                    continue
    print()  # Newline
    return full_response
# Usage example
# messages = [{"role": "user", "content": "Tell me 5 benefits of Python"}]
# response = stream_chat_completion("your_api_key", messages)
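The parsing step in the function above can be tried without a network call or an API key by separating it from the request. The sketch below is one way to do that; extract_stream_text is a hypothetical helper, and the sample lines are hand-written to match the chunk shape the function above expects, not captured API output.

```python
import json

def extract_stream_text(lines):
    """Yield content fragments from an iterable of raw 'data: ...' lines (bytes)."""
    for raw in lines:
        if not raw:
            continue  # blank lines separate events
        line = raw.decode("utf-8")
        if not line.startswith("data: "):
            continue
        data = line[len("data: "):]
        if data == "[DONE]":
            break  # end-of-stream marker
        try:
            chunk = json.loads(data)
        except json.JSONDecodeError:
            continue  # skip anything that is not valid JSON
        choices = chunk.get("choices") or []
        if choices:
            content = choices[0].get("delta", {}).get("content")
            if content:
                yield content

# Hand-written sample lines (assumed shape, not real API output)
sample = [
    b'data: {"choices": [{"delta": {"role": "assistant"}}]}',
    b"",
    b'data: {"choices": [{"delta": {"content": "Hello"}}]}',
    b'data: {"choices": [{"delta": {"content": ", world"}}]}',
    b"data: [DONE]",
    b'data: {"choices": [{"delta": {"content": "ignored"}}]}',
]
print("".join(extract_stream_text(sample)))
```

With a real request, the same generator could be fed response.iter_lines() directly, which keeps the HTTP code and the parsing logic independently testable.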
8. Pagination and Rate Limiting
8.1 Pagination Handling
import requests
import time
class PaginatedAPIClient:
    """API client with pagination support"""
    def __init__(self, base_url, headers=None):
        self.base_url = base_url
        self.headers = headers or {}
    def get_all_pages_offset(self, endpoint, page_size=100, max_pages=None):
        """Offset-based pagination (page, per_page style)"""
        all_data = []
        page = 1
        while True:
            params = {
                "page": page,
                "per_page": page_size
            }
            response = requests.get(
                f"{self.base_url}{endpoint}",
                headers=self.headers,
                params=params
            )
            response.raise_for_status()  # Don't merge error payloads into the results
            data = response.json()
            if not data:  # Empty response means done
                break
            all_data.extend(data)
            if len(data) < page_size:  # Last page
                break
            if max_pages and page >= max_pages:
                break
            page += 1
            time.sleep(0.5)  # Pause between pages to stay under rate limits
        return all_data
    def get_all_pages_cursor(self, endpoint, cursor_field="cursor"):
        """Cursor-based pagination"""
        all_data = []
        cursor = None
        while True:
            params = {}
            if cursor:
                params[cursor_field] = cursor
            response = requests.get(
                f"{self.base_url}{endpoint}",
                headers=self.headers,
                params=params
            )
            response.raise_for_status()
            result = response.json()
            data = result.get("data", [])
            all_data.extend(data)
            # Exit if no next cursor
            cursor = result.get("next_cursor")
            if not cursor:
                break
            time.sleep(0.5)
        return all_data
    def get_all_pages_link_header(self, endpoint):
        """Link header-based pagination (GitHub style)"""
        all_data = []
        url = f"{self.base_url}{endpoint}"
        while url:
            response = requests.get(url, headers=self.headers)
            response.raise_for_status()
            all_data.extend(response.json())
            # Extract next page URL from Link header
            link_header = response.headers.get("Link", "")
            url = self._parse_link_header(link_header, "next")
            time.sleep(0.5)
        return all_data
    def _parse_link_header(self, link_header, rel):
        """Parse Link header"""
        if not link_header:
            return None
        links = link_header.split(", ")
        for link in links:
            parts = link.split("; ")
            if len(parts) == 2 and f'rel="{rel}"' in parts[1]:
                return parts[0].strip("<>")
        return None
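The methods above collect every page into one list before returning, which can use a lot of memory on large collections. A generator is one alternative: it yields items as each page arrives, so the caller can stop early. The sketch below assumes the same response shape as the cursor method above ("data" and "next_cursor"); iter_cursor_pages and fetch_page are hypothetical names, and a fake in-memory fetcher stands in for real HTTP calls.

```python
def iter_cursor_pages(fetch_page, max_pages=None):
    """Yield items one at a time from a cursor-paginated source.

    fetch_page(cursor) is a hypothetical callable returning a dict shaped like
    {"data": [...], "next_cursor": "..." or None}.
    """
    cursor = None
    pages = 0
    while True:
        result = fetch_page(cursor)
        yield from result.get("data", [])
        pages += 1
        cursor = result.get("next_cursor")
        if not cursor or (max_pages is not None and pages >= max_pages):
            break

# Fake in-memory API standing in for real HTTP calls
FAKE_PAGES = {
    None: {"data": [1, 2], "next_cursor": "a"},
    "a": {"data": [3, 4], "next_cursor": "b"},
    "b": {"data": [5], "next_cursor": None},
}

def fake_fetch(cursor):
    return FAKE_PAGES[cursor]

print(list(iter_cursor_pages(fake_fetch)))
print(list(iter_cursor_pages(fake_fetch, max_pages=2)))
```

In real use, fetch_page would wrap a requests.get call that passes the cursor as a query parameter and returns response.json().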
8.2 Rate Limiting Handling
import requests
import time
from functools import wraps
class RateLimiter:
    """Rate Limiting handler class"""
    def __init__(self, calls_per_minute=60):
        self.calls_per_minute = calls_per_minute
        self.min_interval = 60.0 / calls_per_minute
        self.last_call_time = 0
    def wait(self):
        """Wait if necessary"""
        elapsed = time.time() - self.last_call_time
        if elapsed < self.min_interval:
            time.sleep(self.min_interval - elapsed)
        self.last_call_time = time.time()
def rate_limited(calls_per_minute=60):
    """Rate limiting decorator"""
    min_interval = 60.0 / calls_per_minute
    last_call = [0]  # Wrap in mutable object
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            elapsed = time.time() - last_call[0]
            if elapsed < min_interval:
                time.sleep(min_interval - elapsed)
            last_call[0] = time.time()
            return func(*args, **kwargs)
        return wrapper
    return decorator
@rate_limited(calls_per_minute=30)
def call_api(url):
    """API call with rate limiting"""
    return requests.get(url)
# Retry with exponential backoff
def api_call_with_retry(url, max_retries=3, base_delay=1):
    """API call with retry logic"""
    for attempt in range(max_retries):
        response = requests.get(url)
        if response.status_code == 200:
            return response.json()
        elif response.status_code == 429:  # Too Many Requests
            delay = base_delay * (2 ** attempt)  # Exponential backoff
            print(f"Rate limited. Waiting {delay} seconds...")
            time.sleep(delay)
        else:
            raise Exception(f"API error: {response.status_code}")
    raise Exception("Max retries exceeded")
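The retry function above always waits base_delay * 2^attempt. Two common refinements are to honor a Retry-After header when the server sends one and to add random jitter so that many clients do not retry in lockstep. The helper below is a sketch of that delay calculation only; backoff_delay and its parameters are hypothetical, and it handles only the numeric form of Retry-After (an HTTP date falls back to plain backoff).

```python
import random

def backoff_delay(attempt, base_delay=1.0, max_delay=60.0, retry_after=None):
    """Return how many seconds to wait before retry number `attempt` (0-based)."""
    if retry_after is not None:
        try:
            # Retry-After given as a number of seconds, capped at max_delay
            return min(max(float(retry_after), 0.0), max_delay)
        except ValueError:
            pass  # e.g. an HTTP date; fall back to exponential backoff
    ceiling = min(base_delay * (2 ** attempt), max_delay)
    # "Full jitter": pick a random delay up to the exponential ceiling
    return random.uniform(0, ceiling)

for attempt in range(4):
    print(f"attempt {attempt}: wait up to {min(1.0 * 2 ** attempt, 60.0)}s, "
          f"chose {backoff_delay(attempt):.2f}s")
print(backoff_delay(0, retry_after="5"))
```

Inside a retry loop this could replace the fixed delay, for example delay = backoff_delay(attempt, retry_after=response.headers.get("Retry-After")).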
Conclusion
In this article, we learned various API integration techniques using Python.
- REST API Basics: HTTP methods and status codes
- requests Library: Sessions, headers, and timeout settings
- Authentication: API Key, Bearer Token (OAuth 2.0), Basic Authentication
- JSON Handling: Parsing and safe data access
- Public APIs: Weather, news, and search APIs
- AI APIs: OpenAI ChatGPT integration
- Best Practices: Pagination and rate limiting
APIs are the foundation of modern data-driven applications. By mastering API integration, you can build powerful automation systems that collect, process, and act on data from various sources.
In the next Part 8, we'll cover task scheduling and automation. You'll learn how to schedule your automation scripts to run automatically using cron, Windows Task Scheduler, and Python's schedule library.