Introduction: The Final Piece - Scheduling

Scheduling is what makes all the techniques we've learned so far (file processing, web scraping, and API integration) truly useful. Scripts that run automatically at scheduled times take over manual work completely and can perform tasks 24/7 without rest.

In this final part, we'll learn various methods for scheduling and running tasks in Python, along with logging, error handling, and configuration management techniques for building stable automation systems. Finally, we'll build a real-world project that combines everything we've learned in this series.

1. Time Handling: time and datetime Modules

1.1 time Module Basics

import time

# Current time (Unix timestamp)
# time.time() returns the number of seconds since the epoch as a float.
timestamp = time.time()
print(f"Current timestamp: {timestamp}")  # e.g., 1737529200.123456

# Program pause
# time.sleep() blocks the calling thread for the given number of seconds.
print("Waiting 3 seconds...")
time.sleep(3)
print("Wait complete!")

# Measure execution time
# NOTE: time.time() follows the system wall clock, which can be adjusted while
# the program runs; for measuring durations prefer perf_counter() (shown below).
start_time = time.time()
# Perform task
for i in range(1000000):
    pass
end_time = time.time()
print(f"Execution time: {end_time - start_time:.4f} seconds")

# More precise time measurement
# perf_counter() is a high-resolution clock intended for timing short intervals;
# only the difference between two readings is meaningful.
start = time.perf_counter()
# Perform task
time.sleep(0.1)
end = time.perf_counter()
print(f"Precise measurement: {end - start:.6f} seconds")

# Structured time
# localtime() returns a struct_time whose fields are plain integers, so the
# values printed here are not zero-padded (e.g. "9:5:3").
local_time = time.localtime()
print(f"Current: {local_time.tm_year}-{local_time.tm_mon}-{local_time.tm_mday}")
print(f"Time: {local_time.tm_hour}:{local_time.tm_min}:{local_time.tm_sec}")

# Convert to string
# strftime() produces zero-padded output according to the format codes.
formatted = time.strftime("%Y-%m-%d %H:%M:%S", local_time)
print(f"Formatted time: {formatted}")

1.2 Using the datetime Module

from datetime import datetime, date, time, timedelta
import pytz  # pip install pytz

# Current date and time
# datetime.now() without an argument returns a naive (timezone-unaware) local time.
now = datetime.now()
print(f"Now: {now}")
print(f"Date: {now.date()}")
print(f"Time: {now.time()}")

# Create specific date/time
# Arguments are year, month, day, hour, minute, second.
specific_date = datetime(2026, 1, 22, 9, 30, 0)
print(f"Specific time: {specific_date}")

# Date formatting
formatted = now.strftime("%Y/%m/%d %H:%M:%S")
print(f"Formatted: {formatted}")

# String parsing
# The format string must match the input exactly, otherwise strptime raises ValueError.
date_str = "2026-01-22 14:30:00"
parsed = datetime.strptime(date_str, "%Y-%m-%d %H:%M:%S")
print(f"Parsed date: {parsed}")

# Date arithmetic
# Adding a timedelta to a datetime yields a new datetime; `now` itself is unchanged.
tomorrow = now + timedelta(days=1)
next_week = now + timedelta(weeks=1)
two_hours_later = now + timedelta(hours=2)

print(f"Tomorrow: {tomorrow.date()}")
print(f"Next week: {next_week.date()}")
print(f"2 hours later: {two_hours_later.time()}")

# Difference between two dates
# Subtracting two datetimes yields a timedelta; .days is negative once the
# current date is past 2026-12-31.
date1 = datetime(2026, 12, 31)
date2 = datetime.now()
diff = date1 - date2
print(f"{diff.days} days until end of 2026")

# Timezone handling
eastern = pytz.timezone('US/Eastern')
utc = pytz.UTC

# Passing a tz object to datetime.now() returns an aware datetime in that zone.
now_eastern = datetime.now(eastern)
now_utc = datetime.now(utc)

print(f"Eastern Time: {now_eastern}")
print(f"UTC Time: {now_utc}")

# Convert UTC to Eastern
# NOTE: passing a pytz zone through tzinfo= is only reliable for UTC. For other
# pytz zones, build a naive datetime and attach the zone with zone.localize(dt)
# (see the pytz documentation).
utc_time = datetime(2026, 1, 22, 12, 0, 0, tzinfo=utc)
eastern_time = utc_time.astimezone(eastern)
print(f"UTC 12:00 = Eastern {eastern_time.strftime('%H:%M')}")

1.3 Useful Date Utility Functions

