Grype Script Detallado

Acerca de este Archivo

Ubicación: scripts/generate_grype.py
Módulo importado en: - nbs/vuln/generacion_grype.ipynb (para ejecutar análisis) - Este notebook (para documentación)

Este archivo contiene la clase GrypeAnalyzer, responsable de orquestar el análisis de vulnerabilidades en dependencias sobre múltiples repositorios. Fue diseñado siguiendo el mismo patrón que scripts/generate_codeql.py, proporcionando una interfaz consistente para automatizar análisis de seguridad.

Cómo se Usa en los Notebooks

En generacion_grype.ipynb:

from generate_grype import GrypeAnalyzer
analizador = GrypeAnalyzer(repos_path, output_path)
analizador.run()  # Ejecuta todo el flujo de análisis

Explicación Detallada: scripts/generate_grype.py

Este notebook documenta el funcionamiento interno y diseño del archivo scripts/generate_grype.py, que implementa un scanner de vulnerabilidades automatizado para múltiples repositorios.

Descripción General

Grype es una herramienta de análisis de seguridad desarrollada por Anchore que permite: - Escanear dependencias en busca de vulnerabilidades conocidas (CVE, GitHub Advisories) - Detectar manifests de dependencias automáticamente - Retornar severidad y versiones de corrección - Generar reportes detallados en JSON

El archivo generate_grype.py automatiza este proceso para múltiples repositorios, similar a cómo generate_codeql.py automatiza el análisis estático de código.

Arquitectura

Clase Principal: GrypeAnalyzer

La clase GrypeAnalyzer es el componente central que coordina todo el flujo de análisis.

class GrypeAnalyzer:
    def __init__(self, repos_path: str, output_path: str):
        self.repos_path = Path(repos_path)          # Directorio con repositorios
        self.output_path = Path(output_path)        # Directorio de salida para resultados
        self.project_root = Path(__file__).resolve().parents[1]
        self.grype_bin = "grype"                    # Nombre ejecutable de Grype
        self.dry_run = False                         # Modo preview sin ejecución
        self.grype_path: str | None = None          # Ruta completa a ejecutable

Flujo de Ejecución

main()
  └─> GrypeAnalyzer.run()
       ├─> discover_repositories()                   [Descubre repos en data/repos/]
       ├─> Para cada repositorio:
       │   ├─> run_grype(repo_path)                [Ejecuta análisis]
       │   │   ├─> _detectar_manifests()          [Busca package.json, requirements.txt, etc]
       │   │   ├─> subprocess: grype <path> --output=json
       │   │   └─ Retorna: JSON con vulnerabilidades
       │   ├─> parse_grype_output()                 [Normaliza estructura]
       │   └─> save_analysis()                      [Guarda formatos raw + normalizado]
       └─> Resumen final

Métodos Principales

1. discover_repositories()

Propósito: Escanear el directorio data/repos/ para encontrar todos los repositorios disponibles.

Funcionamiento:

def discover_repositories(self) -> list[str]:
    # 1. Valida que data/repos/ existe
    self._validar_directorio_repos()
    
    # 2. Lista todos los subdirectorios
    repositorios = sorted(
        str(ruta.relative_to(self.project_root))
        for ruta in self.repos_path.iterdir()
        if ruta.is_dir()  # Solo directorios
    )
    
    # 3. Devuelve lista ordenada
    return repositorios

Ejemplo de salida:

[
    'data/repos/claude-code-action',
    'data/repos/genai-code-review',
    'data/repos/opencode'
]

2. _detectar_manifests(ruta_repo)

Propósito: Detectar manifests de dependencias soportados por Grype.

Estrategia:

def _detectar_manifests(self, ruta_repo: Path) -> list[str]:
    # 1. Define manifests soportados por Grype
    manifests_soportados = {
        "package.json": "npm",
        "requirements.txt": "pip",
        "pom.xml": "maven",
        "build.gradle": "gradle",
        "go.mod": "go",
        "Cargo.toml": "cargo",
        # ... más
    }
    
    # 2. Escanea archivos recursivamente
    manifests_encontrados = []
    for archivo in ruta_repo.rglob("*"):  # rglob = recursivo
        if archivo.is_file() and archivo.name in manifests_soportados:
            manifests_encontrados.append(archivo.name)
    
    # 3. Devuelve lista única ordenada
    return sorted(set(manifests_encontrados))

