|
|
@@ -0,0 +1,160 @@
|
|
|
+import { Injectable, OnModuleInit } from "@nestjs/common";
|
|
|
+import { MongoCoreService } from "src/mongo/mongo-core.service";
|
|
|
+import { FFBProductionRepository } from "src/mongo/mongo-ffb-production.repository";
|
|
|
+import { FFBGateway } from "./ffb.gateway";
|
|
|
+import path from "path";
|
|
|
+import fs from "fs";
|
|
|
+import axios from "axios";
|
|
|
+import jwt from "jsonwebtoken";
|
|
|
+import { GeminiEmbeddingService } from "./gemini-embedding.service";
|
|
|
+
|
|
|
/**
 * Subset of `QueryAgent.json` that this service reads.
 * NOTE(review): the file is parsed without schema validation, so these fields
 * are assumptions about its shape — confirm against the actual file.
 */
interface QueryAgentPromptConfig {
  instructions?: string;
  examples?: Array<{ question: string; plan: unknown }>;
}

/** Plan extracted from the LLM response: optional text to embed plus an aggregation pipeline. */
interface QueryPlan {
  textToBeEmbedded: string;
  // eslint-disable-next-line @typescript-eslint/no-explicit-any -- stages are free-form LLM output handed to the repository as-is
  pipeline: any[];
}

/**
 * Turns a natural-language question into a MongoDB aggregation pipeline by
 * prompting Gemini, optionally injects an embedding into `$vectorSearch`
 * stages, runs the pipeline through `FFBProductionRepository`, and reports
 * progress through `FFBGateway`.
 */
@Injectable()
export class FFBQueryAgentService implements OnModuleInit {
  /** Prompt configuration loaded from `QueryAgent.json` in `onModuleInit`. */
  private systemPrompt?: QueryAgentPromptConfig;
  /** Memoised repository initialisation, shared by concurrent callers of `getRepo`. */
  private repoPromise?: Promise<FFBProductionRepository>;
  private readonly VECTOR_DIM = Number.parseInt(process.env.VECTOR_DIM || '3072', 10); // Gemini default

  private serviceAccount: unknown; // parsed service account JSON
  private tokenExpiry = 0;
  private accessToken = '';

  constructor(
    private readonly mongoCore: MongoCoreService,
    private readonly gateway: FFBGateway,
    private readonly embeddingService: GeminiEmbeddingService
  ) { }

  /**
   * Loads the prompt configuration from `<cwd>/QueryAgent.json` and the service
   * account JSON from the path in `GOOGLE_APPLICATION_CREDENTIALS`.
   *
   * @throws Error when `GOOGLE_APPLICATION_CREDENTIALS` is unset; file-system and
   *   JSON parse errors propagate unchanged.
   */
  async onModuleInit(): Promise<void> {
    const promptPath = path.join(process.cwd(), 'QueryAgent.json');
    this.systemPrompt = JSON.parse(fs.readFileSync(promptPath, 'utf-8'));

    // Load service account for OAuth
    const credsPath = process.env.GOOGLE_APPLICATION_CREDENTIALS;
    if (!credsPath) throw new Error('Missing GOOGLE_APPLICATION_CREDENTIALS');
    this.serviceAccount = JSON.parse(fs.readFileSync(credsPath, 'utf-8'));
  }

  /**
   * Assembles the full LLM prompt: configured instructions, the few-shot
   * examples, the expected output format, and the user's question.
   *
   * @param userMessage Question typed by the user.
   * @returns Prompt text to send to the model.
   * @throws Error when the prompt configuration has not been loaded yet.
   */
  private buildPrompt(userMessage: string): string {
    if (!this.systemPrompt) {
      throw new Error('System prompt not loaded; onModuleInit has not completed');
    }

    const examplesText = (this.systemPrompt.examples || [])
      .map((ex) => `Q: "${ex.question}"\nA: ${JSON.stringify(ex.plan, null, 2)}`)
      .join('\n\n');

    // JSON.stringify escapes quotes, backslashes and control characters so the
    // message cannot terminate the quoted question line; plain text is unchanged.
    const quotedQuestion = JSON.stringify(userMessage);

    return `
${this.systemPrompt.instructions}

Always include the minimal "fields" needed for computation to reduce bandwidth.

${examplesText}

Now, given the following user question, output JSON only in the format:
{ "textToBeEmbedded": string, "pipeline": [ ... ] }

Q: ${quotedQuestion}
`;
  }