from datetime import datetime, timedelta
import calendar

def get_weekday_name(date_obj):
    """Return the English weekday name for ``date_obj``.

    ``date_obj`` must provide ``weekday()`` with Monday == 0, as ``date`` and
    ``datetime`` objects do.
    """
    names_by_index = (
        'Monday', 'Tuesday', 'Wednesday', 'Thursday',
        'Friday', 'Saturday', 'Sunday',
    )
    index = date_obj.weekday()
    return names_by_index[index]

def get_month_range(year, month):
    """Return ``(first_day, last_day)`` of a month as midnight datetimes."""
    # monthrange() returns (weekday of the 1st, number of days in the month).
    _, days_in_month = calendar.monthrange(year, month)
    return datetime(year, month, 1), datetime(year, month, days_in_month)

def get_business_days(start_date, end_date):
    """Count Monday-Friday days from ``start_date`` through ``end_date``.

    Both end points are included. Returns 0 when ``start_date`` is later than
    ``end_date``. Public holidays are not taken into account.
    """
    one_day = timedelta(days=1)
    count = 0
    day = start_date

    while not day > end_date:
        # weekday() is 0-4 for Monday-Friday; the comparison adds 1 or 0.
        count += day.weekday() <= 4
        day = day + one_day

    return count

def is_business_hour(check_time=None, start_hour=9, end_hour=18):
    """Return True when ``check_time`` is on a weekday within business hours.

    Args:
        check_time: Moment to test; defaults to ``datetime.now()``.
        start_hour: First hour counted as business time (inclusive).
        end_hour: Hour at which business time ends (exclusive).
    """
    moment = datetime.now() if check_time is None else check_time

    # Saturday (5) and Sunday (6) are never business time.
    is_weekday = moment.weekday() < 5
    if not is_weekday:
        return False

    return start_hour <= moment.hour < end_hour

# Usage examples
# Each call below exercises one of the helper functions defined above.
today = datetime.now()
print(f"Today is {get_weekday_name(today)}.")

# get_month_range() returns datetimes; .date() drops the midnight time part.
first, last = get_month_range(2026, 1)
print(f"January 2026: {first.date()} ~ {last.date()}")

# Both end points are included in the count.
days = get_business_days(datetime(2026, 1, 1), datetime(2026, 1, 31))
print(f"Business days in January 2026: {days}")

# With no argument the check is made against the current local time.
print(f"Is business hour now: {is_business_hour()}")

2. The schedule Library

2.1 Basic Usage

import schedule
import time

# pip install schedule

def job():
    """Print a message stamped with the current local time (HH:MM:SS)."""
    stamp = time.strftime('%H:%M:%S')
    print(f"[{stamp}] Job executed!")

def morning_report():
    """Placeholder job: announce that the morning report is being generated."""
    message = "Generating morning report."
    print(message)

def backup_data():
    """Placeholder job: announce that a data backup is running."""
    message = "Backing up data."
    print(message)

# Various scheduling patterns
# Each line registers a separate job; nothing runs until run_pending() is
# called in the main loop below.
schedule.every(10).seconds.do(job)  # Every 10 seconds
schedule.every(5).minutes.do(job)   # Every 5 minutes
schedule.every().hour.do(job)        # Every hour
schedule.every().day.at("09:00").do(morning_report)  # Daily at 9 AM
schedule.every().monday.do(job)      # Every Monday
schedule.every().wednesday.at("13:15").do(job)  # Every Wednesday at 13:15

# Run at specific time
schedule.every().day.at("00:00").do(backup_data)  # Backup at midnight

# Tag jobs
# Tags allow jobs to be looked up or cleared as a group later.
schedule.every().second.do(job).tag('limited')

# Main loop
# run_pending() executes every job that is due; the 1-second sleep sets the
# polling interval and keeps the loop from spinning.
print("Scheduler started...")
while True:
    schedule.run_pending()
    time.sleep(1)

2.2 Advanced schedule Patterns

import schedule
import time
from functools import partial

# Job with arguments
def greet(name):
    """Print a greeting addressed to ``name``."""
    greeting = f"Hello, {name}!"
    print(greeting)

# Pass arguments using partial
# partial() binds "John" now, producing a zero-argument callable for the scheduler.
schedule.every().day.at("08:00").do(partial(greet, "John"))

# Pass arguments using lambda
# The lambda defers the call to greet() until the scheduler runs the job.
schedule.every().day.at("08:30").do(lambda: greet("Jane"))

# Job management with tags
def data_sync():
    """Placeholder job: announce that data is being synchronised."""
    message = "Syncing data..."
    print(message)

