Enzo 1 рік тому
батько
коміт
1f168f5193
4 змінених файлів з 73 додано та 59 видалено
  1. 1 1
      dependencies/fisloggingservice
  2. 52 45
      services/synchronization.service.ts
  3. 2 1
      test/test1a.ts
  4. 18 12
      test/test3a.ts

+ 1 - 1
dependencies/fisloggingservice

@@ -1 +1 @@
-Subproject commit a148de57e81e42fbe60f22f79dccb2ddc51d173b
+Subproject commit 811be556e1141e6f4d25ef7b29a87941cc21b047

+ 52 - 45
services/synchronization.service.ts

@@ -4,59 +4,66 @@ import { LoggingService } from "../dependencies/fisloggingservice/services/loggi
 import { MessageSynchronisationServiceInterface, MessageSynchronisationServiceSetting } from "../type/datatype";
 
 export class MessageSyncrhonizationService implements MessageSynchronisationServiceInterface {
-    
-    settings:MessageSynchronisationServiceSetting
-    sourceSrc:LoggingService = new LoggingService()
-    targetSrc:LoggingService = new LoggingService()
+
+    settings: MessageSynchronisationServiceSetting
+    sourceSrc: LoggingService = new LoggingService()
+    targetSrc: LoggingService = new LoggingService()
 
     public async init(settings: MessageSynchronisationServiceSetting): Promise<void> {
-        console.log(settings)
         this.settings = settings;
-        this.sourceSrc.init(settings.incomingSource); 
-        this.targetSrc.init(settings.target); 
-
+        // Wrap promise so that init must be instantiated with resolve before proceeding with subscribe method
+        let promiseInit : Promise<void> = new Promise((resolve, reject) => {
+            try {
+                this.sourceSrc.init(settings.incomingSource).then(() => {
+                    this.targetSrc.init(settings.target);
+                }).then(() => resolve())
+            }
+            catch (e) {
+                console.error(e)
+            }
+        })
+        return promiseInit
     }
-    public subscribe(obs: Observable<string>): Observable<any> {
 
-        let returnObs:Subject<BaseMessage> = new Subject();
+    public subscribe(obs: Observable<string>): Observable<any> {
+        let subjectOutput = new Subject()
+        obs.subscribe({
+            next: element => {
+                let data = JSON.parse(element)
+                console.log(data.header.messageID)
+            }
+        })
 
         // filter all source tags[0] log data  
-        // this.logSrv1.filter(
-        //    this.settings.incomingSource.tags[0]
-        //    ) = set1
-        // filter all target tags[0] log data    
-        // this.logSrv2.filter(
-        //    this.settings.target.tags[0]
-        //    ) = set2
-
-        // Compare set1 and set2
-        // >> found missing message "4"
-
-        // Send missing messages "4" ( in a loop) 
-        //returnObs.next(message "4")
-
-        obs.subscribe(
-            {
-                next:(data)=>{
-                     // filter all source tags[0] log data  
-                    // this.logSrv1.filter(
-                    //    this.settings.incomingSource.tags[0]
-                    //    ) = set1
-                    // filter all target tags[0] log data    
-                    // this.logSrv2.filter(
-                    //    this.settings.target.tags[0]
-                    //    ) = set2
-
-                    // Compare set1 and set2
-                    // >> found missing message "5x"
-
-                    // Send missing messages "5x" ( in a loop) 
-                    //returnObs.next(message "5x") 
-                }
-            }
+        let set1 = this.sourceSrc.filter(
+            { msgTag: this.settings.incomingSource.tags[0] })
+        let set2 = this.targetSrc.filter(
+            { msgTag: this.settings.target.tags[0] }
         )
-        console.log(obs)
-        return
+        this.compareResult(set1, set2).then((data)  => {
+            data.forEach(element => {
+                subjectOutput.next(element)
+            })
+        })
+
+        return subjectOutput.asObservable()
+    }
+
+    // compare results and return differences
+    private async compareResult(args: Promise<any>, args2: Promise<any>): Promise<any[]> {
+        let dataset3
+        await args.then((data) => {
+            console.log(data)
+            args2.then((data2) => {
+                data.forEach((element) => {
+                    data2.forEach((element2) => {
+                        dataset3 = '' // missing messages
+                    })
+                })
+            })
+        })
+
+        return dataset3
     }
 
 }

+ 2 - 1
test/test1a.ts

@@ -34,4 +34,5 @@ incoming.init(dataSet)
 
 // acknowledge.init(storage).then(() => {
 //     acknowledge.subscribe(dataSet.incomingObservable)
-// })
+// })
+

+ 18 - 12
test/test3a.ts

@@ -23,6 +23,14 @@ const source_payload_string = source_payload.pipe(
 
 // Declare target Services && Observables (Using MongoDB Storage) Simlluating Partial Logs
 const target_payload: Observable<BaseMessage> = stream.stream().pipe(take(2))
+const target_payload_subject: Subject<BaseMessage> = new Subject();
+target_payload.subscribe(
+    {
+        next: (data) => {
+            target_payload_subject.next(data)
+        }
+    }
+)
 const target_incoming = new IncomingMessageService()
 const target_syncrhonize = new MessageSyncrhonizationService()
 const target_acknowledge = new AcknowledgementService()
@@ -86,19 +94,17 @@ let settings: MessageSynchronisationServiceSetting = {
 }
 
 /* -------- SYNCHRONIZATION --------- */
-source_incoming.init(source_dataSet)
-target_incoming.init(target_dataSet)
+function initializeData() {
+    source_incoming.init(source_dataSet)
+    target_incoming.init(target_dataSet)
+}
 
 /*  Type 1 synchronization */
 source_synchronize.init(settings).then(() => {
-    source_synchronize.subscribe(source_payload_string)
+    let synch = source_synchronize.subscribe(source_payload_string)
+    synch.subscribe({
+        next: (data) => {
+            target_payload_subject.next(data)
+        }
+    })
 })
-
-// let sychnedObs = source_synchronize.subscribe(source_payload_string)
-// sychnedObs.subscribe(
-//     {
-//         next: (data) => {
-//             target_payload.next(data)
-//         }
-//     }
-// )