aggregation.node.ts 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. import { AgentState } from "../config/agent-state";
  2. import { PROMPTS, SCHEMAS } from "../config/langchain-config";
  3. import { BaseChatModel } from "@langchain/core/language_models/chat_models";
  4. import { FFBGateway } from "../../ffb.gateway";
  5. import { ThoughtPayload } from "../../ffb-production.schema";
  6. import { FFBVectorService } from "../ffb-vector.service";
  /**
   * Flattens a message `content` value into plain text for prompting.
   * Strings pass through untouched; any other shape (for example an array of
   * content parts) is JSON-encoded so it never degrades to "[object Object]".
   */
  const messageContentToText = (content: unknown): string =>
    typeof content === 'string' ? content : (JSON.stringify(content) ?? '');

  /**
   * Parses a date-like value, returning `undefined` for empty or unparseable
   * input so that an Invalid Date object can never reach a query filter.
   */
  const parseValidDate = (value: unknown): Date | undefined => {
    if (!value) return undefined;
    const parsed = new Date(value as string | number | Date);
    return Number.isNaN(parsed.getTime()) ? undefined : parsed;
  };

  /**
   * Agent node that answers aggregation questions.
   *
   * Steps, in order:
   *  1. Asks the model (structured output bound to `SCHEMAS.AGGREGATION`) to
   *     extract aggregation parameters from the latest message, the rendered
   *     conversation history and the current entity store.
   *  2. Copies the entity store and overlays the extracted site / start date /
   *     end date onto the copy. Date values that do not parse are ignored, so
   *     the previously stored value (if any) stays in effect.
   *  3. Builds a `$match` filter on `site` and `productionDate` from that copy.
   *  4. Runs the query: `list` and ungrouped `count_distinct` go through
   *     `vectorService.getDistinct`; everything else goes through
   *     `vectorService.aggregate` with a `$group` stage.
   *  5. Emits a "completed" thought through the gateway and returns the
   *     updated entity store together with the action payload.
   *
   * @param state - Current agent state; reads `messages`, `entityStore` and `socketId`.
   * @param model - Chat model used for structured parameter extraction.
   * @param vectorService - Service exposing `getDistinct` and `aggregate`.
   * @param gateway - Gateway used to emit the thought payload for `state.socketId`.
   * @returns Partial state containing `entityStore` and `actionPayload`.
   * @throws Error when `state.messages` is empty. Errors from the model or the
   *   vector service are not caught here and propagate to the caller.
   */
  export const aggregationNode = async (
    state: typeof AgentState.State,
    model: BaseChatModel,
    vectorService: FFBVectorService,
    gateway: FFBGateway
  ): Promise<Partial<typeof AgentState.State>> => {
    const latest = state.messages[state.messages.length - 1];
    if (!latest) {
      // Fail with an explicit message instead of a TypeError on `undefined.content`.
      throw new Error('aggregationNode: state.messages is empty; nothing to aggregate');
    }
    const lastMessage = messageContentToText(latest.content);
    const context = state.messages
      .map(m => `${m._getType()}: ${messageContentToText(m.content)}`)
      .join('\n');
    console.log("Aggregation Context:", context);

    // Joined with "\n" and wrapped in empty entries so the prompt keeps its
    // leading and trailing newline.
    const prompt = [
      '',
      `Extract aggregation parameters for: "${lastMessage}".`,
      'Conversation History:',
      `${context}`,
      'Entity State:',
      `${JSON.stringify(state.entityStore)}`,
      'INSTRUCTIONS:',
      '- Use the History and Entity State to resolve implicit references (e.g., "that site").',
      '- If a specific site/date is mentioned in history/state and NOT overridden, use it.',
      `- If user asks for "per site", "by phase", or "for each block", use the 'groupBy' field.`,
      '- IMPORTANT: If NO specific date or range is mentioned by the user or found in history, DEFAULT the start date to "2024-01-01" and end date to "2026-12-31".',
      ''
    ].join('\n');

    const structuredLlm = model.withStructuredOutput(SCHEMAS.AGGREGATION);
    const params = await structuredLlm.invoke(prompt);

    // The model output is not guaranteed to carry a matchStage object.
    const requestedSite = params.matchStage?.site;
    const requestedStart = params.matchStage?.startDate;
    const requestedEnd = params.matchStage?.endDate;

    // Overlay newly extracted values on a copy of the entity store. Dates are
    // only accepted when they parse, so a garbage value neither reaches the
    // query nor gets persisted for later turns.
    const updatedEntityStore = { ...state.entityStore };
    if (requestedSite) updatedEntityStore.site = requestedSite;
    if (requestedStart && parseValidDate(requestedStart)) updatedEntityStore.startDate = requestedStart;
    if (requestedEnd && parseValidDate(requestedEnd)) updatedEntityStore.endDate = requestedEnd;

    // After the overlay the store already holds "extracted value, else the
    // remembered one", so the filter is built from it directly.
    // Typed loosely because the service's filter type is not visible here.
    const match: any = {};
    if (updatedEntityStore.site) {
      match.site = updatedEntityStore.site;
    }

    // NOTE(review): a date-only end bound such as "2026-12-31" parses to
    // midnight UTC, so `$lte` excludes records later on that day - confirm
    // whether productionDate carries a time component.
    const rangeStart = parseValidDate(updatedEntityStore.startDate);
    const rangeEnd = parseValidDate(updatedEntityStore.endDate);
    if (rangeStart || rangeEnd) {
      match.productionDate = {};
      if (rangeStart) match.productionDate.$gte = rangeStart;
      if (rangeEnd) match.productionDate.$lte = rangeEnd;
    }

    const pipeline: any[] = [];
    if (Object.keys(match).length > 0) {
      pipeline.push({ $match: match });
    }

    const { aggregationType, fieldToAggregate, groupBy } = params;
    let results;

    if (aggregationType === 'list') {
      // Unique values of the field, wrapped as objects for the synthesis step.
      const distinctValues = await vectorService.getDistinct(fieldToAggregate, match);
      results = distinctValues.map((val: unknown) => ({ [fieldToAggregate]: val }));
    } else if (aggregationType === 'count_distinct') {
      if (groupBy) {
        // Distinct count per group (e.g. how many blocks per site): collect the
        // unique values of each group into a set, then report the set size.
        pipeline.push(
          {
            $group: {
              _id: `$${groupBy}`,
              distinctValues: { $addToSet: `$${fieldToAggregate}` }
            }
          },
          {
            $project: {
              _id: 1,
              totalValue: { $size: "$distinctValues" }
            }
          }
        );
        results = await vectorService.aggregate(pipeline);
      } else {
        const distinctValues = await vectorService.getDistinct(fieldToAggregate, match);
        results = [{ totalValue: distinctValues.length }];
      }
    } else {
      // Standard aggregation (sum, avg, count, ...). A null _id yields a single
      // global result; 'count' counts documents instead of reading the field.
      const accumulator =
        aggregationType === 'count'
          ? { $sum: 1 }
          : { [`$${aggregationType}`]: `$${fieldToAggregate}` };
      pipeline.push({
        $group: { _id: groupBy ? `$${groupBy}` : null, totalValue: accumulator }
      });
      results = await vectorService.aggregate(pipeline);
    }

    const payload: ThoughtPayload = {
      node: `aggregation_node`,
      status: 'completed',
      message: `Aggregated ${params.aggregationType} of ${params.fieldToAggregate}`,
      pipeline,
      results
    };
    gateway.emitThought(state.socketId, payload);

    return {
      entityStore: updatedEntityStore,
      actionPayload: { type: 'aggregate', pipeline, results, params }
    };
  };