Pārlūkot izejas kodu

Add webhook probe functionality and status broadcasting to SurveillanceService

Dr-Swopt 3 dienas atpakaļ
vecāks
revīzija
0ca0b708e7

+ 4 - 1
src/palm-oil/vision.gateway.ts

@@ -170,7 +170,10 @@ export class VisionGateway
         body: JSON.stringify({ chatInput: data.message }),
       });
 
-      const result = await response.json();
+      const raw = await response.json();
+      // n8n often returns an array of results — unwrap to the first element so
+      // the frontend always receives a predictable object, never a bare array.
+      const result = Array.isArray(raw) ? raw[0] : raw;
       client.emit('chat:result', result);
     } catch (err: any) {
       this.logger.error(`❌ n8n proxy failed: ${err.message}`);

+ 7 - 1
src/surveillance/surveillance.gateway.ts

@@ -17,7 +17,7 @@ import {
 } from '@nestjs/websockets';
 import { Logger, OnModuleInit } from '@nestjs/common';
 import { Server, Socket } from 'socket.io';
-import { SurveillanceService, MonitorPayload } from './surveillance.service';
+import { SurveillanceService, MonitorPayload, MonitorStatus } from './surveillance.service';
 
 @WebSocketGateway({
   cors: { origin: '*' },   // Angular dev server on any port
@@ -40,6 +40,10 @@ export class SurveillanceGateway
     this.surveillanceService.registerMetricsCallback((metrics: MonitorPayload[]) => {
       this.broadcast(metrics);
     });
+    // Wire webhook probe results → broadcast monitor:status to all clients
+    this.surveillanceService.registerStatusCallback((status: MonitorStatus) => {
+      if (this.server) this.server.emit('monitor:status', status);
+    });
   }
 
   afterInit(server: Server) {
@@ -53,6 +57,8 @@ export class SurveillanceGateway
     if (snapshot.length > 0) {
       client.emit('monitor:data', snapshot);
     }
+    // Push the latest webhook status immediately so the UI doesn't wait 10 s
+    client.emit('monitor:status', this.surveillanceService.getLatestStatus());
   }
 
   handleDisconnect(client: Socket) {

+ 79 - 3
src/surveillance/surveillance.service.ts

@@ -5,9 +5,13 @@
  * process itself. Polls every 500ms via pidusage and exposes the live
  * metrics payload so the Gateway can broadcast monitor:data to all clients.
  *
- * Port-Level Heartbeat (added): n8n is only included in metrics when both
- * a PID is found AND port 5678 is actively accepting TCP connections.
- * This prevents a stale/zombie n8n process from showing as "online".
+ * Port-Level Heartbeat: n8n is only included in metrics when both a PID is
+ * found AND port 5678 is actively accepting TCP connections.
+ *
+ * Webhook Probe: every 10 s, NestJS attempts a real HTTP POST to N8N_WEBHOOK_URL
+ * with a ping payload. If n8n responds (any non-network-error), the agent is
+ * considered ready. This status is broadcast via monitor:status so the UI can
+ * show "Agent Ready / Not Ready" independently of process metrics.
  */
 
 import { Injectable, OnModuleInit, OnModuleDestroy, Logger } from '@nestjs/common';
@@ -23,6 +27,11 @@ export interface MonitorPayload {
   timestamp: Date;
 }
 
+export interface MonitorStatus {
+  n8nWebhookReady: boolean;
+  timestamp: Date;
+}
+
 @Injectable()
 export class SurveillanceService implements OnModuleInit, OnModuleDestroy {
   private readonly logger = new Logger(SurveillanceService.name);
@@ -32,8 +41,15 @@ export class SurveillanceService implements OnModuleInit, OnModuleDestroy {
 
   // Callback registered by the Gateway so it receives every poll result
   private onMetricsUpdate: ((metrics: MonitorPayload[]) => void) | null = null;
+  // Callback for webhook probe results
+  private onStatusUpdate: ((status: MonitorStatus) => void) | null = null;
 
   private pollInterval: NodeJS.Timeout | null = null;
+  private webhookProbeInterval: NodeJS.Timeout | null = null;
+
+  // Cached result of the last webhook probe
+  private _n8nWebhookReady = false;
+  private readonly n8nWebhookUrl = process.env['N8N_WEBHOOK_URL'] ?? '';
 
   // Tracked PIDs: resolved once and re-used (re-discovered when null)
   private pidMap: Record<string, number | null> = {
@@ -48,10 +64,14 @@ export class SurveillanceService implements OnModuleInit, OnModuleDestroy {
     this.logger.log('🟢 SurveillanceService booting — starting 500ms PID poll loop');
     await this.discoverPids();
     this.pollInterval = setInterval(() => this.tick(), 500);
+    // Probe the n8n webhook immediately, then every 10 s
+    await this.probeWebhook();
+    this.webhookProbeInterval = setInterval(() => this.probeWebhook(), 10_000);
   }
 
   onModuleDestroy() {
     if (this.pollInterval) clearInterval(this.pollInterval);
+    if (this.webhookProbeInterval) clearInterval(this.webhookProbeInterval);
     this.logger.log('🔴 SurveillanceService stopped');
   }
 
@@ -61,10 +81,18 @@ export class SurveillanceService implements OnModuleInit, OnModuleDestroy {
     this.onMetricsUpdate = cb;
   }
 
+  registerStatusCallback(cb: (status: MonitorStatus) => void) {
+    this.onStatusUpdate = cb;
+  }
+
   getLatestMetrics(): MonitorPayload[] {
     return this._latestMetrics;
   }
 
+  getLatestStatus(): MonitorStatus {
+    return { n8nWebhookReady: this._n8nWebhookReady, timestamp: new Date() };
+  }
+
   // ─── Port-Level Heartbeat ──────────────────────────────────────────────────
 
   /**
@@ -87,6 +115,54 @@ export class SurveillanceService implements OnModuleInit, OnModuleDestroy {
     });
   }
 
+  // ─── Webhook Probe ─────────────────────────────────────────────────────────
+
+  /**
+   * POSTs a lightweight ping to the n8n webhook URL.
+   * Any HTTP response (even 4xx) means the webhook is registered and n8n is
+   * accepting requests — we treat that as "ready".
+   * A network error (ECONNREFUSED, timeout, DNS failure) means "not ready".
+   */
+  private async probeWebhook(): Promise<void> {
+    if (!this.n8nWebhookUrl) {
+      if (this._n8nWebhookReady) {
+        this._n8nWebhookReady = false;
+        this.emitStatus();
+      }
+      return;
+    }
+
+    try {
+      const controller = new AbortController();
+      const timeout = setTimeout(() => controller.abort(), 3_000);
+      const res = await fetch(this.n8nWebhookUrl, {
+        method: 'POST',
+        headers: { 'Content-Type': 'application/json' },
+        body: JSON.stringify({ __ping: true }),
+        signal: controller.signal,
+      });
+      clearTimeout(timeout);
+      const ready = res.status < 500; // 2xx/3xx/4xx all mean n8n responded
+      if (ready !== this._n8nWebhookReady) {
+        this._n8nWebhookReady = ready;
+        this.logger.log(`🔗 n8n webhook probe: ${ready ? '✅ ready' : '⚠️ 5xx'}`);
+        this.emitStatus();
+      }
+    } catch {
+      if (this._n8nWebhookReady) {
+        this._n8nWebhookReady = false;
+        this.logger.warn('⚠️ n8n webhook unreachable — agent marked NOT READY');
+        this.emitStatus();
+      }
+    }
+  }
+
+  private emitStatus(): void {
+    if (this.onStatusUpdate) {
+      this.onStatusUpdate({ n8nWebhookReady: this._n8nWebhookReady, timestamp: new Date() });
+    }
+  }
+
   // ─── PID Discovery ─────────────────────────────────────────────────────────
 
   /**