Python 자동화 마스터 7편: API 활용과 데이터 수집
Python Automation Master Part 7: API Integration and Data Collection
서론: API가 여는 데이터의 세계
현대 웹 서비스의 핵심은 API(Application Programming Interface)입니다. API를 통해 우리는 날씨 정보, 주식 시세, 뉴스, 소셜 미디어 데이터 등 다양한 정보를 프로그래밍 방식으로 수집하고 활용할 수 있습니다. Python의 requests 라이브러리를 사용하면 이러한 API 호출을 매우 간단하게 처리할 수 있습니다.
이번 편에서는 REST API의 기본 개념부터 시작하여, 다양한 인증 방식, 실전 API 활용법, 그리고 효율적인 데이터 수집 전략까지 체계적으로 학습하겠습니다.
1. API 기초: REST API의 이해
1.1 API란 무엇인가?
API는 서로 다른 소프트웨어 간의 통신을 가능하게 하는 인터페이스입니다. 레스토랑에 비유하면, API는 손님(클라이언트)과 주방(서버) 사이의 웨이터 역할을 합니다. 손님이 메뉴를 주문하면 웨이터가 주방에 전달하고, 요리가 완성되면 다시 손님에게 가져다주는 것처럼요.
# API 요청의 기본 구조
import requests
# 1. 요청(Request) 보내기
response = requests.get("https://api.example.com/data")
# 2. 응답(Response) 받기
if response.status_code == 200:
data = response.json()
print(data)
else:
print(f"오류 발생: {response.status_code}")
1.2 REST API의 핵심 개념
REST(Representational State Transfer)는 웹 API 설계의 표준 아키텍처입니다.
# HTTP 메서드별 용도
"""
GET : 데이터 조회 (Read)
POST : 데이터 생성 (Create)
PUT : 데이터 전체 수정 (Update)
PATCH : 데이터 일부 수정 (Partial Update)
DELETE : 데이터 삭제 (Delete)
"""
import requests
base_url = "https://api.example.com"
# GET - 데이터 조회
response = requests.get(f"{base_url}/users")
# POST - 데이터 생성
new_user = {"name": "홍길동", "email": "hong@example.com"}
response = requests.post(f"{base_url}/users", json=new_user)
# PUT - 데이터 수정
updated_user = {"name": "홍길동", "email": "newemail@example.com"}
response = requests.put(f"{base_url}/users/1", json=updated_user)
# DELETE - 데이터 삭제
response = requests.delete(f"{base_url}/users/1")
1.3 HTTP 상태 코드 이해하기
# 주요 HTTP 상태 코드
status_codes = {
# 2xx: 성공
200: "OK - 요청 성공",
201: "Created - 생성 성공",
204: "No Content - 성공(응답 본문 없음)",
# 3xx: 리다이렉션
301: "Moved Permanently - 영구 이동",
302: "Found - 임시 이동",
# 4xx: 클라이언트 오류
400: "Bad Request - 잘못된 요청",
401: "Unauthorized - 인증 필요",
403: "Forbidden - 접근 거부",
404: "Not Found - 리소스 없음",
429: "Too Many Requests - 요청 횟수 초과",
# 5xx: 서버 오류
500: "Internal Server Error - 서버 내부 오류",
502: "Bad Gateway - 게이트웨이 오류",
503: "Service Unavailable - 서비스 이용 불가"
}
def handle_response(response):
"""응답 상태 코드에 따른 처리"""
code = response.status_code
if 200 <= code < 300:
print(f"성공: {status_codes.get(code, '알 수 없는 성공 코드')}")
return response.json() if response.content else None
elif 400 <= code < 500:
print(f"클라이언트 오류: {status_codes.get(code, '알 수 없는 오류')}")
return None
elif 500 <= code < 600:
print(f"서버 오류: {status_codes.get(code, '알 수 없는 서버 오류')}")
return None
2. requests 라이브러리 완벽 가이드
2.1 기본 사용법
import requests
# 설치: pip install requests
# GET 요청 기본
response = requests.get("https://api.github.com/users/octocat")
print(response.status_code) # 200
print(response.headers) # 응답 헤더
print(response.text) # 텍스트 형태의 응답
print(response.json()) # JSON 파싱된 딕셔너리
# URL 파라미터 전달
params = {
"q": "python",
"sort": "stars",
"order": "desc"
}
response = requests.get(
"https://api.github.com/search/repositories",
params=params
)
# 실제 요청 URL: https://api.github.com/search/repositories?q=python&sort=stars&order=desc
2.2 헤더와 타임아웃 설정
import requests
# 커스텀 헤더 설정
headers = {
"User-Agent": "MyApp/1.0",
"Accept": "application/json",
"Content-Type": "application/json"
}
# 타임아웃 설정 (연결 타임아웃, 읽기 타임아웃)
try:
response = requests.get(
"https://api.example.com/data",
headers=headers,
timeout=(5, 30) # 연결: 5초, 읽기: 30초
)
except requests.exceptions.Timeout:
print("요청 시간이 초과되었습니다.")
except requests.exceptions.ConnectionError:
print("연결에 실패했습니다.")
except requests.exceptions.RequestException as e:
print(f"요청 중 오류 발생: {e}")
2.3 세션 활용하기
import requests
# 세션을 사용하면 연결을 재사용하여 성능 향상
session = requests.Session()
# 세션에 기본 설정 적용
session.headers.update({
"User-Agent": "MyApp/1.0",
"Accept": "application/json"
})
# 같은 호스트에 여러 요청 시 세션 사용
urls = [
"https://api.github.com/users/octocat",
"https://api.github.com/users/octocat/repos",
"https://api.github.com/users/octocat/followers"
]
for url in urls:
response = session.get(url)
print(f"{url}: {response.status_code}")
# 세션 종료
session.close()
# 또는 컨텍스트 매니저 사용 (권장)
with requests.Session() as session:
session.headers.update({"User-Agent": "MyApp/1.0"})
response = session.get("https://api.github.com/users/octocat")
3. API 인증 방식
3.1 API Key 인증
import requests
# 방법 1: URL 파라미터로 전달
api_key = "your_api_key_here"
response = requests.get(
"https://api.example.com/data",
params={"api_key": api_key}
)
# 방법 2: 헤더로 전달
headers = {"X-API-Key": api_key}
response = requests.get(
"https://api.example.com/data",
headers=headers
)
# 방법 3: 인증 헤더로 전달
headers = {"Authorization": f"Api-Key {api_key}"}
response = requests.get(
"https://api.example.com/data",
headers=headers
)
3.2 Bearer Token 인증 (OAuth 2.0)
import requests
# Bearer Token 사용
access_token = "your_access_token_here"
headers = {
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json"
}
response = requests.get(
"https://api.example.com/protected/resource",
headers=headers
)
# OAuth 2.0 토큰 발급 예제
def get_oauth_token(client_id, client_secret, token_url):
"""OAuth 2.0 Client Credentials 방식으로 토큰 발급"""
data = {
"grant_type": "client_credentials",
"client_id": client_id,
"client_secret": client_secret
}
response = requests.post(token_url, data=data)
if response.status_code == 200:
token_data = response.json()
return token_data.get("access_token")
else:
raise Exception(f"토큰 발급 실패: {response.text}")
# 사용 예
# token = get_oauth_token("my_client_id", "my_secret", "https://auth.example.com/token")
3.3 Basic 인증
import requests
from requests.auth import HTTPBasicAuth
# 방법 1: auth 파라미터 사용
response = requests.get(
"https://api.example.com/data",
auth=HTTPBasicAuth("username", "password")
)
# 방법 2: 튜플로 간단하게
response = requests.get(
"https://api.example.com/data",
auth=("username", "password")
)
4. JSON 응답 처리
4.1 JSON 데이터 파싱
import requests
import json
response = requests.get("https://api.github.com/users/octocat")
data = response.json()
# 데이터 접근
print(f"사용자명: {data['login']}")
print(f"이름: {data.get('name', '없음')}") # 안전한 접근
# 중첩된 JSON 처리
def safe_get(data, *keys, default=None):
"""중첩된 딕셔너리에서 안전하게 값 가져오기"""
for key in keys:
if isinstance(data, dict):
data = data.get(key, default)
elif isinstance(data, list) and isinstance(key, int):
try:
data = data[key]
except IndexError:
return default
else:
return default
return data
# 사용 예
nested_data = {
"user": {
"profile": {
"name": "홍길동",
"contacts": [
{"type": "email", "value": "hong@example.com"}
]
}
}
}
name = safe_get(nested_data, "user", "profile", "name")
email = safe_get(nested_data, "user", "profile", "contacts", 0, "value")
print(f"이름: {name}, 이메일: {email}")
4.2 JSON 데이터 저장 및 로드
import json
from pathlib import Path
def save_json(data, filepath, indent=2, ensure_ascii=False):
"""JSON 데이터를 파일에 저장"""
filepath = Path(filepath)
filepath.parent.mkdir(parents=True, exist_ok=True)
with open(filepath, 'w', encoding='utf-8') as f:
json.dump(data, f, indent=indent, ensure_ascii=ensure_ascii)
print(f"저장 완료: {filepath}")
def load_json(filepath):
"""JSON 파일에서 데이터 로드"""
with open(filepath, 'r', encoding='utf-8') as f:
return json.load(f)
# 사용 예
api_data = {"name": "홍길동", "age": 30, "city": "서울"}
save_json(api_data, "data/user_info.json")
loaded_data = load_json("data/user_info.json")
5. 공공데이터 API 활용
5.1 공공데이터포털 API 사용하기
import requests
from urllib.parse import quote_plus
class PublicDataAPI:
"""공공데이터포털 API 클라이언트"""
def __init__(self, service_key):
self.service_key = service_key
self.base_url = "http://apis.data.go.kr"
def get_covid_status(self, start_date, end_date):
"""코로나19 현황 조회"""
url = f"{self.base_url}/1352000/ODMS_COVID_04/callCovid04Api"
params = {
"serviceKey": self.service_key,
"pageNo": 1,
"numOfRows": 100,
"apiType": "JSON",
"std_day_from": start_date,
"std_day_to": end_date
}
response = requests.get(url, params=params)
return response.json()
def get_air_quality(self, sido_name):
"""시도별 실시간 대기질 정보 조회"""
url = f"{self.base_url}/B552584/ArpltnInforInqireSvc/getCtprvnRltmMesureDnsty"
params = {
"serviceKey": self.service_key,
"returnType": "json",
"numOfRows": 100,
"pageNo": 1,
"sidoName": sido_name,
"ver": "1.0"
}
response = requests.get(url, params=params)
return response.json()
# 사용 예
# api = PublicDataAPI("your_service_key")
# covid_data = api.get_covid_status("2026-01-01", "2026-01-22")
# air_data = api.get_air_quality("서울")
5.2 기상청 날씨 API
import requests
from datetime import datetime, timedelta
class WeatherAPI:
"""기상청 날씨 API 클라이언트"""
def __init__(self, service_key):
self.service_key = service_key
self.base_url = "http://apis.data.go.kr/1360000/VilageFcstInfoService_2.0"
def get_ultra_short_forecast(self, nx, ny, base_date=None, base_time=None):
"""초단기 예보 조회"""
if base_date is None:
base_date = datetime.now().strftime("%Y%m%d")
if base_time is None:
# 매시 30분에 발표되므로 이전 시간대 사용
base_time = (datetime.now() - timedelta(hours=1)).strftime("%H30")
url = f"{self.base_url}/getUltraSrtFcst"
params = {
"serviceKey": self.service_key,
"pageNo": 1,
"numOfRows": 60,
"dataType": "JSON",
"base_date": base_date,
"base_time": base_time,
"nx": nx,
"ny": ny
}
response = requests.get(url, params=params)
data = response.json()
return self._parse_weather_data(data)
def _parse_weather_data(self, data):
"""날씨 데이터 파싱"""
try:
items = data['response']['body']['items']['item']
weather_info = {}
category_map = {
"T1H": "기온",
"RN1": "1시간 강수량",
"SKY": "하늘 상태",
"UUU": "동서바람성분",
"VVV": "남북바람성분",
"REH": "습도",
"PTY": "강수형태",
"LGT": "낙뢰",
"VEC": "풍향",
"WSD": "풍속"
}
for item in items:
category = item['category']
if category in category_map:
weather_info[category_map[category]] = item['fcstValue']
return weather_info
except KeyError:
return None
# 서울 강남구 좌표 (nx=61, ny=126)
# weather = WeatherAPI("your_service_key")
# forecast = weather.get_ultra_short_forecast(61, 126)
6. 네이버/카카오 API 활용
6.1 네이버 검색 API
import requests
class NaverSearchAPI:
"""네이버 검색 API 클라이언트"""
def __init__(self, client_id, client_secret):
self.client_id = client_id
self.client_secret = client_secret
self.base_url = "https://openapi.naver.com/v1/search"
self.headers = {
"X-Naver-Client-Id": client_id,
"X-Naver-Client-Secret": client_secret
}
def search_blog(self, query, display=10, start=1, sort="sim"):
"""블로그 검색"""
url = f"{self.base_url}/blog.json"
params = {
"query": query,
"display": display,
"start": start,
"sort": sort # sim(정확도순), date(날짜순)
}
response = requests.get(url, headers=self.headers, params=params)
return response.json()
def search_news(self, query, display=10, start=1, sort="date"):
"""뉴스 검색"""
url = f"{self.base_url}/news.json"
params = {
"query": query,
"display": display,
"start": start,
"sort": sort
}
response = requests.get(url, headers=self.headers, params=params)
return response.json()
def search_shop(self, query, display=10, start=1, sort="sim"):
"""쇼핑 검색"""
url = f"{self.base_url}/shop.json"
params = {
"query": query,
"display": display,
"start": start,
"sort": sort # sim, date, asc(가격낮은순), dsc(가격높은순)
}
response = requests.get(url, headers=self.headers, params=params)
return response.json()
# 사용 예
# naver = NaverSearchAPI("your_client_id", "your_client_secret")
# blog_results = naver.search_blog("Python 자동화", display=5)
# news_results = naver.search_news("인공지능", display=10)
6.2 카카오 API
import requests
class KakaoAPI:
"""카카오 API 클라이언트"""
def __init__(self, rest_api_key):
self.rest_api_key = rest_api_key
self.headers = {
"Authorization": f"KakaoAK {rest_api_key}"
}
def search_web(self, query, sort="accuracy", page=1, size=10):
"""웹 문서 검색"""
url = "https://dapi.kakao.com/v2/search/web"
params = {
"query": query,
"sort": sort, # accuracy(정확도순), recency(최신순)
"page": page,
"size": size
}
response = requests.get(url, headers=self.headers, params=params)
return response.json()
def search_image(self, query, sort="accuracy", page=1, size=10):
"""이미지 검색"""
url = "https://dapi.kakao.com/v2/search/image"
params = {
"query": query,
"sort": sort,
"page": page,
"size": size
}
response = requests.get(url, headers=self.headers, params=params)
return response.json()
def get_address(self, query):
"""주소 검색"""
url = "https://dapi.kakao.com/v2/local/search/address.json"
params = {"query": query}
response = requests.get(url, headers=self.headers, params=params)
return response.json()
def get_coord_to_address(self, x, y):
"""좌표를 주소로 변환"""
url = "https://dapi.kakao.com/v2/local/geo/coord2address.json"
params = {"x": x, "y": y}
response = requests.get(url, headers=self.headers, params=params)
return response.json()
# 사용 예
# kakao = KakaoAPI("your_rest_api_key")
# web_results = kakao.search_web("Python 프로그래밍")
# address_info = kakao.get_address("서울특별시 강남구 테헤란로 152")
7. OpenAI API 연동
7.1 ChatGPT API 기본 사용
import requests
class OpenAIClient:
"""OpenAI API 클라이언트"""
def __init__(self, api_key):
self.api_key = api_key
self.base_url = "https://api.openai.com/v1"
self.headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
def chat_completion(self, messages, model="gpt-4o", temperature=0.7, max_tokens=1000):
"""채팅 완성 API 호출"""
url = f"{self.base_url}/chat/completions"
payload = {
"model": model,
"messages": messages,
"temperature": temperature,
"max_tokens": max_tokens
}
response = requests.post(url, headers=self.headers, json=payload)
if response.status_code == 200:
return response.json()['choices'][0]['message']['content']
else:
raise Exception(f"API 오류: {response.text}")
def simple_chat(self, user_message, system_prompt=None):
"""간단한 채팅"""
messages = []
if system_prompt:
messages.append({"role": "system", "content": system_prompt})
messages.append({"role": "user", "content": user_message})
return self.chat_completion(messages)
def generate_image(self, prompt, size="1024x1024", n=1):
"""DALL-E 이미지 생성"""
url = f"{self.base_url}/images/generations"
payload = {
"model": "dall-e-3",
"prompt": prompt,
"size": size,
"n": n
}
response = requests.post(url, headers=self.headers, json=payload)
if response.status_code == 200:
return response.json()['data'][0]['url']
else:
raise Exception(f"API 오류: {response.text}")
# 사용 예
# openai = OpenAIClient("your_api_key")
# response = openai.simple_chat(
# "Python으로 웹 스크래핑하는 방법을 알려주세요",
# system_prompt="당신은 Python 전문가입니다. 코드 예제와 함께 설명해주세요."
# )
# print(response)
7.2 스트리밍 응답 처리
import requests
import json
def stream_chat_completion(api_key, messages, model="gpt-4o"):
"""스트리밍 방식으로 응답 받기"""
url = "https://api.openai.com/v1/chat/completions"
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
payload = {
"model": model,
"messages": messages,
"stream": True
}
response = requests.post(url, headers=headers, json=payload, stream=True)
full_response = ""
for line in response.iter_lines():
if line:
line = line.decode('utf-8')
if line.startswith('data: '):
data = line[6:] # 'data: ' 제거
if data == '[DONE]':
break
try:
chunk = json.loads(data)
content = chunk['choices'][0]['delta'].get('content', '')
if content:
print(content, end='', flush=True)
full_response += content
except json.JSONDecodeError:
continue
print() # 줄바꿈
return full_response
# 사용 예
# messages = [{"role": "user", "content": "Python의 장점을 5가지 알려주세요"}]
# response = stream_chat_completion("your_api_key", messages)
8. 페이지네이션과 Rate Limiting
8.1 페이지네이션 처리
import requests
import time
class PaginatedAPIClient:
"""페이지네이션이 있는 API 클라이언트"""
def __init__(self, base_url, headers=None):
self.base_url = base_url
self.headers = headers or {}
def get_all_pages_offset(self, endpoint, page_size=100, max_pages=None):
"""오프셋 기반 페이지네이션 (page, per_page 방식)"""
all_data = []
page = 1
while True:
params = {
"page": page,
"per_page": page_size
}
response = requests.get(
f"{self.base_url}{endpoint}",
headers=self.headers,
params=params
)
data = response.json()
if not data: # 빈 응답이면 종료
break
all_data.extend(data)
if len(data) < page_size: # 마지막 페이지
break
if max_pages and page >= max_pages:
break
page += 1
time.sleep(0.5) # Rate limiting 방지
return all_data
def get_all_pages_cursor(self, endpoint, cursor_field="cursor"):
"""커서 기반 페이지네이션"""
all_data = []
cursor = None
while True:
params = {}
if cursor:
params[cursor_field] = cursor
response = requests.get(
f"{self.base_url}{endpoint}",
headers=self.headers,
params=params
)
result = response.json()
data = result.get("data", [])
all_data.extend(data)
# 다음 커서가 없으면 종료
cursor = result.get("next_cursor")
if not cursor:
break
time.sleep(0.5)
return all_data
def get_all_pages_link_header(self, endpoint):
"""Link 헤더 기반 페이지네이션 (GitHub 스타일)"""
all_data = []
url = f"{self.base_url}{endpoint}"
while url:
response = requests.get(url, headers=self.headers)
all_data.extend(response.json())
# Link 헤더에서 다음 페이지 URL 추출
link_header = response.headers.get("Link", "")
url = self._parse_link_header(link_header, "next")
time.sleep(0.5)
return all_data
def _parse_link_header(self, link_header, rel):
"""Link 헤더 파싱"""
if not link_header:
return None
links = link_header.split(", ")
for link in links:
parts = link.split("; ")
if len(parts) == 2 and f'rel="{rel}"' in parts[1]:
return parts[0].strip("<>")
return None
8.2 Rate Limiting 처리
import requests
import time
from functools import wraps
class RateLimiter:
"""Rate Limiting 처리 클래스"""
def __init__(self, calls_per_minute=60):
self.calls_per_minute = calls_per_minute
self.min_interval = 60.0 / calls_per_minute
self.last_call_time = 0
def wait(self):
"""필요한 경우 대기"""
elapsed = time.time() - self.last_call_time
if elapsed < self.min_interval:
time.sleep(self.min_interval - elapsed)
self.last_call_time = time.time()
def rate_limited(calls_per_minute=60):
"""Rate limiting 데코레이터"""
min_interval = 60.0 / calls_per_minute
last_call = [0] # mutable 객체로 감싸기
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
elapsed = time.time() - last_call[0]
if elapsed < min_interval:
time.sleep(min_interval - elapsed)
last_call[0] = time.time()
return func(*args, **kwargs)
return wrapper
return decorator
def request_with_retry(url, max_retries=3, backoff_factor=2, **kwargs):
"""재시도 로직이 포함된 요청"""
for attempt in range(max_retries):
try:
response = requests.get(url, **kwargs)
# Rate limit 초과 (429) 처리
if response.status_code == 429:
retry_after = int(response.headers.get("Retry-After", 60))
print(f"Rate limit 초과. {retry_after}초 후 재시도...")
time.sleep(retry_after)
continue
response.raise_for_status()
return response
except requests.exceptions.RequestException as e:
if attempt == max_retries - 1:
raise
wait_time = backoff_factor ** attempt
print(f"요청 실패. {wait_time}초 후 재시도... ({attempt + 1}/{max_retries})")
time.sleep(wait_time)
return None
# 사용 예
@rate_limited(calls_per_minute=30)
def fetch_data(url):
return requests.get(url)
9. API 응답 캐싱
import requests
import hashlib
import json
import time
from pathlib import Path
class CachedAPIClient:
"""캐싱 기능이 있는 API 클라이언트"""
def __init__(self, cache_dir="api_cache", cache_ttl=3600):
self.cache_dir = Path(cache_dir)
self.cache_dir.mkdir(parents=True, exist_ok=True)
self.cache_ttl = cache_ttl # 캐시 유효 시간 (초)
def _get_cache_key(self, url, params=None):
"""캐시 키 생성"""
key_data = url + json.dumps(params or {}, sort_keys=True)
return hashlib.md5(key_data.encode()).hexdigest()
def _get_cache_path(self, cache_key):
"""캐시 파일 경로"""
return self.cache_dir / f"{cache_key}.json"
def _is_cache_valid(self, cache_path):
"""캐시 유효성 검사"""
if not cache_path.exists():
return False
cache_age = time.time() - cache_path.stat().st_mtime
return cache_age < self.cache_ttl
def get(self, url, params=None, force_refresh=False, **kwargs):
"""캐시를 활용한 GET 요청"""
cache_key = self._get_cache_key(url, params)
cache_path = self._get_cache_path(cache_key)
# 캐시 확인
if not force_refresh and self._is_cache_valid(cache_path):
print(f"캐시 사용: {cache_key[:8]}...")
with open(cache_path, 'r', encoding='utf-8') as f:
return json.load(f)
# API 호출
print(f"API 호출: {url}")
response = requests.get(url, params=params, **kwargs)
data = response.json()
# 캐시 저장
with open(cache_path, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
return data
def clear_cache(self, older_than=None):
"""캐시 삭제"""
count = 0
for cache_file in self.cache_dir.glob("*.json"):
if older_than:
age = time.time() - cache_file.stat().st_mtime
if age < older_than:
continue
cache_file.unlink()
count += 1
print(f"{count}개의 캐시 파일 삭제됨")
# 메모리 기반 캐시 (간단한 버전)
from functools import lru_cache
@lru_cache(maxsize=100)
def cached_api_call(url):
"""LRU 캐시를 사용한 API 호출"""
response = requests.get(url)
return response.json()
10. 실전 프로젝트: 날씨/주식 데이터 수집기
import requests
import json
from datetime import datetime, timedelta
from pathlib import Path
import time
class DataCollector:
"""날씨 및 주식 데이터 수집기"""
def __init__(self, config_path="config.json"):
self.config = self._load_config(config_path)
self.data_dir = Path("collected_data")
self.data_dir.mkdir(parents=True, exist_ok=True)
def _load_config(self, config_path):
"""설정 파일 로드"""
if Path(config_path).exists():
with open(config_path, 'r') as f:
return json.load(f)
return {}
def collect_weather(self, cities):
"""여러 도시의 날씨 데이터 수집"""
weather_data = []
api_key = self.config.get("openweathermap_api_key")
if not api_key:
print("OpenWeatherMap API 키가 설정되지 않았습니다.")
return []
for city in cities:
try:
url = "https://api.openweathermap.org/data/2.5/weather"
params = {
"q": city,
"appid": api_key,
"units": "metric",
"lang": "kr"
}
response = requests.get(url, params=params, timeout=10)
if response.status_code == 200:
data = response.json()
weather_info = {
"city": city,
"temperature": data["main"]["temp"],
"humidity": data["main"]["humidity"],
"description": data["weather"][0]["description"],
"wind_speed": data["wind"]["speed"],
"collected_at": datetime.now().isoformat()
}
weather_data.append(weather_info)
print(f"{city}: {weather_info['temperature']}°C, {weather_info['description']}")
else:
print(f"{city}: 데이터 수집 실패 ({response.status_code})")
time.sleep(1) # Rate limiting
except Exception as e:
print(f"{city}: 오류 발생 - {e}")
# 데이터 저장
self._save_data(weather_data, "weather")
return weather_data
def collect_stock(self, symbols):
"""주식 데이터 수집 (Alpha Vantage API 사용 예시)"""
stock_data = []
api_key = self.config.get("alphavantage_api_key")
if not api_key:
print("Alpha Vantage API 키가 설정되지 않았습니다.")
return []
for symbol in symbols:
try:
url = "https://www.alphavantage.co/query"
params = {
"function": "GLOBAL_QUOTE",
"symbol": symbol,
"apikey": api_key
}
response = requests.get(url, params=params, timeout=10)
if response.status_code == 200:
data = response.json()
if "Global Quote" in data:
quote = data["Global Quote"]
stock_info = {
"symbol": symbol,
"price": float(quote.get("05. price", 0)),
"change": quote.get("09. change", "0"),
"change_percent": quote.get("10. change percent", "0%"),
"volume": int(quote.get("06. volume", 0)),
"collected_at": datetime.now().isoformat()
}
stock_data.append(stock_info)
print(f"{symbol}: ${stock_info['price']} ({stock_info['change_percent']})")
else:
print(f"{symbol}: 데이터 없음")
time.sleep(12) # Alpha Vantage 무료 플랜: 분당 5회 제한
except Exception as e:
print(f"{symbol}: 오류 발생 - {e}")
self._save_data(stock_data, "stock")
return stock_data
def _save_data(self, data, data_type):
"""데이터 저장"""
if not data:
return
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"{data_type}_{timestamp}.json"
filepath = self.data_dir / filename
with open(filepath, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
print(f"데이터 저장: {filepath}")
def get_historical_data(self, data_type, days=7):
"""과거 데이터 조회"""
cutoff_date = datetime.now() - timedelta(days=days)
historical_data = []
for filepath in self.data_dir.glob(f"{data_type}_*.json"):
# 파일명에서 날짜 추출
filename = filepath.stem
date_str = filename.replace(f"{data_type}_", "")[:8]
try:
file_date = datetime.strptime(date_str, "%Y%m%d")
if file_date >= cutoff_date:
with open(filepath, 'r', encoding='utf-8') as f:
data = json.load(f)
historical_data.extend(data)
except ValueError:
continue
return historical_data
def generate_report(self):
"""수집된 데이터 리포트 생성"""
weather_data = self.get_historical_data("weather", days=1)
stock_data = self.get_historical_data("stock", days=1)
report = []
report.append("=" * 50)
report.append(f"데이터 수집 리포트 - {datetime.now().strftime('%Y-%m-%d %H:%M')}")
report.append("=" * 50)
if weather_data:
report.append("\n[날씨 데이터]")
for item in weather_data:
report.append(f" - {item['city']}: {item['temperature']}°C, {item['description']}")
if stock_data:
report.append("\n[주식 데이터]")
for item in stock_data:
report.append(f" - {item['symbol']}: ${item['price']} ({item['change_percent']})")
report.append("\n" + "=" * 50)
return "\n".join(report)
# 사용 예
if __name__ == "__main__":
# config.json 파일 예시:
# {
# "openweathermap_api_key": "your_key_here",
# "alphavantage_api_key": "your_key_here"
# }
collector = DataCollector()
# 날씨 데이터 수집
cities = ["Seoul", "Tokyo", "New York", "London", "Paris"]
# collector.collect_weather(cities)
# 주식 데이터 수집
symbols = ["AAPL", "GOOGL", "MSFT", "AMZN"]
# collector.collect_stock(symbols)
# 리포트 생성
# print(collector.generate_report())
print("데이터 수집기가 준비되었습니다.")
print("config.json 파일에 API 키를 설정한 후 사용하세요.")
마무리
이번 편에서는 Python을 활용한 API 연동과 데이터 수집의 핵심 개념들을 학습했습니다. REST API의 기본 원리부터 다양한 인증 방식, 실제 서비스 API 활용법, 그리고 효율적인 데이터 수집 전략까지 다루었습니다.
API를 활용하면 웹 스크래핑보다 안정적이고 구조화된 데이터를 얻을 수 있습니다. 또한 대부분의 공식 API는 명확한 문서와 일관된 응답 형식을 제공하므로 유지보수도 수월합니다.
다음 편에서는 지금까지 배운 내용을 종합하여 스케줄링과 실전 자동화 프로젝트를 구축해보겠습니다. 정해진 시간에 자동으로 데이터를 수집하고, 리포트를 생성하며, 알림을 보내는 완전한 자동화 시스템을 만들어봅시다.
핵심 정리
1. REST API는 HTTP 메서드(GET, POST, PUT, DELETE)를 사용하여 리소스를 조작합니다.
2. requests 라이브러리로 간편하게 API 호출이 가능합니다.
3. API Key, Bearer Token, OAuth 등 다양한 인증 방식을 이해하고 활용하세요.
4. 페이지네이션과 Rate Limiting은 대량 데이터 수집 시 필수 고려 사항입니다.
5. 캐싱을 활용하면 API 호출 횟수를 줄이고 응답 속도를 높일 수 있습니다.