Prechádzať zdrojové kódy

first draft on audit filtering

Enzo 1 rok pred
rodič
commit
16d19a40ef

+ 1 - 1
dependencies/log

@@ -1 +1 @@
-Subproject commit d2b19d8619e2d5cc96c5ade77d9195b7b861aa85
+Subproject commit 0b53937fd2267451215d7139ab04f65c62fd1102

+ 11 - 0
package-lock.json

@@ -12,6 +12,7 @@
         "@types/node": "^18.11.18",
         "dotenv": "^16.0.3",
         "jsonschema": "^1.4.1",
+        "lodash": "^4.17.21",
         "mongoose": "^6.9.0",
         "rfdc": "^1.3.0",
         "rxjs": "^7.8.1",
@@ -1255,6 +1256,11 @@
         "node": ">=12.0.0"
       }
     },
+    "node_modules/lodash": {
+      "version": "4.17.21",
+      "resolved": "https://registry.npmjs.org/lodash/-/lodash-4.17.21.tgz",
+      "integrity": "sha512-v2kDEe57lecTulaDIuNTPy3Ry4gLGJ6Z1O3vE1krgXZNrsQ+LFTGHVxVjcXPs17LhbZVGedAJv8XZ1tvj5FvSg=="
+    },
     "node_modules/memory-pager": {
       "version": "1.5.0",
       "resolved": "https://registry.npmjs.org/memory-pager/-/memory-pager-1.5.0.tgz",
@@ -2473,6 +2479,11 @@
       "resolved": "https://registry.npmjs.org/kareem/-/kareem-2.5.1.tgz",
       "integrity": "sha512-7jFxRVm+jD+rkq3kY0iZDJfsO2/t4BBPeEb2qKn2lR/9KhuksYk5hxzfRYWMPV8P/x2d0kHD306YyWLzjjH+uA=="
     },
