ffb-query-agent.service.ts 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189
  1. import { Injectable, OnModuleInit } from "@nestjs/common";
  2. import { MongoCoreService } from "src/mongo/mongo-core.service";
  3. import { FFBProductionRepository } from "src/mongo/mongo-ffb-production.repository";
  4. import { FFBGateway } from "./ffb.gateway";
  5. import path from "path";
  6. import fs from "fs";
  7. import axios from "axios";
  8. import { GeminiEmbeddingService } from "./gemini-embedding.service";
  /**
   * Turns a natural-language question into a MongoDB aggregation pipeline by
   * prompting Gemini, optionally injects a query embedding into a
   * `$vectorSearch` stage, runs the pipeline through the FFB production
   * repository and reports progress through the gateway.
   */
  @Injectable()
  export class FFBQueryAgentService implements OnModuleInit {
    /** Field names advertised to the LLM when no document is available to sample. */
    private static readonly fallbackSchemaFields: string[] = [
      "productionDate",
      "site",
      "phase",
      "block",
      "weight",
      "weightUom",
      "quantity",
      "quantityUom",
    ];

    /** Parsed contents of QueryAgent.json; `instructions` and `examples` are read from it. */
    private systemPrompt: any;

    /** Lazily created repository; set only after its `init()` has completed. */
    private repo?: FFBProductionRepository;

    constructor(
      private readonly mongoCore: MongoCoreService,
      private readonly gateway: FFBGateway,
      private readonly embeddingService: GeminiEmbeddingService
    ) { }

    /**
     * Loads the prompt configuration from `QueryAgent.json` in the process
     * working directory.
     *
     * @throws If the file cannot be read or does not contain valid JSON.
     */
    async onModuleInit() {
      const filePath = path.join(process.cwd(), 'QueryAgent.json');
      const data = fs.readFileSync(filePath, 'utf-8');
      this.systemPrompt = JSON.parse(data);
    }

    /**
     * Assembles the full prompt: configured instructions, the document field
     * list, the configured few-shot examples and finally the user question.
     *
     * @param userMessage Question to be answered by the generated pipeline.
     * @param schemaFields Field names presented to the model as the document schema.
     * @returns The prompt text sent to the model.
     */
    private buildPrompt(userMessage: string, schemaFields: string[]): string {
      const examplesText = (this.systemPrompt.examples || [])
        .map(
          (ex: any) => `Q: "${ex.question}"\nA: ${JSON.stringify(ex.plan, null, 2)}`
        )
        .join('\n\n');

      // Joined with '\n' (including the leading and trailing empty entries) so
      // the resulting text does not depend on how this source file is indented.
      return [
        '',
        `${this.systemPrompt.instructions}`,
        `Document fields: ${schemaFields.join(", ")}`,
        'Always include the minimal "fields" needed for computation to reduce bandwidth.',
        `${examplesText}`,
        'Now, given the following user question, output JSON only in the format:',
        '{ "textToBeEmbedded": string, "pipeline": [ ... ], "reasoning": string }',
        `Q: "${userMessage}"`,
        '',
      ].join('\n');
    }

    /**
     * Sends the prompt to the Gemini `generateContent` endpoint and returns the
     * text of the first candidate (its parts joined with a single space).
     *
     * @param prompt Prompt text to send as a single user turn.
     * @returns The generated text.
     * @throws If `GOOGLE_API_KEY` is not set, the request fails, or the response
     *         contains no text. Request failures are logged and rethrown.
     */
    private async callGemini(prompt: string): Promise<string> {
      const apiKey = process.env.GOOGLE_API_KEY;
      if (!apiKey) throw new Error('Missing GOOGLE_API_KEY');

      const url =
        'https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-flash:generateContent';
      const body = { contents: [{ role: 'user', parts: [{ text: prompt }] }] };

      try {
        const response = await axios.post(url, body, {
          headers: { 'Content-Type': 'application/json', 'x-goog-api-key': apiKey },
        });
        const text =
          response.data?.candidates?.[0]?.content?.parts?.map((p: any) => p.text).join(' ') ?? '';
        if (!text) throw new Error('No text generated by Gemini');
        return text;
      } catch (err: any) {
        console.error('Failed to call Gemini:', err.response?.data || err.message);
        throw err;
      }
    }

    /**
     * Normalises raw model output so it has a chance of parsing as JSON.
     *
     * @param text Raw text returned by the model.
     * @returns The text without a surrounding Markdown code fence, with
     *          `ISODate("...")` calls reduced to plain quoted strings and with
     *          whitespace removed between a double quote and a following `$`.
     */
    private sanitizeLLMOutput(text: string): string {
      let sanitized = text
        .trim()
        .replace(/^```json\s*/, '')
        .replace(/^```\s*/, '')
        .replace(/```$/, '')
        .trim();
      // ISODate(...) is shell syntax, not JSON; keep only the quoted date string.
      sanitized = sanitized.replace(/ISODate\(["'](.+?)["']\)/g, '"$1"');
      // Collapse `" $op` into `"$op` so operator names are not prefixed by spaces.
      sanitized = sanitized.replace(/"\s*\$/g, '"$');
      return sanitized;
    }

    /**
     * Parses sanitised model output and validates that it carries a pipeline.
     *
     * Validation runs after BOTH parse paths. Previously the "missing pipeline"
     * error was swallowed by the fallback, which then returned the same invalid
     * object and caused a TypeError further downstream.
     *
     * @param sanitized Output of {@link sanitizeLLMOutput}.
     * @returns The parsed plan.
     * @throws Error('LLM returned invalid JSON') when the text cannot be parsed.
     * @throws Error('LLM output missing pipeline array') when the parsed value
     *         is not an object with an array `pipeline`.
     * @throws Error('LLM output pipeline contains a non-object stage') when any
     *         stage is not a non-null object.
     */
    private parseLLMOutput(sanitized: string): { textToBeEmbedded: string; pipeline: any[], reasoning: string } {
      let parsed: any;
      try {
        parsed = JSON.parse(sanitized);
      } catch {
        try {
          // SECURITY(review): eval executes model-generated text with the full
          // privileges of this process. The model output is influenced by the
          // end user's question, so this is a code-execution risk. It is kept
          // only to accept non-strict object literals (unquoted keys, single
          // quotes, trailing commas); replace it with a tolerant JSON parser
          // or drop the fallback as soon as that is acceptable.
          parsed = eval(`(${sanitized})`);
        } catch {
          console.error('Failed to parse LLM output even with fallback:', sanitized);
          throw new Error('LLM returned invalid JSON');
        }
      }

      if (parsed === null || typeof parsed !== 'object' || !Array.isArray(parsed.pipeline)) {
        throw new Error('LLM output missing pipeline array');
      }
      // Later code applies the `in` operator to each stage, which throws on primitives.
      const allStagesAreObjects = parsed.pipeline.every(
        (stage: unknown) => stage !== null && typeof stage === 'object'
      );
      if (!allStagesAreObjects) {
        throw new Error('LLM output pipeline contains a non-object stage');
      }
      return parsed;
    }

    /**
     * Returns the shared repository, creating and initialising it on first use.
     *
     * The instance is cached only after `init()` resolves, so a failed
     * initialisation is retried on the next call instead of handing out an
     * uninitialised repository.
     */
    private async getRepo(): Promise<FFBProductionRepository> {
      if (this.repo) return this.repo;

      const db = await this.mongoCore.getDb();
      const repo = new FFBProductionRepository(db);
      await repo.init();
      this.repo = repo;
      return repo;
    }

    /**
     * Main entry point: plan + conditional vector search execution.
     *
     * @param userMessage Natural-language question.
     * @returns The documents produced by running the generated pipeline.
     * @throws Propagates failures from the LLM call, output parsing, the
     *         embedding service and the aggregation.
     */
    async query(userMessage: string): Promise<any[]> {
      // 0. Get repository first
      const repo = await this.getRepo();

      // 1. Get sample doc to dynamically inject schema fields
      // NOTE(review): findAll() presumably loads every document although only
      // the first one's keys are used; switch to a single-document lookup if
      // the repository offers one.
      const sampleDocs = await repo.findAll();
      const ffbFields = sampleDocs.length > 0
        ? Object.keys(sampleDocs[0])
        : FFBQueryAgentService.fallbackSchemaFields;

      // 2. Build prompt with dynamic schema
      const promptText = this.buildPrompt(userMessage, ffbFields);

      // 3. Call LLM
      const llmResponse = await this.callGemini(promptText);

      // 4. Sanitize + parse
      const sanitized = this.sanitizeLLMOutput(llmResponse);
      const { textToBeEmbedded, pipeline, reasoning } = this.parseLLMOutput(sanitized);

      // 5. If vector search is needed, generate embedding and inject into $vectorSearch.
      //    The model's value is not guaranteed to be a string, so guard before trimming.
      const embedInput = typeof textToBeEmbedded === 'string' ? textToBeEmbedded.trim() : '';
      if (embedInput) {
        const vectorStageIndex = pipeline.findIndex(stage => '$vectorSearch' in stage);
        // Only pay for an embedding call when there is a stage to receive it.
        if (vectorStageIndex > -1) {
          const embedding = await this.embeddingService.embedText(embedInput);
          pipeline[vectorStageIndex].$vectorSearch.queryVector = embedding;
          if (vectorStageIndex !== 0) {
            // Move $vectorSearch to first stage
            const [vsStage] = pipeline.splice(vectorStageIndex, 1);
            pipeline.unshift(vsStage);
          }
        }
      }

      // 6. Prepare pipeline for frontend display (mask actual vectors)
      const pipelineForSocket = pipeline.map(stage =>
        ('$vectorSearch' in stage
          ? { ...stage, $vectorSearch: { ...stage.$vectorSearch, queryVector: '[VECTOR]' } }
          : stage)
      );

      // 7. Emit pipeline + reasoning
      this.gateway.emitAgentOutput({
        stage: 'pipeline_generated',
        rawLLMOutput: llmResponse,
        pipeline: pipelineForSocket,
        prettyPipeline: JSON.stringify(pipelineForSocket, null, 2),
        textToBeEmbedded,
        reasoning
      });

      // 8. Execute aggregation
      const results = await repo.aggregate(pipeline);

      // 9. Emit execution results
      this.gateway.emitAgentOutput({
        stage: 'pipeline_executed',
        pipeline: pipelineForSocket,
        count: results.length,
        results,
        reasoning
      });

      return results;
    }
  }