Preface: Why Selenium?

The requests and BeautifulSoup libraries covered in Part 3 work very well for parsing static HTML. Modern websites, however, generate much of their content dynamically with JavaScript, and on such pages the data you want is often missing from the HTML that requests returns.

Selenium drives a real web browser programmatically and retrieves the complete page after JavaScript has executed. It can also simulate user interactions such as clicking, typing, and scrolling, which makes it very useful for handling logins, infinite scroll, and pagination.
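
As a quick illustration (the URL below stands in for a JavaScript-rendered page, so treat this as a sketch rather than a definitive test), the same page can look very different to requests and to Selenium:

import requests
from selenium import webdriver

url = 'https://example.com/js-rendered-page'  # hypothetical JS-heavy page

# requests only receives the initial HTML sent by the server
static_html = requests.get(url, timeout=10).text
print('requests HTML length:', len(static_html))

# Selenium returns the DOM after JavaScript has executed
driver = webdriver.Chrome()
driver.get(url)
print('Selenium HTML length:', len(driver.page_source))
driver.quit()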

1. Installing Selenium and Configuring the WebDriver

1.1 Installing Selenium

# Install Selenium
pip install selenium

# Install webdriver-manager for automatic WebDriver management
pip install webdriver-manager

1.2 Configuring the WebDriver

Selenium needs a WebDriver to control the browser. With webdriver-manager, the driver binary is downloaded and managed automatically.

from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager

# Chrome browser setup (automatic driver management)
service = Service(ChromeDriverManager().install())
driver = webdriver.Chrome(service=service)

# Open a web page
driver.get('https://www.google.com')

# Print the page title
print(driver.title)

# Close the browser
driver.quit()

1.3 Configuring Browser Options

from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from webdriver_manager.chrome import ChromeDriverManager

def create_driver(headless=False):
    """Create a Chrome driver."""
    options = Options()

    # Headless mode (run without a browser window)
    if headless:
        options.add_argument('--headless')

    # Basic options
    options.add_argument('--no-sandbox')
    options.add_argument('--disable-dev-shm-usage')
    options.add_argument('--disable-gpu')

    # Window size
    options.add_argument('--window-size=1920,1080')

    # User-Agent
    options.add_argument('--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36')

    # Reduce automation detection
    options.add_argument('--disable-blink-features=AutomationControlled')
    options.add_experimental_option('excludeSwitches', ['enable-automation'])
    options.add_experimental_option('useAutomationExtension', False)

    # Create the driver
    service = Service(ChromeDriverManager().install())
    driver = webdriver.Chrome(service=service, options=options)

    # Execute a script to bypass automation detection
    driver.execute_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})")

    return driver

# Usage example
driver = create_driver(headless=True)
driver.get('https://example.com')
print(driver.page_source)
driver.quit()

2. Browser Automation Basics

2.1 Locating Elements

In Selenium 4.x, elements are located with the By class.

from selenium import webdriver
from selenium.webdriver.common.by import By

driver = webdriver.Chrome()
driver.get('https://example.com')

# Various ways to locate elements
# By ID
element = driver.find_element(By.ID, 'main-content')

# By class name
element = driver.find_element(By.CLASS_NAME, 'article')

# By CSS selector
element = driver.find_element(By.CSS_SELECTOR, 'div.content > p')

# By XPath
element = driver.find_element(By.XPATH, '//div[@class="content"]/p')

# By tag name
elements = driver.find_elements(By.TAG_NAME, 'a')

# By link text
element = driver.find_element(By.LINK_TEXT, 'View more')
element = driver.find_element(By.PARTIAL_LINK_TEXT, 'more')

# By the name attribute
element = driver.find_element(By.NAME, 'username')

# Find multiple elements (returns a list)
all_links = driver.find_elements(By.TAG_NAME, 'a')
for link in all_links:
    print(link.text, link.get_attribute('href'))

2.2 Interacting with Elements

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys

driver = webdriver.Chrome()
driver.get('https://www.google.com')

# Type text
search_box = driver.find_element(By.NAME, 'q')
search_box.send_keys('Python Selenium')

# Keyboard input
search_box.send_keys(Keys.ENTER)  # Enter key
# or
search_box.send_keys(Keys.CONTROL, 'a')  # Ctrl+A (select all)

# Click
button = driver.find_element(By.CSS_SELECTOR, 'button[type="submit"]')
button.click()

# Get text
element = driver.find_element(By.ID, 'result')
print(element.text)

# Get an attribute
link = driver.find_element(By.TAG_NAME, 'a')
print(link.get_attribute('href'))

# Clear an input field
input_field = driver.find_element(By.NAME, 'username')
input_field.clear()
input_field.send_keys('new_value')

# Submit a form
form = driver.find_element(By.TAG_NAME, 'form')
form.submit()

2.3 Scrolling

from selenium import webdriver
from selenium.webdriver.common.by import By
import time

driver = webdriver.Chrome()
driver.get('https://example.com/infinite-scroll')

def scroll_to_bottom(driver, pause_time=2):
    """Scroll to the bottom of the page."""
    last_height = driver.execute_script("return document.body.scrollHeight")

    while True:
        # Scroll to the bottom of the page
        driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")

        # Wait for the page to load
        time.sleep(pause_time)

        # Measure the new height
        new_height = driver.execute_script("return document.body.scrollHeight")

        # Stop if the page can no longer be scrolled
        if new_height == last_height:
            break

        last_height = new_height

def scroll_by_pixel(driver, pixels=500):
    """Scroll down by a given number of pixels."""
    driver.execute_script(f"window.scrollBy(0, {pixels});")

def scroll_to_element(driver, element):
    """Scroll until a specific element is in view."""
    driver.execute_script("arguments[0].scrollIntoView(true);", element)

# Usage example
scroll_to_bottom(driver)

# Scroll to a specific element
target = driver.find_element(By.ID, 'target-section')
scroll_to_element(driver, target)

3. Wait Strategies

Dynamic pages need time to render their elements, so you have to wait for them. Selenium provides two kinds of waits.

3.1 Implicit Wait

A global setting: every element lookup waits up to the specified time.

from selenium import webdriver
from selenium.webdriver.common.by import By

driver = webdriver.Chrome()

# Set an implicit wait (up to 10 seconds)
driver.implicitly_wait(10)

# Every find_element call now waits up to 10 seconds
driver.get('https://example.com')
element = driver.find_element(By.ID, 'dynamic-content')

3.2 Explicit Wait

Waits until a specific condition is met. This is the more flexible and recommended approach.

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

driver = webdriver.Chrome()
driver.get('https://example.com')

# Create an explicit wait (up to 10 seconds)
wait = WebDriverWait(driver, 10)

# Wait until an element is present
element = wait.until(EC.presence_of_element_located((By.ID, 'content')))

# Wait until an element is visible
element = wait.until(EC.visibility_of_element_located((By.ID, 'content')))

# Wait until an element is clickable
button = wait.until(EC.element_to_be_clickable((By.CSS_SELECTOR, 'button.submit')))

# Wait until specific text appears
wait.until(EC.text_to_be_present_in_element((By.ID, 'status'), 'Done'))

# Wait until an element disappears
wait.until(EC.invisibility_of_element_located((By.ID, 'loading')))

# Wait until a new window has opened
wait.until(EC.number_of_windows_to_be(2))

# Wait until the URL contains a specific value
wait.until(EC.url_contains('/success'))

3.3 Custom Wait Conditions

from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait

def wait_for_ajax(driver, timeout=30):
    """Wait until AJAX requests have finished (assumes the page uses jQuery)."""
    wait = WebDriverWait(driver, timeout)
    wait.until(lambda d: d.execute_script('return jQuery.active == 0'))

def wait_for_page_load(driver, timeout=30):
    """Wait until the page has fully loaded."""
    wait = WebDriverWait(driver, timeout)
    wait.until(lambda d: d.execute_script('return document.readyState') == 'complete')

def wait_for_element_count(driver, locator, count, timeout=10):
    """Wait until at least the given number of matching elements are present."""
    wait = WebDriverWait(driver, timeout)
    wait.until(lambda d: len(d.find_elements(*locator)) >= count)

# Usage example
driver.get('https://example.com')
wait_for_page_load(driver)
wait_for_element_count(driver, (By.CSS_SELECTOR, '.item'), 10)

4. Handling Pagination

4.1 Clicking the Next Button

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import NoSuchElementException, TimeoutException
import time

def scrape_with_pagination(driver, item_selector, next_button_selector, max_pages=10):
    """Scrape a site that uses button-based pagination."""
    all_items = []
    current_page = 1

    while current_page <= max_pages:
        print(f"Scraping page {current_page}...")

        # Collect the items on the current page
        wait = WebDriverWait(driver, 10)
        items = wait.until(EC.presence_of_all_elements_located((By.CSS_SELECTOR, item_selector)))

        for item in items:
            all_items.append(item.text)

        # Find the "next page" button
        try:
            next_button = driver.find_element(By.CSS_SELECTOR, next_button_selector)

            # Check whether the button is disabled
            if 'disabled' in (next_button.get_attribute('class') or '') or not next_button.is_enabled():
                print("This is the last page.")
                break

            # Go to the next page
            next_button.click()

            # Wait for the new content to load
            time.sleep(2)
            current_page += 1

        except NoSuchElementException:
            print("No next-page button found.")
            break
        except TimeoutException:
            print("Page load timed out.")
            break

    return all_items

# Usage example
driver = webdriver.Chrome()
driver.get('https://example.com/products')

items = scrape_with_pagination(
    driver,
    item_selector='.product-item',
    next_button_selector='.pagination .next',
    max_pages=5
)

print(f"Collected {len(items)} items in total.")

4.2 URL Parameter Pagination

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import time

def scrape_url_pagination(base_url, item_selector, start_page=1, max_pages=10):
    """Handle pagination driven by a URL parameter."""
    driver = webdriver.Chrome()
    all_items = []

    try:
        for page in range(start_page, start_page + max_pages):
            url = f"{base_url}?page={page}"
            print(f"Scraping: {url}")

            driver.get(url)

            # Wait for the content to load
            wait = WebDriverWait(driver, 10)

            try:
                items = wait.until(EC.presence_of_all_elements_located(
                    (By.CSS_SELECTOR, item_selector)
                ))

                if not items:
                    print("No more items.")
                    break

                for item in items:
                    all_items.append({
                        'text': item.text,
                        'page': page
                    })

                time.sleep(1)  # Avoid overloading the server

            except Exception as e:
                print(f"Error on page {page}: {e}")
                break

    finally:
        driver.quit()

    return all_items

# Usage example
items = scrape_url_pagination(
    base_url='https://example.com/search',
    item_selector='.result-item',
    max_pages=5
)

5. Scraping Sites That Require Login

5.1 Basic Login Handling

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import NoSuchElementException
import time

class LoginScraper:
    """Scraper for sites that require login"""

    def __init__(self, headless=False):
        options = webdriver.ChromeOptions()
        if headless:
            options.add_argument('--headless')
        self.driver = webdriver.Chrome(options=options)
        self.wait = WebDriverWait(self.driver, 10)

    def login(self, login_url, username, password, username_field, password_field, submit_button):
        """Log in to the site."""
        self.driver.get(login_url)

        # Enter the username
        username_input = self.wait.until(
            EC.presence_of_element_located((By.CSS_SELECTOR, username_field))
        )
        username_input.clear()
        username_input.send_keys(username)

        # Enter the password
        password_input = self.driver.find_element(By.CSS_SELECTOR, password_field)
        password_input.clear()
        password_input.send_keys(password)

        # Click the login button
        submit_btn = self.driver.find_element(By.CSS_SELECTOR, submit_button)
        submit_btn.click()

        # Wait for the login to complete
        time.sleep(3)

        return self.is_logged_in()

    def is_logged_in(self):
        """Check whether we are currently logged in."""
        # Look for a logout button or user-profile element
        try:
            self.driver.find_element(By.CSS_SELECTOR, '.logout-btn, .user-profile')
            return True
        except NoSuchElementException:
            return False

    def scrape_protected_page(self, url, selector):
        """Scrape a protected page after logging in."""
        if not self.is_logged_in():
            raise Exception("Login required.")

        self.driver.get(url)
        elements = self.wait.until(
            EC.presence_of_all_elements_located((By.CSS_SELECTOR, selector))
        )
        return [el.text for el in elements]

    def close(self):
        """Close the browser."""
        self.driver.quit()

# Usage example
scraper = LoginScraper()

try:
    # Log in
    logged_in = scraper.login(
        login_url='https://example.com/login',
        username='your_username',
        password='your_password',
        username_field='input[name="username"]',
        password_field='input[name="password"]',
        submit_button='button[type="submit"]'
    )

    if logged_in:
        print("Login successful!")
        # Scrape a protected page
        data = scraper.scrape_protected_page(
            url='https://example.com/dashboard',
            selector='.dashboard-item'
        )
        print(data)
    else:
        print("Login failed.")

finally:
    scraper.close()

5.2 Keeping a Session with Cookies

import pickle
import os
from selenium import webdriver

def save_cookies(driver, filepath):
    """Save cookies to a file."""
    with open(filepath, 'wb') as f:
        pickle.dump(driver.get_cookies(), f)

def load_cookies(driver, filepath):
    """Load cookies from a file."""
    if os.path.exists(filepath):
        with open(filepath, 'rb') as f:
            cookies = pickle.load(f)
            for cookie in cookies:
                # Drop the expiry field to avoid issues when re-adding the cookie
                if 'expiry' in cookie:
                    del cookie['expiry']
                driver.add_cookie(cookie)
        return True
    return False

# Usage example
driver = webdriver.Chrome()
cookie_file = 'cookies.pkl'

# Visit the site first (cookies can only be added for the current domain)
driver.get('https://example.com')

# Load saved cookies if they exist
if load_cookies(driver, cookie_file):
    driver.refresh()  # Refresh so the cookies take effect
    print("Cookies loaded")
else:
    # Perform the login
    # ... (login logic)
    # Save cookies after logging in
    save_cookies(driver, cookie_file)
    print("Cookies saved")

6. Storing the Data

6.1 Saving to a CSV File

import csv
from datetime import datetime

def save_to_csv(data, filename, fieldnames=None):
    """Save the data to a CSV file."""
    if not data:
        print("No data to save.")
        return

    # If no field names are given, use the keys of the first record
    if fieldnames is None:
        fieldnames = data[0].keys()

    with open(filename, 'w', newline='', encoding='utf-8-sig') as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(data)

    print(f"Data saved to {filename}.")

# Usage example
products = [
    {'name': 'Product A', 'price': 10000, 'rating': 4.5},
    {'name': 'Product B', 'price': 20000, 'rating': 4.8},
    {'name': 'Product C', 'price': 15000, 'rating': 4.2},
]

save_to_csv(products, 'products.csv')

6.2 Saving to a JSON File

import json

def save_to_json(data, filename, indent=2):
    """Save the data to a JSON file."""
    with open(filename, 'w', encoding='utf-8') as f:
        json.dump(data, f, ensure_ascii=False, indent=indent)

    print(f"Data saved to {filename}.")

def load_from_json(filename):
    """Load data from a JSON file."""
    with open(filename, 'r', encoding='utf-8') as f:
        return json.load(f)

# Usage example
data = {
    'scraped_at': '2026-01-22',
    'total_items': 100,
    'items': [
        {'title': 'News 1', 'url': 'https://example.com/1'},
        {'title': 'News 2', 'url': 'https://example.com/2'},
    ]
}

save_to_json(data, 'news_data.json')

6.3 Saving to a SQLite Database

import sqlite3
from datetime import datetime

class DatabaseManager:
    """SQLite database manager"""

    def __init__(self, db_name='scraped_data.db'):
        self.conn = sqlite3.connect(db_name)
        self.cursor = self.conn.cursor()

    def create_table(self, table_name, columns):
        """Create a table."""
        columns_str = ', '.join([f'{name} {type_}' for name, type_ in columns])
        query = f'CREATE TABLE IF NOT EXISTS {table_name} ({columns_str})'
        self.cursor.execute(query)
        self.conn.commit()

    def insert(self, table_name, data):
        """Insert a single row."""
        columns = ', '.join(data.keys())
        placeholders = ', '.join(['?' for _ in data])
        query = f'INSERT INTO {table_name} ({columns}) VALUES ({placeholders})'
        self.cursor.execute(query, list(data.values()))
        self.conn.commit()

    def insert_many(self, table_name, data_list):
        """Insert multiple rows at once."""
        if not data_list:
            return

        columns = ', '.join(data_list[0].keys())
        placeholders = ', '.join(['?' for _ in data_list[0]])
        query = f'INSERT INTO {table_name} ({columns}) VALUES ({placeholders})'

        values = [list(data.values()) for data in data_list]
        self.cursor.executemany(query, values)
        self.conn.commit()

    def select_all(self, table_name):
        """Fetch all rows."""
        query = f'SELECT * FROM {table_name}'
        self.cursor.execute(query)
        return self.cursor.fetchall()

    def close(self):
        """Close the database connection."""
        self.conn.close()

# Usage example
db = DatabaseManager('products.db')

# Create the table
db.create_table('products', [
    ('id', 'INTEGER PRIMARY KEY AUTOINCREMENT'),
    ('name', 'TEXT'),
    ('price', 'INTEGER'),
    ('rating', 'REAL'),
    ('scraped_at', 'TEXT')
])

# Insert data
products = [
    {'name': 'Product A', 'price': 10000, 'rating': 4.5, 'scraped_at': datetime.now().isoformat()},
    {'name': 'Product B', 'price': 20000, 'rating': 4.8, 'scraped_at': datetime.now().isoformat()},
]

db.insert_many('products', products)

# Query the data
all_products = db.select_all('products')
for product in all_products:
    print(product)

db.close()

7. Anti-Scraping Evasion Techniques

Note: bypassing anti-scraping measures may violate a site's terms of service. Always stay within legal and ethical boundaries.

7.1 Basic Evasion Techniques

from selenium import webdriver
from selenium.webdriver.chrome.options import Options
import random
import time

def create_stealth_driver():
    """Create a stealth driver that is harder to detect."""
    options = Options()

    # Reduce automation detection
    options.add_argument('--disable-blink-features=AutomationControlled')
    options.add_experimental_option('excludeSwitches', ['enable-automation'])
    options.add_experimental_option('useAutomationExtension', False)

    # Make the browser look like a regular one
    options.add_argument('--disable-infobars')
    options.add_argument('--disable-extensions')

    # Random User-Agent
    user_agents = [
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 Chrome/120.0.0.0 Safari/537.36',
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 Chrome/120.0.0.0 Safari/537.36',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/121.0',
    ]
    options.add_argument(f'--user-agent={random.choice(user_agents)}')

    driver = webdriver.Chrome(options=options)

    # Hide the navigator.webdriver property
    driver.execute_cdp_cmd('Page.addScriptToEvaluateOnNewDocument', {
        'source': '''
            Object.defineProperty(navigator, 'webdriver', {
                get: () => undefined
            })
        '''
    })

    return driver

def random_delay(min_seconds=1, max_seconds=3):
    """Sleep for a random delay."""
    time.sleep(random.uniform(min_seconds, max_seconds))

def human_like_typing(element, text):
    """Type text like a human, one character at a time."""
    for char in text:
        element.send_keys(char)
        time.sleep(random.uniform(0.05, 0.2))
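
A short usage sketch tying these helpers together; the URL and the 'q' field name are placeholders rather than a real site:

# Usage example (placeholder URL and field name)
from selenium.webdriver.common.by import By

driver = create_stealth_driver()
driver.get('https://example.com/search')
random_delay()                                      # pause 1-3 seconds before interacting

search_box = driver.find_element(By.NAME, 'q')      # hypothetical search field
human_like_typing(search_box, 'selenium tutorial')  # type one character at a time
random_delay(2, 4)

driver.quit()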

7.2 Using a Proxy

from selenium import webdriver
from selenium.webdriver.chrome.options import Options

def create_driver_with_proxy(proxy_address):
    """Create a driver that routes traffic through a proxy."""
    options = Options()
    options.add_argument(f'--proxy-server={proxy_address}')

    driver = webdriver.Chrome(options=options)
    return driver

# Usage example
proxy = '203.0.113.10:8080'  # replace with a real proxy address
driver = create_driver_with_proxy(proxy)

# Check the IP address
driver.get('https://httpbin.org/ip')
print(driver.page_source)

8. Hands-On Project: News Crawler

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options
from selenium.common.exceptions import NoSuchElementException
import json
import time
from datetime import datetime

class NewsCrawler:
    """News site crawler"""

    def __init__(self, headless=True):
        options = Options()
        if headless:
            options.add_argument('--headless')
        options.add_argument('--no-sandbox')
        options.add_argument('--disable-dev-shm-usage')
        options.add_argument('--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36')

        self.driver = webdriver.Chrome(options=options)
        self.wait = WebDriverWait(self.driver, 10)
        self.articles = []

    def crawl_news_list(self, url, article_selector, title_selector, link_selector, max_articles=20):
        """Crawl a news list page."""
        self.driver.get(url)

        # Wait for the page to load
        self.wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, article_selector)))

        articles = self.driver.find_elements(By.CSS_SELECTOR, article_selector)[:max_articles]

        for article in articles:
            try:
                title_elem = article.find_element(By.CSS_SELECTOR, title_selector)
                link_elem = article.find_element(By.CSS_SELECTOR, link_selector)

                self.articles.append({
                    'title': title_elem.text.strip(),
                    'url': link_elem.get_attribute('href'),
                    'crawled_at': datetime.now().isoformat()
                })
            except Exception as e:
                print(f"Error parsing article: {e}")
                continue

            time.sleep(0.5)  # Small pause between items

        return self.articles

    def crawl_article_content(self, url, content_selector, author_selector=None, date_selector=None):
        """Crawl the body of a single news article."""
        self.driver.get(url)

        article_data = {'url': url}

        try:
            # Extract the body text
            content_elem = self.wait.until(
                EC.presence_of_element_located((By.CSS_SELECTOR, content_selector))
            )
            article_data['content'] = content_elem.text.strip()

            # Extract the author (optional)
            if author_selector:
                try:
                    author_elem = self.driver.find_element(By.CSS_SELECTOR, author_selector)
                    article_data['author'] = author_elem.text.strip()
                except NoSuchElementException:
                    article_data['author'] = None

            # Extract the date (optional)
            if date_selector:
                try:
                    date_elem = self.driver.find_element(By.CSS_SELECTOR, date_selector)
                    article_data['date'] = date_elem.text.strip()
                except NoSuchElementException:
                    article_data['date'] = None

        except Exception as e:
            print(f"Error extracting article body: {e}")
            article_data['content'] = None

        return article_data

    def crawl_full_articles(self, content_selector, author_selector=None, date_selector=None):
        """Crawl the body of every article in the list."""
        for i, article in enumerate(self.articles):
            print(f"[{i+1}/{len(self.articles)}] Crawling: {article['title'][:30]}...")

            content_data = self.crawl_article_content(
                article['url'],
                content_selector,
                author_selector,
                date_selector
            )

            article.update(content_data)
            time.sleep(1)  # Avoid overloading the server

        return self.articles

    def save_to_json(self, filename):
        """Save the collected data to a JSON file."""
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(self.articles, f, ensure_ascii=False, indent=2)
        print(f"Data saved to {filename}.")

    def close(self):
        """Close the browser."""
        self.driver.quit()

# Usage example
if __name__ == '__main__':
    crawler = NewsCrawler(headless=True)

    try:
        # Crawl the news list (replace the URL and selectors with those of your target site)
        print("Crawling the news list...")
        crawler.crawl_news_list(
            url='https://news.example.com/tech',
            article_selector='.news-item',
            title_selector='.news-title',
            link_selector='a',
            max_articles=10
        )

        # Crawl the article bodies
        print("\nCrawling article bodies...")
        crawler.crawl_full_articles(
            content_selector='.article-body',
            author_selector='.author-name',
            date_selector='.publish-date'
        )

        # Save the results
        crawler.save_to_json('news_articles.json')

        print(f"\nCollected {len(crawler.articles)} articles in total.")

    finally:
        crawler.close()

9. Hands-On Project: E-Commerce Price Monitoring

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options
import sqlite3
import time
from datetime import datetime

class PriceMonitor:
    """E-commerce price monitoring scraper"""

    def __init__(self, db_name='price_history.db'):
        # Browser setup
        options = Options()
        options.add_argument('--headless')
        options.add_argument('--no-sandbox')
        options.add_argument('--disable-dev-shm-usage')

        self.driver = webdriver.Chrome(options=options)
        self.wait = WebDriverWait(self.driver, 10)

        # Database setup
        self.conn = sqlite3.connect(db_name)
        self.cursor = self.conn.cursor()
        self._create_tables()

    def _create_tables(self):
        """Create the database tables."""
        self.cursor.execute('''
            CREATE TABLE IF NOT EXISTS products (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT,
                url TEXT UNIQUE,
                created_at TEXT
            )
        ''')

        self.cursor.execute('''
            CREATE TABLE IF NOT EXISTS price_history (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                product_id INTEGER,
                price INTEGER,
                recorded_at TEXT,
                FOREIGN KEY (product_id) REFERENCES products (id)
            )
        ''')

        self.conn.commit()

    def add_product(self, name, url):
        """Add a product to monitor."""
        try:
            self.cursor.execute(
                'INSERT INTO products (name, url, created_at) VALUES (?, ?, ?)',
                (name, url, datetime.now().isoformat())
            )
            self.conn.commit()
            print(f"Product added: {name}")
        except sqlite3.IntegrityError:
            print(f"Product already registered: {name}")

    def get_price(self, url, price_selector):
        """Fetch the product's current price."""
        self.driver.get(url)

        try:
            price_elem = self.wait.until(
                EC.presence_of_element_located((By.CSS_SELECTOR, price_selector))
            )

            # Keep only the digits from the price text
            price_text = price_elem.text
            price = int(''.join(filter(str.isdigit, price_text)))

            return price

        except Exception as e:
            print(f"Error extracting price: {e}")
            return None

    def record_price(self, product_id, price):
        """Record a price observation."""
        self.cursor.execute(
            'INSERT INTO price_history (product_id, price, recorded_at) VALUES (?, ?, ?)',
            (product_id, price, datetime.now().isoformat())
        )
        self.conn.commit()

    def check_all_prices(self, price_selector):
        """Check the price of every registered product."""
        self.cursor.execute('SELECT id, name, url FROM products')
        products = self.cursor.fetchall()

        results = []

        for product_id, name, url in products:
            print(f"Checking price: {name}")

            price = self.get_price(url, price_selector)

            if price:
                self.record_price(product_id, price)
                results.append({
                    'name': name,
                    'price': price,
                    'url': url
                })

            time.sleep(2)  # Avoid overloading the server

        return results

    def get_price_history(self, product_id, limit=30):
        """Return the product's price history."""
        self.cursor.execute('''
            SELECT price, recorded_at
            FROM price_history
            WHERE product_id = ?
            ORDER BY recorded_at DESC
            LIMIT ?
        ''', (product_id, limit))

        return self.cursor.fetchall()

    def get_price_alert(self, product_id, threshold_price):
        """Return an alert when the latest price is at or below a threshold."""
        self.cursor.execute('''
            SELECT p.name, ph.price
            FROM products p
            JOIN price_history ph ON p.id = ph.product_id
            WHERE p.id = ?
            ORDER BY ph.recorded_at DESC
            LIMIT 1
        ''', (product_id,))

        result = self.cursor.fetchone()

        if result and result[1] <= threshold_price:
            return {
                'alert': True,
                'name': result[0],
                'current_price': result[1],
                'threshold': threshold_price
            }

        return {'alert': False}

    def close(self):
        """Clean up resources."""
        self.driver.quit()
        self.conn.close()

# Usage example
if __name__ == '__main__':
    monitor = PriceMonitor()

    try:
        # Add products to monitor (replace with real URLs)
        monitor.add_product('Laptop A', 'https://shop.example.com/product/1')
        monitor.add_product('Laptop B', 'https://shop.example.com/product/2')

        # Check and record prices
        results = monitor.check_all_prices(price_selector='.product-price')

        for result in results:
            print(f"{result['name']}: {result['price']:,} yuan")

        # Check for a price alert
        alert = monitor.get_price_alert(product_id=1, threshold_price=1000000)
        if alert['alert']:
            print(f"\n[ALERT] {alert['name']} has dropped to {alert['current_price']:,} yuan!")

    finally:
        monitor.close()

Summary

In this installment we covered advanced web scraping with Selenium: handling dynamic pages, automating the browser, logging in to sites, and storing the collected data in a variety of formats.

Web scraping is a powerful tool, but it must always be used within legal and ethical bounds. Check robots.txt, add reasonable delays so you do not overload the server, and respect the copyright of the data you collect.
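
As a quick reference, Python's standard library can check robots.txt before you crawl; the URL and user-agent string below are placeholders:

from urllib.robotparser import RobotFileParser

# Ask robots.txt whether a given user agent may fetch a URL
rp = RobotFileParser()
rp.set_url('https://example.com/robots.txt')  # placeholder site
rp.read()
print(rp.can_fetch('MyScraperBot', 'https://example.com/products'))  # True or False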

Series guide: the Python Automation Master series continues. In the next installment we will look at more automation techniques, including email automation and API integration!