Grype Script Detallado
Acerca de este Archivo
Ubicación: scripts/generate_grype.py
Módulo importado en: - nbs/vuln/generacion_grype.ipynb (para ejecutar análisis) - Este notebook (para documentación)
Este archivo contiene la clase GrypeAnalyzer, responsable de orquestar el análisis de vulnerabilidades en dependencias sobre múltiples repositorios. Fue diseñado siguiendo el mismo patrón que scripts/generate_codeql.py, proporcionando una interfaz consistente para automatizar análisis de seguridad.
Cómo se Usa en los Notebooks
En generacion_grype.ipynb:
from generate_grype import GrypeAnalyzer
analizador = GrypeAnalyzer(repos_path, output_path)
analizador.run() # Ejecuta todo el flujo de análisisExplicación Detallada: scripts/generate_grype.py
Este notebook documenta el funcionamiento interno y diseño del archivo scripts/generate_grype.py, que implementa un scanner de vulnerabilidades automatizado para múltiples repositorios.
Descripción General
Grype es una herramienta de análisis de seguridad desarrollada por Anchore que permite: - Escanear dependencias en busca de vulnerabilidades conocidas (CVE, GitHub Advisories) - Detectar manifests de dependencias automáticamente - Retornar severidad y versiones de corrección - Generar reportes detallados en JSON
El archivo generate_grype.py automatiza este proceso para múltiples repositorios, similar a cómo generate_codeql.py automatiza el análisis estático de código.
Arquitectura
Clase Principal: GrypeAnalyzer
La clase GrypeAnalyzer es el componente central que coordina todo el flujo de análisis.
class GrypeAnalyzer:
def __init__(self, repos_path: str, output_path: str):
self.repos_path = Path(repos_path) # Directorio con repositorios
self.output_path = Path(output_path) # Directorio de salida para resultados
self.project_root = Path(__file__).resolve().parents[1]
self.grype_bin = "grype" # Nombre ejecutable de Grype
self.dry_run = False # Modo preview sin ejecución
self.grype_path: str | None = None # Ruta completa a ejecutableFlujo de Ejecución
main()
└─> GrypeAnalyzer.run()
├─> discover_repositories() [Descubre repos en data/repos/]
├─> Para cada repositorio:
│ ├─> run_grype(repo_path) [Ejecuta análisis]
│ │ ├─> _detectar_manifests() [Busca package.json, requirements.txt, etc]
│ │ ├─> subprocess: grype <path> --output=json
│ │ └─ Retorna: JSON con vulnerabilidades
│ ├─> parse_grype_output() [Normaliza estructura]
│ └─> save_analysis() [Guarda formatos raw + normalizado]
└─> Resumen final
Métodos Principales
1. discover_repositories()
Propósito: Escanear el directorio data/repos/ para encontrar todos los repositorios disponibles.
Funcionamiento:
def discover_repositories(self) -> list[str]:
# 1. Valida que data/repos/ existe
self._validar_directorio_repos()
# 2. Lista todos los subdirectorios
repositorios = sorted(
str(ruta.relative_to(self.project_root))
for ruta in self.repos_path.iterdir()
if ruta.is_dir() # Solo directorios
)
# 3. Devuelve lista ordenada
return repositoriosEjemplo de salida:
[
'data/repos/claude-code-action',
'data/repos/genai-code-review',
'data/repos/opencode'
]
2. _detectar_manifests(ruta_repo)
Propósito: Detectar manifests de dependencias soportados por Grype.
Estrategia:
def _detectar_manifests(self, ruta_repo: Path) -> list[str]:
# 1. Define manifests soportados por Grype
manifests_soportados = {
"package.json": "npm",
"requirements.txt": "pip",
"pom.xml": "maven",
"build.gradle": "gradle",
"go.mod": "go",
"Cargo.toml": "cargo",
# ... más
}
# 2. Escanea archivos recursivamente
manifests_encontrados = []
for archivo in ruta_repo.rglob("*"): # rglob = recursivo
if archivo.is_file() and archivo.name in manifests_soportados:
manifests_encontrados.append(archivo.name)
# 3. Devuelve lista única ordenada
return sorted(set(manifests_encontrados))Ejemplo: - Entrada: Repo con package.json, package-lock.json, yarn.lock - Salida: ["package-lock.json", "package.json", "yarn.lock"]
Manifests soportados por Grype: - npm: package.json, package-lock.json, yarn.lock - Python: requirements.txt, Pipfile, Pipfile.lock, poetry.lock - Java: pom.xml, build.gradle - Go: go.mod, go.sum - Rust: Cargo.toml, Cargo.lock - Ruby: Gemfile, Gemfile.lock
3. run_grype(repo_path)
Propósito: Ejecutar Grype en el repositorio y retornar JSON con vulnerabilidades.
Funcionamiento:
def run_grype(self, repo_path: str) -> str:
# 1. Validar que repo existe
ruta_repo = self.project_root / repo_path
# ... validaciones ...
# 2. Detectar manifests
manifests = self._detectar_manifests(ruta_repo)
if not manifests:
# Si no hay manifests, retornar JSON vacío
return json.dumps({"matches": [], "source": None})
# 3. Construir comando Grype
comando = [
"grype",
str(ruta_repo), # Ruta a escanear
"--output=json" # Formato JSON
]
# 4. Ejecutar Grype
resultado = subprocess.run(comando, capture_output=True, text=True)
# 5. Retornar JSON string
return resultado.stdoutQué hace Grype: - Busca todos los manifests de dependencias en el directorio - Extrae lista de dependencias y versiones - Compara contra base de datos de vulnerabilidades (actualizada automáticamente) - Retorna matches con CVE, severidad (CVSS score), versiones de corrección
4. parse_grype_output(grype_json_str)
Propósito: Convertir JSON de Grype a un formato normalizado.
Funcionamiento:
def parse_grype_output(self, grype_json_str: str) -> dict:
# 1. Parsear JSON de Grype
grype_data = json.loads(grype_json_str)
# 2. Inicializar estructura de salida
resultados = {
"total_vulnerabilities": 0,
"vulnerabilities_by_severity": {
"critical": 0,
"high": 0,
"medium": 0,
"low": 0
},
"vulnerabilities": [], # Array de hallazgos individuales
"grype_metadata": {} # Información de herramienta y contexto
}
# 3. Procesar cada vulnerabilidad
vulnerabilidades_raw = grype_data.get("matches", [])
for vuln in vulnerabilidades_raw:
# Convertir a formato normalizado
issue = self._procesar_vulnerabilidad_grype(vuln)
resultados["vulnerabilities"].append(issue)
resultados["total_vulnerabilities"] += 1
# Contar por severidad
severity = issue.get("vuln_severity", "low")
resultados["vulnerabilities_by_severity"][severity] += 1
return resultadosSalida normalizada:
{
"total_vulnerabilities": 45,
"vulnerabilities_by_severity": {
"critical": 2,
"high": 8,
"medium": 20,
"low": 15
},
"vulnerabilities": [
{
"package_name": "lodash",
"current_version": "4.17.20",
"vuln_id": "CVE-2021-23337",
"vuln_severity": "high",
"fix_version": "4.17.21",
"message": "Prototype pollution in lodash...",
"cvss_score": 7.5
}
]
}5. _procesar_vulnerabilidad_grype(vuln)
Propósito: Convertir un match individual de Grype a formato normalizado.
Funcionamiento:
def _procesar_vulnerabilidad_grype(self, vuln: dict) -> dict:
# Extraer información del match
artifact = vuln.get("artifact", {}) # Paquete afectado
vulnerability = vuln.get("vulnerability", {}) # Detalles de CVE
metadata = vuln.get("metadata", {}) # CVSS score, CWE, etc
# Mapear CVSS score a severidad
cvss_score = metadata.get("cvss", [{}])[0].get("score", 0)
severity = self._determinar_severidad_por_cvss(cvss_score)
# Retornar estructura normalizada
return {
"package_name": artifact.get("name"), # ej: "lodash"
"current_version": artifact.get("version"), # ej: "4.17.20"
"vuln_id": vulnerability.get("id"), # ej: "CVE-2021-23337"
"vuln_severity": severity, # critical/high/medium/low
"fix_version": vuln.get("fix", {}).get("versions", ["N/A"])[0],
"message": vulnerability.get("description"),
"cvss_score": cvss_score
}Mapeo CVSS → Severidad: - CVSS ≥ 9.0 → critical - CVSS 7.0-8.9 → high - CVSS 4.0-6.9 → medium - CVSS < 4.0 → low
6. save_analysis(repo_name, grype_raw, analysis_data)
Propósito: Guardar el análisis en dos formatos: raw (debug) y normalizado (análisis).
Funcionamiento:
def save_analysis(self, repo_name: str, grype_raw: str, analysis_data: dict) -> tuple[Path, Path]:
# 1. Validar entrada
if not repo_name:
raise ValueError("Repository name cannot be empty.")
# 2. Crear directorio de salida si no existe
self.output_path.mkdir(parents=True, exist_ok=True)
# 3. Guardar formato RAW (original de Grype, para debug)
ruta_raw = self.output_path / f"{repo_name}-grype-raw.json"
ruta_raw.write_text(grype_raw, encoding="utf-8")
# 4. Guardar formato NORMALIZADO (para análisis)
ruta_normalizado = self.output_path / f"{repo_name}-grype.json"
contenido_json = json.dumps(analysis_data, ensure_ascii=False, indent=2)
ruta_normalizado.write_text(contenido_json, encoding="utf-8")
# 5. Retornar ambas rutas
return ruta_raw, ruta_normalizadoArchivos generados por repositorio:
data/results/
├── opencode-grype-raw.json ← Salida original de Grype (debug visual)
├── opencode-grype.json ← Formato normalizado (análisis)
├── claude-code-action-grype-raw.json
├── claude-code-action-grype.json
└── ...
7. run()
Propósito: Método principal que coordina el análisis de todos los repositorios.
Lógica:
def run(self):
# 1. Descubrir repositorios
repositorios = self.discover_repositories()
# 2. Para cada repositorio:
for indice, repo_path in enumerate(repositorios, start=1):
# Log de progreso: [1/3] Scanning data/repos/opencode...
LOGGER.info(f"[{indice}/{len(repositorios)}] ...")
if self.dry_run:
# Mostrar qué se haría sin ejecutar
continue
try:
# 3. Ejecutar análisis
grype_output = self.run_grype(repo_path)
# 4. Procesar resultados
analysis = self.parse_grype_output(grype_output)
# 5. Guardar ambos formatos
self.save_analysis(ruta_repo.name, grype_output, analysis)
repositorios_analizados += 1
except Exception as error:
# Manejar errores, limpiar, y continuar
errores += 1
self._eliminar_archivos_parciales(ruta_repo.name)
LOGGER.error(f"Error scanning {repo_path}: {error}")
# 6. Resumen final
LOGGER.info(f"Summary | total_repos={len(repos)} | ...")Salida típica:
INFO | [1/3] Scanning data/repos/claude-code-action with Grype...
INFO | Manifests found in claude-code-action: package.json, package-lock.json
INFO | Running Grype on claude-code-action...
INFO | Found 0 matches in Grype output
INFO | Raw Grype output saved to data/results/claude-code-action-grype-raw.json
INFO | Normalized analysis saved to data/results/claude-code-action-grype.json
INFO | [2/3] Scanning data/repos/opencode with Grype...
INFO | Manifests found in opencode: package.json, package-lock.json, yarn.lock
INFO | Found 45 matches in Grype output
INFO | ...
INFO | Summary | total_repos=3 | repos_scanned=3 | files_generated=6 | errors=0
Manejo de Errores
Niveles de Validación
- Directorios:
_validar_directorio_repos(): Verifica quedata/repos/existe y es directorio_validar_directorio_salida(): Verifica que ruta de salida es válida
- Repositorio Individual:
- Existe:
FileNotFoundError - Es directorio:
NotADirectoryError - No está vacío:
ValueError
- Existe:
- Análisis:
- Grype instalado:
RuntimeErrorcon mensaje instructivo - Grype ejecutable:
RuntimeErrorsi falla subprocess - JSON válido:
RuntimeErrorsi output no es parseable
- Grype instalado:
Recuperación de Errores
try:
# Intentar análisis
grype_output = self.run_grype(repo_path)
except Exception as error:
# 1. Limpiar archivos parciales
self._eliminar_archivos_parciales(ruta_repo.name)
# 2. Registrar error
LOGGER.error(f"Error: {error}")
# 3. Continuar con siguiente repo
errores += 1Uso del Script
Ejecución Básica
python scripts/generate_grype.pyVista Previa sin Ejecutar (Dry-run)
python scripts/generate_grype.py --dry-runSalida:
INFO | [1/3] Dry-run: would scan data/repos/claude-code-action
INFO | [2/3] Dry-run: would scan data/repos/genai-code-review
INFO | [3/3] Dry-run: would scan data/repos/opencode
Con Rutas Personalizadas
python scripts/generate_grype.py \
--repos-path /path/to/repos \
--output-path /path/to/outputDiagnóstico del Entorno
python scripts/generate_grype.py --diagnoseArgumentos
--repos-path: Directorio con repositorios (default:data/repos)--output-path: Directorio de salida (default:data/results)--dry-run: Mostrar qué se haría sin ejecutar--diagnose: Verificar disponibilidad de Grype y su DB
Esquema de Salida JSON
Archivo Normalizado: {repo}-grype.json
{
"total_vulnerabilities": 45,
"vulnerabilities_by_severity": {
"critical": 2,
"high": 8,
"medium": 20,
"low": 15
},
"vulnerabilities": [
{
"package_name": "lodash",
"current_version": "4.17.20",
"vuln_id": "CVE-2021-23337",
"vuln_severity": "high",
"fix_version": "4.17.21",
"message": "Prototype pollution vulnerability in lodash library",
"cwe": "CWE-1321",
"cvss_score": 7.4,
"type": "vulnerability"
},
{
"package_name": "express",
"current_version": "4.16.2",
"vuln_id": "CVE-2022-24999",
"vuln_severity": "medium",
"fix_version": "4.18.1",
"message": "Denial of Service via regular expression in Express query parser",
"cwe": "CWE-1333",
"cvss_score": 5.3,
"type": "vulnerability"
}
],
"grype_metadata": {
"grype_version": "2.1.0",
"db_location": "/home/user/.grype/db",
"scanned_path": "/home/user/project/data/repos/opencode"
}
}Archivo Raw (Debug): {repo}-grype-raw.json
Contiene la salida original de Grype sin procesamiento. Útil para: - Debug visual y verificación - Análisis directo del formato de Grype - Referencia de estructura completa
Ejemplo snippet:
{
"formatVersion": "2.1.0",
"source": {
"target": "/home/user/project/data/repos/opencode",
"type": "directory",
"dbPath": "/home/user/.grype/db"
},
"matches": [
{
"vulnerability": {
"id": "CVE-2021-23337",
"description": "Prototype pollution...",
"cvssV3": { "score": 7.4 }
},
"artifact": {
"name": "lodash",
"version": "4.17.20"
}
}
]
}Resumen: scripts/generate_grype.py
El archivo generate_grype.py implementa un scanner de vulnerabilidades robusto que:
- Descubre automáticamente múltiples repositorios
- Detecta manifests de dependencias (package.json, requirements.txt, pom.xml, etc)
- Ejecuta Grype CLI para encontrar vulnerabilidades
- Normaliza salida JSON a formato consistente
- Guarda dos versiones: raw (debug) y normalizado (análisis)
- Maneja errores elegantemente y continúa procesando
- Proporciona resumen de estadísticas de vulnerabilidades