#!/usr/bin/env python3
"""Parse RPM security-scan (checksec) results and generate reports.

The script stores per-binary hardening flags in a SQLite database and can
print a Markdown summary and/or export the raw rows as CSV.

Usage:
    script [DB_PATH] [--parse-results DIR] [--export-csv FILE]
"""

import csv
import json
import sqlite3
import sys
from collections import defaultdict
from pathlib import Path

# Database file used when no path is given on the command line.
DEFAULT_DB = "scan_results.db"


def _fortify_enabled(value):
    """Return True when a stored FORTIFY value counts as enabled.

    Any truthy value other than the strings '0' and 'unknown' is treated as
    enabled.
    """
    return bool(value) and value != '0' and value != 'unknown'


# Rules deciding whether a stored column value counts as "enabled".
# The keys double as column names of the ``security_checks`` table.
# ``enabled`` values are compared case-insensitively; ``predicate`` is used
# for columns whose values are not a fixed vocabulary.
FEATURE_RULES = {
    'pie': {'label': 'PIE', 'enabled': ('PIE Enabled',)},
    'nx': {'label': 'NX', 'enabled': ('NX enabled',)},
    'canary': {'label': 'CANARY', 'enabled': ('Canary Found',)},
    'fortify': {'label': 'FORTIFY', 'predicate': _fortify_enabled},
    'relro': {'label': 'RELRO', 'enabled': ('Full RELRO',)},
}

# Features that must all be enabled for a binary to count as fully hardened.
FULLY_PROTECTED_FEATURES = ('pie', 'nx', 'canary', 'relro')


def _is_enabled(feature, value):
    """Return True if ``value`` means ``feature`` is enabled.

    Matching against the ``enabled`` vocabulary ignores case, so e.g.
    'PIE enabled' and 'PIE Enabled' are treated alike. Non-string values
    (such as NULL) never match a vocabulary entry.
    """
    rules = FEATURE_RULES[feature]
    if 'enabled' in rules:
        if not isinstance(value, str):
            return False
        wanted = {item.casefold() for item in rules['enabled']}
        return value.casefold() in wanted
    return bool(rules['predicate'](value))


