Просмотр исходного кода

simplification for monitoring system

Dr-Swopt 11 часов назад
Родитель
Сommit
1565df13bf

+ 27 - 0
package-lock.json

@@ -26,6 +26,7 @@
         "sharp": "^0.34.5",
         "socket.io": "^4.8.3",
         "sqlite3": "^5.1.7",
+        "systeminformation": "^5.31.5",
         "typeorm": "^0.3.28"
       },
       "devDependencies": {
@@ -12925,6 +12926,32 @@
         "url": "https://opencollective.com/synckit"
       }
     },
+    "node_modules/systeminformation": {
+      "version": "5.31.5",
+      "resolved": "https://registry.npmjs.org/systeminformation/-/systeminformation-5.31.5.tgz",
+      "integrity": "sha512-5SyLdip4/3alxD4Kh+63bUQTJmu7YMfYQTC+koZy7X73HgNqZSD2P4wOZQWtUncvPvcEmnfIjCoygN4MRoEejQ==",
+      "license": "MIT",
+      "os": [
+        "darwin",
+        "linux",
+        "win32",
+        "freebsd",
+        "openbsd",
+        "netbsd",
+        "sunos",
+        "android"
+      ],
+      "bin": {
+        "systeminformation": "lib/cli.js"
+      },
+      "engines": {
+        "node": ">=8.0.0"
+      },
+      "funding": {
+        "type": "Buy me a coffee",
+        "url": "https://www.buymeacoffee.com/systeminfo"
+      }
+    },
     "node_modules/tapable": {
       "version": "2.3.2",
       "resolved": "https://registry.npmjs.org/tapable/-/tapable-2.3.2.tgz",

+ 1 - 0
package.json

@@ -37,6 +37,7 @@
     "sharp": "^0.34.5",
     "socket.io": "^4.8.3",
     "sqlite3": "^5.1.7",
+    "systeminformation": "^5.31.5",
     "typeorm": "^0.3.28"
   },
   "devDependencies": {

+ 2 - 2
src/palm-oil/palm-oil.module.ts

@@ -5,11 +5,11 @@ import { PalmOilService } from './palm-oil.service';
 import { ScannerProvider } from './providers/scanner.provider';
 import { VisionGateway } from './vision.gateway';
 import { History } from './entities/history.entity';
+import { SurveillanceModule } from '../surveillance/surveillance.module';
 
 @Module({
-  imports: [TypeOrmModule.forFeature([History])],
+  imports: [TypeOrmModule.forFeature([History]), SurveillanceModule],
   controllers: [PalmOilController],
-  // Lego 13 — VisionGateway replaces the REST analyze endpoint
   providers: [PalmOilService, ScannerProvider, VisionGateway],
   exports: [PalmOilService, ScannerProvider],
 })

+ 15 - 3
src/palm-oil/vision.gateway.ts

@@ -25,10 +25,11 @@ import {
   MessageBody,
   ConnectedSocket,
 } from '@nestjs/websockets';
-import { Logger } from '@nestjs/common';
+import { Logger, OnModuleInit } from '@nestjs/common';
 import { Server, Socket } from 'socket.io';
 import { PalmOilService } from './palm-oil.service';
 import { AnalysisResponse } from './interfaces/palm-analysis.interface';
+import { SurveillanceService } from '../surveillance/surveillance.service';
 
 // ─── Payload shape Angular sends on vision:analyze ────────────────────────────
 interface VisionStreamPayload {
@@ -54,17 +55,26 @@ const N8N_WEBHOOK_URL = process.env['N8N_WEBHOOK_URL'] ?? '';
   namespace: '/vision',
 })
 export class VisionGateway
