Python Automation Master Part 4: Advanced Web Scraping
Mastering Dynamic Web Pages with Selenium
Introduction: Why Do We Need Selenium?
The requests and BeautifulSoup libraries we learned in Part 3 are effective for parsing static HTML. However, many parts of modern websites dynamically generate content with JavaScript. In such pages, the HTML fetched with requests often doesn't contain the data we want.
Selenium can automatically control a real web browser to get the complete page after JavaScript has executed. It can also simulate user interactions like clicking, typing, and scrolling, making it useful for handling logins, infinite scroll, and pagination.
1. Installing Selenium and WebDriver Setup
1.1 Installing Selenium
# Install Selenium
pip install selenium
# Install webdriver-manager for automatic driver management
pip install webdriver-manager
1.2 WebDriver Setup
Selenium requires a WebDriver to control browsers. Using webdriver-manager allows you to automatically download and manage drivers.
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
# Chrome browser setup (automatic driver management)
service = Service(ChromeDriverManager().install())
driver = webdriver.Chrome(service=service)
# Open web page
driver.get('https://www.google.com')
# Print page title
print(driver.title)
# Close browser
driver.quit()
1.3 Browser Options Configuration
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from webdriver_manager.chrome import ChromeDriverManager
def create_driver(headless=False):
    """Build and return a configured Chrome WebDriver.

    Args:
        headless: When True, Chrome runs without opening a visible window.

    Returns:
        A ready-to-use ``webdriver.Chrome`` instance.
    """
    options = Options()
    # Flags that keep Chrome stable in containers / CI environments,
    # plus a fixed window size and a desktop User-Agent.
    flags = [
        '--no-sandbox',
        '--disable-dev-shm-usage',
        '--disable-gpu',
        '--window-size=1920,1080',
        '--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
        '--disable-blink-features=AutomationControlled',
    ]
    if headless:
        # Run with no browser window at all.
        flags.insert(0, '--headless')
    for flag in flags:
        options.add_argument(flag)
    # Strip the "controlled by automated software" infobar and extension.
    options.add_experimental_option('excludeSwitches', ['enable-automation'])
    options.add_experimental_option('useAutomationExtension', False)
    driver = webdriver.Chrome(
        service=Service(ChromeDriverManager().install()),
        options=options,
    )
    # Hide navigator.webdriver, which many sites inspect to detect bots.
    driver.execute_script(
        "Object.defineProperty(navigator, 'webdriver', {get: () => undefined})"
    )
    return driver
# Usage example
driver = create_driver(headless=True)
driver.get('https://example.com')
print(driver.page_source)
driver.quit()
2. Browser Automation Basics
2.1 Finding Elements
In Selenium 4.x, you use the By class to find elements.
from selenium import webdriver
from selenium.webdriver.common.by import By
driver = webdriver.Chrome()
driver.get('https://example.com')
# Various ways to find elements
# By ID
element = driver.find_element(By.ID, 'main-content')
# By class name
element = driver.find_element(By.CLASS_NAME, 'article')
# By CSS selector
element = driver.find_element(By.CSS_SELECTOR, 'div.content > p')
# By XPath
element = driver.find_element(By.XPATH, '//div[@class="content"]/p')
# By tag name
elements = driver.find_elements(By.TAG_NAME, 'a')
# By link text
element = driver.find_element(By.LINK_TEXT, 'Read More')
element = driver.find_element(By.PARTIAL_LINK_TEXT, 'Read')
# By name attribute
element = driver.find_element(By.NAME, 'username')
# Find multiple elements (returns list)
all_links = driver.find_elements(By.TAG_NAME, 'a')
for link in all_links:
print(link.text, link.get_attribute('href'))
2.2 Element Interactions
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
driver = webdriver.Chrome()
driver.get('https://www.google.com')
# Enter text
search_box = driver.find_element(By.NAME, 'q')
search_box.send_keys('Python Selenium')
# Keyboard input
search_box.send_keys(Keys.ENTER) # Enter key
# or
search_box.send_keys(Keys.CONTROL, 'a') # Ctrl+A (Select all)
# Click
button = driver.find_element(By.CSS_SELECTOR, 'button[type="submit"]')
button.click()
# Get text
element = driver.find_element(By.ID, 'result')
print(element.text)
# Get attribute
link = driver.find_element(By.TAG_NAME, 'a')
print(link.get_attribute('href'))
# Clear input field
input_field = driver.find_element(By.NAME, 'username')
input_field.clear()
input_field.send_keys('new_value')
# Submit form
form = driver.find_element(By.TAG_NAME, 'form')
form.submit()
2.3 Scroll Handling
from selenium import webdriver
from selenium.webdriver.common.by import By
import time
driver = webdriver.Chrome()
driver.get('https://example.com/infinite-scroll')
def scroll_to_bottom(driver, pause_time=2):
    """Scroll to the bottom of the page until no new content loads.

    Args:
        driver: Selenium WebDriver controlling the page.
        pause_time: Seconds to pause after each scroll so lazy-loaded
            content has time to appear.
    """
    read_height = "return document.body.scrollHeight"
    jump_to_bottom = "window.scrollTo(0, document.body.scrollHeight);"
    previous_height = driver.execute_script(read_height)
    while True:
        driver.execute_script(jump_to_bottom)
        time.sleep(pause_time)  # let the page append any new content
        current_height = driver.execute_script(read_height)
        if current_height == previous_height:
            # Document stopped growing: we've reached the real bottom.
            break
        previous_height = current_height
