文件内容
scripts/diff-analysis.py
#!/usr/bin/env python3
"""
diff-analysis.py — OpenEuler 基线 vs CIS Benchmark 差异分析
从基线目录读取收集到的系统配置,与 CIS Benchmark 映射表中的
期望值进行比较,输出合规审计报告。
用法:
python3 diff-analysis.py <基线目录> [输出报告路径]
python3 diff-analysis.py baseline-20250101-120000
python3 diff-analysis.py baseline-20250101-120000 report.md
依赖:
Python 3.6+
yaml (pip install pyyaml) — 可选,用于读取 YAML 配置
"""
import os
import sys
import re
import csv
import json
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Tuple, Optional
# ============================================================
# CIS Benchmark mapping (built-in default).
# To customize, copy this structure into the
# references/cis-mapping.json file that main() tries to load.
#
# Each entry is a dict with these keys:
#   id / title / level / section / expected  - copied into the report
#   check_path    - baseline file (relative to the baseline dir) to inspect
#   check_pattern - regex searched in that file (case-insensitive)
#   invert        - optional; True means the check passes when the
#                   pattern is NOT found
#   custom_check  - optional; name of a checker function that is used
#                   instead of check_pattern
# ============================================================
CIS_MAPPING = [
    # ---- 1.1 Filesystem ----
    {
        "id": "1.1.1.1",
        "title": "禁用 cramfs 文件系统",
        "level": "L1",
        "section": "文件系统",
        "expected": "cramfs: 0",
        "check_type": "file_contains",
        "check_path": "disabled-modules.txt",
        "check_pattern": r"^cramfs:\s*0$",
        "description": "lsmod 不应显示 cramfs 已加载",
    },
    {
        "id": "1.1.1.2",
        "title": "禁用 freevxfs 文件系统",
        "level": "L1",
        "section": "文件系统",
        "expected": "freevxfs: 0",
        "check_type": "file_contains",
        "check_path": "disabled-modules.txt",
        "check_pattern": r"^freevxfs:\s*0$",
    },
    {
        "id": "1.1.1.6",
        "title": "禁用 squashfs 文件系统",
        "level": "L1",
        "section": "文件系统",
        "expected": "squashfs: 0",
        "check_type": "file_contains",
        "check_path": "disabled-modules.txt",
        "check_pattern": r"^squashfs:\s*0$",
    },
    {
        "id": "1.1.2",
        "title": "独立 /tmp 分区",
        "level": "L1",
        "section": "文件系统",
        "expected": "/tmp 应有单独分区",
        "check_type": "file_contains",
        "check_path": "mounts.txt",
        "check_pattern": r"\s/tmp\s",
    },
    {
        "id": "1.1.3",
        "title": "/tmp 启用 nodev",
        "level": "L1",
        "section": "文件系统",
        "expected": "/tmp 挂载选项含 nodev",
        "check_type": "file_contains",
        "check_path": "mounts.txt",
        "check_pattern": r"\s/tmp\s.*nodev",
    },
    {
        "id": "1.1.6",
        "title": "独立 /var 分区",
        "level": "L1",
        "section": "文件系统",
        "expected": "/var 应有单独分区",
        "check_type": "file_contains",
        "check_path": "mounts.txt",
        "check_pattern": r"\s/var\s",
    },
    {
        "id": "1.1.8",
        "title": "独立 /var/log 分区",
        "level": "L1",
        "section": "文件系统",
        "expected": "/var/log 应有单独分区",
        "check_type": "file_contains",
        "check_path": "mounts.txt",
        "check_pattern": r"\s/var/log\s",
    },
    {
        "id": "1.1.10",
        "title": "独立 /home 分区",
        "level": "L1",
        "section": "文件系统",
        "expected": "/home 应有单独分区",
        "check_type": "file_contains",
        "check_path": "mounts.txt",
        "check_pattern": r"\s/home\s",
    },
    # ---- 1.5 Kernel hardening ----
    {
        "id": "1.5.3",
        "title": "ASLR 启用 (kernel.randomize_va_space)",
        "level": "L1",
        "section": "内核参数",
        "expected": "kernel.randomize_va_space = 2",
        "check_type": "file_contains",
        "check_path": "sysctl-params.txt",
        "check_pattern": r"kernel\.randomize_va_space\s*=\s*2",
    },
    {
        "id": "3.1.1",
        "title": "IP 转发禁用",
        "level": "L1",
        "section": "内核参数",
        "expected": "net.ipv4.ip_forward = 0",
        "check_type": "file_contains",
        "check_path": "sysctl-params.txt",
        "check_pattern": r"net\.ipv4\.ip_forward\s*=\s*0",
    },
    {
        "id": "3.1.6",
        "title": "TCP SYN cookies 启用",
        "level": "L1",
        "section": "内核参数",
        "expected": "net.ipv4.tcp_syncookies = 1",
        "check_type": "file_contains",
        "check_path": "sysctl-params.txt",
        "check_pattern": r"net\.ipv4\.tcp_syncookies\s*=\s*1",
    },
    {
        "id": "3.1.3",
        "title": "ICMP redirect 不接受",
        "level": "L1",
        "section": "内核参数",
        "expected": "net.ipv4.conf.all.accept_redirects = 0",
        "check_type": "file_contains",
        "check_path": "sysctl-params.txt",
        "check_pattern": r"net\.ipv4\.conf\.all\.accept_redirects\s*=\s*0",
    },
    {
        "id": "3.1.7",
        "title": "日志伪造包 (log_martians)",
        "level": "L2",
        "section": "内核参数",
        "expected": "net.ipv4.conf.all.log_martians = 1",
        "check_type": "file_contains",
        "check_path": "sysctl-params.txt",
        "check_pattern": r"net\.ipv4\.conf\.all\.log_martians\s*=\s*1",
    },
    # ---- 1.6 SELinux ----
    {
        "id": "1.6.1.2",
        "title": "SELinux 未禁用",
        "level": "L1",
        "section": "SELinux",
        "expected": "SELINUX 不应为 disabled",
        "check_type": "file_contains",
        "check_path": "selinux.txt",
        "check_pattern": r"SELINUX=disabled",
        # Inverted: finding the pattern means the check fails.
        "invert": True,
    },
    # ---- 2.2 SSH ----
    {
        "id": "2.2.1",
        "title": "SSH Protocol 2",
        "level": "L1",
        "section": "SSH",
        "expected": "protocol 2",
        "check_type": "file_contains",
        "check_path": "sshd-config.txt",
        "check_pattern": r"protocol\s+2",
    },
    {
        "id": "2.2.3",
        "title": "X11Forwarding disabled",
        "level": "L1",
        "section": "SSH",
        "expected": "X11Forwarding no",
        "check_type": "file_contains",
        "check_path": "sshd-config.txt",
        "check_pattern": r"x11forwarding\s+no",
    },
    {
        "id": "2.2.4",
        "title": "MaxAuthTries ≤ 4",
        "level": "L1",
        "section": "SSH",
        "expected": "MaxAuthTries ≤ 4",
        "check_type": "file_contains",
        "check_path": "sshd-config.txt",
        "check_pattern": r"maxauthtries\s+([0-4]$)",
    },
    {
        "id": "2.2.7",
        "title": "PermitRootLogin no",
        "level": "L1",
        "section": "SSH",
        "expected": "PermitRootLogin no",
        "check_type": "file_contains",
        "check_path": "sshd-config.txt",
        "check_pattern": r"permitrootlogin\s+no",
    },
    {
        "id": "2.2.8",
        "title": "PermitEmptyPasswords no",
        "level": "L1",
        "section": "SSH",
        "expected": "PermitEmptyPasswords no",
        "check_type": "file_contains",
        "check_path": "sshd-config.txt",
        "check_pattern": r"permitemptypasswords\s+no",
    },
    {
        "id": "2.2.12",
        "title": "ClientAliveInterval ≤ 300",
        "level": "L1",
        "section": "SSH",
        "expected": "ClientAliveInterval ≤ 300",
        "check_type": "file_contains",
        "check_path": "sshd-config.txt",
        # The keyword here used to be "optionalclientalivecountmax", which
        # is not the option this item audits. run_checks evaluates
        # custom_check instead of this pattern, but it is kept accurate so
        # the entry documents what is being looked for.
        "check_pattern": r"clientaliveinterval\s+(\d+)",
        "custom_check": "check_client_alive_interval",
    },
    # ---- 3.3 Network protocols ----
    {
        "id": "3.3.1",
        "title": "禁用 DCCP",
        "level": "L2",
        "section": "网络协议",
        "expected": "DCCP 未加载",
        "check_type": "file_contains",
        "check_path": "disabled-modules.txt",
        "check_pattern": r"^dccp:\s*0$",
    },
    {
        "id": "3.3.2",
        "title": "禁用 SCTP",
        "level": "L2",
        "section": "网络协议",
        "expected": "SCTP 未加载",
        "check_type": "file_contains",
        "check_path": "disabled-modules.txt",
        "check_pattern": r"^sctp:\s*0$",
    },
    # ---- 4.1 auditd ----
    {
        "id": "4.1.1.1",
        "title": "auditd 已安装",
        "level": "L2",
        "section": "Audit",
        "expected": "audit 包已安装",
        "check_type": "file_contains",
        "check_path": "packages.txt",
        "check_pattern": r"^audit-",
    },
    {
        "id": "4.1.1.2",
        "title": "auditd 已启用并运行",
        "level": "L2",
        "section": "Audit",
        "expected": "auditd active + enabled",
        "check_type": "file_contains",
        "check_path": "services.txt",
        # Require enabled=enabled explicitly; the previous "enabled=.*"
        # also accepted "enabled=disabled".
        # NOTE(review): assumes services.txt records the systemctl
        # is-enabled / is-active words verbatim - confirm with the collector.
        "check_pattern": r"^auditd:.*enabled=enabled\b.*active=active\b",
    },
    # ---- 5.1 Password policy ----
    {
        "id": "5.1.1",
        "title": "密码过期天数 ≤ 365",
        "level": "L1",
        "section": "密码策略",
        "expected": "PASS_MAX_DAYS ≤ 365",
        "check_type": "file_contains",
        "check_path": "password-policy.txt",
        "check_pattern": r"PASS_MAX_DAYS\s+(\d+)",
        "custom_check": "check_pass_max_days",
    },
    {
        "id": "5.1.2",
        "title": "密码最少使用天数 ≥ 7",
        "level": "L1",
        "section": "密码策略",
        "expected": "PASS_MIN_DAYS ≥ 7",
        "check_type": "file_contains",
        "check_path": "password-policy.txt",
        "check_pattern": r"PASS_MIN_DAYS\s+(\d+)",
        "custom_check": "check_pass_min_days",
    },
    {
        "id": "5.2.1",
        "title": "无空密码用户",
        "level": "L1",
        "section": "用户",
        "expected": "无空密码用户",
        "check_type": "file_contains",
        "check_path": "password-policy.txt",
        "check_pattern": r"空密码用户检查.*none",
    },
    {
        "id": "5.2.2",
        "title": "Root 唯一 UID 0",
        "level": "L1",
        "section": "用户",
        "expected": "仅 root 有 UID 0",
        "check_type": "file_contains",
        "check_path": "password-policy.txt",
        "check_pattern": r"UID 0.*root$",
    },
    {
        "id": "5.3.1",
        "title": "sudo 已安装",
        "level": "L1",
        "section": "Sudo",
        "expected": "sudo 包已安装",
        "check_type": "file_contains",
        "check_path": "packages.txt",
        "check_pattern": r"^sudo-",
    },
    {
        "id": "5.3.4",
        "title": "sudo 超时 ≤ 15 min",
        "level": "L1",
        "section": "Sudo",
        "expected": "timestamp_timeout ≤ 15",
        "check_type": "file_contains",
        "check_path": "sudo.txt",
        "check_pattern": r"timestamp_timeout\s*[:=]\s*(\d+)",
        "custom_check": "check_sudo_timeout",
    },
]
def load_custom_mapping(mapping_path: str) -> List[Dict]:
    """Load a custom check mapping from a JSON file, if one exists.

    Args:
        mapping_path: Location of the JSON mapping file (a ``str`` or a
            ``Path``; it is wrapped in ``Path`` either way).

    Returns:
        The parsed JSON content when ``mapping_path`` is an existing file,
        otherwise the built-in ``CIS_MAPPING``.

    Raises:
        json.JSONDecodeError: If the file exists but is not valid JSON.
    """
    path = Path(mapping_path)
    # is_file() rather than exists(): a directory of that name must fall back
    # to the defaults instead of failing inside open().
    if path.is_file():
        # The mapping carries non-ASCII titles, so decode explicitly as UTF-8
        # instead of relying on the platform's default encoding.
        with open(path, encoding="utf-8") as f:
            return json.load(f)
    return CIS_MAPPING
