Bläddra i källkod

Update and fix message syn and comprehensive comments

Enzo 1 år sedan
förälder
incheckning
2aa2acd5dd
8 ändrade filer med 95 tillägg och 47 borttagningar
  1. 9 7
      services/incomingMessage.service.ts
  2. 10 6
      services/synchronization.service.ts
  3. 2 1
      test/test2a.ts
  4. 2 1
      test/test2b.ts
  5. 9 3
      test/test3a.ts
  6. 9 4
      test/test3b.ts
  7. 48 24
      test/test3c.ts
  8. 6 1
      type/datatype.ts

+ 9 - 7
services/incomingMessage.service.ts

@@ -3,11 +3,10 @@ import { BaseMessage, Command, ResponseMessage, Uuid } from "../dependencies/fis
 import { LoggingService } from "../dependencies/fisloggingservice/services/logging-service";
 import { LogSetting, MessageLog } from "../dependencies/fisloggingservice/type/datatype";
 import { IncomingMessageServiceInterface } from "../type/datatype";
-import { AcknowledgementService } from "./acknowledgement.service";
 
 export class IncomingMessageService implements IncomingMessageServiceInterface {
 
-    constructor(private logService?: LoggingService, private acknowledgeService?: AcknowledgementService) {
+    constructor(private logService?: LoggingService) {
         this.logService = new LoggingService()
     }
 
@@ -25,19 +24,23 @@ export class IncomingMessageService implements IncomingMessageServiceInterface {
         incomingObservable: null
     }
 
+
+    /* This function is mainful used for setting the log setting as well as transforming the messages
+    into log messages and then storing them in the desiganted location of storage as specified. */
     public init(settings: LogSetting & { incomingObservable: Observable<BaseMessage>; }): void {
 
+        // Restructuring the settings. I think they were some trouble doing this last time
         let newSetting: any = settings
         newSetting.setting = {
             ...this.settings.setting,
             ...settings.setting,
-            customSetting:{ 
+            customSetting: {
                 ...this.settings.customSetting,
                 ...settings.customSetting,
             }
         }
 
-        this.settings = newSetting
+        this.settings = newSetting // Become stateful???
 
         // Transform incoming observables into Observable<MessageLog> to be logged
         let transformedOBS: Observable<MessageLog> = settings.incomingObservable.pipe(
@@ -53,14 +56,13 @@ export class IncomingMessageService implements IncomingMessageServiceInterface {
                     }
                 }
                 return finalResponse
-            }
-            )
+            })
         )
 
+        // Once the messages has been transformed, then the logging can be executed
         this.logService.init(this.settings).then(() => {
             this.logService.subscribe(transformedOBS)
         }).catch((e) => console.error(e))
 
-
     }
 }

+ 10 - 6
services/synchronization.service.ts

@@ -3,7 +3,7 @@ import { map, Observable, of, Subject } from "rxjs";
 import { BaseMessage } from "../dependencies/fisappmessagejsutilty/dependencies/dependencies";
 import { LoggingService } from "../dependencies/fisloggingservice/services/logging-service";
 import { MessageLog } from "../dependencies/fisloggingservice/type/datatype";
-import { MessageSynchronisationServiceInterface, MessageSynchronisationServiceSetting } from "../type/datatype";
+import { MessageSynchronisationServiceInterface, MessageSynchronisationServiceSetting, ErrorTrigger } from "../type/datatype";
 
 export class MessageSyncrhonizationService implements MessageSynchronisationServiceInterface {
 
@@ -15,9 +15,13 @@ export class MessageSyncrhonizationService implements MessageSynchronisationServ
         this.settings = settings;
     }
 
