はじめに:なぜSeleniumが必要なのか?

第3編で学んだrequestsとBeautifulSoupは、静的HTMLをパースするのに効果的です。しかし、現代のWebサイトの多くはJavaScriptで動的にコンテンツを生成しています。このようなページでは、requestsで取得したHTMLに目的のデータがない場合が多いです。

Seleniumは、実際のWebブラウザを自動制御して、JavaScriptが実行された後の完全なページを取得できます。また、クリック、入力、スクロールなどのユーザーインタラクションをシミュレートできるため、ログイン、無限スクロール、ページネーション処理に便利です。

1. Seleniumのインストールとウェブドライバー設定

1.1 Seleniumのインストール

# Seleniumをインストール
pip install selenium

# ウェブドライバー自動管理のためのwebdriver-managerをインストール
pip install webdriver-manager

1.2 WebDriverの設定

Seleniumは、ブラウザを制御するためにWebDriverが必要です。webdriver-managerを使用すると、ドライバーを自動的にダウンロードして管理できます。

from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager

# Chromeブラウザの設定(自動ドライバー管理)
service = Service(ChromeDriverManager().install())
driver = webdriver.Chrome(service=service)

# Webページを開く
driver.get('https://www.google.com')

# ページタイトルを出力
print(driver.title)

# ブラウザを終了
driver.quit()

1.3 ブラウザオプション設定

from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from webdriver_manager.chrome import ChromeDriverManager

def create_driver(headless=False):
    """Chromeドライバーを作成します。"""
    options = Options()

    # ヘッドレスモード(ブラウザウィンドウなしで実行)
    if headless:
        options.add_argument('--headless')

    # 基本オプション設定
    options.add_argument('--no-sandbox')
    options.add_argument('--disable-dev-shm-usage')
    options.add_argument('--disable-gpu')

    # ウィンドウサイズ設定
    options.add_argument('--window-size=1920,1080')

    # User-Agent設定
    options.add_argument('--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36')

    # 自動化検出防止
    options.add_argument('--disable-blink-features=AutomationControlled')
    options.add_experimental_option('excludeSwitches', ['enable-automation'])
    options.add_experimental_option('useAutomationExtension', False)

    # ドライバーを作成
    service = Service(ChromeDriverManager().install())
    driver = webdriver.Chrome(service=service, options=options)

    # 自動化検出回避のためのスクリプト実行
    driver.execute_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})")

    return driver

# 使用例
driver = create_driver(headless=True)
driver.get('https://example.com')
print(driver.page_source)
driver.quit()

2. ブラウザ自動化の基本

2.1 要素の検索

Selenium 4.xでは、Byクラスを使用して要素を検索します。

from selenium import webdriver
from selenium.webdriver.common.by import By

driver = webdriver.Chrome()
driver.get('https://example.com')

# さまざまな方法で要素を検索
# IDで検索
element = driver.find_element(By.ID, 'main-content')

# クラス名で検索
element = driver.find_element(By.CLASS_NAME, 'article')

# CSSセレクターで検索
element = driver.find_element(By.CSS_SELECTOR, 'div.content > p')

# XPathで検索
element = driver.find_element(By.XPATH, '//div[@class="content"]/p')

# タグ名で検索
elements = driver.find_elements(By.TAG_NAME, 'a')

# リンクテキストで検索
element = driver.find_element(By.LINK_TEXT, '詳細を見る')
element = driver.find_element(By.PARTIAL_LINK_TEXT, '詳細')

# Name属性で検索
element = driver.find_element(By.NAME, 'username')

# 複数の要素を検索(リストを返す)
all_links = driver.find_elements(By.TAG_NAME, 'a')
for link in all_links:
    print(link.text, link.get_attribute('href'))

2.2 要素のインタラクション

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys

driver = webdriver.Chrome()
driver.get('https://www.google.com')

# テキスト入力
search_box = driver.find_element(By.NAME, 'q')
search_box.send_keys('Python Selenium')

# キーボード入力
search_box.send_keys(Keys.ENTER)  # Enterキー
# または
search_box.send_keys(Keys.CONTROL, 'a')  # Ctrl+A(全選択)

# クリック
button = driver.find_element(By.CSS_SELECTOR, 'button[type="submit"]')
button.click()

# テキストを取得
element = driver.find_element(By.ID, 'result')
print(element.text)