Ejemplo: - Entrada: Repo con package.json, package-lock.json, yarn.lock - Salida: ["package-lock.json", "package.json", "yarn.lock"]

Manifests soportados por Grype: - npm: package.json, package-lock.json, yarn.lock - Python: requirements.txt, Pipfile, Pipfile.lock, poetry.lock - Java: pom.xml, build.gradle - Go: go.mod, go.sum - Rust: Cargo.toml, Cargo.lock - Ruby: Gemfile, Gemfile.lock

3. run_grype(repo_path)

Propósito: Ejecutar Grype en el repositorio y retornar JSON con vulnerabilidades.

Funcionamiento:

def run_grype(self, repo_path: str) -> str:
    # 1. Validar que repo existe
    ruta_repo = self.project_root / repo_path
    # ... validaciones ...
    
    # 2. Detectar manifests
    manifests = self._detectar_manifests(ruta_repo)
    if not manifests:
        # Si no hay manifests, retornar JSON vacío
        return json.dumps({"matches": [], "source": None})
    
    # 3. Construir comando Grype
    comando = [
        "grype",
        str(ruta_repo),           # Ruta a escanear
        "--output=json"           # Formato JSON
    ]
    
    # 4. Ejecutar Grype
    resultado = subprocess.run(comando, capture_output=True, text=True)
    
    # 5. Retornar JSON string
    return resultado.stdout

Qué hace Grype: - Busca todos los manifests de dependencias en el directorio - Extrae lista de dependencias y versiones - Compara contra base de datos de vulnerabilidades (actualizada automáticamente) - Retorna matches con CVE, severidad (CVSS score), versiones de corrección

4. parse_grype_output(grype_json_str)

Propósito: Convertir JSON de Grype a un formato normalizado.

Funcionamiento:

def parse_grype_output(self, grype_json_str: str) -> dict:
    # 1. Parsear JSON de Grype
    grype_data = json.loads(grype_json_str)
    
    # 2. Inicializar estructura de salida
    resultados = {
        "total_vulnerabilities": 0,
        "vulnerabilities_by_severity": {
            "critical": 0,
            "high": 0,
            "medium": 0,
            "low": 0
        },
        "vulnerabilities": [],      # Array de hallazgos individuales
        "grype_metadata": {}         # Información de herramienta y contexto
    }
    
    # 3. Procesar cada vulnerabilidad
    vulnerabilidades_raw = grype_data.get("matches", [])
    for vuln in vulnerabilidades_raw:
        # Convertir a formato normalizado
        issue = self._procesar_vulnerabilidad_grype(vuln)
        resultados["vulnerabilities"].append(issue)
        resultados["total_vulnerabilities"] += 1
        
        # Contar por severidad
        severity = issue.get("vuln_severity", "low")
        resultados["vulnerabilities_by_severity"][severity] += 1
    
    return resultados

Salida normalizada:

{
  "total_vulnerabilities": 45,
  "vulnerabilities_by_severity": {
    "critical": 2,
    "high": 8,
    "medium": 20,
    "low": 15
  },
  "vulnerabilities": [
    {
      "package_name": "lodash",
      "current_version": "4.17.20",
      "vuln_id": "CVE-2021-23337",
      "vuln_severity": "high",
      "fix_version": "4.17.21",
      "message": "Prototype pollution in lodash...",
      "cvss_score": 7.5
    }
  ]
}

5. _procesar_vulnerabilidad_grype(vuln)

Propósito: Convertir un match individual de Grype a formato normalizado.

Funcionamiento:

def _procesar_vulnerabilidad_grype(self, vuln: dict) -> dict:
    # Extraer información del match
    artifact = vuln.get("artifact", {})        # Paquete afectado
    vulnerability = vuln.get("vulnerability", {})  # Detalles de CVE
    metadata = vuln.get("metadata", {})        # CVSS score, CWE, etc
    
    # Mapear CVSS score a severidad
    cvss_score = metadata.get("cvss", [{}])[0].get("score", 0)
    severity = self._determinar_severidad_por_cvss(cvss_score)
    
    # Retornar estructura normalizada
    return {
        "package_name": artifact.get("name"),       # ej: "lodash"
        "current_version": artifact.get("version"), # ej: "4.17.20"
        "vuln_id": vulnerability.get("id"),        # ej: "CVE-2021-23337"
        "vuln_severity": severity,                  # critical/high/medium/low
        "fix_version": vuln.get("fix", {}).get("versions", ["N/A"])[0],
        "message": vulnerability.get("description"),
        "cvss_score": cvss_score
    }

