CodeQL Script detallado

Acerca de este Archivo

Ubicación: scripts/generate_codeql.py
Módulo importado en: - nbs/vuln/generacion_codeql.ipynb (para ejecutar análisis) - Este notebook (para documentación)

Este archivo contiene la clase CodeQLAnalyzer, responsable de orquestar el análisis de seguridad estático sobre múltiples repositorios. Fue diseñado siguiendo el mismo patrón que scripts/generate_sboms.py, proporcionando una interfaz consistente para automatizar análisis de seguridad.

Cómo se Usa en los Notebooks

En generacion_codeql.ipynb:

from generate_codeql import CodeQLAnalyzer
analizador = CodeQLAnalyzer(repos_path, output_path)
analizador.run()  # Ejecuta todo el flujo de análisis

Explicación Detallada: scripts/generate_codeql.py

Este notebook documenta el funcionamiento interno y diseño del archivo scripts/generate_codeql.py, que implementa un analizador de seguridad estático automatizado para múltiples repositorios.

Descripción General

CodeQL es una herramienta de análisis de código desarrollada por GitHub que permite: - Escanear código fuente en busca de vulnerabilidades y problemas de seguridad - Detectar defectos comunes de programación - Ejecutar consultas personalizadas sobre estructura de código - Generar reportes en formato SARIF (Static Analysis Results Interchange Format)

El archivo generate_codeql.py automatiza este proceso para múltiples repositorios, similar a cómo generate_sboms.py automatiza la generación de SBOMs con Syft.

Arquitectura

Clase Principal: CodeQLAnalyzer

La clase CodeQLAnalyzer es el componente central que coordina todo el flujo de análisis.

class CodeQLAnalyzer:
    def __init__(self, repos_path: str, output_path: str):
        self.repos_path = Path(repos_path)          # Directorio con repositorios
        self.output_path = Path(output_path)        # Directorio de salida para resultados
        self.project_root = Path(__file__).resolve().parents[1]
        self.codeql_bin = "codeql"                  # Nombre ejecutable de CodeQL
        self.dry_run = False                         # Modo preview sin ejecución
        self.codeql_path: str | None = None         # Ruta completa a ejecutable

Flujo de Ejecución

main()
  └─> CodeQLAnalyzer.run()
       ├─> discover_repositories()                   [Descubre repos en data/repos/]
       ├─> Para cada repositorio:
       │   ├─> run_codeql(repo_path)                [Ejecuta análisis]
       │   │   ├─> _detectar_lenguaje_simple()     [Identifica lenguaje principal]
       │   │   ├─> _crear_base_datos_codeql()      [Crea DB de CodeQL]
       │   │   ├─> _resolver_query_suite()         [Busca suite compilada, fallback a pack]
       │   │   └─> _analizar_base_datos_codeql()   [Ejecuta análisis con suite]
       │   ├─> parse_sarif()                        [Convierte SARIF a JSON normalizado]
       │   └─> save_analysis()                      [Guarda {repo}-codeql.json]
       └─> Resumen final

Métodos Principales

1. discover_repositories()

Propósito: Escanear el directorio data/repos/ para encontrar todos los repositorios disponibles.

Funcionamiento:

def discover_repositories(self) -> list[str]:
    # 1. Valida que data/repos/ existe
    self._validar_directorio_repos()
    
    # 2. Lista todos los subdirectorios
    repositorios = sorted(
        str(ruta.relative_to(self.project_root))
        for ruta in self.repos_path.iterdir()
        if ruta.is_dir()  # Solo directorios
    )
    
    # 3. Devuelve lista ordenada
    return repositorios

Ejemplo de salida:

[
    'data/repos/claude-code-action',
    'data/repos/genai-code-review',
    'data/repos/opencode'
]

Excepciones: - FileNotFoundError: Si data/repos/ no existe - NotADirectoryError: Si la ruta no es un directorio

2. _detectar_lenguaje_simple(ruta_repo)