# 属性を取得
link = driver.find_element(By.TAG_NAME, 'a')
print(link.get_attribute('href'))

# 入力フィールドをクリア
input_field = driver.find_element(By.NAME, 'username')
input_field.clear()
input_field.send_keys('new_value')

# フォーム送信
form = driver.find_element(By.TAG_NAME, 'form')
form.submit()

2.3 スクロール処理

from selenium import webdriver
from selenium.webdriver.common.by import By
import time

driver = webdriver.Chrome()
driver.get('https://example.com/infinite-scroll')

def scroll_to_bottom(driver, pause_time=2):
    """ページの一番下までスクロールします。"""
    last_height = driver.execute_script("return document.body.scrollHeight")

    while True:
        # ページの一番下へスクロール
        driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")

        # ページ読み込み待機
        time.sleep(pause_time)

        # 新しい高さを計算
        new_height = driver.execute_script("return document.body.scrollHeight")

        # これ以上スクロールできなければ終了
        if new_height == last_height:
            break

        last_height = new_height

def scroll_by_pixel(driver, pixels=500):
    """指定されたピクセル分スクロールします。"""
    driver.execute_script(f"window.scrollBy(0, {pixels});")

def scroll_to_element(driver, element):
    """特定の要素が見えるようにスクロールします。"""
    driver.execute_script("arguments[0].scrollIntoView(true);", element)

# 使用例
scroll_to_bottom(driver)

# 特定の要素へスクロール
target = driver.find_element(By.ID, 'target-section')
scroll_to_element(driver, target)

3. 待機戦略(Wait)

動的なWebページでは、要素がロードされるまで待つ必要があります。Seleniumは2種類の待機方式を提供しています。

3.1 暗黙的待機(Implicit Wait)

グローバルに設定され、要素が見つかるまで指定された時間だけ待機します。

from selenium import webdriver

driver = webdriver.Chrome()

# 暗黙的待機を設定(最大10秒)
driver.implicitly_wait(10)

# これ以降のすべてのfind_element呼び出しは最大10秒まで待機
driver.get('https://example.com')
element = driver.find_element(By.ID, 'dynamic-content')

3.2 明示的待機(Explicit Wait)

特定の条件が満たされるまで待機します。より柔軟で推奨される方式です。

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

driver = webdriver.Chrome()
driver.get('https://example.com')

# 要素が現れるまで待機(最大10秒)
wait = WebDriverWait(driver, 10)

# 要素が存在するまで待機
element = wait.until(EC.presence_of_element_located((By.ID, 'content')))

# 要素が表示されるまで待機
element = wait.until(EC.visibility_of_element_located((By.ID, 'content')))

# 要素がクリック可能になるまで待機
button = wait.until(EC.element_to_be_clickable((By.CSS_SELECTOR, 'button.submit')))

# 特定のテキストが現れるまで待機
wait.until(EC.text_to_be_present_in_element((By.ID, 'status'), '完了'))

# 要素が消えるまで待機
wait.until(EC.invisibility_of_element_located((By.ID, 'loading')))

# 新しいウィンドウが開くまで待機
wait.until(EC.number_of_windows_to_be(2))

# URLが特定の値を含むまで待機
wait.until(EC.url_contains('/success'))

3.3 カスタム待機条件

from selenium.webdriver.support.ui import WebDriverWait

def wait_for_ajax(driver, timeout=30):
    """AJAXリクエストが完了するまで待機します。"""
    wait = WebDriverWait(driver, timeout)
    wait.until(lambda d: d.execute_script('return jQuery.active == 0'))

def wait_for_page_load(driver, timeout=30):
    """ページが完全に読み込まれるまで待機します。"""
    wait = WebDriverWait(driver, timeout)
    wait.until(lambda d: d.execute_script('return document.readyState') == 'complete')

def wait_for_element_count(driver, locator, count, timeout=10):
    """特定の数の要素が現れるまで待機します。"""
    wait = WebDriverWait(driver, timeout)
    wait.until(lambda d: len(d.find_elements(*locator)) >= count)

# 使用例
driver.get('https://example.com')
wait_for_page_load(driver)
wait_for_element_count(driver, (By.CSS_SELECTOR, '.item'), 10)

4. ページネーション処理

4.1 ボタンクリック方式

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import NoSuchElementException, TimeoutException

