Enzo преди 1 година
родител
ревизия
26eedb3c72

+ 1 - 2
services/acknowledgement.service.ts

@@ -1,9 +1,8 @@
-import { AnyObject } from "mongoose";
 import { map, Observable, of, tap } from "rxjs";
 import { Acknowledgemeent, AcknowledgementLogSetting } from "../type/acknowledgement.interface";
-import { BaseMessage, Command, FisCreateMessageUtility, ResponseMessage, Uuid } from "../dependencies/msgutil/interface/export";
 import { LogSetting, MessageLog } from "../dependencies/log/type/datatype";
 import { LoggingService } from "../dependencies/log/interface/export";
+import { BaseMessage, Command, FisCreateMessageUtility, ResponseMessage, Uuid } from "../dependencies/log/dependencies/msgutil/interface/export";
 
 
 /**

+ 2 - 2
services/incomingMessage.service.ts

@@ -1,8 +1,8 @@
 import { map, Observable, of, tap } from "rxjs";
 import { IncomingMessageServiceInterface } from "../type/datatype";
-import { LoggingService } from "../dependencies/log/interface/export";
 import { LogSetting, MessageLog } from "../dependencies/log/type/datatype";
-import { BaseMessage, Uuid } from "../dependencies/msgutil/interface/export";
+import { BaseMessage, Uuid } from "../dependencies/log/dependencies/msgutil/interface/export";
+import { LoggingService } from "../dependencies/log/interface/export";
 
 /**
  * @deprecated The logging is now supported by the Fis-Logging library.

+ 20 - 22
services/message-auditor.service.ts

@@ -1,24 +1,22 @@
 import { map, Observable, of, Subject } from "rxjs";
 import { ErrorTrigger, MessageAuditorServiceInterface, MessageSynchronisationServiceSetting } from "../type/datatype";
-import { LoggingService } from "../dependencies/log/interface/export";
 import { MessageLog } from "../dependencies/log/type/datatype";
 import { _ } from 'lodash'
-import { BaseMessage, RequestMessage, ResponseMessage } from "../dependencies/msgutil/interface/export";
+import { LoggingService } from "../dependencies/log/interface/export";
+import { BaseMessage } from "../dependencies/log/dependencies/msgutil/interface/export";
 
 export class MessageAuditorService implements MessageAuditorServiceInterface {
     private settings: MessageSynchronisationServiceSetting
     private sourceSrc: LoggingService = new LoggingService()
     private targetSrc: LoggingService = new LoggingService()
     private missingMessageSubject: Subject<MessageLog> = new Subject()
-    private filter: any
 
     /* Set up the targets or points of synchronization. This is where it will register the 2 different location of 
     the data to be synchronized */
