// ffb-query-agent.service.ts
import fs from "fs";
import path from "path";

import { Injectable, OnModuleInit } from "@nestjs/common";

import { MongoCoreService } from "src/mongo/mongo-core.service";
import { FFBProductionRepository } from "src/FFB/repo/mongo-ffb-production.repository";
import { FFBGateway } from "../ffb.gateway";
import { GeminiEmbeddingService } from "../embeddings/gemini-embedding.service";
import { callGemini } from "../external/gemini.api";
  9. @Injectable()
  10. export class FFBQueryAgentService implements OnModuleInit {
  11. private systemPrompt: any;
  12. private repo: FFBProductionRepository;
  13. constructor(
  14. private readonly mongoCore: MongoCoreService,
  15. private readonly gateway: FFBGateway,
  16. private readonly embeddingService: GeminiEmbeddingService
  17. ) { }
  18. async onModuleInit() {
  19. // __dirname points to the directory of the current file in dist (e.g., dist/FFB/services)
  20. const filePath = path.join(__dirname, '..', 'prompts', 'QueryAgent.json');
  21. const data = fs.readFileSync(filePath, 'utf-8');
  22. this.systemPrompt = JSON.parse(data);
  23. }
  24. private buildPrompt(userMessage: string, schemaFields: string[]): string {
  25. const examplesText = (this.systemPrompt.examples || [])
  26. .map(
  27. (ex: any) => `Q: "${ex.question}"\nA: ${JSON.stringify(ex.plan, null, 2)}`
  28. )
  29. .join('\n\n');
  30. return `
  31. ${this.systemPrompt.instructions}
  32. Document fields: ${schemaFields.join(", ")}
  33. Always include the minimal "fields" needed for computation to reduce bandwidth.
  34. ${examplesText}
  35. Now, given the following user question, output JSON only in the format:
  36. { "textToBeEmbedded": string, "pipeline": [ ... ], "reasoning": string }
  37. Q: "${userMessage}"
  38. `;
  39. }
  40. private sanitizeLLMOutput(text: string): string {
  41. let sanitized = text
  42. .trim()
  43. .replace(/^```json\s*/, '')
  44. .replace(/^```\s*/, '')
  45. .replace(/```$/, '')
  46. .trim();
  47. sanitized = sanitized.replace(/ISODate\(["'](.+?)["']\)/g, '"$1"');
  48. sanitized = sanitized.replace(/"\s*\$/g, '"$');
  49. return sanitized;
  50. }
  51. private parseLLMOutput(sanitized: string): { textToBeEmbedded: string; pipeline: any[], reasoning: string } {
  52. try {
  53. const parsed = JSON.parse(sanitized);
  54. if (!('pipeline' in parsed) || !Array.isArray(parsed.pipeline)) {
  55. throw new Error('LLM output missing pipeline array');
  56. }
  57. return parsed;
  58. } catch {
  59. try {
  60. return eval(`(${sanitized})`);
  61. } catch (err2) {
  62. console.error('Failed to parse LLM output even with fallback:', sanitized);
  63. throw new Error('LLM returned invalid JSON');
  64. }
  65. }
  66. }
  67. private async getRepo(): Promise<FFBProductionRepository> {
  68. if (!this.repo) {
  69. const db = await this.mongoCore.getDb();
  70. this.repo = new FFBProductionRepository(db);
  71. await this.repo.init();
  72. }
  73. return this.repo;
  74. }
  75. /** Main entry point: plan + conditional vector search execution */
  76. async query(userMessage: string): Promise<any[]> {
  77. // 0. Get repository first
  78. const repo = await this.getRepo();
  79. // 1. Get sample doc to dynamically inject schema fields
  80. const sampleDoc = await repo.findAll();
  81. const ffbFields = sampleDoc.length > 0 ? Object.keys(sampleDoc[0]) : [
  82. "productionDate",
  83. "site",
  84. "phase",
  85. "block",
  86. "weight",
  87. "weightUom",
  88. "quantity",
  89. "quantityUom"
  90. ];
  91. // 2. Build prompt with dynamic schema
  92. const promptText = this.buildPrompt(userMessage, ffbFields);
  93. // 3. Call LLM
  94. const llmResponse = await callGemini(promptText);
  95. // 4. Sanitize + parse
  96. const sanitized = this.sanitizeLLMOutput(llmResponse);
  97. const { textToBeEmbedded, pipeline, reasoning } = this.parseLLMOutput(sanitized);
  98. // 5. If vector search is needed, generate embedding and inject into $vectorSearch
  99. if (textToBeEmbedded && textToBeEmbedded.trim()) {
  100. const embedding = await this.embeddingService.embedText(textToBeEmbedded.trim());
  101. // Ensure $vectorSearch is the first stage in the pipeline
  102. const vectorStageIndex = pipeline.findIndex(stage => '$vectorSearch' in stage);
  103. if (vectorStageIndex > -1) {
  104. pipeline[vectorStageIndex].$vectorSearch.queryVector = embedding;
  105. if (vectorStageIndex !== 0) {
  106. // Move $vectorSearch to first stage
  107. const [vsStage] = pipeline.splice(vectorStageIndex, 1);
  108. pipeline.unshift(vsStage);
  109. }
  110. }
  111. }
  112. // 6. Prepare pipeline for frontend display (mask actual vectors)
  113. const pipelineForSocket = pipeline.map(stage =>
  114. ('$vectorSearch' in stage
  115. ? { ...stage, $vectorSearch: { ...stage.$vectorSearch, queryVector: '[VECTOR]' } }
  116. : stage)
  117. );
  118. // 7. Emit pipeline + reasoning
  119. this.gateway.emitAgentOutput({
  120. stage: 'pipeline_generated',
  121. rawLLMOutput: llmResponse,
  122. pipeline: pipelineForSocket,
  123. prettyPipeline: JSON.stringify(pipelineForSocket, null, 2),
  124. textToBeEmbedded,
  125. reasoning
  126. });
  127. // 8. Execute aggregation
  128. const results = await repo.aggregate(pipeline);
  129. // 9. Emit execution results
  130. this.gateway.emitAgentOutput({
  131. stage: 'pipeline_executed',
  132. pipeline: pipelineForSocket,
  133. count: results.length,
  134. results,
  135. reasoning
  136. });
  137. return results;
  138. }
  139. }