/**
 * Lego 09 — Surveillance Engine (PID Polling & Socket Stream)
 *
 * Discovers the PIDs for n8n (node), ollama_llama_server, and the NestJS
 * process itself. Polls every 500ms via pidusage and exposes the live
 * metrics payload so the Gateway can broadcast monitor:data to all clients.
 *
 * Port-Level Heartbeat: n8n is only included in metrics when both a PID is
 * found AND port 5678 is actively accepting TCP connections.
 *
 * Webhook Probe: every 10 s, NestJS attempts a real HTTP POST to N8N_WEBHOOK_URL
 * with a ping payload. If n8n answers with a status below 500 the agent is
 * considered ready; a 5xx, a timeout, or a network error marks it not ready.
 * This status is broadcast via monitor:status so the UI can show
 * "Agent Ready / Not Ready" independently of process metrics.
 */
import { Injectable, OnModuleInit, OnModuleDestroy, Logger } from '@nestjs/common';
import * as net from 'net';
import * as pidusage from 'pidusage';
import find from 'find-process';

export interface MonitorPayload {
  service: string;
  pid: number;
  cpu: number;
  memory: number;
  timestamp: Date;
}

export interface MonitorStatus {
  n8nWebhookReady: boolean;
  timestamp: Date;
}

/** The subset of a pidusage sample that this service reads. */
interface ProcessSample {
  cpu: number;
  memory: number;
}

/** How often process metrics are sampled. */
const POLL_INTERVAL_MS = 500;
/** How often the n8n webhook is probed. */
const WEBHOOK_PROBE_INTERVAL_MS = 10_000;
/** Abort the webhook probe request after this long. */
const WEBHOOK_PROBE_TIMEOUT_MS = 3_000;
/** Address used for the n8n port-level heartbeat. */
const N8N_HOST = '127.0.0.1';
const N8N_PORT = 5678;

/** Extracts a printable message from an unknown thrown value. */
function describeError(e: unknown): string {
  return e instanceof Error ? e.message : String(e);
}

@Injectable()
export class SurveillanceService implements OnModuleInit, OnModuleDestroy {
  private readonly logger = new Logger(SurveillanceService.name);

  // Live metrics — the Gateway reads these on every tick
  private _latestMetrics: MonitorPayload[] = [];

  // Callback registered by the Gateway so it receives every poll result
  private onMetricsUpdate: ((metrics: MonitorPayload[]) => void) | null = null;

  // Callback for webhook probe results
  private onStatusUpdate: ((status: MonitorStatus) => void) | null = null;

  private pollInterval: NodeJS.Timeout | null = null;
  private webhookProbeInterval: NodeJS.Timeout | null = null;

  // True while a tick is running. A tick can outlast the 500 ms interval
  // (800 ms port timeout, find-process spawning ps/wmic), so a new tick is
  // skipped rather than allowed to race the previous one on pidMap.
  private tickInFlight = false;

  // Cached result of the last webhook probe
  private _n8nWebhookReady = false;

  private readonly n8nWebhookUrl = process.env['N8N_WEBHOOK_URL'] ?? '';

  // Tracked PIDs: resolved once and re-used (re-discovered when null)
  private pidMap: Record<string, number | null> = {
    NestJS: process.pid, // Always known immediately
    n8n: null,
    Ollama: null,
  };

  // ─── Lifecycle ─────────────────────────────────────────────────────────────

  async onModuleInit() {
    this.logger.log('🟢 SurveillanceService booting — starting 500ms PID poll loop');
    await this.discoverPids();
    // tick() catches its own errors, so the returned promise never rejects.
    this.pollInterval = setInterval(() => void this.tick(), POLL_INTERVAL_MS);

    // Probe the n8n webhook immediately, then every 10 s.
    // probeWebhook() handles all of its own failures as well.
    await this.probeWebhook();
    this.webhookProbeInterval = setInterval(
      () => void this.probeWebhook(),
      WEBHOOK_PROBE_INTERVAL_MS,
    );
  }

  onModuleDestroy() {
    if (this.pollInterval) clearInterval(this.pollInterval);
    if (this.webhookProbeInterval) clearInterval(this.webhookProbeInterval);
    this.pollInterval = null;
    this.webhookProbeInterval = null;
    this.logger.log('🔴 SurveillanceService stopped');
  }

  // ─── Public API ────────────────────────────────────────────────────────────

  registerMetricsCallback(cb: (metrics: MonitorPayload[]) => void) {
    this.onMetricsUpdate = cb;
  }

  registerStatusCallback(cb: (status: MonitorStatus) => void) {
    this.onStatusUpdate = cb;
  }

  getLatestMetrics(): MonitorPayload[] {
    return this._latestMetrics;
  }

