Browse Source

synchronize fix

Enzo 1 year ago
parent
commit
245d2bbf50

+ 1 - 1
dependencies/fisloggingservice

@@ -1 +1 @@
-Subproject commit 3779a2dd53d7acf06cb67ced30a2f565d589b835
+Subproject commit d6cf2798d6b5d01e921e15b3c312452ecf66dfb1

+ 2 - 0
package.json

@@ -12,6 +12,8 @@
     "start2a": "node test/test2a.js",
     "start2b": "node test/test2b.js",
     "start3a": "node test/test3a.js",
+    "start3b": "node test/test3b.js",
+    "start3c": "node test/test3c.js",
     "test": "echo \"Error: no test specified\" && exit 1"
   },
   "repository": {

+ 1 - 4
services/acknowledgement.service.ts

@@ -23,13 +23,11 @@ export class AcknowledgementService implements Acknowledgemeent {
         }
     }
 
-    constructor(private logService1?: LoggingService, private logService?: LoggingService) {
-        // this.logService1 = new LoggingService()
+    constructor(private logService?: LoggingService) {
         this.logService = new LoggingService()
     }
 
     public async init(settings: AcknowledgementLogSetting) {
-
         let logSetting: LogSetting = {
             ...settings,
             setting: {
@@ -38,7 +36,6 @@ export class AcknowledgementService implements Acknowledgemeent {
                 appLocName: "appLocName2"
             }
         }
-
         this.settings = logSetting
         this.logService.init(logSetting)
     }

+ 18 - 18
services/synchronization.service.ts

@@ -11,15 +11,15 @@ export class MessageSyncrhonizationService implements MessageSynchronisationServ
     private sourceSrc: LoggingService = new LoggingService()
     private targetSrc: LoggingService = new LoggingService()
 
-    public  init(settings: MessageSynchronisationServiceSetting): void {
-        this.settings = settings; 
+    public init(settings: MessageSynchronisationServiceSetting): void {
+        this.settings = settings;
     }
 
     // Incoming obstriger serves as a trigger point to perform another synchronization
     public subscribe(obsTrigger: Observable<string>): Observable<BaseMessage> {
         let msg: Subject<BaseMessage> = new Subject()
         obsTrigger.subscribe({
-            next: obs => {
+            next: obsTrigger => {
                 let missingMsg = this.dataConversion()
                 missingMsg.subscribe({
                     next: element => {
@@ -28,7 +28,6 @@ export class MessageSyncrhonizationService implements MessageSynchronisationServ
                 })
             }
         })
-        // trigger by timer
         if (!obsTrigger) {
             this.dataConversion()
         }
@@ -48,6 +47,7 @@ export class MessageSyncrhonizationService implements MessageSynchronisationServ
     }
 
 
+    // Returns all the missing data to be synchronized in the observables later
     private syncrhonize(): Subject<any> {
         let subjectOutput = new Subject()
         this.acquireData().then((data: { arr1: MessageLog[], arr2: MessageLog[] }) => {
@@ -64,7 +64,7 @@ export class MessageSyncrhonizationService implements MessageSynchronisationServ
         return subjectOutput
     }
 
-    // Acquires the available data from designated storage
+    // Acquires the available data from designated target and source storage
     private async acquireData(): Promise<any> {
         const promiseQuery: Promise<any> = new Promise((resolve, reject) => {
             let allSets: {
@@ -77,25 +77,25 @@ export class MessageSyncrhonizationService implements MessageSynchronisationServ
             let set1
             let set2
 
-            
-        this.sourceSrc.init(this.settings.incomingSource).then(()=>{
-            this.targetSrc.init(this.settings.target).then(()=>{
-                this.sourceSrc.filter({ msgTag: this.settings.incomingSource.tags[0] }).then((data: MessageLog[]) => {
-                    set1 = data
-                }).then(() => {
-                    this.targetSrc.filter({ msgTag: this.settings.target.tags[0] }).then((data: MessageLog[]) => {
-                        set2 = data
-                        allSets.arr1 = set1
-                        allSets.arr2 = set2
-                        resolve(allSets)
+            this.sourceSrc.init(this.settings.incomingSource).then(() => {
+                this.targetSrc.init(this.settings.target).then(() => {
+                    this.sourceSrc.filter({ msgTag: this.settings.incomingSource.tags[0] }).then((data: MessageLog[]) => {
+                        set1 = data
+                    }).then(() => {
+                        this.targetSrc.filter({ msgTag: this.settings.target.tags[0] }).then((data: MessageLog[]) => {
+                            set2 = data
+                            allSets.arr1 = set1
+                            allSets.arr2 = set2
+                            resolve(allSets)
+                        })
                     })
                 })
             })
         })
-        })
         return promiseQuery
     }
-    // compare results and return differences`
+
+    // compare results and return differences
     private async checkArrayDifferences(args: { arr1?: any[], arr2?: any[] }): Promise<any[]> {
         return new Promise((resolve, reject) => {
             let missingMsg: MessageLog[] = []

+ 0 - 185
test/temptest.ts

@@ -1,185 +0,0 @@
-import { resolve } from "path";
-import { Observable, map, Subject, takeUntil, take } from "rxjs";
-import { BaseMessage } from "../dependencies/fisappmessagejsutilty/dependencies/dependencies";
-import { LogSetting } from "../dependencies/fisloggingservice/type/datatype";
-import { AcknowledgementService } from "../services/acknowledgement.service";
-import { IncomingMessageService } from "../services/incomingMessage.service";
-import { MessageSyncrhonizationService } from "../services/synchronization.service";
-import { MessageSynchronisationServiceSetting } from "../type/datatype";
-import { StreamingService } from "./test-streamOBS";
-
-
-/* Pre - Defined Data && Settings */
-const stream = new StreamingService()
-// Declare source Services && Observables (Using File Storage) Simulating Full Logs
-const source_synchronize = new MessageSyncrhonizationService()
-const source_payload: Observable<BaseMessage> = stream.stream()
-const source_incoming = new IncomingMessageService()
-const source_payload_subject : Subject<BaseMessage> = new Subject()
-source_payload.subscribe(
-    {
-        next: (data) => {
-            source_payload_subject.next(data)
-            // console.log(data)
-        }
-    }
-)
-const source_acknowledge = new AcknowledgementService()
-const source_payload_string = source_payload.pipe(
-    map((data) => {
-        return JSON.stringify(data);
-    })
-)
-
-// Declare target Services && Observables (Using MongoDB Storage) Simulating Partial Logs
-const target_payload: Observable<BaseMessage> = stream.stream().pipe(take(2))
-const target_payload_subject: Subject<BaseMessage> = new Subject()
-target_payload.subscribe(
-    {
-        next: (data) => {
-            target_payload_subject.next(data)
-            // console.log(data)
-        }
-    }
-)
-const target_incoming = new IncomingMessageService()
-const target_syncrhonize = new MessageSyncrhonizationService()
-const target_acknowledge = new AcknowledgementService()
-const target_payload_string = target_payload.pipe(
-    map((data) => {
-        return JSON.stringify(data);
-    }),
-)
-
-// Decalre Source Storage
-let source_storage: LogSetting = {
-    storage: "File",
-    setting: {
-        appName: 'Default from client',
-        appLocName: 'To be generated in client',
-        logLocName: 'To be generated in client',
-    }
-}
-
-let source_dataSet: LogSetting & { incomingObservable: Observable<BaseMessage> } = {
-    storage: source_storage.storage,
-    setting: source_storage.setting,
-    customSetting: source_storage.customSetting,
-    incomingObservable: source_payload_subject
-}
-
-//Declare Target Storage
-let target_storage: LogSetting = {
-    storage: "MongoDB",
-    setting: {
-        appName: 'Default from client',
-        appLocName: 'To be generated in client',
-        logLocName: 'To be generated in client',
-    },
-    customSetting: {
-        srv: true,
-        user: "testDB",
-        password: "h1nt1OyXw6QeUnzS",
-        server: "cluster0.29sklte.mongodb.net",
-        collection: "log",
-    }
-}
-let target_dataSet: LogSetting & { incomingObservable: Observable<BaseMessage> } = {
-    storage: target_storage.storage,
-    setting: target_storage.setting,
-    customSetting: target_storage.customSetting,
-    incomingObservable: target_payload_subject
-}
-
-// Combine source and target storage to form MessageSynchronisationServiceSetting
-let settings: MessageSynchronisationServiceSetting = {
-    incomingSource: {
-        //all of the settings to be combined here
-        ...source_storage,
-        tags: ['Incoming']
-    }, //LogSetting & {tags:string[] },   
-    target: {
-        ...target_storage,
-        tags: ['Incoming']
-    }  //LogSetting & {tags:string[] }  
-}
-
-
-/* ------------- CODING TEST --------------- */
-const new_subject: Subject<BaseMessage> = new Subject();
-const subjectObs = { // Observer but a subject looking to subscribe
-    next: (data) => {
-        console.log(data)
-    }
-}
-new_subject.subscribe(subjectObs) // Observerble(Subject) subcribed and stream data for observer(subject) "subjectObs"
-
-source_incoming.init(
-    {
-        storage: source_storage.storage,
-        setting: source_storage.setting,
-        customSetting: source_storage.customSetting,
-        incomingObservable: new_subject.asObservable()
-    }
-)
-target_incoming.init(
-    {
-        storage: target_storage.storage,
-        setting: target_storage.setting,
-        customSetting: target_storage.customSetting,
-        incomingObservable: new_subject.asObservable()
-    }
-)
-setTimeout(() => {
-    new_subject.next(<BaseMessage>{
-        "header": {
-            "messageType": "Command",
-            "messageID": "ab05f310-f3c5-4fd0-9af1-15cda97b4444",
-            "messageName": "Command",
-            "dateCreated": "2023-02-13T03:33:58.746Z",
-            "isAggregate": false,
-            "dataSourceTiming": "",
-            "serviceId": "",
-            "userId": "",
-            "requesterId": "Generatede203a86a-c99e-460e-95ff-f2dc7f484a7d",
-            "messageProducerInformation": {
-                "origin": {
-                    "userApplication": {
-                        "userAppId": "FisAppID/Name",
-                        "userAppName": "Client"
-                    }
-                },
-                "components": "Presentation"
-            },
-            "security": {
-                "ucpId": "GeneratedFromMessageSync"
-            },
-            "messageDataLocation": {
-                "isEmbaded": true
-            },
-            "messageDataFormat": {
-                "dataFormat": "Json"
-            },
-            "requestExecutionMode": 0,
-            "resquestTimeOut": 0,
-            "command": "New"
-        },
-        "data": {
-            "header": "fa29074d-9718-4aba-9999-0001",
-            "data": {
-                "appLogLocId": "fa29074d-9718-4aba-9999-0001",
-                "appData": {
-                    "msgId": "6c162cd3-d42d-4ab4-8882-0001",
-                    "msgLogDateTime": "2022-12-06T15:01:46.987Z",
-                    "msgDateTime": "2022-12-06T08:50:33.809Z",
-                    "msgTag": [
-                        "oval",
-                        "likable"
-                    ],
-                    "msgPayload": "Molestias facilis iusto similique iste voluptas facere. Alias est sequi. Quos consequatur temporibus blanditiis numquam vel. Eos repellat eaque. Voluptatibus optio optio magni eveniet. Quidem architecto esse aut sint neque error magnam perspiciatis."
-                }
-            }
-        }
-    }
-    )
-}, 1000)

+ 4 - 16
test/test-streamOBS.ts

@@ -1,3 +1,6 @@
+/* ----------------------        Simulate a stream of messages to be inserted or used by the test        ---------------------- */
+
+
 import { from, map, Observable, of, Subject } from "rxjs";
 import * as fs from "fs"
 import { BaseMessage } from "../dependencies/fisloggingservice/services/logging-service";
@@ -6,21 +9,6 @@ export class StreamingService {
     private messagesJSON: any = fs.readFileSync("testRequest.json")
     private messages = JSON.parse(this.messagesJSON)
 
-    // public stream(): Observable<BaseMessage> {
-    //     return new Observable(subject => {
-    //         let messages = this.messages
-    //         let count = 0
-    //         const intervalId = setInterval(() => {
-    //             subject.next(messages[count]);
-    //             count++;
-    //             if (count >= 4) {
-    //                 clearInterval(intervalId);
-    //                 subject.complete();
-    //             }
-    //         }, 1000)
-    //     })
-    // }
-
     public stream(): Subject<BaseMessage> {
         let result: Subject<BaseMessage> = new Subject()
         let messages = this.messages
@@ -28,7 +16,7 @@ export class StreamingService {
         const intervalId = setInterval(() => {
             result.next(messages[count]);
             count++;
-            if (count >= 4) {
+            if (count >= 5) {
                 clearInterval(intervalId);
                 result.complete();
             }

+ 14 - 35
test/test3a.ts

@@ -1,21 +1,22 @@
-import { stat } from "fs";
-import { resolve } from "path";
+/*  -----------------------       TEST3A {File to Mongo}    -----------------------   */
+/* This test is focusing on comparing 2 different arrays of message logs from 2 different storage. 
+Which is local file storage as the control/source, and then comparing the data from cloud mongoDB
+server data, and then synchronizing them */
+
 import { Observable, map, Subject, takeUntil, take, of, timer, from } from "rxjs";
 import { BaseMessage } from "../dependencies/fisappmessagejsutilty/dependencies/dependencies";
 import { LogSetting, MessageLog } from "../dependencies/fisloggingservice/type/datatype";
-import { AcknowledgementService } from "../services/acknowledgement.service";
 import { IncomingMessageService } from "../services/incomingMessage.service";
 import { MessageSyncrhonizationService } from "../services/synchronization.service";
 import { MessageSynchronisationServiceSetting } from "../type/datatype";
 import { StreamingService } from "./test-streamOBS";
 
-
 /* Pre - Defined Data && Settings */
 const stream = new StreamingService()
 
 // Declare source Services && Observables (Using File Storage) Simulating Full Logs
 const source_synchronize = new MessageSyncrhonizationService()
-const source_payload: Observable<BaseMessage> = stream.stream()
+const source_payload: Observable<BaseMessage> = stream.stream().pipe(take(4))
 const source_incoming = new IncomingMessageService()
 const source_payload_subject: Subject<BaseMessage> = new Subject()
 source_payload.subscribe({
@@ -24,15 +25,7 @@ source_payload.subscribe({
         // console.log(data)
     }
 })
-const source_payload_string = source_payload.pipe(
-    map((data) => {
-        return JSON.stringify(data);
-    })
-)
-
-
 // Declare target Services && Observables (Using MongoDB Storage) Simulating Partial Logs
-const target_syncrhonize = new MessageSyncrhonizationService()
 const target_payload: Observable<BaseMessage> = stream.stream().pipe(take(2))
 const target_payload_subject: Subject<BaseMessage> = new Subject()
 const target_incoming = new IncomingMessageService()
@@ -43,16 +36,11 @@ target_payload.subscribe({
     error: e => console.error(e),
     complete: () => { `Target Payload Completed` }
 })
-const target_payload_string = target_payload.pipe(
-    map((data) => {
-        return JSON.stringify(data);
-    }),
-)
+
 // testing to see if data is sent in
 target_payload_subject.subscribe({
     next: element => {
-        console.log(`target_payload_subject emits : ${element.header.messageID}`)
-        // Missing MessageLog Data is sent in
+        console.log(`target_payload_subject emits :00 ${element.header.messageID}`)
     }
 })
 
@@ -110,25 +98,21 @@ let settings: MessageSynchronisationServiceSetting = {
     }  //LogSetting & {tags:string[] }  
 }
 
-const triggerSync = timer(5000).pipe(map(
-    (value) => String(value)
-))
-
 /* -------- SYNCHRONIZATION --------- */
 function initializeData() {
     source_incoming.init(source_dataSet)
-    target_incoming.init(target_dataSet) 
+    target_incoming.init(target_dataSet)
 }
 
-// Done by 4 seconds
+// Done by appoximately 5-8 Seconds
 initializeData()
 source_synchronize.init(settings)
 
-// by 5th second 
-setTimeout(() => { 
+// by 10th second 
+setTimeout(() => {
 
     let triggerSync = from(['Newsynch'])
- 
+
     let sync = source_synchronize.subscribe(triggerSync)
     sync.subscribe({
         next: (msgToBeSynched) => {
@@ -137,9 +121,4 @@ setTimeout(() => {
         }
     })
 
-}, 30000)//30s
-
-// To DO
-// more test files
-// synch an additional source message at runtime(after 30sec)
-// more comments to be more readable
+}, 7000)//30s

+ 132 - 0
test/test3b.ts

@@ -0,0 +1,132 @@
+/*  -----------------------       TEST3B {Mongo to File}     -----------------------   */
+/* This test is focusing on comparing 2 different arrays of message logs from 2 different storage. 
+Which is cloud mongo storage as the control/source, and then comparing the data from local file
+server data, and then synchronizing them */
+
+import { Observable, map, Subject, takeUntil, take, of, timer, from } from "rxjs";
+import { BaseMessage } from "../dependencies/fisappmessagejsutilty/dependencies/dependencies";
+import { LogSetting, MessageLog } from "../dependencies/fisloggingservice/type/datatype";
+import { AcknowledgementService } from "../services/acknowledgement.service";
+import { IncomingMessageService } from "../services/incomingMessage.service";
+import { MessageSyncrhonizationService } from "../services/synchronization.service";
+import { MessageSynchronisationServiceSetting } from "../type/datatype";
+import { StreamingService } from "./test-streamOBS";
+
+/* Pre - Defined Data && Settings */
+const stream = new StreamingService()
+
+// Declare source Services && Observables (Using File Storage) Simulating Full Logs
+const source_synchronize = new MessageSyncrhonizationService()
+const source_payload: Observable<BaseMessage> = stream.stream().pipe(take(2))
+const source_incoming = new IncomingMessageService()
+const source_payload_subject: Subject<BaseMessage> = new Subject()
+source_payload.subscribe({
+    next: (data) => {
+        source_payload_subject.next(<BaseMessage>data)
+        // console.log(data)
+    }
+})
+// Declare target Services && Observables (Using MongoDB Storage) Simulating Partial Logs
+const target_payload: Observable<BaseMessage> = stream.stream().pipe(take(4))
+const target_payload_subject: Subject<BaseMessage> = new Subject()
+const target_incoming = new IncomingMessageService()
+target_payload.subscribe({
+    next: (data) => {
+        target_payload_subject.next(<BaseMessage>data)
+    },
+    error: e => console.error(e),
+    complete: () => { `Target Payload Completed` }
+})
+
+// testing to see if data is sent in
+target_payload_subject.subscribe({
+    next: element => {
+        console.log(`target_payload_subject emits :00 ${element.header.messageID}`)
+    }
+})
+
+source_payload_subject.subscribe({
+    next: element => {
+        console.log(`source_payload_subject emits :00 ${element.header.messageID}`)
+    }
+})
+
+// Declare Source Storage
+let source_storage: LogSetting = {
+
+    storage: "MongoDB",
+    setting: {
+        appName: 'Default from client',
+        appLocName: 'To be generated in client',
+        logLocName: 'To be generated in client',
+    },
+    customSetting: {
+        srv: true,
+        user: "testDB",
+        password: "h1nt1OyXw6QeUnzS",
+        server: "cluster0.29sklte.mongodb.net",
+        collection: "log",
+    }
+}
+
+let source_dataSet: LogSetting & { incomingObservable: Observable<BaseMessage> } = {
+    storage: source_storage.storage,
+    setting: source_storage.setting,
+    customSetting: source_storage.customSetting,
+    incomingObservable: source_payload_subject
+}
+
+//Declare Target Storage
+let target_storage: LogSetting = {
+    storage: "File",
+    setting: {
+        appName: 'Default from client',
+        appLocName: 'To be generated in client',
+        logLocName: 'To be generated in client',
+    }
+}
+
+let target_dataSet: LogSetting & { incomingObservable: Observable<BaseMessage> } = {
+    storage: target_storage.storage,
+    setting: target_storage.setting,
+    customSetting: target_storage.customSetting,
+    incomingObservable: target_payload_subject
+}
+
+// Combine source and target storage to form MessageSynchronisationServiceSetting
+let settings: MessageSynchronisationServiceSetting = {
+    incomingSource: {
+        //all of the settings to be combined here
+        ...target_storage,
+        tags: ['Incoming']
+    }, //LogSetting & {tags:string[] },   
+    target: {
+        ...source_storage,
+        tags: ['Incoming']
+    }  //LogSetting & {tags:string[] }  
+}
+
+/* -------- SYNCHRONIZATION --------- */
+function initializeData() {
+    source_incoming.init(source_dataSet)
+    target_incoming.init(target_dataSet)
+}
+
+// Done by appoximately 5-8 Seconds
+initializeData()
+source_synchronize.init(settings)
+
+// by 10th second 
+setTimeout(() => {
+
+    let triggerSync = from(['Newsynch'])
+
+    let sync = source_synchronize.subscribe(triggerSync)
+    sync.subscribe({
+        next: (msgToBeSynched) => {
+            // console.log(`synching ... ${msgToBeSynched.header.messageID}`)
+            source_payload_subject.next(msgToBeSynched)
+        }
+    })
+
+}, 7000)//30s

+ 130 - 0
test/test3c.ts

@@ -0,0 +1,130 @@
+/*  -----------------------       TEST3A {Mongo to Mongo}    -----------------------   */
+/* This test is focusing on comparing 2 different arrays of message logs from 2 different storage. 
+Which is local file mongo as the control/source, and then comparing the data from cloud mongoDB
+server data, and then synchronizing them */
+
+import { Observable, map, Subject, takeUntil, take, of, timer, from } from "rxjs";
+import { BaseMessage } from "../dependencies/fisappmessagejsutilty/dependencies/dependencies";
+import { LogSetting, MessageLog } from "../dependencies/fisloggingservice/type/datatype";
+import { AcknowledgementService } from "../services/acknowledgement.service";
+import { IncomingMessageService } from "../services/incomingMessage.service";
+import { MessageSyncrhonizationService } from "../services/synchronization.service";
+import { MessageSynchronisationServiceSetting } from "../type/datatype";
+import { StreamingService } from "./test-streamOBS";
+
+/* Pre - Defined Data && Settings */
+const stream = new StreamingService()
+
+// Declare source Services && Observables (Using File Storage) Simulating Full Logs
+const source_synchronize = new MessageSyncrhonizationService()
+const source_payload: Observable<BaseMessage> = stream.stream().pipe(take(4))
+const source_incoming = new IncomingMessageService()
+const source_payload_subject: Subject<BaseMessage> = new Subject()
+source_payload.subscribe({
+    next: (data) => {
+        source_payload_subject.next(data)
+        // console.log(data)
+    }
+})
+// Declare target Services && Observables (Using MongoDB Storage) Simulating Partial Logs
+const target_payload: Observable<BaseMessage> = stream.stream().pipe(take(2))
+const target_payload_subject: Subject<BaseMessage> = new Subject()
+const target_incoming = new IncomingMessageService()
+target_payload.subscribe({
+    next: (data) => {
+        target_payload_subject.next(<BaseMessage>data)
+    },
+    error: e => console.error(e),
+    complete: () => { `Target Payload Completed` }
+})
+
+// testing to see if data is sent in
+target_payload_subject.subscribe({
+    next: element => {
+        console.log(`target_payload_subject emits :00 ${element.header.messageID}`)
+    }
+})
+
+// Declare Source Storage
+let source_storage: LogSetting = {
+    storage: "MongoDB",
+    setting: {
+        appName: 'Default from client',
+        appLocName: 'To be generated in client',
+        logLocName: 'To be generated in client',
+    },
+    customSetting: {
+        server: "localhost:27017",
+        collection: "log"
+    }
+}
+
+let source_dataSet: LogSetting & { incomingObservable: Observable<BaseMessage> } = {
+    storage: source_storage.storage,
+    setting: source_storage.setting,
+    customSetting: source_storage.customSetting,
+    incomingObservable: source_payload_subject
+}
+
+//Declare Target Storage
+let target_storage: LogSetting = {
+    storage: "MongoDB",
+    setting: {
+        appName: 'Default from client',
+        appLocName: 'To be generated in client',
+        logLocName: 'To be generated in client',
+    },
+    customSetting: {
+        srv: true,
+        user: "testDB",
+        password: "h1nt1OyXw6QeUnzS",
+        server: "cluster0.29sklte.mongodb.net",
+        collection: "log",
+    }
+}
+
+let target_dataSet: LogSetting & { incomingObservable: Observable<BaseMessage> } = {
+    storage: target_storage.storage,
+    setting: target_storage.setting,
+    customSetting: target_storage.customSetting,
+    incomingObservable: target_payload_subject
+}
+
+// Combine source and target storage to form MessageSynchronisationServiceSetting
+let settings: MessageSynchronisationServiceSetting = {
+    incomingSource: {
+        //all of the settings to be combined here
+        ...source_storage,
+        tags: ['Incoming']
+    }, //LogSetting & {tags:string[] },   
+    target: {
+        ...target_storage,
+        tags: ['Incoming']
+    }  //LogSetting & {tags:string[] }  
+}
+
+/* -------- SYNCHRONIZATION --------- */
+function initializeData() {
+    source_incoming.init(source_dataSet)
+    target_incoming.init(target_dataSet)
+}
+
+// Done by appoximately 5-8 Seconds
+initializeData()
+source_synchronize.init(settings)
+
+// by 10th second 
+setTimeout(() => {
+
+    let triggerSync = from(['Newsynch'])
+
+    let sync = source_synchronize.subscribe(triggerSync)
+    sync.subscribe({
+        next: (msgToBeSynched) => {
+            // console.log(`synching ... ${msgToBeSynched.header.messageID}`)
+            target_payload_subject.next(msgToBeSynched)
+        }
+    })
+
+}, 7000)
+

+ 50 - 0
testRequest.json

@@ -198,5 +198,55 @@
                 }
             }
         }
+    },
+    {
+        "header": {
+            "messageType": "Command",
+            "messageID": "ab05f310-f3c5-4fd0-9af1-15cda97b5555",
+            "messageName": "Command",
+            "dateCreated": "2023-02-13T03:33:58.746Z",
+            "isAggregate": false,
+            "dataSourceTiming": "",
+            "serviceId": "",
+            "userId": "",
+            "requesterId": "Generatede203a86a-c99e-460e-95ff-f2dc7f484a7d",
+            "messageProducerInformation": {
+                "origin": {
+                    "userApplication": {
+                        "userAppId": "FisAppID/Name",
+                        "userAppName": "Client"
+                    }
+                },
+                "components": "Presentation"
+            },
+            "security": {
+                "ucpId": "GeneratedFromMessageSync"
+            },
+            "messageDataLocation": {
+                "isEmbaded": true
+            },
+            "messageDataFormat": {
+                "dataFormat": "Json"
+            },
+            "requestExecutionMode": 0,
+            "resquestTimeOut": 0,
+            "command": "New"
+        },
+        "data": {
+            "header": "fa29074d-9718-4aba-9999-0001",
+            "data": {
+                "appLogLocId": "fa29074d-9718-4aba-9999-0001",
+                "appData": {
+                    "msgId": "6c162cd3-d42d-4ab4-8882-0001",
+                    "msgLogDateTime": "2022-12-06T15:01:46.987Z",
+                    "msgDateTime": "2022-12-06T08:50:33.809Z",
+                    "msgTag": [
+                        "oval",
+                        "likable"
+                    ],
+                    "msgPayload": "Molestias facilis iusto similique iste voluptas facere. Alias est sequi. Quos consequatur temporibus blanditiis numquam vel. Eos repellat eaque. Voluptatibus optio optio magni eveniet. Quidem architecto esse aut sint neque error magnam perspiciatis."
+                }
+            }
+        }
     }
 ]