class ScanReporter:
    """Import checksec results into SQLite and report on them.

    The instance owns one SQLite connection (``self.db``). Call ``close()``
    when done, or use the instance as a context manager.
    """

    def __init__(self, db_path):
        """Open the SQLite database at ``db_path``."""
        self.db = sqlite3.connect(db_path)
        self.db.row_factory = sqlite3.Row

    def close(self):
        """Close the underlying database connection."""
        self.db.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()
        return False

    def _find_package_id(self, stem):
        """Return the id of the successfully scanned package matching ``stem``.

        Package names may themselves contain hyphens, so the file stem is
        tried as-is first and then with trailing ``-`` separated segments
        removed one at a time (longest prefix wins). The last candidate is
        the text before the first hyphen. Returns None when nothing matches.
        """
        candidate = stem
        while True:
            row = self.db.execute(
                "SELECT id FROM packages WHERE name=? AND status='success'",
                (candidate,),
            ).fetchone()
            if row:
                return row[0]
            if '-' not in candidate:
                return None
            candidate = candidate.rsplit('-', 1)[0]

    def _import_result_file(self, json_file):
        """Import one checksec JSON file inside a single transaction.

        The JSON document is expected to map a file path to a dict of check
        results. Files whose package cannot be found are skipped silently.
        Any exception rolls back everything inserted for this file and is
        re-raised to the caller.
        """
        with open(json_file, encoding="utf-8") as f:
            data = json.load(f)

        pkg_id = self._find_package_id(json_file.stem)
        if pkg_id is None:
            return

        # ``with self.db`` commits on success and rolls back on error, so a
        # malformed entry never leaves a half-imported file behind.
        with self.db:
            for file_path, checks in data.items():
                cur = self.db.execute(
                    "INSERT INTO binaries (package_id, file_path, file_type) "
                    "VALUES (?, ?, ?)",
                    (pkg_id, file_path, checks.get('type', 'unknown')),
                )
                # lastrowid works on every SQLite version, whereas
                # INSERT ... RETURNING needs SQLite >= 3.35.
                # NOTE(review): assumes binaries.id is the INTEGER PRIMARY
                # KEY (rowid alias) - confirm against the schema.
                binary_id = cur.lastrowid

                self.db.execute(
                    """INSERT INTO security_checks
                       (binary_id, pie, nx, canary, fortify, relro, bind_now)
                       VALUES (?, ?, ?, ?, ?, ?, ?)""",
                    (
                        binary_id,
                        checks.get('pie', 'unknown'),
                        checks.get('nx', 'unknown'),
                        checks.get('canary', 'unknown'),
                        checks.get('fortify', 'unknown'),
                        checks.get('relro', 'unknown'),
                        checks.get('bind_now', 'unknown'),
                    ),
                )

    def parse_checksec_results(self, results_dir):
        """Parse every ``*.json`` checksec result in ``results_dir`` into the DB.

        Import is best-effort per file: a file that cannot be read, parsed
        or stored is reported on stderr and skipped, without affecting the
        other files.
        """
        results_dir = Path(results_dir)

        # Sorted so the import order (and therefore row ids) is deterministic.
        for json_file in sorted(results_dir.glob("*.json")):
            try:
                self._import_result_file(json_file)
            except Exception as e:
                # Deliberately broad: one bad file must not abort the batch.
                print(f"解析失败 {json_file}: {e}", file=sys.stderr)

    def _count_fully_protected(self):
        """Count security_checks rows with PIE, NX, canary and Full RELRO all on."""
        columns = ', '.join(FULLY_PROTECTED_FEATURES)
        # Column names come from a module constant, never from user input.
        cur = self.db.execute(
            f"SELECT {columns}, COUNT(*) FROM security_checks GROUP BY {columns}"
        )
        protected = 0
        for row in cur:
            *values, count = row
            if all(
                _is_enabled(feature, value)
                for feature, value in zip(FULLY_PROTECTED_FEATURES, values)
            ):
                protected += count
        return protected

    def generate_statistics(self):
        """Compute summary statistics from the database.

        Returns:
            A dict with:
              * ``total_packages``: number of packages with status 'success'
              * ``total_binaries``: number of rows in ``binaries``
              * ``coverage``: per-feature dict with ``label``, ``details``
                (stored value -> count), ``rate`` (formatted percentage),
                ``total`` and ``enabled``
              * ``fully_protected``: number of check rows where PIE, NX,
                canary and Full RELRO are all enabled
        """
        stats = {}

        cur = self.db.execute(
            "SELECT COUNT(*) FROM packages WHERE status='success'"
        )
        stats['total_packages'] = cur.fetchone()[0]

        cur = self.db.execute("SELECT COUNT(*) FROM binaries")
        stats['total_binaries'] = cur.fetchone()[0]

        stats['coverage'] = {}
        for feature, rules in FEATURE_RULES.items():
            # ``feature`` is a key of FEATURE_RULES, so the interpolation
            # below cannot carry untrusted text.
            cur = self.db.execute(f"""
                SELECT {feature}, COUNT(*) as count
                FROM security_checks
                GROUP BY {feature}
            """)

            feature_stats = {}
            total = 0
            enabled = 0
            for value, count in cur:
                feature_stats[value] = count
                total += count
                if _is_enabled(feature, value):
                    enabled += count

            coverage_rate = (
                f"{enabled/total*100:.2f}%" if total > 0 else "0.00%"
            )
            stats['coverage'][feature] = {
                'label': rules['label'],
                'details': feature_stats,
                'rate': coverage_rate,
                'total': total,
                'enabled': enabled,
            }

        stats['fully_protected'] = self._count_fully_protected()
        return stats

    def export_csv(self, output_file):
        """Write one CSV row per binary with its security check results.

        Fields are quoted by the ``csv`` module when needed, so file paths
        containing commas or quotes do not break the column layout. Lines
        end with a single newline.
        """
        cur = self.db.execute("""
            SELECT p.name, b.file_path, s.pie, s.nx, s.canary,
                   s.fortify, s.relro, s.bind_now
            FROM packages p
            JOIN binaries b ON p.id = b.package_id
            JOIN security_checks s ON b.id = s.binary_id
            ORDER BY p.name, b.file_path
        """)

        # newline='' lets the csv writer control line endings itself.
        with open(output_file, 'w', newline='', encoding='utf-8') as f:
            writer = csv.writer(f, lineterminator='\n')
            writer.writerow(
                ["Package", "File", "PIE", "NX", "Canary",
                 "Fortify", "RELRO", "BIND_NOW"]
            )
            for row in cur:
                writer.writerow([str(x) for x in row])

    def print_report(self):
        """Print the statistics as a Markdown report on stdout."""
        stats = self.generate_statistics()

        print("# RPM 安全扫描统计报告")
        print()
        print(f"**扫描包数量**: {stats['total_packages']}")
        print(f"**二进制文件数**: {stats['total_binaries']}")
        print()
        print("## 安全特性覆盖率")
        print()

        # Fixed output order: PIE, NX, CANARY, FORTIFY, RELRO.
        feature_order = ['pie', 'nx', 'canary', 'fortify', 'relro']

        for feature in feature_order:
            if feature not in stats['coverage']:
                continue

            data = stats['coverage'][feature]

            print(f"### {data['label']}")
            print()
            print(f"**覆盖率**: {data['rate']}")
            print()
            print("| 状态 | 数量 | 百分比 |")
            print("|------|------|--------|")

            # Most frequent stored value first.
            sorted_details = sorted(
                data['details'].items(), key=lambda x: x[1], reverse=True
            )
            for value, count in sorted_details:
                percentage = (
                    count * 100 / data['total'] if data['total'] > 0 else 0
                )
                print(f"| {value} | {count} | {percentage:.2f}% |")
            print()

        print("## 综合安全评分")
        print()

        total_binaries = stats['total_binaries']
        if total_binaries > 0:
            # Counted per row; taking the minimum of the per-feature totals
            # would only give an upper bound on this number.
            fully_protected = stats['fully_protected']
            security_score = fully_protected * 100 / total_binaries

            print(f"- **综合安全评分**: {security_score:.2f}%")
            print(f"- **完全加固的文件**: {fully_protected}/{total_binaries}")
            print("- **标准**: PIE + NX + CANARY + Full RELRO")
            print()


