Introduction: APIs Open the World of Data

The core of modern web services is the API (Application Programming Interface). Through APIs, we can programmatically collect and utilize various information such as weather data, stock prices, news, and social media data. With Python's requests library, you can handle these API calls very simply.

This part works through REST API fundamentals, the requests library, common authentication methods, several real-world APIs, and strategies for collecting data efficiently, such as pagination and rate limiting.

1. API Basics: Understanding REST APIs

1.1 What is an API?

An API is an interface that lets separate pieces of software communicate. Think of a waiter in a restaurant, the intermediary between the customer (client) and the kitchen (server): the customer orders from the menu, the waiter carries the order to the kitchen, and when the food is ready, the waiter brings it back to the customer.

# Basic structure of API requests
import requests

# 1. Send request
response = requests.get("https://api.example.com/data")

# 2. Receive response
if response.status_code == 200:
    data = response.json()
    print(data)
else:
    print(f"Error occurred: {response.status_code}")

1.2 Core Concepts of REST API

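REST (Representational State Transfer) is the most widely used architectural style for web API design. Resources are identified by URLs, and HTTP methods express the operation to perform on them.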

# HTTP methods by purpose
"""
GET    : Retrieve data (Read)
POST   : Create data (Create)
PUT    : Update entire data (Update)
PATCH  : Partial update (Partial Update)
DELETE : Delete data (Delete)
"""

import requests

base_url = "https://api.example.com"

# GET - Retrieve data
response = requests.get(f"{base_url}/users")

# POST - Create data
new_user = {"name": "John Doe", "email": "john@example.com"}
response = requests.post(f"{base_url}/users", json=new_user)

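# PUT - Replace the entire resource
updated_user = {"name": "John Doe", "email": "newemail@example.com"}
response = requests.put(f"{base_url}/users/1", json=updated_user)

# PATCH - Update only the given fields
response = requests.patch(f"{base_url}/users/1", json={"email": "newemail@example.com"})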

# DELETE - Delete data
response = requests.delete(f"{base_url}/users/1")

1.3 Understanding HTTP Status Codes

# Major HTTP status codes
status_codes = {
    # 2xx: Success
    200: "OK - Request successful",
    201: "Created - Creation successful",
    204: "No Content - Success (no response body)",

    # 3xx: Redirection
    301: "Moved Permanently - Permanent redirect",
    302: "Found - Temporary redirect",

    # 4xx: Client errors
    400: "Bad Request - Invalid request",
    401: "Unauthorized - Authentication required",
    403: "Forbidden - Access denied",
    404: "Not Found - Resource not found",
    429: "Too Many Requests - Rate limit exceeded",

    # 5xx: Server errors
    500: "Internal Server Error - Server internal error",
    502: "Bad Gateway - Gateway error",
    503: "Service Unavailable - Service unavailable"
}

def handle_response(response):
    """Handle response based on status code"""
    code = response.status_code

    if 200 <= code < 300:
        print(f"Success: {status_codes.get(code, 'Unknown success code')}")
        return response.json() if response.content else None
    elif 400 <= code < 500:
        print(f"Client error: {status_codes.get(code, 'Unknown error')}")
        return None
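    elif 500 <= code < 600:
        print(f"Server error: {status_codes.get(code, 'Unknown server error')}")
        return None
    else:
        # 1xx and 3xx responses (rare, because requests follows redirects by default)
        print(f"Unhandled status: {code}")
        return None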
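
The hand-written table above can also be derived from the standard library. The following is a minimal sketch, assuming you only need the category and the official reason phrase; `describe_status` is a hypothetical helper name, not part of requests.

```python
from http import HTTPStatus

# Map the first digit of a status code to its category.
CATEGORIES = {
    1: "informational",
    2: "success",
    3: "redirection",
    4: "client error",
    5: "server error",
}

def describe_status(code):
    """Return (category, reason phrase) for an HTTP status code."""
    try:
        phrase = HTTPStatus(code).phrase
    except ValueError:
        # Codes that the standard library does not know about
        phrase = "Unknown"
    category = CATEGORIES.get(code // 100, "invalid")
    return category, phrase

for code in (200, 404, 429, 503):
    print(code, *describe_status(code))
```

Falling back on the first digit means an unfamiliar code such as 299 is still treated as a success, which matches how the range checks in `handle_response` behave.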

2. Complete Guide to the requests Library

2.1 Basic Usage

import requests

# Installation: pip install requests

# Basic GET request
response = requests.get("https://api.github.com/users/octocat")
print(response.status_code)  # 200
print(response.headers)       # Response headers
print(response.text)          # Response as text
print(response.json())        # Parsed JSON dictionary

# Passing URL parameters
params = {
    "q": "python",
    "sort": "stars",
    "order": "desc"
}
response = requests.get(
    "https://api.github.com/search/repositories",
    params=params
)
# Actual request URL: https://api.github.com/search/repositories?q=python&sort=stars&order=desc
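
To see how `params` ends up in the URL without sending anything over the network, you can prepare a request and inspect it. This is a small sketch; the query value containing spaces is an illustrative assumption, chosen to show that values are URL-encoded for you.

```python
from urllib.parse import parse_qs, urlsplit

import requests

params = {"q": "python web scraping", "sort": "stars", "order": "desc"}

# Build the request without sending it, then inspect the final URL.
prepared = requests.Request(
    "GET",
    "https://api.github.com/search/repositories",
    params=params,
).prepare()

print(prepared.url)

# Decoding the query string recovers the original values.
decoded = parse_qs(urlsplit(prepared.url).query)
print(decoded)
```

Letting requests build the query string avoids manual string concatenation and the encoding mistakes that tend to come with it.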

2.2 Headers and Timeout Settings

import requests

# Custom header settings
headers = {
    "User-Agent": "MyApp/1.0",
    "Accept": "application/json",
    "Content-Type": "application/json"
}

# Timeout settings (connection timeout, read timeout)
try:
    response = requests.get(
        "https://api.example.com/data",
        headers=headers,
        timeout=(5, 30)  # Connection: 5s, Read: 30s
    )
except requests.exceptions.Timeout:
    print("Request timed out.")
except requests.exceptions.ConnectionError:
    print("Connection failed.")
except requests.exceptions.RequestException as e:
    print(f"Request error: {e}")

2.3 Using Sessions

import requests

# Using sessions improves performance by reusing connections
session = requests.Session()

# Apply default settings to session
session.headers.update({
    "User-Agent": "MyApp/1.0",
    "Accept": "application/json"
})

# Use session for multiple requests to the same host
urls = [
    "https://api.github.com/users/octocat",
    "https://api.github.com/users/octocat/repos",
    "https://api.github.com/users/octocat/followers"
]

for url in urls:
    response = session.get(url)
    print(f"{url}: {response.status_code}")

# Close session
session.close()

# Or use context manager (recommended)
with requests.Session() as session:
    session.headers.update({"User-Agent": "MyApp/1.0"})
    response = session.get("https://api.github.com/users/octocat")
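
A session is also a convenient place to configure automatic retries. The sketch below mounts an `HTTPAdapter` with a urllib3 `Retry` policy; `build_session` and the specific retry numbers are assumptions for illustration, so tune them for the API you call.

```python
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

def build_session(total_retries=3, backoff_factor=0.5):
    """Create a session that retries transient failures on every request."""
    retry = Retry(
        total=total_retries,
        backoff_factor=backoff_factor,               # Wait grows between attempts
        status_forcelist=[429, 500, 502, 503, 504],  # Retry on these status codes
    )
    adapter = HTTPAdapter(max_retries=retry)

    session = requests.Session()
    session.mount("https://", adapter)
    session.mount("http://", adapter)
    session.headers.update({"User-Agent": "MyApp/1.0"})
    return session

with build_session() as session:
    # No request is sent here; we only inspect the configuration.
    adapter = session.get_adapter("https://api.github.com/users/octocat")
    print(adapter.max_retries.total)
```

By default this policy retries only idempotent methods such as GET, so a failed POST is not silently repeated.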

3. API Authentication Methods

3.1 API Key Authentication

import requests

# Method 1: Pass as URL parameter
# (the key can end up in server logs, so prefer a header when the API allows it)
api_key = "your_api_key_here"
response = requests.get(
    "https://api.example.com/data",
    params={"api_key": api_key}
)

# Method 2: Pass in header
headers = {"X-API-Key": api_key}
response = requests.get(
    "https://api.example.com/data",
    headers=headers
)

# Method 3: Pass in Authorization header
headers = {"Authorization": f"Api-Key {api_key}"}
response = requests.get(
    "https://api.example.com/data",
    headers=headers
)
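
The examples above hard-code the key for brevity. In real scripts it is safer to read it from the environment so it never lands in version control. A minimal sketch, assuming a hypothetical variable name `EXAMPLE_API_KEY` and the `X-API-Key` header from Method 2:

```python
import os

def load_api_key(env_var="EXAMPLE_API_KEY"):
    """Read an API key from the environment and fail loudly if it is missing."""
    key = os.environ.get(env_var, "").strip()
    if not key:
        raise RuntimeError(f"Set the {env_var} environment variable first")
    return key

def auth_headers(env_var="EXAMPLE_API_KEY"):
    """Build the request headers for header-based API key authentication."""
    return {"X-API-Key": load_api_key(env_var)}

# Usage (after `export EXAMPLE_API_KEY=...` in your shell):
# response = requests.get("https://api.example.com/data", headers=auth_headers())
```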

3.2 Bearer Token Authentication (OAuth 2.0)

import requests

# Using Bearer Token
access_token = "your_access_token_here"

headers = {
    "Authorization": f"Bearer {access_token}",
    "Content-Type": "application/json"
}

response = requests.get(
    "https://api.example.com/protected/resource",
    headers=headers
)

# OAuth 2.0 token issuance example
def get_oauth_token(client_id, client_secret, token_url):
    """Get token using OAuth 2.0 Client Credentials method"""
    data = {
        "grant_type": "client_credentials",
        "client_id": client_id,
        "client_secret": client_secret
    }

    response = requests.post(token_url, data=data)

    if response.status_code == 200:
        token_data = response.json()
        return token_data.get("access_token")
    else:
        raise Exception(f"Token issuance failed: {response.text}")

# Usage
# token = get_oauth_token("my_client_id", "my_secret", "https://auth.example.com/token")
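
Access tokens usually expire, so requesting a new one for every call is wasteful. The sketch below caches a token until shortly before it expires. `TokenCache` is a hypothetical helper, and it assumes the token endpoint returns `access_token` and, optionally, `expires_in` (in seconds).

```python
import time

class TokenCache:
    """Cache an access token and refresh it shortly before it expires."""

    def __init__(self, fetch_token, leeway=30, clock=time.monotonic):
        self._fetch_token = fetch_token  # Callable returning the token endpoint's JSON as a dict
        self._leeway = leeway            # Refresh this many seconds early
        self._clock = clock
        self._token = None
        self._expires_at = 0.0

    def get(self):
        now = self._clock()
        if self._token is None or now >= self._expires_at:
            payload = self._fetch_token()
            self._token = payload["access_token"]
            # Assumption: fall back to one hour when expires_in is absent
            lifetime = payload.get("expires_in", 3600)
            self._expires_at = now + max(lifetime - self._leeway, 0)
        return self._token

# Demo with a fake fetcher, so nothing is sent over the network
def fake_fetch():
    return {"access_token": "demo-token", "expires_in": 3600}

cache = TokenCache(fake_fetch)
print(cache.get())
```

In practice `fetch_token` would wrap a real token request and return the parsed JSON response rather than only the token string.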

3.3 Basic Authentication

import requests
from requests.auth import HTTPBasicAuth

# Method 1: Using auth parameter
response = requests.get(
    "https://api.example.com/data",
    auth=HTTPBasicAuth("username", "password")
)

# Method 2: Simple tuple form
response = requests.get(
    "https://api.example.com/data",
    auth=("username", "password")
)
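
Under the hood, Basic authentication is just an `Authorization` header containing the Base64-encoded `username:password` pair. The sketch below builds that header by hand to make this visible; `basic_auth_header` is a hypothetical helper, and in real code the `auth` parameter shown above is the simpler choice.

```python
import base64

def basic_auth_header(username, password):
    """Build the Authorization header used by HTTP Basic authentication."""
    credentials = f"{username}:{password}".encode("utf-8")
    token = base64.b64encode(credentials).decode("ascii")
    return {"Authorization": f"Basic {token}"}

header = basic_auth_header("username", "password")
print(header)

# Base64 is trivially reversible, as this decoding shows.
encoded = header["Authorization"].split(" ", 1)[1]
print(base64.b64decode(encoded).decode("utf-8"))
```

Because Base64 is an encoding and not encryption, Basic authentication should only be used over HTTPS.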

4. JSON Response Handling

4.1 JSON Data Parsing

import requests

response = requests.get("https://api.github.com/users/octocat")
data = response.json()

# Access data
print(f"Username: {data['login']}")
print(f"Name: {data.get('name', 'None')}")  # Safe access

# Handling nested JSON
def safe_get(data, *keys, default=None):
    """Safely get value from nested dictionaries and lists"""
    for key in keys:
        if isinstance(data, dict):
            # Return immediately on a missing key so a dict default is never traversed
            if key not in data:
                return default
            data = data[key]
        elif isinstance(data, list) and isinstance(key, int):
            try:
                data = data[key]
            except IndexError:
                return default
        else:
            return default
    return data

# Usage example
nested_data = {
    "user": {
        "profile": {
            "name": "John Doe",
            "contacts": [
                {"type": "email", "value": "john@example.com"}
            ]
        }
    }
}

name = safe_get(nested_data, "user", "profile", "name")
email = safe_get(nested_data, "user", "profile", "contacts", 0, "value")
print(f"Name: {name}, Email: {email}")
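
Not every response body is valid JSON. A gateway may return an HTML error page, for example, and `response.json()` then raises a `ValueError` subclass. The sketch below shows the same failure mode with the standard `json` module; `parse_json` is a hypothetical helper name.

```python
import json

def parse_json(text, default=None):
    """Parse JSON text, returning `default` instead of raising on invalid input."""
    try:
        return json.loads(text)
    except json.JSONDecodeError as exc:
        print(f"Invalid JSON at line {exc.lineno}, column {exc.colno}: {exc.msg}")
        return default

print(parse_json('{"login": "octocat"}'))
print(parse_json("<html>Bad Gateway</html>", default={}))
```

With requests, the equivalent is wrapping `response.json()` in `try`/`except ValueError`.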

4.2 Saving and Loading JSON Data

import json
from pathlib import Path

def save_json(data, filepath, indent=2, ensure_ascii=False):
    """Save JSON data to file"""
    filepath = Path(filepath)
    filepath.parent.mkdir(parents=True, exist_ok=True)

    with open(filepath, 'w', encoding='utf-8') as f:
        json.dump(data, f, indent=indent, ensure_ascii=ensure_ascii)

    print(f"Saved: {filepath}")

def load_json(filepath):
    """Load data from JSON file"""
    with open(filepath, 'r', encoding='utf-8') as f:
        return json.load(f)

# Usage example
api_data = {"name": "John Doe", "age": 30, "city": "New York"}
save_json(api_data, "data/user_info.json")
loaded_data = load_json("data/user_info.json")
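
`save_json` defaults to `ensure_ascii=False`. The short sketch below illustrates what that flag changes; the sample record is made up for the demonstration.

```python
import json

record = {"city": "Zürich"}

escaped = json.dumps(record)                       # Non-ASCII characters become \uXXXX escapes
readable = json.dumps(record, ensure_ascii=False)  # Characters are written as-is

print(escaped)
print(readable)

# Both forms decode to the same data; only the file's readability differs.
print(json.loads(escaped) == json.loads(readable) == record)
```

Writing characters as-is keeps saved files readable, which is why the file is opened with `encoding='utf-8'` in `save_json`.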

5. Public Data APIs

5.1 Using Public Data APIs

import requests

class WeatherAPI:
    """OpenWeatherMap API Client"""

    def __init__(self, api_key):
        self.api_key = api_key
        self.base_url = "https://api.openweathermap.org/data/2.5"

    def get_current_weather(self, city, units="metric"):
        """Get current weather for a city"""
        url = f"{self.base_url}/weather"

        params = {
            "q": city,
            "appid": self.api_key,
            "units": units
        }

        response = requests.get(url, params=params)

        if response.status_code == 200:
            return self._parse_weather(response.json())
        return None

    def _parse_weather(self, data):
        """Parse weather data"""
        return {
            "city": data["name"],
            "temperature": data["main"]["temp"],
            "feels_like": data["main"]["feels_like"],
            "humidity": data["main"]["humidity"],
            "description": data["weather"][0]["description"],
            "wind_speed": data["wind"]["speed"]
        }

    def get_forecast(self, city, units="metric"):
        """Get 5-day forecast"""
        url = f"{self.base_url}/forecast"

        params = {
            "q": city,
            "appid": self.api_key,
            "units": units
        }

        response = requests.get(url, params=params)
        return response.json() if response.status_code == 200 else None

# Usage example
# weather = WeatherAPI("your_api_key")
# current = weather.get_current_weather("London")
# forecast = weather.get_forecast("New York")
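
`get_forecast` returns the raw payload, so some post-processing is usually needed. The sketch below groups entries by day and reports the minimum and maximum temperature. The payload shape (a `list` of entries with `dt_txt` and `main.temp`) is an assumption based on the forecast endpoint's typical response, and the sample data is invented.

```python
from collections import defaultdict

def daily_temperature_range(forecast):
    """Return {date: (min_temp, max_temp)} from a forecast payload."""
    by_day = defaultdict(list)
    for entry in forecast.get("list", []):
        day = entry["dt_txt"].split(" ")[0]  # "2024-05-01 09:00:00" -> "2024-05-01"
        by_day[day].append(entry["main"]["temp"])
    return {day: (min(temps), max(temps)) for day, temps in sorted(by_day.items())}

# Invented sample in the assumed shape
sample = {
    "list": [
        {"dt_txt": "2024-05-01 09:00:00", "main": {"temp": 11.5}},
        {"dt_txt": "2024-05-01 15:00:00", "main": {"temp": 17.0}},
        {"dt_txt": "2024-05-02 09:00:00", "main": {"temp": 9.0}},
    ]
}

print(daily_temperature_range(sample))
```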

5.2 News API

import requests
from datetime import datetime, timedelta

class NewsAPI:
    """NewsAPI Client"""

    def __init__(self, api_key):
        self.api_key = api_key
        self.base_url = "https://newsapi.org/v2"
        self.headers = {"X-Api-Key": api_key}

    def get_top_headlines(self, country="us", category=None, page_size=10):
        """Get top headlines"""
        url = f"{self.base_url}/top-headlines"

        params = {
            "country": country,
            "pageSize": page_size
        }

        if category:
            params["category"] = category  # business, technology, sports, etc.

        response = requests.get(url, headers=self.headers, params=params)
        return response.json() if response.status_code == 200 else None

    def search_news(self, query, from_date=None, to_date=None, sort_by="publishedAt"):
        """Search news articles"""
        url = f"{self.base_url}/everything"

        params = {
            "q": query,
            "sortBy": sort_by,  # relevancy, popularity, publishedAt
            "language": "en"
        }

        if from_date:
            params["from"] = from_date
        if to_date:
            params["to"] = to_date

        response = requests.get(url, headers=self.headers, params=params)
        return response.json() if response.status_code == 200 else None

# Usage example
# news = NewsAPI("your_api_key")
# headlines = news.get_top_headlines(country="us", category="technology")
# search_results = news.search_news("artificial intelligence")
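
`search_news` accepts `from_date` and `to_date`, but the usage example never builds them. A minimal sketch for a "last N days" window follows. It assumes the API accepts ISO 8601 date strings, and `last_n_days` is a hypothetical helper.

```python
from datetime import date, timedelta

def last_n_days(days, today=None):
    """Return (from_date, to_date) as ISO 8601 strings covering the last `days` days."""
    today = today or date.today()
    start = today - timedelta(days=days)
    return start.isoformat(), today.isoformat()

# A fixed date keeps the demo reproducible
from_date, to_date = last_n_days(7, today=date(2024, 5, 15))
print(from_date, to_date)

# Usage:
# results = news.search_news("artificial intelligence", from_date=from_date, to_date=to_date)
```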

6. Google and Twitter API Integration

6.1 Google Custom Search API

import requests

class GoogleSearchAPI:
    """Google Custom Search API Client"""

    def __init__(self, api_key, search_engine_id):
        self.api_key = api_key
        self.search_engine_id = search_engine_id
        self.base_url = "https://www.googleapis.com/customsearch/v1"

    def search(self, query, num=10, start=1, search_type=None):
        """Search the web"""
        params = {
            "key": self.api_key,
            "cx": self.search_engine_id,
            "q": query,
            "num": num,
            "start": start
        }

        if search_type:
            params["searchType"] = search_type  # "image" for image search

        response = requests.get(self.base_url, params=params)
        return response.json() if response.status_code == 200 else None

    def search_images(self, query, num=10):
        """Search images"""
        return self.search(query, num=num, search_type="image")

# Usage example
# google = GoogleSearchAPI("your_api_key", "your_search_engine_id")
# results = google.search("Python programming")
# images = google.search_images("landscape wallpaper")
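
Collecting more than one page of results means advancing `start` between calls. The sketch below computes the `(start, num)` pairs to request. It assumes the API returns at most 10 results per call and caps a query at 100 results in total, so check the current limits before relying on these numbers.

```python
def page_starts(total_wanted, page_size=10, hard_limit=100):
    """Return (start, num) pairs for fetching `total_wanted` results with 1-based offsets."""
    total = min(total_wanted, hard_limit)
    return [
        (start, min(page_size, total - start + 1))
        for start in range(1, total + 1, page_size)
    ]

print(page_starts(25))

# Usage:
# for start, num in page_starts(25):
#     page = google.search("Python programming", num=num, start=start)
```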

6.2 Twitter/X API

import requests

class TwitterAPI:
    """Twitter API v2 Client"""

    def __init__(self, bearer_token):
        self.bearer_token = bearer_token
        self.base_url = "https://api.twitter.com/2"
        self.headers = {
            "Authorization": f"Bearer {bearer_token}"
        }

    def search_recent_tweets(self, query, max_results=10):
        """Search recent tweets"""
        url = f"{self.base_url}/tweets/search/recent"

        params = {
            "query": query,
            "max_results": max_results,
            "tweet.fields": "created_at,author_id,public_metrics"
        }

        response = requests.get(url, headers=self.headers, params=params)
        return response.json() if response.status_code == 200 else None

    def get_user(self, username):
        """Get user information"""
        url = f"{self.base_url}/users/by/username/{username}"

        params = {
            "user.fields": "description,public_metrics,created_at"
        }

        response = requests.get(url, headers=self.headers, params=params)
        return response.json() if response.status_code == 200 else None

    def get_user_tweets(self, user_id, max_results=10):
        """Get user's tweets"""
        url = f"{self.base_url}/users/{user_id}/tweets"

        params = {
            "max_results": max_results,
            "tweet.fields": "created_at,public_metrics"
        }

        response = requests.get(url, headers=self.headers, params=params)
        return response.json() if response.status_code == 200 else None

# Usage example
# twitter = TwitterAPI("your_bearer_token")
# tweets = twitter.search_recent_tweets("Python programming")
# user = twitter.get_user("elonmusk")

7. OpenAI API Integration

7.1 Basic ChatGPT API Usage

import requests

class OpenAIClient:
    """OpenAI API Client"""

    def __init__(self, api_key):
        self.api_key = api_key
        self.base_url = "https://api.openai.com/v1"
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }

    def chat_completion(self, messages, model="gpt-4o", temperature=0.7, max_tokens=1000):
        """Call Chat Completion API"""
        url = f"{self.base_url}/chat/completions"

        payload = {
            "model": model,
            "messages": messages,
            "temperature": temperature,
            "max_tokens": max_tokens
        }

        response = requests.post(url, headers=self.headers, json=payload)

        if response.status_code == 200:
            return response.json()['choices'][0]['message']['content']
        else:
            raise Exception(f"API error: {response.text}")

    def simple_chat(self, user_message, system_prompt=None):
        """Simple chat"""
        messages = []

        if system_prompt:
            messages.append({"role": "system", "content": system_prompt})

        messages.append({"role": "user", "content": user_message})

        return self.chat_completion(messages)

    def generate_image(self, prompt, size="1024x1024", n=1):
        """DALL-E image generation (dall-e-3 accepts only n=1)"""
        url = f"{self.base_url}/images/generations"

        payload = {
            "model": "dall-e-3",
            "prompt": prompt,
            "size": size,
            "n": n
        }

        response = requests.post(url, headers=self.headers, json=payload)

        if response.status_code == 200:
            return response.json()['data'][0]['url']
        else:
            raise Exception(f"API error: {response.text}")

# Usage example
# openai = OpenAIClient("your_api_key")
# response = openai.simple_chat(
#     "How do I do web scraping with Python?",
#     system_prompt="You are a Python expert. Explain with code examples."
# )
# print(response)
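
`simple_chat` sends a single message, so the model has no memory of earlier turns. For a multi-turn exchange, the full message history has to be sent with every call. The sketch below keeps that history in a hypothetical `Conversation` class. The `send` callable is an assumption that stands in for something like `openai.chat_completion`.

```python
class Conversation:
    """Keep the running message history for a multi-turn chat."""

    def __init__(self, send, system_prompt=None):
        self._send = send  # Callable: list of messages -> reply text
        self.messages = []
        if system_prompt:
            self.messages.append({"role": "system", "content": system_prompt})

    def ask(self, user_message):
        self.messages.append({"role": "user", "content": user_message})
        reply = self._send(list(self.messages))  # Pass a copy of the history
        self.messages.append({"role": "assistant", "content": reply})
        return reply

# Demo with a stand-in for the real API call
def fake_send(messages):
    return f"(reply to {len(messages)} messages)"

chat = Conversation(fake_send, system_prompt="You are a Python expert.")
print(chat.ask("What is a list comprehension?"))
print(chat.ask("Show me an example."))

# Usage with the client above:
# chat = Conversation(openai.chat_completion, system_prompt="You are a Python expert.")
```

Because the history grows with every turn, long conversations eventually need trimming or summarizing to stay within the model's context limit.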

7.2 Streaming Response Handling

import requests
import json

def stream_chat_completion(api_key, messages, model="gpt-4o"):
    """Receive response via streaming"""
    url = "https://api.openai.com/v1/chat/completions"

    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json"
    }

    payload = {
        "model": model,
        "messages": messages,
        "stream": True
    }

    response = requests.post(url, headers=headers, json=payload, stream=True)
    response.raise_for_status()  # Fail fast instead of silently skipping an error body

    full_response = ""

    for line in response.iter_lines():
        if line:
            line = line.decode('utf-8')
            if line.startswith('data: '):
                data = line[6:]  # Remove 'data: '

                if data == '[DONE]':
                    break

                try:
                    chunk = json.loads(data)
                    content = chunk['choices'][0]['delta'].get('content', '')
                    if content:
                        print(content, end='', flush=True)
                        full_response += content
                except json.JSONDecodeError:
                    continue

    print()  # Newline
    return full_response

# Usage example
# messages = [{"role": "user", "content": "Tell me 5 benefits of Python"}]
# response = stream_chat_completion("your_api_key", messages)
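
The parsing logic can be separated from the network call, which makes it testable offline. The sketch below extracts content from `data: ` lines shaped like the ones handled above. `iter_stream_content` is a hypothetical helper, and the sample lines are hand-written for illustration.

```python
import json

def iter_stream_content(lines):
    """Yield content fragments from server-sent event lines."""
    for raw in lines:
        if not raw:
            continue  # Blank lines separate events
        line = raw.decode("utf-8") if isinstance(raw, bytes) else raw
        if not line.startswith("data: "):
            continue
        data = line[len("data: "):]
        if data == "[DONE]":
            break
        try:
            chunk = json.loads(data)
        except json.JSONDecodeError:
            continue
        if not isinstance(chunk, dict):
            continue
        choices = chunk.get("choices") or []
        if choices:
            content = choices[0].get("delta", {}).get("content")
            if content:
                yield content

# Hand-written sample lines in the assumed format
sample_lines = [
    b'data: {"choices": [{"delta": {"role": "assistant"}}]}',
    b"",
    b'data: {"choices": [{"delta": {"content": "Hello"}}]}',
    b'data: {"choices": [{"delta": {"content": ", world"}}]}',
    b"data: [DONE]",
]

print("".join(iter_stream_content(sample_lines)))
```

With a live response, the same generator can be fed `response.iter_lines()` directly.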

8. Pagination and Rate Limiting

8.1 Pagination Handling

import requests
import time

class PaginatedAPIClient:
    """API client with pagination support"""

    def __init__(self, base_url, headers=None):
        self.base_url = base_url
        self.headers = headers or {}

    def get_all_pages_offset(self, endpoint, page_size=100, max_pages=None):
        """Offset-based pagination (page, per_page style)"""
        all_data = []
        page = 1

        while True:
            params = {
                "page": page,
                "per_page": page_size
            }

            response = requests.get(
                f"{self.base_url}{endpoint}",
                headers=self.headers,
                params=params
            )

            response.raise_for_status()  # Stop on HTTP errors instead of parsing an error body
            data = response.json()

            if not data:  # Empty response means done
                break

            all_data.extend(data)

            if len(data) < page_size:  # Last page
                break

            if max_pages and page >= max_pages:
                break

            page += 1
            time.sleep(0.5)  # Rate limiting prevention

        return all_data

    def get_all_pages_cursor(self, endpoint, cursor_field="cursor"):
        """Cursor-based pagination"""
        all_data = []
        cursor = None

        while True:
            params = {}
            if cursor:
                params[cursor_field] = cursor

            response = requests.get(
                f"{self.base_url}{endpoint}",
                headers=self.headers,
                params=params
            )

            response.raise_for_status()  # Stop on HTTP errors instead of parsing an error body
            result = response.json()
            data = result.get("data", [])
            all_data.extend(data)

            # Exit if no next cursor
            cursor = result.get("next_cursor")
            if not cursor:
                break

            time.sleep(0.5)

        return all_data

    def get_all_pages_link_header(self, endpoint):
        """Link header-based pagination (GitHub style)"""
        all_data = []
        url = f"{self.base_url}{endpoint}"

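        while url:
            response = requests.get(url, headers=self.headers)
            response.raise_for_status()  # Stop on HTTP errors instead of parsing an error body
            all_data.extend(response.json())

            # Extract next page URL from Link header
            # (requests also exposes the parsed header as response.links)
            link_header = response.headers.get("Link", "")
            url = self._parse_link_header(link_header, "next")

            if url:
                time.sleep(0.5)  # Pause only when another page follows

        return all_data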

    def _parse_link_header(self, link_header, rel):
        """Parse Link header"""
        if not link_header:
            return None

        links = link_header.split(", ")
        for link in links:
            parts = link.split("; ")
            if len(parts) == 2 and f'rel="{rel}"' in parts[1]:
                return parts[0].strip("<>")

        return None
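
The methods above load every page into one list before returning. For large collections, a generator that yields items as pages arrive uses less memory and lets the caller stop early. The sketch below applies this to cursor-based pagination. `iter_items` and `fetch_page` are hypothetical names, and the `data` and `next_cursor` fields follow the convention used in `get_all_pages_cursor`.

```python
def iter_items(fetch_page, max_items=None):
    """Lazily yield items from a cursor-paginated API.

    fetch_page(cursor) must return a dict with "data" and, optionally, "next_cursor".
    """
    if max_items is not None and max_items <= 0:
        return
    cursor = None
    yielded = 0
    while True:
        page = fetch_page(cursor)
        for item in page.get("data", []):
            yield item
            yielded += 1
            if max_items is not None and yielded >= max_items:
                return  # Stop without requesting further pages
        cursor = page.get("next_cursor")
        if not cursor:
            return

# Demo with in-memory pages instead of HTTP calls
PAGES = {
    None: {"data": [1, 2], "next_cursor": "a"},
    "a": {"data": [3, 4], "next_cursor": "b"},
    "b": {"data": [5], "next_cursor": None},
}

def fake_fetch_page(cursor):
    return PAGES[cursor]

print(list(iter_items(fake_fetch_page)))
print(list(iter_items(fake_fetch_page, max_items=3)))
```

In real use, `fetch_page` would wrap the `requests.get` call and return `response.json()`.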

8.2 Rate Limiting Handling

import requests
import time
from functools import wraps

class RateLimiter:
    """Rate Limiting handler class"""

    def __init__(self, calls_per_minute=60):
        self.calls_per_minute = calls_per_minute
        self.min_interval = 60.0 / calls_per_minute
        self.last_call_time = 0

    def wait(self):
        """Wait if necessary"""
        elapsed = time.time() - self.last_call_time
        if elapsed < self.min_interval:
            time.sleep(self.min_interval - elapsed)
        self.last_call_time = time.time()

def rate_limited(calls_per_minute=60):
    """Rate limiting decorator"""
    min_interval = 60.0 / calls_per_minute
    last_call = [0]  # Wrap in mutable object

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            elapsed = time.time() - last_call[0]
            if elapsed < min_interval:
                time.sleep(min_interval - elapsed)
            last_call[0] = time.time()
            return func(*args, **kwargs)
        return wrapper
    return decorator

@rate_limited(calls_per_minute=30)
def call_api(url):
    """API call with rate limiting"""
    return requests.get(url)

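# Retry with exponential backoff
def api_call_with_retry(url, max_retries=3, base_delay=1):
    """API call with retry logic"""
    for attempt in range(max_retries):
        response = requests.get(url, timeout=30)

        if response.status_code == 200:
            return response.json()
        elif response.status_code == 429:  # Too Many Requests
            if attempt == max_retries - 1:
                break  # No point in waiting after the final attempt
            # Prefer the server's Retry-After hint when it is given in seconds
            retry_after = response.headers.get("Retry-After", "")
            if retry_after.isdigit():
                delay = int(retry_after)
            else:
                delay = base_delay * (2 ** attempt)  # Exponential backoff
            print(f"Rate limited. Waiting {delay} seconds...")
            time.sleep(delay)
        else:
            raise Exception(f"API error: {response.status_code}")

    raise Exception("Max retries exceeded")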
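
When many clients back off on the same schedule, they tend to retry at the same moment and overload the server again. Adding random jitter spreads the retries out. The sketch below uses the "full jitter" variant, where the delay is drawn uniformly between zero and the exponential ceiling. `backoff_delay` and its default values are assumptions for illustration.

```python
import random

def backoff_delay(attempt, base_delay=1.0, max_delay=60.0, rng=random):
    """Return a randomized delay in seconds for the given zero-based attempt."""
    ceiling = min(max_delay, base_delay * (2 ** attempt))
    return rng.uniform(0, ceiling)

# A seeded generator keeps the demo reproducible
rng = random.Random(42)
for attempt in range(5):
    print(attempt, round(backoff_delay(attempt, rng=rng), 2))

# Usage inside a retry loop:
# time.sleep(backoff_delay(attempt))
```

The `max_delay` cap keeps later attempts from waiting indefinitely long.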

Conclusion

In this part, we covered the main techniques for integrating with APIs in Python.

  • REST API Basics: HTTP methods and status codes
  • requests Library: Sessions, headers, and timeout settings
  • Authentication: API Key, Bearer Token (OAuth 2.0), Basic Auth
  • JSON Handling: Parsing and safe data access
  • Public APIs: Weather, news, and search APIs
  • AI APIs: OpenAI ChatGPT integration
  • Best Practices: Pagination and rate limiting

APIs are the foundation of modern data-driven applications. By mastering API integration, you can build powerful automation systems that collect, process, and act on data from various sources.

In Part 8, we'll cover task scheduling and automation: how to run your automation scripts on a schedule using cron, Windows Task Scheduler, and Python's schedule library.