Propósito: Detectar el lenguaje principal del repositorio y retornar UNO solo (el más frecuente).

Estrategia:

def _detectar_lenguaje_simple(self, ruta_repo: Path) -> str | None:
    # 1. Define mapeo de extensiones a lenguajes
    extensiones = {
        ".py": "python",
        ".js": "javascript",
        ".ts": "javascript",  # TypeScript → JavaScript
        ".java": "java",
        # ... más extensiones
    }
    
    # 2. Escanea archivos y cuenta por lenguaje
    conteos = {}
    for archivo in ruta_repo.rglob("*"):
        if archivo.is_file() and archivo.suffix.lower() in extensiones:
            lenguaje = extensiones[archivo.suffix.lower()]
            conteos[lenguaje] = conteos.get(lenguaje, 0) + 1
    
    # 3. Devuelve el lenguaje más frecuente
    if not conteos:
        return None  # No detectó nada
    
    return max(conteos, key=conteos.get)

Ejemplo: - Entrada: Repo con 150 archivos .py, 30 .js, 5 .yml - Salida: "python" (lenguaje más frecuente)

¿Por qué un solo lenguaje? - CodeQL procesa mejor repos de un solo lenguaje - Reduce falsos positivos de análisis cruzados - Mejora rendimiento del análisis - Para análisis multi-lenguaje, ejecutar script varias veces

Lenguajes soportados: - Python, JavaScript/TypeScript, Java, C/C++, C#, Go

3. _crear_base_datos_codeql(ruta_repo, lenguaje)

Propósito: Crear la base de datos de CodeQL que indexa el código fuente.

Funcionamiento:

def _crear_base_datos_codeql(self, ruta_repo: Path, lenguaje: str) -> Path:
    # 1. Crear directorio temporal para la DB
    db_path = self.temp_dir / f"{ruta_repo.name}_db"
    
    # 2. Construir comando CodeQL
    comando = [
        "codeql",
        "database", "create",
        str(db_path),
        "--language", lenguaje,
        "--source-root", str(ruta_repo),
        "--overwrite"  # Permitir sobrescribir DBs existentes
    ]
    # Ejecuta: codeql database create /tmp/codeql_XXX/repo_db \
    #            --language python \
    #            --source-root /home/user/repo/ \
    #            --overwrite
    
    # 3. Ejecutar comando
    resultado = subprocess.run(comando, ...)
    
    # 4. Validar éxito y devolver ruta
    return db_path

Qué hace CodeQL: - Analiza la estructura sintáctica del código - Construye un grafo de dependencias - Indexa todas las definiciones y usos de variables - Almacena en base de datos optimizada para consultas

Flag --overwrite: - Permite re-crear la base de datos incluso si ya existe - Útil para ejecuciones idempotentes del script - Evita conflictos si se corre el análisis múltiples veces

Errores comunes: - CodeQL no instalado: RuntimeError: CodeQL CLI is not installed - Lenguaje no detectado: El análisis se omite

4.a _resolver_query_suite(lenguaje)

Propósito: Resolver la suite de consultas óptima para el lenguaje (con fallback automático).

Estrategia de fallback:

def _resolver_query_suite(self, lenguaje: str) -> str:
    # 1. INTENTO: Buscar suite compilada (más rápida)
    codeql_packages = Path.home() / ".codeql" / "packages" / "codeql"
    suite_pattern = f"{lenguaje}-queries/*/codeql-suites/{lenguaje}-security-and-quality.qls"
    suite_files = list(codeql_packages.glob(suite_pattern))
    
    if suite_files:
        # Suite compilada encontrada (ej: Python sí tiene)
        return str(suite_files[0])  # ~.codeql/packages/.../python-security-and-quality.qls
    
    # 2. FALLBACK: Usar query pack directo (universal)
    # Funciona incluso sin suite compilada (ej: JavaScript)
    return f"codeql/{lenguaje}-queries"

