updated gateway to do away with REST for chat functions

Dr-Swopt 2 weeks ago
parent
commit
385d3d4343

+ 1 - 1
src/FFB/ffb-production.controller.ts

@@ -13,7 +13,7 @@ export class FFBProductionController {
 
   @Post('chat')
   async query(@Body('message') message: string, @Body('sessionId') sessionId?: string) {
-    return this.ffbLangChainService.chat(message);
+    throw new BadRequestException('HTTP Chat endpoint is deprecated. Please use WebSocket connection on namespace /ffb with event "chat".');
   }
 
 

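For context, a minimal client-side sketch of the migration (not part of this commit). The "/ffb" namespace and the "chat" event come from the deprecation message above, and the "chat_response" payload shape matches the new gateway handler below; the server URL and the example message are placeholder assumptions.

import { io } from 'socket.io-client';

// Connect to the FFB namespace instead of calling POST /chat.
const socket = io('http://localhost:3000/ffb'); // placeholder URL

// The gateway replies to the requesting socket with a 'chat_response' event.
socket.on('chat_response', (payload: { message: string }) => {
    console.log('Agent reply:', payload.message);
});

// Send the message over the 'chat' event; the payload shape mirrors handleChat's MessageBody.
socket.emit('chat', { message: 'How did FFB production look last month?' });
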
+ 25 - 3
src/FFB/ffb.gateway.ts

@@ -4,9 +4,13 @@ import {
     OnGatewayInit,
     OnGatewayConnection,
     OnGatewayDisconnect,
+    SubscribeMessage,
+    MessageBody,
+    ConnectedSocket,
 } from '@nestjs/websockets';
 import { Server, Socket } from 'socket.io';
-import { Logger } from '@nestjs/common';
+import { Logger, Inject, forwardRef } from '@nestjs/common';
+import { FFBLangChainService } from './services/ffb-langchain.service';
 
 @WebSocketGateway({
     cors: {
@@ -19,19 +23,37 @@ export class FFBGateway
     @WebSocketServer() server: Server;
     private logger: Logger = new Logger('FFBGateway');
 
+    constructor(
+        @Inject(forwardRef(() => FFBLangChainService))
+        private readonly langchainService: FFBLangChainService
+    ) { }
+
     afterInit(server: Server) {
         this.logger.log('FFB Gateway Initialized');
     }
 
     handleConnection(client: Socket, ...args: any[]) {
         this.logger.log(`Client connected: ${client.id}`);
+        this.langchainService.createSession(client.id);
     }
 
     handleDisconnect(client: Socket) {
         this.logger.log(`Client disconnected: ${client.id}`);
+        this.langchainService.deleteSession(client.id);
+    }
+
+    @SubscribeMessage('chat')
+    async handleChat(
+        @MessageBody() data: { message: string },
+        @ConnectedSocket() client: Socket,
+    ) {
+        this.logger.log(`Received chat from ${client.id}: ${data.message}`);
+        const response = await this.langchainService.chat(client.id, data.message);
+        client.emit('chat_response', { message: response });
+        return { event: 'sent', data: response }; // Acknowledgement if needed
     }
 
-    emitThought(data: any) {
-        this.server.emit('agent_thought', data);
+    emitThought(socketId: string, data: any) {
+        this.server.to(socketId).emit('agent_thought', data);
     }
 }
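
Because emitThought now routes through server.to(socketId) rather than broadcasting, each client receives only the reasoning trace for its own request. A hedged listener sketch (same assumed URL and namespace as the client sketch above; the field names mirror the emitThought payloads in ffb-langchain.service.ts, and the exact shape varies per node):

import { io } from 'socket.io-client';

const socket = io('http://localhost:3000/ffb'); // placeholder URL, assumed namespace

// Progress updates from the LangGraph nodes; only this client's own trace
// arrives here, since the gateway scopes the emit to its socket id.
socket.on('agent_thought', (thought: { node: string; status: string; [key: string]: unknown }) => {
    console.log(`[${thought.node}] ${thought.status}`);
});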

+ 40 - 14
src/FFB/services/ffb-langchain.service.ts

@@ -4,7 +4,8 @@ import { FFBVectorService } from './ffb-vector.service';
 import { z } from "zod";
 import { StateGraph, START, END, Annotation } from "@langchain/langgraph";
 import { BaseMessage, HumanMessage, AIMessage } from "@langchain/core/messages";
-
+import { forwardRef, Inject } from '@nestjs/common';
+import { FFBGateway } from '../ffb.gateway';
 // State Definition using Annotation
 const AgentState = Annotation.Root({
     messages: Annotation<BaseMessage[]>({
@@ -25,18 +26,22 @@ const AgentState = Annotation.Root({
     }),
     finalResponse: Annotation<string>({
         reducer: (x, y) => y ?? x,
+    }),
+    socketId: Annotation<string>({
+        reducer: (x, y) => y ?? x,
+        default: () => "default",
     })
 });
 
-import { FFBGateway } from '../ffb.gateway';
-
 @Injectable()
 export class FFBLangChainService {
     private model: ChatOpenAI;
     private graph: any;
+    private sessions: Map<string, BaseMessage[]> = new Map();
 
     constructor(
         private readonly vectorService: FFBVectorService,
+        @Inject(forwardRef(() => FFBGateway))
         private readonly gateway: FFBGateway
     ) {
         this.model = new ChatOpenAI({
@@ -97,7 +102,7 @@ export class FFBLangChainService {
             reasoning: z.string()
         });
 
-        this.gateway.emitThought({
+        this.gateway.emitThought(state.socketId, {
             node: 'router_node',
             status: 'processing',
             message: 'Analyzing user intent...',
@@ -126,7 +131,7 @@ User Input: "${lastMessage}"
         const result = await structuredLlm.invoke(routerPrompt);
 
         // Merge extracted entities with existing store
-        this.gateway.emitThought({
+        this.gateway.emitThought(state.socketId, {
             node: 'router_node',
             status: 'completed',
             result: result
@@ -134,14 +139,15 @@ User Input: "${lastMessage}"
 
         return {
             activeIntent: result.intent as any,
-            entityStore: result.entities || {}
+            entityStore: result.entities || {},
+            socketId: state.socketId
         };
     }
 
     private async clarifierNode(state: typeof AgentState.State): Promise<Partial<typeof AgentState.State>> {
         const prompt = `User mentioned ${JSON.stringify(state.entityStore)}. Ask them to clarify what they want to know (e.g., total production, specific issues, etc.).`;
 
-        this.gateway.emitThought({
+        this.gateway.emitThought(state.socketId, {
             node: 'clarifier_node',
             status: 'processing',
             message: 'Asking for clarification',
@@ -174,7 +180,7 @@ User Input: "${lastMessage}"
 
         const results = await this.vectorService.vectorSearch(lastMessage, 5, filter);
 
-        this.gateway.emitThought({
+        this.gateway.emitThought(state.socketId, {
             node: 'vector_search_node',
             status: 'completed',
             query: lastMessage,
@@ -233,7 +239,7 @@ User Input: "${lastMessage}"
 
         const results = await this.vectorService.aggregate(pipeline);
 
-        this.gateway.emitThought({
+        this.gateway.emitThought(state.socketId, {
             node: 'aggregation_node',
             status: 'completed',
             pipeline: pipeline,
@@ -257,7 +263,7 @@ User Input: "${lastMessage}"
         Cite the source (e.g., "Based on aggregation results...").
         `;
 
-        this.gateway.emitThought({
+        this.gateway.emitThought(state.socketId, {
             node: 'synthesis_node',
             status: 'processing',
             message: 'Synthesizing final response',
@@ -272,16 +278,36 @@ User Input: "${lastMessage}"
 
     // --- MAIN ENTRY POINT ---
 
-    async chat(message: string): Promise<string> {
+    createSession(socketId: string) {
+        this.sessions.set(socketId, []);
+        console.log(`Session created for ${socketId}`);
+    }
+
+    deleteSession(socketId: string) {
+        this.sessions.delete(socketId);
+        console.log(`Session deleted for ${socketId}`);
+    }
+
+    async chat(socketId: string, message: string): Promise<string> {
         try {
+            // Get history or init empty
+            const history = this.sessions.get(socketId) || [];
+
             const inputs = {
-                messages: [new HumanMessage(message)],
-                entityStore: {}
+                messages: [...history, new HumanMessage(message)],
+                entityStore: {},
+                socketId: socketId
             };
 
             const result = await this.graph.invoke(inputs);
 
-            const agentMessages = result.messages.filter((m: BaseMessage) => m._getType() === 'ai');
+            const allMessages = result.messages as BaseMessage[];
+
+            // Update history (keep all messages for context window? Or truncate?)
+            // For now, keep all. Memory optimization might be needed later.
+            this.sessions.set(socketId, allMessages);
+
+            const agentMessages = allMessages.filter((m: BaseMessage) => m._getType() === 'ai');
             const lastResponse = agentMessages[agentMessages.length - 1];
 
             return lastResponse.content as string;