def scrape_with_pagination(driver, item_selector, next_button_selector, max_pages=10):
    """ページネーションがあるサイトをスクレイピングします。"""
    all_items = []
    current_page = 1

    while current_page <= max_pages:
        print(f"ページ{current_page}をスクレイピング中...")

        # 現在のページのアイテムを収集
        wait = WebDriverWait(driver, 10)
        items = wait.until(EC.presence_of_all_elements_located((By.CSS_SELECTOR, item_selector)))

        for item in items:
            all_items.append(item.text)

        # 次のページボタンを検索
        try:
            next_button = driver.find_element(By.CSS_SELECTOR, next_button_selector)

            # ボタンが無効化されているか確認
            if 'disabled' in next_button.get_attribute('class') or not next_button.is_enabled():
                print("最後のページです。")
                break

            # 次のページへ移動
            next_button.click()

            # 新しいコンテンツの読み込み待機
            time.sleep(2)
            current_page += 1

        except NoSuchElementException:
            print("次のページボタンが見つかりません。")
            break
        except TimeoutException:
            print("ページ読み込みタイムアウト")
            break

    return all_items

# 使用例
driver = webdriver.Chrome()
driver.get('https://example.com/products')

items = scrape_with_pagination(
    driver,
    item_selector='.product-item',
    next_button_selector='.pagination .next',
    max_pages=5
)

print(f"合計{len(items)}件のアイテムを収集しました。")

4.2 URLパラメータ方式

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import time

def scrape_url_pagination(base_url, item_selector, start_page=1, max_pages=10):
    """URLパラメータベースのページネーションを処理します。"""
    driver = webdriver.Chrome()
    all_items = []

    try:
        for page in range(start_page, start_page + max_pages):
            url = f"{base_url}?page={page}"
            print(f"スクレイピング:{url}")

            driver.get(url)

            # コンテンツの読み込み待機
            wait = WebDriverWait(driver, 10)

            try:
                items = wait.until(EC.presence_of_all_elements_located(
                    (By.CSS_SELECTOR, item_selector)
                ))

                if not items:
                    print("アイテムがありません。")
                    break

                for item in items:
                    all_items.append({
                        'text': item.text,
                        'page': page
                    })

                time.sleep(1)  # サーバー負荷防止

            except Exception as e:
                print(f"ページ{page}処理中にエラー:{e}")
                break

    finally:
        driver.quit()

    return all_items

# 使用例
items = scrape_url_pagination(
    base_url='https://example.com/search',
    item_selector='.result-item',
    max_pages=5
)

5. ログインが必要なサイトのスクレイピング

5.1 基本的なログイン処理

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import time

class LoginScraper:
    """ログインが必要なサイトのスクレイパー"""

    def __init__(self, headless=False):
        options = webdriver.ChromeOptions()
        if headless:
            options.add_argument('--headless')
        self.driver = webdriver.Chrome(options=options)
        self.wait = WebDriverWait(self.driver, 10)

    def login(self, login_url, username, password, username_field, password_field, submit_button):
        """Webサイトにログインします。"""
        self.driver.get(login_url)

        # ユーザー名を入力
        username_input = self.wait.until(
            EC.presence_of_element_located((By.CSS_SELECTOR, username_field))
        )
        username_input.clear()
        username_input.send_keys(username)

        # パスワードを入力
        password_input = self.driver.find_element(By.CSS_SELECTOR, password_field)
        password_input.clear()
        password_input.send_keys(password)

        # ログインボタンをクリック
        submit_btn = self.driver.find_element(By.CSS_SELECTOR, submit_button)
        submit_btn.click()

        # ログイン完了待機
        time.sleep(3)

        return self.is_logged_in()

    def is_logged_in(self):
        """ログイン状態を確認します。"""
        # ログアウトボタンやユーザープロフィール要素を確認
        try:
            self.driver.find_element(By.CSS_SELECTOR, '.logout-btn, .user-profile')
            return True
        except:
            return False

    def scrape_protected_page(self, url, selector):
        """ログイン後、保護されたページをスクレイピングします。"""
        if not self.is_logged_in():
            raise Exception("ログインが必要です。")

        self.driver.get(url)
        elements = self.wait.until(
            EC.presence_of_all_elements_located((By.CSS_SELECTOR, selector))
        )
        return [el.text for el in elements]

    def close(self):
        """ブラウザを終了します。"""
        self.driver.quit()

