Prechádzať zdrojové kódy

feat: implement palm-oil module with ONNX inference backend and vision gateway via socket communication

Dr-Swopt 1 deň pred
rodič
commit
9eb583baea

+ 2 - 45
src/palm-oil/palm-oil.controller.ts

@@ -1,47 +1,4 @@
 /**
- * Lego 13 — Codebase Mutation: REST analyze endpoint stripped.
- *
- * POST /palm-oil/analyze has been replaced by the VisionGateway (Socket.io).
- * Angular must no longer upload images via multipart HTTP.
- * It must emit vision:stream with a raw Base64 string instead (Lego 11).
- *
- * Read-only REST endpoints (history, archive) are preserved — they serve
- * the Angular history tab and do not conflict with the Hoarding architecture.
+ * ADR-024 (v1.1) — All REST endpoints removed. Zero active route handlers.
+ * Asset delivery migrated to PalmHistory:GetImage over the 'response' socket channel.
  */
-
-import { Controller, Get, Delete, Res, Param } from '@nestjs/common';
-import { PalmOilService } from './palm-oil.service';
-import { Response } from 'express';
-import * as fs from 'fs';
-
-@Controller('palm-oil')
-export class PalmOilController {
-  constructor(private readonly palmOilService: PalmOilService) { }
-
-  @Get('history')
-  async getHistory() {
-    return this.palmOilService.getHistory();
-  }
-
-  @Delete('history/:archiveId')
-  async deleteRecord(@Param('archiveId') archiveId: string) {
-    return this.palmOilService.deleteRecord(archiveId);
-  }
-
-  @Delete('history')
-  async clearAllHistory() {
-    return this.palmOilService.clearAllHistory();
-  }
-
-  @Get('archive/:id')
-  async getArchivedImage(@Param('id') id: string, @Res() res: Response) {
-    const record = await this.palmOilService.getRecordByArchiveId(id);
-    if (!record || !record.image_path) {
-      return res.status(404).send('Image not found');
-    }
-
-    res.setHeader('Content-Type', 'image/jpeg');
-    const fileStream = fs.createReadStream(record.image_path);
-    fileStream.pipe(res);
-  }
-}

+ 1 - 2
src/palm-oil/palm-oil.module.ts

@@ -1,6 +1,5 @@
 import { Module } from '@nestjs/common';
 import { TypeOrmModule } from '@nestjs/typeorm';
-import { PalmOilController } from './palm-oil.controller';
 import { PalmOilService } from './palm-oil.service';
 import { VisionGateway } from './vision.gateway';
 import { History } from './entities/history.entity';