def send_notification():
    """Placeholder job: announce that a notification is being sent."""
    message = "Sending notification..."
    print(message)

# A job may carry several tags; data_sync is reachable via 'sync' or 'database'.
schedule.every(30).minutes.do(data_sync).tag('sync', 'database')
schedule.every().hour.do(send_notification).tag('notification')

# Cancel jobs with specific tag
# (left commented out so the lookup below still finds the 'sync' job)
# schedule.clear('sync')

# Get jobs with specific tag
sync_jobs = schedule.get_jobs('sync')
print(f"Number of sync jobs: {len(sync_jobs)}")

# Cancel job (run only once)
def run_once():
    """Print a message, then tell the scheduler to unschedule this job.

    Returning ``schedule.CancelJob`` from a job is the signal the schedule
    library uses to remove the job after this run.
    """
    print("This job runs only once")
    cancel_signal = schedule.CancelJob
    return cancel_signal

# Registered to fire every second; run_once() cancels itself after the first run.
schedule.every().second.do(run_once)

# Check next run time
# next_run() reports when the earliest registered job is due.
next_run = schedule.next_run()
print(f"Next job run time: {next_run}")

# Check idle time
# idle_seconds() is the time remaining until that earliest job.
idle_seconds = schedule.idle_seconds()
print(f"{idle_seconds} seconds until next job")

# Run scheduler
# Blocks forever, polling once per second for due jobs.
while True:
    schedule.run_pending()
    time.sleep(1)

2.3 Scheduler with Exception Handling

import schedule
import time
import traceback
from functools import wraps

def catch_exceptions(job_func):
    """Decorate ``job_func`` so that exceptions are reported, not propagated.

    The wrapped job returns whatever ``job_func`` returns. If ``job_func``
    raises an ``Exception``, the error and its traceback are printed and the
    wrapper returns ``None``, so the scheduler loop keeps running.
    """
    @wraps(job_func)
    def guarded(*args, **kwargs):
        try:
            outcome = job_func(*args, **kwargs)
        except Exception as exc:
            print(f"Error during job execution: {exc}")
            traceback.print_exc()
            # Swallow the error so one failing job cannot stop the scheduler.
            return None
        else:
            return outcome
    return guarded

@catch_exceptions
def risky_job():
    """Demo job that raises on roughly 30% of runs and prints success otherwise."""
    import random
    should_fail = random.random() < 0.3
    if should_fail:
        raise Exception("Random error occurred!")
    print("Job succeeded!")

# Register the decorated job; failures inside it are caught by catch_exceptions.
schedule.every(5).seconds.do(risky_job)

# Safe scheduler loop
def run_scheduler():
    """Poll the scheduler once per second, forever.

    Any ``Exception`` escaping ``schedule.run_pending()`` is printed and the
    loop continues. The function never returns normally.
    """
    while True:
        try:
            schedule.run_pending()
        except Exception as failure:
            print(f"Scheduler error: {failure}")
        # Pause between polls whether or not the last poll failed.
        time.sleep(1)

# Blocks forever: run_scheduler() contains an endless polling loop.
run_scheduler()

3. The APScheduler Library

3.1 Introduction and Installation

# pip install apscheduler

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.triggers.date import DateTrigger
from datetime import datetime, timedelta

def my_job(message="Default message"):
    """Print ``message`` prefixed with the current local time (HH:MM:SS)."""
    stamp = datetime.now().strftime('%H:%M:%S')
    print(f"[{stamp}] {message}")

# BlockingScheduler - Runs in main thread (blocking)
# start() does not return until the scheduler is shut down.
scheduler = BlockingScheduler()

# BackgroundScheduler - Runs in background (non-blocking)
# scheduler = BackgroundScheduler()

# Interval trigger - Run at regular intervals
# `args` is passed positionally to my_job; `id` names the job for later lookup.
scheduler.add_job(
    my_job,
    IntervalTrigger(seconds=10),
    args=["Run every 10 seconds"],
    id='interval_job'
)

# Cron trigger - Use cron expressions
# Fields that are not given (day, month, ...) match every value.
scheduler.add_job(
    my_job,
    CronTrigger(hour=9, minute=0),
    args=["Daily at 9 AM"],
    id='daily_job'
)

# Date trigger - Run once at specific date/time
# run_date is computed when this line executes: five seconds after registration.
scheduler.add_job(
    my_job,
    DateTrigger(run_date=datetime.now() + timedelta(seconds=5)),
    args=["Run once in 5 seconds"],
    id='one_time_job'
)

