| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126 |
- import { AgentState } from "../config/agent-state";
- import { PROMPTS, SCHEMAS } from "../config/langchain-config";
- import { BaseChatModel } from "@langchain/core/language_models/chat_models";
- import { FFBGateway } from "../../ffb.gateway";
- import { ThoughtPayload } from "../../ffb-production.schema";
- import { FFBVectorService } from "../ffb-vector.service";
- export const aggregationNode = async (
- state: typeof AgentState.State,
- model: BaseChatModel,
- vectorService: FFBVectorService,
- gateway: FFBGateway
- ): Promise<Partial<typeof AgentState.State>> => {
- const lastMessage = state.messages[state.messages.length - 1].content as string;
- const context = state.messages.map(m => `${m._getType()}: ${m.content}`).join('\n');
- console.log("Aggregation Context:", context);
- const structuredLlm = model.withStructuredOutput(SCHEMAS.AGGREGATION);
- const params = await structuredLlm.invoke(`
- Extract aggregation parameters for: "${lastMessage}".
-
- Conversation History:
- ${context}
-
- Entity State:
- ${JSON.stringify(state.entityStore)}
-
- INSTRUCTIONS:
- - Use the History and Entity State to resolve implicit references (e.g., "that site").
- - If a specific site/date is mentioned in history/state and NOT overridden, use it.
- - If user asks for "per site", "by phase", or "for each block", use the 'groupBy' field.
- - IMPORTANT: If NO specific date or range is mentioned by the user or found in history, DEFAULT the start date to "2024-01-01" and end date to "2026-12-31".
- `);
- // Update entity store with new parameters
- const updatedEntityStore = { ...state.entityStore };
- if (params.matchStage.site) updatedEntityStore.site = params.matchStage.site;
- if (params.matchStage.startDate) updatedEntityStore.startDate = params.matchStage.startDate;
- if (params.matchStage.endDate) updatedEntityStore.endDate = params.matchStage.endDate;
- const pipeline: any[] = [];
- const match: any = {};
- // Use params or fallback to entity store
- const siteToUse = params.matchStage.site || updatedEntityStore.site;
- if (siteToUse) {
- match.site = siteToUse;
- }
- // Date handling
- const startDate = params.matchStage.startDate || updatedEntityStore.startDate;
- const endDate = params.matchStage.endDate || updatedEntityStore.endDate;
- if (startDate || endDate) {
- match.productionDate = {};
- if (startDate) match.productionDate.$gte = new Date(startDate);
- if (endDate) match.productionDate.$lte = new Date(endDate);
- }
- if (Object.keys(match).length > 0) {
- pipeline.push({ $match: match });
- }
- let results;
- if (params.aggregationType === 'list' || params.aggregationType === 'count_distinct') {
- if (params.aggregationType === 'list') {
- // Use distinct to get unique values
- results = await vectorService.getDistinct(params.fieldToAggregate, match);
- // Format as objects for synthesis
- results = results.map(val => ({ [params.fieldToAggregate]: val }));
- } else {
- // Count distinct
- if (params.groupBy) {
- // Remove duplicate match push (it was already done above if needed)
- // Count distinct per group (e.g. how many blocks per site)
- pipeline.push({
- $group: {
- _id: `$${params.groupBy}`,
- distinctValues: { $addToSet: `$${params.fieldToAggregate}` }
- }
- });
- pipeline.push({
- $project: {
- _id: 1,
- totalValue: { $size: "$distinctValues" }
- }
- });
- results = await vectorService.aggregate(pipeline);
- } else {
- const distinctValues = await vectorService.getDistinct(params.fieldToAggregate, match);
- results = [{ totalValue: distinctValues.length }];
- }
- }
- } else {
- // Standard aggregation (Sum, Avg, Count)
- // If groupBy is null, _id is null -> one global result
- const group: any = { _id: params.groupBy ? `$${params.groupBy}` : null };
- let operator = `$${params.aggregationType}`;
- if (params.aggregationType === 'count') {
- group.totalValue = { $sum: 1 };
- } else {
- group.totalValue = { [operator]: `$${params.fieldToAggregate}` };
- }
- pipeline.push({ $group: group });
- results = await vectorService.aggregate(pipeline);
- }
- let payload: ThoughtPayload = {
- node: `aggregation_node`,
- status: 'completed',
- message: `Aggregated ${params.aggregationType} of ${params.fieldToAggregate}`,
- pipeline: pipeline,
- results: results
- }
- gateway.emitThought(state.socketId, payload);
- return {
- entityStore: updatedEntityStore,
- actionPayload: { type: 'aggregate', pipeline, results, params }
- };
- };
|