def scroll_by_pixel(driver, pixels=500):
    """Scroll the window down by *pixels* vertical pixels."""
    script = "window.scrollBy(0, {});".format(pixels)
    driver.execute_script(script)
def scroll_to_element(driver, element):
    """Bring a specific element into the visible viewport."""
    script = "arguments[0].scrollIntoView(true);"
    driver.execute_script(script, element)
# Usage example
scroll_to_bottom(driver)
# Scroll to specific element
target = driver.find_element(By.ID, 'target-section')
scroll_to_element(driver, target)
3. Wait Strategies
With dynamic web pages, you need to wait for elements to load. Selenium provides two types of waits.
3.1 Implicit Wait
Set globally, waits up to the specified time when finding elements.
from selenium import webdriver
driver = webdriver.Chrome()
# Set implicit wait (max 10 seconds)
driver.implicitly_wait(10)
# Now all find_element calls will wait up to 10 seconds
driver.get('https://example.com')
element = driver.find_element(By.ID, 'dynamic-content')
3.2 Explicit Wait
Waits until a specific condition is met. This is the more flexible and recommended approach.
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
driver = webdriver.Chrome()
driver.get('https://example.com')
# Wait until element appears (max 10 seconds)
wait = WebDriverWait(driver, 10)
# Wait until element is present
element = wait.until(EC.presence_of_element_located((By.ID, 'content')))
# Wait until element is visible
element = wait.until(EC.visibility_of_element_located((By.ID, 'content')))
# Wait until element is clickable
button = wait.until(EC.element_to_be_clickable((By.CSS_SELECTOR, 'button.submit')))
# Wait until specific text appears
wait.until(EC.text_to_be_present_in_element((By.ID, 'status'), 'Complete'))
# Wait until element disappears
wait.until(EC.invisibility_of_element_located((By.ID, 'loading')))
# Wait until new window opens
wait.until(EC.number_of_windows_to_be(2))
# Wait until URL contains specific value
wait.until(EC.url_contains('/success'))
3.3 Custom Wait Conditions
from selenium.webdriver.support.ui import WebDriverWait
def wait_for_ajax(driver, timeout=30):
    """Block until jQuery reports zero active AJAX requests.

    Only works on pages that actually load jQuery.
    """
    WebDriverWait(driver, timeout).until(
        lambda d: d.execute_script('return jQuery.active == 0')
    )

def wait_for_page_load(driver, timeout=30):
    """Block until document.readyState reports 'complete'."""
    WebDriverWait(driver, timeout).until(
        lambda d: d.execute_script('return document.readyState') == 'complete'
    )

def wait_for_element_count(driver, locator, count, timeout=10):
    """Block until at least *count* elements match *locator*.

    Args:
        locator: A (By.<strategy>, selector) tuple.
        count: Minimum number of matching elements to wait for.
    """
    WebDriverWait(driver, timeout).until(
        lambda d: len(d.find_elements(*locator)) >= count
    )