  getLatestStatus(): MonitorStatus {
    return { n8nWebhookReady: this._n8nWebhookReady, timestamp: new Date() };
  }

  // ─── Port-Level Heartbeat ──────────────────────────────────────────────────

  /**
   * Opens a short-lived TCP connection to host:port.
   *
   * @param host - Address to connect to.
   * @param port - TCP port to test.
   * @param timeoutMs - Idle timeout before giving up (defaults to 800 ms).
   * @returns true if the port accepts the connection within the timeout;
   *   false on ECONNREFUSED, timeout, or any other socket error. Never rejects.
   */
  private portListening(host: string, port: number, timeoutMs = 800): Promise<boolean> {
    return new Promise<boolean>((resolve) => {
      const socket = new net.Socket();
      const done = (result: boolean) => {
        socket.destroy();
        resolve(result);
      };
      socket.setTimeout(timeoutMs);
      socket.once('connect', () => done(true));
      socket.once('timeout', () => done(false));
      socket.once('error', () => done(false));
      socket.connect(port, host);
    });
  }

  // ─── Webhook Probe ─────────────────────────────────────────────────────────

  /**
   * POSTs a lightweight ping to the n8n webhook URL and updates the cached
   * readiness flag. Any status below 500 (2xx/3xx/4xx) means n8n answered and
   * counts as "ready". A 5xx, a timeout, or a network error (ECONNREFUSED, DNS
   * failure, …) means "not ready".
   *
   * The status callback fires only when readiness changes. This method never
   * rejects, so it is safe to call fire-and-forget from setInterval.
   *
   * NOTE(review): this POST presumably triggers a real workflow execution in
   * n8n on every probe unless the workflow ignores `__ping` payloads. Confirm
   * that this is intended.
   */
  private async probeWebhook(): Promise<void> {
    if (!this.n8nWebhookUrl) {
      if (this._n8nWebhookReady) {
        this._n8nWebhookReady = false;
        this.emitStatus();
      }
      return;
    }

    // null means the request never produced an HTTP response.
    let httpStatus: number | null = null;
    const controller = new AbortController();
    const timeout = setTimeout(() => controller.abort(), WEBHOOK_PROBE_TIMEOUT_MS);
    try {
      const res = await fetch(this.n8nWebhookUrl, {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ __ping: true }),
        signal: controller.signal,
      });
      httpStatus = res.status;
      // Only the status code matters. Cancel the body so the underlying
      // connection is released instead of lingering until GC.
      await res.body?.cancel().catch(() => undefined);
    } catch {
      httpStatus = null;
    } finally {
      // Always clear the timer, including when fetch rejects early.
      clearTimeout(timeout);
    }

    if (httpStatus === null) {
      if (this._n8nWebhookReady) {
        this._n8nWebhookReady = false;
        this.logger.warn('⚠️ n8n webhook unreachable — agent marked NOT READY');
        this.emitStatus();
      }
      return;
    }

    const ready = httpStatus < 500; // 2xx/3xx/4xx all mean n8n responded
    if (ready !== this._n8nWebhookReady) {
      this._n8nWebhookReady = ready;
      this.logger.log(`🔗 n8n webhook probe: ${ready ? '✅ ready' : '⚠️ 5xx'}`);
      this.emitStatus();
    }
  }

  /**
   * Pushes the current readiness to the registered status callback, if any.
   * A throwing callback is logged rather than allowed to escape into the timer.
   */
  private emitStatus(): void {
    if (!this.onStatusUpdate) return;
    try {
      this.onStatusUpdate({ n8nWebhookReady: this._n8nWebhookReady, timestamp: new Date() });
    } catch (e) {
      this.logger.error(`⚠️ monitor status callback failed: ${describeError(e)}`);
    }
  }

  // ─── PID Discovery ─────────────────────────────────────────────────────────

  /**
   * Uses find-process to locate the n8n and Ollama PIDs.
   *
   * Called once at boot and again on any tick where a PID is still null.
   * The n8n PID is only accepted if port 5678 is also actively listening.
   * Lookup failures are swallowed on purpose: the next tick retries.
   */
  private async discoverPids(): Promise<void> {
    // --- n8n: a Node.js process whose command line contains "n8n" ---
    if (!this.pidMap['n8n']) {
      try {
        const nodeProcs = await find('name', 'node', true);
        const n8nProc = nodeProcs.find(
          (p: { cmd?: string }) => !!p.cmd && p.cmd.toLowerCase().includes('n8n'),
        );
        if (n8nProc) {
          // Port-level guard: only accept the PID if n8n is actually serving
          const portUp = await this.portListening(N8N_HOST, N8N_PORT);
          if (portUp) {
            this.pidMap['n8n'] = n8nProc.pid;
            this.logger.log(`🔍 n8n PID discovered: ${n8nProc.pid} (port ${N8N_PORT} ✓)`);
          } else {
            this.logger.warn(
              `⚠️ n8n PID ${n8nProc.pid} found but port ${N8N_PORT} is not listening — treating as offline`,
            );
          }
        }
      } catch {
        // n8n not running yet — will retry next tick
      }
    }

    // --- Ollama: match the server child process by executable name ---
    if (!this.pidMap['Ollama']) {
      try {
        // On Windows the process is "ollama_llama_server.exe"; on Linux "ollama"
        const ollamaNames = ['ollama_llama_server', 'ollama'];
        for (const name of ollamaNames) {
          const procs = await find('name', name, true);
          if (procs.length > 0) {
            this.pidMap['Ollama'] = procs[0].pid;
            this.logger.log(`🔍 Ollama PID discovered: ${procs[0].pid} (${name})`);
            break;
          }
        }
      } catch {
        // Ollama not running yet — will retry next tick
      }
    }
  }

  // ─── Poll Tick ─────────────────────────────────────────────────────────────

  /**
   * Interval entry point. Skips the tick if the previous one is still running,
   * and logs (rather than rethrows) any failure so no promise rejection escapes
   * from setInterval.
   */
  private async tick(): Promise<void> {
    if (this.tickInFlight) return;
    this.tickInFlight = true;
    try {
      await this.pollOnce();
    } catch (e) {
      this.logger.error(`⚠️ Surveillance tick failed: ${describeError(e)}`);
    } finally {
      this.tickInFlight = false;
    }
  }

  /**
   * One poll cycle. It re-verifies the n8n port, retries discovery for missing
   * PIDs, samples every known PID with pidusage, then stores and publishes the
   * metrics.
   */
  private async pollOnce(): Promise<void> {
    // Port re-verification: if n8n PID is known, confirm port 5678 is still up.
    // A zombie n8n process (PID alive, port dead) is evicted so discovery retries.
    if (this.pidMap['n8n'] !== null) {
      const portUp = await this.portListening(N8N_HOST, N8N_PORT);
      if (!portUp) {
        this.logger.warn(`⚠️ n8n port ${N8N_PORT} went silent — evicting PID ${this.pidMap['n8n']}`);
        this.pidMap['n8n'] = null;
      }
    }

    // Retry PID discovery for any services still unknown
    const hasMissingPids = Object.values(this.pidMap).some((v) => v === null);
    if (hasMissingPids) await this.discoverPids();

    const activePids = Object.entries(this.pidMap).filter(
      (entry): entry is [string, number] => typeof entry[1] === 'number',
    );
    if (activePids.length === 0) return;

    let stats: Record<string, ProcessSample>;
    try {
      stats = await pidusage(activePids.map(([, pid]) => pid));
    } catch (e) {
      // A tracked process probably died. Find out which one and reset it so
      // it is re-discovered. Skip publishing for this tick.
      await this.invalidateDeadPids(activePids, e);
      return;
    }

    const metrics: MonitorPayload[] = activePids.map(([service, pid]) => ({
      service,
      pid,
      cpu: parseFloat((stats[pid]?.cpu ?? 0).toFixed(2)),
      memory: stats[pid]?.memory ?? 0, // bytes — let the UI format it
      timestamp: new Date(),
    }));

    this._latestMetrics = metrics;
    this.onMetricsUpdate?.(metrics);
  }

  // ─── Helpers ───────────────────────────────────────────────────────────────

  /**
   * Runs after a batched pidusage() call rejects. pidusage fails the whole
   * batch when a single PID is gone, and its error text (e.g. "No matching pid
   * found") does not reliably name that PID. So each tracked PID except our own
   * is re-sampled individually, and any PID that still fails is reset to null
   * so discoverPids() searches for it again.
   *
   * @param activePids - The [service, pid] pairs that were in the failed batch.
   * @param error - The batch error, logged at debug level if no dead PID is found.
   */
  private async invalidateDeadPids(
    activePids: ReadonlyArray<[string, number]>,
    error: unknown,
  ): Promise<void> {
    const candidates = activePids.filter(([service]) => service !== 'NestJS');
    const probes = await Promise.all(
      candidates.map(async ([service, pid]) => {
        try {
          await pidusage(pid);
          return null; // still alive
        } catch {
          return { service, pid };
        }
      }),
    );

    const dead = probes.filter((p): p is { service: string; pid: number } => p !== null);
    if (dead.length === 0) {
      this.logger.debug(`pidusage batch failed but no dead PID identified: ${describeError(error)}`);
      return;
    }

    for (const { service, pid } of dead) {
      // Only reset if the slot still holds the PID we probed.
      if (this.pidMap[service] === pid) {
        this.logger.warn(`⚠️ PID ${pid} (${service}) appears dead — resetting`);
        this.pidMap[service] = null;
      }
    }
  }
}