  /**
   * Sends the prompt to the Gemini `generateContent` endpoint and returns the
   * text of the first candidate (its parts joined with a single space).
   *
   * @param prompt Prompt text to send as a single user turn.
   * @returns Generated text.
   * @throws Error when `GOOGLE_API_KEY` is unset or no text is returned; HTTP
   *   errors are logged and rethrown unchanged.
   */
  private async callGemini(prompt: string): Promise<string> {
    const apiKey = process.env.GOOGLE_API_KEY;
    if (!apiKey) throw new Error('Missing GOOGLE_API_KEY');

    const url =
      'https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-flash:generateContent';

    const body = { contents: [{ role: 'user', parts: [{ text: prompt }] }] };

    try {
      const response = await axios.post(url, body, {
        headers: { 'Content-Type': 'application/json', 'x-goog-api-key': apiKey },
      });

      const text: string =
        response.data?.candidates?.[0]?.content?.parts
          ?.map((part: { text?: string }) => part.text)
          .join(' ') ?? '';
      if (!text) throw new Error('No text generated by Gemini');
      return text;
    } catch (err: unknown) {
      // Prefer the API's error payload when there is one; otherwise log the message.
      const detail = axios.isAxiosError(err)
        ? err.response?.data || err.message
        : err instanceof Error
          ? err.message
          : err;
      console.error('Failed to call Gemini:', detail);
      throw err;
    }
  }