# Usage example
driver.get('https://example.com')
wait_for_page_load(driver)
wait_for_element_count(driver, (By.CSS_SELECTOR, '.item'), 10)
4. Pagination Handling
4.1 Button Click Method
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import NoSuchElementException, TimeoutException
import time  # used for the post-click delay; missing from this snippet's import list


def scrape_with_pagination(driver, item_selector, next_button_selector, max_pages=10):
    """Scrape a paginated listing by repeatedly clicking the "next" button.

    Args:
        driver: Selenium WebDriver already navigated to the first page.
        item_selector: CSS selector matching the items to collect.
        next_button_selector: CSS selector for the next-page button.
        max_pages: Upper bound on the number of pages visited.

    Returns:
        A list with the text of every item collected across all pages.
    """
    all_items = []
    wait = WebDriverWait(driver, 10)
    current_page = 1
    while current_page <= max_pages:
        print(f"Scraping page {current_page}...")
        try:
            # Wait for the current page's items to be present in the DOM.
            items = wait.until(
                EC.presence_of_all_elements_located((By.CSS_SELECTOR, item_selector))
            )
        except TimeoutException:
            # The original handler was unreachable because the wait sat
            # outside the try block; a slow page now ends the crawl cleanly.
            print("Page load timeout")
            break
        all_items.extend(item.text for item in items)
        try:
            next_button = driver.find_element(By.CSS_SELECTOR, next_button_selector)
        except NoSuchElementException:
            print("Next page button not found.")
            break
        # get_attribute() returns None when the attribute is absent; default
        # to '' so the 'in' test cannot raise TypeError.
        classes = next_button.get_attribute('class') or ''
        if 'disabled' in classes or not next_button.is_enabled():
            print("Last page reached.")
            break
        next_button.click()
        time.sleep(2)  # crude wait for the next page's content to render
        current_page += 1
    return all_items
# Usage example
driver = webdriver.Chrome()
driver.get('https://example.com/products')
items = scrape_with_pagination(
driver,
item_selector='.product-item',
next_button_selector='.pagination .next',
max_pages=5
)
print(f"Total {len(items)} items collected.")
4.2 URL Parameter Method
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import time
def scrape_url_pagination(base_url, item_selector, start_page=1, max_pages=10):
    """Scrape a site whose pages are addressed via a ``?page=N`` URL parameter.

    Args:
        base_url: URL without the page parameter.
        item_selector: CSS selector matching the items on each page.
        start_page: First page number to fetch.
        max_pages: Number of consecutive pages to fetch.

    Returns:
        List of dicts, each holding the item text and its source page number.
    """
    driver = webdriver.Chrome()
    collected = []
    try:
        for page in range(start_page, start_page + max_pages):
            page_url = f"{base_url}?page={page}"
            print(f"Scraping: {page_url}")
            driver.get(page_url)
            waiter = WebDriverWait(driver, 10)
            try:
                found = waiter.until(EC.presence_of_all_elements_located(
                    (By.CSS_SELECTOR, item_selector)
                ))
                if not found:
                    print("No more items found.")
                    break
                collected.extend({'text': el.text, 'page': page} for el in found)
                time.sleep(1)  # be polite to the server
            except Exception as e:
                print(f"Error processing page {page}: {e}")
                break
    finally:
        # Always release the browser, even when a page errors out.
        driver.quit()
    return collected
