import { Injectable, OnModuleInit } from "@nestjs/common";
import { MongoCoreService } from "src/mongo/mongo-core.service";
import { FFBProductionRepository } from "src/mongo/mongo-ffb-production.repository";
import { FFBGateway } from "./ffb.gateway";
import path from "path";
import fs from "fs";
import axios from "axios";
import { GeminiEmbeddingService } from "./gemini-embedding.service";

@Injectable()
export class FFBQueryAgentService implements OnModuleInit {
  private systemPrompt: any;
  private repo: FFBProductionRepository;

  constructor(
    private readonly mongoCore: MongoCoreService,
    private readonly gateway: FFBGateway,
    private readonly embeddingService: GeminiEmbeddingService
  ) {}

  async onModuleInit() {
    // Load the agent's system prompt (instructions + few-shot examples) once at startup.
    const filePath = path.join(process.cwd(), 'QueryAgent.json');
    const data = fs.readFileSync(filePath, 'utf-8');
    this.systemPrompt = JSON.parse(data);
  }
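  // For reference, a minimal QueryAgent.json shape this service assumes. The
  // field names are inferred from how buildPrompt reads the parsed object
  // (instructions, examples[].question, examples[].plan); the real file may
  // contain more than this sketch shows, and the example values below are
  // purely illustrative:
  //
  //   {
  //     "instructions": "You are a MongoDB aggregation planner for FFB production data...",
  //     "examples": [
  //       {
  //         "question": "Total weight harvested per site?",
  //         "plan": {
  //           "textToBeEmbedded": "",
  //           "pipeline": [{ "$group": { "_id": "$site", "total": { "$sum": "$weight" } } }],
  //           "reasoning": "Group all documents by site and sum weight."
  //         }
  //       }
  //     ]
  //   }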
  private buildPrompt(userMessage: string, schemaFields: string[]): string {
    const examplesText = (this.systemPrompt.examples || [])
      .map(
        (ex: any) => `Q: "${ex.question}"\nA: ${JSON.stringify(ex.plan, null, 2)}`
      )
      .join('\n\n');

    return `
${this.systemPrompt.instructions}

Document fields: ${schemaFields.join(", ")}

Always include the minimal "fields" needed for computation to reduce bandwidth.

${examplesText}

Now, given the following user question, output JSON only in the format:
{ "textToBeEmbedded": string, "pipeline": [ ... ], "reasoning": string }

Q: "${userMessage}"
`;
  }

  private async callGemini(prompt: string): Promise<string> {
    const apiKey = process.env.GOOGLE_API_KEY;
    if (!apiKey) throw new Error('Missing GOOGLE_API_KEY');

    const url =
      'https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-flash:generateContent';
    const body = { contents: [{ role: 'user', parts: [{ text: prompt }] }] };

    try {
      const response = await axios.post(url, body, {
        headers: { 'Content-Type': 'application/json', 'x-goog-api-key': apiKey },
      });

      // The answer may arrive split across multiple parts; join them into one string.
      const text =
        response.data?.candidates?.[0]?.content?.parts
          ?.map((p: any) => p.text)
          .join(' ') ?? '';
      if (!text) throw new Error('No text generated by Gemini');
      return text;
    } catch (err: any) {
      console.error('Failed to call Gemini:', err.response?.data || err.message);
      throw err;
    }
  }

  private sanitizeLLMOutput(text: string): string {
    // Strip the markdown code fences the model often wraps around JSON output.
    let sanitized = text
      .trim()
      .replace(/^```json\s*/, '')
      .replace(/^```\s*/, '')
      .replace(/```$/, '')
      .trim();

    // Rewrite Mongo shell ISODate("...") calls into plain JSON date strings.
    sanitized = sanitized.replace(/ISODate\(["'](.+?)["']\)/g, '"$1"');
    // Drop stray whitespace between an opening quote and a $-operator key.
    sanitized = sanitized.replace(/"\s*\$/g, '"$');

    return sanitized;
  }

  private parseLLMOutput(sanitized: string): {
    textToBeEmbedded: string;
    pipeline: any[];
    reasoning: string;
  } {
    try {
      const parsed = JSON.parse(sanitized);
      if (!('pipeline' in parsed) || !Array.isArray(parsed.pipeline)) {
        throw new Error('LLM output missing pipeline array');
      }
      return parsed;
    } catch {
      // Last-resort fallback for near-JSON output (unquoted keys, trailing
      // commas). Note: eval on model output is a code-execution risk; a
      // tolerant parser would be the safer long-term choice here.
      try {
        return eval(`(${sanitized})`);
      } catch (err2) {
        console.error('Failed to parse LLM output even with fallback:', sanitized);
        throw new Error('LLM returned invalid JSON');
      }
    }
  }

  private async getRepo(): Promise<FFBProductionRepository> {
    if (!this.repo) {
      const db = await this.mongoCore.getDb();
      this.repo = new FFBProductionRepository(db);
      await this.repo.init();
    }
    return this.repo;
  }

  /** Main entry point: plan + conditional vector search execution */
  async query(userMessage: string): Promise<any[]> {
    // 0. Get repository first
    const repo = await this.getRepo();

    // 1. Get sample doc to dynamically inject schema fields.
    // NOTE: findAll() loads documents just to inspect keys; a limit-1 fetch
    // would suffice if the repository supports it.
    const sampleDoc = await repo.findAll();
    const ffbFields =
      sampleDoc.length > 0
        ? Object.keys(sampleDoc[0])
        : [
            'productionDate',
            'site',
            'phase',
            'block',
            'weight',
            'weightUom',
            'quantity',
            'quantityUom',
          ];

    // 2. Build prompt with dynamic schema
    const promptText = this.buildPrompt(userMessage, ffbFields);

    // 3. Call LLM
    const llmResponse = await this.callGemini(promptText);

    // 4. Sanitize + parse
    const sanitized = this.sanitizeLLMOutput(llmResponse);
    const { textToBeEmbedded, pipeline, reasoning } = this.parseLLMOutput(sanitized);

    // 5. If vector search is needed, generate the embedding and inject it into $vectorSearch
    if (textToBeEmbedded && textToBeEmbedded.trim()) {
      const embedding = await this.embeddingService.embedText(textToBeEmbedded.trim());

      // Ensure $vectorSearch is the first stage in the pipeline
      // (Atlas Vector Search requires it to run first).
      const vectorStageIndex = pipeline.findIndex((stage) => '$vectorSearch' in stage);
      if (vectorStageIndex > -1) {
        pipeline[vectorStageIndex].$vectorSearch.queryVector = embedding;
        if (vectorStageIndex !== 0) {
          // Move $vectorSearch to first stage
          const [vsStage] = pipeline.splice(vectorStageIndex, 1);
          pipeline.unshift(vsStage);
        }
      }
    }

    // 6. Prepare pipeline for frontend display (mask actual vectors)
    const pipelineForSocket = pipeline.map((stage) =>
      '$vectorSearch' in stage
        ? { ...stage, $vectorSearch: { ...stage.$vectorSearch, queryVector: '[VECTOR]' } }
        : stage
    );

    // 7. Emit pipeline + reasoning
    this.gateway.emitAgentOutput({
      stage: 'pipeline_generated',
      rawLLMOutput: llmResponse,
      pipeline: pipelineForSocket,
      prettyPipeline: JSON.stringify(pipelineForSocket, null, 2),
      textToBeEmbedded,
      reasoning,
    });

    // 8. Execute aggregation
    const results = await repo.aggregate(pipeline);

    // 9. Emit execution results
    this.gateway.emitAgentOutput({
      stage: 'pipeline_executed',
      pipeline: pipelineForSocket,
      count: results.length,
      results,
      reasoning,
    });

    return results;
  }
}
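
// Usage sketch: a hypothetical controller wiring this service to an HTTP
// endpoint. FFBQueryController, the route names, the import path, and the
// request shape are illustrative assumptions, not part of this module:
//
//   import { Body, Controller, Post } from '@nestjs/common';
//   import { FFBQueryAgentService } from './ffb-query-agent.service';
//
//   @Controller('ffb')
//   export class FFBQueryController {
//     constructor(private readonly agent: FFBQueryAgentService) {}
//
//     @Post('query')
//     async query(@Body('message') message: string) {
//       // Runs plan -> (optional) embed -> aggregate, emitting progress
//       // over the FFBGateway websocket along the way.
//       return this.agent.query(message);
//     }
//   }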