Python自動化マスター 第4編:Webスクレイピング実践
Python Automation Master Part 4: Advanced Web Scraping
はじめに:なぜSeleniumが必要なのか?
第3編で学んだrequestsとBeautifulSoupは、静的HTMLをパースするのに効果的です。しかし、現代のWebサイトの多くはJavaScriptで動的にコンテンツを生成しています。このようなページでは、requestsで取得したHTMLに目的のデータがない場合が多いです。
Seleniumは、実際のWebブラウザを自動制御して、JavaScriptが実行された後の完全なページを取得できます。また、クリック、入力、スクロールなどのユーザーインタラクションをシミュレートできるため、ログイン、無限スクロール、ページネーション処理に便利です。
1. Seleniumのインストールとウェブドライバー設定
1.1 Seleniumのインストール
# Seleniumをインストール
pip install selenium
# ウェブドライバー自動管理のためのwebdriver-managerをインストール
pip install webdriver-manager
1.2 WebDriverの設定
Seleniumは、ブラウザを制御するためにWebDriverが必要です。webdriver-managerを使用すると、ドライバーを自動的にダウンロードして管理できます。
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
# Chromeブラウザの設定(自動ドライバー管理)
service = Service(ChromeDriverManager().install())
driver = webdriver.Chrome(service=service)
# Webページを開く
driver.get('https://www.google.com')
# ページタイトルを出力
print(driver.title)
# ブラウザを終了
driver.quit()
1.3 ブラウザオプション設定
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from webdriver_manager.chrome import ChromeDriverManager
def create_driver(headless=False):
"""Chromeドライバーを作成します。"""
options = Options()
# ヘッドレスモード(ブラウザウィンドウなしで実行)
if headless:
options.add_argument('--headless')
# 基本オプション設定
options.add_argument('--no-sandbox')
options.add_argument('--disable-dev-shm-usage')
options.add_argument('--disable-gpu')
# ウィンドウサイズ設定
options.add_argument('--window-size=1920,1080')
# User-Agent設定
options.add_argument('--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36')
# 自動化検出防止
options.add_argument('--disable-blink-features=AutomationControlled')
options.add_experimental_option('excludeSwitches', ['enable-automation'])
options.add_experimental_option('useAutomationExtension', False)
# ドライバーを作成
service = Service(ChromeDriverManager().install())
driver = webdriver.Chrome(service=service, options=options)
# 自動化検出回避のためのスクリプト実行
driver.execute_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})")
return driver
# 使用例
driver = create_driver(headless=True)
driver.get('https://example.com')
print(driver.page_source)
driver.quit()
2. ブラウザ自動化の基本
2.1 要素の検索
Selenium 4.xでは、Byクラスを使用して要素を検索します。
from selenium import webdriver
from selenium.webdriver.common.by import By
driver = webdriver.Chrome()
driver.get('https://example.com')
# さまざまな方法で要素を検索
# IDで検索
element = driver.find_element(By.ID, 'main-content')
# クラス名で検索
element = driver.find_element(By.CLASS_NAME, 'article')
# CSSセレクターで検索
element = driver.find_element(By.CSS_SELECTOR, 'div.content > p')
# XPathで検索
element = driver.find_element(By.XPATH, '//div[@class="content"]/p')
# タグ名で検索
elements = driver.find_elements(By.TAG_NAME, 'a')
# リンクテキストで検索
element = driver.find_element(By.LINK_TEXT, '詳細を見る')
element = driver.find_element(By.PARTIAL_LINK_TEXT, '詳細')
# Name属性で検索
element = driver.find_element(By.NAME, 'username')
# 複数の要素を検索(リストを返す)
all_links = driver.find_elements(By.TAG_NAME, 'a')
for link in all_links:
print(link.text, link.get_attribute('href'))
2.2 要素のインタラクション
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
driver = webdriver.Chrome()
driver.get('https://www.google.com')
# テキスト入力
search_box = driver.find_element(By.NAME, 'q')
search_box.send_keys('Python Selenium')
# キーボード入力
search_box.send_keys(Keys.ENTER) # Enterキー
# または
search_box.send_keys(Keys.CONTROL, 'a') # Ctrl+A(全選択)
# クリック
button = driver.find_element(By.CSS_SELECTOR, 'button[type="submit"]')
button.click()
# テキストを取得
element = driver.find_element(By.ID, 'result')
print(element.text)
# 属性を取得
link = driver.find_element(By.TAG_NAME, 'a')
print(link.get_attribute('href'))
# 入力フィールドをクリア
input_field = driver.find_element(By.NAME, 'username')
input_field.clear()
input_field.send_keys('new_value')
# フォーム送信
form = driver.find_element(By.TAG_NAME, 'form')
form.submit()
2.3 スクロール処理
from selenium import webdriver
from selenium.webdriver.common.by import By
import time
driver = webdriver.Chrome()
driver.get('https://example.com/infinite-scroll')
def scroll_to_bottom(driver, pause_time=2):
"""ページの一番下までスクロールします。"""
last_height = driver.execute_script("return document.body.scrollHeight")
while True:
# ページの一番下へスクロール
driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
# ページ読み込み待機
time.sleep(pause_time)
# 新しい高さを計算
new_height = driver.execute_script("return document.body.scrollHeight")
# これ以上スクロールできなければ終了
if new_height == last_height:
break
last_height = new_height
def scroll_by_pixel(driver, pixels=500):
"""指定されたピクセル分スクロールします。"""
driver.execute_script(f"window.scrollBy(0, {pixels});")
def scroll_to_element(driver, element):
"""特定の要素が見えるようにスクロールします。"""
driver.execute_script("arguments[0].scrollIntoView(true);", element)
# 使用例
scroll_to_bottom(driver)
# 特定の要素へスクロール
target = driver.find_element(By.ID, 'target-section')
scroll_to_element(driver, target)
3. 待機戦略(Wait)
動的なWebページでは、要素がロードされるまで待つ必要があります。Seleniumは2種類の待機方式を提供しています。
3.1 暗黙的待機(Implicit Wait)
グローバルに設定され、要素が見つかるまで指定された時間だけ待機します。
from selenium import webdriver
driver = webdriver.Chrome()
# 暗黙的待機を設定(最大10秒)
driver.implicitly_wait(10)
# これ以降のすべてのfind_element呼び出しは最大10秒まで待機
driver.get('https://example.com')
element = driver.find_element(By.ID, 'dynamic-content')
3.2 明示的待機(Explicit Wait)
特定の条件が満たされるまで待機します。より柔軟で推奨される方式です。
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
driver = webdriver.Chrome()
driver.get('https://example.com')
# 要素が現れるまで待機(最大10秒)
wait = WebDriverWait(driver, 10)
# 要素が存在するまで待機
element = wait.until(EC.presence_of_element_located((By.ID, 'content')))
# 要素が表示されるまで待機
element = wait.until(EC.visibility_of_element_located((By.ID, 'content')))
# 要素がクリック可能になるまで待機
button = wait.until(EC.element_to_be_clickable((By.CSS_SELECTOR, 'button.submit')))
# 特定のテキストが現れるまで待機
wait.until(EC.text_to_be_present_in_element((By.ID, 'status'), '完了'))
# 要素が消えるまで待機
wait.until(EC.invisibility_of_element_located((By.ID, 'loading')))
# 新しいウィンドウが開くまで待機
wait.until(EC.number_of_windows_to_be(2))
# URLが特定の値を含むまで待機
wait.until(EC.url_contains('/success'))
3.3 カスタム待機条件
from selenium.webdriver.support.ui import WebDriverWait
def wait_for_ajax(driver, timeout=30):
"""AJAXリクエストが完了するまで待機します。"""
wait = WebDriverWait(driver, timeout)
wait.until(lambda d: d.execute_script('return jQuery.active == 0'))
def wait_for_page_load(driver, timeout=30):
"""ページが完全に読み込まれるまで待機します。"""
wait = WebDriverWait(driver, timeout)
wait.until(lambda d: d.execute_script('return document.readyState') == 'complete')
def wait_for_element_count(driver, locator, count, timeout=10):
"""特定の数の要素が現れるまで待機します。"""
wait = WebDriverWait(driver, timeout)
wait.until(lambda d: len(d.find_elements(*locator)) >= count)
# 使用例
driver.get('https://example.com')
wait_for_page_load(driver)
wait_for_element_count(driver, (By.CSS_SELECTOR, '.item'), 10)
4. ページネーション処理
4.1 ボタンクリック方式
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import NoSuchElementException, TimeoutException
def scrape_with_pagination(driver, item_selector, next_button_selector, max_pages=10):
"""ページネーションがあるサイトをスクレイピングします。"""
all_items = []
current_page = 1
while current_page <= max_pages:
print(f"ページ{current_page}をスクレイピング中...")
# 現在のページのアイテムを収集
wait = WebDriverWait(driver, 10)
items = wait.until(EC.presence_of_all_elements_located((By.CSS_SELECTOR, item_selector)))
for item in items:
all_items.append(item.text)
# 次のページボタンを検索
try:
next_button = driver.find_element(By.CSS_SELECTOR, next_button_selector)
# ボタンが無効化されているか確認
if 'disabled' in next_button.get_attribute('class') or not next_button.is_enabled():
print("最後のページです。")
break
# 次のページへ移動
next_button.click()
# 新しいコンテンツの読み込み待機
time.sleep(2)
current_page += 1
except NoSuchElementException:
print("次のページボタンが見つかりません。")
break
except TimeoutException:
print("ページ読み込みタイムアウト")
break
return all_items
# 使用例
driver = webdriver.Chrome()
driver.get('https://example.com/products')
items = scrape_with_pagination(
driver,
item_selector='.product-item',
next_button_selector='.pagination .next',
max_pages=5
)
print(f"合計{len(items)}件のアイテムを収集しました。")
4.2 URLパラメータ方式
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import time
def scrape_url_pagination(base_url, item_selector, start_page=1, max_pages=10):
"""URLパラメータベースのページネーションを処理します。"""
driver = webdriver.Chrome()
all_items = []
try:
for page in range(start_page, start_page + max_pages):
url = f"{base_url}?page={page}"
print(f"スクレイピング:{url}")
driver.get(url)
# コンテンツの読み込み待機
wait = WebDriverWait(driver, 10)
try:
items = wait.until(EC.presence_of_all_elements_located(
(By.CSS_SELECTOR, item_selector)
))
if not items:
print("アイテムがありません。")
break
for item in items:
all_items.append({
'text': item.text,
'page': page
})
time.sleep(1) # サーバー負荷防止
except Exception as e:
print(f"ページ{page}処理中にエラー:{e}")
break
finally:
driver.quit()
return all_items
# 使用例
items = scrape_url_pagination(
base_url='https://example.com/search',
item_selector='.result-item',
max_pages=5
)
5. ログインが必要なサイトのスクレイピング
5.1 基本的なログイン処理
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import time
class LoginScraper:
"""ログインが必要なサイトのスクレイパー"""
def __init__(self, headless=False):
options = webdriver.ChromeOptions()
if headless:
options.add_argument('--headless')
self.driver = webdriver.Chrome(options=options)
self.wait = WebDriverWait(self.driver, 10)
def login(self, login_url, username, password, username_field, password_field, submit_button):
"""Webサイトにログインします。"""
self.driver.get(login_url)
# ユーザー名を入力
username_input = self.wait.until(
EC.presence_of_element_located((By.CSS_SELECTOR, username_field))
)
username_input.clear()
username_input.send_keys(username)
# パスワードを入力
password_input = self.driver.find_element(By.CSS_SELECTOR, password_field)
password_input.clear()
password_input.send_keys(password)
# ログインボタンをクリック
submit_btn = self.driver.find_element(By.CSS_SELECTOR, submit_button)
submit_btn.click()
# ログイン完了待機
time.sleep(3)
return self.is_logged_in()
def is_logged_in(self):
"""ログイン状態を確認します。"""
# ログアウトボタンやユーザープロフィール要素を確認
try:
self.driver.find_element(By.CSS_SELECTOR, '.logout-btn, .user-profile')
return True
except:
return False
def scrape_protected_page(self, url, selector):
"""ログイン後、保護されたページをスクレイピングします。"""
if not self.is_logged_in():
raise Exception("ログインが必要です。")
self.driver.get(url)
elements = self.wait.until(
EC.presence_of_all_elements_located((By.CSS_SELECTOR, selector))
)
return [el.text for el in elements]
def close(self):
"""ブラウザを終了します。"""
self.driver.quit()
# 使用例
scraper = LoginScraper()
try:
# ログイン
logged_in = scraper.login(
login_url='https://example.com/login',
username='your_username',
password='your_password',
username_field='input[name="username"]',
password_field='input[name="password"]',
submit_button='button[type="submit"]'
)
if logged_in:
print("ログイン成功!")
# 保護されたページをスクレイピング
data = scraper.scrape_protected_page(
url='https://example.com/dashboard',
selector='.dashboard-item'
)
print(data)
else:
print("ログイン失敗")
finally:
scraper.close()
5.2 Cookieを使用したセッション維持
import pickle
import os
from selenium import webdriver
def save_cookies(driver, filepath):
"""Cookieをファイルに保存します。"""
with open(filepath, 'wb') as f:
pickle.dump(driver.get_cookies(), f)
def load_cookies(driver, filepath):
"""ファイルからCookieを読み込みます。"""
if os.path.exists(filepath):
with open(filepath, 'rb') as f:
cookies = pickle.load(f)
for cookie in cookies:
# Cookie有効期限処理
if 'expiry' in cookie:
del cookie['expiry']
driver.add_cookie(cookie)
return True
return False
# 使用例
driver = webdriver.Chrome()
cookie_file = 'cookies.pkl'
# まずサイトにアクセス
driver.get('https://example.com')
# 保存されたCookieがあれば読み込み
if load_cookies(driver, cookie_file):
driver.refresh() # Cookie適用のためにリフレッシュ
print("Cookie読み込み完了")
else:
# ログインを実行
# ... (ログインロジック)
# ログイン後Cookieを保存
save_cookies(driver, cookie_file)
print("Cookie保存完了")
6. データ保存
6.1 CSVファイルに保存
import csv
from datetime import datetime
def save_to_csv(data, filename, fieldnames=None):
"""データをCSVファイルに保存します。"""
if not data:
print("保存するデータがありません。")
return
# フィールド名がなければ最初のデータのキーを使用
if fieldnames is None:
fieldnames = data[0].keys()
with open(filename, 'w', newline='', encoding='utf-8-sig') as f:
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(data)
print(f"データが{filename}に保存されました。")
# 使用例
products = [
{'name': '商品 A', 'price': 10000, 'rating': 4.5},
{'name': '商品 B', 'price': 20000, 'rating': 4.8},
{'name': '商品 C', 'price': 15000, 'rating': 4.2},
]
save_to_csv(products, 'products.csv')
6.2 JSONファイルに保存
import json
def save_to_json(data, filename, indent=2):
"""データをJSONファイルに保存します。"""
with open(filename, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=indent)
print(f"データが{filename}に保存されました。")
def load_from_json(filename):
"""JSONファイルからデータを読み込みます。"""
with open(filename, 'r', encoding='utf-8') as f:
return json.load(f)
# 使用例
data = {
'scraped_at': '2026-01-22',
'total_items': 100,
'items': [
{'title': 'ニュース 1', 'url': 'https://example.com/1'},
{'title': 'ニュース 2', 'url': 'https://example.com/2'},
]
}
save_to_json(data, 'news_data.json')
6.3 SQLiteデータベースに保存
import sqlite3
from datetime import datetime
class DatabaseManager:
"""SQLiteデータベース管理クラス"""
def __init__(self, db_name='scraped_data.db'):
self.conn = sqlite3.connect(db_name)
self.cursor = self.conn.cursor()
def create_table(self, table_name, columns):
"""テーブルを作成します。"""
columns_str = ', '.join([f'{name} {type_}' for name, type_ in columns])
query = f'CREATE TABLE IF NOT EXISTS {table_name} ({columns_str})'
self.cursor.execute(query)
self.conn.commit()
def insert(self, table_name, data):
"""データを挿入します。"""
columns = ', '.join(data.keys())
placeholders = ', '.join(['?' for _ in data])
query = f'INSERT INTO {table_name} ({columns}) VALUES ({placeholders})'
self.cursor.execute(query, list(data.values()))
self.conn.commit()
def insert_many(self, table_name, data_list):
"""複数のデータを一度に挿入します。"""
if not data_list:
return
columns = ', '.join(data_list[0].keys())
placeholders = ', '.join(['?' for _ in data_list[0]])
query = f'INSERT INTO {table_name} ({columns}) VALUES ({placeholders})'
values = [list(data.values()) for data in data_list]
self.cursor.executemany(query, values)
self.conn.commit()
def select_all(self, table_name):
"""すべてのデータを取得します。"""
query = f'SELECT * FROM {table_name}'
self.cursor.execute(query)
return self.cursor.fetchall()
def close(self):
"""データベース接続を終了します。"""
self.conn.close()
# 使用例
db = DatabaseManager('products.db')
# テーブルを作成
db.create_table('products', [
('id', 'INTEGER PRIMARY KEY AUTOINCREMENT'),
('name', 'TEXT'),
('price', 'INTEGER'),
('rating', 'REAL'),
('scraped_at', 'TEXT')
])
# データを挿入
products = [
{'name': '商品 A', 'price': 10000, 'rating': 4.5, 'scraped_at': datetime.now().isoformat()},
{'name': '商品 B', 'price': 20000, 'rating': 4.8, 'scraped_at': datetime.now().isoformat()},
]
db.insert_many('products', products)
# データを取得
all_products = db.select_all('products')
for product in all_products:
print(product)
db.close()
7. アンチボット回避技法
注意:アンチボットシステムの回避は、該当サイトの利用規約に違反する可能性があります。必ず法的・倫理的基準を遵守してください。
7.1 基本的な回避技法
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
import random
import time
def create_stealth_driver():
"""検出を避けるためのステルスドライバーを作成します。"""
options = Options()
# 自動化検出防止
options.add_argument('--disable-blink-features=AutomationControlled')
options.add_experimental_option('excludeSwitches', ['enable-automation'])
options.add_experimental_option('useAutomationExtension', False)
# 実際のブラウザのように見せる
options.add_argument('--disable-infobars')
options.add_argument('--disable-extensions')
# ランダムUser-Agent
user_agents = [
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 Chrome/120.0.0.0 Safari/537.36',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 Chrome/120.0.0.0 Safari/537.36',
'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/121.0',
]
options.add_argument(f'--user-agent={random.choice(user_agents)}')
driver = webdriver.Chrome(options=options)
# navigator.webdriver属性を隠す
driver.execute_cdp_cmd('Page.addScriptToEvaluateOnNewDocument', {
'source': '''
Object.defineProperty(navigator, 'webdriver', {
get: () => undefined
})
'''
})
return driver
def random_delay(min_seconds=1, max_seconds=3):
"""ランダムなディレイを適用します。"""
time.sleep(random.uniform(min_seconds, max_seconds))
def human_like_typing(element, text):
"""人間のようにテキストを入力します。"""
for char in text:
element.send_keys(char)
time.sleep(random.uniform(0.05, 0.2))
7.2 プロキシの使用
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
def create_driver_with_proxy(proxy_address):
"""プロキシを使用するドライバーを作成します。"""
options = Options()
options.add_argument(f'--proxy-server={proxy_address}')
driver = webdriver.Chrome(options=options)
return driver
# 使用例
proxy = '123.456.789.012:8080'
driver = create_driver_with_proxy(proxy)
# IP確認
driver.get('https://httpbin.org/ip')
print(driver.page_source)
8. 実践プロジェクト:ニュースクローラー
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options
import json
import time
from datetime import datetime
class NewsCrawler:
"""ニュースサイトクローラー"""
def __init__(self, headless=True):
options = Options()
if headless:
options.add_argument('--headless')
options.add_argument('--no-sandbox')
options.add_argument('--disable-dev-shm-usage')
options.add_argument('--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36')
self.driver = webdriver.Chrome(options=options)
self.wait = WebDriverWait(self.driver, 10)
self.articles = []
def crawl_news_list(self, url, article_selector, title_selector, link_selector, max_articles=20):
"""ニュース一覧ページをクロールします。"""
self.driver.get(url)
# ページ読み込み待機
self.wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, article_selector)))
articles = self.driver.find_elements(By.CSS_SELECTOR, article_selector)[:max_articles]
for article in articles:
try:
title_elem = article.find_element(By.CSS_SELECTOR, title_selector)
link_elem = article.find_element(By.CSS_SELECTOR, link_selector)
self.articles.append({
'title': title_elem.text.strip(),
'url': link_elem.get_attribute('href'),
'crawled_at': datetime.now().isoformat()
})
except Exception as e:
print(f"記事パースエラー:{e}")
continue
time.sleep(0.5) # サーバー負荷防止
return self.articles
def crawl_article_content(self, url, content_selector, author_selector=None, date_selector=None):
"""個別のニュース記事の本文をクロールします。"""
self.driver.get(url)
article_data = {'url': url}
try:
# 本文を抽出
content_elem = self.wait.until(
EC.presence_of_element_located((By.CSS_SELECTOR, content_selector))
)
article_data['content'] = content_elem.text.strip()
# 著者を抽出(オプション)
if author_selector:
try:
author_elem = self.driver.find_element(By.CSS_SELECTOR, author_selector)
article_data['author'] = author_elem.text.strip()
except:
article_data['author'] = None
# 日付を抽出(オプション)
if date_selector:
try:
date_elem = self.driver.find_element(By.CSS_SELECTOR, date_selector)
article_data['date'] = date_elem.text.strip()
except:
article_data['date'] = None
except Exception as e:
print(f"本文抽出エラー:{e}")
article_data['content'] = None
return article_data
def crawl_full_articles(self, content_selector, author_selector=None, date_selector=None):
"""一覧から収集したすべての記事の本文をクロールします。"""
for i, article in enumerate(self.articles):
print(f"[{i+1}/{len(self.articles)}] クロール中:{article['title'][:30]}...")
content_data = self.crawl_article_content(
article['url'],
content_selector,
author_selector,
date_selector
)
article.update(content_data)
time.sleep(1) # サーバー負荷防止
return self.articles
def save_to_json(self, filename):
"""収集したデータをJSONファイルに保存します。"""
with open(filename, 'w', encoding='utf-8') as f:
json.dump(self.articles, f, ensure_ascii=False, indent=2)
print(f"データが{filename}に保存されました。")
def close(self):
"""ブラウザを終了します。"""
self.driver.quit()
# 使用例
if __name__ == '__main__':
crawler = NewsCrawler(headless=True)
try:
# ニュース一覧をクロール(実際のURLとセレクターは対象サイトに合わせて修正)
print("ニュース一覧のクロールを開始...")
crawler.crawl_news_list(
url='https://news.example.com/tech',
article_selector='.news-item',
title_selector='.news-title',
link_selector='a',
max_articles=10
)
# 記事本文をクロール
print("\n記事本文のクロールを開始...")
crawler.crawl_full_articles(
content_selector='.article-body',
author_selector='.author-name',
date_selector='.publish-date'
)
# 結果を保存
crawler.save_to_json('news_articles.json')
print(f"\n合計{len(crawler.articles)}件の記事を収集しました。")
finally:
crawler.close()
9. 実践プロジェクト:ショッピングサイト価格モニタリング
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options
import sqlite3
import time
from datetime import datetime
class PriceMonitor:
"""ショッピングサイト価格モニタリングクローラー"""
def __init__(self, db_name='price_history.db'):
# ブラウザ設定
options = Options()
options.add_argument('--headless')
options.add_argument('--no-sandbox')
options.add_argument('--disable-dev-shm-usage')
self.driver = webdriver.Chrome(options=options)
self.wait = WebDriverWait(self.driver, 10)
# データベース設定
self.conn = sqlite3.connect(db_name)
self.cursor = self.conn.cursor()
self._create_tables()
def _create_tables(self):
"""データベーステーブルを作成します。"""
self.cursor.execute('''
CREATE TABLE IF NOT EXISTS products (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT,
url TEXT UNIQUE,
created_at TEXT
)
''')
self.cursor.execute('''
CREATE TABLE IF NOT EXISTS price_history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
product_id INTEGER,
price INTEGER,
recorded_at TEXT,
FOREIGN KEY (product_id) REFERENCES products (id)
)
''')
self.conn.commit()
def add_product(self, name, url):
"""モニタリングする商品を追加します。"""
try:
self.cursor.execute(
'INSERT INTO products (name, url, created_at) VALUES (?, ?, ?)',
(name, url, datetime.now().isoformat())
)
self.conn.commit()
print(f"商品追加:{name}")
except sqlite3.IntegrityError:
print(f"登録済みの商品:{name}")
def get_price(self, url, price_selector):
"""商品価格を取得します。"""
self.driver.get(url)
try:
price_elem = self.wait.until(
EC.presence_of_element_located((By.CSS_SELECTOR, price_selector))
)
# 価格テキストから数字のみを抽出
price_text = price_elem.text
price = int(''.join(filter(str.isdigit, price_text)))
return price
except Exception as e:
print(f"価格抽出エラー:{e}")
return None
def record_price(self, product_id, price):
"""価格を記録します。"""
self.cursor.execute(
'INSERT INTO price_history (product_id, price, recorded_at) VALUES (?, ?, ?)',
(product_id, price, datetime.now().isoformat())
)
self.conn.commit()
def check_all_prices(self, price_selector):
"""登録されたすべての商品の価格を確認します。"""
self.cursor.execute('SELECT id, name, url FROM products')
products = self.cursor.fetchall()
results = []
for product_id, name, url in products:
print(f"価格確認中:{name}")
price = self.get_price(url, price_selector)
if price:
self.record_price(product_id, price)
results.append({
'name': name,
'price': price,
'url': url
})
time.sleep(2) # サーバー負荷防止
return results
def get_price_history(self, product_id, limit=30):
"""商品の価格変動履歴を取得します。"""
self.cursor.execute('''
SELECT price, recorded_at
FROM price_history
WHERE product_id = ?
ORDER BY recorded_at DESC
LIMIT ?
''', (product_id, limit))
return self.cursor.fetchall()
def get_price_alert(self, product_id, threshold_price):
"""価格が特定の金額以下になったらアラートを返します。"""
self.cursor.execute('''
SELECT p.name, ph.price
FROM products p
JOIN price_history ph ON p.id = ph.product_id
WHERE p.id = ?
ORDER BY ph.recorded_at DESC
LIMIT 1
''', (product_id,))
result = self.cursor.fetchone()
if result and result[1] <= threshold_price:
return {
'alert': True,
'name': result[0],
'current_price': result[1],
'threshold': threshold_price
}
return {'alert': False}
def close(self):
"""リソースを解放します。"""
self.driver.quit()
self.conn.close()
# 使用例
if __name__ == '__main__':
monitor = PriceMonitor()
try:
# モニタリングする商品を追加(実際のURLに変更)
monitor.add_product('ノートPC A', 'https://shop.example.com/product/1')
monitor.add_product('ノートPC B', 'https://shop.example.com/product/2')
# 価格を確認して記録
results = monitor.check_all_prices(price_selector='.product-price')
for result in results:
print(f"{result['name']}: {result['price']:,}円")
# 価格アラートを確認
alert = monitor.get_price_alert(product_id=1, threshold_price=1000000)
if alert['alert']:
print(f"\n[アラート] {alert['name']}が{alert['current_price']:,}円に下がりました!")
finally:
monitor.close()
まとめ
今回は、Seleniumを使用した高度なWebスクレイピング技法を学びました。動的Webページ処理、ブラウザ自動化、ログイン処理、そして収集したデータをさまざまな形式で保存する方法を習得しました。
Webスクレイピングは強力なツールですが、常に法的・倫理的基準を遵守する必要があります。robots.txtを確認し、サーバーに過負荷をかけないように適切なディレイを設け、収集したデータの著作権を尊重しましょう。
シリーズのご案内:Python自動化マスターシリーズは続きます。次回は、メール自動化、API連携など、さらにさまざまな自動化技法を学びます!