# Usage example
items = scrape_url_pagination(
base_url='https://example.com/search',
item_selector='.result-item',
max_pages=5
)
5. Scraping Sites That Require Login
5.1 Basic Login Handling
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import time
class LoginScraper:
    """Scraper for sites that sit behind a login form."""

    def __init__(self, headless=False):
        """Start a Chrome session, optionally headless."""
        options = webdriver.ChromeOptions()
        if headless:
            options.add_argument('--headless')
        self.driver = webdriver.Chrome(options=options)
        self.wait = WebDriverWait(self.driver, 10)

    def login(self, login_url, username, password, username_field, password_field, submit_button):
        """Fill in and submit the login form.

        Args:
            login_url: URL of the page containing the login form.
            username: Account name to submit.
            password: Account password to submit.
            username_field: CSS selector for the username input.
            password_field: CSS selector for the password input.
            submit_button: CSS selector for the submit button.

        Returns:
            True if the post-login page looks authenticated (see is_logged_in).
        """
        self.driver.get(login_url)
        username_input = self.wait.until(
            EC.presence_of_element_located((By.CSS_SELECTOR, username_field))
        )
        username_input.clear()
        username_input.send_keys(username)
        password_input = self.driver.find_element(By.CSS_SELECTOR, password_field)
        password_input.clear()
        password_input.send_keys(password)
        submit_btn = self.driver.find_element(By.CSS_SELECTOR, submit_button)
        submit_btn.click()
        time.sleep(3)  # crude wait for the post-submit redirect to finish
        return self.is_logged_in()

    def is_logged_in(self):
        """Heuristic login check: look for a logout button or profile element."""
        try:
            self.driver.find_element(By.CSS_SELECTOR, '.logout-btn, .user-profile')
            return True
        except Exception:
            # Narrowed from a bare ``except:`` so SystemExit/KeyboardInterrupt
            # still propagate; ideally catch selenium's NoSuchElementException.
            return False

    def scrape_protected_page(self, url, selector):
        """Scrape elements from a page that requires an authenticated session.

        Raises:
            Exception: If the session does not look logged in.
        """
        if not self.is_logged_in():
            raise Exception("Login required.")
        self.driver.get(url)
        elements = self.wait.until(
            EC.presence_of_all_elements_located((By.CSS_SELECTOR, selector))
        )
        return [el.text for el in elements]

    def close(self):
        """Quit the browser session."""
        self.driver.quit()
# Usage example
scraper = LoginScraper()
try:
# Login
logged_in = scraper.login(
login_url='https://example.com/login',
username='your_username',
password='your_password',
username_field='input[name="username"]',
password_field='input[name="password"]',
submit_button='button[type="submit"]'
)
if logged_in:
print("Login successful!")
# Scrape protected page
data = scraper.scrape_protected_page(
url='https://example.com/dashboard',
selector='.dashboard-item'
)
print(data)
else:
print("Login failed")
finally:
scraper.close()
5.2 Maintaining Session with Cookies
import pickle
import os
from selenium import webdriver
def save_cookies(driver, filepath):
    """Serialize the browser's current cookies to *filepath* via pickle."""
    cookies = driver.get_cookies()
    with open(filepath, 'wb') as handle:
        pickle.dump(cookies, handle)
def load_cookies(driver, filepath):
    """Restore pickled cookies from *filepath* into the driver.

    Returns:
        True when the file existed and its cookies were applied, else False.
    """
    if not os.path.exists(filepath):
        return False
    with open(filepath, 'rb') as handle:
        stored = pickle.load(handle)
    for cookie in stored:
        # Selenium rejects stale 'expiry' values, so drop the field entirely.
        cookie.pop('expiry', None)
        driver.add_cookie(cookie)
    return True
# Usage example
driver = webdriver.Chrome()
cookie_file = 'cookies.pkl'
# First access the site
driver.get('https://example.com')
# Load saved cookies if available
if load_cookies(driver, cookie_file):
driver.refresh() # Refresh to apply cookies
print("Cookies loaded")
else:
# Perform login
# ... (login logic)
# Save cookies after login
save_cookies(driver, cookie_file)
print("Cookies saved")
6. Storing Data
6.1 Saving to CSV File
import csv
from datetime import datetime
def save_to_csv(data, filename, fieldnames=None):
    """Write a list of row dicts to a CSV file.

    Args:
        data: List of dicts; every row should share the first row's keys.
        filename: Output file path.
        fieldnames: Column order; defaults to the first row's keys.
    """
    if not data:
        print("No data to save.")
        return
    if fieldnames is None:
        fieldnames = data[0].keys()
    # utf-8-sig writes a BOM so Excel detects the encoding correctly.
    with open(filename, 'w', newline='', encoding='utf-8-sig') as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(data)
    # Fixed: the message previously printed the literal "(unknown)"
    # instead of interpolating the output filename.
    print(f"Data saved to {filename}.")