# Start scheduler
# Ctrl+C raises KeyboardInterrupt out of start(); shut the scheduler down cleanly.
try:
    print("APScheduler started...")
    scheduler.start()
except KeyboardInterrupt:
    print("Scheduler shutdown")
    scheduler.shutdown()

3.2 Cron Expressions

from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
from datetime import datetime

# Scheduler that holds every cron-triggered job registered in this example.
scheduler = BlockingScheduler()

def job(name):
    """Print ``name`` with a full local timestamp to show that the job ran."""
    stamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print(f"[{stamp}] {name} executed")

# Cron expression pattern
# minute (0-59)
# | hour (0-23)
# | | day of month (1-31)
# | | | month (1-12)
# | | | | day of week (0-6, 0=Monday in APScheduler's CronTrigger;
# | | | | |             note that standard crontab uses 0=Sunday)
# | | | | |
# * * * * *

# Run every minute
scheduler.add_job(job, CronTrigger(minute='*'), args=["Every minute"])

# Run at 30 minutes past every hour
scheduler.add_job(job, CronTrigger(minute=30), args=["30 past every hour"])

# Run at 9 AM and 6 PM daily
scheduler.add_job(job, CronTrigger(hour='9,18', minute=0), args=["9 AM and 6 PM"])

# Run at 9 AM on weekdays (Mon-Fri)
scheduler.add_job(job, CronTrigger(day_of_week='mon-fri', hour=9, minute=0), args=["Weekdays 9 AM"])

# Run at midnight on the 1st of every month
scheduler.add_job(job, CronTrigger(day=1, hour=0, minute=0), args=["1st of month"])

# Run every Monday at 8:30 AM
scheduler.add_job(job, CronTrigger(day_of_week='mon', hour=8, minute=30), args=["Monday 8:30"])

# Run every 15 minutes
scheduler.add_job(job, CronTrigger(minute='*/15'), args=["Every 15 minutes"])

# Run every 30 minutes during business hours (9-18)
# hour='9-17' makes the first run 09:00 and the last run 17:30, i.e. the
# window ends before 18:00.
scheduler.add_job(
    job,
    CronTrigger(hour='9-17', minute='0,30'),
    args=["Business hours every 30 min"]
)

# Pass cron expression as string
# NOTE: APScheduler 3.x hands the day-of-week field of a crontab string to
# CronTrigger unchanged, and CronTrigger numbers weekdays with 0 = Monday.
# The usual crontab range "1-5" would therefore fire Tuesday-Saturday.
# Weekday names avoid the ambiguity.
scheduler.add_job(
    job,
    CronTrigger.from_crontab('0 9 * * mon-fri'),  # Weekdays 9 AM
    args=["crontab format"]
)

# Blocks until the scheduler is shut down.
scheduler.start()

3.3 Job Stores and Event Listeners

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR
from datetime import datetime
import time

# Job store configuration (using SQLite)
# Jobs are persisted in the file jobs.sqlite in the current working directory.
jobstores = {
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}

# Executor configuration
# Two executors are registered: a 20-worker thread pool under 'default' and a
# 5-worker process pool under 'processpool'.
executors = {
    'default': ThreadPoolExecutor(20),
    'processpool': ProcessPoolExecutor(5)
}

# Default settings
# Applied to every job that does not override them in add_job().
job_defaults = {
    'coalesce': False,  # Whether to merge missed jobs
    'max_instances': 3,  # Max concurrent instances
    'misfire_grace_time': 60  # Grace time for missed jobs (seconds)
}

# The timezone given here is the one the scheduler uses to interpret trigger times.
scheduler = BackgroundScheduler(
    jobstores=jobstores,
    executors=executors,
    job_defaults=job_defaults,
    timezone='America/New_York'
)

# Event listeners
def job_executed_listener(event):
    """Print the id, scheduled run time and return value of a finished job.

    ``event`` is expected to expose ``job_id``, ``scheduled_run_time`` and
    ``retval`` attributes.
    """
    job_id = event.job_id
    print(f"Job completed: {job_id}")

    run_time = event.scheduled_run_time
    print(f"Scheduled run time: {run_time}")

    returned = event.retval
    print(f"Return value: {returned}")

def job_error_listener(event):
    """Print the id, exception and traceback of a job that raised.

    ``event`` is expected to expose ``job_id``, ``exception`` and
    ``traceback`` attributes.
    """
    job_id = event.job_id
    print(f"Job error: {job_id}")

    error = event.exception
    print(f"Exception: {error}")

    trace = event.traceback
    print(f"Traceback: {trace}")