Mapeo CVSS → Severidad: - CVSS ≥ 9.0 → critical - CVSS 7.0-8.9 → high - CVSS 4.0-6.9 → medium - CVSS < 4.0 → low

6. save_analysis(repo_name, grype_raw, analysis_data)

Propósito: Guardar el análisis en dos formatos: raw (debug) y normalizado (análisis).

Funcionamiento:

def save_analysis(self, repo_name: str, grype_raw: str, analysis_data: dict) -> tuple[Path, Path]:
    # 1. Validar entrada
    if not repo_name:
        raise ValueError("Repository name cannot be empty.")
    
    # 2. Crear directorio de salida si no existe
    self.output_path.mkdir(parents=True, exist_ok=True)
    
    # 3. Guardar formato RAW (original de Grype, para debug)
    ruta_raw = self.output_path / f"{repo_name}-grype-raw.json"
    ruta_raw.write_text(grype_raw, encoding="utf-8")
    
    # 4. Guardar formato NORMALIZADO (para análisis)
    ruta_normalizado = self.output_path / f"{repo_name}-grype.json"
    contenido_json = json.dumps(analysis_data, ensure_ascii=False, indent=2)
    ruta_normalizado.write_text(contenido_json, encoding="utf-8")
    
    # 5. Retornar ambas rutas
    return ruta_raw, ruta_normalizado

Archivos generados por repositorio:

data/results/
  ├── opencode-grype-raw.json        ← Salida original de Grype (debug visual)
  ├── opencode-grype.json            ← Formato normalizado (análisis)
  ├── claude-code-action-grype-raw.json
  ├── claude-code-action-grype.json
  └── ...

7. run()

Propósito: Método principal que coordina el análisis de todos los repositorios.

Lógica:

def run(self):
    # 1. Descubrir repositorios
    repositorios = self.discover_repositories()
    
    # 2. Para cada repositorio:
    for indice, repo_path in enumerate(repositorios, start=1):
        # Log de progreso: [1/3] Scanning data/repos/opencode...
        LOGGER.info(f"[{indice}/{len(repositorios)}] ...")
        
        if self.dry_run:
            # Mostrar qué se haría sin ejecutar
            continue
        
        try:
            # 3. Ejecutar análisis
            grype_output = self.run_grype(repo_path)
            
            # 4. Procesar resultados
            analysis = self.parse_grype_output(grype_output)
            
            # 5. Guardar ambos formatos
            self.save_analysis(ruta_repo.name, grype_output, analysis)
            
            repositorios_analizados += 1
        except Exception as error:
            # Manejar errores, limpiar, y continuar
            errores += 1
            self._eliminar_archivos_parciales(ruta_repo.name)
            LOGGER.error(f"Error scanning {repo_path}: {error}")
    
    # 6. Resumen final
    LOGGER.info(f"Summary | total_repos={len(repos)} | ...")

Salida típica:

INFO | [1/3] Scanning data/repos/claude-code-action with Grype...
INFO | Manifests found in claude-code-action: package.json, package-lock.json
INFO | Running Grype on claude-code-action...
INFO | Found 0 matches in Grype output
INFO | Raw Grype output saved to data/results/claude-code-action-grype-raw.json
INFO | Normalized analysis saved to data/results/claude-code-action-grype.json
INFO | [2/3] Scanning data/repos/opencode with Grype...
INFO | Manifests found in opencode: package.json, package-lock.json, yarn.lock
INFO | Found 45 matches in Grype output
INFO | ...
INFO | Summary | total_repos=3 | repos_scanned=3 | files_generated=6 | errors=0

Manejo de Errores

