- /**
- * Lego 09 — Surveillance Engine (PID Polling & Socket Stream)
- *
- * Tracks the PIDs of n8n (a Node.js process), Ollama (ollama_llama_server or
- * ollama), and the NestJS process itself (process.pid). Polls every 500 ms via
- * pidusage and keeps the latest metrics payload so the Gateway can broadcast
- * monitor:data to all clients.
- *
- * Port-Level Heartbeat: n8n is only included in metrics when both a PID is
- * found AND port 5678 is actively accepting TCP connections.
- *
- * Webhook Probe: every 10 s, NestJS sends a real HTTP POST with a ping payload
- * to N8N_WEBHOOK_URL. Any response with a status below 500 marks the agent as
- * ready; a 5xx, a network error, or no answer within 3 s marks it not ready.
- * The result is broadcast via monitor:status so the UI can show
- * "Agent Ready / Not Ready" independently of process metrics.
- */
- import { Injectable, OnModuleInit, OnModuleDestroy, Logger } from '@nestjs/common';
- import * as net from 'net';
- import * as pidusage from 'pidusage';
- import find from 'find-process';
- export interface MonitorPayload {
-   service: string;
-   pid: number;
-   cpu: number; // percent, rounded to 2 decimals (can exceed 100 across cores)
-   memory: number; // bytes
-   timestamp: Date; // when the sample was taken
- }
- export interface MonitorStatus {
-   n8nWebhookReady: boolean;
-   timestamp: Date;
- }
- @Injectable()
- export class SurveillanceService implements OnModuleInit, OnModuleDestroy {
-   private readonly logger = new Logger(SurveillanceService.name);
-   // Live metrics: the Gateway reads these on every tick
-   private _latestMetrics: MonitorPayload[] = [];
-   // Callback registered by the Gateway so it receives every poll result
-   private onMetricsUpdate: ((metrics: MonitorPayload[]) => void) | null = null;
-   // Callback for webhook probe results
-   private onStatusUpdate: ((status: MonitorStatus) => void) | null = null;
-   private pollInterval: NodeJS.Timeout | null = null;
-   private webhookProbeInterval: NodeJS.Timeout | null = null;
-   // Cached result of the last webhook probe
-   private _n8nWebhookReady = false;
-   private readonly n8nWebhookUrl = process.env['N8N_WEBHOOK_URL'] ?? '';
-   // Tracked PIDs: resolved once and re-used (re-discovered when null)
-   private pidMap: Record<string, number | null> = {
-     NestJS: process.pid, // Always known immediately
-     n8n: null,
-     Ollama: null,
-   };
-   // ─── Lifecycle ─────────────────────────────────────────────────────────────
-   async onModuleInit() {
-     this.logger.log('🟢 SurveillanceService booting — starting 500ms PID poll loop');
-     await this.discoverPids();
-     this.pollInterval = setInterval(() => this.tick(), 500);
-     // Probe the n8n webhook immediately, then every 10 s
-     await this.probeWebhook();
-     this.webhookProbeInterval = setInterval(() => this.probeWebhook(), 10_000);
-   }
-   onModuleDestroy() {
-     if (this.pollInterval) clearInterval(this.pollInterval);
-     if (this.webhookProbeInterval) clearInterval(this.webhookProbeInterval);
-     this.logger.log('🔴 SurveillanceService stopped');
-   }
-   // ─── Public API ────────────────────────────────────────────────────────────
-   registerMetricsCallback(cb: (metrics: MonitorPayload[]) => void) {
-     this.onMetricsUpdate = cb;
-   }
-   registerStatusCallback(cb: (status: MonitorStatus) => void) {
-     this.onStatusUpdate = cb;
-   }
-   getLatestMetrics(): MonitorPayload[] {
-     return this._latestMetrics;
-   }
-   getLatestStatus(): MonitorStatus {
-     return { n8nWebhookReady: this._n8nWebhookReady, timestamp: new Date() };
-   }
-   // ─── Port-Level Heartbeat ──────────────────────────────────────────────────
-   /**
-    * Opens a short-lived TCP connection to host:port.
-    * Resolves true if the port accepts the connection within 800 ms.
-    * Resolves false on ECONNREFUSED, timeout, or any error.
-    */
-   private portListening(host: string, port: number): Promise<boolean> {
-     return new Promise((resolve) => {
-       const socket = new net.Socket();
-       const done = (result: boolean) => {
-         socket.destroy();
-         resolve(result);
-       };
-       socket.setTimeout(800);
-       socket.once('connect', () => done(true));
-       socket.once('timeout', () => done(false));
-       socket.once('error', () => done(false));
-       socket.connect(port, host);
-     });
-   }
-   // ─── Webhook Probe ─────────────────────────────────────────────────────────
-   /**
-    * POSTs a lightweight ping to the n8n webhook URL.
-    * Any HTTP response below 500 means n8n itself is up and answering, and we
-    * treat that as "ready". This is deliberately lenient: a 4xx (for example a
-    * 404 for a webhook path that is not registered) still counts as ready.
-    * A 5xx or a network error (ECONNREFUSED, DNS failure, 3 s timeout) means
-    * "not ready".
-    */
-   private async probeWebhook(): Promise<void> {
-     if (!this.n8nWebhookUrl) {
-       if (this._n8nWebhookReady) {
-         this._n8nWebhookReady = false;
-         this.emitStatus();
-       }
-       return;
-     }
-     const controller = new AbortController();
-     const timeout = setTimeout(() => controller.abort(), 3_000);
-     try {
-       const res = await fetch(this.n8nWebhookUrl, {
-         method: 'POST',
-         headers: { 'Content-Type': 'application/json' },
-         body: JSON.stringify({ __ping: true }),
-         signal: controller.signal,
-       });
-       const ready = res.status < 500; // 2xx/3xx/4xx all mean n8n responded
-       if (ready !== this._n8nWebhookReady) {
-         this._n8nWebhookReady = ready;
-         this.logger.log(`🔗 n8n webhook probe: ${ready ? '✅ ready' : '⚠️ 5xx'}`);
-         this.emitStatus();
-       }
-     } catch {
-       if (this._n8nWebhookReady) {
-         this._n8nWebhookReady = false;
-         this.logger.warn('⚠️ n8n webhook unreachable — agent marked NOT READY');
-         this.emitStatus();
-       }
-     } finally {
-       // Clear the abort timer on every path, including when fetch rejected
-       clearTimeout(timeout);
-     }
-   }
-   private emitStatus(): void {
-     if (this.onStatusUpdate) {
-       this.onStatusUpdate({ n8nWebhookReady: this._n8nWebhookReady, timestamp: new Date() });
-     }
-   }
-   // ─── PID Discovery ─────────────────────────────────────────────────────────
-   /**
-    * Uses find-process to locate the n8n and Ollama PIDs.
-    * Called once at boot and retried on each tick for any service still null.
-    * The n8n PID is only accepted if port 5678 is also actively listening.
-    */
-   private async discoverPids() {
-     // --- n8n: a Node.js process whose command line contains "n8n" ---
-     if (!this.pidMap['n8n']) {
-       try {
-         const nodeProcs = await find('name', 'node', true);
-         // Skip our own PID: the NestJS command line can also contain "n8n"
-         // (e.g. when the project lives in a folder named after it)
-         const n8nProc = nodeProcs.find(
-           (p: any) => p.pid !== process.pid && p.cmd && p.cmd.toLowerCase().includes('n8n'),
-         );
-         if (n8nProc) {
-           // Port-level guard: only accept the PID if n8n is actually serving
-           const portUp = await this.portListening('127.0.0.1', 5678);
-           if (portUp) {
-             this.pidMap['n8n'] = n8nProc.pid;
-             this.logger.log(`🔍 n8n PID discovered: ${n8nProc.pid} (port 5678 ✓)`);
-           } else {
-             this.logger.warn(`⚠️ n8n PID ${n8nProc.pid} found but port 5678 is not listening — treating as offline`);
-           }
-         }
-       } catch {
-         // Process listing failed; retry on the next tick
-       }
-     }
-     // --- Ollama: match the server child process by executable name ---
-     if (!this.pidMap['Ollama']) {
-       try {
-         // On Windows the process is "ollama_llama_server.exe"; on Linux "ollama"
-         const ollamaNames = ['ollama_llama_server', 'ollama'];
-         for (const name of ollamaNames) {
-           const procs = await find('name', name, true);
-           if (procs.length > 0) {
-             this.pidMap['Ollama'] = procs[0].pid;
-             this.logger.log(`🔍 Ollama PID discovered: ${procs[0].pid} (${name})`);
-             break;
-           }
-         }
-       } catch {
-         // Process listing failed; retry on the next tick
-       }
-     }
-   }
-   // ─── Poll Tick ─────────────────────────────────────────────────────────────
-   // setInterval does not wait for the async tick to finish, and find-process
-   // can be slow, so this flag keeps ticks from overlapping
-   private tickInFlight = false;
-   private async tick() {
-     if (this.tickInFlight) return;
-     this.tickInFlight = true;
-     try {
-       // Port re-verification: if the n8n PID is known, confirm port 5678 is still up.
-       // A hung n8n process (PID alive, port dead) is evicted so discovery retries.
-       if (this.pidMap['n8n'] !== null) {
-         const portUp = await this.portListening('127.0.0.1', 5678);
-         if (!portUp) {
-           this.logger.warn(`⚠️ n8n port 5678 went silent — evicting PID ${this.pidMap['n8n']}`);
-           this.pidMap['n8n'] = null;
-         }
-       }
-       // Retry PID discovery for any services still unknown
-       const hasMissingPids = Object.values(this.pidMap).some((v) => v === null);
-       if (hasMissingPids) await this.discoverPids();
-       const activePids = Object.entries(this.pidMap).filter(
-         ([, pid]) => pid !== null,
-       ) as [string, number][];
-       if (activePids.length === 0) return;
-       const pidsToQuery = activePids.map(([, pid]) => pid);
-       let stats: Record<number, pidusage.Status>;
-       try {
-         stats = await pidusage(pidsToQuery);
-       } catch (e) {
-         // A process may have died: invalidate its PID so it gets re-discovered
-         this.invalidateDeadPids(e);
-         return;
-       }
-       const metrics: MonitorPayload[] = activePids.map(([service, pid]) => ({
-         service,
-         pid,
-         cpu: parseFloat((stats[pid]?.cpu ?? 0).toFixed(2)),
-         memory: stats[pid]?.memory ?? 0, // bytes; let the UI format it
-         timestamp: new Date(),
-       }));
-       this._latestMetrics = metrics;
-       if (this.onMetricsUpdate) {
-         this.onMetricsUpdate(metrics);
-       }
-     } finally {
-       this.tickInFlight = false;
-     }
-   }
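-   // ─── Helpers ───────────────────────────────────────────────────────────────
-   private invalidateDeadPids(error: unknown) {
-     // pidusage does not reliably name the missing PID in its error (it may
-     // only say "No matching pid found"), so matching the message is fragile.
-     // Instead, probe each tracked PID with signal 0, which only checks that
-     // the process exists; EPERM means it exists under another user.
-     this.logger.debug(`pidusage failed: ${String(error)}`);
-     for (const [service, pid] of Object.entries(this.pidMap)) {
-       if (pid === null || service === 'NestJS') continue;
-       try {
-         process.kill(pid, 0);
-       } catch (err) {
-         if ((err as NodeJS.ErrnoException).code === 'EPERM') continue;
-         this.logger.warn(`⚠️ PID ${pid} (${service}) appears dead — resetting`);
-         this.pidMap[service] = null;
-       }
-     }
-   }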
- }
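The port-level heartbeat reduces to one question: did a TCP handshake complete before the deadline? Below is a standalone sketch of `portListening`, renamed to the hypothetical `probePort`. It adds a `settled` flag so the promise resolves exactly once even if `timeout` and `error` both fire. The 800 ms default mirrors the service above.

```typescript
import * as net from 'net';

// Resolves true if host:port accepts a TCP connection before timeoutMs,
// false on refusal, timeout, or any other socket error.
function probePort(host: string, port: number, timeoutMs = 800): Promise<boolean> {
  return new Promise((resolve) => {
    const socket = new net.Socket();
    let settled = false;
    const finish = (up: boolean) => {
      if (settled) return;
      settled = true;
      socket.destroy(); // only the handshake matters; close right away
      resolve(up);
    };
    socket.setTimeout(timeoutMs);
    socket.once('connect', () => finish(true));
    socket.once('timeout', () => finish(false));
    socket.once('error', () => finish(false));
    socket.connect(port, host);
  });
}

// Usage: probe a throwaway server on an ephemeral port, close it, probe again.
function demoPortProbe(): Promise<[boolean, boolean]> {
  return new Promise((resolve, reject) => {
    const server = net.createServer((conn) => conn.destroy());
    server.on('error', reject);
    server.listen(0, '127.0.0.1', async () => {
      const { port } = server.address() as net.AddressInfo;
      const whileListening = await probePort('127.0.0.1', port);
      server.close(async () => {
        const afterClose = await probePort('127.0.0.1', port);
        resolve([whileListening, afterClose]);
      });
    });
  });
}
```

A successful connect only proves that something is listening on the port, not that it is n8n. That is why the service pairs the probe with the PID match.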
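The probe's readiness rule and its edge-triggered `emitStatus()` can be separated from the network call, which makes both easy to test. The sketch below applies the same rule: a status below 500 is ready, and a 5xx or a network failure is not. `ProbeOutcome`, `isReady`, `nextReadiness`, and `replay` are hypothetical names.

```typescript
// Outcome of one webhook probe: an HTTP status, or a network-level failure
// (connection refused, DNS error, or the request aborted after 3 s).
type ProbeOutcome = { kind: 'http'; status: number } | { kind: 'network-error' };

// n8n counts as ready when it answered with any status below 500.
function isReady(outcome: ProbeOutcome): boolean {
  return outcome.kind === 'http' && outcome.status < 500;
}

// Computes the new readiness and whether it changed; callers broadcast
// monitor:status only on a change, so a steady state produces no events.
function nextReadiness(
  prev: boolean,
  outcome: ProbeOutcome,
): { ready: boolean; changed: boolean } {
  const ready = isReady(outcome);
  return { ready, changed: ready !== prev };
}

// Replays probe results from the initial "not ready" state and returns
// the sequence of statuses that would have been broadcast.
function replay(outcomes: ProbeOutcome[]): boolean[] {
  let ready = false;
  const emitted: boolean[] = [];
  for (const outcome of outcomes) {
    const next = nextReadiness(ready, outcome);
    ready = next.ready;
    if (next.changed) emitted.push(ready);
  }
  return emitted;
}

// Usage: refused, 404, 200, 503, refused, 204 emits [true, false, true]
const emitted = replay([
  { kind: 'network-error' },
  { kind: 'http', status: 404 },
  { kind: 'http', status: 200 },
  { kind: 'http', status: 503 },
  { kind: 'network-error' },
  { kind: 'http', status: 204 },
]);
```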
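The n8n match in `discoverPids()` is a filter over `find-process` results. Written as a pure function, it can be tested without real processes. The sketch assumes only the `pid` and `cmd` fields the service reads; the `ProcInfo` type and the `pickN8nPid` name are hypothetical. The `selfPid` check keeps the NestJS process from matching when its own path contains "n8n".

```typescript
// Minimal subset of a find-process result that the discovery code reads.
interface ProcInfo {
  pid: number;
  cmd?: string;
}

// Returns the PID of the first process whose command line mentions n8n,
// skipping selfPid so the monitoring process never selects itself.
function pickN8nPid(procs: ProcInfo[], selfPid: number): number | null {
  const match = procs.find(
    (p) => p.pid !== selfPid && (p.cmd ?? '').toLowerCase().includes('n8n'),
  );
  return match ? match.pid : null;
}

// Usage: the monitor (PID 100) lives in a folder named "n8n-agent".
const sample: ProcInfo[] = [
  { pid: 100, cmd: 'node /srv/n8n-agent/dist/main.js' },
  { pid: 200, cmd: 'node /usr/lib/node_modules/some-tool/cli.js' },
  { pid: 300, cmd: 'node /usr/local/bin/N8N start' },
];
const picked = pickN8nPid(sample, 100); // 300
```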
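`setInterval` keeps firing on schedule whether or not the previous async `tick()` has finished, and `find-process` shells out to the OS, which can take a while. Besides an in-flight flag, another common pattern is to schedule the next run only after the current one settles. The sketch below shows that alternative; `startPolling` is a hypothetical helper, not part of the service.

```typescript
// Runs `task` repeatedly, waiting `intervalMs` after each run completes, so
// two runs never overlap even when a run takes longer than the interval.
// Returns a function that stops the loop.
function startPolling(task: () => Promise<void>, intervalMs: number): () => void {
  let stopped = false;
  let timer: ReturnType<typeof setTimeout> | undefined;
  const loop = async () => {
    try {
      await task();
    } catch {
      // A failed run must not end the loop; real code would log here.
    }
    if (!stopped) timer = setTimeout(loop, intervalMs);
  };
  timer = setTimeout(loop, intervalMs);
  return () => {
    stopped = true;
    if (timer) clearTimeout(timer);
  };
}

// Usage: a 30 ms task polled every 10 ms never runs concurrently with itself.
function demoNoOverlap(): Promise<{ runs: number; maxConcurrent: number }> {
  let running = 0;
  let maxConcurrent = 0;
  let runs = 0;
  const slowTask = async () => {
    running++;
    maxConcurrent = Math.max(maxConcurrent, running);
    await new Promise((r) => setTimeout(r, 30));
    running--;
    runs++;
  };
  const stop = startPolling(slowTask, 10);
  return new Promise((resolve) => {
    setTimeout(() => {
      stop();
      resolve({ runs, maxConcurrent });
    }, 200);
  });
}
```

The trade-off is that the effective period becomes the interval plus the task's run time, rather than a fixed 500 ms cadence.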
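Each tick converts the pidusage result into one `MonitorPayload` per tracked service. CPU is rounded to two decimals, and memory stays in bytes for the UI to format. Below is a dependency-free sketch of that mapping. `UsageStat` is an assumed shape that covers only the `cpu` and `memory` fields read here, and `toPayloads` is a hypothetical name.

```typescript
// Only the pidusage fields this mapping reads (assumed shape).
interface UsageStat {
  cpu: number; // percent
  memory: number; // bytes
}

interface MonitorPayload {
  service: string;
  pid: number;
  cpu: number;
  memory: number;
  timestamp: Date;
}

// Builds one payload per [service, pid] pair. A PID missing from `stats`
// reports zeros instead of throwing.
function toPayloads(
  active: [string, number][],
  stats: Record<number, UsageStat | undefined>,
  now: Date = new Date(),
): MonitorPayload[] {
  return active.map(([service, pid]) => ({
    service,
    pid,
    cpu: Number((stats[pid]?.cpu ?? 0).toFixed(2)), // 12.3456 becomes 12.35
    memory: stats[pid]?.memory ?? 0, // bytes; formatting is left to the UI
    timestamp: now,
  }));
}

// Usage: Ollama (PID 20) has no stats entry yet, so it reports zeros.
const payloads = toPayloads(
  [['NestJS', 10], ['Ollama', 20]],
  { 10: { cpu: 12.3456, memory: 52428800 } },
  new Date(0),
);
```

Passing `now` in is a small deviation from the service, which calls `new Date()` for each entry. It gives every payload from one tick the same timestamp and makes the function deterministic in tests.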
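A dead process can be detected directly instead of by searching the pidusage error text for a PID. `process.kill(pid, 0)` delivers no signal and only checks that the process exists, and Node documents signal 0 as a platform-independent existence test. The sketch below prunes a PID map with that check. The checker is injected so the pruning logic can be exercised without real processes; `isAlive` and `pruneDeadPids` are hypothetical names.

```typescript
// True if a process with this PID exists. Signal 0 runs only the existence
// and permission checks; EPERM means the process exists under another user.
function isAlive(pid: number): boolean {
  try {
    process.kill(pid, 0);
    return true;
  } catch (err) {
    return (err as NodeJS.ErrnoException).code === 'EPERM';
  }
}

// Returns a copy of pidMap with every PID that fails `alive` reset to null,
// except for services listed in `pinned` (the monitor's own process).
function pruneDeadPids(
  pidMap: Record<string, number | null>,
  alive: (pid: number) => boolean,
  pinned: readonly string[] = ['NestJS'],
): Record<string, number | null> {
  const next: Record<string, number | null> = {};
  for (const [service, pid] of Object.entries(pidMap)) {
    const dead = pid !== null && !pinned.includes(service) && !alive(pid);
    next[service] = dead ? null : pid;
  }
  return next;
}

// Usage with the real checker: pruneDeadPids(pidMap, isAlive)
```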
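`registerMetricsCallback` and `registerStatusCallback` each hold a single callback, so a second registration silently replaces the first. If more than one consumer ever needs the stream, a small listener set is one option. The sketch below uses hypothetical names (`Channel`, `subscribe`, `publish`) and is not part of the service above.

```typescript
type Listener<T> = (value: T) => void;

// A minimal multi-listener channel; subscribe returns an unsubscribe function.
class Channel<T> {
  private readonly listeners = new Set<Listener<T>>();

  subscribe(listener: Listener<T>): () => void {
    this.listeners.add(listener);
    return () => {
      this.listeners.delete(listener);
    };
  }

  publish(value: T): void {
    for (const listener of this.listeners) {
      try {
        listener(value);
      } catch {
        // One failing consumer should not stop delivery to the others.
      }
    }
  }
}

// Usage: the first subscriber leaves after one message.
const ch = new Channel<number>();
const a: number[] = [];
const b: number[] = [];
const unsubA = ch.subscribe((v) => a.push(v));
ch.subscribe((v) => b.push(v));
ch.publish(1);
unsubA();
ch.publish(2);
```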