Comportamiento: - Python: ~/.codeql/packages/codeql/python-queries/.../python-security-and-quality.qls ✅ Suite compilada ⚡ Rápido - JavaScript: codeql/javascript-queries ✅ Query pack 🔄 Más lento pero funciona

4.b _analizar_base_datos_codeql(db_path, lenguaje)

Propósito: Ejecutar consultas de seguridad sobre la base de datos y generar SARIF.

Funcionamiento:

def _analizar_base_datos_codeql(self, db_path: Path, lenguaje: str, repo_name: str) -> str:
    # 1. Resolver la suite óptima
    query_suite = self._resolver_query_suite(lenguaje)
    
    # 2. Construir comando CodeQL
    comando = [
        "codeql", "database", "analyze",
        str(db_path),
        query_suite,  # Suite compilada O query pack
        "--format=sarifv2.1.0",  # Versión SARIF
        f"--output={str(sarif_output)}"  # Archivo (no stdout)
    ]
    
    # 3. Ejecutar análisis
    resultado = subprocess.run(comando, ...)
    
    # 4. Guardar resultado en archivo y devolver contenido
    if sarif_output.exists():
        return sarif_output.read_text(encoding="utf-8")

Formato SARIF (versión 2.1.0): SARIF es el estándar de la industria para reportes de análisis estático.


{
    "$schema": "https://json.schemastore.org/sarif-2.1.0.json",
    "version": "2.1.0",
    "runs": [
      {
        "tool": {
          "driver": {
            "name": "CodeQL",
            "organization": "GitHub",
            "semanticVersion": "2.25.1",
            "notifications": [
              {
                "id": "js/diagnostics/successfully-extracted-files",
                "name": "js/diagnostics/successfully-extracted-files",
                "shortDescription": { "text": "Extracted files" },
                "fullDescription": {
                    "text": "Lists all files in the source code directory that were extracted."
                },
                "defaultConfiguration": { "enabled": true },
                "properties": {
                    "tags": ["successfully-extracted-files"],
                    "description": "Lists all files in the source code directory that were extracted.",
                    "id": "js/diagnostics/successfully-extracted-files",
                    "kind": "diagnostic",
                    "name": "Extracted files"
                }
              },
    ]},
}}]}

5. parse_sarif(sarif_data)

Propósito: Convertir la salida SARIF de CodeQL a un formato JSON normalizado y aplanado para mejor usabilidad.

Funcionamiento:

def parse_sarif(self, sarif_data: str) -> dict:
    # 1. Parsear JSON SARIF
    sarif_json = json.loads(sarif_data)
    
    # 2. Inicializar estructura de salida
    resultados = {
        "total_issues": 0,
        "issues_by_severity": {
            "error": 0,
            "warning": 0,
            "note": 0
        },
        "issues": [],      # Array de hallazgos individuales
        "sarif_metadata": {}    # Información de la herramienta
    }
    
    # 3. Extraer información de la herramienta
    resultados["sarif_metadata"] = self._extraer_tool_metadata(sarif_json)
    
    # 4. Procesar cada resultado SARIF
    runs = sarif_json.get("runs", [])
    for run in runs:
        for resultado_sarif in run.get("results", []):
            # Convertir a formato normalizado
            issue = self._procesar_resultado_sarif(resultado_sarif)
            resultados["issues"].append(issue)
            resultados["total_issues"] += 1
            
            # Contar por severidad
            nivel = resultado_sarif.get("level", "warning")
            resultados["issues_by_severity"][nivel] += 1
    
    return resultados

Salida normalizada:

{
  "total_issues": 5,
  "issues_by_severity": {
    "error": 1,
    "warning": 3,
    "note": 1
  },
  "issues": [
    {
      "rule_id": "py/sql-injection",
      "level": "warning",
      "message": "Possible SQL injection",
      "file": "src/database.py",
      "region": { "startLine": 42 }
    }
  ]
}

6. save_analysis(repo_name, analysis_data)

