- import { Injectable, OnModuleInit } from "@nestjs/common";
- import { MongoCoreService } from "src/mongo/mongo-core.service";
- import { FFBProductionRepository } from "src/mongo/mongo-ffb-production.repository";
- import { FFBGateway } from "./ffb.gateway";
- import path from "path";
- import fs from "fs";
- import axios from "axios";
- import { GeminiEmbeddingService } from "./gemini-embedding.service";
- @Injectable()
- export class FFBQueryAgentService implements OnModuleInit {
- private systemPrompt: any;
- private repo: FFBProductionRepository;
- constructor(
- private readonly mongoCore: MongoCoreService,
- private readonly gateway: FFBGateway,
- private readonly embeddingService: GeminiEmbeddingService
- ) { }
- async onModuleInit() {
- const filePath = path.join(process.cwd(), 'QueryAgent.json');
- const data = fs.readFileSync(filePath, 'utf-8');
- this.systemPrompt = JSON.parse(data);
- }
- private buildPrompt(userMessage: string, schemaFields: string[]): string {
- const examplesText = (this.systemPrompt.examples || [])
- .map(
- (ex: any) => `Q: "${ex.question}"\nA: ${JSON.stringify(ex.plan, null, 2)}`
- )
- .join('\n\n');
- return `
- ${this.systemPrompt.instructions}
- Document fields: ${schemaFields.join(", ")}
- Always include the minimal "fields" needed for computation to reduce bandwidth.
- ${examplesText}
- Now, given the following user question, output JSON only in the format:
- { "textToBeEmbedded": string, "pipeline": [ ... ], "reasoning": string }
- Q: "${userMessage}"
- `;
- }
- private async callGemini(prompt: string): Promise<string> {
- const apiKey = process.env.GOOGLE_API_KEY;
- if (!apiKey) throw new Error('Missing GOOGLE_API_KEY');
- const url =
- 'https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-flash:generateContent';
- const body = { contents: [{ role: 'user', parts: [{ text: prompt }] }] };
- try {
- const response = await axios.post(url, body, {
- headers: { 'Content-Type': 'application/json', 'x-goog-api-key': apiKey },
- });
- const text =
- response.data?.candidates?.[0]?.content?.parts?.map((p: any) => p.text).join(' ') ?? '';
- if (!text) throw new Error('No text generated by Gemini');
- return text;
- } catch (err: any) {
- console.error('Failed to call Gemini:', err.response?.data || err.message);
- throw err;
- }
- }
- private sanitizeLLMOutput(text: string): string {
- let sanitized = text
- .trim()
- .replace(/^```json\s*/, '')
- .replace(/^```\s*/, '')
- .replace(/```$/, '')
- .trim();
- sanitized = sanitized.replace(/ISODate\(["'](.+?)["']\)/g, '"$1"');
- sanitized = sanitized.replace(/"\s*\$/g, '"$');
- return sanitized;
- }
- private parseLLMOutput(sanitized: string): { textToBeEmbedded: string; pipeline: any[], reasoning: string } {
- try {
- const parsed = JSON.parse(sanitized);
- if (!('pipeline' in parsed) || !Array.isArray(parsed.pipeline)) {
- throw new Error('LLM output missing pipeline array');
- }
- return parsed;
- } catch {
- try {
- return eval(`(${sanitized})`);
- } catch (err2) {
- console.error('Failed to parse LLM output even with fallback:', sanitized);
- throw new Error('LLM returned invalid JSON');
- }
- }
- }
- private async getRepo(): Promise<FFBProductionRepository> {
- if (!this.repo) {
- const db = await this.mongoCore.getDb();
- this.repo = new FFBProductionRepository(db);
- await this.repo.init();
- }
- return this.repo;
- }
- /** Main entry point: plan + conditional vector search execution */
- async query(userMessage: string): Promise<any[]> {
- // 0. Get repository first
- const repo = await this.getRepo();
- // 1. Get sample doc to dynamically inject schema fields
- const sampleDoc = await repo.findAll();
- const ffbFields = sampleDoc.length > 0 ? Object.keys(sampleDoc[0]) : [
- "productionDate",
- "site",
- "phase",
- "block",
- "weight",
- "weightUom",
- "quantity",
- "quantityUom"
- ];
- // 2. Build prompt with dynamic schema
- const promptText = this.buildPrompt(userMessage, ffbFields);
- // 3. Call LLM
- const llmResponse = await this.callGemini(promptText);
- // 4. Sanitize + parse
- const sanitized = this.sanitizeLLMOutput(llmResponse);
- const { textToBeEmbedded, pipeline, reasoning } = this.parseLLMOutput(sanitized);
- // 5. If vector search is needed, generate embedding and inject into $vectorSearch
- if (textToBeEmbedded && textToBeEmbedded.trim()) {
- const embedding = await this.embeddingService.embedText(textToBeEmbedded.trim());
- // Ensure $vectorSearch is the first stage in the pipeline
- const vectorStageIndex = pipeline.findIndex(stage => '$vectorSearch' in stage);
- if (vectorStageIndex > -1) {
- pipeline[vectorStageIndex].$vectorSearch.queryVector = embedding;
- if (vectorStageIndex !== 0) {
- // Move $vectorSearch to first stage
- const [vsStage] = pipeline.splice(vectorStageIndex, 1);
- pipeline.unshift(vsStage);
- }
- }
- }
- // 6. Prepare pipeline for frontend display (mask actual vectors)
- const pipelineForSocket = pipeline.map(stage =>
- ('$vectorSearch' in stage
- ? { ...stage, $vectorSearch: { ...stage.$vectorSearch, queryVector: '[VECTOR]' } }
- : stage)
- );
- // 7. Emit pipeline + reasoning
- this.gateway.emitAgentOutput({
- stage: 'pipeline_generated',
- rawLLMOutput: llmResponse,
- pipeline: pipelineForSocket,
- prettyPipeline: JSON.stringify(pipelineForSocket, null, 2),
- textToBeEmbedded,
- reasoning
- });
- // 8. Execute aggregation
- const results = await repo.aggregate(pipeline);
- // 9. Emit execution results
- this.gateway.emitAgentOutput({
- stage: 'pipeline_executed',
- pipeline: pipelineForSocket,
- count: results.length,
- results,
- reasoning
- });
- return results;
- }
- }