@@ -12,7 +11,7 @@ console.log('🔧 Inference backend: onnx-wasm (Android/Termux)');
 
 @Module({
   imports: [TypeOrmModule.forFeature([History]), SurveillanceModule],
-  controllers: [PalmOilController],
+  controllers: [],
   providers: [
     PalmOilService,
     VisionGateway,

+ 266 - 132
src/palm-oil/vision.gateway.ts

@@ -1,20 +1,24 @@
 /**
- * Lego 02  — Vision Inference over WebSocket (YOLOv8 + ONNX)
- * Lego 11  — Socket Event Schema: vision:analyze → vision:result
- * Lego 13  — Codebase Mutation: REST analyze endpoint replaced by WS Gateway
+ * ADR-024 — FIS Protocol Compliance Integration
+ * ADR-024.3 — Full FIS Streaming Protocol Alignment
  *
- * CONTRACT (Lego 11 hard rule):
- *   Angular MUST send raw, uncompressed Base64 strings on vision:analyze.
- *   Binary compression and WebRTC are strictly forbidden.
- *   The gateway deliberately skips all buffer optimisation to maximise
- *   I/O and CPU overhead on the socket bus.
+ * All client↔server interactions are multiplexed through a single
+ * `request` event carrying a FisAppMessage envelope (header + data).
+ * The routing engine branches on `header.serviceId:header.messageName`
+ * and always echoes back a FIS-compliant two-packet response:
+ *   1. Data packet  — { id, messageID, ..., complete: false, message: JSON.stringify(payload) }
+ *   2. Done packet  — { id, messageID, ..., complete: true }
  *
- * ORDERING (Lego 12):
- *   SQLite write (via PalmOilService.analyzeImage) is fully awaited
- *   BEFORE vision:result is emitted to the client — intentional blocking.
+ * This two-packet protocol satisfies NgxSocketService's streaming contract:
+ *   observer.next() fires on complete:false, observer.complete() on complete:true.
+ *
+ * ORDERING GUARANTEE (unchanged from Lego 12):
+ *   PalmOilService.analyzeImage() is fully awaited — SQLite write
+ *   completes before any vision response is emitted.
  */
 
 import * as crypto from 'crypto';
+import * as fs from 'fs';
 import {
   WebSocketGateway,
   WebSocketServer,
@@ -29,27 +33,58 @@ import { Logger, OnModuleInit } from '@nestjs/common';
 import { Server, Socket } from 'socket.io';
 import { PalmOilService } from './palm-oil.service';
 import { AnalysisResponse } from './interfaces/palm-analysis.interface';
-import { SurveillanceService } from '../surveillance/surveillance.service';
+import { SurveillanceService, SystemMetrics } from '../surveillance/surveillance.service';
+
+// ─── FIS Protocol Envelope ────────────────────────────────────────────────────
+
+interface FisAppMessage {
+  header?: {
+    messageID?: string;
+    serviceId?: string;
+    messageName?: string;
+    [k: string]: unknown;
+  };
+  data?: any;
+  // Legacy flat-format fallback fields
+  messageID?: string;
+  serviceId?: string;
+  operation?: string;
+  payload?: any;
+}
+
+interface FisAppResponse {
+  id: string;
+  messageID: string;
+  serviceId: string;
+  operation: string;
+  complete: boolean;
+  message?: string;   // JSON-encoded payload for data packets (complete: false)
+  payload?: any;      // Used only for push notifications (surveillance)
+  error?: string;
+}
+
+// ─── Typed inbound payload shapes ────────────────────────────────────────────
 
-// ─── Payload shape Angular sends on vision:analyze ────────────────────────────
-interface VisionStreamPayload {
-  /** Raw, uncompressed Base64 string — data URI prefix is stripped server-side */
+interface PalmVisionPayload {
   frame: string;
-  /** Optional: lets the UI tag which camera source the frame came from */
   sourceLabel?: string;
-  /** UUID shared across all frames in a batch session — links DB rows for Vault grouping */
   batchId?: string;
 }
 
-// ─── Payload shape Angular sends on chat:send ─────────────────────────────────
 interface ChatPayload {
-  /** User's chat message */
   message: string;
 }
 
-// N8N webhook URL from .env (loaded via dotenv/config in main.ts)
+interface HistoryDeletePayload {
+  archiveId: string;
+}
+
+// ─── Env ──────────────────────────────────────────────────────────────────────
+
 const N8N_WEBHOOK_URL = process.env['N8N_WEBHOOK_URL'] ?? '';
 
+// ─── Gateway ──────────────────────────────────────────────────────────────────
+
 @WebSocketGateway({
   cors: { origin: '*' },
   namespace: '/vision',
@@ -70,8 +105,17 @@ export class VisionGateway
   // ─── Lifecycle ───────────────────────────────────────────────────────────────
 
   onModuleInit() {
-    this.surveillanceService.registerMetricsCallback((metrics) => {
-      this.server?.emit('monitor:update', metrics);
+    this.surveillanceService.registerMetricsCallback((metrics: SystemMetrics) => {
+      const id = crypto.randomUUID();
+      const packet: FisAppResponse = {
+        id,
+        messageID: id,
+        serviceId: 'Surveillance',
+        operation: 'metricsUpdate',
+        complete: true,
+        payload: metrics,
+      };
+      this.server?.emit('response', packet);
     });
   }
 
@@ -82,140 +126,230 @@ export class VisionGateway
   handleConnection(client: Socket) {
     client.data.sessionId = crypto.randomUUID();
     this.logger.log(`📡 Vision client connected: ${client.id} (session: ${client.data.sessionId})`);
+
     const snapshot = this.surveillanceService.getLatestMetrics();
-    if (snapshot) client.emit('monitor:update', snapshot);
+    if (snapshot) {
+      const snapId = crypto.randomUUID();
+      const packet: FisAppResponse = {
+        id: snapId,
+        messageID: snapId,
+        serviceId: 'Surveillance',
+        operation: 'metricsUpdate',
+        complete: true,
+        payload: snapshot,
+      };
+      client.emit('response', packet);
+    }
   }
 
   handleDisconnect(client: Socket) {
     this.logger.log(`🔌 Vision client disconnected: ${client.id}`);
   }
 
-  // ─── vision:analyze handler ─────────────────────────────────────────────────
+  // ─── Unified message broker ───────────────────────────────────────────────
 
   /**
-   * Lego 11 — vision:analyze
+   * Single catch-all handler for all client operations.
    *
-   * Receives a raw Base64 image frame from Angular.
-   * NO compression negotiation. NO WebRTC. NO binary frames.
-   * The full uncompressed Base64 string travels the socket bus every tick.
+   * Accepts both FIS protocol envelope (header/data) and legacy flat format.
+   * Routing key: `${serviceId}:${operation}`
    *
-   * Sequence (Lego 12 ordering guarantee):
-   *   1. Decode Base64 → Buffer
-   *   2. Run ONNX inference (ScannerProvider — untouched per Lego 13)
-   *   3. Synchronous SQLite persist (inside analyzeImage, awaited fully)
-   *   4. ONLY THEN emit vision:result back to the client
+   * Supported routes:
+   *   PalmVision:analyze   — decode Base64 frame, run ONNX, persist, reply
+   *   History:getAll       — fetch last 50 history records
+   *   History:delete       — delete one record by archiveId
+   *   History:clearAll     — wipe all records and archived images
+   *   Chat:send            — proxy message to n8n webhook
+   *   Chat:clear           — reset session UUID
+   *   PalmHistory:GetImage — stream archived image as Base64 data URL
    */
-  @SubscribeMessage('vision:analyze')
-  async handleVisionStream(
-    @MessageBody() payload: VisionStreamPayload,
+  @SubscribeMessage('request')
+  async handleMessage(
+    @MessageBody() rawMsg: any,
     @ConnectedSocket() client: Socket,
-  ) {
-    if (!payload?.frame) {
-      client.emit('vision:error', { message: 'No frame data received' });
-      return;
-    }
+  ): Promise<void> {
+    // NgxSocketService sends JSON.stringify(FisAppMessage) — parse if string
+    const msg: FisAppMessage = typeof rawMsg === 'string' ? JSON.parse(rawMsg) : (rawMsg ?? {});
+
+    // Extract fields from FIS envelope; fall back to legacy flat format
+    const header = msg.header ?? {};
+    const messageID: string = (header.messageID ?? msg.messageID ?? crypto.randomUUID()) as string;
+    const serviceId: string = (header.serviceId ?? msg.serviceId ?? '') as string;
+    const operation: string = (header.messageName ?? msg.operation ?? '') as string;
+    const payload: any = msg.data ?? msg.payload;
+
+    /**
+     * FIS streaming two-packet reply:
+     *  1. Data packet  (complete: false) — NgxSocketService calls observer.next()
+     *  2. Done packet  (complete: true)  — NgxSocketService calls observer.complete()
+     * makeNgxCall in DpService reads `res.message` via JSON.parse, so payload
+     * must be serialised into the `message` string field.
+     */
+    const reply = (data: any): void => {
+      client.emit('response', {
+        id: messageID, messageID, serviceId, operation,
+        complete: false,
+        message: JSON.stringify(data),
+      } satisfies FisAppResponse);
+      client.emit('response', {
+        id: messageID, messageID, serviceId, operation,
+        complete: true,
+      } satisfies FisAppResponse);
+    };
+
+    const replyError = (error: string): void => {
+      client.emit('response', {
+        id: messageID, messageID, serviceId, operation,
+        complete: false,
+        message: JSON.stringify({ error }),
+      } satisfies FisAppResponse);
+      client.emit('response', {
+        id: messageID, messageID, serviceId, operation,
+        complete: true,
+      } satisfies FisAppResponse);
+    };
+
+    const route = `${serviceId}:${operation}`;
+    this.logger.log(`📨 request [${route}] from ${client.id}`);
 
-    // Lego 11 — High-Tax payload audit: log raw size to prove I/O overhead
-    this.logger.log(
-      `📦 vision:analyze from ${client.id} — payload size: ${payload.frame.length} chars (~${(payload.frame.length / 1024).toFixed(1)} KB)`,
-    );
-
-    // Strip the data URI prefix if Angular included it
-    // e.g. "data:image/jpeg;base64,/9j/4AAQ..." → "/9j/4AAQ..."
-    const rawBase64 = payload.frame.replace(/^data:image\/\w+;base64,/, '');
-
-    // Decode to Buffer — this is deliberately the HEAVY path:
-    // Base64 is ~33% larger than binary; we absorb that overhead intentionally.
-    const imageBuffer = Buffer.from(rawBase64, 'base64');
-
-    const sourceLabel = payload.sourceLabel ?? 'socket-frame';
-
-    // ── STEP 3 happens inside analyzeImage ──────────────────────────────────
-    // PalmOilService.analyzeImage():
-    //   → runs ONNX preprocess / inference / postprocess (Lego 02)
-    //   → awaits historyRepository.save() — SQLite write blocks here (Lego 12)
-    //   → only then returns the result
-    // We await the full chain before emitting so the DB write is guaranteed
-    // to complete first. This is the intentional I/O bottleneck.
-    let result: AnalysisResponse;
     try {
-      result = await this.palmOilService.analyzeImage(imageBuffer, sourceLabel, payload.batchId);
-    } catch (err: any) {
-      this.logger.error(`❌ Inference failed for ${client.id}: ${err.message}`);
-      client.emit('vision:error', { message: err.message });
-      return;
-    }
+      switch (route) {
 
-    // ── STEP 4 — emit AFTER the SQLite write is confirmed ────────────────────
-    // Attach technical_evidence block so the frontend audit manifest has
-    // engine metadata, archive pointer, and raw tensor snapshot per frame.
-    client.emit('vision:result', {
-      ...result,
-      technical_evidence: {
-        engine: 'NestJS-ONNX' as const,
-        archive_id: result.archive_id,
-        total_count: result.total_count,
-        threshold: result.current_threshold,
-        industrial_summary: result.industrial_summary,
-        raw_tensor_sample: result.raw_tensor_sample,
-      },
-    });
-  }
+        // ── PalmVision:analyze ─────────────────────────────────────────────
+        case 'PalmVision:analyze': {
+          const { frame, sourceLabel, batchId } = (payload ?? {}) as PalmVisionPayload;
 
-  // ─── chat:clear handler ─────────────────────────────────────────────────────
+          if (!frame) {
+            replyError('No frame data received');
+            return;
+          }
 
-  @SubscribeMessage('chat:clear')
-  handleChatClear(@ConnectedSocket() client: Socket): void {
-    client.data.sessionId = crypto.randomUUID();
-    this.logger.log(`🔄 chat:clear — new session for ${client.id}: ${client.data.sessionId}`);
-    client.emit('chat:cleared', { status: 'success' });
-  }
+          this.logger.log(
+            `📦 PalmVision:analyze from ${client.id} — payload size: ${frame.length} chars (~${(frame.length / 1024).toFixed(1)} KB)`,
+          );
 
-  // ─── chat:send handler ──────────────────────────────────────────────────────
+          const rawBase64 = frame.replace(/^data:image\/\w+;base64,/, '');
+          const imageBuffer = Buffer.from(rawBase64, 'base64');
 
-  /**
-   * Lego 03 / Lego 06 — RAG Chat Proxy (CORS Killshot)
-   *
-   * Receives the user's message from Angular and forwards it to n8n via a
-   * server-to-server HTTP POST — no browser CORS restrictions apply.
-   * The webhook URL is read from .env (N8N_WEBHOOK_URL) so the Angular client
-   * never needs to store or send the URL itself.
-   *
-   * Response is emitted back to the requesting client via chat:result.
-   */
-  @SubscribeMessage('chat:send')
-  async handleChat(
-    @MessageBody() data: ChatPayload,
-    @ConnectedSocket() client: Socket,
-  ): Promise<void> {
-    if (!data?.message?.trim()) {
-      client.emit('chat:error', { message: 'Empty message received' });
-      return;
-    }
+          const result: AnalysisResponse = await this.palmOilService.analyzeImage(
+            imageBuffer,
+            sourceLabel ?? 'socket-frame',
+            batchId,
+          );
 
-    if (!N8N_WEBHOOK_URL) {
-      client.emit('chat:error', { message: 'N8N_WEBHOOK_URL not configured in .env' });
-      return;
-    }
+          reply({
+            ...result,
+            technical_evidence: {
+              engine: 'NestJS-ONNX' as const,
+              archive_id: result.archive_id,
+              total_count: result.total_count,
+              threshold: result.current_threshold,
+              industrial_summary: result.industrial_summary,
+              raw_tensor_sample: result.raw_tensor_sample,
+            },
+          });
+          break;
+        }
 
-    const body = { chatInput: data.message, action: 'sendMessage', sessionId: client.data.sessionId };
-    this.logger.log(`💬 chat:send from ${client.id} — payload: ${JSON.stringify(body)} → ${N8N_WEBHOOK_URL}`);
+        // ── History:getAll ─────────────────────────────────────────────────
+        case 'History:getAll': {
+          const history = await this.palmOilService.getHistory();
+          reply(history);
+          break;
+        }
 
-    try {
-      const response = await fetch(N8N_WEBHOOK_URL, {
-        method: 'POST',
-        headers: { 'Content-Type': 'application/json' },
-        body: JSON.stringify(body),
-      });
-
-      const raw = await response.json();
-      // n8n often returns an array of results — unwrap to the first element so
-      // the frontend always receives a predictable object, never a bare array.
-      const result = Array.isArray(raw) ? raw[0] : raw;
-      client.emit('chat:result', result);
+        // ── History:delete ─────────────────────────────────────────────────
+        case 'History:delete': {
+          const { archiveId } = (payload ?? {}) as HistoryDeletePayload;
+          const result = await this.palmOilService.deleteRecord(archiveId);
+          reply(result);
+          break;
+        }
+
+        // ── History:clearAll ───────────────────────────────────────────────
+        case 'History:clearAll': {
+          const result = await this.palmOilService.clearAllHistory();
+          reply(result);
+          break;
+        }
+
+        // ── Chat:send ──────────────────────────────────────────────────────
+        case 'Chat:send': {
+          const { message } = (payload ?? {}) as ChatPayload;
+
+          if (!message?.trim()) {
+            replyError('Empty message received');
+            return;
+          }
+
+          if (!N8N_WEBHOOK_URL) {
+            replyError('N8N_WEBHOOK_URL not configured in .env');
+            return;
+          }
+
+          const body = {
+            chatInput: message,
+            action: 'sendMessage',
+            sessionId: client.data.sessionId,
+          };
+          this.logger.log(`💬 Chat:send from ${client.id} → ${N8N_WEBHOOK_URL}`);
+
+          const response = await fetch(N8N_WEBHOOK_URL, {
+            method: 'POST',
+            headers: { 'Content-Type': 'application/json' },
+            body: JSON.stringify(body),
+          });
+
+          const raw = await response.json();
+          const chatResult = Array.isArray(raw) ? raw[0] : raw;
+          reply(chatResult);
+          break;
+        }
+
+        // ── Chat:clear ─────────────────────────────────────────────────────
+        case 'Chat:clear': {
+          client.data.sessionId = crypto.randomUUID();
+          this.logger.log(`🔄 Chat:clear — new session for ${client.id}: ${client.data.sessionId}`);
+          reply({ status: 'success' });
+          break;
+        }
+
+        // ── PalmHistory:GetImage ───────────────────────────────────────────
+        case 'PalmHistory:GetImage': {
+          const { archiveId } = (payload ?? {}) as HistoryDeletePayload;
+
+          if (!archiveId) {
+            replyError('No archiveId provided');
+            return;
+          }
+
+          const record = await this.palmOilService.getRecordByArchiveId(archiveId);
+          if (!record || !record.image_path) {
+            replyError(`Image not found for archiveId: ${archiveId}`);
+            return;
+          }
+
+          if (!fs.existsSync(record.image_path)) {
+            replyError(`Image file missing on disk for archiveId: ${archiveId}`);
+            return;
+          }
+
+          const fileBuffer = fs.readFileSync(record.image_path);
+          const dataUrl = `data:image/jpeg;base64,${fileBuffer.toString('base64')}`;
+
+          reply({ archiveId, image_data: dataUrl });
+          break;
+        }
+
+        // ── Unknown route ──────────────────────────────────────────────────
+        default: {
+          this.logger.warn(`⚠️  Unknown route: ${route}`);
+          replyError(`Unknown route: ${route}`);
+        }
+      }
     } catch (err: any) {
-      this.logger.error(`❌ n8n proxy failed: ${err.message}`);
-      client.emit('chat:error', { message: `n8n proxy failed: ${err.message}` });
+      this.logger.error(`❌ request error [${route}]: ${err.message}`);
+      replyError(err.message);
     }
   }
 }