Propósito: Guardar el análisis normalizado en data/results/{repo-name}-codeql.json.

Funcionamiento:

def save_analysis(self, repo_name: str, analysis_data: dict) -> Path:
    # 1. Validar entrada
    if not repo_name:
        raise ValueError("El nombre del repositorio no puede estar vacio.")
    
    # 2. Crear directorio de salida si no existe
    self.output_path.mkdir(parents=True, exist_ok=True)
    
    # 3. Determinar ruta de salida
    ruta_salida = self.output_path / f"{repo_name}-codeql.json"
    
    # 4. Serializar y guardar
    contenido_json = json.dumps(analysis_data, ensure_ascii=False, indent=2)
    ruta_salida.write_text(contenido_json, encoding="utf-8")
    
    # 5. Registrar en log
    LOGGER.info(f"Analisis CodeQL guardado en {ruta_salida}")
    
    return ruta_salida

Archivos generados:

data/results/
  ├── claude-code-action-codeql.json
  ├── genai-code-review-codeql.json
  └── opencode-codeql.json

7. run_codeql(repo_path)

Propósito: Orquestar todo el flujo de análisis para un repositorio individual.

Flujo:

def run_codeql(self, repo_path: str) -> str:
    # 1. Validar que repo existe
    ruta_repo = self.project_root / repo_path
    # ... validaciones ...
    
    # 2. Detectar lenguaje PRINCIPAL (el más frecuente)
    lenguaje = self._detectar_lenguaje_simple(ruta_repo)
    if not lenguaje:
        # Devolver SARIF vacío si no detectó nada
        LOGGER.warning(f"No se detectó lenguaje en {ruta_repo.name}")
        return json.dumps({"runs": []})
    
    LOGGER.info(f"Lenguaje detectado: {lenguaje}")
    
    # 3. Crear base de datos con lenguaje explícito
    db_path = self._crear_base_datos_codeql(ruta_repo, lenguaje)
    
    try:
        # 4. Analizar (con estrategia de fallback de suite)
        sarif_output = self._analizar_base_datos_codeql(db_path, lenguaje, ruta_repo.name)
        return sarif_output
    finally:
        # 5. Limpiar base de datos temporal
        if db_path.exists():
            shutil.rmtree(db_path, ignore_errors=True)

Diagrama de Flujo:

Repositorio Git
    ↓
[detectar lenguaje] → Python | JavaScript | Java
    ↓
[crear DB] → codeql database create --language {lenguaje} --overwrite
    ↓
[resolver suite] → Suite compilada O Query pack (fallback)

    ↓

[analizar] → codeql database analyze WITH suite{repo}-codeql.json

    ↓    ↓

SARIF JSON 2.1.0Parse + Normalizar
    ↓

Métodos de Orquestación

run()

Propósito: Método principal que coordina el análisis de todos los repositorios.

Lógica:

def run(self):
    # 1. Descubrir repositorios
    repositorios = self.discover_repositories()
    
    # 2. Para cada repositorio:
    for indice, repo_path in enumerate(repositorios, start=1):
        # Log de progreso: [1/3] Procesando data/repos/claude-code-action
        LOGGER.info(f"[{indice}/{len(repositorios)}] ...")
        
        if self.dry_run:
            # Mostrar qué se haría sin ejecutar
            continue
        
        try:
            # 3. Ejecutar análisis
            sarif_data = self.run_codeql(repo_path)
            
            # 4. Procesar resultados
            analysis = self.parse_sarif(sarif_data)
            
            # 5. Guardar
            self.save_analysis(ruta_repo.name, analysis)
            
            repositorios_analizados += 1
        except Exception as error:
            # Manejar errores, limpiar, y continuar
            errores += 1
            self._eliminar_archivos_parciales(ruta_repo.name)
            LOGGER.error(f"Error al procesar {repo_path}: {error}")
    
    # 6. Resumen final
    LOGGER.info(f"Resumen final | total_repos=3 | ...")