def load_baseline_file(baseline_dir: Path, filename: str) -> str:
    """Return the text of one collected baseline file.

    Args:
        baseline_dir: Directory holding the collected baseline files.
        filename: File name relative to ``baseline_dir``.

    Returns:
        The file content, or ``""`` when the path is not a regular file.
        That includes an empty ``filename``, which resolves to the directory
        itself and would otherwise make ``read_text`` raise.
    """
    filepath = baseline_dir / filename
    if filepath.is_file():
        # Baseline files contain non-ASCII text; decode as UTF-8 regardless of
        # the locale, and replace undecodable bytes rather than abort the audit.
        return filepath.read_text(encoding="utf-8", errors="replace")
    return ""
def check_client_alive_interval(content: str) -> Tuple[bool, str]:
    """Check that sshd's ClientAliveInterval is set and within 1..300.

    Args:
        content: Text of the collected sshd configuration.

    Returns:
        ``(passed, detail)`` where ``detail`` is the human-readable finding
        shown in the report.
    """
    # Match the ClientAliveInterval keyword itself (the previous regex looked
    # for a different word, so the setting was never found). "[^#\n]*" keeps
    # the match on one line and skips lines where a "#" precedes the keyword,
    # so commented-out defaults are not audited.
    match = re.search(
        r"^[^#\n]*\bclientaliveinterval\s+(\d+)",
        content,
        re.IGNORECASE | re.MULTILINE,
    )
    if not match:
        return False, "未配置 ClientAliveInterval"
    val = int(match.group(1))
    # 0 disables the keep-alive messages entirely, so it cannot count as
    # "at most 300 seconds".
    ok = 0 < val <= 300
    return ok, f"ClientAliveInterval={val} {'√' if ok else '✗'}"
