import { AgentState } from "../config/agent-state";
import { PROMPTS, SCHEMAS } from "../config/langchain-config";
import { BaseChatModel } from "@langchain/core/language_models/chat_models";
import { FFBGateway } from "../../ffb.gateway";
import { ThoughtPayload } from "../../ffb-production.schema";
import { FFBVectorService } from "../ffb-vector.service";

/**
 * Render a chat message's `content` as plain text for prompt interpolation.
 *
 * String content is returned unchanged. Any other value (e.g. an array of
 * content parts) is JSON-encoded, because interpolating it directly into a
 * template would produce "[object Object]".
 */
const contentToText = (content: unknown): string =>
  typeof content === "string" ? content : JSON.stringify(content) ?? "";

/**
 * Parse a date-like value, returning `undefined` instead of an Invalid Date so
 * an unparseable value is left out of the Mongo filter rather than being sent
 * as a meaningless bound.
 */
const toValidDate = (value: string | number | Date): Date | undefined => {
  const date = new Date(value);
  return Number.isNaN(date.getTime()) ? undefined : date;
};

/**
 * Build the parameter-extraction prompt for the structured-output model.
 *
 * Segments are joined with no separator, so the prompt is a single line of
 * text followed by a trailing newline.
 */
const buildExtractionPrompt = (
  lastMessage: string,
  context: string,
  entityStore: unknown
): string =>
  ` Extract aggregation parameters for: "${lastMessage}".` +
  ` Conversation History: ${context}` +
  ` Entity State: ${JSON.stringify(entityStore)}` +
  " INSTRUCTIONS:" +
  ` - Use the History and Entity State to resolve implicit references (e.g., "that site").` +
  " - If a specific site/date is mentioned in history/state and NOT overridden, use it." +
  ` - If user asks for "per site", "by phase", or "for each block", use the 'groupBy' field.` +
  ` - IMPORTANT: If NO specific date or range is mentioned by the user or found in history, DEFAULT the start date to "2024-01-01" and end date to "2026-12-31". \n`;

/**
 * LangGraph node that turns the latest user request into an aggregation,
 * runs it through `vectorService`, and reports the outcome.
 *
 * Steps:
 * 1. Ask `model` (structured output with `SCHEMAS.AGGREGATION`) to extract the
 *    aggregation parameters. The full conversation and the current entity
 *    store are included so implicit references can be resolved.
 * 2. Merge any site / start date / end date the model returned into a copy of
 *    the entity store. The merged values drive the `$match` filter, so
 *    previously remembered filters carry over when not overridden.
 * 3. Run one of:
 *    - `list`: distinct values of `fieldToAggregate`;
 *    - `count_distinct`: number of distinct values, globally or per `groupBy`;
 *    - otherwise: a `$group` stage using `$sum: 1` for `count`, or the
 *      `$<aggregationType>` accumulator over `fieldToAggregate`.
 * 4. Emit a "completed" thought to `state.socketId` via `gateway`.
 *
 * @param state - Graph state; `messages`, `entityStore` and `socketId` are read.
 * @param model - Chat model used for structured parameter extraction.
 * @param vectorService - Data access providing `getDistinct` and `aggregate`.
 * @param gateway - Used to push the progress thought to the client.
 * @returns A partial state update containing the merged `entityStore` and an
 *   `actionPayload` with the pipeline, results and extracted params.
 */
export const aggregationNode = async (
  state: typeof AgentState.State,
  model: BaseChatModel,
  vectorService: FFBVectorService,
  gateway: FFBGateway
): Promise<Partial<typeof AgentState.State>> => {
  // An empty history would otherwise throw on `undefined.content`.
  const lastEntry = state.messages[state.messages.length - 1];
  const lastMessage = lastEntry ? contentToText(lastEntry.content) : "";
  const context = state.messages
    .map(m => `${m._getType()}: ${contentToText(m.content)}`)
    .join('\n');
  console.log("Aggregation Context:", context);

  const structuredLlm = model.withStructuredOutput(SCHEMAS.AGGREGATION);
  const params = await structuredLlm.invoke(
    buildExtractionPrompt(lastMessage, context, state.entityStore)
  );

  // Update entity store with new parameters. Values the model did not return
  // keep their previous value, so earlier filters persist across turns.
  const updatedEntityStore = { ...state.entityStore };
  if (params.matchStage.site) updatedEntityStore.site = params.matchStage.site;
  if (params.matchStage.startDate) updatedEntityStore.startDate = params.matchStage.startDate;
  if (params.matchStage.endDate) updatedEntityStore.endDate = params.matchStage.endDate;

  // Stage objects are assembled dynamically from model output. The filter and
  // pipeline types FFBVectorService expects are not visible from this file.
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  const pipeline: any[] = [];
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  const match: any = {};

  // The merged store already prefers freshly extracted values over remembered ones.
  const siteToUse = updatedEntityStore.site;
  if (siteToUse) {
    match.site = siteToUse;
  }

  const startDate = updatedEntityStore.startDate;
  const endDate = updatedEntityStore.endDate;
  const dateRange: { $gte?: Date; $lte?: Date } = {};
  if (startDate) {
    const from = toValidDate(startDate);
    if (from) dateRange.$gte = from;
  }
  if (endDate) {
    const to = toValidDate(endDate);
    if (to) dateRange.$lte = to;
  }
  if (dateRange.$gte || dateRange.$lte) {
    match.productionDate = dateRange;
  }

  if (Object.keys(match).length > 0) {
    pipeline.push({ $match: match });
  }

  let results;
  if (params.aggregationType === 'list') {
    const distinctValues = await vectorService.getDistinct(params.fieldToAggregate, match);
    // Wrap each value so downstream synthesis receives row-shaped objects.
    results = distinctValues.map((val: unknown) => ({ [params.fieldToAggregate]: val }));
  } else if (params.aggregationType === 'count_distinct') {
    if (params.groupBy) {
      // Count distinct values per group (e.g. how many blocks per site).
      pipeline.push({
        $group: {
          _id: `$${params.groupBy}`,
          distinctValues: { $addToSet: `$${params.fieldToAggregate}` },
        },
      });
      pipeline.push({ $project: { _id: 1, totalValue: { $size: "$distinctValues" } } });
      results = await vectorService.aggregate(pipeline);
    } else {
      const distinctValues = await vectorService.getDistinct(params.fieldToAggregate, match);
      results = [{ totalValue: distinctValues.length }];
    }
  } else {
    // Standard aggregation. `count` counts documents; any other type is used
    // verbatim as a `$<aggregationType>` accumulator over the field.
    const totalValue =
      params.aggregationType === 'count'
        ? { $sum: 1 }
        : { [`$${params.aggregationType}`]: `$${params.fieldToAggregate}` };
    // Without groupBy, `_id: null` collapses everything into one global row.
    pipeline.push({
      $group: { _id: params.groupBy ? `$${params.groupBy}` : null, totalValue },
    });
    results = await vectorService.aggregate(pipeline);
  }

  const payload: ThoughtPayload = {
    node: `aggregation_node`,
    status: 'completed',
    message: `Aggregated ${params.aggregationType} of ${params.fieldToAggregate}`,
    pipeline: pipeline,
    results: results,
  };
  gateway.emitThought(state.socketId, payload);

  return {
    entityStore: updatedEntityStore,
    actionPayload: { type: 'aggregate', pipeline, results, params },
  };
};