-  implements OnGatewayInit, OnGatewayConnection, OnGatewayDisconnect
+  implements OnGatewayInit, OnGatewayConnection, OnGatewayDisconnect, OnModuleInit
 {
   @WebSocketServer()
   private server!: Server;
 
   private readonly logger = new Logger(VisionGateway.name);
 
-  constructor(private readonly palmOilService: PalmOilService) {}
+  constructor(
+    private readonly palmOilService: PalmOilService,
+    private readonly surveillanceService: SurveillanceService,
+  ) {}
 
   // ─── Lifecycle ───────────────────────────────────────────────────────────────
 
+  onModuleInit() {
+    this.surveillanceService.registerMetricsCallback((metrics) => {
+      this.server?.emit('monitor:update', metrics);
+    });
+  }
+
   afterInit() {
     this.logger.log('🔌 VisionGateway initialized on /vision namespace');
   }
@@ -72,6 +82,8 @@ export class VisionGateway
   handleConnection(client: Socket) {
     client.data.sessionId = crypto.randomUUID();
     this.logger.log(`📡 Vision client connected: ${client.id} (session: ${client.data.sessionId})`);
+    const snapshot = this.surveillanceService.getLatestMetrics();
+    if (snapshot) client.emit('monitor:update', snapshot);
   }
 
   handleDisconnect(client: Socket) {

+ 5 - 10
src/surveillance/surveillance.gateway.ts

@@ -17,7 +17,7 @@ import {
 } from '@nestjs/websockets';
 import { Logger, OnModuleInit } from '@nestjs/common';
 import { Server, Socket } from 'socket.io';
-import { SurveillanceService, MonitorPayload } from './surveillance.service';
+import { SurveillanceService, SystemMetrics } from './surveillance.service';
 
 @WebSocketGateway({
   cors: { origin: '*' },   // Angular dev server on any port
@@ -37,7 +37,7 @@ export class SurveillanceGateway
 
   onModuleInit() {
     // Wire the service callback → broadcasts to all connected clients
-    this.surveillanceService.registerMetricsCallback((metrics: MonitorPayload[]) => {
+    this.surveillanceService.registerMetricsCallback((metrics: SystemMetrics) => {
       this.broadcast(metrics);
     });
   }
@@ -49,7 +49,7 @@ export class SurveillanceGateway
   handleConnection(client: Socket) {
     this.logger.log(`📡 Client connected: ${client.id}`);
     const snapshot = this.surveillanceService.getLatestMetrics();
-    if (snapshot.length > 0) {
+    if (snapshot) {
       client.emit('monitor:data', snapshot);
     }
   }
@@ -60,22 +60,17 @@ export class SurveillanceGateway
 
   // ─── Event Handlers ────────────────────────────────────────────────────────
 
-  /**
-   * Lego 11 — monitor:subscribe
-   * UI emits this to start (or re-confirm) the resource tracking stream.
-   * We acknowledge and immediately push the latest snapshot.
-   */
   @SubscribeMessage('monitor:subscribe')
   handleSubscribe(client: Socket) {
     this.logger.log(`🟢 monitor:subscribe from ${client.id}`);
     const snapshot = this.surveillanceService.getLatestMetrics();
-    client.emit('monitor:data', snapshot);
+    if (snapshot) client.emit('monitor:data', snapshot);
     return { event: 'monitor:subscribed', data: { ok: true } };
   }
 
   // ─── Broadcast ─────────────────────────────────────────────────────────────
 
-  private broadcast(metrics: MonitorPayload[]) {
+  private broadcast(metrics: SystemMetrics) {
     if (this.server) {
       this.server.emit('monitor:data', metrics);
     }

+ 1 - 2
src/surveillance/surveillance.module.ts

@@ -1,9 +1,8 @@
 import { Module } from '@nestjs/common';
 import { SurveillanceService } from './surveillance.service';
-import { SurveillanceGateway } from './surveillance.gateway';
 
 @Module({
-  providers: [SurveillanceService, SurveillanceGateway],
+  providers: [SurveillanceService],
   exports: [SurveillanceService],
 })
 export class SurveillanceModule {}

+ 70 - 155
src/surveillance/surveillance.service.ts

@@ -1,187 +1,86 @@
-/**
- * Lego 09 — Surveillance Engine (PID Polling & Socket Stream)
- *
- * Discovers the PIDs for n8n (node), ollama_llama_server, and the NestJS
- * process itself. Polls every 500ms via pidusage and exposes the live
- * metrics payload so the Gateway can broadcast monitor:data to all clients.
- *
- * Port-Level Heartbeat: n8n is only included in metrics when both a PID is
- * found AND port 5678 is actively accepting TCP connections.
- *
- * Webhook Probe: every 10 s, NestJS attempts a real HTTP POST to N8N_WEBHOOK_URL
- * with a ping payload. If n8n responds (any non-network-error), the agent is
- * considered ready. This status is broadcast via monitor:status so the UI can
- * show "Agent Ready / Not Ready" independently of process metrics.
- */
-
 import { Injectable, OnModuleInit, OnModuleDestroy, Logger } from '@nestjs/common';
-import * as net from 'net';
-import * as pidusage from 'pidusage';
-import find from 'find-process';
-
-export interface MonitorPayload {
-  service: string;
-  pid: number;
-  cpu: number;
-  memory: number;
+import * as si from 'systeminformation';
+
+export interface ServiceStatus {
+  service: string; // 'nestjs' | 'n8n' | 'ollama'
+  pid: number | null;
+  online: boolean;
+  cpu: number;    // % usage
+  memory: number; // bytes
+}
+
+export interface SystemMetrics {
+  cpuLoad: number;      // Total system CPU %
+  memUsed: number;      // Total system RAM used (bytes)
+  memTotal: number;     // Total system RAM (bytes)
+  uptime: number;       // System uptime in seconds
+  services: ServiceStatus[];
   timestamp: Date;
 }
 
+// Keep legacy alias so the Gateway compiles without changes
+export type MonitorPayload = SystemMetrics;
+
+const TRACKED_NAMES = ['node', 'n8n', 'ollama'];
 
 @Injectable()
 export class SurveillanceService implements OnModuleInit, OnModuleDestroy {
   private readonly logger = new Logger(SurveillanceService.name);
 
-  // Live metrics — the Gateway reads these on every tick
-  private _latestMetrics: MonitorPayload[] = [];
-
-  // Callback registered by the Gateway so it receives every poll result
-  private onMetricsUpdate: ((metrics: MonitorPayload[]) => void) | null = null;
-
+  private _latestMetrics: SystemMetrics | null = null;
+  private onMetricsUpdate: ((metrics: SystemMetrics) => void) | null = null;
   private pollInterval: NodeJS.Timeout | null = null;
 
-  // Tracked PIDs: resolved once and re-used (re-discovered when null)
-  private pidMap: Record<string, number | null> = {
-    NestJS: process.pid,  // Always known immediately
-    n8n: null,
-    Ollama: null,
-  };
-
   // ─── Lifecycle ─────────────────────────────────────────────────────────────
 
-  async onModuleInit() {
-    this.logger.log('🟢 SurveillanceService booting — starting 500ms PID poll loop');
-    await this.discoverPids();
+  onModuleInit() {
+    this.logger.log('SurveillanceService booting — starting 500ms poll loop');
     this.pollInterval = setInterval(() => this.tick(), 500);
   }
 
   onModuleDestroy() {
     if (this.pollInterval) clearInterval(this.pollInterval);
-    this.logger.log('🔴 SurveillanceService stopped');
+    this.logger.log('SurveillanceService stopped');
   }
 
   // ─── Public API ────────────────────────────────────────────────────────────
 
-  registerMetricsCallback(cb: (metrics: MonitorPayload[]) => void) {
+  registerMetricsCallback(cb: (metrics: SystemMetrics) => void) {
     this.onMetricsUpdate = cb;
   }
 
-  getLatestMetrics(): MonitorPayload[] {
+  getLatestMetrics(): SystemMetrics | null {
     return this._latestMetrics;
   }
 
-  // ─── Port-Level Heartbeat ──────────────────────────────────────────────────
-
-  /**
-   * Opens a short-lived TCP connection to host:port.
-   * Resolves true if the port accepts the connection within 800 ms.
-   * Resolves false on ECONNREFUSED, timeout, or any error.
-   */
-  private portListening(host: string, port: number): Promise<boolean> {
-    return new Promise((resolve) => {
-      const socket = new net.Socket();
-      const done = (result: boolean) => {
-        socket.destroy();
-        resolve(result);
-      };
-      socket.setTimeout(800);
-      socket.once('connect', () => done(true));
-      socket.once('timeout', () => done(false));
-      socket.once('error', () => done(false));
-      socket.connect(port, host);
-    });
-  }
-
-  // ─── PID Discovery ─────────────────────────────────────────────────────────
-
-  /**
-   * Uses find-process to locate n8n and Ollama PIDs.
-   * Called once at boot and retried on each tick for any service still null.
-   * n8n PID is only accepted if port 5678 is also actively listening.
-   */
-  private async discoverPids() {
-    // --- n8n: a Node.js process whose command line contains "n8n" ---
-    if (!this.pidMap['n8n']) {
-      try {
-        const nodeProcs = await find('name', 'node', true);
-        const n8nProc = nodeProcs.find(
-          (p: any) => p.cmd && p.cmd.toLowerCase().includes('n8n'),
-        );
-        if (n8nProc) {
-          // Port-level guard: only accept the PID if n8n is actually serving
-          const portUp = await this.portListening('127.0.0.1', 5678);
-          if (portUp) {
-            this.pidMap['n8n'] = n8nProc.pid;
-            this.logger.log(`🔍 n8n PID discovered: ${n8nProc.pid} (port 5678 ✓)`);
-          } else {
-            this.logger.warn(`⚠️ n8n PID ${n8nProc.pid} found but port 5678 is not listening — treating as offline`);
-          }
-        }
-      } catch (e) {
-        // n8n not running yet — will retry next tick
-      }
-    }
-
-    // --- Ollama: match the server child process by executable name ---
-    if (!this.pidMap['Ollama']) {
-      try {
-        // On Windows the process is "ollama_llama_server.exe"; on Linux "ollama"
-        const ollamaNames = ['ollama_llama_server', 'ollama'];
-        for (const name of ollamaNames) {
-          const procs = await find('name', name, true);
-          if (procs.length > 0) {
-            this.pidMap['Ollama'] = procs[0].pid;
-            this.logger.log(`🔍 Ollama PID discovered: ${procs[0].pid} (${name})`);
-            break;
-          }
-        }
-      } catch (e) {
-        // Ollama not running yet — will retry next tick
-      }
-    }
-  }
-
   // ─── Poll Tick ─────────────────────────────────────────────────────────────
 
   private async tick() {
-    // Port re-verification: if n8n PID is known, confirm port 5678 is still up.
-    // A zombie n8n process (PID alive, port dead) is evicted so discovery retries.
-    if (this.pidMap['n8n'] !== null) {
-      const portUp = await this.portListening('127.0.0.1', 5678);
-      if (!portUp) {
-        this.logger.warn(`⚠️ n8n port 5678 went silent — evicting PID ${this.pidMap['n8n']}`);
-        this.pidMap['n8n'] = null;
-      }
-    }
+    const [loadData, memData] = await Promise.all([si.currentLoad(), si.mem()]);
 
-    // Retry PID discovery for any services still unknown
-    const hasMissingPids = Object.values(this.pidMap).some((v) => v === null);
-    if (hasMissingPids) await this.discoverPids();
+    const cpuLoad = parseFloat(loadData.currentLoad.toFixed(2));
+    const memUsed = memData.used;
+    const memTotal = memData.total;
+    const uptime = Math.floor(si.time().uptime ?? 0);
 
-    const activePids = Object.entries(this.pidMap).filter(
-      ([, pid]) => pid !== null,
-    ) as [string, number][];
-
-    if (activePids.length === 0) return;
-
-    const pidsToQuery = activePids.map(([, pid]) => pid);
-
-    let stats: Record<number, pidusage.Status>;
+    let processList: si.Systeminformation.ProcessesProcessData[] = [];
     try {
-      stats = await pidusage(pidsToQuery);
-    } catch (e) {
-      // A process may have died — invalidate its PID so it gets re-discovered
-      this.invalidateDeadPids(e);
-      return;
+      const result = await si.processes();
+      processList = result.list ?? [];
+    } catch {
+      // Fall through — processList stays empty, services will be offline
     }
 
-    const metrics: MonitorPayload[] = activePids.map(([service, pid]) => ({
-      service,
-      pid,
-      cpu: parseFloat((stats[pid]?.cpu ?? 0).toFixed(2)),
-      memory: stats[pid]?.memory ?? 0,   // bytes — let the UI format it
+    const services = this.buildServiceStatuses(processList);
+
+    const metrics: SystemMetrics = {
+      cpuLoad,
+      memUsed,
+      memTotal,
+      uptime,
+      services,
       timestamp: new Date(),
-    }));
+    };
 
     this._latestMetrics = metrics;
 
@@ -192,14 +91,30 @@ export class SurveillanceService implements OnModuleInit, OnModuleDestroy {
 
   // ─── Helpers ───────────────────────────────────────────────────────────────
 
-  private invalidateDeadPids(error: any) {
-    // pidusage throws with the bad PID in the message — reset it so we re-search
-    const msg = String(error);
-    for (const [service, pid] of Object.entries(this.pidMap)) {
-      if (pid !== null && msg.includes(String(pid)) && service !== 'NestJS') {
-        this.logger.warn(`⚠️ PID ${pid} (${service}) appears dead — resetting`);
-        this.pidMap[service] = null;
+  private buildServiceStatuses(
+    processList: si.Systeminformation.ProcessesProcessData[],
+  ): ServiceStatus[] {
+    return TRACKED_NAMES.map((name) => {
+      if (processList.length === 0) {
+        return { service: name, pid: null, online: false, cpu: 0, memory: 0 };
       }
-    }
+
+      const match = processList.find((p) =>
+        (p.name ?? '').toLowerCase().includes(name) ||
+        (p.command ?? '').toLowerCase().includes(name),
+      );
+
+      if (!match) {
+        return { service: name, pid: null, online: false, cpu: 0, memory: 0 };
+      }
+
+      return {
+        service: name,
+        pid: match.pid,
+        online: true,
+        cpu: parseFloat((match.cpu ?? 0).toFixed(2)),
+        memory: match.memRss ?? 0,
+      };
+    });
   }
 }