def check_pass_max_days(content: str) -> Tuple[bool, str]:
    """Check that PASS_MAX_DAYS is configured and does not exceed 365.

    Args:
        content: Text of the collected password-policy file.

    Returns:
        ``(passed, detail)``; ``detail`` shows the value found, or says that
        the setting is missing.
    """
    found = re.search(r"PASS_MAX_DAYS\s+(\d+)", content)
    if found is None:
        return False, "PASS_MAX_DAYS 未配置"
    days = int(found.group(1))
    compliant = days <= 365
    mark = "√" if compliant else "✗"
    return compliant, f"PASS_MAX_DAYS={days} {mark}"
def check_pass_min_days(content: str) -> Tuple[bool, str]:
    """Check that PASS_MIN_DAYS is configured and is at least 7.

    Args:
        content: Text of the collected password-policy file.

    Returns:
        ``(passed, detail)``; ``detail`` shows the value found, or says that
        the setting is missing.
    """
    hit = re.search(r"PASS_MIN_DAYS\s+(\d+)", content)
    if hit is None:
        return False, "PASS_MIN_DAYS 未配置"
    min_days = int(hit.group(1))
    if min_days >= 7:
        return True, f"PASS_MIN_DAYS={min_days} √"
    return False, f"PASS_MIN_DAYS={min_days} ✗"
