Enzo 1 year ago
parent
commit
2a2809da62

+ 1 - 0
package.json

@@ -14,6 +14,7 @@
     "start3a": "node test/test3a.js",
     "start3a": "node test/test3a.js",
     "start3b": "node test/test3b.js",
     "start3b": "node test/test3b.js",
     "start3c": "node test/test3c.js",
     "start3c": "node test/test3c.js",
+    "start4": "node test/test4.js",
     "test": "echo \"Error: no test specified\" && exit 1"
     "test": "echo \"Error: no test specified\" && exit 1"
   },
   },
   "repository": {
   "repository": {

+ 3 - 4
services/acknowledgement.service.ts

@@ -1,10 +1,9 @@
 import { AnyObject } from "mongoose";
 import { AnyObject } from "mongoose";
 import { map, Observable, of, tap } from "rxjs";
 import { map, Observable, of, tap } from "rxjs";
-import { BaseMessage, Command, ResponseMessage, Uuid } from "../dependencies/msgutil/interface/export"
-import { FisCreateMessageUtility } from "../dependencies/msgutil/interface/export";
-import { LoggingService } from "../dependencies/log/services/logging-service";
-import { LogSetting, MessageLog } from "../dependencies/log/type/datatype";
 import { Acknowledgemeent, AcknowledgementLogSetting } from "../type/acknowledgement.interface";
 import { Acknowledgemeent, AcknowledgementLogSetting } from "../type/acknowledgement.interface";
+import { BaseMessage, Command, FisCreateMessageUtility, ResponseMessage, Uuid } from "../dependencies/fisappmessagejsutilty/interface/export";
+import { LogSetting, MessageLog } from "../dependencies/fisloggingservice/type/datatype";
+import { LoggingService } from "../dependencies/fisloggingservice/interface/export";
 
 
 
 
 /**
 /**

+ 3 - 3
services/incomingMessage.service.ts

@@ -1,8 +1,8 @@
 import { map, Observable, of, tap } from "rxjs";
 import { map, Observable, of, tap } from "rxjs";
-import { BaseMessage, Command, ResponseMessage, Uuid } from "../dependencies/msgutil/interface/export"
-import { LoggingService } from "../dependencies/log/services/logging-service";
-import { LogSetting, MessageLog } from "../dependencies/log/type/datatype";
 import { IncomingMessageServiceInterface } from "../type/datatype";
 import { IncomingMessageServiceInterface } from "../type/datatype";
+import { LoggingService } from "../dependencies/fisloggingservice/interface/export";
+import { BaseMessage, Uuid } from "../dependencies/fisappmessagejsutilty/interface/export";
+import { LogSetting, MessageLog } from "../dependencies/fisloggingservice/type/datatype";
 
 
 /**
 /**
  * @deprecated The logging is now supported by the Fis-Logging library.
  * @deprecated The logging is now supported by the Fis-Logging library.

+ 11 - 11
services/message-auditor.service.ts

@@ -1,8 +1,8 @@
 import { map, Observable, of, Subject } from "rxjs";
 import { map, Observable, of, Subject } from "rxjs";
-import { BaseMessage } from "../dependencies/msgutil/dependencies/dependencies";
-import { LoggingService } from "../dependencies/log/services/logging-service";
-import { LogSetting, MessageLog } from "../dependencies/log/type/datatype";
 import { ErrorTrigger, MessageAuditorServiceInterface, MessageSynchronisationServiceSetting } from "../type/datatype";
 import { ErrorTrigger, MessageAuditorServiceInterface, MessageSynchronisationServiceSetting } from "../type/datatype";
+import { LoggingService } from "../dependencies/fisloggingservice/interface/export";
+import { BaseMessage } from "../dependencies/fisappmessagejsutilty/interface/export";
+import { MessageLog } from "../dependencies/fisloggingservice/type/datatype";
 
 
 export class MessageAuditorService implements MessageAuditorServiceInterface {
 export class MessageAuditorService implements MessageAuditorServiceInterface {
     private settings: MessageSynchronisationServiceSetting
     private settings: MessageSynchronisationServiceSetting
@@ -17,7 +17,7 @@ export class MessageAuditorService implements MessageAuditorServiceInterface {
 
 
     /* This is the main interface of the message sync service. The argument will take in an observable stream of 
     /* This is the main interface of the message sync service. The argument will take in an observable stream of 
     error notifications, prompting it to perform the subscription of the targeted sources and it's corresponding 
     error notifications, prompting it to perform the subscription of the targeted sources and it's corresponding 
-    target. Essentially, this does not in synchronize, but rather it checks against the two sources and compare
+    target. Essentially, this does not synchronize, but rather it checks against the two sources and compare
     and return the missing data, which will then be passed into the targeted subject stream as specified by the
     and return the missing data, which will then be passed into the targeted subject stream as specified by the
     respective client. They can choose how they want to handle the missing messages returned. */
     respective client. They can choose how they want to handle the missing messages returned. */
     public subscribe(obsTrigger: Observable<ErrorTrigger>): Observable<BaseMessage> {
     public subscribe(obsTrigger: Observable<ErrorTrigger>): Observable<BaseMessage> {
@@ -33,14 +33,14 @@ export class MessageAuditorService implements MessageAuditorServiceInterface {
                 missingMsg.subscribe({
                 missingMsg.subscribe({
                     next: element => {
                     next: element => {
                         msg.next(element)
                         msg.next(element)
+                        console.log(`Synchronizing ${element.header.messageID} ....`)
                     }
                     }
                 })
                 })
+                this.targetSrc.init(this.settings.target).then(() => {
+                    this.targetSrc.subscribe(msg)
+                })
             }
             }
         })
         })