# 使用例
scraper = LoginScraper()

try:
    # ログイン
    logged_in = scraper.login(
        login_url='https://example.com/login',
        username='your_username',
        password='your_password',
        username_field='input[name="username"]',
        password_field='input[name="password"]',
        submit_button='button[type="submit"]'
    )

    if logged_in:
        print("ログイン成功!")
        # 保護されたページをスクレイピング
        data = scraper.scrape_protected_page(
            url='https://example.com/dashboard',
            selector='.dashboard-item'
        )
        print(data)
    else:
        print("ログイン失敗")

finally:
    scraper.close()

5.2 Cookieを使用したセッション維持

import pickle
import os
from selenium import webdriver

def save_cookies(driver, filepath):
    """Cookieをファイルに保存します。"""
    with open(filepath, 'wb') as f:
        pickle.dump(driver.get_cookies(), f)

def load_cookies(driver, filepath):
    """ファイルからCookieを読み込みます。"""
    if os.path.exists(filepath):
        with open(filepath, 'rb') as f:
            cookies = pickle.load(f)
            for cookie in cookies:
                # Cookie有効期限処理
                if 'expiry' in cookie:
                    del cookie['expiry']
                driver.add_cookie(cookie)
        return True
    return False

# 使用例
driver = webdriver.Chrome()
cookie_file = 'cookies.pkl'

# まずサイトにアクセス
driver.get('https://example.com')

# 保存されたCookieがあれば読み込み
if load_cookies(driver, cookie_file):
    driver.refresh()  # Cookie適用のためにリフレッシュ
    print("Cookie読み込み完了")
else:
    # ログインを実行
    # ... (ログインロジック)
    # ログイン後Cookieを保存
    save_cookies(driver, cookie_file)
    print("Cookie保存完了")

6. データ保存

6.1 CSVファイルに保存

import csv
from datetime import datetime

def save_to_csv(data, filename, fieldnames=None):
    """データをCSVファイルに保存します。"""
    if not data:
        print("保存するデータがありません。")
        return

    # フィールド名がなければ最初のデータのキーを使用
    if fieldnames is None:
        fieldnames = data[0].keys()

    with open(filename, 'w', newline='', encoding='utf-8-sig') as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(data)

    print(f"データが{filename}に保存されました。")

# 使用例
products = [
    {'name': '商品 A', 'price': 10000, 'rating': 4.5},
    {'name': '商品 B', 'price': 20000, 'rating': 4.8},
    {'name': '商品 C', 'price': 15000, 'rating': 4.2},
]

save_to_csv(products, 'products.csv')

6.2 JSONファイルに保存

import json

def save_to_json(data, filename, indent=2):
    """データをJSONファイルに保存します。"""
    with open(filename, 'w', encoding='utf-8') as f:
        json.dump(data, f, ensure_ascii=False, indent=indent)

    print(f"データが{filename}に保存されました。")

def load_from_json(filename):
    """JSONファイルからデータを読み込みます。"""
    with open(filename, 'r', encoding='utf-8') as f:
        return json.load(f)

# 使用例
data = {
    'scraped_at': '2026-01-22',
    'total_items': 100,
    'items': [
        {'title': 'ニュース 1', 'url': 'https://example.com/1'},
        {'title': 'ニュース 2', 'url': 'https://example.com/2'},
    ]
}

save_to_json(data, 'news_data.json')

6.3 SQLiteデータベースに保存

import sqlite3
from datetime import datetime

class DatabaseManager:
    """SQLiteデータベース管理クラス"""

    def __init__(self, db_name='scraped_data.db'):
        self.conn = sqlite3.connect(db_name)
        self.cursor = self.conn.cursor()

    def create_table(self, table_name, columns):
        """テーブルを作成します。"""
        columns_str = ', '.join([f'{name} {type_}' for name, type_ in columns])
        query = f'CREATE TABLE IF NOT EXISTS {table_name} ({columns_str})'
        self.cursor.execute(query)
        self.conn.commit()

    def insert(self, table_name, data):
        """データを挿入します。"""
        columns = ', '.join(data.keys())
        placeholders = ', '.join(['?' for _ in data])
        query = f'INSERT INTO {table_name} ({columns}) VALUES ({placeholders})'
        self.cursor.execute(query, list(data.values()))
        self.conn.commit()

    def insert_many(self, table_name, data_list):
        """複数のデータを一度に挿入します。"""
        if not data_list:
            return

        columns = ', '.join(data_list[0].keys())
        placeholders = ', '.join(['?' for _ in data_list[0]])
        query = f'INSERT INTO {table_name} ({columns}) VALUES ({placeholders})'

        values = [list(data.values()) for data in data_list]
        self.cursor.executemany(query, values)
        self.conn.commit()

    def select_all(self, table_name):
        """すべてのデータを取得します。"""
        query = f'SELECT * FROM {table_name}'
        self.cursor.execute(query)
        return self.cursor.fetchall()

    def close(self):
        """データベース接続を終了します。"""
        self.conn.close()