def check_sudo_timeout(content: str) -> Tuple[bool, str]:
    """Check that sudo's timestamp_timeout is configured within 0..15 minutes.

    Args:
        content: Text of the collected sudo configuration.

    Returns:
        ``(passed, detail)``; ``detail`` shows the value found, or says that
        the setting is missing.
    """
    # Accept an optional minus sign: a negative timeout means the cached
    # credential never expires. It was previously not matched at all and was
    # reported as "not configured" instead of as an explicit violation.
    match = re.search(r"timestamp_timeout\s*[:=]\s*(-?\d+)", content, re.IGNORECASE)
    if not match:
        return False, "timestamp_timeout 未配置(默认可能为 15)"
    val = int(match.group(1))
    ok = 0 <= val <= 15
    return ok, f"timestamp_timeout={val} {'√' if ok else '✗'}"
def _detect_os_version(baseline_dir: Path) -> str:
    """Extract the openEuler version from the baseline's system-info.txt.

    Args:
        baseline_dir: Directory holding the collected baseline files.

    Returns:
        The version string, or ``"unknown"`` when neither an
        ``openEuler <version>`` phrase nor a ``VERSION_ID=`` line is found.
    """
    content = load_baseline_file(baseline_dir, "system-info.txt")
    # Skip an optional "release" word so "openEuler release 22.03" yields the
    # version rather than the word "release". [ \t]+ keeps the match on one
    # line, and the capture stops at whitespace or a closing quote.
    match = re.search(r"openEuler[ \t]+(?:release[ \t]+)?([^\s\"]+)", content)
    if match:
        return match.group(1)
    # Exclude whitespace from the value so an unquoted VERSION_ID cannot run
    # on into the following lines.
    match = re.search(r"VERSION_ID=\"?([^\"\s]+)\"?", content)
    if match:
        return match.group(1)
    return "unknown"
