Dr-Swopt před 4 dny
rodič
revize
bf790c7c11

+ 1 - 0
src/main.ts

@@ -1,3 +1,4 @@
+import 'dotenv/config';
 import { NestFactory } from '@nestjs/core';
 import { NestFactory } from '@nestjs/core';
 import { AppModule } from './app.module';
 import { AppModule } from './app.module';
 
 

+ 53 - 0
src/palm-oil/vision.gateway.ts

@@ -37,6 +37,15 @@ interface VisionStreamPayload {
   sourceLabel?: string;
   sourceLabel?: string;
 }
 }
 
 
+// ─── Payload shape Angular sends on chat:send ─────────────────────────────────
+interface ChatPayload {
+  /** User's chat message */
+  message: string;
+}
+
+// N8N webhook URL from .env (loaded via dotenv/config in main.ts)
+const N8N_WEBHOOK_URL = process.env['N8N_WEBHOOK_URL'] ?? '';
+
 @WebSocketGateway({
 @WebSocketGateway({
   cors: { origin: '*' },
   cors: { origin: '*' },
   namespace: '/vision',
   namespace: '/vision',
@@ -124,4 +133,48 @@ export class VisionGateway
     // ── STEP 4 — emit AFTER the SQLite write is confirmed ────────────────────
     // ── STEP 4 — emit AFTER the SQLite write is confirmed ────────────────────
     client.emit('vision:result', result);
     client.emit('vision:result', result);
   }
   }
+
+  // ─── chat:send handler ──────────────────────────────────────────────────────
+
+  /**
+   * Lego 03 / Lego 06 — RAG Chat Proxy (CORS Killshot)
+   *
+   * Receives the user's message from Angular and forwards it to n8n via a
+   * server-to-server HTTP POST — no browser CORS restrictions apply.
+   * The webhook URL is read from .env (N8N_WEBHOOK_URL) so the Angular client
+   * never needs to store or send the URL itself.
+   *
+   * Response is emitted back to the requesting client via chat:result.
+   */
+  @SubscribeMessage('chat:send')
+  async handleChat(
+    @MessageBody() data: ChatPayload,
+    @ConnectedSocket() client: Socket,
+  ): Promise<void> {
+    if (!data?.message?.trim()) {
+      client.emit('chat:error', { message: 'Empty message received' });
+      return;
+    }
+
+    if (!N8N_WEBHOOK_URL) {
+      client.emit('chat:error', { message: 'N8N_WEBHOOK_URL not configured in .env' });
+      return;
+    }
+
+    this.logger.log(`💬 chat:send from ${client.id} — proxying to n8n`);
+
+    try {
+      const response = await fetch(N8N_WEBHOOK_URL, {
+        method: 'POST',
+        headers: { 'Content-Type': 'application/json' },
+        body: JSON.stringify({ chatInput: data.message }),
+      });
+
+      const result = await response.json();
+      client.emit('chat:result', result);
+    } catch (err: any) {
+      this.logger.error(`❌ n8n proxy failed: ${err.message}`);
+      client.emit('chat:error', { message: `n8n proxy failed: ${err.message}` });
+    }
+  }
 }
 }

+ 46 - 2
src/surveillance/surveillance.service.ts

@@ -4,9 +4,14 @@
  * Discovers the PIDs for n8n (node), ollama_llama_server, and the NestJS
  * Discovers the PIDs for n8n (node), ollama_llama_server, and the NestJS
  * process itself. Polls every 500ms via pidusage and exposes the live
  * process itself. Polls every 500ms via pidusage and exposes the live
  * metrics payload so the Gateway can broadcast monitor:data to all clients.
  * metrics payload so the Gateway can broadcast monitor:data to all clients.
+ *
+ * Port-Level Heartbeat (added): n8n is only included in metrics when both
+ * a PID is found AND port 5678 is actively accepting TCP connections.
+ * This prevents a stale/zombie n8n process from showing as "online".
  */
  */
 
 
 import { Injectable, OnModuleInit, OnModuleDestroy, Logger } from '@nestjs/common';
 import { Injectable, OnModuleInit, OnModuleDestroy, Logger } from '@nestjs/common';