# 使用例
db = DatabaseManager('products.db')

# テーブルを作成
db.create_table('products', [
    ('id', 'INTEGER PRIMARY KEY AUTOINCREMENT'),
    ('name', 'TEXT'),
    ('price', 'INTEGER'),
    ('rating', 'REAL'),
    ('scraped_at', 'TEXT')
])

# データを挿入
products = [
    {'name': '商品 A', 'price': 10000, 'rating': 4.5, 'scraped_at': datetime.now().isoformat()},
    {'name': '商品 B', 'price': 20000, 'rating': 4.8, 'scraped_at': datetime.now().isoformat()},
]

db.insert_many('products', products)

# データを取得
all_products = db.select_all('products')
for product in all_products:
    print(product)

db.close()

7. アンチボット回避技法

注意:アンチボットシステムの回避は、該当サイトの利用規約に違反する可能性があります。必ず法的・倫理的基準を遵守してください。

7.1 基本的な回避技法

from selenium import webdriver
from selenium.webdriver.chrome.options import Options
import random
import time

def create_stealth_driver():
    """検出を避けるためのステルスドライバーを作成します。"""
    options = Options()

    # 自動化検出防止
    options.add_argument('--disable-blink-features=AutomationControlled')
    options.add_experimental_option('excludeSwitches', ['enable-automation'])
    options.add_experimental_option('useAutomationExtension', False)

    # 実際のブラウザのように見せる
    options.add_argument('--disable-infobars')
    options.add_argument('--disable-extensions')

    # ランダムUser-Agent
    user_agents = [
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 Chrome/120.0.0.0 Safari/537.36',
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 Chrome/120.0.0.0 Safari/537.36',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/121.0',
    ]
    options.add_argument(f'--user-agent={random.choice(user_agents)}')

    driver = webdriver.Chrome(options=options)

    # navigator.webdriver属性を隠す
    driver.execute_cdp_cmd('Page.addScriptToEvaluateOnNewDocument', {
        'source': '''
            Object.defineProperty(navigator, 'webdriver', {
                get: () => undefined
            })
        '''
    })

    return driver

def random_delay(min_seconds=1, max_seconds=3):
    """ランダムなディレイを適用します。"""
    time.sleep(random.uniform(min_seconds, max_seconds))

def human_like_typing(element, text):
    """人間のようにテキストを入力します。"""
    for char in text:
        element.send_keys(char)
        time.sleep(random.uniform(0.05, 0.2))

7.2 プロキシの使用

from selenium import webdriver
from selenium.webdriver.chrome.options import Options

def create_driver_with_proxy(proxy_address):
    """プロキシを使用するドライバーを作成します。"""
    options = Options()
    options.add_argument(f'--proxy-server={proxy_address}')

    driver = webdriver.Chrome(options=options)
    return driver

# 使用例
proxy = '123.456.789.012:8080'
driver = create_driver_with_proxy(proxy)

# IP確認
driver.get('https://httpbin.org/ip')
print(driver.page_source)

8. 実践プロジェクト:ニュースクローラー

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options
import json
import time
from datetime import datetime

