72f2e1a897
Signed-off-by: Fanjun Kong <kongfanjun@iscas.ac.cn>
355 lines
12 KiB
Python
355 lines
12 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
将扫描结果批量导入到 SQLite 数据库
|
||
"""
|
||
import sqlite3
|
||
import json
|
||
import sys
|
||
import re
|
||
from pathlib import Path
|
||
|
||
def validate_package_name(name: str) -> bool:
|
||
"""
|
||
验证包名格式(防止路径遍历和 SQL 注入)
|
||
允许字母、数字、点、下划线、加号、减号、波浪号
|
||
波浪号用于 RPM 预发布版本(如 2.14~rc1)
|
||
|
||
Args:
|
||
name: 包名
|
||
|
||
Returns:
|
||
bool: True=合法, False=非法
|
||
"""
|
||
if not name:
|
||
return False
|
||
# 与 shell 脚本的正则表达式保持一致: ^[a-zA-Z0-9._~+-]+$
|
||
pattern = r'^[a-zA-Z0-9._~+-]+$'
|
||
return re.match(pattern, name) is not None
|
||
|
||
def create_tables(cursor):
|
||
"""
|
||
创建数据库表结构
|
||
"""
|
||
# 创建包表
|
||
cursor.execute("""
|
||
CREATE TABLE IF NOT EXISTS packages (
|
||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||
name TEXT NOT NULL UNIQUE,
|
||
status TEXT NOT NULL,
|
||
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
||
)
|
||
""")
|
||
|
||
# 创建二进制文件表
|
||
cursor.execute("""
|
||
CREATE TABLE IF NOT EXISTS binaries (
|
||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||
package_id INTEGER NOT NULL,
|
||
file_path TEXT NOT NULL,
|
||
file_type TEXT,
|
||
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||
FOREIGN KEY (package_id) REFERENCES packages(id)
|
||
)
|
||
""")
|
||
|
||
# 创建安全检查结果表
|
||
cursor.execute("""
|
||
CREATE TABLE IF NOT EXISTS security_checks (
|
||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||
binary_id INTEGER NOT NULL,
|
||
pie TEXT,
|
||
nx TEXT,
|
||
canary TEXT,
|
||
fortify TEXT,
|
||
relro TEXT,
|
||
bind_now TEXT,
|
||
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||
FOREIGN KEY (binary_id) REFERENCES binaries(id)
|
||
)
|
||
""")
|
||
|
||
# 创建索引以提高查询性能
|
||
cursor.execute("""
|
||
CREATE INDEX IF NOT EXISTS idx_packages_status
|
||
ON packages(status)
|
||
""")
|
||
|
||
cursor.execute("""
|
||
CREATE INDEX IF NOT EXISTS idx_binaries_package
|
||
ON binaries(package_id)
|
||
""")
|
||
|
||
def parse_checksec_json(json_file):
|
||
"""
|
||
解析新版 checksec JSON 输出(数组格式)
|
||
格式:[{"name": "/path", "checks": {...}}, ...]
|
||
"""
|
||
try:
|
||
with open(json_file) as f:
|
||
content = f.read()
|
||
|
||
# 解析 JSON 数组
|
||
data = json.loads(content)
|
||
|
||
# 确保返回的是列表
|
||
if isinstance(data, list):
|
||
return data
|
||
else:
|
||
print(f"警告: {json_file} 不是数组格式", file=sys.stderr)
|
||
return []
|
||
|
||
except json.JSONDecodeError as e:
|
||
print(f"JSON 解析失败 {json_file}: {e}", file=sys.stderr)
|
||
return []
|
||
except Exception as e:
|
||
print(f"读取失败 {json_file}: {e}", file=sys.stderr)
|
||
return []
|
||
|
||
def parse_checksec_text(json_file):
|
||
"""
|
||
解析 checksec 输出
|
||
checksec --output=json 可能返回的是文本格式,需要兼容处理
|
||
"""
|
||
try:
|
||
with open(json_file) as f:
|
||
content = f.read()
|
||
|
||
# 尝试解析为 JSON
|
||
try:
|
||
data = json.loads(content)
|
||
return data
|
||
except json.JSONDecodeError:
|
||
# 如果不是 JSON,解析文本格式
|
||
results = {}
|
||
lines = content.split('\n')
|
||
current_file = None
|
||
|
||
for line in lines:
|
||
line = line.strip()
|
||
if not line:
|
||
continue
|
||
|
||
# 检测文件名行
|
||
if line.startswith('*') or line.startswith('/') or '.rpm' in line:
|
||
current_file = line.lstrip('*').strip()
|
||
results[current_file] = {}
|
||
elif current_file and ':' in line:
|
||
# 解析检查项,例如 "PIE: Enabled"
|
||
parts = line.split(':', 1)
|
||
if len(parts) == 2:
|
||
key = parts[0].strip().lower()
|
||
value = parts[1].strip()
|
||
results[current_file][key] = value
|
||
|
||
return results
|
||
except Exception as e:
|
||
print(f"解析失败 {json_file}: {e}", file=sys.stderr)
|
||
return {}
|
||
|
||
def import_results(db_path, results_dir, clean=False):
|
||
"""
|
||
批量导入扫描结果到数据库
|
||
使用事务确保数据一致性
|
||
|
||
Args:
|
||
db_path: 数据库文件路径
|
||
results_dir: 结果目录路径
|
||
clean: 是否清空旧数据(默认 False)
|
||
"""
|
||
results_dir = Path(results_dir)
|
||
|
||
# 连接数据库
|
||
conn = sqlite3.connect(db_path)
|
||
cursor = conn.cursor()
|
||
|
||
# 创建数据库表
|
||
create_tables(cursor)
|
||
|
||
# 如果需要清理旧数据
|
||
if clean:
|
||
print("清理旧数据...")
|
||
cursor.execute("DELETE FROM security_checks")
|
||
cursor.execute("DELETE FROM binaries")
|
||
cursor.execute("DELETE FROM packages")
|
||
print("清理完成")
|
||
|
||
# 使用事务确保数据一致性
|
||
try:
|
||
cursor.execute("BEGIN TRANSACTION")
|
||
|
||
# 读取状态文件
|
||
scanned_file = results_dir / "scanned.txt"
|
||
success_file = results_dir / "success.txt"
|
||
failed_file = results_dir / "failed.txt"
|
||
no_binary_file = results_dir / "no_binary.txt"
|
||
|
||
# 导入失败的包
|
||
failed_count = 0
|
||
if failed_file.exists():
|
||
with open(failed_file) as f:
|
||
for line in f:
|
||
line = line.strip()
|
||
if ':' in line:
|
||
pkg_name, status = line.split(':', 1)
|
||
# 验证包名
|
||
if not validate_package_name(pkg_name):
|
||
print(f"警告: 跳过非法包名: {pkg_name}", file=sys.stderr)
|
||
continue
|
||
cursor.execute(
|
||
"INSERT OR REPLACE INTO packages (name, status) VALUES (?, ?)",
|
||
(pkg_name, status)
|
||
)
|
||
failed_count += 1
|
||
print(f"导入失败包: {failed_count} 个")
|
||
|
||
# 导入无二进制的包
|
||
no_binary_count = 0
|
||
if no_binary_file.exists():
|
||
with open(no_binary_file) as f:
|
||
for line in f:
|
||
line = line.strip()
|
||
if ':' in line:
|
||
pkg_name, status = line.split(':', 1)
|
||
# 验证包名
|
||
if not validate_package_name(pkg_name):
|
||
print(f"警告: 跳过非法包名: {pkg_name}", file=sys.stderr)
|
||
continue
|
||
cursor.execute(
|
||
"INSERT OR REPLACE INTO packages (name, status) VALUES (?, ?)",
|
||
(pkg_name, 'no_binaries')
|
||
)
|
||
no_binary_count += 1
|
||
print(f"导入无二进制包: {no_binary_count} 个")
|
||
|
||
# 导入成功的包
|
||
success_count = 0
|
||
binary_count = 0
|
||
if success_file.exists():
|
||
with open(success_file) as f:
|
||
for line in f:
|
||
line = line.strip()
|
||
if ':' in line:
|
||
pkg_name, file_count = line.split(':', 1)
|
||
|
||
# 验证包名
|
||
if not validate_package_name(pkg_name):
|
||
print(f"警告: 跳过非法包名: {pkg_name}", file=sys.stderr)
|
||
continue
|
||
|
||
# 插入包记录(使用 REPLACE 支持重新导入)
|
||
cursor.execute(
|
||
"INSERT OR REPLACE INTO packages (name, status) VALUES (?, ?)",
|
||
(pkg_name, 'success')
|
||
)
|
||
pkg_id = cursor.lastrowid
|
||
success_count += 1
|
||
|
||
# 解析 JSON 结果
|
||
json_file = results_dir / f"{pkg_name}.json"
|
||
if not json_file.exists():
|
||
continue
|
||
|
||
checksec_results = parse_checksec_json(json_file)
|
||
|
||
# 导入二进制文件和安全检查结果
|
||
for item in checksec_results:
|
||
if not isinstance(item, dict):
|
||
continue
|
||
|
||
file_path = item.get('name', 'unknown')
|
||
checks = item.get('checks', {})
|
||
|
||
if not checks:
|
||
continue
|
||
|
||
# 插入二进制文件记录
|
||
cursor.execute(
|
||
"INSERT INTO binaries (package_id, file_path, file_type) VALUES (?, ?, ?)",
|
||
(pkg_id, file_path, 'elf')
|
||
)
|
||
binary_id = cursor.lastrowid
|
||
binary_count += 1
|
||
|
||
# 插入安全检查结果
|
||
cursor.execute(
|
||
"""INSERT INTO security_checks
|
||
(binary_id, pie, nx, canary, fortify, relro, bind_now)
|
||
VALUES (?, ?, ?, ?, ?, ?, ?)""",
|
||
(
|
||
binary_id,
|
||
str(checks.get('pie', 'unknown')),
|
||
str(checks.get('nx', 'unknown')),
|
||
str(checks.get('canary', 'unknown')),
|
||
str(checks.get('fortified', 'unknown')),
|
||
str(checks.get('relro', 'unknown')),
|
||
str(checks.get('bind_now', 'unknown'))
|
||
)
|
||
)
|
||
|
||
print(f"导入成功包: {success_count} 个")
|
||
|
||
# 提交事务
|
||
cursor.execute("COMMIT")
|
||
print("事务提交成功")
|
||
|
||
except Exception as e:
|
||
# 发生错误时回滚
|
||
cursor.execute("ROLLBACK")
|
||
print(f"导入失败,已回滚: {e}", file=sys.stderr)
|
||
raise
|
||
|
||
# 打印统计
|
||
cursor.execute("SELECT COUNT(*) FROM packages")
|
||
total = cursor.fetchone()[0]
|
||
|
||
cursor.execute("SELECT COUNT(*) FROM packages WHERE status='success'")
|
||
success = cursor.fetchone()[0]
|
||
|
||
cursor.execute("SELECT COUNT(*) FROM binaries")
|
||
binaries = cursor.fetchone()[0]
|
||
|
||
print(f"\n数据库统计:")
|
||
print(f" 总包数: {total}")
|
||
print(f" 成功: {success}")
|
||
print(f" 二进制文件: {binaries}")
|
||
|
||
conn.close()
|
||
|
||
def main():
|
||
# 设置默认值
|
||
default_db = "scan_results.db"
|
||
default_results = "scan_workspace/results"
|
||
|
||
# 解析参数
|
||
db_path = default_db
|
||
results_dir = default_results
|
||
clean = False
|
||
|
||
# 检查 --clean 选项
|
||
if '--clean' in sys.argv:
|
||
clean = True
|
||
|
||
# 获取数据库路径(第一个非选项参数)
|
||
args = [arg for arg in sys.argv[1:] if not arg.startswith('--')]
|
||
if len(args) >= 1:
|
||
db_path = args[0]
|
||
|
||
if len(args) >= 2:
|
||
results_dir = args[1]
|
||
|
||
print(f"导入扫描结果...")
|
||
print(f" 数据库: {db_path}")
|
||
print(f" 结果目录: {results_dir}")
|
||
if clean:
|
||
print(f" 模式: 清理旧数据后重新导入")
|
||
print()
|
||
|
||
import_results(db_path, results_dir, clean=clean)
|
||
|
||
print("\n导入完成!")
|
||
print(f"\n下一步:生成报告")
|
||
print(f"python3 03_report.py {db_path}")
|
||
|
||
if __name__ == '__main__':
|
||
main()
|