# Usage example
products = [
{'name': 'Product A', 'price': 10000, 'rating': 4.5},
{'name': 'Product B', 'price': 20000, 'rating': 4.8},
{'name': 'Product C', 'price': 15000, 'rating': 4.2},
]
save_to_csv(products, 'products.csv')
6.2 Saving to JSON File
import json
def save_to_json(data, filename, indent=2):
    """Write *data* to a JSON file (UTF-8, non-ASCII kept readable)."""
    with open(filename, 'w', encoding='utf-8') as f:
        json.dump(data, f, ensure_ascii=False, indent=indent)
    # Fixed: the message previously printed the literal "(unknown)"
    # instead of interpolating the output filename.
    print(f"Data saved to {filename}.")

def load_from_json(filename):
    """Read and return the JSON content of *filename*."""
    with open(filename, 'r', encoding='utf-8') as f:
        return json.load(f)
# Usage example
data = {
'scraped_at': '2026-01-22',
'total_items': 100,
'items': [
{'title': 'News 1', 'url': 'https://example.com/1'},
{'title': 'News 2', 'url': 'https://example.com/2'},
]
}
save_to_json(data, 'news_data.json')
6.3 Saving to SQLite Database
import sqlite3
from datetime import datetime
class DatabaseManager:
    """Thin wrapper around an SQLite connection for scraped data.

    SECURITY NOTE: table and column names are interpolated directly into
    the SQL text, so they must come from trusted code — never from
    scraped or user-supplied input. Row values are always parameterized.
    """

    def __init__(self, db_name='scraped_data.db'):
        """Open (or create) the database file."""
        self.conn = sqlite3.connect(db_name)
        self.cursor = self.conn.cursor()

    def create_table(self, table_name, columns):
        """Create *table_name* if it does not exist.

        Args:
            columns: Iterable of (name, sql_type) pairs.
        """
        columns_str = ', '.join(f'{name} {type_}' for name, type_ in columns)
        query = f'CREATE TABLE IF NOT EXISTS {table_name} ({columns_str})'
        self.cursor.execute(query)
        self.conn.commit()

    def insert(self, table_name, data):
        """Insert a single row given as a dict."""
        columns = ', '.join(data.keys())
        placeholders = ', '.join('?' for _ in data)
        query = f'INSERT INTO {table_name} ({columns}) VALUES ({placeholders})'
        self.cursor.execute(query, list(data.values()))
        self.conn.commit()

    def insert_many(self, table_name, data_list):
        """Insert many rows with a single executemany call.

        Column order comes from the first row, and every row's values are
        looked up by key — previously values were taken in each dict's own
        order, silently misaligning rows whose keys were ordered differently.
        """
        if not data_list:
            return
        keys = list(data_list[0].keys())
        columns = ', '.join(keys)
        placeholders = ', '.join('?' for _ in keys)
        query = f'INSERT INTO {table_name} ({columns}) VALUES ({placeholders})'
        rows = [[row[key] for key in keys] for row in data_list]
        self.cursor.executemany(query, rows)
        self.conn.commit()

    def select_all(self, table_name):
        """Return every row of *table_name* as a list of tuples."""
        query = f'SELECT * FROM {table_name}'
        self.cursor.execute(query)
        return self.cursor.fetchall()

    def close(self):
        """Close the underlying connection."""
        self.conn.close()