class NewsCrawler:
    """ニュースサイトクローラー"""

    def __init__(self, headless=True):
        options = Options()
        if headless:
            options.add_argument('--headless')
        options.add_argument('--no-sandbox')
        options.add_argument('--disable-dev-shm-usage')
        options.add_argument('--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36')

        self.driver = webdriver.Chrome(options=options)
        self.wait = WebDriverWait(self.driver, 10)
        self.articles = []

    def crawl_news_list(self, url, article_selector, title_selector, link_selector, max_articles=20):
        """ニュース一覧ページをクロールします。"""
        self.driver.get(url)

        # ページ読み込み待機
        self.wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, article_selector)))

        articles = self.driver.find_elements(By.CSS_SELECTOR, article_selector)[:max_articles]

        for article in articles:
            try:
                title_elem = article.find_element(By.CSS_SELECTOR, title_selector)
                link_elem = article.find_element(By.CSS_SELECTOR, link_selector)

                self.articles.append({
                    'title': title_elem.text.strip(),
                    'url': link_elem.get_attribute('href'),
                    'crawled_at': datetime.now().isoformat()
                })
            except Exception as e:
                print(f"記事パースエラー:{e}")
                continue

            time.sleep(0.5)  # サーバー負荷防止

        return self.articles

    def crawl_article_content(self, url, content_selector, author_selector=None, date_selector=None):
        """個別のニュース記事の本文をクロールします。"""
        self.driver.get(url)

        article_data = {'url': url}

        try:
            # 本文を抽出
            content_elem = self.wait.until(
                EC.presence_of_element_located((By.CSS_SELECTOR, content_selector))
            )
            article_data['content'] = content_elem.text.strip()

            # 著者を抽出(オプション)
            if author_selector:
                try:
                    author_elem = self.driver.find_element(By.CSS_SELECTOR, author_selector)
                    article_data['author'] = author_elem.text.strip()
                except:
                    article_data['author'] = None

            # 日付を抽出(オプション)
            if date_selector:
                try:
                    date_elem = self.driver.find_element(By.CSS_SELECTOR, date_selector)
                    article_data['date'] = date_elem.text.strip()
                except:
                    article_data['date'] = None

        except Exception as e:
            print(f"本文抽出エラー:{e}")
            article_data['content'] = None

        return article_data

    def crawl_full_articles(self, content_selector, author_selector=None, date_selector=None):
        """一覧から収集したすべての記事の本文をクロールします。"""
        for i, article in enumerate(self.articles):
            print(f"[{i+1}/{len(self.articles)}] クロール中:{article['title'][:30]}...")

            content_data = self.crawl_article_content(
                article['url'],
                content_selector,
                author_selector,
                date_selector
            )

            article.update(content_data)
            time.sleep(1)  # サーバー負荷防止

        return self.articles

    def save_to_json(self, filename):
        """収集したデータをJSONファイルに保存します。"""
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(self.articles, f, ensure_ascii=False, indent=2)
        print(f"データが{filename}に保存されました。")

    def close(self):
        """ブラウザを終了します。"""
        self.driver.quit()

# 使用例
if __name__ == '__main__':
    crawler = NewsCrawler(headless=True)

    try:
        # ニュース一覧をクロール(実際のURLとセレクターは対象サイトに合わせて修正)
        print("ニュース一覧のクロールを開始...")
        crawler.crawl_news_list(
            url='https://news.example.com/tech',
            article_selector='.news-item',
            title_selector='.news-title',
            link_selector='a',
            max_articles=10
        )

        # 記事本文をクロール
        print("\n記事本文のクロールを開始...")
        crawler.crawl_full_articles(
            content_selector='.article-body',
            author_selector='.author-name',
            date_selector='.publish-date'
        )

        # 結果を保存
        crawler.save_to_json('news_articles.json')

        print(f"\n合計{len(crawler.articles)}件の記事を収集しました。")

    finally:
        crawler.close()

9. 実践プロジェクト:ショッピングサイト価格モニタリング

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options
import sqlite3
import time
from datetime import datetime

