浏览代码

synchronize fixes and types structuring

Enzo 1 年之前
父节点
当前提交
74b33f11b0
共有 3 个文件被更改,包括 46 次插入33 次删除
  1. 41 24
      services/synchronization.service.ts
  2. 4 8
      test/test3a.ts
  3. 1 1
      type/datatype.ts

+ 41 - 24
services/synchronization.service.ts

@@ -2,6 +2,7 @@ import { resolve } from "path";
 import { Observable, of, Subject } from "rxjs";
 import { BaseMessage } from "../dependencies/fisappmessagejsutilty/dependencies/dependencies";
 import { LoggingService } from "../dependencies/fisloggingservice/services/logging-service";
+import { MessageLog } from "../dependencies/fisloggingservice/type/datatype";
 import { MessageSynchronisationServiceInterface, MessageSynchronisationServiceSetting } from "../type/datatype";
 
 export class MessageSyncrhonizationService implements MessageSynchronisationServiceInterface {
@@ -17,13 +18,9 @@ export class MessageSyncrhonizationService implements MessageSynchronisationServ
             try {
                 this.sourceSrc.init(settings.incomingSource).then((data) => {
                     if (!data) reject()
-                    // console.log(`File Storage:`)
-                    // data.forEach(e => console.log(e.appLogLocId))
                 }).then(() => {
                     this.targetSrc.init(settings.target).then((data) => {
                         if (!data) reject()
-                        // console.log(`Mongo Storage`)
-                        // data.forEach(e => console.log(e.appLogLocId))
                         resolve()
                     })
                 })
@@ -35,31 +32,51 @@ export class MessageSyncrhonizationService implements MessageSynchronisationServ
         return promiseInit
     }
 
-    public async subscribe(obs: Observable<string>): Promise<Observable<any>> {
-        let subjectOutput = new Subject()
+    // Incoming obstriger serves as a trigger point to perform another synchronization
+    public subscribe(obsTrigger: Observable<string>): Observable<any> {
+        let subjectOutput = this.syncrhonize()
 
-        this.acquireData().then((data) => {
-            this.compareResult(data).then((data) => {
-                const obs : Observable<any> = of(...data)
-                obs.subscribe(subjectOutput)
-                // log remaining data?
-                this.targetSrc.subscribe(obs)
-            })
-        }).catch((e) => console.error(e))
+        // obs.subscribe({
+        //     next: element => {
+        //         subjectOutput = this.syncrhonize()
+        //     }
+        // })
 
         return subjectOutput.asObservable()
     }
 
+    private syncrhonize(): Subject<any> {
+        let subjectOutput = new Subject()
+        this.acquireData().then((data: { arr1: MessageLog[], arr2: MessageLog[] }) => {
+            if (data.arr1.length === data.arr2.length) {
+                console.log(`No synchronization needed`)
+            } else {
+                this.checkArrayDifferences(data).then((data: MessageLog[]) => {
+                    data.forEach(msgElement => {
+                        subjectOutput.next(msgElement)
+                    })
+                })
+            }
+        }).catch((e) => console.error(e))
+        return subjectOutput
+    }
+
     // Acquires the available data from designated storage
     private async acquireData(): Promise<any> {
         const promiseQuery: Promise<any> = new Promise((resolve, reject) => {
-            let allSets: any = {}
+            let allSets: {
+                arr1: MessageLog[],
+                arr2: MessageLog[]
+            } = {
+                arr1: [],
+                arr2: []
+            }
             let set1
             let set2
-            this.sourceSrc.filter({ msgTag: this.settings.incomingSource.tags[0] }).then((data) => {
+            this.sourceSrc.filter({ msgTag: this.settings.incomingSource.tags[0] }).then((data: MessageLog[]) => {
                 set1 = data
             }).then(() => {
-                this.targetSrc.filter({ msgTag: this.settings.target.tags[0] }).then((data) => {
+                this.targetSrc.filter({ msgTag: this.settings.target.tags[0] }).then((data: MessageLog[]) => {
                     set2 = data
                     allSets.arr1 = set1
                     allSets.arr2 = set2
@@ -70,16 +87,16 @@ export class MessageSyncrhonizationService implements MessageSynchronisationServ
         return promiseQuery
     }
     // compare results and return differences
-    private async compareResult(args: any): Promise<any> {
+    private async checkArrayDifferences(args: { arr1?: any[], arr2?: any[] }): Promise<any[]> {
         return new Promise((resolve, reject) => {
-            let data = []
-            args.arr1.forEach((element) => {
-                if (args.arr2.some(obj => obj.appData.msgId === element.appData.msgId)) {
+            let missingMsg: MessageLog[] = []
+            args.arr1.forEach((msgElement: MessageLog) => {
+                if (args.arr2.some(obj => obj.appData.msgId === msgElement.appData.msgId)) {
                     console.log(`Item Found!`)
                 } else {
-                    console.log(`This ${element.appData.msgId} is not found`)
-                    data.push(element)
-                    resolve(data)
+                    console.log(`This ${msgElement.appData.msgId} is not found`)
+                    missingMsg.push(msgElement)
+                    resolve(missingMsg)
                 }
             })
         })

+ 4 - 8
test/test3a.ts

@@ -103,14 +103,10 @@ function initializeData() {
 /* Please note that this operation assumes that there's already existing data in the designated storage place. It still cannot perform real-time live streaming dynamically
 when there is a streaming occuring.  */
 source_synchronize.init(settings).then(() => {
-    source_synchronize.subscribe(source_payload_string).then((data) => {
-        data.subscribe({
-            next: data => {
-                console.log(`Here's the missing data to be synchronized to target`)
-                console.log(data.appData.msgId)
-                target_payload_subject.next(data)
-            }
-        })
+    source_synchronize.subscribe(source_payload_string).subscribe({
+        next: msgToBeSynchronized => {
+            target_payload_subject.next(msgToBeSynchronized)
+        }
     })
 })
 

+ 1 - 1
type/datatype.ts

@@ -29,5 +29,5 @@ export interface MessageSynchronisationServiceInterface {
     // Set default setting
     init(settings: MessageSynchronisationServiceSetting): void;
     // Subscribe to trigger
-    subscribe(obs: Observable<string>): Promise<Observable<any>>;
+    subscribe(obs: Observable<string>): Observable<any>;
 }