Saltar a contenido

MOS — Management Operating System

MOS es la capa transversal de auditoría y automatización por eventos de Gestión Civis. Vive en app/core/mos/ y ofrece tres capacidades:

  1. Auditoría e historial: registrar de forma trazable qué pasó, quién lo hizo y con qué datos (emit_event).
  2. Reglas y acciones: ejecutar automatismos cuando se cumplen condiciones sobre un evento.
  3. Tareas programadas (crons): ejecutar acciones de forma periódica.
flowchart LR
    H["Handler de módulo"] -->|emit_event| EV["events.py"]
    EV --> AL[("core_audit_log")]
    EV --> AP[("core_audit_payload")]
    EV --> HE[("core_history_event")]
    EV --> HP[("core_history_payload")]
    CRON["core_cron_job"] --> SCH["scheduler.py"]
    SCH --> Q["core_event_queue"]
    Q --> REG["registry.py<br/>@register_action"]
    REG --> ACT["Acción del módulo"]

Contexto de petición

El hook before_request llama a init_mos_context() (app/core/mos/context.py), que fija en g:

  • g.request_id — el X-Request-Id entrante o un UUID nuevo.
  • g.user_id — desde g.current_user si existe.

Este contexto enriquece automáticamente la metadata de los eventos.

Auditoría: emit_event()

app/core/mos/events.py. Firma (argumentos por nombre):

def emit_event(
    *,
    entidad_id: int,
    actor_user_id: int | None,
    module: str,
    resource_type: str,
    resource_id: str | int,
    action: str,
    before: dict | None = None,
    after: dict | None = None,
    event_code: str | None = None,
    message: str | None = None,
    severity: str = "INFO",
    meta: dict | None = None,
    request_id: str | None = None,
    actor_label: str | None = None,
):
    ...

Lo que hace:

  1. Crea un registro técnico en core_audit_log (module, resource, action, actor, request_id).
  2. Si hay before/after/meta, guarda los snapshots en core_audit_payload (JSONB).
  3. Si se aporta event_code, crea la narrativa funcional en core_history_event (mensaje, motivo, aprobado por, severidad, vector de búsqueda de texto completo).
  4. Si hay snapshots, calcula el diff en core_history_payload.
  5. Enriquece meta con contexto HTTP (IP, endpoint, método, user-agent, request_id).

Auditoría ≠ automatización

emit_event solo registra. La automatización (disparo de reglas) la gestiona dispatch_event().

Helper de auditoría por módulo

Patrón recomendado: cada módulo expone emit_<modulo>_event(...) que envuelve a emit_event, construye la metadata funcional del recurso, añade el motivo si lo hay y hace commit autónomo para que ningún evento se pierda.

# patrón general
emit_<modulo>_event(
    current_user=...,
    entidad_id=...,
    <recurso>=...,
    action="CREATE|UPDATE|DELETE|...",
    event_code="MODULO_ACCION",
    message="Descripción legible",
    before=snapshot_antes,    # snapshot_estado()
    after=snapshot_despues,   # snapshot_completo() en altas
    motivo="si aplica",
    severity="INFO|WARNING|ERROR",
)

Implementado en módulos como Registro (emit_registro_event) y Padrón (emit_padron_event), con snapshots que omiten datos sensibles.

Regla del proyecto

Todo endpoint nuevo con una acción importante debe emitir un evento para trazabilidad, preferiblemente a través del helper emit_<modulo>_event del módulo (metadata funcional + before/after + motivo separado + severidad razonable).

Reglas, condiciones y acciones

Modelos en app/core/mos/models.py:

Modelo Tabla Propósito
CoreEventCatalog core_event_catalog Catálogo maestro de eventos (code, module, activo).
CoreEventRule core_event_rule Regla que vincula un evento con acciones (prioridad, activo).
CoreEventCondition core_event_condition Condición (campo, operador, valor).
CoreEventAction core_event_action Acción a ejecutar (accion, modo, parametros).
CoreEventActionType core_event_action_type Catálogo de tipos de acción reutilizables.
CoreEventQueue core_event_queue Cola de ejecución (async/scheduled) con reintentos.

Operadores de condición soportados: =, !=, contains, in, >, <, >=, <=.

Registro de acciones

app/core/mos/registry.py. Un módulo expone una acción con el decorador @register_action(code):

@register_action("recaudacion.contabilizar_cobro")
def contabilizar_cobro(payload, params, entidad_id):
    ...

La acción se invoca con (payload, params, entidad_id) cuando la cola la procesa.

Tareas programadas (crons)

Modelo Tabla Propósito
CoreCronJob core_cron_job Tarea periódica (code, action, cron_expr, params, next_run).

scheduler.py:

  • process_cron_jobs() detecta CoreCronJob con next_run <= now, encola un CoreEventQueue y recalcula next_run con croniter.
  • process_event_queue() procesa los pendientes, ejecuta la acción registrada y reintenta con backoff (hasta 5 intentos).

Crons de fábrica