Niveles de Validación

  1. Directorios:
    • _validar_directorio_repos(): Verifica que data/repos/ existe y es directorio
    • _validar_directorio_salida(): Verifica que ruta de salida es válida
  2. Repositorio Individual:
    • Existe: FileNotFoundError
    • Es directorio: NotADirectoryError
    • No está vacío: ValueError
  3. Análisis:
    • Grype instalado: RuntimeError con mensaje instructivo
    • Grype ejecutable: RuntimeError si falla subprocess
    • JSON válido: RuntimeError si output no es parseable

Recuperación de Errores

try:
    # Intentar análisis
    grype_output = self.run_grype(repo_path)
except Exception as error:
    # 1. Limpiar archivos parciales
    self._eliminar_archivos_parciales(ruta_repo.name)
    
    # 2. Registrar error
    LOGGER.error(f"Error: {error}")
    
    # 3. Continuar con siguiente repo
    errores += 1

Uso del Script

Ejecución Básica

python scripts/generate_grype.py

Vista Previa sin Ejecutar (Dry-run)

python scripts/generate_grype.py --dry-run

Salida:

INFO | [1/3] Dry-run: would scan data/repos/claude-code-action
INFO | [2/3] Dry-run: would scan data/repos/genai-code-review
INFO | [3/3] Dry-run: would scan data/repos/opencode

Con Rutas Personalizadas

python scripts/generate_grype.py \
  --repos-path /path/to/repos \
  --output-path /path/to/output

Diagnóstico del Entorno

python scripts/generate_grype.py --diagnose

Argumentos

  • --repos-path: Directorio con repositorios (default: data/repos)
  • --output-path: Directorio de salida (default: data/results)
  • --dry-run: Mostrar qué se haría sin ejecutar
  • --diagnose: Verificar disponibilidad de Grype y su DB

Esquema de Salida JSON

Archivo Normalizado: {repo}-grype.json

{
  "total_vulnerabilities": 45,
  "vulnerabilities_by_severity": {
    "critical": 2,
    "high": 8,
    "medium": 20,
    "low": 15
  },
  "vulnerabilities": [
    {
      "package_name": "lodash",
      "current_version": "4.17.20",
      "vuln_id": "CVE-2021-23337",
      "vuln_severity": "high",
      "fix_version": "4.17.21",
      "message": "Prototype pollution vulnerability in lodash library",
      "cwe": "CWE-1321",
      "cvss_score": 7.4,
      "type": "vulnerability"
    },
    {
      "package_name": "express",
      "current_version": "4.16.2",
      "vuln_id": "CVE-2022-24999",
      "vuln_severity": "medium",
      "fix_version": "4.18.1",
      "message": "Denial of Service via regular expression in Express query parser",
      "cwe": "CWE-1333",
      "cvss_score": 5.3,
      "type": "vulnerability"
    }
  ],
  "grype_metadata": {
    "grype_version": "2.1.0",
    "db_location": "/home/user/.grype/db",
    "scanned_path": "/home/user/project/data/repos/opencode"
  }
}

Archivo Raw (Debug): {repo}-grype-raw.json

Contiene la salida original de Grype sin procesamiento. Útil para: - Debug visual y verificación - Análisis directo del formato de Grype - Referencia de estructura completa

Ejemplo snippet:

{
  "formatVersion": "2.1.0",
  "source": {
    "target": "/home/user/project/data/repos/opencode",
    "type": "directory",
    "dbPath": "/home/user/.grype/db"
  },
  "matches": [
    {
      "vulnerability": {
        "id": "CVE-2021-23337",
        "description": "Prototype pollution...",
        "cvssV3": { "score": 7.4 }
      },
      "artifact": {
        "name": "lodash",
        "version": "4.17.20"
      }
    }
  ]
}

Resumen: scripts/generate_grype.py

El archivo generate_grype.py implementa un scanner de vulnerabilidades robusto que:

  1. Descubre automáticamente múltiples repositorios
  2. Detecta manifests de dependencias (package.json, requirements.txt, pom.xml, etc)
  3. Ejecuta Grype CLI para encontrar vulnerabilidades
  4. Normaliza salida JSON a formato consistente
  5. Guarda dos versiones: raw (debug) y normalizado (análisis)
  6. Maneja errores elegantemente y continúa procesando
  7. Proporciona resumen de estadísticas de vulnerabilidades