# Usage example
db = DatabaseManager('products.db')
# Create table
db.create_table('products', [
('id', 'INTEGER PRIMARY KEY AUTOINCREMENT'),
('name', 'TEXT'),
('price', 'INTEGER'),
('rating', 'REAL'),
('scraped_at', 'TEXT')
])
# Insert data
products = [
{'name': 'Product A', 'price': 10000, 'rating': 4.5, 'scraped_at': datetime.now().isoformat()},
{'name': 'Product B', 'price': 20000, 'rating': 4.8, 'scraped_at': datetime.now().isoformat()},
]
db.insert_many('products', products)
# Query data
all_products = db.select_all('products')
for product in all_products:
print(product)
db.close()
7. Anti-Bot Bypass Techniques
Warning: Bypassing anti-bot systems may violate the website's terms of service. Always follow legal and ethical guidelines.
7.1 Basic Bypass Techniques
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
import random
import time
def create_stealth_driver():
    """Create a Chrome driver configured to look less like automation.

    NOTE: bypassing bot detection may violate a site's terms of service;
    use this only where you are permitted to.
    """
    options = Options()
    # Remove the most common automation fingerprints.
    options.add_argument('--disable-blink-features=AutomationControlled')
    options.add_experimental_option('excludeSwitches', ['enable-automation'])
    options.add_experimental_option('useAutomationExtension', False)
    # Make the window look like an ordinary user's browser.
    options.add_argument('--disable-infobars')
    options.add_argument('--disable-extensions')
    # Rotate between a few realistic User-Agent strings.
    user_agents = [
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 Chrome/120.0.0.0 Safari/537.36',
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 Chrome/120.0.0.0 Safari/537.36',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/121.0',
    ]
    options.add_argument('--user-agent=' + random.choice(user_agents))
    driver = webdriver.Chrome(options=options)
    # Patch navigator.webdriver before any page script can read it.
    driver.execute_cdp_cmd('Page.addScriptToEvaluateOnNewDocument', {
        'source': '''
            Object.defineProperty(navigator, 'webdriver', {
                get: () => undefined
            })
        '''
    })
    return driver
def random_delay(min_seconds=1, max_seconds=3):
    """Sleep for a random duration in [min_seconds, max_seconds]."""
    delay = random.uniform(min_seconds, max_seconds)
    time.sleep(delay)
def human_like_typing(element, text):
    """Send *text* one character at a time with small random pauses.

    The 50-200 ms gap between keystrokes mimics human typing speed.
    """
    for character in text:
        element.send_keys(character)
        time.sleep(random.uniform(0.05, 0.2))
7.2 Using Proxies
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
def create_driver_with_proxy(proxy_address):
    """Return a Chrome driver whose traffic is routed through *proxy_address*.

    Args:
        proxy_address: Proxy endpoint in ``host:port`` form.
    """
    proxy_options = Options()
    proxy_options.add_argument(f'--proxy-server={proxy_address}')
    return webdriver.Chrome(options=proxy_options)