-    // Incoming obstriger serves as a trigger point to perform another synchronization
-    public subscribe(obsTrigger: Observable<string>): Observable<BaseMessage> {
+    /* This functions will subsribe to the designated error triggers. The error will trigger the need to
+    sync, should the user or circumstances necessitates it. */
+    public subscribe(obsTrigger: Observable<ErrorTrigger>): Observable<BaseMessage> {
+        // Create a subject as a means to return the missing messages if there's any.
         let msg: Subject<BaseMessage> = new Subject()
+
+        // Subsribe to the errorTrigger obs to listen to any notification.
         obsTrigger.subscribe({
             next: obsTrigger => {
                 let missingMsg = this.dataConversion()
@@ -37,9 +41,9 @@ export class MessageSyncrhonizationService implements MessageSynchronisationServ
 
     private dataConversion(): Observable<BaseMessage> {
         // let subjectOutput = this.syncrhonize()
-        let obsOutput: Observable<BaseMessage> = this.syncrhonize().pipe(
+        let obsOutput: Observable<BaseMessage> = this.synchronize().pipe(
             map((msg: MessageLog) => {
-                // console.log(`Converting this ${msg.appData.msgId}`)
+                console.log(`Converting this ${msg.appData.msgId}`)
                 return JSON.parse(<string>msg.appData.msgPayload)
             })
         )
@@ -48,7 +52,7 @@ export class MessageSyncrhonizationService implements MessageSynchronisationServ
 
 
     // Returns all the missing data to be synchronized in the observables later
-    private syncrhonize(): Subject<any> {
+    private synchronize(): Subject<any> {
         let subjectOutput = new Subject()
         this.acquireData().then((data: { arr1: MessageLog[], arr2: MessageLog[] }) => {
             if (data.arr1.length === data.arr2.length) {

+ 2 - 1
test/test2a.ts

@@ -17,6 +17,7 @@ const payload: Observable<BaseMessage> = streamService.stream().pipe(take(3));
 // Configure Log Setting
 let storage: LogSetting = {
     storage: "MongoDB",
+    cacheMessageLimit: 0,
     setting: {
         appName: 'Default from client',
         appLocName: 'To be generated in client',
@@ -27,7 +28,7 @@ let storage: LogSetting = {
         user: "testDB",
         password: "h1nt1OyXw6QeUnzS",
         server: "cluster0.29sklte.mongodb.net",
-        collection: "log",
+        database: "log",
       }
 }
 let dataSet: LogSetting & { incomingObservable: Observable<BaseMessage> } = {

+ 2 - 1
test/test2b.ts

@@ -17,6 +17,7 @@ const payload: Observable<BaseMessage> = streamService.stream().pipe(take(3));
 // Configure Log Setting
 let storage: LogSetting = {
     storage: "MongoDB",
+    cacheMessageLimit: 0,
     setting: {
         appName: 'Default from client',
         appLocName: 'To be generated in client',
@@ -27,7 +28,7 @@ let storage: LogSetting = {
         user: "testDB",
         password: "h1nt1OyXw6QeUnzS",
         server: "cluster0.29sklte.mongodb.net",
-        collection: "log",
+        database: "log",
       }
 }
 let dataSet: LogSetting & { incomingObservable: Observable<BaseMessage> } = {

+ 9 - 3
test/test3a.ts

@@ -8,7 +8,7 @@ import { BaseMessage } from "../dependencies/fisappmessagejsutilty/dependencies/
 import { LogSetting, MessageLog } from "../dependencies/fisloggingservice/type/datatype";
 import { IncomingMessageService } from "../services/incomingMessage.service";
 import { MessageSyncrhonizationService } from "../services/synchronization.service";
-import { MessageSynchronisationServiceSetting } from "../type/datatype";
+import { ErrorTrigger, MessageSynchronisationServiceSetting } from "../type/datatype";
 import { StreamingService } from "./test-streamOBS";
 
 /* Pre - Defined Data && Settings */
@@ -64,6 +64,7 @@ let source_dataSet: LogSetting & { incomingObservable: Observable<BaseMessage> }
 //Declare Target Storage
 let target_storage: LogSetting = {
     storage: "MongoDB",
+    cacheMessageLimit: 0,
     setting: {
         appName: 'Default from client',
         appLocName: 'To be generated in client',
@@ -74,7 +75,7 @@ let target_storage: LogSetting = {
         user: "testDB",
         password: "h1nt1OyXw6QeUnzS",
         server: "cluster0.29sklte.mongodb.net",
-        collection: "log",
+        database: "log",
     }
 }
 
@@ -111,7 +112,12 @@ source_synchronize.init(settings)
 // by 10th second 
 setTimeout(() => {
 
-    let triggerSync = from(['Newsynch'])
+    let sampleError: ErrorTrigger = {
+        status: 1,
+        message: "NO. I dont want to work"
+    }
+
+    let triggerSync = from([sampleError])
 
     let sync = source_synchronize.subscribe(triggerSync)
     sync.subscribe({

+ 9 - 4
test/test3b.ts

@@ -9,7 +9,7 @@ import { LogSetting, MessageLog } from "../dependencies/fisloggingservice/type/d
 import { AcknowledgementService } from "../services/acknowledgement.service";
 import { IncomingMessageService } from "../services/incomingMessage.service";
 import { MessageSyncrhonizationService } from "../services/synchronization.service";
-import { MessageSynchronisationServiceSetting } from "../type/datatype";
+import { ErrorTrigger, MessageSynchronisationServiceSetting } from "../type/datatype";
 import { StreamingService } from "./test-streamOBS";
 
 /* Pre - Defined Data && Settings */
@@ -53,7 +53,7 @@ source_payload_subject.subscribe({
 
 // Declare Source Storage
 let source_storage: LogSetting = {
-
+    cacheMessageLimit: 0,
     storage: "MongoDB",
     setting: {
         appName: 'Default from client',
@@ -65,7 +65,7 @@ let source_storage: LogSetting = {
         user: "testDB",
         password: "h1nt1OyXw6QeUnzS",
         server: "cluster0.29sklte.mongodb.net",
-        collection: "log",
+        database: "log",
     }
 }
 
@@ -119,7 +119,12 @@ source_synchronize.init(settings)
 // by 10th second 
 setTimeout(() => {
 
-    let triggerSync = from(['Newsynch'])
+    let sampleError: ErrorTrigger = {
+        status: 1,
+        message: "NO. I dont want to work"
+    }
+
+    let triggerSync = from([sampleError])
 
     let sync = source_synchronize.subscribe(triggerSync)
     sync.subscribe({

+ 48 - 24
test/test3c.ts

@@ -6,30 +6,36 @@ server data, and then synchronizing them */
 import { Observable, map, Subject, takeUntil, take, of, timer, from } from "rxjs";
 import { BaseMessage } from "../dependencies/fisappmessagejsutilty/dependencies/dependencies";
 import { LogSetting, MessageLog } from "../dependencies/fisloggingservice/type/datatype";
-import { AcknowledgementService } from "../services/acknowledgement.service";
 import { IncomingMessageService } from "../services/incomingMessage.service";
 import { MessageSyncrhonizationService } from "../services/synchronization.service";
-import { MessageSynchronisationServiceSetting } from "../type/datatype";
+import { ErrorTrigger, MessageSynchronisationServiceSetting } from "../type/datatype";
 import { StreamingService } from "./test-streamOBS";
 
 /* Pre - Defined Data && Settings */
+// This service will stream the messages from the local testRequest.json messages
+// into the designated location that will be specified later.
 const stream = new StreamingService()
 
-// Declare source Services && Observables (Using File Storage) Simulating Full Logs
+/* Using the instance of the streaming declared earlier, we feed 4 messages into the
+subscribers that are going to subsscribe to this source_payload. Please note that 
+source_payload will emite the messages stream from the instance of stream service
+and further feed them into the other Subject which is called source_payload_subject. */
 const source_synchronize = new MessageSyncrhonizationService()
-const source_payload: Observable<BaseMessage> = stream.stream().pipe(take(4))
 const source_incoming = new IncomingMessageService()
+const source_payload: Observable<BaseMessage> = stream.stream().pipe(take(4))
 const source_payload_subject: Subject<BaseMessage> = new Subject()
 source_payload.subscribe({
     next: (data) => {
         source_payload_subject.next(data)
-        // console.log(data)
     }
 })
-// Declare target Services && Observables (Using MongoDB Storage) Simulating Partial Logs
+
+/* Same thing as the above. The only difference is the we feed only 2 messages 
+to simulate streaming error. We want to see if it will sync the other 2 later 
+on. But generall the declarative structure is the same as the above. */
+const target_incoming = new IncomingMessageService()
 const target_payload: Observable<BaseMessage> = stream.stream().pipe(take(2))
 const target_payload_subject: Subject<BaseMessage> = new Subject()
-const target_incoming = new IncomingMessageService()
 target_payload.subscribe({
     next: (data) => {
         target_payload_subject.next(<BaseMessage>data)
@@ -38,15 +44,13 @@ target_payload.subscribe({
     complete: () => { `Target Payload Completed` }
 })
 
-// testing to see if data is sent in
-target_payload_subject.subscribe({
-    next: element => {
-        console.log(`target_payload_subject emits :00 ${element.header.messageID}`)
-    }
-})
 
-// Declare Source Storage
+/* Declare the designated database. I am using windev's mongo storage to store the data.
+Hence here, is the block that definte the target and it's associated specifications.
+This will be the target and will receive the predefined set of data to be logged as 
+prepared earlier in the code above.s */
 let source_storage: LogSetting = {
+    cacheMessageLimit: 0,
     storage: "MongoDB",
     setting: {
         appName: 'Default from client',
@@ -54,11 +58,12 @@ let source_storage: LogSetting = {
         logLocName: 'To be generated in client',
     },
     customSetting: {
-        server: "localhost:27017",
-        collection: "log"
+        server: "192.168.100.59:27017",
+        database: "test"
     }
 }
 
+// I forgot what this is for lol...
 let source_dataSet: LogSetting & { incomingObservable: Observable<BaseMessage> } = {
     storage: source_storage.storage,
     setting: source_storage.setting,
@@ -66,8 +71,12 @@ let source_dataSet: LogSetting & { incomingObservable: Observable<BaseMessage> }
     incomingObservable: source_payload_subject
 }
 
-//Declare Target Storage
+/* Same as above. Also declaring another designated database. But this one will be used
+as the target for synching. For such I purposely push only half the of the completed 
+dataset in order to test out the sync later. I am using my own cloud atlas mongo
+database on this. The address can always be changed. */
 let target_storage: LogSetting = {
+    cacheMessageLimit: 0,
     storage: "MongoDB",
     setting: {
         appName: 'Default from client',
@@ -79,7 +88,7 @@ let target_storage: LogSetting = {
         user: "testDB",
         password: "h1nt1OyXw6QeUnzS",
         server: "cluster0.29sklte.mongodb.net",
-        collection: "log",
+        database: "log",
     }
 }
 
@@ -104,27 +113,42 @@ let settings: MessageSynchronisationServiceSetting = {
 }
 
 /* -------- SYNCHRONIZATION --------- */
-function initializeData() {
+// This is where the real test begin. THe process before this were merely just configuring 
+// the settings of where to sync. Here the initial intialize data will first log the 
+// messages into the designated database as specified earlier.
+function initializeData() { // To store the data into the designated databases.
     source_incoming.init(source_dataSet)
     target_incoming.init(target_dataSet)
 }
 
 // Done by appoximately 5-8 Seconds
-initializeData()
+initializeData() // Call the function to store the data into the designated databases.
 source_synchronize.init(settings)
 
-// by 10th second 
+/* This is where the synchronizatin happens. Upon initializin ghe target earlier, we
+will put in a trigger to execute the synching. It will check for missing datas and
+publish the missing messages accordingly. For this test purpose, we purposely set to
+7 seconds to allow the logging process earlier to take place first before engaging
+in synching activity to prevent confusion and interference of the pre-logging stage. */
 setTimeout(() => {
 
-    let triggerSync = from(['Newsynch'])
+    // This wil act as the trigger error. Although the definition of this error is 
+    // still subject for enhancements in the near future.
+    let sampleError: ErrorTrigger = {
+        status: 1,
+        message: "NO. I dont want to work"
+    }
+
+    let triggerSync: Observable<ErrorTrigger> = from([sampleError])
 
     let sync = source_synchronize.subscribe(triggerSync)
     sync.subscribe({
         next: (msgToBeSynched) => {
-            // console.log(`synching ... ${msgToBeSynched.header.messageID}`)
+            console.log(`synching ... ${msgToBeSynched.header.messageID}`)
             target_payload_subject.next(msgToBeSynched)
         }
     })
 
-}, 7000)
+}, 5000)
+
 

+ 6 - 1
type/datatype.ts

@@ -29,5 +29,10 @@ export interface MessageSynchronisationServiceInterface {
     // Set default setting
     init(settings: MessageSynchronisationServiceSetting): void;
     // Subscribe to trigger
-    subscribe(obs: Observable<string>): Observable<any>;
+    subscribe(obs: Observable<ErrorTrigger>): Observable<any>;
+}
+
+export interface ErrorTrigger {
+    status: 0 | 1,
+    message: any | string
 }