# The second argument is an event mask: each listener is called only for the
# event type it is registered with.
scheduler.add_listener(job_executed_listener, EVENT_JOB_EXECUTED)
scheduler.add_listener(job_error_listener, EVENT_JOB_ERROR)

# Define jobs
def sample_job():
    """Print the current time and return ``"Success"``."""
    started_at = datetime.now()
    print(f"Job running at {started_at}")
    return "Success"

# replace_existing=True is required here: the default job store is persistent
# (SQLite), so the 'sample' job is still stored when the program restarts.
# Without it, registering the same id again fails with ConflictingIdError.
scheduler.add_job(
    sample_job,
    'interval',
    seconds=10,
    id='sample',
    replace_existing=True,
)

# BackgroundScheduler.start() returns immediately; jobs run in worker threads.
scheduler.start()

# Keep main thread running
# Without this loop the script would exit and take the scheduler with it.
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    scheduler.shutdown()

4. Windows Task Scheduler Integration

# Create a batch file to run Python script
# run_script.bat
# Raw string: the Windows paths below contain backslashes ("\t", "\p", ...)
# that a normal string literal would interpret as escape sequences.
r"""
@echo off
cd /d "C:\path\to\your\script"
"C:\Python311\python.exe" your_script.py
"""

# Or use PowerShell to create scheduled task programmatically
import subprocess
import os

def create_windows_task(task_name, script_path, schedule_time="09:00"):
    """Create (or overwrite) a daily Windows Task Scheduler task.

    Registers a task that runs ``script_path`` with the current Python
    interpreter every day at ``schedule_time`` by invoking ``schtasks``.

    Args:
        task_name: Name of the scheduled task.
        script_path: Path of the Python script the task should run.
        schedule_time: Daily start time in ``HH:MM`` form.

    Returns:
        None. A success message is printed when ``schtasks`` exits with
        code 0; otherwise the error text is printed.
    """
    import sys  # local import: use sys.executable directly instead of os.sys

    # Command line the task will execute. Interpreter and script are quoted
    # individually so that paths containing spaces survive.
    task_command = f'"{sys.executable}" "{script_path}"'

    # An argument list (no shell) keeps quotes or shell metacharacters in
    # task_name / script_path from changing the command that is run.
    cmd = [
        'schtasks', '/create',
        '/tn', task_name,
        '/tr', task_command,
        '/sc', 'daily',
        '/st', schedule_time,
        '/f',  # overwrite an existing task of the same name without prompting
    ]

    try:
        result = subprocess.run(cmd, capture_output=True, text=True)
    except OSError as exc:
        # schtasks could not be launched at all (e.g. not running on Windows).
        print(f"Error creating task: {exc}")
        return

    if result.returncode == 0:
        print(f"Task '{task_name}' created successfully")
    else:
        print(f"Error creating task: {result.stderr}")

def delete_windows_task(task_name):
    """Delete the Windows Task Scheduler task named ``task_name``.

    Invokes ``schtasks /delete`` with ``/f`` so no confirmation prompt is
    shown. The command's own output goes straight to the console; its exit
    status is not inspected.
    """
    # Argument list (no shell): task_name is passed as a single argument, so
    # quotes or shell metacharacters in it cannot alter the command.
    cmd = ['schtasks', '/delete', '/tn', task_name, '/f']
    try:
        subprocess.run(cmd)
    except OSError as exc:
        # schtasks could not be launched at all (e.g. not running on Windows).
        print(f"Error deleting task: {exc}")

def list_windows_tasks():
    """Print the output of ``schtasks /query`` in LIST format."""
    completed = subprocess.run(
        'schtasks /query /fo LIST',
        shell=True,
        capture_output=True,
        text=True,
    )
    print(completed.stdout)

# Usage
# create_windows_task("DailyReport", "C:/scripts/daily_report.py", "09:00")

5. Linux Cron Integration

# View current crontab
# crontab -l

# Edit crontab
# crontab -e

# Cron expression format
# minute hour day_of_month month day_of_week command

# Examples:
# 0 9 * * * /usr/bin/python3 /path/to/script.py   # Daily at 9 AM
# */15 * * * * /usr/bin/python3 /path/to/script.py  # Every 15 minutes
# 0 0 1 * * /usr/bin/python3 /path/to/backup.py  # Monthly on 1st

# Python script to manage crontab
from crontab import CronTab  # pip install python-crontab

