Ver Fonte

update to work with n8n RAG

Dr-Swopt há 9 horas atrás
pai
commit
1dbe2590fb

+ 67 - 19
CLAUDE.md

@@ -4,7 +4,7 @@ This file provides guidance to Claude Code (claude.ai/code) when working with co
 
 ## What This Is
 
-NestJS backend for the PalmOilAI system. Provides server-side YOLOv8 ONNX inference, SQLite history persistence, WebSocket gateways for real-time vision streaming and chat proxying, and a process surveillance monitor for n8n and Ollama.
+NestJS backend for the PalmOilAI system. Provides server-side YOLOv8 ONNX inference, SQLite history persistence (with disk image archiving), WebSocket gateways for real-time vision streaming and n8n chat proxying, and a process surveillance monitor for n8n and Ollama.
 
 Detection classes: `Ripe`, `Unripe`, `Underripe`, `Overripe`, `Abnormal`, `Empty_Bunch` (MPOB standard).
 
@@ -24,32 +24,80 @@ npm run test:e2e                # End-to-end tests
 jest --testPathPattern=palm-oil # Run a single test file
 ```
 
-Set `PORT` env var to override the default port 3000. Set `N8N_WEBHOOK_URL` in `.env` for chat proxy and agent readiness probing.
+Set `PORT` env var to override port 3000. Set `N8N_WEBHOOK_URL` in `.env` for chat proxy and agent readiness probing.
 
 ## Architecture
 
 ### Modules
 
-**`PalmOilModule`** (`src/palm-oil/`) — Core feature module:
-- `ScannerProvider` — ONNX inference pipeline: `sharp` resizes to 640×640, strips alpha, transposes HWC→CHW, normalizes to `[0.0, 1.0]`, feeds `[1, 3, 640, 640]` tensor to `onnxruntime-node`. Output is `[1, N, 6]` (`x1, y1, x2, y2, confidence, class_index`), filtered at 0.25 confidence by default.
-- `PalmOilService` — Orchestrates inference + SQLite persistence. `analyzeImage()` is the main entry point; it fully awaits the `historyRepository.save()` before returning (intentional blocking — guarantees DB write before socket emit).
-- `VisionGateway` — WebSocket gateway on `/vision` namespace. Handles `vision:analyze` (Base64 image → ONNX inference → `vision:result`) and `chat:send` (proxies to n8n webhook → `chat:result`). Receives raw, uncompressed Base64 strings only — no binary frames, no WebRTC.
-- `HistoryEntity` — TypeORM entity; fields: `archive_id`, `filename`, `total_count`, `industrial_summary` (JSON), `detections` (JSON), `inference_ms`, `processing_ms`, `created_at`.
-- `mpob-standards.ts` — Source of truth for class names, grade colors, and health alert flag list (`Abnormal`, `Empty_Bunch`).
+**`PalmOilModule`** (`src/palm-oil/`)
 
-**`SurveillanceModule`** (`src/surveillance/`) — Process monitoring module:
-- `SurveillanceService` — Boots on `OnModuleInit`. Discovers PIDs for n8n (Node.js process containing "n8n" in cmd, port 5678 must be accepting TCP connections) and Ollama (`ollama_llama_server` or `ollama`). Polls every 500ms via `pidusage`. Probes `N8N_WEBHOOK_URL` every 10s to determine agent readiness. PIDs are evicted if the process dies or n8n's port goes silent.
-- `SurveillanceGateway` — WebSocket gateway on `/monitor` namespace. Broadcasts `monitor:data` (CPU/memory metrics) on every 500ms tick and `monitor:status` (n8n webhook ready/not ready) when status changes. Pushes an immediate snapshot on client connect.
+**`ScannerProvider`** — ONNX inference engine loaded on `OnModuleInit`:
+- `preprocess(buffer)` — `sharp` resize to 640×640, strip alpha, convert HWC→CHW, normalize to `[0.0, 1.0]`, output `[1, 3, 640, 640]` tensor
+- `inference(tensor)` — runs `onnxruntime-node` session
+- `postprocess(tensor, origW, origH, threshold=0.25)` — output is `[1, N, 6]` (`x1, y1, x2, y2, confidence, class_index`):
+  - Captures first 5 raw rows as `raw_tensor_sample` for technical evidence
+  - Filters by confidence threshold
+  - Scales coords to original image dimensions
+  - Maps `class_index` → `MPOB_CLASSES`
+  - Sets `is_health_alert` for `Abnormal` | `Empty_Bunch`
+
+**`PalmOilService`** — Orchestrates inference + persistence. `analyzeImage(imageBuffer, filename, batchId?)` pipeline:
+1. Preprocess via `ScannerProvider`
+2. Inference with timing (`inference_ms`, `processing_ms`)
+3. Postprocess → detections + `raw_tensor_sample`
+4. Save image buffer to `archive/` directory on disk
+5. Persist to SQLite (`HistoryEntity`) — **fully awaited before returning** (guarantees DB write before socket emit)
+6. Returns `AnalysisResponse` with Base64 image, `archive_id`, full `technical_evidence` block
+
+Additional methods: `getHistory()` (last 50 records), `getRecordByArchiveId()`, `deleteRecord()` (removes DB row + disk image), `clearAllHistory()` (removes all records + disk images).
+
+**`VisionGateway`** — WebSocket gateway on `/vision` namespace:
+- `vision:analyze` handler: receives `VisionStreamPayload` (`frame: string, sourceLabel?, batchId?`), strips data-URI prefix, decodes Base64 → Buffer, calls `PalmOilService.analyzeImage()`, emits `vision:result` or `vision:error`
+- `chat:send` handler: receives `ChatPayload`, POSTs to `N8N_WEBHOOK_URL` server-to-server (bypasses browser CORS), unwraps array response to first element, emits `chat:result` or `chat:error`
+- Hard rule: accepts raw, uncompressed Base64 strings only — no binary frames, no WebRTC
+
+**`HistoryEntity`** — TypeORM entity fields:
+- `id` (PK auto), `archive_id` (unique), `batch_id` (nullable, indexed), `filename`
+- `total_count`, `industrial_summary` (simple-json), `detections` (simple-json array)
+- `inference_ms`, `processing_ms`, `image_path` (disk reference)
+- `created_at` (auto timestamp)
+
+**`mpob-standards.ts`** (`src/palm-oil/constants/`) — Source of truth:
+- `MPOB_CLASSES`: index 0–5 → class name strings
+- `GRADE_COLORS`: class name → hex color
+- `HEALTH_ALERT_CLASSES`: `['Abnormal', 'Empty_Bunch']`
+
+---
+
+**`SurveillanceModule`** (`src/surveillance/`)
+
+**`SurveillanceService`** — Boots on `OnModuleInit`, runs two background loops:
+- **500ms poll loop**: discovers PIDs for n8n (Node.js process containing "n8n" in cmd) and Ollama (`ollama_llama_server` or `ollama`), then calls `pidusage` for CPU/memory metrics
+- **Port-level heartbeat**: n8n PID only included if port 5678 actively accepts TCP connections (800ms timeout); evicts PID if port goes silent or process dies
+- Exposes: `getLatestMetrics()`, callback registration for tick events
+
+**`SurveillanceGateway`** — WebSocket gateway on `/monitor` namespace:
+- Wires `SurveillanceService` callbacks: every 500ms tick broadcasts `monitor:data` to all clients
+- On client connect: immediately pushes latest snapshot (no blank load screen)
+- `monitor:subscribe` handler: acknowledges and re-sends current snapshot
 
 ### Database
-SQLite file `palm_history.db` in the project root. Managed by TypeORM with `synchronize: true` (auto-creates tables — dev only). No external DB setup required.
+SQLite file `palm_history.db` in the project root. Managed by TypeORM with `synchronize: true` (auto-creates/migrates tables — dev only). No external DB setup required.
 
-### Required File
-`best.onnx` must be placed in the **project root directory** (not `src/`). This is the YOLOv8 ONNX model loaded by `ScannerProvider` at inference time.
+### Required Files
+- `best.onnx` — YOLOv8 ONNX model, must be placed in the **project root** (not `src/`). Loaded by `ScannerProvider` at startup.
+- `archive/` directory — created automatically by `PalmOilService` for disk image storage.
+
+### Configuration (`.env`)
+```
+N8N_WEBHOOK_URL=<n8n webhook URL>
+PORT=3000   # optional
+```
 
 ### WebSocket Event Contracts
-| Namespace | Client → Server | Server → Client |
-|---|---|---|
-| `/vision` | `vision:analyze` `{ frame: string, sourceLabel?: string }` | `vision:result`, `vision:error` |
-| `/vision` | `chat:send` `{ message: string }` | `chat:result`, `chat:error` |
-| `/monitor` | `monitor:subscribe` | `monitor:data` (MonitorPayload[]), `monitor:status` |
+| Namespace | Client → Server | Payload | Server → Client |
+|---|---|---|---|
+| `/vision` | `vision:analyze` | `{ frame: string, sourceLabel?: string, batchId?: string }` | `vision:result`, `vision:error` |
+| `/vision` | `chat:send` | `{ message: string }` | `chat:result`, `chat:error` |
+| `/monitor` | `monitor:subscribe` | — | `monitor:data` (MonitorPayload[]) |

+ 15 - 3
src/palm-oil/vision.gateway.ts

@@ -14,6 +14,7 @@
  *   BEFORE vision:result is emitted to the client — intentional blocking.
  */
 
+import * as crypto from 'crypto';
 import {
   WebSocketGateway,
   WebSocketServer,
@@ -69,7 +70,8 @@ export class VisionGateway
   }
 
   handleConnection(client: Socket) {
-    this.logger.log(`📡 Vision client connected: ${client.id}`);
+    client.data.sessionId = crypto.randomUUID();
+    this.logger.log(`📡 Vision client connected: ${client.id} (session: ${client.data.sessionId})`);
   }
 
   handleDisconnect(client: Socket) {
@@ -148,6 +150,15 @@ export class VisionGateway
     });
   }
 
+  // ─── chat:clear handler ─────────────────────────────────────────────────────
+
+  @SubscribeMessage('chat:clear')
+  handleChatClear(@ConnectedSocket() client: Socket): void {
+    client.data.sessionId = crypto.randomUUID();
+    this.logger.log(`🔄 chat:clear — new session for ${client.id}: ${client.data.sessionId}`);
+    client.emit('chat:cleared', { status: 'success' });
+  }
+
   // ─── chat:send handler ──────────────────────────────────────────────────────
 
   /**
@@ -175,13 +186,14 @@ export class VisionGateway
       return;
     }
 
-    this.logger.log(`💬 chat:send from ${client.id} — proxying to n8n`);
+    const body = { chatInput: data.message, action: 'sendMessage', sessionId: client.data.sessionId };
+    this.logger.log(`💬 chat:send from ${client.id} — payload: ${JSON.stringify(body)} → ${N8N_WEBHOOK_URL}`);
 
     try {
       const response = await fetch(N8N_WEBHOOK_URL, {
         method: 'POST',
         headers: { 'Content-Type': 'application/json' },
-        body: JSON.stringify({ chatInput: data.message }),
+        body: JSON.stringify([body]),
       });
 
       const raw = await response.json();

+ 1 - 8
src/surveillance/surveillance.gateway.ts

@@ -17,7 +17,7 @@ import {
 } from '@nestjs/websockets';
 import { Logger, OnModuleInit } from '@nestjs/common';
 import { Server, Socket } from 'socket.io';
-import { SurveillanceService, MonitorPayload, MonitorStatus } from './surveillance.service';
+import { SurveillanceService, MonitorPayload } from './surveillance.service';
 
 @WebSocketGateway({
   cors: { origin: '*' },   // Angular dev server on any port
@@ -40,10 +40,6 @@ export class SurveillanceGateway
     this.surveillanceService.registerMetricsCallback((metrics: MonitorPayload[]) => {
       this.broadcast(metrics);
     });
-    // Wire webhook probe results → broadcast monitor:status to all clients
-    this.surveillanceService.registerStatusCallback((status: MonitorStatus) => {
-      if (this.server) this.server.emit('monitor:status', status);
-    });
   }
 
   afterInit(server: Server) {
@@ -52,13 +48,10 @@ export class SurveillanceGateway
 
   handleConnection(client: Socket) {
     this.logger.log(`📡 Client connected: ${client.id}`);
-    // Immediately push the current snapshot so the UI isn't blank on load
     const snapshot = this.surveillanceService.getLatestMetrics();
     if (snapshot.length > 0) {
       client.emit('monitor:data', snapshot);
     }
-    // Push the latest webhook status immediately so the UI doesn't wait 10 s
-    client.emit('monitor:status', this.surveillanceService.getLatestStatus());
   }
 
   handleDisconnect(client: Socket) {

+ 0 - 71
src/surveillance/surveillance.service.ts

@@ -27,10 +27,6 @@ export interface MonitorPayload {
   timestamp: Date;
 }
 
-export interface MonitorStatus {
-  n8nWebhookReady: boolean;
-  timestamp: Date;
-}
 
 @Injectable()
 export class SurveillanceService implements OnModuleInit, OnModuleDestroy {
@@ -41,15 +37,8 @@ export class SurveillanceService implements OnModuleInit, OnModuleDestroy {
 
   // Callback registered by the Gateway so it receives every poll result
   private onMetricsUpdate: ((metrics: MonitorPayload[]) => void) | null = null;
-  // Callback for webhook probe results
-  private onStatusUpdate: ((status: MonitorStatus) => void) | null = null;
 
   private pollInterval: NodeJS.Timeout | null = null;
-  private webhookProbeInterval: NodeJS.Timeout | null = null;
-
-  // Cached result of the last webhook probe
-  private _n8nWebhookReady = false;
-  private readonly n8nWebhookUrl = process.env['N8N_WEBHOOK_URL'] ?? '';
 
   // Tracked PIDs: resolved once and re-used (re-discovered when null)
   private pidMap: Record<string, number | null> = {
@@ -64,14 +53,10 @@ export class SurveillanceService implements OnModuleInit, OnModuleDestroy {
     this.logger.log('🟢 SurveillanceService booting — starting 500ms PID poll loop');
     await this.discoverPids();
     this.pollInterval = setInterval(() => this.tick(), 500);
-    // Probe the n8n webhook immediately, then every 10 s
-    await this.probeWebhook();
-    this.webhookProbeInterval = setInterval(() => this.probeWebhook(), 10_000);
   }
 
   onModuleDestroy() {
     if (this.pollInterval) clearInterval(this.pollInterval);
-    if (this.webhookProbeInterval) clearInterval(this.webhookProbeInterval);
     this.logger.log('🔴 SurveillanceService stopped');
   }
 
@@ -81,18 +66,10 @@ export class SurveillanceService implements OnModuleInit, OnModuleDestroy {
     this.onMetricsUpdate = cb;
   }
 
-  registerStatusCallback(cb: (status: MonitorStatus) => void) {
-    this.onStatusUpdate = cb;
-  }
-
   getLatestMetrics(): MonitorPayload[] {
     return this._latestMetrics;
   }
 
-  getLatestStatus(): MonitorStatus {
-    return { n8nWebhookReady: this._n8nWebhookReady, timestamp: new Date() };
-  }
-
   // ─── Port-Level Heartbeat ──────────────────────────────────────────────────
 
   /**
@@ -115,54 +92,6 @@ export class SurveillanceService implements OnModuleInit, OnModuleDestroy {
     });
   }
 
-  // ─── Webhook Probe ─────────────────────────────────────────────────────────
-
-  /**
-   * POSTs a lightweight ping to the n8n webhook URL.
-   * Any HTTP response (even 4xx) means the webhook is registered and n8n is
-   * accepting requests — we treat that as "ready".
-   * A network error (ECONNREFUSED, timeout, DNS failure) means "not ready".
-   */
-  private async probeWebhook(): Promise<void> {
-    if (!this.n8nWebhookUrl) {
-      if (this._n8nWebhookReady) {
-        this._n8nWebhookReady = false;
-        this.emitStatus();
-      }
-      return;
-    }
-
-    try {
-      const controller = new AbortController();
-      const timeout = setTimeout(() => controller.abort(), 3_000);
-      const res = await fetch(this.n8nWebhookUrl, {
-        method: 'POST',
-        headers: { 'Content-Type': 'application/json' },
-        body: JSON.stringify({ __ping: true }),
-        signal: controller.signal,
-      });
-      clearTimeout(timeout);
-      const ready = res.status < 500; // 2xx/3xx/4xx all mean n8n responded
-      if (ready !== this._n8nWebhookReady) {
-        this._n8nWebhookReady = ready;
-        this.logger.log(`🔗 n8n webhook probe: ${ready ? '✅ ready' : '⚠️ 5xx'}`);
-        this.emitStatus();
-      }
-    } catch {
-      if (this._n8nWebhookReady) {
-        this._n8nWebhookReady = false;
-        this.logger.warn('⚠️ n8n webhook unreachable — agent marked NOT READY');
-        this.emitStatus();
-      }
-    }
-  }
-
-  private emitStatus(): void {
-    if (this.onStatusUpdate) {
-      this.onStatusUpdate({ n8nWebhookReady: this._n8nWebhookReady, timestamp: new Date() });
-    }
-  }
-
   // ─── PID Discovery ─────────────────────────────────────────────────────────
 
   /**