+    "lodash": {
+      "version": "4.17.21",
+      "resolved": "https://registry.npmjs.org/lodash/-/lodash-4.17.21.tgz",
+      "integrity": "sha512-v2kDEe57lecTulaDIuNTPy3Ry4gLGJ6Z1O3vE1krgXZNrsQ+LFTGHVxVjcXPs17LhbZVGedAJv8XZ1tvj5FvSg=="
+    },
     "memory-pager": {
       "version": "1.5.0",
       "resolved": "https://registry.npmjs.org/memory-pager/-/memory-pager-1.5.0.tgz",

+ 2 - 0
package.json

@@ -15,6 +15,7 @@
     "start3b": "node test/test3b.js",
     "start3c": "node test/test3c.js",
     "start4": "node test/test4.js",
+    "start5": "node test/test5.js",
     "test": "echo \"Error: no test specified\" && exit 1"
   },
   "repository": {
@@ -27,6 +28,7 @@
     "@types/node": "^18.11.18",
     "dotenv": "^16.0.3",
     "jsonschema": "^1.4.1",
+    "lodash": "^4.17.21",
     "mongoose": "^6.9.0",
     "rfdc": "^1.3.0",
     "rxjs": "^7.8.1",

+ 75 - 6
services/message-auditor.service.ts

@@ -1,14 +1,16 @@
 import { map, Observable, of, Subject } from "rxjs";
 import { ErrorTrigger, MessageAuditorServiceInterface, MessageSynchronisationServiceSetting } from "../type/datatype";
 import { LoggingService } from "../dependencies/log/interface/export";
-import { BaseMessage } from "../dependencies/msgutil/interface/export";
 import { MessageLog } from "../dependencies/log/type/datatype";
+import { _ } from 'lodash'
+import { BaseMessage, RequestMessage, ResponseMessage } from "../dependencies/msgutil/interface/export";
 
 export class MessageAuditorService implements MessageAuditorServiceInterface {
     private settings: MessageSynchronisationServiceSetting
     private sourceSrc: LoggingService = new LoggingService()
     private targetSrc: LoggingService = new LoggingService()
     private missingMessageSubject: Subject<MessageLog> = new Subject()
+    private filter: any
 
     /* Set up the targets or points of synchronization. This is where it will register the 2 different location of 
     the data to be synchronized */
@@ -26,6 +28,11 @@ export class MessageAuditorService implements MessageAuditorServiceInterface {
         obsTrigger.subscribe({
             next: obsTrigger => {
                 console.log(obsTrigger.message)// just checking the message
+                if (!this.filter) {
+                    console.log(`No filter applies`)
+                } else {
+                    console.log(`Synchronizating with filters: ${Object.keys(this.filter)}: ${Object.values(this.filter)}`)
+                }
                 let missingMsg: Observable<MessageLog> = this.synchronize()
                 missingMsg.subscribe({
                     next: element => {
@@ -38,6 +45,62 @@ export class MessageAuditorService implements MessageAuditorServiceInterface {
         return this.missingMessageSubject
     }
 
+    public setFilter(filters: any) {
+        this.filter = filters
+        console.log(`Integrating filters: ${Object.keys(this.filter)} in AuditMessage service`)
+    }
+
+
+    /* ________________ Private Functions _________________ */
+    // Filtering functions to filter out messages
+    private filterData(filters: any, message: MessageLog): boolean {
+        let response: boolean = true //Just using this like a statemanagement
+        let payload: BaseMessage = JSON.parse(message.appData.msgPayload as string) // Extract the payload from the messageLog first
+        // Making a separate function to cater to different multi filter conditions are coded below
+        function checkValues(filter): boolean { //FYI, all parameters are string
+            let key = Object.keys(filter)
+            let value = Object.values(filter)
+            let res = _.get(payload, key[0])
+            // Check first if the payload has the filtering properties/path
+            if (_.has(payload, key[0])) {
+                // check if value is equal to fitler's
+                if (value == res) {
+                    return true
+                } else {
+                    return false
+                }
+            } else {
+                console.log(`${key} does not exists in payload`)
+                return false
+            }
+        }
+        if (filters) { // if filters is not null
+            if (Object.keys(filters).length > 1) {
+                let totalCount = Object.keys(filters).length
+                let matchedCount = 0
+                Object.entries(filters).forEach(([key, value]) => {
+                    let filter = { [key]: value }
+                    // console.log(filter)
+                    if (checkValues(filter) == true) matchedCount++
+                })
+                if (totalCount == matchedCount) {
+                    response = true
+                } else {
+                    response = false
+                }
+            } else {
+                if (checkValues(filters) == true) {
+                    response = true
+                } else {
+                    response = false
+                }
+            }
+        } else {
+            response = true
+        }
+        return response
+    }
+
     /* This is where the 'synching' operation takes place. */
     private synchronize(): Subject<MessageLog> {
         let subjectOutput: Subject<MessageLog> = new Subject()
@@ -68,18 +131,24 @@ export class MessageAuditorService implements MessageAuditorServiceInterface {
                 arr1: [],
                 arr2: []
             }
-            let set1: MessageLog[]
-            let set2: MessageLog[]
+            let set1: MessageLog[] = []
+            let set2: MessageLog[] = []
 
             // Initiate the source to find the location of the targeted data to be synched.
             this.sourceSrc.init(this.settings.incomingSource).then(() => {
                 this.targetSrc.init(this.settings.target).then(() => {
                     // Filter also carries out the query aspect of the operation, allowing it to acquire all the relevant data.
                     this.sourceSrc.filter({ msgTag: this.settings.incomingSource.tags[0] }).then((data: MessageLog[]) => {
-                        set1 = data
+                        data.forEach((message: MessageLog) => {
+                            if (this.filterData(this.filter, message)) set1.push(message)
+                        })
+                    }).catch((err) => {
+                        console.error(err.message)
                     }).then(() => {
                         this.targetSrc.filter({ msgTag: this.settings.target.tags[0] }).then((data: MessageLog[]) => {
-                            set2 = data
+                            data.forEach(message => {
+                                if (this.filterData(this.filter, message)) set2.push(message)
+                            })
                             allSets.arr1 = set1
                             allSets.arr2 = set2
                             resolve(allSets)
@@ -92,7 +161,7 @@ export class MessageAuditorService implements MessageAuditorServiceInterface {
     }
 
     // compare results and return differences
-    private async checkArrayDifferences(args: { arr1?: any[], arr2?: any[] }): Promise<MessageLog[]> {
+    private async checkArrayDifferences(args: { arr1: MessageLog[], arr2: MessageLog[] }): Promise<MessageLog[]> {
         return new Promise((resolve, reject) => {
             let missingMsg: MessageLog[] = []
             args.arr1.forEach((msgElement: MessageLog) => {

+ 1 - 1
test/test-streamOBS.ts

@@ -1,7 +1,7 @@
 /* ----------------------        Simulate a stream of messages to be inserted or used by the test        ---------------------- */
 
 
-import { from, map, Observable, of, Subject } from "rxjs";
+import { Subject } from "rxjs";
 import * as fs from "fs"
 import { BaseMessage } from "../dependencies/msgutil/interface/export";
 

+ 5 - 2
test/test3c.ts

@@ -62,8 +62,11 @@ let publisher_storage: LogSetting = {
         logLocName: 'To be generated in client',
     },
     customSetting: {
-        server: "192.168.100.59:27017",
-        database: "test"
+        srv: true,
+        user: "testDB",
+        password: "h1nt1OyXw6QeUnzS",
+        server: "cluster0.29sklte.mongodb.net",
+        database: "test2",
     }
 }
 

+ 109 - 0
test/test5.ts

@@ -0,0 +1,109 @@
+/* ---------------------    TEST 5 : Filtering     -------------------------- */
+/* This test is specifically design for testing the audit with additional fitlers. When the primary source want to perform
+audit on the designated target, they will impose one or many condition, in that only the data that meets the criteria
+will be taken into consideratoin for auditng. */
+import { Observable, Subject, take } from "rxjs"
+import { MessageAuditorService } from "../services/message-auditor.service"
+import { MessageAuditorServiceInterface, MessageSynchronisationServiceSetting } from "../type/datatype"
+import { LogSetting, MessageLog } from "../dependencies/log/type/datatype"
+import { _ } from 'lodash'
+import { StreamingService } from "./test-streamOBS"
+import { LoggingService } from "../dependencies/log/services/logging-service"
+import { ResponseMessage } from "../dependencies/msgutil/interface/export"
+const auditService: MessageAuditorServiceInterface = new MessageAuditorService()
+const publisherloggingService: LoggingService = new LoggingService()
+const subscriberloggingService: LoggingService = new LoggingService()
+const stream = new StreamingService()
+
+let triggerSyncSubject: Subject<any> = new Subject()
+const publisher_take_four_messages: Observable<any> = stream.stream().pipe(take(4))
+const publisher: Subject<any> = new Subject()
+publisher_take_four_messages.subscribe({
+    next: (data) => {
+        publisher.next(data)
+    }
+})
+const subscriber_take_two_messagse: Observable<any> = stream.stream().pipe(take(2))
+const subscriber: Subject<any> = new Subject()
+subscriber_take_two_messagse.subscribe({
+    next: (data) => {
+        subscriber.next(<ResponseMessage>data)
+    }
+})
+
+let source: LogSetting = {
+    cacheMessageLimit: 0,
+    storage: "MongoDB",
+    setting: {
+        appName: 'Deafult from source',
+        appLocName: 'To be generated in source',
+        logLocName: 'To be generated in source',
+    },
+    customSetting: {
+        url: 'mongodb+srv://testDB:h1nt1OyXw6QeUnzS@cluster0.29sklte.mongodb.net/test'
+    }
+}
+let target: LogSetting = {
+    cacheMessageLimit: 0,
+    storage: "MongoDB",
+    setting: {
+        appName: 'Default from target',
+        appLocName: 'To be generated in target',
+        logLocName: 'To be generated in target',
+    },
+    customSetting: {
+        url: 'mongodb+srv://testDB:h1nt1OyXw6QeUnzS@cluster0.29sklte.mongodb.net/test2'
+    }
+}
+
+// Combine source and target storage to form MessageSynchronisationServiceSetting. This is required in messageAudit initialization
+let settings: MessageSynchronisationServiceSetting = {
+    incomingSource: {
+        //all of the settings to be combined here
+        ...source,
+        tags: ['default'],
+    },
+    target: {
+        ...target,
+        tags: ['default'],
+    },
+}
+
+/* -------  Calling the functions to be tested ----------- */
+intializeLogging(source, target)
+initializeAuditService(settings)
+setTimeout(() => {
+    performSync({ status: 1, message: "GO! GO! GO!" })
+}, 5000)
+
+
+// Basically start up all the functions and relevant subscription service in Audit Service.
+async function initializeAuditService(configuration: MessageSynchronisationServiceSetting) {
+    let filter: any = {
+        'data.data.appData.msgTag[0]': 'likable',
+        'header.messageProducerInformation.origin.userApplication.userAppName': 'Client'
+    }
+    auditService.init(configuration) // Configure two points of audit
+    // auditService.setFilter({ 'data.data.appData.msgTag[0]': 'oval' }) // set fitler if there's any. Please not that 
+    auditService.setFilter(filter) // set fitler if there's any. Please note that filtering doesn't work on arrays yet.
+    auditService.subscribe(triggerSyncSubject).subscribe((missingElements: MessageLog) => {
+        let message = JSON.parse(missingElements.appData.msgPayload as any)
+        subscriber.next(message)
+    })
+}
+
+// Emit an args into the synchronization trigger stream to perform a sync
+async function performSync(args: any) {
+    triggerSyncSubject.next(args)
+}
+
+// Set up logging point
+async function intializeLogging(source: LogSetting, target: LogSetting) {
+    publisherloggingService.init(source).then(() => {
+        publisherloggingService.subscribe(publisher)
+    })
+    subscriberloggingService.init(target).then(() => {
+        subscriberloggingService.subscribe(subscriber)
+    })
+}
+

+ 2 - 7
testRequest.json

@@ -41,8 +41,7 @@
                     "msgLogDateTime": "2022-12-06T15:01:46.987Z",
                     "msgDateTime": "2022-12-06T08:50:33.809Z",
                     "msgTag": [
-                        "oval",
-                        "likable"
+                        "oval"
                     ],
                     "msgPayload": "Molestias facilis iusto similique iste voluptas facere. Alias est sequi. Quos consequatur temporibus blanditiis numquam vel. Eos repellat eaque. Voluptatibus optio optio magni eveniet. Quidem architecto esse aut sint neque error magnam perspiciatis."
                 }
@@ -91,7 +90,6 @@
                     "msgLogDateTime": "2022-12-06T15:01:46.987Z",
                     "msgDateTime": "2022-12-06T08:50:33.809Z",
                     "msgTag": [
-                        "oval",
                         "likable"
                     ],
                     "msgPayload": "Molestias facilis iusto similique iste voluptas facere. Alias est sequi. Quos consequatur temporibus blanditiis numquam vel. Eos repellat eaque. Voluptatibus optio optio magni eveniet. Quidem architecto esse aut sint neque error magnam perspiciatis."
@@ -141,7 +139,6 @@
                     "msgLogDateTime": "2022-12-06T15:01:46.987Z",
                     "msgDateTime": "2022-12-06T08:50:33.809Z",
                     "msgTag": [
-                        "oval",
                         "likable"
                     ],
                     "msgPayload": "Molestias facilis iusto similique iste voluptas facere. Alias est sequi. Quos consequatur temporibus blanditiis numquam vel. Eos repellat eaque. Voluptatibus optio optio magni eveniet. Quidem architecto esse aut sint neque error magnam perspiciatis."
@@ -191,8 +188,7 @@
                     "msgLogDateTime": "2022-12-06T15:01:46.987Z",
                     "msgDateTime": "2022-12-06T08:50:33.809Z",
                     "msgTag": [
-                        "oval",
-                        "likable"
+                        "oval"
                     ],
                     "msgPayload": "Molestias facilis iusto similique iste voluptas facere. Alias est sequi. Quos consequatur temporibus blanditiis numquam vel. Eos repellat eaque. Voluptatibus optio optio magni eveniet. Quidem architecto esse aut sint neque error magnam perspiciatis."
                 }
@@ -241,7 +237,6 @@
                     "msgLogDateTime": "2022-12-06T15:01:46.987Z",
                     "msgDateTime": "2022-12-06T08:50:33.809Z",
                     "msgTag": [
-                        "oval",
                         "likable"
                     ],
                     "msgPayload": "Molestias facilis iusto similique iste voluptas facere. Alias est sequi. Quos consequatur temporibus blanditiis numquam vel. Eos repellat eaque. Voluptatibus optio optio magni eveniet. Quidem architecto esse aut sint neque error magnam perspiciatis."

+ 3 - 1
type/datatype.ts

@@ -32,7 +32,9 @@ export interface MessageAuditorServiceInterface {
     // Set default setting
     init(settings: MessageSynchronisationServiceSetting): void;
     // Subscribe to trigger
-    subscribe(obs: Observable<ErrorTrigger>): Observable<any>;
+    subscribe(obs: Observable<ErrorTrigger>): Observable<any>;  
+    // Set filter
+    setFilter(any): any
 }
 
 export interface ErrorTrigger {