def _make_result(item: Dict, actual: str, passed: bool,
                 status: Optional[str] = None) -> Dict:
    """Build one report row for a mapping item; status defaults to PASS/FAIL."""
    if status is None:
        status = "PASS" if passed else "FAIL"
    return {
        "id": item["id"],
        "title": item["title"],
        "section": item.get("section", "通用"),
        "level": item.get("level", "L1"),
        "expected": item["expected"],
        "actual": actual,
        "passed": passed,
        "status": status,
    }


def _resolve_custom_check(name: str):
    """Return the checker function registered under ``name``, or None."""
    checkers = {
        "check_client_alive_interval": check_client_alive_interval,
        "check_pass_max_days": check_pass_max_days,
        "check_pass_min_days": check_pass_min_days,
        "check_sudo_timeout": check_sudo_timeout,
    }
    return checkers.get(name)


def run_checks(baseline_dir: Path, mapping: List[Dict]) -> List[Dict]:
    """Run every check in ``mapping`` against the baseline directory.

    Args:
        baseline_dir: Directory holding the collected baseline files.
        mapping: Check definitions. Each needs ``id``, ``title`` and
            ``expected``; ``section``, ``level``, ``check_path``,
            ``check_pattern``, ``invert`` and ``custom_check`` are optional.

    Returns:
        One dict per mapping item with the keys ``id``, ``title``,
        ``section``, ``level``, ``expected``, ``actual``, ``passed`` and
        ``status`` (``"PASS"``, ``"FAIL"`` or ``"MANUAL"``).

    Raises:
        KeyError: If an item lacks ``id``, ``title`` or ``expected``.
    """
    results = []
    for item in mapping:
        check_path = item.get("check_path", "")
        custom_check = item.get("custom_check")

        # Only read a file when one is named: an empty check_path would
        # otherwise resolve to the baseline directory itself.
        content = load_baseline_file(baseline_dir, check_path) if check_path else ""

        # A custom checker takes precedence over the regex check.
        if custom_check:
            checker = _resolve_custom_check(custom_check)
            if checker is None:
                results.append(_make_result(item, "未知自定义检查", False, "MANUAL"))
            else:
                passed, detail = checker(content)
                results.append(_make_result(item, detail, passed))
            continue

        pattern = item.get("check_pattern", "")
        # Without a file or a pattern there is nothing to evaluate; an empty
        # pattern would match everything and report a false PASS.
        if not check_path or not pattern:
            results.append(_make_result(item, "未知检查类型", False, "MANUAL"))
            continue

        invert = item.get("invert", False)
        if invert and not content.strip():
            # "Pattern absent" proves nothing when the baseline file is
            # missing or empty, so do not report a pass.
            results.append(
                _make_result(item, "基线文件缺失或为空,无法判定", False, "MANUAL")
            )
            continue

        # MULTILINE lets the mapping's ^ and $ anchors match individual lines
        # of the baseline file rather than only the start/end of the file.
        match = re.search(pattern, content, re.IGNORECASE | re.MULTILINE)
        found = match is not None
        if invert:
            passed = not found
            detail = "匹配到禁用配置" if found else "未发现禁用配置"
        else:
            passed = found
            detail = match.group(0) if found else "未检测到期望值"
        results.append(_make_result(item, detail, passed))
    return results