-        // Not sure why this piece of code is here. It generally doesn't affect the function
-        // if (!obsTrigger) {
-        //     this.dataConversion()
-        // }
         let result: Observable<BaseMessage> = msg.asObservable()
         let result: Observable<BaseMessage> = msg.asObservable()
         return result
         return result
     }
     }
@@ -50,7 +50,7 @@ export class MessageAuditorService implements MessageAuditorServiceInterface {
         // let subjectOutput = this.syncrhonize()
         // let subjectOutput = this.syncrhonize()
         let obsOutput: Observable<BaseMessage> = this.synchronize().pipe(
         let obsOutput: Observable<BaseMessage> = this.synchronize().pipe(
             map((msg: MessageLog) => {
             map((msg: MessageLog) => {
-                console.log(`Converting this ${msg.appData.msgId}`)
+                console.log(`PROCESSING this ${msg.appData.msgId}`)
                 return JSON.parse(<string>msg.appData.msgPayload)
                 return JSON.parse(<string>msg.appData.msgPayload)
             })
             })
         )
         )
@@ -68,8 +68,8 @@ export class MessageAuditorService implements MessageAuditorServiceInterface {
             if (data.arr1.length === data.arr2.length) {
             if (data.arr1.length === data.arr2.length) {
                 console.log(`No synchronization needed`)
                 console.log(`No synchronization needed`)
             } else {
             } else {
-                // In the case where there are differences in the array lengthh, then extensive comparison
-                // will be carried to filter out the differences. Differences are the missing data.
+                // In the case where there are differences in the array length, then extensive comparison
+                // will be carried out to filter out the differences. Differences are the missing data.
                 this.checkArrayDifferences(data).then((data: MessageLog[]) => {
                 this.checkArrayDifferences(data).then((data: MessageLog[]) => {
                     data.forEach(msgElement => {
                     data.forEach(msgElement => {
                         // Once the missing data has been weeded out, it is then passed into the Subject 
                         // Once the missing data has been weeded out, it is then passed into the Subject 

+ 1 - 1
test/test-streamOBS.ts

@@ -3,7 +3,7 @@
 
 
 import { from, map, Observable, of, Subject } from "rxjs";
 import { from, map, Observable, of, Subject } from "rxjs";
 import * as fs from "fs"
 import * as fs from "fs"
-import { BaseMessage } from "../dependencies/log/services/logging-service";
+import { BaseMessage } from "../dependencies/fisappmessagejsutilty/interface/export";
 
 
 export class StreamingService {
 export class StreamingService {
     private messagesJSON: any = fs.readFileSync("testRequest.json")
     private messagesJSON: any = fs.readFileSync("testRequest.json")

+ 2 - 2
test/test1a.ts

@@ -3,8 +3,8 @@ import { StreamingService } from "./test-streamOBS";
 import { MessageAuditorService } from "../services/message-auditor.service";
 import { MessageAuditorService } from "../services/message-auditor.service";
 import { IncomingMessageService } from "../services/incomingMessage.service";
 import { IncomingMessageService } from "../services/incomingMessage.service";
 import { map, Observable } from "rxjs";
 import { map, Observable } from "rxjs";
-import { LogSetting, MessageLog } from "../dependencies/log/type/datatype";
-import { BaseMessage } from "../dependencies/msgutil/dependencies/dependencies";
+import { BaseMessage } from "../dependencies/fisappmessagejsutilty/interface/export";
+import { LogSetting } from "../dependencies/fisloggingservice/type/datatype";
 const incoming = new IncomingMessageService()
 const incoming = new IncomingMessageService()
 const acknowledge = new AcknowledgementService()
 const acknowledge = new AcknowledgementService()
 const syncrhonize = new MessageAuditorService()
 const syncrhonize = new MessageAuditorService()

+ 3 - 2
test/test1b.ts

@@ -3,8 +3,9 @@ import { StreamingService } from "./test-streamOBS";
 import { MessageAuditorService } from "../services/message-auditor.service";
 import { MessageAuditorService } from "../services/message-auditor.service";
 import { IncomingMessageService } from "../services/incomingMessage.service";
 import { IncomingMessageService } from "../services/incomingMessage.service";
 import { map, Observable } from "rxjs";
 import { map, Observable } from "rxjs";
-import { LogSetting, MessageLog } from "../dependencies/log/type/datatype";
-import { BaseMessage } from "../dependencies/msgutil/dependencies/dependencies";
+import { BaseMessage } from "../dependencies/fisappmessagejsutilty/interface/export";
+import { LogSetting } from "../dependencies/fisloggingservice/type/datatype";
+
 const incoming = new IncomingMessageService()
 const incoming = new IncomingMessageService()
 const acknowledge = new AcknowledgementService()
 const acknowledge = new AcknowledgementService()
 const syncrhonize = new MessageAuditorService()
 const syncrhonize = new MessageAuditorService()

+ 2 - 2
test/test2a.ts

@@ -3,8 +3,8 @@ import { StreamingService } from "./test-streamOBS";
 import { MessageAuditorService } from "../services/message-auditor.service";
 import { MessageAuditorService } from "../services/message-auditor.service";
 import { IncomingMessageService } from "../services/incomingMessage.service";
 import { IncomingMessageService } from "../services/incomingMessage.service";
 import { map, Observable, take } from "rxjs";
 import { map, Observable, take } from "rxjs";
-import { LogSetting, MessageLog } from "../dependencies/log/type/datatype";
-import { BaseMessage } from "../dependencies/msgutil/dependencies/dependencies";
+import { BaseMessage } from "../dependencies/fisappmessagejsutilty/interface/export";
+import { LogSetting } from "../dependencies/fisloggingservice/type/datatype";
 const incoming = new IncomingMessageService()
 const incoming = new IncomingMessageService()
 const syncrhonize = new MessageAuditorService()
 const syncrhonize = new MessageAuditorService()
 const acknowledge = new AcknowledgementService()
 const acknowledge = new AcknowledgementService()

+ 2 - 2
test/test2b.ts

@@ -3,8 +3,8 @@ import { StreamingService } from "./test-streamOBS";
 import { MessageAuditorService } from "../services/message-auditor.service";
 import { MessageAuditorService } from "../services/message-auditor.service";
 import { IncomingMessageService } from "../services/incomingMessage.service";
 import { IncomingMessageService } from "../services/incomingMessage.service";
 import { map, Observable, take } from "rxjs";
 import { map, Observable, take } from "rxjs";
-import { LogSetting, MessageLog } from "../dependencies/log/type/datatype";
-import { BaseMessage } from "../dependencies/msgutil/dependencies/dependencies";
+import { BaseMessage } from "../dependencies/fisappmessagejsutilty/interface/export";
+import { LogSetting } from "../dependencies/fisloggingservice/type/datatype";
 const incoming = new IncomingMessageService()
 const incoming = new IncomingMessageService()
 const syncrhonize = new MessageAuditorService()
 const syncrhonize = new MessageAuditorService()
 const acknowledge = new AcknowledgementService()
 const acknowledge = new AcknowledgementService()

+ 2 - 2
test/test3a.ts

@@ -4,12 +4,12 @@ Which is local file storage as the control/source, and then comparing the data f
 server data, and then synchronizing them */
 server data, and then synchronizing them */
 
 
 import { Observable, map, Subject, takeUntil, take, of, timer, from } from "rxjs";
 import { Observable, map, Subject, takeUntil, take, of, timer, from } from "rxjs";
-import { BaseMessage } from "../dependencies/msgutil/dependencies/dependencies";
-import { LogSetting, MessageLog } from "../dependencies/log/type/datatype";
 import { IncomingMessageService } from "../services/incomingMessage.service";
 import { IncomingMessageService } from "../services/incomingMessage.service";
 import { MessageAuditorService } from "../services/message-auditor.service";
 import { MessageAuditorService } from "../services/message-auditor.service";
 import { ErrorTrigger, MessageSynchronisationServiceSetting } from "../type/datatype";
 import { ErrorTrigger, MessageSynchronisationServiceSetting } from "../type/datatype";
 import { StreamingService } from "./test-streamOBS";
 import { StreamingService } from "./test-streamOBS";
+import { BaseMessage } from "../dependencies/fisappmessagejsutilty/interface/export";
+import { LogSetting } from "../dependencies/fisloggingservice/type/datatype";
 
 
 /* Pre - Defined Data && Settings */
 /* Pre - Defined Data && Settings */
 const stream = new StreamingService()
 const stream = new StreamingService()

+ 2 - 2
test/test3b.ts

@@ -4,13 +4,13 @@ Which is cloud mongo storage as the control/source, and then comparing the data
 server data, and then synchronizing them */
 server data, and then synchronizing them */
 
 
 import { Observable, map, Subject, takeUntil, take, of, timer, from } from "rxjs";
 import { Observable, map, Subject, takeUntil, take, of, timer, from } from "rxjs";
-import { BaseMessage } from "../dependencies/msgutil/dependencies/dependencies";
-import { LogSetting, MessageLog } from "../dependencies/log/type/datatype";
 import { AcknowledgementService } from "../services/acknowledgement.service";
 import { AcknowledgementService } from "../services/acknowledgement.service";
 import { IncomingMessageService } from "../services/incomingMessage.service";
 import { IncomingMessageService } from "../services/incomingMessage.service";
 import { MessageAuditorService } from "../services/message-auditor.service";
 import { MessageAuditorService } from "../services/message-auditor.service";
 import { ErrorTrigger, MessageSynchronisationServiceSetting } from "../type/datatype";
 import { ErrorTrigger, MessageSynchronisationServiceSetting } from "../type/datatype";
 import { StreamingService } from "./test-streamOBS";
 import { StreamingService } from "./test-streamOBS";
+import { BaseMessage } from "../dependencies/fisappmessagejsutilty/interface/export";
+import { LogSetting } from "../dependencies/fisloggingservice/type/datatype";
 
 
 /* Pre - Defined Data && Settings */
 /* Pre - Defined Data && Settings */
 const stream = new StreamingService()
 const stream = new StreamingService()

+ 31 - 13
test/test3c.ts

@@ -4,12 +4,12 @@ Which is local file mongo as the control/source, and then comparing the data fro
 server data, and then synchronizing them */
 server data, and then synchronizing them */
 
 
 import { Observable, map, Subject, takeUntil, take, of, timer, from } from "rxjs";
 import { Observable, map, Subject, takeUntil, take, of, timer, from } from "rxjs";
-import { BaseMessage, ResponseMessage } from "../dependencies/msgutil/dependencies/dependencies";
-import { LogSetting, MessageLog } from "../dependencies/log/type/datatype";
 import { ErrorTrigger, MessageSynchronisationServiceSetting } from "../type/datatype";
 import { ErrorTrigger, MessageSynchronisationServiceSetting } from "../type/datatype";
 import { StreamingService } from "./test-streamOBS";
 import { StreamingService } from "./test-streamOBS";
 import { MessageAuditorService } from "../services/message-auditor.service";
 import { MessageAuditorService } from "../services/message-auditor.service";
-import { LoggingService } from "../dependencies/log/services/logging-service";
+import { LoggingService } from "../dependencies/fisloggingservice/interface/export";
+import { BaseMessage, ResponseMessage } from "../dependencies/fisappmessagejsutilty/interface/export";
+import { LogSetting } from "../dependencies/fisloggingservice/type/datatype";
 
 
 /* Pre - Defined Data && Settings */
 /* Pre - Defined Data && Settings */
 // This service will stream the messages from the local testRequest.json messages
 // This service will stream the messages from the local testRequest.json messages
@@ -24,6 +24,9 @@ const publisher_sync = new MessageAuditorService()
 const publisher_Log = new LoggingService()
 const publisher_Log = new LoggingService()
 const publisher_take_four_messages: Observable<BaseMessage> = stream.stream().pipe(take(4))
 const publisher_take_four_messages: Observable<BaseMessage> = stream.stream().pipe(take(4))
 const publisher: Subject<BaseMessage> = new Subject()
 const publisher: Subject<BaseMessage> = new Subject()
+publisher.subscribe((e) => {
+    console.log(`Primary Received ${e.header.messageID}`)
+})
 publisher_take_four_messages.subscribe({
 publisher_take_four_messages.subscribe({
     next: (data) => {
     next: (data) => {
         publisher.next(data)
         publisher.next(data)
@@ -36,6 +39,9 @@ on. But generall the declarative structure is the same as the above. */
 const subscriber_log = new LoggingService()
 const subscriber_log = new LoggingService()
 const subscriber_take_two_messagse: Observable<BaseMessage> = stream.stream().pipe(take(2))
 const subscriber_take_two_messagse: Observable<BaseMessage> = stream.stream().pipe(take(2))
 const subscriber: Subject<BaseMessage> = new Subject()
 const subscriber: Subject<BaseMessage> = new Subject()
+subscriber.subscribe((e) => {
+    console.log(`Secondary Received ${e.header.messageID}`)
+})
 subscriber_take_two_messagse.subscribe({
 subscriber_take_two_messagse.subscribe({
     next: (data) => {
     next: (data) => {
         subscriber.next(<ResponseMessage>data)
         subscriber.next(<ResponseMessage>data)
@@ -59,7 +65,7 @@ let publisher_storage: LogSetting = {
     },
     },
     customSetting: {
     customSetting: {
         server: "192.168.100.59:27017",
         server: "192.168.100.59:27017",
-        database: "test"
+        database: "primary"
     }
     }
 }
 }
 
 
@@ -80,7 +86,7 @@ let subscriber_storage: LogSetting = {
         user: "testDB",
         user: "testDB",
         password: "h1nt1OyXw6QeUnzS",
         password: "h1nt1OyXw6QeUnzS",
         server: "cluster0.29sklte.mongodb.net",
         server: "cluster0.29sklte.mongodb.net",
-        database: "log",
+        database: "secondary",
     }
     }
 }
 }
 
 
@@ -103,15 +109,15 @@ let settings: MessageSynchronisationServiceSetting = {
 // messages into the designated database as specified earlier.
 // messages into the designated database as specified earlier.
 function initializeData() { // To store the data into the designated databases.
 function initializeData() { // To store the data into the designated databases.
     publisher_Log.init(publisher_storage).then(() => {
     publisher_Log.init(publisher_storage).then(() => {
-        publisher_Log.subscribe(publisher)
+        publisher_Log.subscribe(publisher) // Logging only occurs here
     })
     })
     subscriber_log.init(subscriber_storage).then(() => {
     subscriber_log.init(subscriber_storage).then(() => {
-        subscriber_log.subscribe(subscriber)
+        subscriber_log.subscribe(subscriber) // Logging only occurs here
     })
     })
 }
 }
 
 
 // Done by appoximately 5-8 Seconds
 // Done by appoximately 5-8 Seconds
-initializeData() // Call the function to store the data into the designated databases.
+// initializeData() // Call the function to store the data into the designated databases.
 publisher_sync.init(settings)
 publisher_sync.init(settings)
 
 
 /* This is where the synchronization logic is called. The errorSubject will act as a trigger
 /* This is where the synchronization logic is called. The errorSubject will act as a trigger
@@ -120,13 +126,15 @@ let errorSubject: Subject<ErrorTrigger> = new Subject()
 // Subscribe to errorSubject notification 
 // Subscribe to errorSubject notification 
 let sync = publisher_sync.subscribe(errorSubject)
 let sync = publisher_sync.subscribe(errorSubject)
 sync.subscribe({
 sync.subscribe({
-    next: (msgToBeSynched) => {
-        console.log(`synching ... ${msgToBeSynched.header.messageID}`)
+    next: (msgToBeSynchronized) => {
+        console.log(`passing missing message: ${msgToBeSynchronized.header.messageID} into target/secondary subject.`)
         // the missing data returned will be pushed (next(message)) into the target payload.
         // the missing data returned will be pushed (next(message)) into the target payload.
-        subscriber.next(msgToBeSynched)
+        subscriber.next(msgToBeSynchronized)
     }
     }
 })
 })
 
 
+
+
 // Set time oout for 5 seconds to allow the initial logging stage to complete it's logging
 // Set time oout for 5 seconds to allow the initial logging stage to complete it's logging
 // implementation first before proceedint to trigger the sync
 // implementation first before proceedint to trigger the sync
 setTimeout(() => {
 setTimeout(() => {
@@ -137,7 +145,7 @@ setTimeout(() => {
         message: "NO. I dont want to work"
         message: "NO. I dont want to work"
     }
     }
     errorSubject.next(sampleError)
     errorSubject.next(sampleError)
-}, 5000)
+}, 10000)
 
 
 /* THis is testing for generating error message to be fed into the error subject
 /* THis is testing for generating error message to be fed into the error subject
 to act as additional trigger to exectute the synchronization when there's no internet
 to act as additional trigger to exectute the synchronization when there's no internet
@@ -172,4 +180,14 @@ const duration = 60000; // 1 minute
 setTimeout(function () {
 setTimeout(function () {
     clearInterval(interval);
     clearInterval(interval);
     console.log('Internet connectivity monitoring stopped');
     console.log('Internet connectivity monitoring stopped');
-}, duration);
+}, duration);
+
+function countdown() {
+    let seconds = 0;
+    const countUpInterval = setInterval(() => {
+      console.log(`Elapsed seconds: ${seconds}`);
+      seconds++;
+    }, 1000); // Update every second (1000 milliseconds)
+  }
+
+countdown()

+ 2 - 3
type/acknowledgement.interface.ts

@@ -1,7 +1,6 @@
 import { Observable } from "rxjs";
 import { Observable } from "rxjs";
-import { ResponseMessage } from "../dependencies/msgutil/dependencies/dependencies";
-import { BaseMessage } from "../dependencies/log/services/logging-service";
-import { LogSetting } from "../dependencies/log/type/datatype";
+import { LogSetting } from "../dependencies/fisloggingservice/type/datatype";
+import { BaseMessage, ResponseMessage } from "../dependencies/fisappmessagejsutilty/interface/export";
 
 
 /**
 /**
  * @deprecated The acknowledgement will be covered by MessageAuditorService.
  * @deprecated The acknowledgement will be covered by MessageAuditorService.

+ 2 - 2
type/datatype.ts

@@ -1,6 +1,6 @@
 import { Observable } from "rxjs";
 import { Observable } from "rxjs";
-import { BaseMessage } from "../dependencies/msgutil/dependencies/dependencies";
-import { LogSetting, MessageLog } from "../dependencies/log/type/datatype";
+import { LogSetting } from "../dependencies/fisloggingservice/type/datatype";
+import { BaseMessage } from "../dependencies/fisappmessagejsutilty/interface/export";
 
 
 type IncomingMessageSetting = LogSetting & {
 type IncomingMessageSetting = LogSetting & {
     incomingObservable: Observable<BaseMessage>
     incomingObservable: Observable<BaseMessage>