# Usage example
proxy = '123.456.789.012:8080'
driver = create_driver_with_proxy(proxy)
# Check IP
driver.get('https://httpbin.org/ip')
print(driver.page_source)
8. Hands-on Project: News Crawler
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options
import json
import time
from datetime import datetime
class NewsCrawler:
    """News site crawler: collects a listing page, then each article's body."""

    def __init__(self, headless=True):
        """Start a Chrome session tuned for crawling."""
        options = Options()
        if headless:
            options.add_argument('--headless')
        options.add_argument('--no-sandbox')
        options.add_argument('--disable-dev-shm-usage')
        options.add_argument('--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36')
        self.driver = webdriver.Chrome(options=options)
        self.wait = WebDriverWait(self.driver, 10)
        # Accumulates article dicts across crawl_news_list / crawl_full_articles.
        self.articles = []

    def crawl_news_list(self, url, article_selector, title_selector, link_selector, max_articles=20):
        """Crawl a news listing page and collect (title, url) pairs.

        Args:
            url: Listing page URL.
            article_selector: CSS selector for each article container.
            title_selector: CSS selector for the title inside a container.
            link_selector: CSS selector for the link inside a container.
            max_articles: Cap on the number of articles collected.

        Returns:
            The accumulated list of article dicts.
        """
        self.driver.get(url)
        self.wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, article_selector)))
        articles = self.driver.find_elements(By.CSS_SELECTOR, article_selector)[:max_articles]
        for article in articles:
            try:
                title_elem = article.find_element(By.CSS_SELECTOR, title_selector)
                link_elem = article.find_element(By.CSS_SELECTOR, link_selector)
                self.articles.append({
                    'title': title_elem.text.strip(),
                    'url': link_elem.get_attribute('href'),
                    'crawled_at': datetime.now().isoformat()
                })
            except Exception as e:
                # Skip any container whose inner elements are missing.
                print(f"Article parsing error: {e}")
                continue
        time.sleep(0.5)  # be polite to the server
        return self.articles

    def crawl_article_content(self, url, content_selector, author_selector=None, date_selector=None):
        """Crawl one article page and extract its content/author/date.

        Returns:
            Dict with 'url', 'content' (None on failure) and, when the
            optional selectors are given, 'author' and 'date'.
        """
        self.driver.get(url)
        article_data = {'url': url}
        try:
            content_elem = self.wait.until(
                EC.presence_of_element_located((By.CSS_SELECTOR, content_selector))
            )
            article_data['content'] = content_elem.text.strip()
            if author_selector:
                try:
                    author_elem = self.driver.find_element(By.CSS_SELECTOR, author_selector)
                    article_data['author'] = author_elem.text.strip()
                except Exception:
                    # Narrowed from a bare except; missing author is not fatal.
                    article_data['author'] = None
            if date_selector:
                try:
                    date_elem = self.driver.find_element(By.CSS_SELECTOR, date_selector)
                    article_data['date'] = date_elem.text.strip()
                except Exception:
                    # Narrowed from a bare except; missing date is not fatal.
                    article_data['date'] = None
        except Exception as e:
            print(f"Content extraction error: {e}")
            article_data['content'] = None
        return article_data

    def crawl_full_articles(self, content_selector, author_selector=None, date_selector=None):
        """Fetch full content for every article collected from the listing."""
        for i, article in enumerate(self.articles):
            print(f"[{i+1}/{len(self.articles)}] Crawling: {article['title'][:30]}...")
            content_data = self.crawl_article_content(
                article['url'],
                content_selector,
                author_selector,
                date_selector
            )
            article.update(content_data)
            time.sleep(1)  # be polite to the server
        return self.articles

    def save_to_json(self, filename):
        """Save the collected articles to a JSON file."""
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(self.articles, f, ensure_ascii=False, indent=2)
        # Fixed: the message previously printed the literal "(unknown)"
        # instead of interpolating the output filename.
        print(f"Data saved to {filename}.")

    def close(self):
        """Quit the browser session."""
        self.driver.quit()
