Python 자동화 마스터 4편: 웹 스크래핑 실전
Python Automation Master Part 4: Advanced Web Scraping
서론: 왜 Selenium이 필요한가?
3편에서 배운 requests와 BeautifulSoup은 정적 HTML을 파싱하는 데 효과적입니다. 하지만 현대 웹사이트의 많은 부분은 JavaScript로 동적으로 콘텐츠를 생성합니다. 이런 페이지에서는 requests로 가져온 HTML에 원하는 데이터가 없는 경우가 많습니다.
Selenium은 실제 웹 브라우저를 자동으로 제어하여 JavaScript가 실행된 후의 완전한 페이지를 가져올 수 있습니다. 또한 클릭, 입력, 스크롤 등 사용자 상호작용을 시뮬레이션할 수 있어 로그인, 무한 스크롤, 페이지네이션 처리에 유용합니다.
1. Selenium 설치와 WebDriver 설정
1.1 Selenium 설치
# Selenium 설치
pip install selenium
# 웹드라이버 자동 관리를 위한 webdriver-manager 설치
pip install webdriver-manager
1.2 WebDriver 설정
Selenium은 브라우저를 제어하기 위해 WebDriver가 필요합니다. webdriver-manager를 사용하면 드라이버를 자동으로 다운로드하고 관리할 수 있습니다.
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
# Chrome 브라우저 설정 (자동 드라이버 관리)
service = Service(ChromeDriverManager().install())
driver = webdriver.Chrome(service=service)
# 웹 페이지 열기
driver.get('https://www.google.com')
# 페이지 제목 출력
print(driver.title)
# 브라우저 종료
driver.quit()
1.3 브라우저 옵션 설정
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from webdriver_manager.chrome import ChromeDriverManager
def create_driver(headless=False):
"""Chrome 드라이버를 생성합니다."""
options = Options()
# 헤드리스 모드 (브라우저 창 없이 실행)
if headless:
options.add_argument('--headless')
# 기본 옵션 설정
options.add_argument('--no-sandbox')
options.add_argument('--disable-dev-shm-usage')
options.add_argument('--disable-gpu')
# 창 크기 설정
options.add_argument('--window-size=1920,1080')
# User-Agent 설정
options.add_argument('--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36')
# 자동화 감지 방지
options.add_argument('--disable-blink-features=AutomationControlled')
options.add_experimental_option('excludeSwitches', ['enable-automation'])
options.add_experimental_option('useAutomationExtension', False)
# 드라이버 생성
service = Service(ChromeDriverManager().install())
driver = webdriver.Chrome(service=service, options=options)
# 자동화 탐지 우회를 위한 스크립트 실행
driver.execute_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})")
return driver
# 사용 예시
driver = create_driver(headless=True)
driver.get('https://example.com')
print(driver.page_source)
driver.quit()
2. 브라우저 자동화 기본
2.1 요소 찾기
Selenium 4.x에서는 By 클래스를 사용하여 요소를 찾습니다.
from selenium import webdriver
from selenium.webdriver.common.by import By
driver = webdriver.Chrome()
driver.get('https://example.com')
# 다양한 방법으로 요소 찾기
# ID로 찾기
element = driver.find_element(By.ID, 'main-content')
# 클래스명으로 찾기
element = driver.find_element(By.CLASS_NAME, 'article')
# CSS 선택자로 찾기
element = driver.find_element(By.CSS_SELECTOR, 'div.content > p')
# XPath로 찾기
element = driver.find_element(By.XPATH, '//div[@class="content"]/p')
# 태그명으로 찾기
elements = driver.find_elements(By.TAG_NAME, 'a')
# 링크 텍스트로 찾기
element = driver.find_element(By.LINK_TEXT, '더 보기')
element = driver.find_element(By.PARTIAL_LINK_TEXT, '더')
# Name 속성으로 찾기
element = driver.find_element(By.NAME, 'username')
# 여러 요소 찾기 (리스트 반환)
all_links = driver.find_elements(By.TAG_NAME, 'a')
for link in all_links:
print(link.text, link.get_attribute('href'))
2.2 요소 상호작용
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
driver = webdriver.Chrome()
driver.get('https://www.google.com')
# 텍스트 입력
search_box = driver.find_element(By.NAME, 'q')
search_box.send_keys('Python Selenium')
# 키보드 입력
search_box.send_keys(Keys.ENTER) # Enter 키
# 또는
search_box.send_keys(Keys.CONTROL, 'a') # Ctrl+A (전체 선택)
# 클릭
button = driver.find_element(By.CSS_SELECTOR, 'button[type="submit"]')
button.click()
# 텍스트 가져오기
element = driver.find_element(By.ID, 'result')
print(element.text)
# 속성 가져오기
link = driver.find_element(By.TAG_NAME, 'a')
print(link.get_attribute('href'))
# 입력 필드 지우기
input_field = driver.find_element(By.NAME, 'username')
input_field.clear()
input_field.send_keys('new_value')
# 폼 제출
form = driver.find_element(By.TAG_NAME, 'form')
form.submit()
2.3 스크롤 처리
from selenium import webdriver
from selenium.webdriver.common.by import By
import time
driver = webdriver.Chrome()
driver.get('https://example.com/infinite-scroll')
def scroll_to_bottom(driver, pause_time=2):
"""페이지 맨 아래까지 스크롤합니다."""
last_height = driver.execute_script("return document.body.scrollHeight")
while True:
# 페이지 맨 아래로 스크롤
driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
# 페이지 로딩 대기
time.sleep(pause_time)
# 새 높이 계산
new_height = driver.execute_script("return document.body.scrollHeight")
# 더 이상 스크롤할 수 없으면 종료
if new_height == last_height:
break
last_height = new_height
def scroll_by_pixel(driver, pixels=500):
"""지정된 픽셀만큼 스크롤합니다."""
driver.execute_script(f"window.scrollBy(0, {pixels});")
def scroll_to_element(driver, element):
"""특정 요소가 보이도록 스크롤합니다."""
driver.execute_script("arguments[0].scrollIntoView(true);", element)
# 사용 예시
scroll_to_bottom(driver)
# 특정 요소로 스크롤
target = driver.find_element(By.ID, 'target-section')
scroll_to_element(driver, target)
3. 대기 전략 (Wait)
동적 웹페이지에서는 요소가 로드될 때까지 기다려야 합니다. Selenium은 두 가지 대기 방식을 제공합니다.
3.1 암시적 대기 (Implicit Wait)
전역적으로 설정되며, 요소를 찾을 때까지 지정된 시간만큼 대기합니다.
from selenium import webdriver
driver = webdriver.Chrome()
# 암시적 대기 설정 (최대 10초)
driver.implicitly_wait(10)
# 이제 모든 find_element 호출은 최대 10초까지 대기
driver.get('https://example.com')
element = driver.find_element(By.ID, 'dynamic-content')
3.2 명시적 대기 (Explicit Wait)
특정 조건이 만족될 때까지 대기합니다. 더 유연하고 권장되는 방식입니다.
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
driver = webdriver.Chrome()
driver.get('https://example.com')
# 요소가 나타날 때까지 대기 (최대 10초)
wait = WebDriverWait(driver, 10)
# 요소가 존재할 때까지 대기
element = wait.until(EC.presence_of_element_located((By.ID, 'content')))
# 요소가 보일 때까지 대기
element = wait.until(EC.visibility_of_element_located((By.ID, 'content')))
# 요소가 클릭 가능할 때까지 대기
button = wait.until(EC.element_to_be_clickable((By.CSS_SELECTOR, 'button.submit')))
# 특정 텍스트가 나타날 때까지 대기
wait.until(EC.text_to_be_present_in_element((By.ID, 'status'), '완료'))
# 요소가 사라질 때까지 대기
wait.until(EC.invisibility_of_element_located((By.ID, 'loading')))
# 새 창이 열릴 때까지 대기
wait.until(EC.number_of_windows_to_be(2))
# URL이 특정 값을 포함할 때까지 대기
wait.until(EC.url_contains('/success'))
3.3 커스텀 대기 조건
from selenium.webdriver.support.ui import WebDriverWait
def wait_for_ajax(driver, timeout=30):
"""AJAX 요청이 완료될 때까지 대기합니다."""
wait = WebDriverWait(driver, timeout)
wait.until(lambda d: d.execute_script('return jQuery.active == 0'))
def wait_for_page_load(driver, timeout=30):
"""페이지가 완전히 로드될 때까지 대기합니다."""
wait = WebDriverWait(driver, timeout)
wait.until(lambda d: d.execute_script('return document.readyState') == 'complete')
def wait_for_element_count(driver, locator, count, timeout=10):
"""특정 개수의 요소가 나타날 때까지 대기합니다."""
wait = WebDriverWait(driver, timeout)
wait.until(lambda d: len(d.find_elements(*locator)) >= count)
# 사용 예시
driver.get('https://example.com')
wait_for_page_load(driver)
wait_for_element_count(driver, (By.CSS_SELECTOR, '.item'), 10)
4. 페이지네이션 처리
4.1 버튼 클릭 방식
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import NoSuchElementException, TimeoutException
def scrape_with_pagination(driver, item_selector, next_button_selector, max_pages=10):
"""페이지네이션이 있는 사이트를 스크래핑합니다."""
all_items = []
current_page = 1
while current_page <= max_pages:
print(f"페이지 {current_page} 스크래핑 중...")
# 현재 페이지의 아이템 수집
wait = WebDriverWait(driver, 10)
items = wait.until(EC.presence_of_all_elements_located((By.CSS_SELECTOR, item_selector)))
for item in items:
all_items.append(item.text)
# 다음 페이지 버튼 찾기
try:
next_button = driver.find_element(By.CSS_SELECTOR, next_button_selector)
# 버튼이 비활성화되어 있는지 확인
if 'disabled' in next_button.get_attribute('class') or not next_button.is_enabled():
print("마지막 페이지입니다.")
break
# 다음 페이지로 이동
next_button.click()
# 새 콘텐츠 로딩 대기
time.sleep(2)
current_page += 1
except NoSuchElementException:
print("다음 페이지 버튼을 찾을 수 없습니다.")
break
except TimeoutException:
print("페이지 로딩 시간 초과")
break
return all_items
# 사용 예시
driver = webdriver.Chrome()
driver.get('https://example.com/products')
items = scrape_with_pagination(
driver,
item_selector='.product-item',
next_button_selector='.pagination .next',
max_pages=5
)
print(f"총 {len(items)}개의 아이템을 수집했습니다.")
4.2 URL 파라미터 방식
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import time
def scrape_url_pagination(base_url, item_selector, start_page=1, max_pages=10):
"""URL 파라미터 기반 페이지네이션을 처리합니다."""
driver = webdriver.Chrome()
all_items = []
try:
for page in range(start_page, start_page + max_pages):
url = f"{base_url}?page={page}"
print(f"스크래핑: {url}")
driver.get(url)
# 콘텐츠 로딩 대기
wait = WebDriverWait(driver, 10)
try:
items = wait.until(EC.presence_of_all_elements_located(
(By.CSS_SELECTOR, item_selector)
))
if not items:
print("더 이상 아이템이 없습니다.")
break
for item in items:
all_items.append({
'text': item.text,
'page': page
})
time.sleep(1) # 서버 부하 방지
except Exception as e:
print(f"페이지 {page} 처리 중 오류: {e}")
break
finally:
driver.quit()
return all_items
# 사용 예시
items = scrape_url_pagination(
base_url='https://example.com/search',
item_selector='.result-item',
max_pages=5
)
5. 로그인이 필요한 사이트 스크래핑
5.1 기본 로그인 처리
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import time
class LoginScraper:
"""로그인이 필요한 사이트 스크래퍼"""
def __init__(self, headless=False):
options = webdriver.ChromeOptions()
if headless:
options.add_argument('--headless')
self.driver = webdriver.Chrome(options=options)
self.wait = WebDriverWait(self.driver, 10)
def login(self, login_url, username, password, username_field, password_field, submit_button):
"""웹사이트에 로그인합니다."""
self.driver.get(login_url)
# 사용자명 입력
username_input = self.wait.until(
EC.presence_of_element_located((By.CSS_SELECTOR, username_field))
)
username_input.clear()
username_input.send_keys(username)
# 비밀번호 입력
password_input = self.driver.find_element(By.CSS_SELECTOR, password_field)
password_input.clear()
password_input.send_keys(password)
# 로그인 버튼 클릭
submit_btn = self.driver.find_element(By.CSS_SELECTOR, submit_button)
submit_btn.click()
# 로그인 완료 대기
time.sleep(3)
return self.is_logged_in()
def is_logged_in(self):
"""로그인 상태를 확인합니다."""
# 로그아웃 버튼이나 사용자 프로필 요소 확인
try:
self.driver.find_element(By.CSS_SELECTOR, '.logout-btn, .user-profile')
return True
except:
return False
def scrape_protected_page(self, url, selector):
"""로그인 후 보호된 페이지를 스크래핑합니다."""
if not self.is_logged_in():
raise Exception("로그인이 필요합니다.")
self.driver.get(url)
elements = self.wait.until(
EC.presence_of_all_elements_located((By.CSS_SELECTOR, selector))
)
return [el.text for el in elements]
def close(self):
"""브라우저를 종료합니다."""
self.driver.quit()
# 사용 예시
scraper = LoginScraper()
try:
# 로그인
logged_in = scraper.login(
login_url='https://example.com/login',
username='your_username',
password='your_password',
username_field='input[name="username"]',
password_field='input[name="password"]',
submit_button='button[type="submit"]'
)
if logged_in:
print("로그인 성공!")
# 보호된 페이지 스크래핑
data = scraper.scrape_protected_page(
url='https://example.com/dashboard',
selector='.dashboard-item'
)
print(data)
else:
print("로그인 실패")
finally:
scraper.close()
5.2 쿠키를 사용한 세션 유지
import pickle
import os
from selenium import webdriver
def save_cookies(driver, filepath):
"""쿠키를 파일에 저장합니다."""
with open(filepath, 'wb') as f:
pickle.dump(driver.get_cookies(), f)
def load_cookies(driver, filepath):
"""파일에서 쿠키를 로드합니다."""
if os.path.exists(filepath):
with open(filepath, 'rb') as f:
cookies = pickle.load(f)
for cookie in cookies:
# 쿠키 만료 처리
if 'expiry' in cookie:
del cookie['expiry']
driver.add_cookie(cookie)
return True
return False
# 사용 예시
driver = webdriver.Chrome()
cookie_file = 'cookies.pkl'
# 먼저 사이트에 접속
driver.get('https://example.com')
# 저장된 쿠키가 있으면 로드
if load_cookies(driver, cookie_file):
driver.refresh() # 쿠키 적용을 위해 새로고침
print("쿠키 로드 완료")
else:
# 로그인 수행
# ... (로그인 로직)
# 로그인 후 쿠키 저장
save_cookies(driver, cookie_file)
print("쿠키 저장 완료")
6. 데이터 저장
6.1 CSV 파일로 저장
import csv
from datetime import datetime
def save_to_csv(data, filename, fieldnames=None):
"""데이터를 CSV 파일로 저장합니다."""
if not data:
print("저장할 데이터가 없습니다.")
return
# 필드명이 없으면 첫 번째 데이터의 키 사용
if fieldnames is None:
fieldnames = data[0].keys()
with open(filename, 'w', newline='', encoding='utf-8-sig') as f:
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(data)
print(f"데이터가 {filename}에 저장되었습니다.")
# 사용 예시
products = [
{'name': '상품 A', 'price': 10000, 'rating': 4.5},
{'name': '상품 B', 'price': 20000, 'rating': 4.8},
{'name': '상품 C', 'price': 15000, 'rating': 4.2},
]
save_to_csv(products, 'products.csv')
6.2 JSON 파일로 저장
import json
def save_to_json(data, filename, indent=2):
"""데이터를 JSON 파일로 저장합니다."""
with open(filename, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=indent)
print(f"데이터가 {filename}에 저장되었습니다.")
def load_from_json(filename):
"""JSON 파일에서 데이터를 로드합니다."""
with open(filename, 'r', encoding='utf-8') as f:
return json.load(f)
# 사용 예시
data = {
'scraped_at': '2026-01-22',
'total_items': 100,
'items': [
{'title': '뉴스 1', 'url': 'https://example.com/1'},
{'title': '뉴스 2', 'url': 'https://example.com/2'},
]
}
save_to_json(data, 'news_data.json')
6.3 SQLite 데이터베이스에 저장
import sqlite3
from datetime import datetime
class DatabaseManager:
"""SQLite 데이터베이스 관리 클래스"""
def __init__(self, db_name='scraped_data.db'):
self.conn = sqlite3.connect(db_name)
self.cursor = self.conn.cursor()
def create_table(self, table_name, columns):
"""테이블을 생성합니다."""
columns_str = ', '.join([f'{name} {type_}' for name, type_ in columns])
query = f'CREATE TABLE IF NOT EXISTS {table_name} ({columns_str})'
self.cursor.execute(query)
self.conn.commit()
def insert(self, table_name, data):
"""데이터를 삽입합니다."""
columns = ', '.join(data.keys())
placeholders = ', '.join(['?' for _ in data])
query = f'INSERT INTO {table_name} ({columns}) VALUES ({placeholders})'
self.cursor.execute(query, list(data.values()))
self.conn.commit()
def insert_many(self, table_name, data_list):
"""여러 데이터를 한 번에 삽입합니다."""
if not data_list:
return
columns = ', '.join(data_list[0].keys())
placeholders = ', '.join(['?' for _ in data_list[0]])
query = f'INSERT INTO {table_name} ({columns}) VALUES ({placeholders})'
values = [list(data.values()) for data in data_list]
self.cursor.executemany(query, values)
self.conn.commit()
def select_all(self, table_name):
"""모든 데이터를 조회합니다."""
query = f'SELECT * FROM {table_name}'
self.cursor.execute(query)
return self.cursor.fetchall()
def close(self):
"""데이터베이스 연결을 종료합니다."""
self.conn.close()
# 사용 예시
db = DatabaseManager('products.db')
# 테이블 생성
db.create_table('products', [
('id', 'INTEGER PRIMARY KEY AUTOINCREMENT'),
('name', 'TEXT'),
('price', 'INTEGER'),
('rating', 'REAL'),
('scraped_at', 'TEXT')
])
# 데이터 삽입
products = [
{'name': '상품 A', 'price': 10000, 'rating': 4.5, 'scraped_at': datetime.now().isoformat()},
{'name': '상품 B', 'price': 20000, 'rating': 4.8, 'scraped_at': datetime.now().isoformat()},
]
db.insert_many('products', products)
# 데이터 조회
all_products = db.select_all('products')
for product in all_products:
print(product)
db.close()
7. 안티봇 우회 기법
주의: 안티봇 시스템 우회는 해당 사이트의 이용약관을 위반할 수 있습니다. 반드시 법적, 윤리적 기준을 준수하세요.
7.1 기본 우회 기법
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
import random
import time
def create_stealth_driver():
"""탐지를 피하기 위한 스텔스 드라이버를 생성합니다."""
options = Options()
# 자동화 감지 방지
options.add_argument('--disable-blink-features=AutomationControlled')
options.add_experimental_option('excludeSwitches', ['enable-automation'])
options.add_experimental_option('useAutomationExtension', False)
# 실제 브라우저처럼 보이기
options.add_argument('--disable-infobars')
options.add_argument('--disable-extensions')
# 랜덤 User-Agent
user_agents = [
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 Chrome/120.0.0.0 Safari/537.36',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 Chrome/120.0.0.0 Safari/537.36',
'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/121.0',
]
options.add_argument(f'--user-agent={random.choice(user_agents)}')
driver = webdriver.Chrome(options=options)
# navigator.webdriver 속성 숨기기
driver.execute_cdp_cmd('Page.addScriptToEvaluateOnNewDocument', {
'source': '''
Object.defineProperty(navigator, 'webdriver', {
get: () => undefined
})
'''
})
return driver
def random_delay(min_seconds=1, max_seconds=3):
"""랜덤한 딜레이를 적용합니다."""
time.sleep(random.uniform(min_seconds, max_seconds))
def human_like_typing(element, text):
"""사람처럼 텍스트를 입력합니다."""
for char in text:
element.send_keys(char)
time.sleep(random.uniform(0.05, 0.2))
7.2 프록시 사용
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
def create_driver_with_proxy(proxy_address):
"""프록시를 사용하는 드라이버를 생성합니다."""
options = Options()
options.add_argument(f'--proxy-server={proxy_address}')
driver = webdriver.Chrome(options=options)
return driver
# 사용 예시
proxy = '123.456.789.012:8080'
driver = create_driver_with_proxy(proxy)
# IP 확인
driver.get('https://httpbin.org/ip')
print(driver.page_source)
8. 실전 프로젝트: 뉴스 크롤러
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options
import json
import time
from datetime import datetime
class NewsCrawler:
"""뉴스 사이트 크롤러"""
def __init__(self, headless=True):
options = Options()
if headless:
options.add_argument('--headless')
options.add_argument('--no-sandbox')
options.add_argument('--disable-dev-shm-usage')
options.add_argument('--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36')
self.driver = webdriver.Chrome(options=options)
self.wait = WebDriverWait(self.driver, 10)
self.articles = []
def crawl_news_list(self, url, article_selector, title_selector, link_selector, max_articles=20):
"""뉴스 목록 페이지를 크롤링합니다."""
self.driver.get(url)
# 페이지 로딩 대기
self.wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, article_selector)))
articles = self.driver.find_elements(By.CSS_SELECTOR, article_selector)[:max_articles]
for article in articles:
try:
title_elem = article.find_element(By.CSS_SELECTOR, title_selector)
link_elem = article.find_element(By.CSS_SELECTOR, link_selector)
self.articles.append({
'title': title_elem.text.strip(),
'url': link_elem.get_attribute('href'),
'crawled_at': datetime.now().isoformat()
})
except Exception as e:
print(f"기사 파싱 오류: {e}")
continue
time.sleep(0.5) # 서버 부하 방지
return self.articles
def crawl_article_content(self, url, content_selector, author_selector=None, date_selector=None):
"""개별 뉴스 기사의 본문을 크롤링합니다."""
self.driver.get(url)
article_data = {'url': url}
try:
# 본문 추출
content_elem = self.wait.until(
EC.presence_of_element_located((By.CSS_SELECTOR, content_selector))
)
article_data['content'] = content_elem.text.strip()
# 작성자 추출 (선택적)
if author_selector:
try:
author_elem = self.driver.find_element(By.CSS_SELECTOR, author_selector)
article_data['author'] = author_elem.text.strip()
except:
article_data['author'] = None
# 날짜 추출 (선택적)
if date_selector:
try:
date_elem = self.driver.find_element(By.CSS_SELECTOR, date_selector)
article_data['date'] = date_elem.text.strip()
except:
article_data['date'] = None
except Exception as e:
print(f"본문 추출 오류: {e}")
article_data['content'] = None
return article_data
def crawl_full_articles(self, content_selector, author_selector=None, date_selector=None):
"""목록에서 수집한 모든 기사의 본문을 크롤링합니다."""
for i, article in enumerate(self.articles):
print(f"[{i+1}/{len(self.articles)}] 크롤링 중: {article['title'][:30]}...")
content_data = self.crawl_article_content(
article['url'],
content_selector,
author_selector,
date_selector
)
article.update(content_data)
time.sleep(1) # 서버 부하 방지
return self.articles
def save_to_json(self, filename):
"""수집한 데이터를 JSON 파일로 저장합니다."""
with open(filename, 'w', encoding='utf-8') as f:
json.dump(self.articles, f, ensure_ascii=False, indent=2)
print(f"데이터가 {filename}에 저장되었습니다.")
def close(self):
"""브라우저를 종료합니다."""
self.driver.quit()
# 사용 예시
if __name__ == '__main__':
crawler = NewsCrawler(headless=True)
try:
# 뉴스 목록 크롤링 (실제 URL과 선택자는 대상 사이트에 맞게 수정)
print("뉴스 목록 크롤링 시작...")
crawler.crawl_news_list(
url='https://news.example.com/tech',
article_selector='.news-item',
title_selector='.news-title',
link_selector='a',
max_articles=10
)
# 기사 본문 크롤링
print("\n기사 본문 크롤링 시작...")
crawler.crawl_full_articles(
content_selector='.article-body',
author_selector='.author-name',
date_selector='.publish-date'
)
# 결과 저장
crawler.save_to_json('news_articles.json')
print(f"\n총 {len(crawler.articles)}개의 기사를 수집했습니다.")
finally:
crawler.close()
9. 실전 프로젝트: 쇼핑몰 가격 모니터링
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options
import sqlite3
import time
from datetime import datetime
class PriceMonitor:
"""쇼핑몰 가격 모니터링 크롤러"""
def __init__(self, db_name='price_history.db'):
# 브라우저 설정
options = Options()
options.add_argument('--headless')
options.add_argument('--no-sandbox')
options.add_argument('--disable-dev-shm-usage')
self.driver = webdriver.Chrome(options=options)
self.wait = WebDriverWait(self.driver, 10)
# 데이터베이스 설정
self.conn = sqlite3.connect(db_name)
self.cursor = self.conn.cursor()
self._create_tables()
def _create_tables(self):
"""데이터베이스 테이블을 생성합니다."""
self.cursor.execute('''
CREATE TABLE IF NOT EXISTS products (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT,
url TEXT UNIQUE,
created_at TEXT
)
''')
self.cursor.execute('''
CREATE TABLE IF NOT EXISTS price_history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
product_id INTEGER,
price INTEGER,
recorded_at TEXT,
FOREIGN KEY (product_id) REFERENCES products (id)
)
''')
self.conn.commit()
def add_product(self, name, url):
"""모니터링할 상품을 추가합니다."""
try:
self.cursor.execute(
'INSERT INTO products (name, url, created_at) VALUES (?, ?, ?)',
(name, url, datetime.now().isoformat())
)
self.conn.commit()
print(f"상품 추가됨: {name}")
except sqlite3.IntegrityError:
print(f"이미 등록된 상품: {name}")
def get_price(self, url, price_selector):
"""상품 가격을 가져옵니다."""
self.driver.get(url)
try:
price_elem = self.wait.until(
EC.presence_of_element_located((By.CSS_SELECTOR, price_selector))
)
# 가격 텍스트에서 숫자만 추출
price_text = price_elem.text
price = int(''.join(filter(str.isdigit, price_text)))
return price
except Exception as e:
print(f"가격 추출 오류: {e}")
return None
def record_price(self, product_id, price):
"""가격을 기록합니다."""
self.cursor.execute(
'INSERT INTO price_history (product_id, price, recorded_at) VALUES (?, ?, ?)',
(product_id, price, datetime.now().isoformat())
)
self.conn.commit()
def check_all_prices(self, price_selector):
"""등록된 모든 상품의 가격을 확인합니다."""
self.cursor.execute('SELECT id, name, url FROM products')
products = self.cursor.fetchall()
results = []
for product_id, name, url in products:
print(f"가격 확인 중: {name}")
price = self.get_price(url, price_selector)
if price:
self.record_price(product_id, price)
results.append({
'name': name,
'price': price,
'url': url
})
time.sleep(2) # 서버 부하 방지
return results
def get_price_history(self, product_id, limit=30):
"""상품의 가격 변동 이력을 조회합니다."""
self.cursor.execute('''
SELECT price, recorded_at
FROM price_history
WHERE product_id = ?
ORDER BY recorded_at DESC
LIMIT ?
''', (product_id, limit))
return self.cursor.fetchall()
def get_price_alert(self, product_id, threshold_price):
"""가격이 특정 금액 이하로 떨어지면 알림을 반환합니다."""
self.cursor.execute('''
SELECT p.name, ph.price
FROM products p
JOIN price_history ph ON p.id = ph.product_id
WHERE p.id = ?
ORDER BY ph.recorded_at DESC
LIMIT 1
''', (product_id,))
result = self.cursor.fetchone()
if result and result[1] <= threshold_price:
return {
'alert': True,
'name': result[0],
'current_price': result[1],
'threshold': threshold_price
}
return {'alert': False}
def close(self):
"""리소스를 정리합니다."""
self.driver.quit()
self.conn.close()
# 사용 예시
if __name__ == '__main__':
monitor = PriceMonitor()
try:
# 모니터링할 상품 추가 (실제 URL로 변경)
monitor.add_product('노트북 A', 'https://shop.example.com/product/1')
monitor.add_product('노트북 B', 'https://shop.example.com/product/2')
# 가격 확인 및 기록
results = monitor.check_all_prices(price_selector='.product-price')
for result in results:
print(f"{result['name']}: {result['price']:,}원")
# 가격 알림 확인
alert = monitor.get_price_alert(product_id=1, threshold_price=1000000)
if alert['alert']:
print(f"\n[알림] {alert['name']}이(가) {alert['current_price']:,}원으로 하락했습니다!")
finally:
monitor.close()
마무리
이번 편에서는 Selenium을 사용한 고급 웹 스크래핑 기법을 배웠습니다. 동적 웹페이지 처리, 브라우저 자동화, 로그인 처리, 그리고 수집한 데이터를 다양한 형태로 저장하는 방법을 익혔습니다.
웹 스크래핑은 강력한 도구이지만, 항상 법적, 윤리적 기준을 준수해야 합니다. robots.txt를 확인하고, 서버에 과부하를 주지 않도록 적절한 딜레이를 적용하며, 수집한 데이터의 저작권을 존중해야 합니다.
시리즈 안내: Python 자동화 마스터 시리즈는 계속됩니다. 다음 편에서는 이메일 자동화, API 연동 등 더 다양한 자동화 기법을 배워보겠습니다!