Przeglądaj źródła

update on synchronization

Enzo 1 rok temu
rodzic
commit
45c190e6fd
3 zmienionych plików z 63 dodań i 34 usunięć
  1. 45 28
      services/synchronization.service.ts
  2. 17 5
      test/test3a.ts
  3. 1 1
      type/datatype.ts

+ 45 - 28
services/synchronization.service.ts

@@ -1,4 +1,5 @@
-import { Observable, Subject } from "rxjs";
+import { resolve } from "path";
+import { Observable, of, Subject } from "rxjs";
 import { BaseMessage } from "../dependencies/fisappmessagejsutilty/dependencies/dependencies";
 import { LoggingService } from "../dependencies/fisloggingservice/services/logging-service";
 import { MessageSynchronisationServiceInterface, MessageSynchronisationServiceSetting } from "../type/datatype";
@@ -16,13 +17,13 @@ export class MessageSyncrhonizationService implements MessageSynchronisationServ
             try {
                 this.sourceSrc.init(settings.incomingSource).then((data) => {
                     if (!data) reject()
-                    console.log(`File Storage:`)
-                    data.forEach(e => console.log(e.appLogLocId))
+                    // console.log(`File Storage:`)
+                    // data.forEach(e => console.log(e.appLogLocId))
                 }).then(() => {
                     this.targetSrc.init(settings.target).then((data) => {
                         if (!data) reject()
-                        console.log(`Mongo Storage`)
-                        data.forEach(e => console.log(e.appLogLocId))
+                        // console.log(`Mongo Storage`)
+                        // data.forEach(e => console.log(e.appLogLocId))
                         resolve()
                     })
                 })
@@ -34,37 +35,53 @@ export class MessageSyncrhonizationService implements MessageSynchronisationServ
         return promiseInit
     }
 
-    public subscribe(obs: Observable<string>): Observable<any> {
+    public async subscribe(obs: Observable<string>): Promise<Observable<any>> {
         let subjectOutput = new Subject()
 
-        // filter all source tags[0] log data  
-        let set1 = this.sourceSrc.filter({ msgTag: this.settings.incomingSource.tags[0] })
-        set1.then((data) => console.log(data.length))
-        let set2 = this.targetSrc.filter({ msgTag: this.settings.target.tags[0] })
-        set2.then((data) => console.log(data.length))
-
-        // this.compareResult(set1, set2).then((data) => {
-        //     data.forEach(element => {
-        //         subjectOutput.next(element)
-        //     })
-        // })
+        this.acquireData().then((data) => {
+            this.compareResult(data).then((data) => {
+                const obs : Observable<any> = of(...data)
+                obs.subscribe(subjectOutput)
+            })
+        }).catch((e) => console.error(e))
 
         return subjectOutput.asObservable()
     }
 
-    // compare results and return differences
-    private async compareResult(args: Promise<any>, args2: Promise<any>): Promise<any> {
-        let arr1 = []
-        let arr2 = []
-        args.then((element) => {
-            arr1.push(element)
+    // Acquires the available data from designated storage
+    private async acquireData(): Promise<any> {
+        const promiseQuery: Promise<any> = new Promise((resolve, reject) => {
+            let allSets: any = {}
+            let set1
+            let set2
+            this.sourceSrc.filter({ msgTag: this.settings.incomingSource.tags[0] }).then((data) => {
+                set1 = data
+            }).then(() => {
+                this.targetSrc.filter({ msgTag: this.settings.target.tags[0] }).then((data) => {
+                    set2 = data
+                    allSets.arr1 = set1
+                    allSets.arr2 = set2
+                    resolve(allSets)
+                })
+            })
         })
-        args2.then((element) => {
-            arr2.push(element)
+        return promiseQuery
+    }
+    // compare results and return differences
+    private async compareResult(args: any): Promise<any> {
+        return new Promise((resolve, reject) => {
+            let data = []
+            args.arr1.forEach((element) => {
+                if (args.arr2.some(obj => obj.appData.msgId === element.appData.msgId)) {
+                    console.log(`Item Found!`)
+                } else {
+                    console.log(`This ${element.appData.msgId} is not found`)
+                    data.push(element)
+                    resolve(data)
+                }
+            })
         })
-        console.log(arr1)
-        console.log(arr2)
-        return arr1
     }
 
+
 }

+ 17 - 5
test/test3a.ts

@@ -101,10 +101,22 @@ function initializeData() {
 
 /*  Type 1 synchronization */
 source_synchronize.init(settings).then(() => {
-    let synch = source_synchronize.subscribe(source_payload_string)
-    synch.subscribe({
-        next: (data) => {
-            target_payload_subject.next(data)
-        }
+    source_synchronize.subscribe(source_payload_string).then((data) => {
+        data.subscribe({
+            next: data => {
+                console.log(`Here's the missing data to be synchronized to target`)
+                console.log(data.appData.msgId)
+                target_payload_subject.next(data)
+            }
+        })
     })
 })
+
+// synch.subscribe({
+//     next: (data) => {
+//         target_payload_subject.next(data)
+//         console.log(data)
+//     }
+// })
+
+// initializeData()

+ 1 - 1
type/datatype.ts

@@ -29,5 +29,5 @@ export interface MessageSynchronisationServiceInterface {
     // Set default setting
     init(settings: MessageSynchronisationServiceSetting): void;
     // Subscribe to trigger
-    subscribe(obs: Observable<string>): Observable<any>;
+    subscribe(obs: Observable<string>): Promise<Observable<any>>;
 }