Salida:

INFO | [1/3] Procesando repositorio data/repos/claude-code-action
INFO | Usando CodeQL CLI: /usr/local/bin/codeql
INFO | [1/3] Analisis CodeQL guardado en data/results/claude-code-action-codeql.json
INFO | [2/3] Procesando repositorio data/repos/genai-code-review
INFO | [2/3] Analisis CodeQL guardado en data/results/genai-code-review-codeql.json
...
INFO | Resumen final | total_repos=3 | repos_analizados=3 | archivos_generados=3 | omitidos=0 | errores=0

Manejo de Errores

Niveles de Validación

  1. Directorios:
    • _validar_directorio_repos(): Verifica que data/repos/ existe y es directorio
    • _validar_directorio_salida(): Verifica que ruta de salida es válida
  2. Repositorio Individual:
    • Existe: FileNotFoundError
    • Es directorio: NotADirectoryError
    • No está vacío: ValueError
  3. Análisis:
    • CodeQL instalado: RuntimeError con mensaje instructivo
    • Base de datos creada: RuntimeError con detalle de stderr
    • SARIF válido: RuntimeError si JSON no es válido

Recuperación de Errores


try:
    # Intentar análisis
    sarif_data = self.run_codeql(repo_path)
except Exception as error:
    # 1. Limpiar archivos parciales
    self._eliminar_archivos_parciales(ruta_repo.name)
    
    # 2. Registrar error
    LOGGER.error(f"Error: {error}")
    
    # 3. Continuar con siguiente repo
    errores += 1

Uso del Script

Ejecución Básica

python scripts/generate_codeql.py

Vista Previa sin Ejecutar (Dry-run)

python scripts/generate_codeql.py --dry-run

Salida:

INFO | [1/3] Dry-run: se generaria data/results/claude-code-action-codeql.json
INFO | [2/3] Dry-run: se generaria data/results/genai-code-review-codeql.json
INFO | [3/3] Dry-run: se generaria data/results/opencode-codeql.json

Con Rutas Personalizadas

python scripts/generate_codeql.py \
  --repos-path /path/to/repos \
  --output-path /path/to/output

Argumentos

  • --repos-path: Directorio con repositorios (default: data/repos)
  • --output-path: Directorio de salida (default: data/results)
  • --dry-run: Mostrar qué se haría sin ejecutar

Esquema de Salida JSON

Cada archivo {repo}-codeql.json contiene:


{
  "total_issues": 15,
  "issues_by_severity": {
    "error": 2,
    "warning": 10,
    "note": 3
  },
  "issues": [
    {
      "rule_id": "py/sql-injection",
      "rule_index": 42,
      "level": "warning",
      "message": "User input used in SQL query without sanitization",
      "file": "src/database.py",
      "region": {
        "startLine": 156,
        "endLine": 156,
        "startColumn": 21,
        "endColumn": 45
      },
      "kind": "fail",
      "properties": {}
    }
  ],
  "sarif_metadata": {
    "version": "2.1.0",
    "schema_uri": "https://raw.githubusercontent.com/oasis-tcs/sarif-spec/master/...",
    "tool": {
      "name": "CodeQL",
      "version": "2.11.6",
      "information_uri": "https://codeql.github.com"
    }
  }
}

Resumen: scripts/generate_codeql.py

El archivo generate_codeql.py implementa un analizador de seguridad robusto que:

  1. Descubre automáticamente múltiples repositorios
  2. Detecta el lenguaje principal presente usando conteo de extensiones
  3. Crea bases de datos de CodeQL indexadas con --overwrite para idempotencia
  4. Resuelve suite de consultas (compilada → fallback a query pack)
  5. Ejecuta consultas de seguridad y calidad sobre la DB
  6. Convierte SARIF 2.1.0 a JSON normalizado
  7. Maneja errores elegantemente y continúa procesando
  8. Proporciona resumen de estadísticas