|
@@ -13,25 +13,49 @@ export const aggregationNode = async (
|
|
|
): Promise<Partial<typeof AgentState.State>> => {
|
|
): Promise<Partial<typeof AgentState.State>> => {
|
|
|
const lastMessage = state.messages[state.messages.length - 1].content as string;
|
|
const lastMessage = state.messages[state.messages.length - 1].content as string;
|
|
|
|
|
|
|
|
|
|
+ const context = state.messages.map(m => `${m._getType()}: ${m.content}`).join('\n');
|
|
|
|
|
+ console.log("Aggregation Context:", context);
|
|
|
|
|
+
|
|
|
const structuredLlm = model.withStructuredOutput(SCHEMAS.AGGREGATION);
|
|
const structuredLlm = model.withStructuredOutput(SCHEMAS.AGGREGATION);
|
|
|
- const params = await structuredLlm.invoke(`Extract aggregation parameters for: "${lastMessage}". Context: ${JSON.stringify(state.entityStore)}`);
|
|
|
|
|
|
|
+ const params = await structuredLlm.invoke(`
|
|
|
|
|
+ Extract aggregation parameters for: "${lastMessage}".
|
|
|
|
|
+
|
|
|
|
|
+ Conversation History:
|
|
|
|
|
+ ${context}
|
|
|
|
|
+
|
|
|
|
|
+ Entity State:
|
|
|
|
|
+ ${JSON.stringify(state.entityStore)}
|
|
|
|
|
+
|
|
|
|
|
+ INSTRUCTIONS:
|
|
|
|
|
+ - Use the History and Entity State to resolve implicit references (e.g., "that site").
|
|
|
|
|
+ - If a specific site/date is mentioned in history/state and NOT overridden, use it.
|
|
|
|
|
+ - If user asks for "per site", "by phase", or "for each block", use the 'groupBy' field.
|
|
|
|
|
+ - IMPORTANT: If NO specific date or range is mentioned by the user or found in history, DEFAULT the start date to "2024-01-01" and end date to "2026-12-31".
|
|
|
|
|
+ `);
|
|
|
|
|
+
|
|
|
|
|
+ // Update entity store with new parameters
|
|
|
|
|
+ const updatedEntityStore = { ...state.entityStore };
|
|
|
|
|
+ if (params.matchStage.site) updatedEntityStore.site = params.matchStage.site;
|
|
|
|
|
+ if (params.matchStage.startDate) updatedEntityStore.startDate = params.matchStage.startDate;
|
|
|
|
|
+ if (params.matchStage.endDate) updatedEntityStore.endDate = params.matchStage.endDate;
|
|
|
|
|
|
|
|
const pipeline: any[] = [];
|
|
const pipeline: any[] = [];
|
|
|
const match: any = {};
|
|
const match: any = {};
|
|
|
|
|
|
|
|
- // Check for null instead of undefined
|
|
|
|
|
- if (params.matchStage.site !== null) {
|
|
|
|
|
- match.site = params.matchStage.site;
|
|
|
|
|
|
|
+ // Use params or fallback to entity store
|
|
|
|
|
+ const siteToUse = params.matchStage.site || updatedEntityStore.site;
|
|
|
|
|
+ if (siteToUse) {
|
|
|
|
|
+ match.site = siteToUse;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- if (params.matchStage.startDate !== null || params.matchStage.endDate !== null) {
|
|
|
|
|
|
|
+ // Date handling
|
|
|
|
|
+ const startDate = params.matchStage.startDate || updatedEntityStore.startDate;
|
|
|
|
|
+ const endDate = params.matchStage.endDate || updatedEntityStore.endDate;
|
|
|
|
|
+
|
|
|
|
|
+ if (startDate || endDate) {
|
|
|
match.productionDate = {};
|
|
match.productionDate = {};
|
|
|
- if (params.matchStage.startDate !== null) {
|
|
|
|
|
- match.productionDate.$gte = new Date(params.matchStage.startDate);
|
|
|
|
|
- }
|
|
|
|
|
- if (params.matchStage.endDate !== null) {
|
|
|
|
|
- match.productionDate.$lte = new Date(params.matchStage.endDate);
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ if (startDate) match.productionDate.$gte = new Date(startDate);
|
|
|
|
|
+ if (endDate) match.productionDate.$lte = new Date(endDate);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
if (Object.keys(match).length > 0) {
|
|
if (Object.keys(match).length > 0) {
|
|
@@ -48,14 +72,40 @@ export const aggregationNode = async (
|
|
|
results = results.map(val => ({ [params.fieldToAggregate]: val }));
|
|
results = results.map(val => ({ [params.fieldToAggregate]: val }));
|
|
|
} else {
|
|
} else {
|
|
|
// Count distinct
|
|
// Count distinct
|
|
|
- const distinctValues = await vectorService.getDistinct(params.fieldToAggregate, match);
|
|
|
|
|
- results = [{ totalValue: distinctValues.length }];
|
|
|
|
|
|
|
+ if (params.groupBy) {
|
|
|
|
|
+ // Remove duplicate match push (it was already done above if needed)
|
|
|
|
|
+
|
|
|
|
|
+ // Count distinct per group (e.g. how many blocks per site)
|
|
|
|
|
+ pipeline.push({
|
|
|
|
|
+ $group: {
|
|
|
|
|
+ _id: `$${params.groupBy}`,
|
|
|
|
|
+ distinctValues: { $addToSet: `$${params.fieldToAggregate}` }
|
|
|
|
|
+ }
|
|
|
|
|
+ });
|
|
|
|
|
+ pipeline.push({
|
|
|
|
|
+ $project: {
|
|
|
|
|
+ _id: 1,
|
|
|
|
|
+ totalValue: { $size: "$distinctValues" }
|
|
|
|
|
+ }
|
|
|
|
|
+ });
|
|
|
|
|
+ results = await vectorService.aggregate(pipeline);
|
|
|
|
|
+ } else {
|
|
|
|
|
+ const distinctValues = await vectorService.getDistinct(params.fieldToAggregate, match);
|
|
|
|
|
+ results = [{ totalValue: distinctValues.length }];
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
} else {
|
|
} else {
|
|
|
- // Standard aggregation
|
|
|
|
|
- const group: any = { _id: null };
|
|
|
|
|
- const operator = `$${params.aggregationType}`;
|
|
|
|
|
- group.totalValue = { [operator]: `$${params.fieldToAggregate}` };
|
|
|
|
|
|
|
+ // Standard aggregation (Sum, Avg, Count)
|
|
|
|
|
+ // If groupBy is null, _id is null -> one global result
|
|
|
|
|
+ const group: any = { _id: params.groupBy ? `$${params.groupBy}` : null };
|
|
|
|
|
+
|
|
|
|
|
+ let operator = `$${params.aggregationType}`;
|
|
|
|
|
+ if (params.aggregationType === 'count') {
|
|
|
|
|
+ group.totalValue = { $sum: 1 };
|
|
|
|
|
+ } else {
|
|
|
|
|
+ group.totalValue = { [operator]: `$${params.fieldToAggregate}` };
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
pipeline.push({ $group: group });
|
|
pipeline.push({ $group: group });
|
|
|
results = await vectorService.aggregate(pipeline);
|
|
results = await vectorService.aggregate(pipeline);
|
|
|
}
|
|
}
|
|
@@ -63,12 +113,14 @@ export const aggregationNode = async (
|
|
|
let payload: ThoughtPayload = {
|
|
let payload: ThoughtPayload = {
|
|
|
node: `aggregation_node`,
|
|
node: `aggregation_node`,
|
|
|
status: 'completed',
|
|
status: 'completed',
|
|
|
|
|
+ message: `Aggregated ${params.aggregationType} of ${params.fieldToAggregate}`,
|
|
|
pipeline: pipeline,
|
|
pipeline: pipeline,
|
|
|
results: results
|
|
results: results
|
|
|
}
|
|
}
|
|
|
gateway.emitThought(state.socketId, payload);
|
|
gateway.emitThought(state.socketId, payload);
|
|
|
|
|
|
|
|
return {
|
|
return {
|
|
|
- actionPayload: { type: 'aggregate', pipeline, results }
|
|
|
|
|
|
|
+ entityStore: updatedEntityStore,
|
|
|
|
|
+ actionPayload: { type: 'aggregate', pipeline, results, params }
|
|
|
};
|
|
};
|
|
|
};
|
|
};
|