# Usage example
if __name__ == '__main__':
crawler = NewsCrawler(headless=True)
try:
# Crawl news list (modify URL and selectors for target site)
print("Starting news list crawl...")
crawler.crawl_news_list(
url='https://news.example.com/tech',
article_selector='.news-item',
title_selector='.news-title',
link_selector='a',
max_articles=10
)
# Crawl article content
print("\nStarting article content crawl...")
crawler.crawl_full_articles(
content_selector='.article-body',
author_selector='.author-name',
date_selector='.publish-date'
)
# Save results
crawler.save_to_json('news_articles.json')
print(f"\nTotal {len(crawler.articles)} articles collected.")
finally:
crawler.close()
9. Hands-on Project: E-Commerce Price Monitor
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options
import sqlite3
import time
from datetime import datetime
class PriceMonitor:
    """E-commerce price monitor: scrapes prices and records them in SQLite."""

    def __init__(self, db_name='price_history.db'):
        """Start a headless browser and open the price-history database."""
        options = Options()
        options.add_argument('--headless')
        options.add_argument('--no-sandbox')
        options.add_argument('--disable-dev-shm-usage')
        self.driver = webdriver.Chrome(options=options)
        self.wait = WebDriverWait(self.driver, 10)
        self.conn = sqlite3.connect(db_name)
        self.cursor = self.conn.cursor()
        self._create_tables()

    def _create_tables(self):
        """Create the products and price_history tables if absent."""
        self.cursor.execute('''
            CREATE TABLE IF NOT EXISTS products (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT,
                url TEXT UNIQUE,
                created_at TEXT
            )
        ''')
        self.cursor.execute('''
            CREATE TABLE IF NOT EXISTS price_history (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                product_id INTEGER,
                price INTEGER,
                recorded_at TEXT,
                FOREIGN KEY (product_id) REFERENCES products (id)
            )
        ''')
        self.conn.commit()

    def add_product(self, name, url):
        """Register a product to monitor; duplicates (same URL) are ignored."""
        try:
            self.cursor.execute(
                'INSERT INTO products (name, url, created_at) VALUES (?, ?, ?)',
                (name, url, datetime.now().isoformat())
            )
            self.conn.commit()
            print(f"Product added: {name}")
        except sqlite3.IntegrityError:
            # UNIQUE constraint on url: the product is already registered.
            print(f"Product already registered: {name}")

    def get_price(self, url, price_selector):
        """Scrape and return the integer price from a product page.

        Returns:
            The price as an int, or None when extraction fails.
        """
        self.driver.get(url)
        try:
            price_elem = self.wait.until(
                EC.presence_of_element_located((By.CSS_SELECTOR, price_selector))
            )
            # Keep only digits — note this also strips decimal separators,
            # so prices are treated as whole currency units.
            price_text = price_elem.text
            return int(''.join(filter(str.isdigit, price_text)))
        except Exception as e:
            print(f"Price extraction error: {e}")
            return None

    def record_price(self, product_id, price):
        """Append a timestamped price observation for *product_id*."""
        self.cursor.execute(
            'INSERT INTO price_history (product_id, price, recorded_at) VALUES (?, ?, ?)',
            (product_id, price, datetime.now().isoformat())
        )
        self.conn.commit()

    def check_all_prices(self, price_selector):
        """Scrape and record the current price of every registered product.

        Returns:
            List of dicts with name, price and url for each successful check.
        """
        self.cursor.execute('SELECT id, name, url FROM products')
        products = self.cursor.fetchall()
        results = []
        for product_id, name, url in products:
            print(f"Checking price: {name}")
            price = self.get_price(url, price_selector)
            # Fixed: 'if price:' skipped a legitimate price of 0;
            # only a failed extraction (None) should be ignored.
            if price is not None:
                self.record_price(product_id, price)
                results.append({
                    'name': name,
                    'price': price,
                    'url': url
                })
            time.sleep(2)  # be polite to the server
        return results

    def get_price_history(self, product_id, limit=30):
        """Return up to *limit* most recent (price, recorded_at) rows."""
        self.cursor.execute('''
            SELECT price, recorded_at
            FROM price_history
            WHERE product_id = ?
            ORDER BY recorded_at DESC
            LIMIT ?
        ''', (product_id, limit))
        return self.cursor.fetchall()

    def get_price_alert(self, product_id, threshold_price):
        """Report whether the latest recorded price is at or below a threshold.

        Returns:
            {'alert': True, 'name', 'current_price', 'threshold'} when the
            latest price <= threshold_price, otherwise {'alert': False}.
        """
        self.cursor.execute('''
            SELECT p.name, ph.price
            FROM products p
            JOIN price_history ph ON p.id = ph.product_id
            WHERE p.id = ?
            ORDER BY ph.recorded_at DESC
            LIMIT 1
        ''', (product_id,))
        result = self.cursor.fetchone()
        if result and result[1] <= threshold_price:
            return {
                'alert': True,
                'name': result[0],
                'current_price': result[1],
                'threshold': threshold_price
            }
        return {'alert': False}

    def close(self):
        """Release the browser and the database connection."""
        self.driver.quit()
        self.conn.close()
# Usage example
if __name__ == '__main__':
monitor = PriceMonitor()
try:
# Add products to monitor (use actual URLs)
monitor.add_product('Laptop A', 'https://shop.example.com/product/1')
monitor.add_product('Laptop B', 'https://shop.example.com/product/2')
# Check and record prices
results = monitor.check_all_prices(price_selector='.product-price')
for result in results:
print(f"{result['name']}: ${result['price']:,}")
# Check price alert
alert = monitor.get_price_alert(product_id=1, threshold_price=1000000)
if alert['alert']:
print(f"\n[ALERT] {alert['name']} dropped to ${alert['current_price']:,}!")
finally:
monitor.close()
Conclusion
In this part, we learned advanced web scraping techniques using Selenium. We covered dynamic web page handling, browser automation, login processing, and various methods for storing collected data.
Web scraping is a powerful tool, but you must always follow legal and ethical guidelines. Check robots.txt, apply appropriate delays to avoid overloading servers, and respect the copyright of collected data.
Series Notice: The Python Automation Master series continues. In the next part, we'll learn about email automation, API integration, and more diverse automation techniques!