+import * as net from 'net';
 import * as pidusage from 'pidusage';
 import * as pidusage from 'pidusage';
 import find from 'find-process';
 import find from 'find-process';
 
 
@@ -60,11 +65,34 @@ export class SurveillanceService implements OnModuleInit, OnModuleDestroy {
     return this._latestMetrics;
     return this._latestMetrics;
   }
   }
 
 
+  // ─── Port-Level Heartbeat ──────────────────────────────────────────────────
+
+  /**
+   * Opens a short-lived TCP connection to host:port.
+   * Resolves true if the port accepts the connection within 800 ms.
+   * Resolves false on ECONNREFUSED, timeout, or any error.
+   */
+  private portListening(host: string, port: number): Promise<boolean> {
+    return new Promise((resolve) => {
+      const socket = new net.Socket();
+      const done = (result: boolean) => {
+        socket.destroy();
+        resolve(result);
+      };
+      socket.setTimeout(800);
+      socket.once('connect', () => done(true));
+      socket.once('timeout', () => done(false));
+      socket.once('error', () => done(false));
+      socket.connect(port, host);
+    });
+  }
+
   // ─── PID Discovery ─────────────────────────────────────────────────────────
   // ─── PID Discovery ─────────────────────────────────────────────────────────
 
 
   /**
   /**
    * Uses find-process to locate n8n and Ollama PIDs.
    * Uses find-process to locate n8n and Ollama PIDs.
    * Called once at boot and retried on each tick for any service still null.
    * Called once at boot and retried on each tick for any service still null.
+   * n8n PID is only accepted if port 5678 is also actively listening.
    */
    */
   private async discoverPids() {
   private async discoverPids() {
     // --- n8n: a Node.js process whose command line contains "n8n" ---
     // --- n8n: a Node.js process whose command line contains "n8n" ---
@@ -75,8 +103,14 @@ export class SurveillanceService implements OnModuleInit, OnModuleDestroy {
           (p: any) => p.cmd && p.cmd.toLowerCase().includes('n8n'),
           (p: any) => p.cmd && p.cmd.toLowerCase().includes('n8n'),
         );
         );
         if (n8nProc) {
         if (n8nProc) {
-          this.pidMap['n8n'] = n8nProc.pid;
-          this.logger.log(`🔍 n8n PID discovered: ${n8nProc.pid}`);
+          // Port-level guard: only accept the PID if n8n is actually serving
+          const portUp = await this.portListening('127.0.0.1', 5678);
+          if (portUp) {
+            this.pidMap['n8n'] = n8nProc.pid;
+            this.logger.log(`🔍 n8n PID discovered: ${n8nProc.pid} (port 5678 ✓)`);
+          } else {
+            this.logger.warn(`⚠️ n8n PID ${n8nProc.pid} found but port 5678 is not listening — treating as offline`);
+          }
         }
         }
       } catch (e) {
       } catch (e) {
         // n8n not running yet — will retry next tick
         // n8n not running yet — will retry next tick
@@ -105,6 +139,16 @@ export class SurveillanceService implements OnModuleInit, OnModuleDestroy {
   // ─── Poll Tick ─────────────────────────────────────────────────────────────
   // ─── Poll Tick ─────────────────────────────────────────────────────────────
 
 
   private async tick() {
   private async tick() {
+    // Port re-verification: if n8n PID is known, confirm port 5678 is still up.
+    // A zombie n8n process (PID alive, port dead) is evicted so discovery retries.
+    if (this.pidMap['n8n'] !== null) {
+      const portUp = await this.portListening('127.0.0.1', 5678);
+      if (!portUp) {
+        this.logger.warn(`⚠️ n8n port 5678 went silent — evicting PID ${this.pidMap['n8n']}`);
+        this.pidMap['n8n'] = null;
+      }
+    }
+
     // Retry PID discovery for any services still unknown
     // Retry PID discovery for any services still unknown
     const hasMissingPids = Object.values(this.pidMap).some((v) => v === null);
     const hasMissingPids = Object.values(this.pidMap).some((v) => v === null);
     if (hasMissingPids) await this.discoverPids();
     if (hasMissingPids) await this.discoverPids();