Python 자동화 마스터 8편: 스케줄링과 실전 프로젝트
Python Automation Master Part 8: Scheduling and Real-World Projects
서론: 자동화의 완성, 스케줄링
지금까지 배운 파일 처리, 웹 스크래핑, API 연동 등의 기술을 실제로 유용하게 만드는 핵심은 바로 '스케줄링'입니다. 정해진 시간에 자동으로 실행되는 스크립트는 수작업을 완전히 대체하고, 24시간 쉬지 않고 작업을 수행합니다.
이번 마지막 편에서는 Python에서 작업을 예약하고 실행하는 다양한 방법과 함께, 안정적인 자동화 시스템 구축을 위한 로깅, 에러 처리, 설정 관리 기법을 학습합니다. 마지막으로 이 시리즈에서 배운 모든 내용을 종합한 실전 프로젝트를 구축해보겠습니다.
1. 시간 처리: time과 datetime 모듈
1.1 time 모듈 기초
import time
# 현재 시간 (Unix timestamp)
timestamp = time.time()
print(f"현재 타임스탬프: {timestamp}") # 예: 1737529200.123456
# 프로그램 일시 정지
print("3초 대기 중...")
time.sleep(3)
print("대기 완료!")
# 실행 시간 측정
start_time = time.time()
# 작업 수행
for i in range(1000000):
pass
end_time = time.time()
print(f"실행 시간: {end_time - start_time:.4f}초")
# 더 정밀한 시간 측정
start = time.perf_counter()
# 작업 수행
time.sleep(0.1)
end = time.perf_counter()
print(f"정밀 측정: {end - start:.6f}초")
# 구조화된 시간
local_time = time.localtime()
print(f"현재 시간: {local_time.tm_year}년 {local_time.tm_mon}월 {local_time.tm_mday}일")
print(f"시간: {local_time.tm_hour}:{local_time.tm_min}:{local_time.tm_sec}")
# 문자열로 변환
formatted = time.strftime("%Y-%m-%d %H:%M:%S", local_time)
print(f"포맷된 시간: {formatted}")
1.2 datetime 모듈 활용
from datetime import datetime, date, time, timedelta
import pytz # pip install pytz
# 현재 날짜와 시간
now = datetime.now()
print(f"현재: {now}")
print(f"날짜: {now.date()}")
print(f"시간: {now.time()}")
# 특정 날짜/시간 생성
specific_date = datetime(2026, 1, 22, 9, 30, 0)
print(f"특정 시간: {specific_date}")
# 날짜 포맷팅
formatted = now.strftime("%Y년 %m월 %d일 %H시 %M분 %S초")
print(f"한국어 포맷: {formatted}")
# 문자열 파싱
date_str = "2026-01-22 14:30:00"
parsed = datetime.strptime(date_str, "%Y-%m-%d %H:%M:%S")
print(f"파싱된 날짜: {parsed}")
# 날짜 연산
tomorrow = now + timedelta(days=1)
next_week = now + timedelta(weeks=1)
two_hours_later = now + timedelta(hours=2)
print(f"내일: {tomorrow.date()}")
print(f"다음 주: {next_week.date()}")
print(f"2시간 후: {two_hours_later.time()}")
# 두 날짜 간 차이
date1 = datetime(2026, 12, 31)
date2 = datetime.now()
diff = date1 - date2
print(f"2026년 말까지 {diff.days}일 남음")
# 타임존 처리
kst = pytz.timezone('Asia/Seoul')
utc = pytz.UTC
now_kst = datetime.now(kst)
now_utc = datetime.now(utc)
print(f"한국 시간: {now_kst}")
print(f"UTC 시간: {now_utc}")
# UTC를 한국 시간으로 변환
utc_time = datetime(2026, 1, 22, 0, 0, 0, tzinfo=utc)
korea_time = utc_time.astimezone(kst)
print(f"UTC 00:00 = 한국 {korea_time.strftime('%H:%M')}")
1.3 유용한 날짜 유틸리티 함수
from datetime import datetime, timedelta
import calendar
def get_weekday_name(date_obj, lang='ko'):
"""요일 이름 반환"""
weekdays_ko = ['월요일', '화요일', '수요일', '목요일', '금요일', '토요일', '일요일']
weekdays_en = ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday']
weekdays = weekdays_ko if lang == 'ko' else weekdays_en
return weekdays[date_obj.weekday()]
def get_month_range(year, month):
"""해당 월의 시작일과 마지막일 반환"""
first_day = datetime(year, month, 1)
last_day_num = calendar.monthrange(year, month)[1]
last_day = datetime(year, month, last_day_num)
return first_day, last_day
def get_business_days(start_date, end_date):
"""두 날짜 사이의 영업일 계산"""
business_days = 0
current = start_date
while current <= end_date:
if current.weekday() < 5: # 월-금
business_days += 1
current += timedelta(days=1)
return business_days
def is_business_hour(check_time=None, start_hour=9, end_hour=18):
"""업무 시간인지 확인"""
if check_time is None:
check_time = datetime.now()
# 주말 체크
if check_time.weekday() >= 5:
return False
# 시간 체크
hour = check_time.hour
return start_hour <= hour < end_hour
# 사용 예
today = datetime.now()
print(f"오늘은 {get_weekday_name(today)}입니다.")
first, last = get_month_range(2026, 1)
print(f"2026년 1월: {first.date()} ~ {last.date()}")
days = get_business_days(datetime(2026, 1, 1), datetime(2026, 1, 31))
print(f"2026년 1월 영업일: {days}일")
print(f"지금은 업무 시간: {is_business_hour()}")
2. schedule 라이브러리
2.1 기본 사용법
import schedule
import time
# pip install schedule
def job():
print(f"[{time.strftime('%H:%M:%S')}] 작업 실행!")
def morning_report():
print("아침 보고서를 생성합니다.")
def backup_data():
print("데이터를 백업합니다.")
# 다양한 스케줄링 패턴
schedule.every(10).seconds.do(job) # 10초마다
schedule.every(5).minutes.do(job) # 5분마다
schedule.every().hour.do(job) # 매 시간
schedule.every().day.at("09:00").do(morning_report) # 매일 오전 9시
schedule.every().monday.do(job) # 매주 월요일
schedule.every().wednesday.at("13:15").do(job) # 매주 수요일 13:15
# 특정 시간대에만 실행
schedule.every().day.at("00:00").do(backup_data) # 자정에 백업
# 실행 횟수 제한
schedule.every().second.do(job).tag('limited')
# 메인 루프
print("스케줄러 시작...")
while True:
schedule.run_pending()
time.sleep(1)
2.2 고급 schedule 패턴
import schedule
import time
from functools import partial
# 인자가 있는 작업
def greet(name):
print(f"안녕하세요, {name}님!")
# partial을 사용한 인자 전달
schedule.every().day.at("08:00").do(partial(greet, "홍길동"))
# 람다를 사용한 인자 전달
schedule.every().day.at("08:30").do(lambda: greet("김철수"))
# 태그를 사용한 작업 관리
def data_sync():
print("데이터 동기화 중...")
def send_notification():
print("알림 발송 중...")
schedule.every(30).minutes.do(data_sync).tag('sync', 'database')
schedule.every().hour.do(send_notification).tag('notification')
# 특정 태그의 작업만 취소
# schedule.clear('sync')
# 특정 태그의 작업 조회
sync_jobs = schedule.get_jobs('sync')
print(f"동기화 작업 수: {len(sync_jobs)}")
# 작업 취소 (한 번만 실행)
def run_once():
print("한 번만 실행되는 작업")
return schedule.CancelJob
schedule.every().second.do(run_once)
# 다음 실행 시간 확인
next_run = schedule.next_run()
print(f"다음 작업 실행 시간: {next_run}")
# 대기 시간 확인
idle_seconds = schedule.idle_seconds()
print(f"다음 작업까지 {idle_seconds}초 남음")
# 스케줄러 실행
while True:
schedule.run_pending()
time.sleep(1)
2.3 예외 처리가 포함된 스케줄러
import schedule
import time
import traceback
from functools import wraps
def catch_exceptions(job_func):
"""예외를 잡아서 로깅하는 데코레이터"""
@wraps(job_func)
def wrapper(*args, **kwargs):
try:
return job_func(*args, **kwargs)
except Exception as e:
print(f"작업 실행 중 오류 발생: {e}")
traceback.print_exc()
# 오류가 발생해도 스케줄러는 계속 실행
return None
return wrapper
@catch_exceptions
def risky_job():
"""오류가 발생할 수 있는 작업"""
import random
if random.random() < 0.3:
raise Exception("랜덤 오류 발생!")
print("작업 성공!")
schedule.every(5).seconds.do(risky_job)
# 안전한 스케줄러 루프
def run_scheduler():
while True:
try:
schedule.run_pending()
except Exception as e:
print(f"스케줄러 오류: {e}")
time.sleep(1)
run_scheduler()
3. APScheduler 라이브러리
3.1 APScheduler 소개 및 설치
# pip install apscheduler
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.triggers.date import DateTrigger
from datetime import datetime, timedelta
def my_job(message="기본 메시지"):
print(f"[{datetime.now().strftime('%H:%M:%S')}] {message}")
# BlockingScheduler - 메인 스레드에서 실행 (블로킹)
scheduler = BlockingScheduler()
# BackgroundScheduler - 백그라운드에서 실행 (논블로킹)
# scheduler = BackgroundScheduler()
# Interval 트리거 - 일정 간격으로 실행
scheduler.add_job(
my_job,
IntervalTrigger(seconds=10),
args=["10초마다 실행"],
id='interval_job'
)
# Cron 트리거 - cron 표현식 사용
scheduler.add_job(
my_job,
CronTrigger(hour=9, minute=0),
args=["매일 오전 9시"],
id='daily_job'
)
# Date 트리거 - 특정 날짜/시간에 한 번 실행
scheduler.add_job(
my_job,
DateTrigger(run_date=datetime.now() + timedelta(seconds=5)),
args=["5초 후 한 번만 실행"],
id='one_time_job'
)
# 스케줄러 시작
try:
print("APScheduler 시작...")
scheduler.start()
except KeyboardInterrupt:
print("스케줄러 종료")
scheduler.shutdown()
3.2 Cron 표현식 활용
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
from datetime import datetime
scheduler = BlockingScheduler()
def job(name):
print(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] {name} 실행")
# Cron 표현식 패턴
# ┌───────────── 분 (0 - 59)
# │ ┌───────────── 시 (0 - 23)
# │ │ ┌───────────── 일 (1 - 31)
# │ │ │ ┌───────────── 월 (1 - 12)
# │ │ │ │ ┌───────────── 요일 (0 - 6, 0=월요일)
# │ │ │ │ │
# * * * * *
# 매분 실행
scheduler.add_job(job, CronTrigger(minute='*'), args=["매분"])
# 매 시간 30분에 실행
scheduler.add_job(job, CronTrigger(minute=30), args=["매 시간 30분"])
# 매일 오전 9시, 오후 6시 실행
scheduler.add_job(job, CronTrigger(hour='9,18', minute=0), args=["오전 9시, 오후 6시"])
# 평일 오전 9시 실행 (월-금)
scheduler.add_job(job, CronTrigger(day_of_week='mon-fri', hour=9, minute=0), args=["평일 9시"])
# 매월 1일 자정 실행
scheduler.add_job(job, CronTrigger(day=1, hour=0, minute=0), args=["매월 1일"])
# 매주 월요일 오전 8시 30분
scheduler.add_job(job, CronTrigger(day_of_week='mon', hour=8, minute=30), args=["월요일 8:30"])
# 15분마다 실행
scheduler.add_job(job, CronTrigger(minute='*/15'), args=["15분마다"])
# 업무 시간(9-18시) 동안 30분마다
scheduler.add_job(
job,
CronTrigger(hour='9-17', minute='0,30'),
args=["업무 시간 30분마다"]
)
# 문자열로 cron 표현식 전달
scheduler.add_job(
job,
CronTrigger.from_crontab('0 9 * * 1-5'), # 평일 9시
args=["crontab 형식"]
)
scheduler.start()
3.3 작업 저장소와 이벤트 리스너
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR
from datetime import datetime
import time
# 작업 저장소 설정 (SQLite 사용)
jobstores = {
'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
# 실행기 설정
executors = {
'default': ThreadPoolExecutor(20),
'processpool': ProcessPoolExecutor(5)
}
# 기본 설정
job_defaults = {
'coalesce': False, # 놓친 작업 병합 여부
'max_instances': 3, # 동시 실행 최대 인스턴스 수
'misfire_grace_time': 60 # 놓친 작업 허용 시간 (초)
}
scheduler = BackgroundScheduler(
jobstores=jobstores,
executors=executors,
job_defaults=job_defaults,
timezone='Asia/Seoul'
)
# 이벤트 리스너
def job_executed_listener(event):
"""작업 완료 시 호출"""
print(f"작업 완료: {event.job_id}")
print(f"실행 시간: {event.scheduled_run_time}")
print(f"반환값: {event.retval}")
def job_error_listener(event):
"""작업 오류 시 호출"""
print(f"작업 오류: {event.job_id}")
print(f"예외: {event.exception}")
print(f"트레이스백: {event.traceback}")
scheduler.add_listener(job_executed_listener, EVENT_JOB_EXECUTED)
scheduler.add_listener(job_error_listener, EVENT_JOB_ERROR)
# 작업 추가
def sample_job():
print(f"작업 실행: {datetime.now()}")
return "완료"
scheduler.add_job(
sample_job,
'interval',
seconds=10,
id='sample_job',
replace_existing=True
)
# 스케줄러 시작
scheduler.start()
# 작업 관리
print("등록된 작업들:")
for job in scheduler.get_jobs():
print(f" - {job.id}: 다음 실행 {job.next_run_time}")
# 작업 일시 중지
# scheduler.pause_job('sample_job')
# 작업 재개
# scheduler.resume_job('sample_job')
# 작업 수정
# scheduler.reschedule_job('sample_job', trigger='interval', seconds=30)
# 작업 제거
# scheduler.remove_job('sample_job')
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
scheduler.shutdown()
4. Windows 작업 스케줄러 연동
4.1 배치 파일로 Python 스크립트 실행
@echo off
REM run_python_script.bat
REM Python 가상환경 활성화 (선택사항)
call C:\path\to\venv\Scripts\activate.bat
REM Python 스크립트 실행
python C:\path\to\your_script.py
REM 로그 기록
echo %date% %time% - Script executed >> C:\logs\execution.log
REM 가상환경 비활성화
deactivate
4.2 Python으로 Windows 작업 스케줄러 제어
import subprocess
import os
def create_scheduled_task(
task_name,
python_script,
schedule_type="DAILY",
start_time="09:00",
python_path=None
):
"""Windows 작업 스케줄러에 작업 등록"""
if python_path is None:
python_path = os.sys.executable
# 배치 파일 생성
batch_content = f'''@echo off
"{python_path}" "{python_script}"
'''
batch_path = python_script.replace('.py', '.bat')
with open(batch_path, 'w') as f:
f.write(batch_content)
# schtasks 명령어로 작업 등록
cmd = [
'schtasks', '/create',
'/tn', task_name, # 작업 이름
'/tr', batch_path, # 실행할 프로그램
'/sc', schedule_type, # 스케줄 유형 (DAILY, WEEKLY, MONTHLY 등)
'/st', start_time, # 시작 시간
'/f' # 기존 작업 덮어쓰기
]
try:
result = subprocess.run(cmd, capture_output=True, text=True, shell=True)
if result.returncode == 0:
print(f"작업 '{task_name}'이 성공적으로 등록되었습니다.")
else:
print(f"오류: {result.stderr}")
except Exception as e:
print(f"작업 등록 실패: {e}")
def delete_scheduled_task(task_name):
"""등록된 작업 삭제"""
cmd = ['schtasks', '/delete', '/tn', task_name, '/f']
try:
result = subprocess.run(cmd, capture_output=True, text=True, shell=True)
if result.returncode == 0:
print(f"작업 '{task_name}'이 삭제되었습니다.")
else:
print(f"오류: {result.stderr}")
except Exception as e:
print(f"작업 삭제 실패: {e}")
def list_scheduled_tasks(filter_name=None):
"""등록된 작업 목록 조회"""
cmd = ['schtasks', '/query', '/fo', 'LIST']
if filter_name:
cmd.extend(['/tn', filter_name])
try:
result = subprocess.run(cmd, capture_output=True, text=True, shell=True)
print(result.stdout)
except Exception as e:
print(f"목록 조회 실패: {e}")
# 사용 예
# create_scheduled_task(
# "MyPythonTask",
# r"C:\scripts\daily_report.py",
# schedule_type="DAILY",
# start_time="09:00"
# )
5. Linux cron 연동
5.1 crontab 기본 사용법
# crontab 편집
crontab -e
# crontab 목록 확인
crontab -l
# crontab 삭제
crontab -r
# cron 표현식
# ┌───────────── 분 (0 - 59)
# │ ┌───────────── 시 (0 - 23)
# │ │ ┌───────────── 일 (1 - 31)
# │ │ │ ┌───────────── 월 (1 - 12)
# │ │ │ │ ┌───────────── 요일 (0 - 7, 0과 7 모두 일요일)
# │ │ │ │ │
# * * * * * 명령어
# 예제들:
# 매분 실행
* * * * * /usr/bin/python3 /home/user/script.py
# 매일 오전 9시 실행
0 9 * * * /usr/bin/python3 /home/user/daily_report.py
# 매주 월요일 오전 8시 30분
30 8 * * 1 /usr/bin/python3 /home/user/weekly_report.py
# 5분마다 실행
*/5 * * * * /usr/bin/python3 /home/user/check_status.py
# 평일 업무 시간(9-18시) 매 시간 실행
0 9-18 * * 1-5 /usr/bin/python3 /home/user/hourly_check.py
# 로그 출력 리다이렉션
0 9 * * * /usr/bin/python3 /home/user/script.py >> /home/user/logs/cron.log 2>&1
5.2 Python으로 crontab 관리
# pip install python-crontab
from crontab import CronTab
import os
class CronManager:
"""crontab 관리 클래스"""
def __init__(self, user=None):
self.cron = CronTab(user=user or os.getlogin())
def add_job(self, command, schedule, comment=None):
"""새 작업 추가"""
job = self.cron.new(command=command, comment=comment)
job.setall(schedule)
self.cron.write()
print(f"작업 추가됨: {comment or command}")
def remove_job(self, comment):
"""주석으로 작업 찾아 삭제"""
removed = self.cron.remove_all(comment=comment)
self.cron.write()
print(f"{removed}개 작업 삭제됨")
def list_jobs(self):
"""모든 작업 목록"""
for job in self.cron:
print(f"{job.slices} - {job.command} ({job.comment})")
def enable_job(self, comment):
"""작업 활성화"""
for job in self.cron.find_comment(comment):
job.enable()
self.cron.write()
def disable_job(self, comment):
"""작업 비활성화"""
for job in self.cron.find_comment(comment):
job.enable(False)
self.cron.write()
# 사용 예 (Linux에서만 동작)
# manager = CronManager()
#
# # 매일 오전 9시 Python 스크립트 실행
# manager.add_job(
# '/usr/bin/python3 /home/user/daily_report.py',
# '0 9 * * *',
# comment='daily_report'
# )
#
# # 작업 목록 확인
# manager.list_jobs()
6. 로깅 (logging 모듈)
6.1 기본 로깅 설정
import logging
# 기본 설정
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
# 로거 생성
logger = logging.getLogger(__name__)
# 로그 레벨별 메시지
logger.debug("디버그 메시지 - 상세한 진단 정보")
logger.info("정보 메시지 - 정상 동작 확인")
logger.warning("경고 메시지 - 주의가 필요한 상황")
logger.error("오류 메시지 - 기능 실패")
logger.critical("치명적 오류 - 프로그램 실행 불가")
# 예외 로깅
try:
result = 1 / 0
except Exception as e:
logger.exception("예외 발생!") # 트레이스백 포함
6.2 파일 로깅과 로테이션
import logging
from logging.handlers import RotatingFileHandler, TimedRotatingFileHandler
from pathlib import Path
def setup_logger(name, log_dir="logs", level=logging.DEBUG):
"""로거 설정 함수"""
log_path = Path(log_dir)
log_path.mkdir(parents=True, exist_ok=True)
logger = logging.getLogger(name)
logger.setLevel(level)
# 포맷터
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(funcName)s:%(lineno)d - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
# 콘솔 핸들러
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
console_handler.setFormatter(formatter)
# 파일 핸들러 (크기 기반 로테이션)
file_handler = RotatingFileHandler(
log_path / f"{name}.log",
maxBytes=10*1024*1024, # 10MB
backupCount=5,
encoding='utf-8'
)
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(formatter)
# 에러 전용 파일 핸들러
error_handler = RotatingFileHandler(
log_path / f"{name}_error.log",
maxBytes=10*1024*1024,
backupCount=5,
encoding='utf-8'
)
error_handler.setLevel(logging.ERROR)
error_handler.setFormatter(formatter)
# 시간 기반 로테이션 (매일 자정)
daily_handler = TimedRotatingFileHandler(
log_path / f"{name}_daily.log",
when='midnight',
interval=1,
backupCount=30,
encoding='utf-8'
)
daily_handler.setLevel(logging.INFO)
daily_handler.setFormatter(formatter)
# 핸들러 추가
logger.addHandler(console_handler)
logger.addHandler(file_handler)
logger.addHandler(error_handler)
logger.addHandler(daily_handler)
return logger
# 사용 예
logger = setup_logger("automation")
logger.debug("디버그 메시지")
logger.info("정보 메시지")
logger.warning("경고 메시지")
logger.error("에러 메시지")
6.3 구조화된 로깅 (JSON 로그)
import logging
import json
from datetime import datetime
class JSONFormatter(logging.Formatter):
"""JSON 형식 로그 포맷터"""
def format(self, record):
log_data = {
"timestamp": datetime.utcnow().isoformat() + "Z",
"level": record.levelname,
"logger": record.name,
"message": record.getMessage(),
"module": record.module,
"function": record.funcName,
"line": record.lineno
}
# 예외 정보 추가
if record.exc_info:
log_data["exception"] = self.formatException(record.exc_info)
# 추가 필드
if hasattr(record, 'extra_data'):
log_data["extra"] = record.extra_data
return json.dumps(log_data, ensure_ascii=False)
class ContextLogger:
"""컨텍스트 정보를 포함한 로거"""
def __init__(self, logger):
self.logger = logger
self.context = {}
def set_context(self, **kwargs):
"""컨텍스트 설정"""
self.context.update(kwargs)
def clear_context(self):
"""컨텍스트 초기화"""
self.context = {}
def _log(self, level, message, **kwargs):
extra_data = {**self.context, **kwargs}
record = self.logger.makeRecord(
self.logger.name, level, "", 0, message, (), None
)
record.extra_data = extra_data
self.logger.handle(record)
def info(self, message, **kwargs):
self._log(logging.INFO, message, **kwargs)
def error(self, message, **kwargs):
self._log(logging.ERROR, message, **kwargs)
# 설정
logger = logging.getLogger("json_logger")
handler = logging.FileHandler("app.json.log")
handler.setFormatter(JSONFormatter())
logger.addHandler(handler)
logger.setLevel(logging.DEBUG)
# 사용
ctx_logger = ContextLogger(logger)
ctx_logger.set_context(user_id="user123", session_id="sess456")
ctx_logger.info("사용자 로그인", ip="192.168.1.1")
ctx_logger.info("데이터 조회", query="SELECT * FROM users")
7. 에러 처리와 재시도 로직
7.1 재시도 데코레이터
import time
import logging
from functools import wraps
import random
logger = logging.getLogger(__name__)
def retry(max_attempts=3, delay=1, backoff=2, exceptions=(Exception,)):
"""
재시도 데코레이터
Args:
max_attempts: 최대 시도 횟수
delay: 초기 대기 시간 (초)
backoff: 대기 시간 증가 배수
exceptions: 재시도할 예외 튜플
"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
current_delay = delay
for attempt in range(1, max_attempts + 1):
try:
return func(*args, **kwargs)
except exceptions as e:
if attempt == max_attempts:
logger.error(f"{func.__name__} 최종 실패: {e}")
raise
logger.warning(
f"{func.__name__} 실패 (시도 {attempt}/{max_attempts}): {e}"
)
logger.info(f"{current_delay}초 후 재시도...")
time.sleep(current_delay)
current_delay *= backoff
return None
return wrapper
return decorator
# 사용 예
@retry(max_attempts=3, delay=1, backoff=2, exceptions=(ConnectionError, TimeoutError))
def fetch_data(url):
"""네트워크 요청 (실패할 수 있음)"""
if random.random() < 0.7: # 70% 확률로 실패
raise ConnectionError("연결 실패")
return "데이터"
# 테스트
try:
result = fetch_data("https://example.com")
print(f"결과: {result}")
except ConnectionError:
print("모든 재시도 실패")
7.2 tenacity 라이브러리 활용
# pip install tenacity
from tenacity import (
retry,
stop_after_attempt,
stop_after_delay,
wait_fixed,
wait_exponential,
wait_random,
retry_if_exception_type,
before_sleep_log,
after_log
)
import logging
logger = logging.getLogger(__name__)
# 3번 시도, 2초 고정 대기
@retry(
stop=stop_after_attempt(3),
wait=wait_fixed(2)
)
def simple_retry():
print("시도 중...")
raise Exception("실패!")
# 지수 백오프 (1, 2, 4, 8... 초)
@retry(
stop=stop_after_attempt(5),
wait=wait_exponential(multiplier=1, min=1, max=60)
)
def exponential_retry():
print("시도 중...")
raise Exception("실패!")
# 특정 예외만 재시도
@retry(
retry=retry_if_exception_type((ConnectionError, TimeoutError)),
stop=stop_after_attempt(3)
)
def selective_retry():
raise ValueError("이 예외는 재시도하지 않음")
# 로깅과 함께
@retry(
stop=stop_after_attempt(3),
wait=wait_fixed(1),
before_sleep=before_sleep_log(logger, logging.WARNING),
after=after_log(logger, logging.INFO)
)
def logged_retry():
raise Exception("실패!")
# 30초 타임아웃
@retry(
stop=stop_after_delay(30),
wait=wait_fixed(2)
)
def timeout_retry():
raise Exception("실패!")
# 복합 조건
@retry(
stop=(stop_after_attempt(5) | stop_after_delay(30)),
wait=wait_exponential(multiplier=1, max=10) + wait_random(0, 2)
)
def complex_retry():
raise Exception("실패!")
7.3 Circuit Breaker 패턴
import time
from enum import Enum
from threading import Lock
class CircuitState(Enum):
CLOSED = "closed" # 정상 상태
OPEN = "open" # 차단 상태
HALF_OPEN = "half_open" # 테스트 상태
class CircuitBreaker:
"""
Circuit Breaker 패턴 구현
연속 실패 시 일정 시간 동안 요청을 차단하여
시스템 과부하를 방지합니다.
"""
def __init__(
self,
failure_threshold=5,
recovery_timeout=30,
expected_exception=Exception
):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.expected_exception = expected_exception
self.state = CircuitState.CLOSED
self.failure_count = 0
self.last_failure_time = None
self.lock = Lock()
def __call__(self, func):
def wrapper(*args, **kwargs):
return self.call(func, *args, **kwargs)
return wrapper
def call(self, func, *args, **kwargs):
with self.lock:
if self.state == CircuitState.OPEN:
if self._should_try_reset():
self.state = CircuitState.HALF_OPEN
else:
raise CircuitBreakerOpen(
f"회로 차단 중. {self._time_until_reset()}초 후 재시도 가능"
)
try:
result = func(*args, **kwargs)
self._on_success()
return result
except self.expected_exception as e:
self._on_failure()
raise
def _on_success(self):
with self.lock:
self.failure_count = 0
self.state = CircuitState.CLOSED
def _on_failure(self):
with self.lock:
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPEN
def _should_try_reset(self):
return time.time() - self.last_failure_time >= self.recovery_timeout
def _time_until_reset(self):
elapsed = time.time() - self.last_failure_time
return max(0, int(self.recovery_timeout - elapsed))
class CircuitBreakerOpen(Exception):
pass
# 사용 예
circuit_breaker = CircuitBreaker(
failure_threshold=3,
recovery_timeout=10
)
@circuit_breaker
def unreliable_api():
import random
if random.random() < 0.8:
raise ConnectionError("API 연결 실패")
return "성공"
# 테스트
for i in range(10):
try:
result = unreliable_api()
print(f"시도 {i+1}: {result}")
except CircuitBreakerOpen as e:
print(f"시도 {i+1}: 회로 차단 - {e}")
except ConnectionError as e:
print(f"시도 {i+1}: 연결 오류 - {e}")
time.sleep(1)
8. 설정 파일 관리
8.1 configparser 사용
import configparser
from pathlib import Path
# config.ini 파일 예시:
"""
[DEFAULT]
debug = false
log_level = INFO
[database]
host = localhost
port = 5432
name = mydb
user = admin
password = secret123
[api]
base_url = https://api.example.com
timeout = 30
max_retries = 3
[email]
smtp_server = smtp.gmail.com
smtp_port = 587
sender = noreply@example.com
"""
class Config:
"""설정 파일 관리 클래스"""
def __init__(self, config_path="config.ini"):
self.config = configparser.ConfigParser()
self.config_path = Path(config_path)
if self.config_path.exists():
self.config.read(config_path, encoding='utf-8')
else:
self._create_default_config()
def _create_default_config(self):
"""기본 설정 파일 생성"""
self.config['DEFAULT'] = {
'debug': 'false',
'log_level': 'INFO'
}
self.config['database'] = {
'host': 'localhost',
'port': '5432',
'name': 'mydb'
}
self.config['api'] = {
'base_url': 'https://api.example.com',
'timeout': '30'
}
self.save()
def get(self, section, key, fallback=None):
"""설정 값 가져오기"""
return self.config.get(section, key, fallback=fallback)
def getint(self, section, key, fallback=0):
"""정수 설정 값 가져오기"""
return self.config.getint(section, key, fallback=fallback)
def getboolean(self, section, key, fallback=False):
"""불린 설정 값 가져오기"""
return self.config.getboolean(section, key, fallback=fallback)
def set(self, section, key, value):
"""설정 값 저장"""
if section not in self.config:
self.config[section] = {}
self.config[section][key] = str(value)
def save(self):
"""설정 파일 저장"""
with open(self.config_path, 'w', encoding='utf-8') as f:
self.config.write(f)
@property
def database(self):
"""데이터베이스 설정 딕셔너리"""
return dict(self.config['database'])
@property
def api(self):
"""API 설정 딕셔너리"""
return dict(self.config['api'])
# 사용 예
config = Config()
print(f"DB 호스트: {config.get('database', 'host')}")
print(f"API 타임아웃: {config.getint('api', 'timeout')}")
print(f"디버그 모드: {config.getboolean('DEFAULT', 'debug')}")
8.2 python-dotenv 사용
# pip install python-dotenv
from dotenv import load_dotenv
import os
from pathlib import Path
# .env 파일 예시:
"""
# 데이터베이스 설정
DB_HOST=localhost
DB_PORT=5432
DB_NAME=mydb
DB_USER=admin
DB_PASSWORD=secret123
# API 키
OPENAI_API_KEY=sk-xxxxxxxxxxxxx
NAVER_CLIENT_ID=your_client_id
NAVER_CLIENT_SECRET=your_secret
# 환경
ENVIRONMENT=development
DEBUG=true
"""
class EnvConfig:
"""환경 변수 기반 설정 관리"""
def __init__(self, env_file=".env"):
# .env 파일 로드
env_path = Path(env_file)
load_dotenv(env_path)
@staticmethod
def get(key, default=None):
"""환경 변수 가져오기"""
return os.getenv(key, default)
@staticmethod
def get_bool(key, default=False):
"""불린 환경 변수"""
value = os.getenv(key, str(default)).lower()
return value in ('true', '1', 'yes', 'on')
@staticmethod
def get_int(key, default=0):
"""정수 환경 변수"""
try:
return int(os.getenv(key, default))
except ValueError:
return default
@staticmethod
def get_list(key, default=None, separator=','):
"""리스트 환경 변수"""
value = os.getenv(key)
if value is None:
return default or []
return [item.strip() for item in value.split(separator)]
@property
def is_debug(self):
return self.get_bool('DEBUG')
@property
def environment(self):
return self.get('ENVIRONMENT', 'development')
@property
def database_url(self):
"""데이터베이스 연결 URL 생성"""
host = self.get('DB_HOST', 'localhost')
port = self.get('DB_PORT', '5432')
name = self.get('DB_NAME', 'mydb')
user = self.get('DB_USER')
password = self.get('DB_PASSWORD')
if user and password:
return f"postgresql://{user}:{password}@{host}:{port}/{name}"
return f"postgresql://{host}:{port}/{name}"
# 환경별 설정 파일 로드
def load_env_for_environment():
"""환경에 따른 설정 파일 로드"""
env = os.getenv('ENVIRONMENT', 'development')
# 기본 .env 로드
load_dotenv('.env')
# 환경별 .env 로드 (있으면 덮어쓰기)
env_file = f'.env.{env}'
if Path(env_file).exists():
load_dotenv(env_file, override=True)
# 사용 예
config = EnvConfig()
print(f"환경: {config.environment}")
print(f"디버그: {config.is_debug}")
print(f"DB URL: {config.database_url}")
print(f"OpenAI API Key: {config.get('OPENAI_API_KEY', 'Not set')[:20]}...")
9. 백그라운드 실행
9.1 데몬 프로세스 (Linux)
import os
import sys
import time
import signal
import atexit
from pathlib import Path
class Daemon:
"""Unix 데몬 프로세스"""
def __init__(self, pidfile, stdin='/dev/null', stdout='/dev/null', stderr='/dev/null'):
self.pidfile = pidfile
self.stdin = stdin
self.stdout = stdout
self.stderr = stderr
def daemonize(self):
"""데몬화 (이중 fork)"""
# 첫 번째 fork
try:
pid = os.fork()
if pid > 0:
sys.exit(0)
except OSError as e:
sys.stderr.write(f"Fork #1 실패: {e}\n")
sys.exit(1)
# 세션 리더가 됨
os.chdir("/")
os.setsid()
os.umask(0)
# 두 번째 fork
try:
pid = os.fork()
if pid > 0:
sys.exit(0)
except OSError as e:
sys.stderr.write(f"Fork #2 실패: {e}\n")
sys.exit(1)
# 표준 입출력 리다이렉션
sys.stdout.flush()
sys.stderr.flush()
with open(self.stdin, 'r') as si:
os.dup2(si.fileno(), sys.stdin.fileno())
with open(self.stdout, 'a+') as so:
os.dup2(so.fileno(), sys.stdout.fileno())
with open(self.stderr, 'a+') as se:
os.dup2(se.fileno(), sys.stderr.fileno())
# PID 파일 생성
atexit.register(self.delpid)
pid = str(os.getpid())
with open(self.pidfile, 'w+') as f:
f.write(f"{pid}\n")
def delpid(self):
"""PID 파일 삭제"""
if os.path.exists(self.pidfile):
os.remove(self.pidfile)
def start(self):
"""데몬 시작"""
# PID 파일 확인
try:
with open(self.pidfile, 'r') as f:
pid = int(f.read().strip())
except (IOError, ValueError):
pid = None
if pid:
sys.stderr.write(f"PID 파일 {self.pidfile}이 존재합니다. 데몬이 이미 실행 중?\n")
sys.exit(1)
self.daemonize()
self.run()
def stop(self):
"""데몬 중지"""
try:
with open(self.pidfile, 'r') as f:
pid = int(f.read().strip())
except (IOError, ValueError):
pid = None
if not pid:
sys.stderr.write("PID 파일이 없습니다. 데몬이 실행 중이 아닙니다.\n")
return
try:
while True:
os.kill(pid, signal.SIGTERM)
time.sleep(0.1)
except OSError as e:
if str(e).find("No such process") > 0:
if os.path.exists(self.pidfile):
os.remove(self.pidfile)
else:
print(str(e))
sys.exit(1)
def restart(self):
"""데몬 재시작"""
self.stop()
self.start()
def run(self):
"""메인 작업 (서브클래스에서 구현)"""
raise NotImplementedError
# 사용 예
class MyDaemon(Daemon):
def run(self):
while True:
# 실제 작업 수행
with open('/tmp/daemon.log', 'a') as f:
f.write(f"{time.strftime('%Y-%m-%d %H:%M:%S')} - 데몬 실행 중\n")
time.sleep(60)
# if __name__ == "__main__":
# daemon = MyDaemon('/tmp/mydaemon.pid')
# if len(sys.argv) == 2:
# if sys.argv[1] == 'start':
# daemon.start()
# elif sys.argv[1] == 'stop':
# daemon.stop()
# elif sys.argv[1] == 'restart':
# daemon.restart()
9.2 Windows 서비스
# pip install pywin32
import win32serviceutil
import win32service
import win32event
import servicemanager
import socket
import time
import logging
class MyWindowsService(win32serviceutil.ServiceFramework):
"""Windows 서비스 클래스"""
_svc_name_ = 'MyPythonService'
_svc_display_name_ = 'My Python Service'
_svc_description_ = 'Python으로 만든 Windows 서비스입니다.'
def __init__(self, args):
win32serviceutil.ServiceFramework.__init__(self, args)
self.stop_event = win32event.CreateEvent(None, 0, 0, None)
self.running = True
# 로깅 설정
logging.basicConfig(
filename='C:\\logs\\myservice.log',
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
def SvcStop(self):
"""서비스 중지"""
self.ReportServiceStatus(win32service.SERVICE_STOP_PENDING)
win32event.SetEvent(self.stop_event)
self.running = False
logging.info('서비스 중지 요청됨')
def SvcDoRun(self):
"""서비스 메인 루프"""
servicemanager.LogMsg(
servicemanager.EVENTLOG_INFORMATION_TYPE,
servicemanager.PYS_SERVICE_STARTED,
(self._svc_name_, '')
)
logging.info('서비스 시작됨')
self.main()
def main(self):
"""실제 작업 수행"""
while self.running:
# 여기에 실제 작업 로직 구현
logging.info('작업 실행 중...')
time.sleep(60) # 1분마다 실행
if __name__ == '__main__':
win32serviceutil.HandleCommandLine(MyWindowsService)
# 명령어:
# python service.py install # 서비스 설치
# python service.py start # 서비스 시작
# python service.py stop # 서비스 중지
# python service.py remove # 서비스 제거
10. 실전 종합 프로젝트: 일일 업무 자동화 시스템
"""
일일 업무 자동화 시스템
- 뉴스 수집 및 요약
- 날씨 정보 조회
- 일일 리포트 생성
- 이메일 발송
"""
import os
import json
import logging
from datetime import datetime
from pathlib import Path
from dotenv import load_dotenv
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
import requests
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from functools import wraps
import time
# 환경 변수 로드
load_dotenv()
# 디렉토리 설정
BASE_DIR = Path(__file__).parent
LOG_DIR = BASE_DIR / "logs"
DATA_DIR = BASE_DIR / "data"
REPORT_DIR = BASE_DIR / "reports"
for dir_path in [LOG_DIR, DATA_DIR, REPORT_DIR]:
dir_path.mkdir(parents=True, exist_ok=True)
# 로깅 설정
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(LOG_DIR / "automation.log", encoding='utf-8'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
def retry(max_attempts=3, delay=5):
"""재시도 데코레이터"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
for attempt in range(max_attempts):
try:
return func(*args, **kwargs)
except Exception as e:
if attempt == max_attempts - 1:
logger.error(f"{func.__name__} 최종 실패: {e}")
raise
logger.warning(f"{func.__name__} 실패, 재시도 {attempt + 1}/{max_attempts}")
time.sleep(delay)
return wrapper
return decorator
class NewsCollector:
"""뉴스 수집기"""
def __init__(self):
self.client_id = os.getenv("NAVER_CLIENT_ID")
self.client_secret = os.getenv("NAVER_CLIENT_SECRET")
@retry(max_attempts=3)
def collect(self, query, count=10):
"""네이버 뉴스 검색"""
if not self.client_id or not self.client_secret:
logger.warning("네이버 API 키가 설정되지 않았습니다.")
return []
url = "https://openapi.naver.com/v1/search/news.json"
headers = {
"X-Naver-Client-Id": self.client_id,
"X-Naver-Client-Secret": self.client_secret
}
params = {"query": query, "display": count, "sort": "date"}
response = requests.get(url, headers=headers, params=params, timeout=10)
response.raise_for_status()
items = response.json().get("items", [])
logger.info(f"뉴스 {len(items)}건 수집 완료: {query}")
return items
class WeatherCollector:
"""날씨 정보 수집기"""
def __init__(self):
self.api_key = os.getenv("OPENWEATHERMAP_API_KEY")
@retry(max_attempts=3)
def collect(self, city="Seoul"):
"""날씨 정보 조회"""
if not self.api_key:
logger.warning("OpenWeatherMap API 키가 설정되지 않았습니다.")
return None
url = "https://api.openweathermap.org/data/2.5/weather"
params = {
"q": city,
"appid": self.api_key,
"units": "metric",
"lang": "kr"
}
response = requests.get(url, params=params, timeout=10)
response.raise_for_status()
data = response.json()
weather_info = {
"city": city,
"temperature": data["main"]["temp"],
"humidity": data["main"]["humidity"],
"description": data["weather"][0]["description"],
"wind_speed": data["wind"]["speed"]
}
logger.info(f"날씨 정보 수집 완료: {city}")
return weather_info
class ReportGenerator:
"""리포트 생성기"""
def __init__(self):
self.news_collector = NewsCollector()
self.weather_collector = WeatherCollector()
def generate_daily_report(self):
"""일일 리포트 생성"""
logger.info("일일 리포트 생성 시작")
report_data = {
"date": datetime.now().strftime("%Y-%m-%d"),
"generated_at": datetime.now().isoformat(),
"weather": None,
"news": {}
}
# 날씨 정보
try:
report_data["weather"] = self.weather_collector.collect("Seoul")
except Exception as e:
logger.error(f"날씨 수집 실패: {e}")
# 뉴스 수집 (관심 키워드)
keywords = ["인공지능", "Python", "IT"]
for keyword in keywords:
try:
report_data["news"][keyword] = self.news_collector.collect(keyword, 5)
except Exception as e:
logger.error(f"뉴스 수집 실패 ({keyword}): {e}")
# 리포트 저장
report_path = REPORT_DIR / f"report_{datetime.now().strftime('%Y%m%d')}.json"
with open(report_path, 'w', encoding='utf-8') as f:
json.dump(report_data, f, ensure_ascii=False, indent=2)
logger.info(f"리포트 저장: {report_path}")
return report_data
def format_report_html(self, report_data):
"""HTML 형식 리포트 생성"""
html = f"""
일일 업무 리포트 - {report_data['date']}
"""
# 날씨 정보
if report_data.get('weather'):
w = report_data['weather']
html += f"""
오늘의 날씨 ({w['city']})
온도: {w['temperature']}C / 습도: {w['humidity']}%
날씨: {w['description']} / 풍속: {w['wind_speed']}m/s
"""
# 뉴스
for keyword, news_list in report_data.get('news', {}).items():
html += f"{keyword} 관련 뉴스
"
for news in news_list[:5]:
title = news.get('title', '').replace('', '').replace('', '')
link = news.get('link', '#')
pubDate = news.get('pubDate', '')
html += f"""
{pubDate}
"""
html += """
이 리포트는 자동으로 생성되었습니다.
"""
return html
class EmailSender:
"""이메일 발송기"""
def __init__(self):
self.smtp_server = os.getenv("SMTP_SERVER", "smtp.gmail.com")
self.smtp_port = int(os.getenv("SMTP_PORT", 587))
self.sender_email = os.getenv("SENDER_EMAIL")
self.sender_password = os.getenv("SENDER_PASSWORD")
@retry(max_attempts=2)
def send(self, to_email, subject, html_content):
"""이메일 발송"""
if not self.sender_email or not self.sender_password:
logger.warning("이메일 설정이 완료되지 않았습니다.")
return False
msg = MIMEMultipart('alternative')
msg['Subject'] = subject
msg['From'] = self.sender_email
msg['To'] = to_email
html_part = MIMEText(html_content, 'html', 'utf-8')
msg.attach(html_part)
try:
with smtplib.SMTP(self.smtp_server, self.smtp_port) as server:
server.starttls()
server.login(self.sender_email, self.sender_password)
server.sendmail(self.sender_email, to_email, msg.as_string())
logger.info(f"이메일 발송 완료: {to_email}")
return True
except Exception as e:
logger.error(f"이메일 발송 실패: {e}")
raise
class DailyAutomation:
"""일일 자동화 메인 클래스"""
def __init__(self):
self.report_generator = ReportGenerator()
self.email_sender = EmailSender()
self.scheduler = BlockingScheduler(timezone='Asia/Seoul')
def morning_routine(self):
"""아침 루틴"""
logger.info("=" * 50)
logger.info("아침 루틴 시작")
try:
# 리포트 생성
report_data = self.report_generator.generate_daily_report()
# HTML 변환
html_content = self.report_generator.format_report_html(report_data)
# 이메일 발송
recipient = os.getenv("REPORT_RECIPIENT")
if recipient:
self.email_sender.send(
recipient,
f"[일일 리포트] {report_data['date']}",
html_content
)
logger.info("아침 루틴 완료")
except Exception as e:
logger.error(f"아침 루틴 실패: {e}")
def evening_routine(self):
"""저녁 루틴"""
logger.info("=" * 50)
logger.info("저녁 루틴 시작")
try:
# 로그 정리, 데이터 백업 등
self._cleanup_old_files()
logger.info("저녁 루틴 완료")
except Exception as e:
logger.error(f"저녁 루틴 실패: {e}")
def _cleanup_old_files(self, days=30):
"""오래된 파일 정리"""
import time
cutoff = time.time() - (days * 86400)
for dir_path in [LOG_DIR, REPORT_DIR]:
for file_path in dir_path.glob("*"):
if file_path.stat().st_mtime < cutoff:
file_path.unlink()
logger.info(f"오래된 파일 삭제: {file_path}")
def setup_schedule(self):
"""스케줄 설정"""
# 매일 오전 8시 아침 루틴
self.scheduler.add_job(
self.morning_routine,
CronTrigger(hour=8, minute=0),
id='morning_routine',
replace_existing=True
)
# 매일 오후 6시 저녁 루틴
self.scheduler.add_job(
self.evening_routine,
CronTrigger(hour=18, minute=0),
id='evening_routine',
replace_existing=True
)
logger.info("스케줄 설정 완료")
logger.info("등록된 작업:")
for job in self.scheduler.get_jobs():
logger.info(f" - {job.id}: 다음 실행 {job.next_run_time}")
def run(self):
"""자동화 시스템 실행"""
logger.info("일일 업무 자동화 시스템 시작")
self.setup_schedule()
try:
self.scheduler.start()
except KeyboardInterrupt:
logger.info("시스템 종료 요청")
self.scheduler.shutdown()
logger.info("시스템 종료 완료")
# 메인 실행
if __name__ == "__main__":
import sys
automation = DailyAutomation()
if len(sys.argv) > 1:
if sys.argv[1] == "test":
# 테스트 실행
print("테스트 모드: 아침 루틴 즉시 실행")
automation.morning_routine()
elif sys.argv[1] == "schedule":
# 스케줄러 실행
automation.run()
else:
print("사용법:")
print(" python automation.py test # 테스트 실행")
print(" python automation.py schedule # 스케줄러 시작")
print()
print(".env 파일 예시:")
print(" NAVER_CLIENT_ID=your_id")
print(" NAVER_CLIENT_SECRET=your_secret")
print(" OPENWEATHERMAP_API_KEY=your_key")
print(" SMTP_SERVER=smtp.gmail.com")
print(" SMTP_PORT=587")
print(" SENDER_EMAIL=your@email.com")
print(" SENDER_PASSWORD=your_app_password")
print(" REPORT_RECIPIENT=recipient@email.com")
마무리
이번 편을 끝으로 Python 자동화 마스터 시리즈를 마무리합니다. 8편에 걸쳐 Python을 활용한 다양한 자동화 기법을 학습했습니다.
이 시리즈에서 다룬 내용들을 정리하면:
- 1-2편: Python 기초와 파일/폴더 자동화
- 3-4편: 엑셀 자동화와 웹 스크래핑
- 5-6편: 이메일 자동화와 데이터베이스 연동
- 7편: API 활용과 데이터 수집
- 8편: 스케줄링과 실전 프로젝트
자동화의 핵심은 반복적인 작업을 코드로 대체하여 시간을 절약하고, 사람의 실수를 줄이며, 더 가치 있는 일에 집중할 수 있게 하는 것입니다. 이 시리즈에서 배운 기술들을 조합하면 거의 모든 종류의 업무를 자동화할 수 있습니다.
실전에서 자동화 시스템을 구축할 때는 다음을 항상 기억하세요:
자동화 시스템 구축 핵심 원칙
1. 점진적 구축: 작은 것부터 시작하여 점차 확장하세요.
2. 철저한 로깅: 문제 발생 시 원인을 파악할 수 있도록 로그를 남기세요.
3. 에러 처리: 예상치 못한 오류에도 시스템이 멈추지 않도록 하세요.
4. 모니터링: 시스템이 정상 동작하는지 주기적으로 확인하세요.
5. 설정 분리: 환경별 설정을 코드와 분리하세요.
Python 자동화는 무한한 가능성을 가지고 있습니다. 이 시리즈를 통해 배운 내용을 바탕으로 여러분만의 자동화 시스템을 구축해보시기 바랍니다. 반복적인 업무에서 해방되어 더 창의적이고 가치 있는 일에 시간을 투자하시길 바랍니다.