Los siguientes cron jobs se siembran automáticamente en cada entidad desde app/core/mos/seed_cron_jobs.py (CRON_JOBS_DEFAULT). Son idempotentes: la primera vez que se ejecuta el seed se crean; en sucesivas se respeta lo que el administrador haya cambiado desde la UI de "Tareas programadas".

code Acción Periodicidad Módulo
der_ing_facturar_auto contabilidad.derechos_ingreso.facturar_automatico 0 6 * * * Contabilidad
sla_facturas_sin_propuesta contabilidad.sla_facturas_sin_propuesta 0 8 * * * Contabilidad
sla_facturas_retenidas contabilidad.sla_facturas_retenidas 0 8 * * * Contabilidad
workflow_verificar_sla workflow.verificar_sla */15 * * * * Documental
recaudacion_prescripcion recaudacion.prescripcion.ejecutar 0 4 * * * Recaudación
tesoreria_pj_aviso tesoreria.pj.aviso_vencimiento 0 9 * * * Tesorería
recaudacion_plan_pago_sepa recaudacion.plan_pago.sepa_orquestar 0 5 * * * Recaudación
padron_renovaciones_detectar padron_habitantes.renovaciones.detectar 0 5 1 * * Padrón
padron_inspeccion_diaria padron_habitantes.inspeccion.detectar 0 4 * * * Padrón
padron_snapshot_anual padron_habitantes.snapshot.generar_anual 0 3 2 1 * Padrón
padron_ida_mensual padron_habitantes.ida.generar_mensual 0 6 5 * * Padrón
padron_cert_sello_caducidad padron_habitantes.cert_sello.vigilar_caducidad 0 7 * * * Padrón
notificaciones_vigilar_vencimientos notificaciones.vigilar_vencimientos 0 5 * * * Notificaciones

Crons registrados como acciones pero no sembrados de fábrica (se activan cuando el módulo entra en producción real):

Acción Periodicidad sugerida Módulo
registro.sir.enviar_pendientes */5 * * * * Registro (SIR salida)
registro.sir.descargar_pendientes */10 * * * * Registro (SIR entrada)

Arranque del scheduler

Para que los crons de la tabla anterior se ejecuten, el sistema necesita un proceso permanente que invoque process_cron_jobs() y process_event_queue() de forma continua. Hay dos comandos en la plataforma y conviene no confundirlos:

Comando Para qué sirve Modo de invocación
python run_scheduler.py Scheduler MOS real: procesa CoreCronJob y CoreEventQueue en un bucle infinito. Es el que ejecuta los 14 crons de fábrica. Worker permanente (systemd / Servicio Windows / Cloud Run).
flask civis-system-run Tareas de mantenimiento del propio MOS: particiones mensuales de core_audit_log y core_history_event (cobertura 12 meses adelante) y política de retención (audit 5 años, history 3 años). Programación periódica (cron del SO / Programador de tareas), basta una vez al día.

Es obligatorio arrancar run_scheduler.py como servicio

Sin ese proceso permanente vivo, los 14 cron jobs de fábrica no se ejecutan: las renovaciones de Padrón no se detectan, las prescripciones de Recaudación no se aplican, los SLA de facturas no avisan, las notificaciones DEHú vencidas no se marcan como rechazadas, etc. flask civis-system-run por sí solo no es suficiente — solo se ocupa del mantenimiento de las tablas particionadas.

Una sola instancia

El scheduler usa SELECT ... FOR UPDATE SKIP LOCKED en la cola y un advisory lock PostgreSQL en las tareas de mantenimiento, por lo que arrancar varios run_scheduler.py contra la misma BD es seguro (uno toma cada job). Aun así, para el despliegue típico de un ayuntamiento se recomienda un único worker por simplicidad operativa.

Detalles concretos de cómo dejar esto arrancado en cada plataforma:

Pausar el scheduler

Existe un kill-switch global: el valor scheduler_enabled en core_system_setting. Si vale "false", el bucle queda en espera (dormido 5 s) sin tomar jobs. Se cambia desde el endpoint POST /api/core/scheduler/toggle (útil durante una migración delicada de BD).

Endpoints MOS

Blueprint mos_bp, prefijo /api/core. Selección:

Método Ruta Propósito
GET /api/core/audit Listado de auditoría técnica (cursor).
GET /api/core/historial Historial funcional con búsqueda de texto.
GET /api/core/catalog Catálogo de eventos.
GET/POST /api/core/rules Reglas de automatización.
GET/POST /api/core/conditions Condiciones de reglas.
GET/POST /api/core/actions Acciones de reglas.
GET/POST /api/core/cron-jobs Tareas programadas.
POST /api/core/cron-jobs/<id>/run-now Ejecutar un cron manualmente.
GET /api/core/scheduler/status Estado de cola y jobs.
GET /api/core/admin/system-health Tamaño de tablas particionadas.

Tablas particionadas

core_audit_log y core_history_event están particionadas por rango sobre created_at para sostener el volumen de auditoría.