|
@@ -1,8 +1,11 @@
|
|
|
import { map, Observable, of, Subject } from "rxjs";
|
|
|
import { ErrorTrigger, MessageAuditorServiceInterface, MessageSynchronisationServiceSetting } from "../type/datatype";
|
|
|
-import { LoggingService } from "../dependencies/log/interface/export";
|
|
|
-import { BaseMessage } from "../dependencies/msgutil/interface/export";
|
|
|
import { MessageLog } from "../dependencies/log/type/datatype";
|
|
|
+import * as _ from 'lodash'
|
|
|
+import { LoggingService } from "../dependencies/log/interface/export";
|
|
|
+import { BaseMessage } from "../dependencies/log/dependencies/msgutil/interface/export";
|
|
|
+let processedMsgIds = new Set();
|
|
|
+require('dotenv').config();
|
|
|
|
|
|
export class MessageAuditorService implements MessageAuditorServiceInterface {
|
|
|
private settings: MessageSynchronisationServiceSetting
|
|
@@ -14,6 +17,9 @@ export class MessageAuditorService implements MessageAuditorServiceInterface {
|
|
|
the data to be synchronized */
|
|
|
public init(settings: MessageSynchronisationServiceSetting): void {
|
|
|
this.settings = settings;
|
|
|
+ if (settings.filters) {
|
|
|
+ console.log(`Integrating filters: ${Object.keys(this.settings.filters)} in AuditMessage service`)
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/* This is the main interface of the message sync service. The argument will take in an observable stream of
|
|
@@ -26,6 +32,11 @@ export class MessageAuditorService implements MessageAuditorServiceInterface {
|
|
|
obsTrigger.subscribe({
|
|
|
next: obsTrigger => {
|
|
|
console.log(obsTrigger.message)// just checking the message
|
|
|
+ if (!this.settings.filters) {
|
|
|
+ console.log(`No filters applies`)
|
|
|
+ } else {
|
|
|
+ console.log(`Synchronizating with filters: '${Object.keys(this.settings.filters)}': '${Object.values(this.settings.filters)}'`)
|
|
|
+ }
|
|
|
let missingMsg: Observable<MessageLog> = this.synchronize()
|
|
|
missingMsg.subscribe({
|
|
|
next: element => {
|
|
@@ -38,14 +49,69 @@ export class MessageAuditorService implements MessageAuditorServiceInterface {
|
|
|
return this.missingMessageSubject
|
|
|
}
|
|
|
|
|
|
+
|
|
|
+ /* ________________ Private Functions _________________ */
|
|
|
+ // Filtering functions to filters out messages
|
|
|
+ private filterData(filters: any, message: MessageLog): boolean {
|
|
|
+ let response: boolean = true //Just using this like a statemanagement
|
|
|
+ let payload: BaseMessage = JSON.parse(message.appData.msgPayload as string) // Extract the payload from the messageLog first
|
|
|
+ this.checkIfIsInPayloadDataFormat(payload) // Convert stringified nested payload if there's any
|
|
|
+ // Making a separate function to cater to different multi filters conditions are coded below
|
|
|
+ if (filters) { // if filters is not null
|
|
|
+ if (Object.keys(filters).length > 1) {
|
|
|
+ let totalCount = Object.keys(filters).length
|
|
|
+ let matchedCount = 0
|
|
|
+ Object.entries(filters).forEach(([key, value]) => {
|
|
|
+ let filters = { [key]: value }
|
|
|
+ // console.log(filters)
|
|
|
+ if (this.checkValues(payload, filters) == true) matchedCount++
|
|
|
+ })
|
|
|
+ if (totalCount == matchedCount) { // check if all the criterias are met
|
|
|
+ response = true
|
|
|
+ } else {
|
|
|
+ response = false
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ if (this.checkValues(payload, filters) == true) {
|
|
|
+ response = true
|
|
|
+ } else {
|
|
|
+ response = false
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } else { // if not filters is provided. Then the just let response be true so that the data can be further processed
|
|
|
+ response = true
|
|
|
+ }
|
|
|
+ return response
|
|
|
+ }
|
|
|
+
|
|
|
/* This is where the 'synching' operation takes place. */
|
|
|
private synchronize(): Subject<MessageLog> {
|
|
|
let subjectOutput: Subject<MessageLog> = new Subject()
|
|
|
// Acquire the data from both location and return them as an array respectively.
|
|
|
this.acquireData().then((data: { arr1: MessageLog[], arr2: MessageLog[] }) => {
|
|
|
// In the case where there are differences in the array length, then extensive comparison
|
|
|
- // will be carried out to filter out the differences. Differences are the missing data.
|
|
|
+ // will be carried out to filters out the differences. Differences are the missing data.
|
|
|
+
|
|
|
+ if(process.env.CheckAudit)
|
|
|
+ {
|
|
|
+ console.log("Record set 1: ", _.keys(_.countBy(data.arr1,function(data:MessageLog){return data.appData['msgId']})).length);
|
|
|
+ console.log("Record set 2: ", _.keys(_.countBy(data.arr2,function(data:MessageLog){return data.appData['msgId']})).length);
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ if(process.env.CheckAudit)
|
|
|
+ {
|
|
|
+ console.log("[CheckAudit] Record set 1: ", _.keys(_.countBy(data.arr1,function(data:MessageLog){return data.appData['msgId']})).length);
|
|
|
+ console.log("[CheckAudit] Record set 2: ", _.keys(_.countBy(data.arr2,function(data:MessageLog){return data.appData['msgId']})).length);
|
|
|
+ }
|
|
|
+
|
|
|
this.checkArrayDifferences(data).then((data: MessageLog[]) => {
|
|
|
+
|
|
|
+ if(process.env.CheckAudit)
|
|
|
+ {
|
|
|
+ console.log("Difference: ",data.length);
|
|
|
+ }
|
|
|
+
|
|
|
data.forEach(msgElement => {
|
|
|
let refined = JSON.parse(JSON.stringify(msgElement))
|
|
|
// Once the missing data has been weeded out, it is then passed into the Subject
|
|
@@ -61,25 +127,28 @@ export class MessageAuditorService implements MessageAuditorServiceInterface {
|
|
|
private async acquireData(): Promise<any> {
|
|
|
const promiseQuery: Promise<any> = new Promise((resolve, reject) => {
|
|
|
// declare what to expect.
|
|
|
- let allSets: {
|
|
|
- arr1: MessageLog[],
|
|
|
- arr2: MessageLog[]
|
|
|
- } = {
|
|
|
+ let allSets: { arr1: MessageLog[], arr2: MessageLog[] } = {
|
|
|
arr1: [],
|
|
|
arr2: []
|
|
|
}
|
|
|
- let set1: MessageLog[]
|
|
|
- let set2: MessageLog[]
|
|
|
+ let set1: MessageLog[] = []
|
|
|
+ let set2: MessageLog[] = []
|
|
|
|
|
|
// Initiate the source to find the location of the targeted data to be synched.
|
|
|
this.sourceSrc.init(this.settings.incomingSource).then(() => {
|
|
|
this.targetSrc.init(this.settings.target).then(() => {
|
|
|
// Filter also carries out the query aspect of the operation, allowing it to acquire all the relevant data.
|
|
|
this.sourceSrc.filter({ msgTag: this.settings.incomingSource.tags[0] }).then((data: MessageLog[]) => {
|
|
|
- set1 = data
|
|
|
+ data.forEach((message: MessageLog) => {
|
|
|
+ if (this.filterData(this.settings.filters, message)) set1.push(message)
|
|
|
+ })
|
|
|
+ }).catch((err) => {
|
|
|
+ console.error(err.message)
|
|
|
}).then(() => {
|
|
|
this.targetSrc.filter({ msgTag: this.settings.target.tags[0] }).then((data: MessageLog[]) => {
|
|
|
- set2 = data
|
|
|
+ data.forEach(message => {
|
|
|
+ if (this.filterData(this.settings.filters, message)) set2.push(message)
|
|
|
+ })
|
|
|
allSets.arr1 = set1
|
|
|
allSets.arr2 = set2
|
|
|
resolve(allSets)
|
|
@@ -92,7 +161,7 @@ export class MessageAuditorService implements MessageAuditorServiceInterface {
|
|
|
}
|
|
|
|
|
|
// compare results and return differences
|
|
|
- private async checkArrayDifferences(args: { arr1?: any[], arr2?: any[] }): Promise<MessageLog[]> {
|
|
|
+ private async checkArrayDifferences(args: { arr1: MessageLog[], arr2: MessageLog[] }): Promise<MessageLog[]> {
|
|
|
return new Promise((resolve, reject) => {
|
|
|
let missingMsg: MessageLog[] = []
|
|
|
args.arr1.forEach((msgElement: MessageLog) => {
|
|
@@ -110,5 +179,48 @@ export class MessageAuditorService implements MessageAuditorServiceInterface {
|
|
|
})
|
|
|
}
|
|
|
|
|
|
+ // To be used by the filterData function to check between payload values and filter conditions
|
|
|
+ private checkValues(payload, filters): boolean { //FYI, all parameters are string
|
|
|
+ let key = Object.keys(filters)
|
|
|
+ // console.log(Object.values(filters))
|
|
|
+ let value = Object.values(filters)[0]
|
|
|
+ let res = _.get(payload, key[0])
|
|
|
+ // Check first if the payload has the filtering properties/path
|
|
|
+ if (_.has(payload, key[0])) {
|
|
|
+ let strarray: string[]
|
|
|
+ // check array
|
|
|
+ if (Array.isArray(value)) {
|
|
|
+ strarray = value as string[]
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ strarray = [value as string]
|
|
|
+ }
|
|
|
+ // compare array with that string
|
|
|
+ if (strarray.includes(res)) {
|
|
|
+ return true
|
|
|
+ } else {
|
|
|
+ return false
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ console.log(`${key} does not exists in payload`)
|
|
|
+ return false
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // Check in the case of notification messages, for the nested data properties
|
|
|
+ // Notification message may have multiple nested data properties that maybe in string format
|
|
|
+ private checkIfIsInPayloadDataFormat(payload: BaseMessage | any) {
|
|
|
+ let parsedData: any
|
|
|
+ if (payload.data
|
|
|
+ && payload.data.data
|
|
|
+ && payload.data.data.data && typeof payload.data.data.data === 'string') {
|
|
|
+ parsedData = JSON.parse(payload.data.data.data)
|
|
|
+ // console.log(parsedData)
|
|
|
+ payload.data.data.data = parsedData
|
|
|
+ return payload
|
|
|
+ } else {
|
|
|
+ return payload
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
}
|