Files
Fanjun Kong 72f2e1a897 just for testing
Signed-off-by: Fanjun Kong <kongfanjun@iscas.ac.cn>
2026-01-30 16:04:18 +08:00

265 lines
9.0 KiB
Python

#!/usr/bin/env python3
"""
RPM 安全扫描结果解析与报告生成
"""
import csv
import json
import sqlite3
import sys
from collections import defaultdict
from pathlib import Path
class ScanReporter:
    """Load checksec scan results into SQLite and report hardening coverage.

    The queries in this class use three tables: ``packages`` (id, name,
    status), ``binaries`` (id, package_id, file_path, file_type) and
    ``security_checks`` (binary_id, pie, nx, canary, fortify, relro,
    bind_now). The schema itself is created elsewhere.
    """

    # Order in which features appear in the printed report.
    FEATURE_ORDER = ('pie', 'nx', 'canary', 'fortify', 'relro')

    # Per-feature classification rules. Only 'enabled' / 'enabled_values' and
    # 'label' are consulted; 'disabled' / 'partial' document the other values
    # the scanner is expected to emit.
    FEATURE_RULES = {
        'pie': {
            'enabled': ['PIE Enabled'],
            'disabled': ['PIE Disabled'],
            'label': 'PIE',
        },
        'nx': {
            # NOTE(review): the original listed each of these literals twice;
            # a second spelling may have been intended - confirm against the
            # scanner output.
            'enabled': ['NX enabled'],
            'disabled': ['NX disabled'],
            'label': 'NX',
        },
        'canary': {
            'enabled': ['Canary Found'],
            'disabled': ['No Canary Found', 'No canary found'],
            'label': 'CANARY',
        },
        'fortify': {
            # Any non-empty value other than '0' / 'unknown' counts as enabled.
            'enabled_values': lambda x: bool(x) and x != '0' and x != 'unknown',
            'label': 'FORTIFY',
        },
        'relro': {
            'enabled': ['Full RELRO'],
            'partial': ['Partial RELRO'],
            'disabled': ['No RELRO'],
            'label': 'RELRO',
        },
    }

    # Features that must all be enabled for a binary to count as fully
    # hardened (PIE + NX + CANARY + Full RELRO).
    FULL_PROTECTION_FEATURES = ('pie', 'nx', 'canary', 'relro')

    def __init__(self, db_path):
        """Open the SQLite database at *db_path* with name-addressable rows."""
        self.db = sqlite3.connect(db_path)
        self.db.row_factory = sqlite3.Row

    def close(self):
        """Close the underlying database connection."""
        self.db.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()
        return False

    @staticmethod
    def _candidate_package_names(stem):
        """Return package names to try for a result file stem, best first.

        Tried in order: the stem itself, the stem with a trailing
        ``-<version>-<release>`` removed, and finally the text before the
        first ``-`` (the original behaviour, kept as a fallback). The first
        two keep hyphenated names such as ``glibc-devel`` intact.
        """
        candidates = [stem]
        nvr = stem.rsplit('-', 2)
        if len(nvr) == 3:
            candidates.append(nvr[0])
        candidates.append(stem.split('-')[0])
        # Drop duplicates while preserving priority order.
        return list(dict.fromkeys(candidates))

    def _find_package_id(self, stem):
        """Return the id of the successfully scanned package for *stem*, or None."""
        for name in self._candidate_package_names(stem):
            row = self.db.execute(
                "SELECT id FROM packages WHERE name=? AND status='success'",
                (name,),
            ).fetchone()
            if row:
                return row[0]
        return None

    def parse_checksec_results(self, results_dir):
        """Parse every ``*.json`` checksec result in *results_dir* into the DB.

        Each file maps a binary path to a dict of check results. Files whose
        package cannot be found are skipped. A file that fails to parse is
        reported on stderr and its partial inserts are rolled back, so one
        bad file never leaks rows into the next file's commit.
        """
        results_dir = Path(results_dir)
        for json_file in sorted(results_dir.glob("*.json")):
            try:
                with open(json_file, encoding="utf-8") as f:
                    data = json.load(f)

                pkg_id = self._find_package_id(json_file.stem)
                if pkg_id is None:
                    continue

                for file_path, checks in data.items():
                    cur = self.db.execute(
                        "INSERT INTO binaries (package_id, file_path, file_type) "
                        "VALUES (?, ?, ?)",
                        (pkg_id, file_path, checks.get('type', 'unknown')),
                    )
                    # lastrowid avoids INSERT ... RETURNING, which needs
                    # SQLite >= 3.35. Assumes binaries.id is the INTEGER
                    # PRIMARY KEY (rowid alias) - confirm against the schema.
                    binary_id = cur.lastrowid
                    self.db.execute(
                        """INSERT INTO security_checks
                        (binary_id, pie, nx, canary, fortify, relro, bind_now)
                        VALUES (?, ?, ?, ?, ?, ?, ?)""",
                        (
                            binary_id,
                            checks.get('pie', 'unknown'),
                            checks.get('nx', 'unknown'),
                            checks.get('canary', 'unknown'),
                            checks.get('fortify', 'unknown'),
                            checks.get('relro', 'unknown'),
                            checks.get('bind_now', 'unknown'),
                        ),
                    )
                # One transaction per result file.
                self.db.commit()
            except Exception as e:
                # Best-effort per file: discard this file's partial rows and
                # carry on with the remaining files.
                self.db.rollback()
                print(f"解析失败 {json_file}: {e}", file=sys.stderr)

    def _count_fully_protected(self):
        """Count security_checks rows where every required feature is enabled."""
        clauses = []
        params = []
        for feature in self.FULL_PROTECTION_FEATURES:
            accepted = self.FEATURE_RULES[feature]['enabled']
            placeholders = ', '.join('?' * len(accepted))
            # Column names come from the class constant above, never from
            # external input, so interpolating them is safe.
            clauses.append(f"{feature} IN ({placeholders})")
            params.extend(accepted)
        cur = self.db.execute(
            f"SELECT COUNT(*) FROM security_checks WHERE {' AND '.join(clauses)}",
            params,
        )
        return cur.fetchone()[0]

    def generate_statistics(self):
        """Return a dict of scan statistics.

        Keys: ``total_packages``, ``total_binaries``, ``coverage`` (per
        feature: ``label``, ``details`` value->count, ``rate`` formatted
        percentage, ``total``, ``enabled``) and ``fully_protected`` (number
        of binaries with PIE, NX, canary and Full RELRO all enabled).
        """
        stats = {}

        cur = self.db.execute("SELECT COUNT(*) FROM packages WHERE status='success'")
        stats['total_packages'] = cur.fetchone()[0]
        cur = self.db.execute("SELECT COUNT(*) FROM binaries")
        stats['total_binaries'] = cur.fetchone()[0]

        stats['coverage'] = {}
        for feature, rules in self.FEATURE_RULES.items():
            # Column name comes from FEATURE_RULES (a constant), not user input.
            cur = self.db.execute(f"""
                SELECT
                    {feature},
                    COUNT(*) as count
                FROM security_checks
                GROUP BY {feature}
            """)
            feature_stats = {}
            total = 0
            enabled = 0
            for value, count in cur:
                feature_stats[value] = count
                total += count
                if 'enabled' in rules and value in rules['enabled']:
                    enabled += count
                elif 'enabled_values' in rules and rules['enabled_values'](value):
                    enabled += count
            coverage_rate = f"{enabled/total*100:.2f}%" if total > 0 else "0.00%"
            stats['coverage'][feature] = {
                'label': rules['label'],
                'details': feature_stats,
                'rate': coverage_rate,
                'total': total,
                'enabled': enabled,
            }

        stats['fully_protected'] = self._count_fully_protected()
        return stats

    def export_csv(self, output_file):
        """Write one CSV row per scanned binary to *output_file*.

        Uses ``csv.writer`` so that paths containing commas, quotes or
        newlines are quoted correctly instead of corrupting the row.
        """
        cur = self.db.execute("""
            SELECT
                p.name,
                b.file_path,
                s.pie, s.nx, s.canary, s.fortify, s.relro, s.bind_now
            FROM packages p
            JOIN binaries b ON p.id = b.package_id
            JOIN security_checks s ON b.id = s.binary_id
            ORDER BY p.name, b.file_path
        """)
        with open(output_file, 'w', newline='', encoding='utf-8') as f:
            # '\n' terminator keeps the output identical to the previous
            # hand-written format for rows that need no quoting.
            writer = csv.writer(f, lineterminator='\n')
            writer.writerow(
                ["Package", "File", "PIE", "NX", "Canary", "Fortify", "RELRO", "BIND_NOW"]
            )
            writer.writerows([str(x) for x in row] for row in cur)

    def print_report(self):
        """Print the statistics as a Markdown report on stdout."""
        stats = self.generate_statistics()

        print("# RPM 安全扫描统计报告")
        print()
        print(f"**扫描包数量**: {stats['total_packages']}")
        print(f"**二进制文件数**: {stats['total_binaries']}")
        print()
        print("## 安全特性覆盖率")
        print()

        for feature in self.FEATURE_ORDER:
            if feature not in stats['coverage']:
                continue
            data = stats['coverage'][feature]

            print(f"### {data['label']}")
            print()
            print(f"**覆盖率**: {data['rate']}")
            print()
            print("| 状态 | 数量 | 百分比 |")
            print("|------|------|--------|")
            # Most frequent values first.
            sorted_details = sorted(
                data['details'].items(), key=lambda x: x[1], reverse=True
            )
            for value, count in sorted_details:
                percentage = count * 100 / data['total'] if data['total'] > 0 else 0
                print(f"| {value} | {count} | {percentage:.2f}% |")
            print()

        print("## 综合安全评分")
        print()
        total_binaries = stats['total_binaries']
        if total_binaries > 0:
            # Counted per binary in SQL. Taking min() of the per-feature
            # totals would only give an upper bound, because different
            # binaries may each be missing a different protection.
            fully_protected = stats['fully_protected']
            security_score = fully_protected * 100 / total_binaries
            print(f"- **综合安全评分**: {security_score:.2f}%")
            print(f"- **完全加固的文件**: {fully_protected}/{total_binaries}")
            print("- **标准**: PIE + NX + CANARY + Full RELRO")
        print()