def add_cron_job(command, comment, schedule):
    """Add a job to the current user's crontab unless one with the same comment exists.

    Args:
        command: Shell command the cron job runs.
        comment: Comment attached to the entry; used here as its identifier.
        schedule: Cron schedule expression, e.g. ``'0 9 * * *'``.
    """
    cron = CronTab(user=True)

    # The comment acts as a unique key: skip the insert if it is already used.
    already_present = any(entry.comment == comment for entry in cron)
    if already_present:
        print(f"Job '{comment}' already exists")
        return

    entry = cron.new(command=command, comment=comment)
    entry.setall(schedule)

    # Nothing is persisted until write() is called.
    cron.write()
    print(f"Job '{comment}' added: {schedule}")

def remove_cron_job(comment):
    """Remove every job carrying ``comment`` from the current user's crontab.

    The confirmation message is printed whether or not a matching job existed.
    """
    user_cron = CronTab(user=True)
    user_cron.remove_all(comment=comment)
    # Persist the modified crontab.
    user_cron.write()
    print(f"Job '{comment}' removed")

def list_cron_jobs():
    """Print each job in the current user's crontab as ``comment: entry``."""
    user_cron = CronTab(user=True)
    for entry in user_cron:
        label = entry.comment
        print(f"{label}: {entry}")

# Usage
# add_cron_job(
#     '/usr/bin/python3 /home/user/scripts/report.py',
#     'daily_report',
#     '0 9 * * *'
# )

6. Logging for Automation

import logging
from logging.handlers import RotatingFileHandler, TimedRotatingFileHandler
from datetime import datetime
import os

def setup_logging(log_dir='logs', log_name='automation'):
    """Create (or fetch) a logger that writes to the console and rotating files.

    Three handlers are attached to the logger named ``log_name``:

    * console: INFO and above, short time-only format
    * ``<log_dir>/<log_name>.log``: DEBUG and above, 10 MB x 5 backups
    * ``<log_dir>/<log_name>_error.log``: ERROR and above, 10 MB x 5 backups

    Args:
        log_dir: Directory for the log files; created if missing.
        log_name: Logger name, also used as the log file base name.

    Returns:
        The configured ``logging.Logger``. Calling the function again with the
        same ``log_name`` returns the same logger without adding handlers.
    """
    # Create log directory
    os.makedirs(log_dir, exist_ok=True)

    # Create logger
    logger = logging.getLogger(log_name)
    logger.setLevel(logging.DEBUG)

    # logging.getLogger() returns the same object for the same name, so a
    # repeated call would attach a second set of handlers and every record
    # would be written twice. Reuse the existing configuration instead.
    if logger.handlers:
        return logger

    # Console handler (INFO level)
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.INFO)
    console_format = logging.Formatter(
        '%(asctime)s - %(levelname)s - %(message)s',
        datefmt='%H:%M:%S'
    )
    console_handler.setFormatter(console_format)

    # File handler with rotation (DEBUG level)
    file_handler = RotatingFileHandler(
        os.path.join(log_dir, f'{log_name}.log'),
        maxBytes=10*1024*1024,  # 10MB
        backupCount=5
    )
    file_handler.setLevel(logging.DEBUG)
    file_format = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(funcName)s:%(lineno)d - %(message)s'
    )
    file_handler.setFormatter(file_format)

    # Error file handler: same format, but only ERROR and above
    error_handler = RotatingFileHandler(
        os.path.join(log_dir, f'{log_name}_error.log'),
        maxBytes=10*1024*1024,
        backupCount=5
    )
    error_handler.setLevel(logging.ERROR)
    error_handler.setFormatter(file_format)

    # Add handlers
    logger.addHandler(console_handler)
    logger.addHandler(file_handler)
    logger.addHandler(error_handler)

    return logger

# Usage
# Runs at import time with the defaults: creates ./logs if it does not exist
# and returns the logger named 'automation'.
logger = setup_logging()

def automated_task():
    """Run ``process_data()`` with start, progress and result logging.

    Raises:
        Exception: whatever ``process_data()`` raises is logged with its
            traceback and then re-raised to the caller.
    """
    logger.info("Task started")
    try:
        # Your task code here
        logger.debug("Processing data...")
        outcome = process_data()
        logger.info(f"Task completed: {outcome}")
    except Exception as exc:
        # logger.exception() logs at ERROR level and appends the traceback.
        logger.exception(f"Task failed: {exc}")
        raise

def process_data():
    """Placeholder for the real work; always returns ``"Success"``."""
    status = "Success"
    return status

7. Configuration Management

# Using .env files
# pip install python-dotenv

from dotenv import load_dotenv
import os

