// surveillance.service.ts
/**
 * Lego 09 — Surveillance Engine (PID Polling & Socket Stream)
 *
 * Discovers the PIDs for n8n (node), ollama_llama_server, and the NestJS
 * process itself. Polls every 500 ms via pidusage and exposes the live
 * metrics payload so the Gateway can broadcast monitor:data to all clients.
 *
 * Port-Level Heartbeat: n8n is only included in metrics when both a PID is
 * found AND port 5678 is actively accepting TCP connections.
 *
 * Webhook Probe: every 10 s, NestJS attempts a real HTTP POST to N8N_WEBHOOK_URL
 * with a ping payload. If n8n answers with any status below 500, the agent is
 * considered ready; a 5xx answer or a network error (connection refused,
 * timeout, DNS failure) marks it not ready. This status is broadcast via
 * monitor:status so the UI can show "Agent Ready / Not Ready" independently
 * of process metrics.
 */
  16. import { Injectable, OnModuleInit, OnModuleDestroy, Logger } from '@nestjs/common';
  17. import * as net from 'net';
  18. import * as pidusage from 'pidusage';
  19. import find from 'find-process';
  20. export interface MonitorPayload {
  21. service: string;
  22. pid: number;
  23. cpu: number;
  24. memory: number;
  25. timestamp: Date;
  26. }
  27. export interface MonitorStatus {
  28. n8nWebhookReady: boolean;
  29. timestamp: Date;
  30. }
  31. @Injectable()
  32. export class SurveillanceService implements OnModuleInit, OnModuleDestroy {
  33. private readonly logger = new Logger(SurveillanceService.name);
  34. // Live metrics — the Gateway reads these on every tick
  35. private _latestMetrics: MonitorPayload[] = [];
  36. // Callback registered by the Gateway so it receives every poll result
  37. private onMetricsUpdate: ((metrics: MonitorPayload[]) => void) | null = null;
  38. // Callback for webhook probe results
  39. private onStatusUpdate: ((status: MonitorStatus) => void) | null = null;
  40. private pollInterval: NodeJS.Timeout | null = null;
  41. private webhookProbeInterval: NodeJS.Timeout | null = null;
  42. // Cached result of the last webhook probe
  43. private _n8nWebhookReady = false;
  44. private readonly n8nWebhookUrl = process.env['N8N_WEBHOOK_URL'] ?? '';
  45. // Tracked PIDs: resolved once and re-used (re-discovered when null)
  46. private pidMap: Record<string, number | null> = {
  47. NestJS: process.pid, // Always known immediately
  48. n8n: null,
  49. Ollama: null,
  50. };
  51. // ─── Lifecycle ─────────────────────────────────────────────────────────────
  52. async onModuleInit() {
  53. this.logger.log('🟢 SurveillanceService booting — starting 500ms PID poll loop');
  54. await this.discoverPids();
  55. this.pollInterval = setInterval(() => this.tick(), 500);
  56. // Probe the n8n webhook immediately, then every 10 s
  57. await this.probeWebhook();
  58. this.webhookProbeInterval = setInterval(() => this.probeWebhook(), 10_000);
  59. }
  60. onModuleDestroy() {
  61. if (this.pollInterval) clearInterval(this.pollInterval);
  62. if (this.webhookProbeInterval) clearInterval(this.webhookProbeInterval);
  63. this.logger.log('🔴 SurveillanceService stopped');
  64. }
  65. // ─── Public API ────────────────────────────────────────────────────────────
  66. registerMetricsCallback(cb: (metrics: MonitorPayload[]) => void) {
  67. this.onMetricsUpdate = cb;
  68. }
  69. registerStatusCallback(cb: (status: MonitorStatus) => void) {
  70. this.onStatusUpdate = cb;
  71. }
  72. getLatestMetrics(): MonitorPayload[] {
  73. return this._latestMetrics;
  74. }
  75. getLatestStatus(): MonitorStatus {
  76. return { n8nWebhookReady: this._n8nWebhookReady, timestamp: new Date() };
  77. }
  78. // ─── Port-Level Heartbeat ──────────────────────────────────────────────────
  79. /**
  80. * Opens a short-lived TCP connection to host:port.
  81. * Resolves true if the port accepts the connection within 800 ms.
  82. * Resolves false on ECONNREFUSED, timeout, or any error.
  83. */
  84. private portListening(host: string, port: number): Promise<boolean> {
  85. return new Promise((resolve) => {
  86. const socket = new net.Socket();
  87. const done = (result: boolean) => {
  88. socket.destroy();
  89. resolve(result);
  90. };
  91. socket.setTimeout(800);
  92. socket.once('connect', () => done(true));
  93. socket.once('timeout', () => done(false));
  94. socket.once('error', () => done(false));
  95. socket.connect(port, host);
  96. });
  97. }
  98. // ─── Webhook Probe ─────────────────────────────────────────────────────────
  99. /**
  100. * POSTs a lightweight ping to the n8n webhook URL.
  101. * Any HTTP response (even 4xx) means the webhook is registered and n8n is
  102. * accepting requests — we treat that as "ready".
  103. * A network error (ECONNREFUSED, timeout, DNS failure) means "not ready".
  104. */
  105. private async probeWebhook(): Promise<void> {
  106. if (!this.n8nWebhookUrl) {
  107. if (this._n8nWebhookReady) {
  108. this._n8nWebhookReady = false;
  109. this.emitStatus();
  110. }
  111. return;
  112. }
  113. try {
  114. const controller = new AbortController();
  115. const timeout = setTimeout(() => controller.abort(), 3_000);
  116. const res = await fetch(this.n8nWebhookUrl, {
  117. method: 'POST',
  118. headers: { 'Content-Type': 'application/json' },
  119. body: JSON.stringify({ __ping: true }),
  120. signal: controller.signal,
  121. });
  122. clearTimeout(timeout);
  123. const ready = res.status < 500; // 2xx/3xx/4xx all mean n8n responded
  124. if (ready !== this._n8nWebhookReady) {
  125. this._n8nWebhookReady = ready;
  126. this.logger.log(`🔗 n8n webhook probe: ${ready ? '✅ ready' : '⚠️ 5xx'}`);
  127. this.emitStatus();
  128. }
  129. } catch {
  130. if (this._n8nWebhookReady) {
  131. this._n8nWebhookReady = false;
  132. this.logger.warn('⚠️ n8n webhook unreachable — agent marked NOT READY');
  133. this.emitStatus();
  134. }
  135. }
  136. }
  137. private emitStatus(): void {
  138. if (this.onStatusUpdate) {
  139. this.onStatusUpdate({ n8nWebhookReady: this._n8nWebhookReady, timestamp: new Date() });
  140. }
  141. }
  142. // ─── PID Discovery ─────────────────────────────────────────────────────────
  143. /**
  144. * Uses find-process to locate n8n and Ollama PIDs.
  145. * Called once at boot and retried on each tick for any service still null.
  146. * n8n PID is only accepted if port 5678 is also actively listening.
  147. */
  148. private async discoverPids() {
  149. // --- n8n: a Node.js process whose command line contains "n8n" ---
  150. if (!this.pidMap['n8n']) {
  151. try {
  152. const nodeProcs = await find('name', 'node', true);
  153. const n8nProc = nodeProcs.find(
  154. (p: any) => p.cmd && p.cmd.toLowerCase().includes('n8n'),
  155. );
  156. if (n8nProc) {
  157. // Port-level guard: only accept the PID if n8n is actually serving
  158. const portUp = await this.portListening('127.0.0.1', 5678);
  159. if (portUp) {
  160. this.pidMap['n8n'] = n8nProc.pid;
  161. this.logger.log(`🔍 n8n PID discovered: ${n8nProc.pid} (port 5678 ✓)`);
  162. } else {
  163. this.logger.warn(`⚠️ n8n PID ${n8nProc.pid} found but port 5678 is not listening — treating as offline`);
  164. }
  165. }
  166. } catch (e) {
  167. // n8n not running yet — will retry next tick
  168. }
  169. }
  170. // --- Ollama: match the server child process by executable name ---
  171. if (!this.pidMap['Ollama']) {
  172. try {
  173. // On Windows the process is "ollama_llama_server.exe"; on Linux "ollama"
  174. const ollamaNames = ['ollama_llama_server', 'ollama'];
  175. for (const name of ollamaNames) {
  176. const procs = await find('name', name, true);
  177. if (procs.length > 0) {
  178. this.pidMap['Ollama'] = procs[0].pid;
  179. this.logger.log(`🔍 Ollama PID discovered: ${procs[0].pid} (${name})`);
  180. break;
  181. }
  182. }
  183. } catch (e) {
  184. // Ollama not running yet — will retry next tick
  185. }
  186. }
  187. }
  188. // ─── Poll Tick ─────────────────────────────────────────────────────────────
  189. private async tick() {
  190. // Port re-verification: if n8n PID is known, confirm port 5678 is still up.
  191. // A zombie n8n process (PID alive, port dead) is evicted so discovery retries.
  192. if (this.pidMap['n8n'] !== null) {
  193. const portUp = await this.portListening('127.0.0.1', 5678);
  194. if (!portUp) {
  195. this.logger.warn(`⚠️ n8n port 5678 went silent — evicting PID ${this.pidMap['n8n']}`);
  196. this.pidMap['n8n'] = null;
  197. }
  198. }
  199. // Retry PID discovery for any services still unknown
  200. const hasMissingPids = Object.values(this.pidMap).some((v) => v === null);
  201. if (hasMissingPids) await this.discoverPids();
  202. const activePids = Object.entries(this.pidMap).filter(
  203. ([, pid]) => pid !== null,
  204. ) as [string, number][];
  205. if (activePids.length === 0) return;
  206. const pidsToQuery = activePids.map(([, pid]) => pid);
  207. let stats: Record<number, pidusage.Status>;
  208. try {
  209. stats = await pidusage(pidsToQuery);
  210. } catch (e) {
  211. // A process may have died — invalidate its PID so it gets re-discovered
  212. this.invalidateDeadPids(e);
  213. return;
  214. }
  215. const metrics: MonitorPayload[] = activePids.map(([service, pid]) => ({
  216. service,
  217. pid,
  218. cpu: parseFloat((stats[pid]?.cpu ?? 0).toFixed(2)),
  219. memory: stats[pid]?.memory ?? 0, // bytes — let the UI format it
  220. timestamp: new Date(),
  221. }));
  222. this._latestMetrics = metrics;
  223. if (this.onMetricsUpdate) {
  224. this.onMetricsUpdate(metrics);
  225. }
  226. }
  227. // ─── Helpers ───────────────────────────────────────────────────────────────
  228. private invalidateDeadPids(error: any) {
  229. // pidusage throws with the bad PID in the message — reset it so we re-search
  230. const msg = String(error);
  231. for (const [service, pid] of Object.entries(this.pidMap)) {
  232. if (pid !== null && msg.includes(String(pid)) && service !== 'NestJS') {
  233. this.logger.warn(`⚠️ PID ${pid} (${service}) appears dead — resetting`);
  234. this.pidMap[service] = null;
  235. }
  236. }
  237. }
  238. }