def _option_value(argv, flag, missing_message):
    """Return the argument following *flag* in *argv*, or None if *flag* is absent.

    Prints *missing_message* to stderr and exits with status 1 when *flag*
    is the last argument and therefore has no value.
    """
    if flag not in argv:
        return None
    idx = argv.index(flag)
    if idx + 1 >= len(argv):
        print(missing_message, file=sys.stderr)
        sys.exit(1)
    return argv[idx + 1]


def main():
    """Command-line entry point.

    Usage: ``[db_path] [--parse-results DIR] [--export-csv FILE]``.
    The database path defaults to ``scan_results.db`` and is only taken from
    the first argument when that argument is not an option. The statistics
    report is always printed last.
    """
    db_path = "scan_results.db"
    # A leading non-option argument is the database path.
    if len(sys.argv) > 1 and not sys.argv[1].startswith('--'):
        db_path = sys.argv[1]

    reporter = ScanReporter(db_path)
    try:
        results_dir = _option_value(
            sys.argv, '--parse-results',
            "错误: --parse-results 需要指定结果目录路径",
        )
        if results_dir is not None:
            print(f"解析结果目录: {results_dir}")
            reporter.parse_checksec_results(results_dir)

        output_file = _option_value(
            sys.argv, '--export-csv',
            "错误: --export-csv 需要指定输出 CSV 文件路径",
        )
        if output_file is not None:
            print(f"导出 CSV: {output_file}")
            reporter.export_csv(output_file)

        reporter.print_report()
    finally:
        # Release the SQLite connection on every path, including sys.exit().
        reporter.db.close()


if __name__ == '__main__':
    main()