# .env file content:
# API_KEY=your_api_key
# DATABASE_URL=postgresql://user:pass@localhost/db
# DEBUG=true

# Load variables from a .env file into the process environment.
load_dotenv()

# os.getenv() returns None when a variable is not set.
API_KEY = os.getenv('API_KEY')
DATABASE_URL = os.getenv('DATABASE_URL')
# Environment values are strings: only the text "true" (any letter case)
# enables DEBUG; a missing variable defaults to "false".
DEBUG = os.getenv('DEBUG', 'false').lower() == 'true'

# Using configparser for INI files
import configparser

def load_config(config_file='config.ini'):
    """Parse ``config_file`` as INI and return the ``ConfigParser``.

    ``ConfigParser.read()`` silently skips files it cannot open, so a missing
    file yields a parser with no sections rather than an error.
    """
    parser = configparser.ConfigParser()
    parser.read(config_file)
    return parser

# config.ini content:
# [database]
# host = localhost
# port = 5432
# name = mydb
#
# [api]
# key = your_api_key
# timeout = 30

# Reads config.ini from the current working directory.
config = load_config()
# get() returns the raw string value; getint() converts it to int.
# Both raise if the section or option is missing.
db_host = config.get('database', 'host')
api_timeout = config.getint('api', 'timeout')

# Using YAML for complex configuration
# pip install pyyaml

import yaml

def load_yaml_config(config_file='config.yaml'):
    """Load configuration from a YAML file.

    Args:
        config_file: Path of the YAML file to read.

    Returns:
        The parsed document as returned by ``yaml.safe_load`` (``None`` for an
        empty file).

    Raises:
        OSError: if the file cannot be opened.
    """
    # Explicit UTF-8: without it open() uses the platform's default encoding,
    # which can mis-decode non-ASCII values on some systems (e.g. Windows).
    with open(config_file, 'r', encoding='utf-8') as f:
        # safe_load builds only plain Python objects (dict, list, str, ...).
        return yaml.safe_load(f)

# config.yaml content:
# database:
#   host: localhost
#   port: 5432
# tasks:
#   - name: daily_report
#     schedule: "0 9 * * *"
#   - name: backup
#     schedule: "0 0 * * *"

# Reads config.yaml from the current working directory.
# NOTE: this rebinds `config`, replacing the ConfigParser loaded earlier.
config = load_yaml_config()
# With the sample file above, `tasks` is a list of {name, schedule} mappings.
tasks = config['tasks']

8. Complete Automation System Example

"""
Complete Automation System
- Daily report generation
- API data collection
- Email notifications
- Error handling and logging
"""

import schedule
import time
import logging
from datetime import datetime
import os
from functools import wraps

# Configuration
class Config:
    # Central settings for the automation system, read as class attributes.
    LOG_DIR = 'logs'  # directory that receives automation.log
    REPORT_DIR = 'reports'  # directory that receives the daily report files
    SCHEDULE_REPORT_TIME = "09:00"  # daily report time, passed to schedule's at()
    SCHEDULE_BACKUP_TIME = "00:00"  # daily backup time, passed to schedule's at()

# Set up logging
def setup_logger():
    """Configure root logging (file and console) and return this module's logger.

    Creates ``Config.LOG_DIR`` if needed, then sends INFO-and-above records to
    ``automation.log`` in that directory and to the console.
    """
    os.makedirs(Config.LOG_DIR, exist_ok=True)

    log_handlers = [
        logging.FileHandler(f'{Config.LOG_DIR}/automation.log'),
        logging.StreamHandler(),
    ]
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=log_handlers,
    )
    return logging.getLogger(__name__)

# Module-level logger shared by every job below; configured once at import time.
logger = setup_logger()

# Decorator for job error handling
def job_wrapper(job_func):
    """Decorate a job with start/finish logging, timing and error handling.

    The wrapped job logs when it starts and how long it took. If the job
    raises an ``Exception``, the error is logged with its traceback, an error
    notification is attempted, and ``None`` is returned so that the scheduler
    loop keeps running.

    Args:
        job_func: The callable to wrap.

    Returns:
        A wrapper with the same signature that returns ``job_func``'s result,
        or ``None`` when the job failed.
    """
    @wraps(job_func)
    def wrapper(*args, **kwargs):
        # functools.partial objects have no __name__; fall back to repr().
        job_name = getattr(job_func, '__name__', repr(job_func))
        logger.info(f"Starting job: {job_name}")
        # perf_counter() is monotonic; time.time() follows the wall clock and
        # can jump (NTP, DST, manual changes), giving wrong durations.
        start_time = time.perf_counter()

        try:
            result = job_func(*args, **kwargs)
        except Exception as e:
            logger.error(f"Job {job_name} failed: {e}", exc_info=True)
            try:
                send_error_notification(job_name, str(e))
            except Exception:
                # A broken notifier must not turn a handled job failure into
                # an exception that escapes the wrapper.
                logger.exception(f"Error notification for {job_name} failed")
            return None

        elapsed = time.perf_counter() - start_time
        logger.info(f"Job {job_name} completed in {elapsed:.2f}s")
        return result
    return wrapper