  /**
   * Strips a surrounding Markdown code fence and rewrites constructs that are
   * not valid JSON: `ISODate("...")` becomes a plain string, and whitespace
   * between an opening quote and `$` is removed.
   *
   * @param text Raw model output.
   * @returns Text intended to be parseable as JSON.
   */
  private sanitizeLLMOutput(text: string): string {
    let sanitized = text
      .trim()
      .replace(/^```json\s*/, '')
      .replace(/^```\s*/, '')
      .replace(/```$/, '')
      .trim();
    sanitized = sanitized.replace(/ISODate\(["'](.+?)["']\)/g, '"$1"');
    sanitized = sanitized.replace(/"\s*\$/g, '"$');
    return sanitized;
  }

  /**
   * Parses sanitized model output into a query plan and validates its shape.
   * Validation runs regardless of which parser produced the value, so a plan
   * without a proper `pipeline` array can never reach execution.
   *
   * @param sanitized Output of `sanitizeLLMOutput`.
   * @returns The embedding text (empty string when absent or not a string) and the pipeline.
   * @throws Error when the text cannot be parsed, `pipeline` is not an array, or
   *   a stage is not a plain object.
   */
  private parseLLMOutput(sanitized: string): QueryPlan {
    let parsed: unknown;
    try {
      parsed = JSON.parse(sanitized);
    } catch {
      try {
        // SECURITY(review): eval executes arbitrary JavaScript, and this text comes
        // from an LLM whose output is steered by the user's message. It is kept only
        // so non-strict object literals (unquoted keys, single quotes) still parse;
        // replace with a tolerant JSON parser or remove the fallback.
        parsed = eval(`(${sanitized})`);
      } catch {
        console.error('Failed to parse LLM output even with fallback:', sanitized);
        throw new Error('LLM returned invalid JSON');
      }
    }

    if (typeof parsed !== 'object' || parsed === null) {
      throw new Error('LLM output missing pipeline array');
    }
    const candidate = parsed as { textToBeEmbedded?: unknown; pipeline?: unknown };
    if (!Array.isArray(candidate.pipeline)) {
      throw new Error('LLM output missing pipeline array');
    }

    // Later code applies the `in` operator to every stage, which throws on primitives.
    const hasInvalidStage = candidate.pipeline.some(
      (stage: unknown) => typeof stage !== 'object' || stage === null || Array.isArray(stage),
    );
    if (hasInvalidStage) {
      throw new Error('LLM output contains an invalid pipeline stage');
    }

    const textToBeEmbedded =
      typeof candidate.textToBeEmbedded === 'string' ? candidate.textToBeEmbedded : '';
    return { textToBeEmbedded, pipeline: candidate.pipeline };
  }

  /**
   * Returns the repository, creating and initialising it on first use.
   * The in-flight initialisation is shared, so concurrent callers never see a
   * repository whose `init()` has not finished; a failed attempt is forgotten
   * so the next call retries.
   */
  private async getRepo(): Promise<FFBProductionRepository> {
    if (!this.repoPromise) {
      this.repoPromise = this.createRepo();
    }
    const pending = this.repoPromise;
    try {
      return await pending;
    } catch (err: unknown) {
      // Only clear the attempt that failed, not a newer one started meanwhile.
      if (this.repoPromise === pending) this.repoPromise = undefined;
      throw err;
    }
  }

  /** Builds a repository on the shared database handle and waits for `init()`. */
  private async createRepo(): Promise<FFBProductionRepository> {
    const db = await this.mongoCore.getDb();
    const repo = new FFBProductionRepository(db);
    await repo.init();
    return repo;
  }

  /**
   * Main entry point: plan + conditional vector search execution.
   *
   * Asks the model for a plan, fills `queryVector` on every `$vectorSearch`
   * stage when the plan carries text to embed, emits the plan and the results
   * through the gateway (with vectors replaced by a placeholder), and returns
   * the aggregation results.
   *
   * @param userMessage Question typed by the user.
   * @returns Documents returned by the aggregation.
   * @throws Error when the model output is invalid or the pipeline contains a
   *   top-level write stage; model, embedding and database errors propagate.
   */
  async query(userMessage: string): Promise<any[]> {
    const promptText = this.buildPrompt(userMessage);
    const llmResponse = await this.callGemini(promptText);

    const sanitized = this.sanitizeLLMOutput(llmResponse);
    const { textToBeEmbedded, pipeline } = this.parseLLMOutput(sanitized);

    // The pipeline is model-generated from untrusted input: refuse stages that
    // write to collections before doing any further work.
    // NOTE(review): nested pipelines ($lookup/$unionWith/$facet) and server-side
    // JavaScript operators are not inspected here.
    const hasWriteStage = pipeline.some((stage) => '$out' in stage || '$merge' in stage);
    if (hasWriteStage) {
      throw new Error('LLM pipeline contains a write stage ($out/$merge); refusing to execute');
    }

    const embeddingText = textToBeEmbedded.trim();
    if (embeddingText) {
      const embedding = await this.embeddingService.embedText(embeddingText);
      for (const stage of pipeline) {
        if ('$vectorSearch' in stage) stage.$vectorSearch.queryVector = embedding;
      }
    }

    // Replace the vector with a placeholder in the copy sent over the gateway.
    const pipelineForSocket = pipeline.map((stage) =>
      '$vectorSearch' in stage
        ? { ...stage, $vectorSearch: { ...stage.$vectorSearch, queryVector: '[VECTOR]' } }
        : stage,
    );

    this.gateway.emitAgentOutput({
      stage: 'pipeline_generated',
      rawLLMOutput: llmResponse,
      pipeline: pipelineForSocket,
      prettyPipeline: JSON.stringify(pipelineForSocket, null, 2),
      textToBeEmbedded,
    });

    const repo = await this.getRepo();
    const results = await repo.aggregate(pipeline);

    this.gateway.emitAgentOutput({
      stage: 'pipeline_executed',
      pipeline: pipelineForSocket,
      count: results.length,
      results,
    });

    return results;
  }

}
|