class PriceMonitor:
    """ショッピングサイト価格モニタリングクローラー"""

    def __init__(self, db_name='price_history.db'):
        # ブラウザ設定
        options = Options()
        options.add_argument('--headless')
        options.add_argument('--no-sandbox')
        options.add_argument('--disable-dev-shm-usage')

        self.driver = webdriver.Chrome(options=options)
        self.wait = WebDriverWait(self.driver, 10)

        # データベース設定
        self.conn = sqlite3.connect(db_name)
        self.cursor = self.conn.cursor()
        self._create_tables()

    def _create_tables(self):
        """データベーステーブルを作成します。"""
        self.cursor.execute('''
            CREATE TABLE IF NOT EXISTS products (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT,
                url TEXT UNIQUE,
                created_at TEXT
            )
        ''')

        self.cursor.execute('''
            CREATE TABLE IF NOT EXISTS price_history (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                product_id INTEGER,
                price INTEGER,
                recorded_at TEXT,
                FOREIGN KEY (product_id) REFERENCES products (id)
            )
        ''')

        self.conn.commit()

    def add_product(self, name, url):
        """モニタリングする商品を追加します。"""
        try:
            self.cursor.execute(
                'INSERT INTO products (name, url, created_at) VALUES (?, ?, ?)',
                (name, url, datetime.now().isoformat())
            )
            self.conn.commit()
            print(f"商品追加:{name}")
        except sqlite3.IntegrityError:
            print(f"登録済みの商品:{name}")

    def get_price(self, url, price_selector):
        """商品価格を取得します。"""
        self.driver.get(url)

        try:
            price_elem = self.wait.until(
                EC.presence_of_element_located((By.CSS_SELECTOR, price_selector))
            )

            # 価格テキストから数字のみを抽出
            price_text = price_elem.text
            price = int(''.join(filter(str.isdigit, price_text)))

            return price

        except Exception as e:
            print(f"価格抽出エラー:{e}")
            return None

    def record_price(self, product_id, price):
        """価格を記録します。"""
        self.cursor.execute(
            'INSERT INTO price_history (product_id, price, recorded_at) VALUES (?, ?, ?)',
            (product_id, price, datetime.now().isoformat())
        )
        self.conn.commit()

    def check_all_prices(self, price_selector):
        """登録されたすべての商品の価格を確認します。"""
        self.cursor.execute('SELECT id, name, url FROM products')
        products = self.cursor.fetchall()

        results = []

        for product_id, name, url in products:
            print(f"価格確認中:{name}")

            price = self.get_price(url, price_selector)

            if price:
                self.record_price(product_id, price)
                results.append({
                    'name': name,
                    'price': price,
                    'url': url
                })

            time.sleep(2)  # サーバー負荷防止

        return results

    def get_price_history(self, product_id, limit=30):
        """商品の価格変動履歴を取得します。"""
        self.cursor.execute('''
            SELECT price, recorded_at
            FROM price_history
            WHERE product_id = ?
            ORDER BY recorded_at DESC
            LIMIT ?
        ''', (product_id, limit))

        return self.cursor.fetchall()

    def get_price_alert(self, product_id, threshold_price):
        """価格が特定の金額以下になったらアラートを返します。"""
        self.cursor.execute('''
            SELECT p.name, ph.price
            FROM products p
            JOIN price_history ph ON p.id = ph.product_id
            WHERE p.id = ?
            ORDER BY ph.recorded_at DESC
            LIMIT 1
        ''', (product_id,))

        result = self.cursor.fetchone()

        if result and result[1] <= threshold_price:
            return {
                'alert': True,
                'name': result[0],
                'current_price': result[1],
                'threshold': threshold_price
            }

        return {'alert': False}

    def close(self):
        """リソースを解放します。"""
        self.driver.quit()
        self.conn.close()

# 使用例
if __name__ == '__main__':
    monitor = PriceMonitor()

    try:
        # モニタリングする商品を追加(実際のURLに変更)
        monitor.add_product('ノートPC A', 'https://shop.example.com/product/1')
        monitor.add_product('ノートPC B', 'https://shop.example.com/product/2')

        # 価格を確認して記録
        results = monitor.check_all_prices(price_selector='.product-price')

        for result in results:
            print(f"{result['name']}: {result['price']:,}円")

        # 価格アラートを確認
        alert = monitor.get_price_alert(product_id=1, threshold_price=1000000)
        if alert['alert']:
            print(f"\n[アラート] {alert['name']}が{alert['current_price']:,}円に下がりました!")

    finally:
        monitor.close()

まとめ

今回は、Seleniumを使用した高度なWebスクレイピング技法を学びました。動的Webページ処理、ブラウザ自動化、ログイン処理、そして収集したデータをさまざまな形式で保存する方法を習得しました。

Webスクレイピングは強力なツールですが、常に法的・倫理的基準を遵守する必要があります。robots.txtを確認し、サーバーに過負荷をかけないように適切なディレイを設け、収集したデータの著作権を尊重しましょう。

シリーズのご案内:Python自動化マスターシリーズは続きます。次回は、メール自動化、API連携など、さらにさまざまな自動化技法を学びます!