def send_error_notification(job_name, error_message):
    """Notification hook called when a job fails.

    Currently a stub: it only logs a warning naming the job, and
    ``error_message`` is not used yet.
    """
    # Implement your notification logic here
    notice = f"Error notification sent for {job_name}"
    logger.warning(notice)

@job_wrapper
def generate_daily_report():
    """Write today's report file into ``Config.REPORT_DIR`` and return its path.

    The file is named ``report_<YYYY-MM-DD>.txt`` and is overwritten if it
    already exists.
    """
    os.makedirs(Config.REPORT_DIR, exist_ok=True)

    # Generate report
    today = datetime.now().strftime('%Y-%m-%d')
    report_path = f"{Config.REPORT_DIR}/report_{today}.txt"
    separator = "=" * 50 + "\n"

    with open(report_path, 'w') as out:
        out.write(f"Daily Report - {today}\n")
        out.write(separator)
        out.write(f"Generated at: {datetime.now()}\n")
        # Add your report content here

    logger.info(f"Report generated: {report_path}")
    return report_path

@job_wrapper
def collect_api_data():
    """Placeholder job for API collection; logs completion and returns True."""
    # Implement your API collection logic
    succeeded = True
    logger.info("API data collected")
    return succeeded

@job_wrapper
def perform_backup():
    """Placeholder job for the system backup; logs completion and returns True."""
    # Implement your backup logic
    succeeded = True
    logger.info("Backup completed")
    return succeeded

def setup_schedule():
    """Register the three recurring jobs and log the resulting schedule."""
    report_at = Config.SCHEDULE_REPORT_TIME
    backup_at = Config.SCHEDULE_BACKUP_TIME

    # Each schedule.every() call creates its own job entry.
    schedule.every().day.at(report_at).do(generate_daily_report)
    schedule.every().day.at(backup_at).do(perform_backup)
    schedule.every(30).minutes.do(collect_api_data)

    summary = (
        "Schedule configured:",
        f"  - Daily report: {report_at}",
        f"  - Backup: {backup_at}",
        "  - API collection: every 30 minutes",
    )
    for line in summary:
        logger.info(line)

def run():
    """Main entry point: register the jobs and poll the scheduler until Ctrl+C.

    Due jobs are checked once a minute. An ``Exception`` escaping
    ``schedule.run_pending()`` is logged with its traceback and polling
    continues. ``KeyboardInterrupt`` ends the loop and logs a clean shutdown.
    """
    logger.info("Automation system starting...")
    setup_schedule()

    # The KeyboardInterrupt handler wraps the whole loop, including the sleep,
    # so Ctrl+C always ends in the shutdown messages below, whatever the loop
    # was doing at that moment.
    try:
        while True:
            try:
                schedule.run_pending()
            except Exception as e:
                logger.error(f"Scheduler error: {e}", exc_info=True)
            # One poll per minute, whether or not the last poll failed.
            time.sleep(60)
    except KeyboardInterrupt:
        logger.info("Shutdown requested...")

    logger.info("Automation system stopped")

# Start the automation loop only when the file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    run()

Conclusion

Congratulations! You've completed the Python Automation Master series. In this final part, we covered the essential components for building production-ready automation systems:

  • Time handling: Working with time and datetime modules
  • schedule library: Simple and intuitive Python-based scheduling
  • APScheduler: Advanced scheduling with persistence and event handling
  • OS integration: Windows Task Scheduler and Linux cron
  • Logging: Comprehensive logging for monitoring and debugging
  • Configuration management: Using .env, INI, and YAML files
  • Error handling: Building resilient automation systems

Throughout this series, you've learned all the essential skills for Python automation:

  1. Part 1-4: Python basics and file system operations
  2. Part 5: Excel and CSV automation
  3. Part 6: Email and notification systems
  4. Part 7: API integration and data collection
  5. Part 8: Scheduling and production deployment

Now you have all the tools to build powerful automation solutions that can save hours of manual work every day. Start with small projects, iterate, and gradually build more complex systems. Happy automating!