Files
checksec_script/04_import_results.py
Fanjun Kong 72f2e1a897 just for testing
Signed-off-by: Fanjun Kong <kongfanjun@iscas.ac.cn>
2026-01-30 16:04:18 +08:00

355 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
将扫描结果批量导入到 SQLite 数据库
"""
import sqlite3
import json
import sys
import re
from pathlib import Path
def validate_package_name(name: str) -> bool:
"""
验证包名格式(防止路径遍历和 SQL 注入)
允许字母、数字、点、下划线、加号、减号、波浪号
波浪号用于 RPM 预发布版本(如 2.14~rc1
Args:
name: 包名
Returns:
bool: True=合法, False=非法
"""
if not name:
return False
# 与 shell 脚本的正则表达式保持一致: ^[a-zA-Z0-9._~+-]+$
pattern = r'^[a-zA-Z0-9._~+-]+$'
return re.match(pattern, name) is not None
def create_tables(cursor):
"""
创建数据库表结构
"""
# 创建包表
cursor.execute("""
CREATE TABLE IF NOT EXISTS packages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL UNIQUE,
status TEXT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
# 创建二进制文件表
cursor.execute("""
CREATE TABLE IF NOT EXISTS binaries (
id INTEGER PRIMARY KEY AUTOINCREMENT,
package_id INTEGER NOT NULL,
file_path TEXT NOT NULL,
file_type TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (package_id) REFERENCES packages(id)
)
""")
# 创建安全检查结果表
cursor.execute("""
CREATE TABLE IF NOT EXISTS security_checks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
binary_id INTEGER NOT NULL,
pie TEXT,
nx TEXT,
canary TEXT,
fortify TEXT,
relro TEXT,
bind_now TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (binary_id) REFERENCES binaries(id)
)
""")
# 创建索引以提高查询性能
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_packages_status
ON packages(status)
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_binaries_package
ON binaries(package_id)
""")
def parse_checksec_json(json_file):
"""
解析新版 checksec JSON 输出(数组格式)
格式:[{"name": "/path", "checks": {...}}, ...]
"""
try:
with open(json_file) as f:
content = f.read()
# 解析 JSON 数组
data = json.loads(content)
# 确保返回的是列表
if isinstance(data, list):
return data
else:
print(f"警告: {json_file} 不是数组格式", file=sys.stderr)
return []
except json.JSONDecodeError as e:
print(f"JSON 解析失败 {json_file}: {e}", file=sys.stderr)
return []
except Exception as e:
print(f"读取失败 {json_file}: {e}", file=sys.stderr)
return []
def parse_checksec_text(json_file):
"""
解析 checksec 输出
checksec --output=json 可能返回的是文本格式,需要兼容处理
"""
try:
with open(json_file) as f:
content = f.read()
# 尝试解析为 JSON
try:
data = json.loads(content)
return data
except json.JSONDecodeError:
# 如果不是 JSON,解析文本格式
results = {}
lines = content.split('\n')
current_file = None
for line in lines:
line = line.strip()
if not line:
continue
# 检测文件名行
if line.startswith('*') or line.startswith('/') or '.rpm' in line:
current_file = line.lstrip('*').strip()
results[current_file] = {}
elif current_file and ':' in line:
# 解析检查项,例如 "PIE: Enabled"
parts = line.split(':', 1)
if len(parts) == 2:
key = parts[0].strip().lower()
value = parts[1].strip()
results[current_file][key] = value
return results
except Exception as e:
print(f"解析失败 {json_file}: {e}", file=sys.stderr)
return {}
def import_results(db_path, results_dir, clean=False):
"""
批量导入扫描结果到数据库
使用事务确保数据一致性
Args:
db_path: 数据库文件路径
results_dir: 结果目录路径
clean: 是否清空旧数据(默认 False)
"""
results_dir = Path(results_dir)
# 连接数据库
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
# 创建数据库表
create_tables(cursor)
# 如果需要清理旧数据
if clean:
print("清理旧数据...")
cursor.execute("DELETE FROM security_checks")
cursor.execute("DELETE FROM binaries")
cursor.execute("DELETE FROM packages")
print("清理完成")
# 使用事务确保数据一致性
try:
cursor.execute("BEGIN TRANSACTION")
# 读取状态文件
scanned_file = results_dir / "scanned.txt"
success_file = results_dir / "success.txt"
failed_file = results_dir / "failed.txt"
no_binary_file = results_dir / "no_binary.txt"
# 导入失败的包
failed_count = 0
if failed_file.exists():
with open(failed_file) as f:
for line in f:
line = line.strip()
if ':' in line:
pkg_name, status = line.split(':', 1)
# 验证包名
if not validate_package_name(pkg_name):
print(f"警告: 跳过非法包名: {pkg_name}", file=sys.stderr)
continue
cursor.execute(
"INSERT OR REPLACE INTO packages (name, status) VALUES (?, ?)",
(pkg_name, status)
)
failed_count += 1
print(f"导入失败包: {failed_count}")
# 导入无二进制的包
no_binary_count = 0
if no_binary_file.exists():
with open(no_binary_file) as f:
for line in f:
line = line.strip()
if ':' in line:
pkg_name, status = line.split(':', 1)
# 验证包名
if not validate_package_name(pkg_name):
print(f"警告: 跳过非法包名: {pkg_name}", file=sys.stderr)
continue
cursor.execute(
"INSERT OR REPLACE INTO packages (name, status) VALUES (?, ?)",
(pkg_name, 'no_binaries')
)
no_binary_count += 1
print(f"导入无二进制包: {no_binary_count}")
# 导入成功的包
success_count = 0
binary_count = 0
if success_file.exists():
with open(success_file) as f:
for line in f:
line = line.strip()
if ':' in line:
pkg_name, file_count = line.split(':', 1)
# 验证包名
if not validate_package_name(pkg_name):
print(f"警告: 跳过非法包名: {pkg_name}", file=sys.stderr)
continue
# 插入包记录(使用 REPLACE 支持重新导入)
cursor.execute(
"INSERT OR REPLACE INTO packages (name, status) VALUES (?, ?)",
(pkg_name, 'success')
)
pkg_id = cursor.lastrowid
success_count += 1
# 解析 JSON 结果
json_file = results_dir / f"{pkg_name}.json"
if not json_file.exists():
continue
checksec_results = parse_checksec_json(json_file)
# 导入二进制文件和安全检查结果
for item in checksec_results:
if not isinstance(item, dict):
continue
file_path = item.get('name', 'unknown')
checks = item.get('checks', {})
if not checks:
continue
# 插入二进制文件记录
cursor.execute(
"INSERT INTO binaries (package_id, file_path, file_type) VALUES (?, ?, ?)",
(pkg_id, file_path, 'elf')
)
binary_id = cursor.lastrowid
binary_count += 1
# 插入安全检查结果
cursor.execute(
"""INSERT INTO security_checks
(binary_id, pie, nx, canary, fortify, relro, bind_now)
VALUES (?, ?, ?, ?, ?, ?, ?)""",
(
binary_id,
str(checks.get('pie', 'unknown')),
str(checks.get('nx', 'unknown')),
str(checks.get('canary', 'unknown')),
str(checks.get('fortified', 'unknown')),
str(checks.get('relro', 'unknown')),
str(checks.get('bind_now', 'unknown'))
)
)
print(f"导入成功包: {success_count}")
# 提交事务
cursor.execute("COMMIT")
print("事务提交成功")
except Exception as e:
# 发生错误时回滚
cursor.execute("ROLLBACK")
print(f"导入失败,已回滚: {e}", file=sys.stderr)
raise
# 打印统计
cursor.execute("SELECT COUNT(*) FROM packages")
total = cursor.fetchone()[0]
cursor.execute("SELECT COUNT(*) FROM packages WHERE status='success'")
success = cursor.fetchone()[0]
cursor.execute("SELECT COUNT(*) FROM binaries")
binaries = cursor.fetchone()[0]
print(f"\n数据库统计:")
print(f" 总包数: {total}")
print(f" 成功: {success}")
print(f" 二进制文件: {binaries}")
conn.close()
def main():
# 设置默认值
default_db = "scan_results.db"
default_results = "scan_workspace/results"
# 解析参数
db_path = default_db
results_dir = default_results
clean = False
# 检查 --clean 选项
if '--clean' in sys.argv:
clean = True
# 获取数据库路径(第一个非选项参数)
args = [arg for arg in sys.argv[1:] if not arg.startswith('--')]
if len(args) >= 1:
db_path = args[0]
if len(args) >= 2:
results_dir = args[1]
print(f"导入扫描结果...")
print(f" 数据库: {db_path}")
print(f" 结果目录: {results_dir}")
if clean:
print(f" 模式: 清理旧数据后重新导入")
print()
import_results(db_path, results_dir, clean=clean)
print("\n导入完成!")
print(f"\n下一步:生成报告")
print(f"python3 03_report.py {db_path}")
if __name__ == '__main__':
main()