Python Automation Master Part 4: Advanced Web Scraping
Introduction: Why Selenium?
The requests and BeautifulSoup libraries covered in Part 3 work well for parsing static HTML. However, much of the content on modern websites is generated dynamically by JavaScript, so the HTML returned by requests often does not contain the data you are looking for.
Selenium drives a real web browser, giving you the fully rendered page after JavaScript has executed. It can also simulate user interactions such as clicking, typing, and scrolling, which makes it very useful for handling logins, infinite scrolling, and pagination.
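To make the difference concrete, here is a minimal sketch (the URL is a placeholder for illustration) that fetches the same page twice: once with requests, which returns the HTML before any JavaScript runs, and once with Selenium, which returns the DOM after rendering.
import requests
from selenium import webdriver

url = 'https://example.com/js-rendered-page'  # placeholder URL

# Static fetch: the HTML as delivered by the server, before JavaScript runs
raw_html = requests.get(url, timeout=10).text
print('requests length:', len(raw_html))

# Browser fetch: the DOM after JavaScript has executed
driver = webdriver.Chrome()
driver.get(url)
print('selenium length:', len(driver.page_source))
driver.quit()
On a JavaScript-heavy page the second length is typically much larger, because the rendered DOM contains the dynamically generated content.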
1. Installing Selenium and Configuring the WebDriver
1.1 Installing Selenium
# Install Selenium
pip install selenium
# Install webdriver-manager, which manages the WebDriver automatically
pip install webdriver-manager
1.2 Configuring the WebDriver
Selenium needs a WebDriver to control the browser. With webdriver-manager, the driver binary is downloaded and managed automatically.
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager

# Chrome setup (automatic driver management)
service = Service(ChromeDriverManager().install())
driver = webdriver.Chrome(service=service)

# Open a web page
driver.get('https://www.google.com')

# Print the page title
print(driver.title)

# Close the browser
driver.quit()
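Note: since Selenium 4.6, Selenium Manager ships with Selenium and can resolve a matching driver on its own, so on a recent installation the webdriver-manager step is optional. A minimal sketch:
from selenium import webdriver

# With Selenium >= 4.6, Selenium Manager locates or downloads a matching chromedriver automatically
driver = webdriver.Chrome()
driver.get('https://www.google.com')
print(driver.title)
driver.quit()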
1.3 Browser Options
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from webdriver_manager.chrome import ChromeDriverManager

def create_driver(headless=False):
    """Create a Chrome driver."""
    options = Options()
    # Headless mode (run without a browser window)
    if headless:
        options.add_argument('--headless')
    # Basic options
    options.add_argument('--no-sandbox')
    options.add_argument('--disable-dev-shm-usage')
    options.add_argument('--disable-gpu')
    # Window size
    options.add_argument('--window-size=1920,1080')
    # User-Agent
    options.add_argument('--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36')
    # Reduce automation fingerprints
    options.add_argument('--disable-blink-features=AutomationControlled')
    options.add_experimental_option('excludeSwitches', ['enable-automation'])
    options.add_experimental_option('useAutomationExtension', False)
    # Create the driver
    service = Service(ChromeDriverManager().install())
    driver = webdriver.Chrome(service=service, options=options)
    # Hide navigator.webdriver on the current page
    driver.execute_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})")
    return driver

# Usage example
driver = create_driver(headless=True)
driver.get('https://example.com')
print(driver.page_source)
driver.quit()
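Since Part 3 already covered BeautifulSoup, a common pattern is to let Selenium render the page and then hand driver.page_source to BeautifulSoup for parsing. A short sketch reusing the create_driver helper above (the selectors are placeholders):
from bs4 import BeautifulSoup

driver = create_driver(headless=True)
driver.get('https://example.com')

# Parse the rendered HTML with BeautifulSoup
soup = BeautifulSoup(driver.page_source, 'html.parser')
for heading in soup.select('h1, h2'):  # placeholder selectors
    print(heading.get_text(strip=True))

driver.quit()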
2. Browser Automation Basics
2.1 Finding Elements
In Selenium 4.x, elements are located with the By class.
from selenium import webdriver
from selenium.webdriver.common.by import By

driver = webdriver.Chrome()
driver.get('https://example.com')

# Different ways to locate elements
# By ID
element = driver.find_element(By.ID, 'main-content')
# By class name
element = driver.find_element(By.CLASS_NAME, 'article')
# By CSS selector
element = driver.find_element(By.CSS_SELECTOR, 'div.content > p')
# By XPath
element = driver.find_element(By.XPATH, '//div[@class="content"]/p')
# By tag name
elements = driver.find_elements(By.TAG_NAME, 'a')
# By link text
element = driver.find_element(By.LINK_TEXT, 'View more')
element = driver.find_element(By.PARTIAL_LINK_TEXT, 'more')
# By name attribute
element = driver.find_element(By.NAME, 'username')

# Find multiple elements (returns a list)
all_links = driver.find_elements(By.TAG_NAME, 'a')
for link in all_links:
    print(link.text, link.get_attribute('href'))
2.2 Interacting with Elements
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys

driver = webdriver.Chrome()
driver.get('https://www.google.com')

# Type text
search_box = driver.find_element(By.NAME, 'q')
search_box.send_keys('Python Selenium')
# Keyboard input
search_box.send_keys(Keys.ENTER)  # press Enter
# or
search_box.send_keys(Keys.CONTROL, 'a')  # Ctrl+A (select all)

# Click
button = driver.find_element(By.CSS_SELECTOR, 'button[type="submit"]')
button.click()

# Get text
element = driver.find_element(By.ID, 'result')
print(element.text)

# Get an attribute
link = driver.find_element(By.TAG_NAME, 'a')
print(link.get_attribute('href'))

# Clear an input field
input_field = driver.find_element(By.NAME, 'username')
input_field.clear()
input_field.send_keys('new_value')

# Submit a form
form = driver.find_element(By.TAG_NAME, 'form')
form.submit()
2.3 Scrolling
from selenium import webdriver
from selenium.webdriver.common.by import By
import time

driver = webdriver.Chrome()
driver.get('https://example.com/infinite-scroll')

def scroll_to_bottom(driver, pause_time=2):
    """Scroll to the bottom of the page."""
    last_height = driver.execute_script("return document.body.scrollHeight")
    while True:
        # Scroll to the bottom of the page
        driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
        # Wait for the page to load
        time.sleep(pause_time)
        # Calculate the new height
        new_height = driver.execute_script("return document.body.scrollHeight")
        # Stop once the page no longer grows
        if new_height == last_height:
            break
        last_height = new_height

def scroll_by_pixel(driver, pixels=500):
    """Scroll by a given number of pixels."""
    driver.execute_script(f"window.scrollBy(0, {pixels});")

def scroll_to_element(driver, element):
    """Scroll until a specific element is visible."""
    driver.execute_script("arguments[0].scrollIntoView(true);", element)

# Usage example
scroll_to_bottom(driver)
# Scroll to a specific element
target = driver.find_element(By.ID, 'target-section')
scroll_to_element(driver, target)
3. Wait Strategies
On dynamic pages you often have to wait for elements to load. Selenium provides two kinds of waits.
3.1 Implicit Wait
An implicit wait is a global setting: every element lookup will wait up to the specified time before failing.
from selenium import webdriver
from selenium.webdriver.common.by import By

driver = webdriver.Chrome()
# Set an implicit wait (up to 10 seconds)
driver.implicitly_wait(10)

# Every find_element call now waits up to 10 seconds
driver.get('https://example.com')
element = driver.find_element(By.ID, 'dynamic-content')
3.2 Explicit Wait
An explicit wait blocks until a specific condition is met. It is the more flexible and recommended approach; note that mixing implicit and explicit waits is generally discouraged because it can make wait times unpredictable.
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

driver = webdriver.Chrome()
driver.get('https://example.com')

# Wait for up to 10 seconds
wait = WebDriverWait(driver, 10)

# Wait until an element is present
element = wait.until(EC.presence_of_element_located((By.ID, 'content')))
# Wait until an element is visible
element = wait.until(EC.visibility_of_element_located((By.ID, 'content')))
# Wait until an element is clickable
button = wait.until(EC.element_to_be_clickable((By.CSS_SELECTOR, 'button.submit')))
# Wait for specific text to appear
wait.until(EC.text_to_be_present_in_element((By.ID, 'status'), 'Done'))
# Wait until an element disappears
wait.until(EC.invisibility_of_element_located((By.ID, 'loading')))
# Wait for a new window to open
wait.until(EC.number_of_windows_to_be(2))
# Wait for the URL to contain a value
wait.until(EC.url_contains('/success'))
3.3 Custom Wait Conditions
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait

def wait_for_ajax(driver, timeout=30):
    """Wait for AJAX requests to finish (assumes the page uses jQuery)."""
    wait = WebDriverWait(driver, timeout)
    wait.until(lambda d: d.execute_script('return jQuery.active == 0'))

def wait_for_page_load(driver, timeout=30):
    """Wait until the page has fully loaded."""
    wait = WebDriverWait(driver, timeout)
    wait.until(lambda d: d.execute_script('return document.readyState') == 'complete')

def wait_for_element_count(driver, locator, count, timeout=10):
    """Wait until at least a given number of elements are present."""
    wait = WebDriverWait(driver, timeout)
    wait.until(lambda d: len(d.find_elements(*locator)) >= count)

# Usage example
driver.get('https://example.com')
wait_for_page_load(driver)
wait_for_element_count(driver, (By.CSS_SELECTOR, '.item'), 10)
4. Handling Pagination
4.1 Clicking a Next Button
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import NoSuchElementException, TimeoutException
import time

def scrape_with_pagination(driver, item_selector, next_button_selector, max_pages=10):
    """Scrape a site that uses button-based pagination."""
    all_items = []
    current_page = 1
    while current_page <= max_pages:
        print(f"Scraping page {current_page}...")
        # Collect the items on the current page
        wait = WebDriverWait(driver, 10)
        items = wait.until(EC.presence_of_all_elements_located((By.CSS_SELECTOR, item_selector)))
        for item in items:
            all_items.append(item.text)
        # Find the "next page" button
        try:
            next_button = driver.find_element(By.CSS_SELECTOR, next_button_selector)
            # Check whether the button is disabled
            if 'disabled' in (next_button.get_attribute('class') or '') or not next_button.is_enabled():
                print("This is the last page.")
                break
            # Go to the next page
            next_button.click()
            # Wait for the new content to load
            time.sleep(2)
            current_page += 1
        except NoSuchElementException:
            print("Next-page button not found.")
            break
        except TimeoutException:
            print("Page load timed out.")
            break
    return all_items

# Usage example
driver = webdriver.Chrome()
driver.get('https://example.com/products')
items = scrape_with_pagination(
    driver,
    item_selector='.product-item',
    next_button_selector='.pagination .next',
    max_pages=5
)
print(f"Collected {len(items)} items in total.")
4.2 URL Parameter Pagination
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import time

def scrape_url_pagination(base_url, item_selector, start_page=1, max_pages=10):
    """Handle pagination driven by a URL parameter."""
    driver = webdriver.Chrome()
    all_items = []
    try:
        for page in range(start_page, start_page + max_pages):
            url = f"{base_url}?page={page}"
            print(f"Scraping: {url}")
            driver.get(url)
            # Wait for the content to load
            wait = WebDriverWait(driver, 10)
            try:
                items = wait.until(EC.presence_of_all_elements_located(
                    (By.CSS_SELECTOR, item_selector)
                ))
                if not items:
                    print("No more items.")
                    break
                for item in items:
                    all_items.append({
                        'text': item.text,
                        'page': page
                    })
                time.sleep(1)  # Avoid overloading the server
            except Exception as e:
                print(f"Error on page {page}: {e}")
                break
    finally:
        driver.quit()
    return all_items

# Usage example
items = scrape_url_pagination(
    base_url='https://example.com/search',
    item_selector='.result-item',
    max_pages=5
)
5. Scraping Sites That Require Login
5.1 Basic Login Handling
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import NoSuchElementException
import time

class LoginScraper:
    """Scraper for sites that require login."""

    def __init__(self, headless=False):
        options = webdriver.ChromeOptions()
        if headless:
            options.add_argument('--headless')
        self.driver = webdriver.Chrome(options=options)
        self.wait = WebDriverWait(self.driver, 10)

    def login(self, login_url, username, password, username_field, password_field, submit_button):
        """Log in to the site."""
        self.driver.get(login_url)
        # Enter the username
        username_input = self.wait.until(
            EC.presence_of_element_located((By.CSS_SELECTOR, username_field))
        )
        username_input.clear()
        username_input.send_keys(username)
        # Enter the password
        password_input = self.driver.find_element(By.CSS_SELECTOR, password_field)
        password_input.clear()
        password_input.send_keys(password)
        # Click the login button
        submit_btn = self.driver.find_element(By.CSS_SELECTOR, submit_button)
        submit_btn.click()
        # Wait for the login to complete
        time.sleep(3)
        return self.is_logged_in()

    def is_logged_in(self):
        """Check whether we are logged in."""
        # Look for a logout button or user-profile element
        try:
            self.driver.find_element(By.CSS_SELECTOR, '.logout-btn, .user-profile')
            return True
        except NoSuchElementException:
            return False

    def scrape_protected_page(self, url, selector):
        """Scrape a protected page after logging in."""
        if not self.is_logged_in():
            raise Exception("Login required.")
        self.driver.get(url)
        elements = self.wait.until(
            EC.presence_of_all_elements_located((By.CSS_SELECTOR, selector))
        )
        return [el.text for el in elements]

    def close(self):
        """Close the browser."""
        self.driver.quit()

# Usage example
scraper = LoginScraper()
try:
    # Log in
    logged_in = scraper.login(
        login_url='https://example.com/login',
        username='your_username',
        password='your_password',
        username_field='input[name="username"]',
        password_field='input[name="password"]',
        submit_button='button[type="submit"]'
    )
    if logged_in:
        print("Login successful!")
        # Scrape a protected page
        data = scraper.scrape_protected_page(
            url='https://example.com/dashboard',
            selector='.dashboard-item'
        )
        print(data)
    else:
        print("Login failed.")
finally:
    scraper.close()
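Rather than hardcoding credentials in the script, it is safer to read them from environment variables. A small sketch using the LoginScraper class above (the variable names are hypothetical):
import os

# Hypothetical environment variable names; adjust to your setup
username = os.environ.get('SCRAPER_USERNAME')
password = os.environ.get('SCRAPER_PASSWORD')
if not username or not password:
    raise SystemExit('Set SCRAPER_USERNAME and SCRAPER_PASSWORD first.')

scraper = LoginScraper()
try:
    scraper.login(
        login_url='https://example.com/login',
        username=username,
        password=password,
        username_field='input[name="username"]',
        password_field='input[name="password"]',
        submit_button='button[type="submit"]'
    )
finally:
    scraper.close()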
5.2 Keeping a Session with Cookies
import pickle
import os
from selenium import webdriver

def save_cookies(driver, filepath):
    """Save cookies to a file."""
    with open(filepath, 'wb') as f:
        pickle.dump(driver.get_cookies(), f)

def load_cookies(driver, filepath):
    """Load cookies from a file."""
    if os.path.exists(filepath):
        with open(filepath, 'rb') as f:
            cookies = pickle.load(f)
            for cookie in cookies:
                # Drop the expiry field to avoid problems with expired cookies
                if 'expiry' in cookie:
                    del cookie['expiry']
                driver.add_cookie(cookie)
        return True
    return False

# Usage example
driver = webdriver.Chrome()
cookie_file = 'cookies.pkl'
# Visit the site first (add_cookie only works for the current domain)
driver.get('https://example.com')
# Load saved cookies if they exist
if load_cookies(driver, cookie_file):
    driver.refresh()  # Refresh so the cookies take effect
    print("Cookies loaded.")
else:
    # Perform the login
    # ... (login logic)
    # Save the cookies after logging in
    save_cookies(driver, cookie_file)
    print("Cookies saved.")
6. Storing the Data
6.1 Saving to a CSV File
import csv

def save_to_csv(data, filename, fieldnames=None):
    """Save data to a CSV file."""
    if not data:
        print("No data to save.")
        return
    # If no field names are given, use the keys of the first record
    if fieldnames is None:
        fieldnames = data[0].keys()
    with open(filename, 'w', newline='', encoding='utf-8-sig') as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(data)
    print(f"Data saved to {filename}.")

# Usage example
products = [
    {'name': 'Product A', 'price': 10000, 'rating': 4.5},
    {'name': 'Product B', 'price': 20000, 'rating': 4.8},
    {'name': 'Product C', 'price': 15000, 'rating': 4.2},
]
save_to_csv(products, 'products.csv')
6.2 Saving to a JSON File
import json

def save_to_json(data, filename, indent=2):
    """Save data to a JSON file."""
    with open(filename, 'w', encoding='utf-8') as f:
        json.dump(data, f, ensure_ascii=False, indent=indent)
    print(f"Data saved to {filename}.")

def load_from_json(filename):
    """Load data from a JSON file."""
    with open(filename, 'r', encoding='utf-8') as f:
        return json.load(f)

# Usage example
data = {
    'scraped_at': '2026-01-22',
    'total_items': 100,
    'items': [
        {'title': 'News 1', 'url': 'https://example.com/1'},
        {'title': 'News 2', 'url': 'https://example.com/2'},
    ]
}
save_to_json(data, 'news_data.json')
6.3 Saving to an SQLite Database
import sqlite3
from datetime import datetime

class DatabaseManager:
    """SQLite database manager."""

    def __init__(self, db_name='scraped_data.db'):
        self.conn = sqlite3.connect(db_name)
        self.cursor = self.conn.cursor()

    def create_table(self, table_name, columns):
        """Create a table."""
        columns_str = ', '.join([f'{name} {type_}' for name, type_ in columns])
        query = f'CREATE TABLE IF NOT EXISTS {table_name} ({columns_str})'
        self.cursor.execute(query)
        self.conn.commit()

    def insert(self, table_name, data):
        """Insert a single row."""
        columns = ', '.join(data.keys())
        placeholders = ', '.join(['?' for _ in data])
        query = f'INSERT INTO {table_name} ({columns}) VALUES ({placeholders})'
        self.cursor.execute(query, list(data.values()))
        self.conn.commit()

    def insert_many(self, table_name, data_list):
        """Insert multiple rows at once."""
        if not data_list:
            return
        columns = ', '.join(data_list[0].keys())
        placeholders = ', '.join(['?' for _ in data_list[0]])
        query = f'INSERT INTO {table_name} ({columns}) VALUES ({placeholders})'
        values = [list(data.values()) for data in data_list]
        self.cursor.executemany(query, values)
        self.conn.commit()

    def select_all(self, table_name):
        """Return all rows of a table."""
        query = f'SELECT * FROM {table_name}'
        self.cursor.execute(query)
        return self.cursor.fetchall()

    def close(self):
        """Close the database connection."""
        self.conn.close()

# Usage example
db = DatabaseManager('products.db')
# Create the table
db.create_table('products', [
    ('id', 'INTEGER PRIMARY KEY AUTOINCREMENT'),
    ('name', 'TEXT'),
    ('price', 'INTEGER'),
    ('rating', 'REAL'),
    ('scraped_at', 'TEXT')
])
# Insert data
products = [
    {'name': 'Product A', 'price': 10000, 'rating': 4.5, 'scraped_at': datetime.now().isoformat()},
    {'name': 'Product B', 'price': 20000, 'rating': 4.8, 'scraped_at': datetime.now().isoformat()},
]
db.insert_many('products', products)
# Query the data
all_products = db.select_all('products')
for product in all_products:
    print(product)
db.close()
7. Anti-Bot Evasion Techniques
Note: bypassing anti-bot systems may violate a site's terms of service. Always stay within legal and ethical boundaries.
7.1 Basic Evasion Techniques
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
import random
import time

def create_stealth_driver():
    """Create a stealth driver that is harder to detect."""
    options = Options()
    # Reduce automation fingerprints
    options.add_argument('--disable-blink-features=AutomationControlled')
    options.add_experimental_option('excludeSwitches', ['enable-automation'])
    options.add_experimental_option('useAutomationExtension', False)
    # Look more like a regular browser
    options.add_argument('--disable-infobars')
    options.add_argument('--disable-extensions')
    # Random User-Agent
    user_agents = [
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 Chrome/120.0.0.0 Safari/537.36',
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 Chrome/120.0.0.0 Safari/537.36',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/121.0',
    ]
    options.add_argument(f'--user-agent={random.choice(user_agents)}')
    driver = webdriver.Chrome(options=options)
    # Hide the navigator.webdriver property on every new page
    driver.execute_cdp_cmd('Page.addScriptToEvaluateOnNewDocument', {
        'source': '''
            Object.defineProperty(navigator, 'webdriver', {
                get: () => undefined
            })
        '''
    })
    return driver

def random_delay(min_seconds=1, max_seconds=3):
    """Sleep for a random amount of time."""
    time.sleep(random.uniform(min_seconds, max_seconds))

def human_like_typing(element, text):
    """Type text character by character, like a human."""
    for char in text:
        element.send_keys(char)
        time.sleep(random.uniform(0.05, 0.2))
7.2 Using a Proxy
from selenium import webdriver
from selenium.webdriver.chrome.options import Options

def create_driver_with_proxy(proxy_address):
    """Create a driver that routes traffic through a proxy."""
    options = Options()
    options.add_argument(f'--proxy-server={proxy_address}')
    driver = webdriver.Chrome(options=options)
    return driver

# Usage example (replace with a real proxy address)
proxy = '203.0.113.10:8080'
driver = create_driver_with_proxy(proxy)
# Check the outgoing IP address
driver.get('https://httpbin.org/ip')
print(driver.page_source)
8. Hands-On Project: News Crawler
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options
import json
import time
from datetime import datetime

class NewsCrawler:
    """News site crawler."""

    def __init__(self, headless=True):
        options = Options()
        if headless:
            options.add_argument('--headless')
        options.add_argument('--no-sandbox')
        options.add_argument('--disable-dev-shm-usage')
        options.add_argument('--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36')
        self.driver = webdriver.Chrome(options=options)
        self.wait = WebDriverWait(self.driver, 10)
        self.articles = []

    def crawl_news_list(self, url, article_selector, title_selector, link_selector, max_articles=20):
        """Crawl a news list page."""
        self.driver.get(url)
        # Wait for the page to load
        self.wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, article_selector)))
        articles = self.driver.find_elements(By.CSS_SELECTOR, article_selector)[:max_articles]
        for article in articles:
            try:
                title_elem = article.find_element(By.CSS_SELECTOR, title_selector)
                link_elem = article.find_element(By.CSS_SELECTOR, link_selector)
                self.articles.append({
                    'title': title_elem.text.strip(),
                    'url': link_elem.get_attribute('href'),
                    'crawled_at': datetime.now().isoformat()
                })
            except Exception as e:
                print(f"Article parsing error: {e}")
                continue
        time.sleep(0.5)  # Avoid overloading the server
        return self.articles

    def crawl_article_content(self, url, content_selector, author_selector=None, date_selector=None):
        """Crawl the body of a single news article."""
        self.driver.get(url)
        article_data = {'url': url}
        try:
            # Extract the body text
            content_elem = self.wait.until(
                EC.presence_of_element_located((By.CSS_SELECTOR, content_selector))
            )
            article_data['content'] = content_elem.text.strip()
            # Extract the author (optional)
            if author_selector:
                try:
                    author_elem = self.driver.find_element(By.CSS_SELECTOR, author_selector)
                    article_data['author'] = author_elem.text.strip()
                except Exception:
                    article_data['author'] = None
            # Extract the date (optional)
            if date_selector:
                try:
                    date_elem = self.driver.find_element(By.CSS_SELECTOR, date_selector)
                    article_data['date'] = date_elem.text.strip()
                except Exception:
                    article_data['date'] = None
        except Exception as e:
            print(f"Content extraction error: {e}")
            article_data['content'] = None
        return article_data

    def crawl_full_articles(self, content_selector, author_selector=None, date_selector=None):
        """Crawl the body of every article in the list."""
        for i, article in enumerate(self.articles):
            print(f"[{i+1}/{len(self.articles)}] Crawling: {article['title'][:30]}...")
            content_data = self.crawl_article_content(
                article['url'],
                content_selector,
                author_selector,
                date_selector
            )
            article.update(content_data)
            time.sleep(1)  # Avoid overloading the server
        return self.articles

    def save_to_json(self, filename):
        """Save the collected data to a JSON file."""
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(self.articles, f, ensure_ascii=False, indent=2)
        print(f"Data saved to {filename}.")

    def close(self):
        """Close the browser."""
        self.driver.quit()

# Usage example
if __name__ == '__main__':
    crawler = NewsCrawler(headless=True)
    try:
        # Crawl the news list (replace the URL and selectors for your target site)
        print("Crawling the news list...")
        crawler.crawl_news_list(
            url='https://news.example.com/tech',
            article_selector='.news-item',
            title_selector='.news-title',
            link_selector='a',
            max_articles=10
        )
        # Crawl the article bodies
        print("\nCrawling article bodies...")
        crawler.crawl_full_articles(
            content_selector='.article-body',
            author_selector='.author-name',
            date_selector='.publish-date'
        )
        # Save the results
        crawler.save_to_json('news_articles.json')
        print(f"\nCollected {len(crawler.articles)} articles in total.")
    finally:
        crawler.close()
9. Hands-On Project: E-Commerce Price Monitor
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options
import sqlite3
import time
from datetime import datetime

class PriceMonitor:
    """E-commerce price-monitoring crawler."""

    def __init__(self, db_name='price_history.db'):
        # Browser setup
        options = Options()
        options.add_argument('--headless')
        options.add_argument('--no-sandbox')
        options.add_argument('--disable-dev-shm-usage')
        self.driver = webdriver.Chrome(options=options)
        self.wait = WebDriverWait(self.driver, 10)
        # Database setup
        self.conn = sqlite3.connect(db_name)
        self.cursor = self.conn.cursor()
        self._create_tables()

    def _create_tables(self):
        """Create the database tables."""
        self.cursor.execute('''
            CREATE TABLE IF NOT EXISTS products (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT,
                url TEXT UNIQUE,
                created_at TEXT
            )
        ''')
        self.cursor.execute('''
            CREATE TABLE IF NOT EXISTS price_history (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                product_id INTEGER,
                price INTEGER,
                recorded_at TEXT,
                FOREIGN KEY (product_id) REFERENCES products (id)
            )
        ''')
        self.conn.commit()

    def add_product(self, name, url):
        """Register a product to monitor."""
        try:
            self.cursor.execute(
                'INSERT INTO products (name, url, created_at) VALUES (?, ?, ?)',
                (name, url, datetime.now().isoformat())
            )
            self.conn.commit()
            print(f"Product added: {name}")
        except sqlite3.IntegrityError:
            print(f"Product already registered: {name}")

    def get_price(self, url, price_selector):
        """Fetch the current price of a product."""
        self.driver.get(url)
        try:
            price_elem = self.wait.until(
                EC.presence_of_element_located((By.CSS_SELECTOR, price_selector))
            )
            # Keep only the digits from the price text
            price_text = price_elem.text
            price = int(''.join(filter(str.isdigit, price_text)))
            return price
        except Exception as e:
            print(f"Price extraction error: {e}")
            return None

    def record_price(self, product_id, price):
        """Record a price observation."""
        self.cursor.execute(
            'INSERT INTO price_history (product_id, price, recorded_at) VALUES (?, ?, ?)',
            (product_id, price, datetime.now().isoformat())
        )
        self.conn.commit()

    def check_all_prices(self, price_selector):
        """Check the price of every registered product."""
        self.cursor.execute('SELECT id, name, url FROM products')
        products = self.cursor.fetchall()
        results = []
        for product_id, name, url in products:
            print(f"Checking price: {name}")
            price = self.get_price(url, price_selector)
            if price is not None:
                self.record_price(product_id, price)
                results.append({
                    'name': name,
                    'price': price,
                    'url': url
                })
            time.sleep(2)  # Avoid overloading the server
        return results

    def get_price_history(self, product_id, limit=30):
        """Return a product's price history."""
        self.cursor.execute('''
            SELECT price, recorded_at
            FROM price_history
            WHERE product_id = ?
            ORDER BY recorded_at DESC
            LIMIT ?
        ''', (product_id, limit))
        return self.cursor.fetchall()

    def get_price_alert(self, product_id, threshold_price):
        """Return an alert when the latest price is at or below a threshold."""
        self.cursor.execute('''
            SELECT p.name, ph.price
            FROM products p
            JOIN price_history ph ON p.id = ph.product_id
            WHERE p.id = ?
            ORDER BY ph.recorded_at DESC
            LIMIT 1
        ''', (product_id,))
        result = self.cursor.fetchone()
        if result and result[1] <= threshold_price:
            return {
                'alert': True,
                'name': result[0],
                'current_price': result[1],
                'threshold': threshold_price
            }
        return {'alert': False}

    def close(self):
        """Clean up resources."""
        self.driver.quit()
        self.conn.close()

# Usage example
if __name__ == '__main__':
    monitor = PriceMonitor()
    try:
        # Add products to monitor (replace with real URLs)
        monitor.add_product('Laptop A', 'https://shop.example.com/product/1')
        monitor.add_product('Laptop B', 'https://shop.example.com/product/2')
        # Check and record prices
        results = monitor.check_all_prices(price_selector='.product-price')
        for result in results:
            print(f"{result['name']}: {result['price']:,} yuan")
        # Check the price alert
        alert = monitor.get_price_alert(product_id=1, threshold_price=1000000)
        if alert['alert']:
            print(f"\n[ALERT] {alert['name']} has dropped to {alert['current_price']:,} yuan!")
    finally:
        monitor.close()
Summary
In this part we covered advanced web scraping with Selenium: handling dynamic pages, automating the browser, dealing with logins, and storing the collected data in several formats.
Web scraping is a powerful tool, but you must always stay within legal and ethical boundaries. Check robots.txt, add reasonable delays so you do not overload the server, and respect the copyright of the data you collect.
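As a sketch of the robots.txt check mentioned above, the standard library's urllib.robotparser can tell you whether a given path may be fetched (the site URL and user-agent string are placeholders):
from urllib.robotparser import RobotFileParser

robots = RobotFileParser()
robots.set_url('https://example.com/robots.txt')  # placeholder site
robots.read()

# Check whether our crawler is allowed to fetch a specific path
if robots.can_fetch('MyScraperBot', 'https://example.com/products'):
    print('Allowed by robots.txt.')
else:
    print('Disallowed by robots.txt; skip this path.')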
Series guide: the Python Automation Master series continues. In the next part we will cover email automation, API integration, and more automation techniques!