-    public init(settings: MessageSynchronisationServiceSetting, filters?: any): void {
+    public init(settings: MessageSynchronisationServiceSetting): void {
         this.settings = settings;
-        if (filters) {
-            console.log(`Integrating filters: ${Object.keys(this.filter)} in AuditMessage service`)
-            this.filter = filters
+        if (settings.filters) {
+            console.log(`Integrating filters: ${Object.keys(this.settings.filters)} in AuditMessage service`)
         }
     }
 
@@ -32,10 +30,10 @@ export class MessageAuditorService implements MessageAuditorServiceInterface {
         obsTrigger.subscribe({
             next: obsTrigger => {
                 console.log(obsTrigger.message)// just checking the message
-                if (!this.filter) {
-                    console.log(`No filter applies`)
+                if (!this.settings.filters) {
+                    console.log(`No filters applies`)
                 } else {
-                    console.log(`Synchronizating with filters: ${Object.keys(this.filter)}: ${Object.values(this.filter)}`)
+                    console.log(`Synchronizating with filters: ${Object.keys(this.settings.filters)}: ${Object.values(this.settings.filters)}`)
                 }
                 let missingMsg: Observable<MessageLog> = this.synchronize()
                 missingMsg.subscribe({
@@ -51,15 +49,15 @@ export class MessageAuditorService implements MessageAuditorServiceInterface {
 
 
     /* ________________ Private Functions _________________ */
-    // Filtering functions to filter out messages
+    // Filtering functions to filters out messages
     private filterData(filters: any, message: MessageLog): boolean {
         let response: boolean = true //Just using this like a statemanagement
         let payload: BaseMessage = JSON.parse(message.appData.msgPayload as string) // Extract the payload from the messageLog first
-        // Making a separate function to cater to different multi filter conditions are coded below
-        function checkValues(filter): boolean { //FYI, all parameters are string
-            let key = Object.keys(filter)
-            console.log(Object.values(filter))
-            let value = Object.values(filter)[0]
+        // Making a separate function to cater to different multi filters conditions are coded below
+        function checkValues(filters): boolean { //FYI, all parameters are string
+            let key = Object.keys(filters)
+            console.log(Object.values(filters))
+            let value = Object.values(filters)[0]
             let res = _.get(payload, key[0])
             // Check first if the payload has the filtering properties/path
             if (_.has(payload, key[0])) {
@@ -88,9 +86,9 @@ export class MessageAuditorService implements MessageAuditorServiceInterface {
                 let totalCount = Object.keys(filters).length
                 let matchedCount = 0
                 Object.entries(filters).forEach(([key, value]) => {
-                    let filter = { [key]: value }
-                    // console.log(filter)
-                    if (checkValues(filter) == true) matchedCount++
+                    let filters = { [key]: value }
+                    // console.log(filters)
+                    if (checkValues(filters) == true) matchedCount++
                 })
                 if (totalCount == matchedCount) {
                     response = true
@@ -116,7 +114,7 @@ export class MessageAuditorService implements MessageAuditorServiceInterface {
         // Acquire the data from both location and return them as an array respectively.
         this.acquireData().then((data: { arr1: MessageLog[], arr2: MessageLog[] }) => {
             // In the case where there are differences in the array length, then extensive comparison
-            // will be carried out to filter out the differences. Differences are the missing data.
+            // will be carried out to filters out the differences. Differences are the missing data.
             this.checkArrayDifferences(data).then((data: MessageLog[]) => {
                 data.forEach(msgElement => {
                     let refined = JSON.parse(JSON.stringify(msgElement))
@@ -149,14 +147,14 @@ export class MessageAuditorService implements MessageAuditorServiceInterface {
                     // Filter also carries out the query aspect of the operation, allowing it to acquire all the relevant data.
                     this.sourceSrc.filter({ msgTag: this.settings.incomingSource.tags[0] }).then((data: MessageLog[]) => {
                         data.forEach((message: MessageLog) => {
-                            if (this.filterData(this.filter, message)) set1.push(message)
+                            if (this.filterData(this.settings.filters, message)) set1.push(message)
                         })
                     }).catch((err) => {
                         console.error(err.message)
                     }).then(() => {
                         this.targetSrc.filter({ msgTag: this.settings.target.tags[0] }).then((data: MessageLog[]) => {
                             data.forEach(message => {
-                                if (this.filterData(this.filter, message)) set2.push(message)
+                                if (this.filterData(this.settings.filters, message)) set2.push(message)
                             })
                             allSets.arr1 = set1
                             allSets.arr2 = set2

+ 1 - 1
test/test-streamOBS.ts

@@ -3,7 +3,7 @@
 
 import { Subject } from "rxjs";
 import * as fs from "fs"
-import { BaseMessage } from "../dependencies/msgutil/interface/export";
+import { BaseMessage } from "../dependencies/log/dependencies/msgutil/interface/export";
 
 export class StreamingService {
     private messagesJSON: any = fs.readFileSync("testRequest.json")

+ 2 - 6
test/test1a.ts

@@ -1,13 +1,9 @@
-import { AcknowledgementService } from "../services/acknowledgement.service";
 import { StreamingService } from "./test-streamOBS";
-import { MessageAuditorService } from "../services/message-auditor.service";
 import { IncomingMessageService } from "../services/incomingMessage.service";
-import { map, Observable } from "rxjs";
-import { BaseMessage } from "../dependencies/msgutil/interface/export";
+import { Observable } from "rxjs";
 import { LogSetting } from "../dependencies/log/type/datatype";
+import { BaseMessage } from "../dependencies/log/dependencies/msgutil/interface/export";
 const incoming = new IncomingMessageService()
-const acknowledge = new AcknowledgementService()
-const syncrhonize = new MessageAuditorService()
 const streamService = new StreamingService()
 
 /* --------------  TEST -------------------- */

+ 1 - 3
test/test1b.ts

@@ -3,12 +3,10 @@ import { StreamingService } from "./test-streamOBS";
 import { MessageAuditorService } from "../services/message-auditor.service";
 import { IncomingMessageService } from "../services/incomingMessage.service";
 import { map, Observable } from "rxjs";
-import { BaseMessage } from "../dependencies/msgutil/interface/export";
 import { LogSetting } from "../dependencies/log/type/datatype";
+import { BaseMessage } from "../dependencies/log/dependencies/msgutil/interface/export";
 
-const incoming = new IncomingMessageService()
 const acknowledge = new AcknowledgementService()
-const syncrhonize = new MessageAuditorService()
 const streamService = new StreamingService()
 
 /* --------------  TEST -------------------- */

+ 3 - 7
test/test2a.ts

@@ -1,13 +1,9 @@
-import { AcknowledgementService } from "../services/acknowledgement.service";
 import { StreamingService } from "./test-streamOBS";
-import { MessageAuditorService } from "../services/message-auditor.service";
 import { IncomingMessageService } from "../services/incomingMessage.service";
-import { map, Observable, take } from "rxjs";
-import { BaseMessage } from "../dependencies/msgutil/interface/export";
+import { Observable, take } from "rxjs";
 import { LogSetting } from "../dependencies/log/type/datatype";
+import { BaseMessage } from "../dependencies/log/dependencies/msgutil/interface/export";
 const incoming = new IncomingMessageService()
-const syncrhonize = new MessageAuditorService()
-const acknowledge = new AcknowledgementService()
 const streamService = new StreamingService()
 
 /* --------------  TEST -------------------- */
@@ -29,7 +25,7 @@ let storage: LogSetting = {
         password: "h1nt1OyXw6QeUnzS",
         server: "cluster0.29sklte.mongodb.net",
         database: "log",
-      }
+    }
 }
 let dataSet: LogSetting & { incomingObservable: Observable<BaseMessage> } = {
     storage: storage.storage,

+ 1 - 5
test/test2b.ts

@@ -1,12 +1,8 @@
 import { AcknowledgementService } from "../services/acknowledgement.service";
 import { StreamingService } from "./test-streamOBS";
-import { MessageAuditorService } from "../services/message-auditor.service";
-import { IncomingMessageService } from "../services/incomingMessage.service";
 import { map, Observable, take } from "rxjs";
-import { BaseMessage } from "../dependencies/msgutil/interface/export";
 import { LogSetting } from "../dependencies/log/type/datatype";
-const incoming = new IncomingMessageService()
-const syncrhonize = new MessageAuditorService()
+import { BaseMessage } from "../dependencies/log/dependencies/msgutil/interface/export";
 const acknowledge = new AcknowledgementService()
 const streamService = new StreamingService()
 

+ 1 - 1
test/test3a.ts

@@ -8,8 +8,8 @@ import { IncomingMessageService } from "../services/incomingMessage.service";
 import { MessageAuditorService } from "../services/message-auditor.service";
 import { ErrorTrigger, MessageSynchronisationServiceSetting } from "../type/datatype";
 import { StreamingService } from "./test-streamOBS";
-import { BaseMessage } from "../dependencies/msgutil/interface/export";
 import { LogSetting } from "../dependencies/log/type/datatype";
+import { BaseMessage } from "../dependencies/log/dependencies/msgutil/interface/export";
 
 /* Pre - Defined Data && Settings */
 const stream = new StreamingService()

+ 1 - 1
test/test3b.ts

@@ -9,8 +9,8 @@ import { IncomingMessageService } from "../services/incomingMessage.service";
 import { MessageAuditorService } from "../services/message-auditor.service";
 import { ErrorTrigger, MessageSynchronisationServiceSetting } from "../type/datatype";
 import { StreamingService } from "./test-streamOBS";
-import { BaseMessage } from "../dependencies/msgutil/interface/export";
 import { LogSetting, MessageLog } from "../dependencies/log/type/datatype";
+import { BaseMessage } from "../dependencies/log/dependencies/msgutil/interface/export";
 
 /* Pre - Defined Data && Settings */
 const stream = new StreamingService()

+ 7 - 7
test/test3c.ts

@@ -3,13 +3,13 @@
 Which is local file mongo as the control/source, and then comparing the data from cloud mongoDB
 server data, and then synchronizing them */
 
-import { Observable, map, Subject, takeUntil, take, of, timer, from } from "rxjs";
+import { Observable, Subject, take } from "rxjs";
 import { ErrorTrigger, MessageSynchronisationServiceSetting } from "../type/datatype";
 import { StreamingService } from "./test-streamOBS";
 import { MessageAuditorService } from "../services/message-auditor.service";
-import { BaseMessage, ResponseMessage } from "../dependencies/msgutil/interface/export";
-import { LoggingService } from "../dependencies/log/interface/export";
 import { LogSetting, MessageLog } from "../dependencies/log/type/datatype";
+import { BaseMessage, ResponseMessage } from "../dependencies/log/dependencies/msgutil/interface/export";
+import { LoggingService } from "../dependencies/log/interface/export";
 
 /* Pre - Defined Data && Settings */
 // This service will stream the messages from the local testRequest.json messages
@@ -131,7 +131,7 @@ sync.subscribe({
         console.log(`passing missing message: ${msgToBeSynchronized.appData.msgId} into target/secondary subject.`)
         // the missing data returned will be pushed (next(message)) into the target payload.
         let raw = msgToBeSynchronized.appData.msgPayload
-        let data : BaseMessage = JSON.parse(<string>raw)
+        let data: BaseMessage = JSON.parse(<string>raw)
         subscriber.next(data)
     }
 })
@@ -188,9 +188,9 @@ setTimeout(function () {
 function countdown() {
     let seconds = 0;
     const countUpInterval = setInterval(() => {
-      console.log(`Elapsed seconds: ${seconds}`);
-      seconds++;
+        console.log(`Elapsed seconds: ${seconds}`);
+        seconds++;
     }, 1000); // Update every second (1000 milliseconds)
-  }
+}
 
 countdown()

+ 1 - 3
test/test4.ts

@@ -6,12 +6,10 @@ different types of data aside from messageLogs. */
 /* Note: MessageAudit will not work if storage is FIle. Search does not work at logging service
 does not cater for File system storage */
 import * as mongoose from 'mongoose'
-import { Observable, map, Subject, takeUntil, take, of, timer, from } from "rxjs";
+import { Subject } from "rxjs";
 import { ErrorTrigger, MessageSynchronisationServiceSetting } from "../type/datatype";
-import { StreamingService } from "./test-streamOBS";
 import { MessageAuditorService } from "../services/message-auditor.service";
 import { LoggingService } from '../dependencies/log/interface/export';
-import { BaseMessage } from '../dependencies/msgutil/interface/export';
 import { LogSetting, MessageLog } from '../dependencies/log/type/datatype';
 import * as fs from "fs"
 

+ 1 - 1
test/test5.ts

@@ -9,7 +9,7 @@ import { LogSetting, MessageLog } from "../dependencies/log/type/datatype"
 import { _ } from 'lodash'
 import { StreamingService } from "./test-streamOBS"
 import { LoggingService } from "../dependencies/log/services/logging-service"
-import { ResponseMessage } from "../dependencies/msgutil/interface/export"
+import { ResponseMessage } from "../dependencies/log/dependencies/msgutil/interface/export"
 const auditService: MessageAuditorServiceInterface = new MessageAuditorService()
 const publisherloggingService: LoggingService = new LoggingService()
 const subscriberloggingService: LoggingService = new LoggingService()

+ 1 - 1
type/acknowledgement.interface.ts

@@ -1,6 +1,6 @@
 import { Observable } from "rxjs";
-import { BaseMessage, ResponseMessage } from "../dependencies/msgutil/interface/export";
 import { LogSetting } from "../dependencies/log/type/datatype";
+import { BaseMessage, ResponseMessage } from "../dependencies/log/dependencies/msgutil/interface/export";
 
 /**
  * @deprecated The acknowledgement will be covered by MessageAuditorService.

+ 3 - 2
type/datatype.ts

@@ -1,6 +1,6 @@
 import { Observable } from "rxjs";
-import { BaseMessage } from "../dependencies/msgutil/interface/export";
 import { LogSetting } from "../dependencies/log/type/datatype";
+import { BaseMessage } from "../dependencies/log/dependencies/msgutil/interface/export";
 
 type IncomingMessageSetting = LogSetting & {
     incomingObservable: Observable<BaseMessage>
@@ -16,7 +16,8 @@ export interface IncomingMessageServiceInterface {
 
 export interface MessageSynchronisationServiceSetting {
     incomingSource: LogSettingwTags,
-    target: LogSettingwTags
+    target: LogSettingwTags,
+    filters?: any
 }
 
 // Renew Structure To fix undefined issue at test3a.ts init()