72f2e1a897
Signed-off-by: Fanjun Kong <kongfanjun@iscas.ac.cn>
265 lines
9.0 KiB
Python
265 lines
9.0 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
RPM 安全扫描结果解析与报告生成
|
|
"""
|
|
import sqlite3
|
|
import json
|
|
import sys
|
|
from pathlib import Path
|
|
from collections import defaultdict
|
|
|
|
class ScanReporter:
|
|
def __init__(self, db_path):
|
|
self.db = sqlite3.connect(db_path)
|
|
self.db.row_factory = sqlite3.Row
|
|
|
|
def parse_checksec_results(self, results_dir):
|
|
"""解析 checksec JSON 结果并入库"""
|
|
results_dir = Path(results_dir)
|
|
|
|
for json_file in results_dir.glob("*.json"):
|
|
try:
|
|
with open(json_file) as f:
|
|
data = json.load(f)
|
|
|
|
pkg_name = json_file.stem
|
|
|
|
# 获取 package_id
|
|
cur = self.db.execute(
|
|
"SELECT id FROM packages WHERE name=? AND status='success'",
|
|
(pkg_name.split('-')[0],)
|
|
)
|
|
row = cur.fetchone()
|
|
if not row:
|
|
continue
|
|
|
|
pkg_id = row[0]
|
|
|
|
# 解析每个文件的结果
|
|
for file_path, checks in data.items():
|
|
# 插入 binary 记录
|
|
cur = self.db.execute(
|
|
"INSERT INTO binaries (package_id, file_path, file_type) VALUES (?, ?, ?) RETURNING id",
|
|
(pkg_id, file_path, checks.get('type', 'unknown'))
|
|
)
|
|
binary_id = cur.fetchone()[0]
|
|
|
|
# 插入安全检查结果
|
|
self.db.execute(
|
|
"""INSERT INTO security_checks
|
|
(binary_id, pie, nx, canary, fortify, relro, bind_now)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?)""",
|
|
(
|
|
binary_id,
|
|
checks.get('pie', 'unknown'),
|
|
checks.get('nx', 'unknown'),
|
|
checks.get('canary', 'unknown'),
|
|
checks.get('fortify', 'unknown'),
|
|
checks.get('relro', 'unknown'),
|
|
checks.get('bind_now', 'unknown')
|
|
)
|
|
)
|
|
|
|
self.db.commit()
|
|
except Exception as e:
|
|
print(f"解析失败 {json_file}: {e}", file=sys.stderr)
|
|
|
|
def generate_statistics(self):
|
|
"""生成统计报告"""
|
|
stats = {}
|
|
|
|
# 总体统计
|
|
cur = self.db.execute("SELECT COUNT(*) FROM packages WHERE status='success'")
|
|
stats['total_packages'] = cur.fetchone()[0]
|
|
|
|
cur = self.db.execute("SELECT COUNT(*) FROM binaries")
|
|
stats['total_binaries'] = cur.fetchone()[0]
|
|
|
|
# 定义每个特性的启用判断规则
|
|
feature_rules = {
|
|
'pie': {
|
|
'enabled': ['PIE Enabled'],
|
|
'disabled': ['PIE Disabled'],
|
|
'label': 'PIE'
|
|
},
|
|
'nx': {
|
|
'enabled': ['NX enabled', 'NX enabled'],
|
|
'disabled': ['NX disabled', 'NX disabled'],
|
|
'label': 'NX'
|
|
},
|
|
'canary': {
|
|
'enabled': ['Canary Found'],
|
|
'disabled': ['No Canary Found', 'No canary found'],
|
|
'label': 'CANARY'
|
|
},
|
|
'fortify': {
|
|
'enabled_values': lambda x: x and x != '0' and x != 'unknown',
|
|
'label': 'FORTIFY'
|
|
},
|
|
'relro': {
|
|
'enabled': ['Full RELRO'],
|
|
'partial': ['Partial RELRO'],
|
|
'disabled': ['No RELRO'],
|
|
'label': 'RELRO'
|
|
}
|
|
}
|
|
|
|
stats['coverage'] = {}
|
|
|
|
for feature, rules in feature_rules.items():
|
|
cur = self.db.execute(f"""
|
|
SELECT
|
|
{feature},
|
|
COUNT(*) as count
|
|
FROM security_checks
|
|
GROUP BY {feature}
|
|
""")
|
|
|
|
feature_stats = {}
|
|
total = 0
|
|
enabled = 0
|
|
|
|
for row in cur:
|
|
value = row[0]
|
|
count = row[1]
|
|
feature_stats[value] = count
|
|
total += count
|
|
|
|
# 根据规则判断是否启用
|
|
if 'enabled' in rules and value in rules['enabled']:
|
|
enabled += count
|
|
elif 'enabled_values' in rules and rules['enabled_values'](value):
|
|
enabled += count
|
|
|
|
coverage_rate = f"{enabled/total*100:.2f}%" if total > 0 else "0.00%"
|
|
|
|
stats['coverage'][feature] = {
|
|
'label': rules['label'],
|
|
'details': feature_stats,
|
|
'rate': coverage_rate,
|
|
'total': total,
|
|
'enabled': enabled
|
|
}
|
|
|
|
return stats
|
|
|
|
def export_csv(self, output_file):
|
|
"""导出 CSV 报告"""
|
|
cur = self.db.execute("""
|
|
SELECT
|
|
p.name,
|
|
b.file_path,
|
|
s.pie, s.nx, s.canary, s.fortify, s.relro, s.bind_now
|
|
FROM packages p
|
|
JOIN binaries b ON p.id = b.package_id
|
|
JOIN security_checks s ON b.id = s.binary_id
|
|
ORDER BY p.name, b.file_path
|
|
""")
|
|
|
|
with open(output_file, 'w') as f:
|
|
f.write("Package,File,PIE,NX,Canary,Fortify,RELRO,BIND_NOW\n")
|
|
for row in cur:
|
|
f.write(','.join(str(x) for x in row) + '\n')
|
|
|
|
def print_report(self):
|
|
"""打印统计报告"""
|
|
stats = self.generate_statistics()
|
|
|
|
# Markdown 格式输出
|
|
print("# RPM 安全扫描统计报告")
|
|
print()
|
|
print(f"**扫描包数量**: {stats['total_packages']}")
|
|
print(f"**二进制文件数**: {stats['total_binaries']}")
|
|
print()
|
|
print("## 安全特性覆盖率")
|
|
print()
|
|
|
|
# 按顺序输出: PIE, NX, CANARY, FORTIFY, RELRO
|
|
feature_order = ['pie', 'nx', 'canary', 'fortify', 'relro']
|
|
|
|
for feature in feature_order:
|
|
if feature not in stats['coverage']:
|
|
continue
|
|
|
|
data = stats['coverage'][feature]
|
|
label = data['label']
|
|
rate = data['rate']
|
|
details = data['details']
|
|
|
|
# 输出标题和覆盖率
|
|
print(f"### {label}")
|
|
print()
|
|
print(f"**覆盖率**: {rate}")
|
|
print()
|
|
print("| 状态 | 数量 | 百分比 |")
|
|
print("|------|------|--------|")
|
|
|
|
# 输出详细统计(按数量降序排列)
|
|
sorted_details = sorted(details.items(), key=lambda x: x[1], reverse=True)
|
|
for value, count in sorted_details:
|
|
percentage = count * 100 / data['total'] if data['total'] > 0 else 0
|
|
print(f"| {value} | {count} | {percentage:.2f}% |")
|
|
|
|
print()
|
|
|
|
print("## 综合安全评分")
|
|
print()
|
|
|
|
# 计算综合安全评分
|
|
total_binaries = stats['total_binaries']
|
|
if total_binaries > 0:
|
|
pie_enabled = stats['coverage']['pie']['enabled']
|
|
nx_enabled = stats['coverage']['nx']['enabled']
|
|
canary_enabled = stats['coverage']['canary']['enabled']
|
|
relro_full = stats['coverage']['relro']['enabled']
|
|
|
|
# 计算所有保护都开启的文件数(取最小值)
|
|
fully_protected = min(pie_enabled, nx_enabled, canary_enabled, relro_full)
|
|
security_score = fully_protected * 100 / total_binaries
|
|
|
|
print(f"- **综合安全评分**: {security_score:.2f}%")
|
|
print(f"- **完全加固的文件**: {fully_protected}/{total_binaries}")
|
|
print(f"- **标准**: PIE + NX + CANARY + Full RELRO")
|
|
print()
|
|
|
|
def main():
|
|
# 设置默认数据库路径
|
|
default_db = "scan_results.db"
|
|
|
|
# 解析参数
|
|
db_path = default_db
|
|
args_start = 0
|
|
|
|
# 如果第一个参数不是选项,则作为数据库路径
|
|
if len(sys.argv) > 1 and not sys.argv[1].startswith('--'):
|
|
db_path = sys.argv[1]
|
|
args_start = 1
|
|
|
|
reporter = ScanReporter(db_path)
|
|
|
|
# 处理选项参数
|
|
if '--parse-results' in sys.argv:
|
|
idx = sys.argv.index('--parse-results')
|
|
# 检查参数是否越界
|
|
if idx + 1 >= len(sys.argv):
|
|
print("错误: --parse-results 需要指定结果目录路径", file=sys.stderr)
|
|
sys.exit(1)
|
|
results_dir = sys.argv[idx + 1]
|
|
print(f"解析结果目录: {results_dir}")
|
|
reporter.parse_checksec_results(results_dir)
|
|
|
|
if '--export-csv' in sys.argv:
|
|
idx = sys.argv.index('--export-csv')
|
|
# 检查参数是否越界
|
|
if idx + 1 >= len(sys.argv):
|
|
print("错误: --export-csv 需要指定输出 CSV 文件路径", file=sys.stderr)
|
|
sys.exit(1)
|
|
output_file = sys.argv[idx + 1]
|
|
print(f"导出 CSV: {output_file}")
|
|
reporter.export_csv(output_file)
|
|
|
|
reporter.print_report()
|
|
|
|
if __name__ == '__main__':
|
|
main()
|