def _md_inline(value: object, *, escape_pipes: bool = False) -> str:
    """Flatten ``value`` to one line; optionally escape ``|`` for table cells."""
    text = " ".join(str(value).splitlines())
    return text.replace("|", "\\|") if escape_pipes else text


def generate_report(baseline_dir: Path, results: List[Dict], os_version: str) -> str:
    """Render the check results as a Markdown compliance report.

    Args:
        baseline_dir: Baseline directory, shown in the report header.
        results: Rows as produced by ``run_checks``.
        os_version: Version string shown in the report header.

    Returns:
        The report text: header, summary table, per-section result tables,
        then lists of failed and manual-review items when there are any.
    """
    now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    total = len(results)
    passed = sum(1 for r in results if r["status"] == "PASS")
    failed = sum(1 for r in results if r["status"] == "FAIL")
    manual = sum(1 for r in results if r["status"] == "MANUAL")
    # An empty result list (e.g. an empty custom mapping) must not divide by zero.
    pass_rate = passed / total * 100 if total else 0.0

    lines = [
        "# CIS Benchmark 合规审计报告",
        "",
        f"**目标系统:** OpenEuler {os_version}",
        f"**基线目录:** {baseline_dir}",
        f"**审计时间:** {now}",
        "",
        "## 摘要",
        "",
        "| 指标 | 数值 |",
        "|------|------|",
        f"| 总计 | {total} |",
        f"| ✅ 通过 | {passed} |",
        f"| ❌ 未通过 | {failed} |",
        f"| ⚠️ 需人工检查 | {manual} |",
        f"| 通过率 | {pass_rate:.1f}% |",
        "",
    ]

    # Group rows by section, keeping first-seen order.
    sections = {}
    for r in results:
        sections.setdefault(r["section"], []).append(r)

    status_icons = {"PASS": "✅", "FAIL": "❌", "MANUAL": "⚠️"}
    lines.append("## 详细检查结果")
    lines.append("")
    for section_name, items in sections.items():
        lines.append(f"### {section_name}")
        lines.append("")
        lines.append("| ID | 标题 | 等级 | 期望 | 实际 | 状态 |")
        lines.append("|----|------|------|------|------|------|")
        for r in items:
            status_icon = status_icons.get(r["status"], "❓")
            # Matched text can contain newlines or "|", either of which
            # would break the table row, so flatten and escape each cell.
            title = _md_inline(r["title"], escape_pipes=True)
            expected = _md_inline(r["expected"], escape_pipes=True)
            actual = _md_inline(r["actual"], escape_pipes=True)
            lines.append(
                f"| {r['id']} | {title} | {r['level']} | {expected} | {actual} "
                f"| {status_icon} {r['status']} |"
            )
        lines.append("")

    # Failed items, repeated as a flat to-do list.
    failed_items = [r for r in results if r["status"] == "FAIL"]
    if failed_items:
        lines.append("## ❌ 需修复项目")
        lines.append("")
        for r in failed_items:
            lines.append(
                f"- **{r['id']}** {r['title']}: 期望 `{_md_inline(r['expected'])}`,"
                f"实际 `{_md_inline(r['actual'])}`"
            )
        lines.append("")

    # Items that could not be decided automatically.
    manual_items = [r for r in results if r["status"] == "MANUAL"]
    if manual_items:
        lines.append("## ⚠️ 需人工检查的项目")
        lines.append("")
        for r in manual_items:
            lines.append(f"- **{r['id']}** {r['title']}: {_md_inline(r['actual'])}")
        lines.append("")
    return "\n".join(lines)