def _option_value(argv, flag, description):
    """Return the argument following ``flag`` in ``argv``, or None if absent.

    Exits with status 1 and an error on stderr when ``flag`` is the last
    argument and therefore has no value.
    """
    if flag not in argv:
        return None
    idx = argv.index(flag)
    if idx + 1 >= len(argv):
        print(f"错误: {flag} 需要指定{description}", file=sys.stderr)
        sys.exit(1)
    return argv[idx + 1]


def main():
    """Command-line entry point.

    The first argument is used as the database path unless it starts with
    ``--``. ``--parse-results DIR`` imports results, ``--export-csv FILE``
    writes a CSV, and the Markdown report is always printed last.
    """
    argv = sys.argv

    db_path = DEFAULT_DB
    if len(argv) > 1 and not argv[1].startswith('--'):
        db_path = argv[1]

    # Validate options before touching the database so a usage error does
    # not create or modify anything.
    results_dir = _option_value(argv, '--parse-results', "结果目录路径")
    output_file = _option_value(argv, '--export-csv', "输出 CSV 文件路径")

    reporter = ScanReporter(db_path)
    try:
        if results_dir is not None:
            print(f"解析结果目录: {results_dir}")
            reporter.parse_checksec_results(results_dir)

        if output_file is not None:
            print(f"导出 CSV: {output_file}")
            reporter.export_csv(output_file)

        reporter.print_report()
    finally:
        reporter.close()


if __name__ == '__main__':
    main()