def main():
    """Command-line entry point: audit a baseline directory and emit a report.

    Reads ``sys.argv``: the first argument is the baseline directory and the
    optional second one is the report output path. Without an output path the
    report is printed to stdout. Exits with status 1 on a usage error or when
    the baseline directory does not exist.
    """
    if len(sys.argv) < 2:
        print("用法: python3 diff-analysis.py <基线目录> [输出报告路径]", file=sys.stderr)
        sys.exit(1)
    baseline_path = Path(sys.argv[1])
    if not baseline_path.exists() or not baseline_path.is_dir():
        print(f"错误: 基线目录不存在: {baseline_path}", file=sys.stderr)
        sys.exit(1)
    output_path = sys.argv[2] if len(sys.argv) > 2 else None

    # Detect the OS version recorded in the baseline.
    os_version = _detect_os_version(baseline_path)
    print(f"[INFO] 检测到 OpenEuler 版本: {os_version}")

    # Use a custom mapping when one exists; load_custom_mapping falls back to
    # the built-in mapping otherwise.
    mapping_path = baseline_path.parent / ".." / "references" / "cis-mapping.json"
    mapping = load_custom_mapping(mapping_path)
    print(f"[INFO] 加载 {len(mapping)} 个检查项")

    # Run the checks.
    results = run_checks(baseline_path, mapping)
    print(f"[INFO] 完成检查: {len(results)} 项")

    # Build the report and either save or print it.
    report = generate_report(baseline_path, results, os_version)
    if output_path:
        output_file = Path(output_path)
        output_file.parent.mkdir(parents=True, exist_ok=True)
        # The report contains Chinese text and emoji; write it as UTF-8 so a
        # non-UTF-8 locale cannot raise UnicodeEncodeError.
        output_file.write_text(report, encoding="utf-8")
        print(f"[INFO] 报告已保存: {output_file}")
    else:
        print()
        print("=" * 60)
        print(report)

    # One-line summary on the console.
    passed = sum(1 for r in results if r["status"] == "PASS")
    failed = sum(1 for r in results if r["status"] == "FAIL")
    manual = sum(1 for r in results if r["status"] == "MANUAL")
    print(f"\n[SUMMARY] ✅ {passed} / ❌ {failed} / ⚠️ {manual} (共 {len(results)} 项)")


if __name__ == "__main__":
    main()