Browse Source

current code batch

Enzo 1 year ago
parent
commit
88a4e3df99
4 changed files with 77 additions and 65 deletions
  1. 1 1
      dependencies/fisloggingservice
  2. 28 23
      services/synchronization.service.ts
  3. 29 14
      test/test-streamOBS.ts
  4. 19 27
      test/test3a.ts

+ 1 - 1
dependencies/fisloggingservice

@@ -1 +1 @@
-Subproject commit 811be556e1141e6f4d25ef7b29a87941cc21b047
+Subproject commit 3779a2dd53d7acf06cb67ced30a2f565d589b835

+ 28 - 23
services/synchronization.service.ts

@@ -15,40 +15,45 @@ export class MessageSyncrhonizationService implements MessageSynchronisationServ
         this.settings = settings;
         // Wrap promise so that init must be instantiated with resolve before proceeding with subscribe method
         let promiseInit: Promise<void> = new Promise((resolve, reject) => {
-            try {
-                this.sourceSrc.init(settings.incomingSource).then((data) => {
+            this.sourceSrc.init(settings.incomingSource).then((data) => {
+                if (!data) reject()
+            }).then(() => {
+                this.targetSrc.init(settings.target).then((data) => {
                     if (!data) reject()
-                }).then(() => {
-                    this.targetSrc.init(settings.target).then((data) => {
-                        if (!data) reject()
-                        resolve()
-                    })
+                    resolve()
                 })
-            }
-            catch (e) {
-                console.error(e)
-            }
+            })
+
         })
         return promiseInit
     }
 
     // Incoming obstriger serves as a trigger point to perform another synchronization
     public subscribe(obsTrigger: Observable<string>): Observable<BaseMessage> {
-
-        // let subjectOutput = this.syncrhonize()
-        // subjectOutput.subscribe({
-        //     next: missingMsg => console.log(`Synchronizing ${missingMsg.appData.msgId}`)
-        // })
-
-        let result = this.dataConversion()
-        // let result = this.dataConversion()
-        // result.subscribe({
-        //     next: e => console.log(e)
-        // })
+        let msg : Subject<BaseMessage> = new Subject()
+        obsTrigger.subscribe({
+            next: obs => {
+                console.log(`${obsTrigger} has trigged synchronization`)
+                this.dataConversion().subscribe({
+                    next: e => console.log(e)
+                })
+                // let missingMsg = this.dataConversion()
+                // missingMsg.subscribe({
+                //     next: element => {
+                //         msg.next(element)
+                //     }
+                // })
+            }
+        })
+        // trigger by timer
+        if(!obsTrigger) {
+            this.dataConversion()
+        }
+        let result: Observable<BaseMessage> = msg.asObservable()
         return result
     }
 
-    private dataConversion(): Observable<BaseMessage>{
+    private dataConversion(): Observable<BaseMessage> {
         // let subjectOutput = this.syncrhonize()
         let obsOutput: Observable<BaseMessage> = this.syncrhonize().pipe(
             map((msg: MessageLog) => {

+ 29 - 14
test/test-streamOBS.ts

@@ -1,4 +1,4 @@
-import { from, map, Observable, of } from "rxjs";
+import { from, map, Observable, of, Subject } from "rxjs";
 import * as fs from "fs"
 import { BaseMessage } from "../dependencies/fisloggingservice/services/logging-service";
 
@@ -6,18 +6,33 @@ export class StreamingService {
     private messagesJSON: any = fs.readFileSync("testRequest.json")
     private messages = JSON.parse(this.messagesJSON)
 
-    public stream(): Observable<BaseMessage> {
-        return new Observable(observer => {
-            let messages = this.messages
-            let count = 0
-            const intervalId = setInterval(() => {
-                observer.next(messages[count]);
-                count++;
-                if (count >= 4) {
-                    clearInterval(intervalId);
-                    observer.complete();
-                }
-            }, 1000)
-        })
+    // public stream(): Observable<BaseMessage> {
+    //     return new Observable(subject => {
+    //         let messages = this.messages
+    //         let count = 0
+    //         const intervalId = setInterval(() => {
+    //             subject.next(messages[count]);
+    //             count++;
+    //             if (count >= 4) {
+    //                 clearInterval(intervalId);
+    //                 subject.complete();
+    //             }
+    //         }, 1000)
+    //     })
+    // }
+
+    public stream(): Subject<BaseMessage> {
+        let result: Subject<BaseMessage> = new Subject()
+        let messages = this.messages
+        let count = 0
+        const intervalId = setInterval(() => {
+            result.next(messages[count]);
+            count++;
+            if (count >= 4) {
+                clearInterval(intervalId);
+                result.complete();
+            }
+        }, 1000)
+        return result
     }
 }

+ 19 - 27
test/test3a.ts

@@ -1,5 +1,6 @@
+import { stat } from "fs";
 import { resolve } from "path";
-import { Observable, map, Subject, takeUntil, take } from "rxjs";
+import { Observable, map, Subject, takeUntil, take, of, timer } from "rxjs";
 import { BaseMessage } from "../dependencies/fisappmessagejsutilty/dependencies/dependencies";
 import { LogSetting, MessageLog } from "../dependencies/fisloggingservice/type/datatype";
 import { AcknowledgementService } from "../services/acknowledgement.service";
@@ -14,7 +15,7 @@ const stream = new StreamingService()
 
 // Declare source Services && Observables (Using File Storage) Simulating Full Logs
 const source_synchronize = new MessageSyncrhonizationService()
-const source_payload: Observable<BaseMessage> = stream.stream().pipe()
+const source_payload: Observable<BaseMessage> = stream.stream()
 const source_incoming = new IncomingMessageService()
 const source_payload_subject: Subject<BaseMessage> = new Subject()
 source_payload.subscribe({
@@ -29,6 +30,7 @@ const source_payload_string = source_payload.pipe(
     })
 )
 
+
 // Declare target Services && Observables (Using MongoDB Storage) Simulating Partial Logs
 const target_syncrhonize = new MessageSyncrhonizationService()
 const target_payload: Observable<BaseMessage> = stream.stream().pipe(take(2))
@@ -37,7 +39,9 @@ const target_incoming = new IncomingMessageService()
 target_payload.subscribe({
     next: (data) => {
         target_payload_subject.next(<BaseMessage>data)
-    }
+    },
+    error: e => console.error(e),
+    complete: () => { `Target Payload Completed` }
 })
 const target_payload_string = target_payload.pipe(
     map((data) => {
@@ -106,38 +110,26 @@ let settings: MessageSynchronisationServiceSetting = {
     }  //LogSetting & {tags:string[] }  
 }
 
+const triggerSync = timer(5000).pipe(map(
+    (value) => String(value)
+))
 
 /* -------- SYNCHRONIZATION --------- */
-async function initializeData(): Promise<void> {
+function initializeData() {
     source_incoming.init(source_dataSet)
     target_incoming.init(target_dataSet)
+    setTimeout(() => {
+    }, 4500)
 }
 
-// initializeData().then(() => {
-//     source_synchronize.init(settings)
-// }).then(() => {
-//     let stream: Observable<BaseMessage> = new MessageSyncrhonizationService().subscribe(source_payload_string)
-//     stream.subscribe({
-//         next: (msgToBeSynced) => {
-//             target_payload_subject.next(msgToBeSynced)
-//             console.log(msgToBeSynced.header.messageID)
-//         }
-//     })
-// })
-
-
-
-/*  Run this code to put some data into the database. 4 in File storage and 2 in Mongo */
 initializeData()
 
-/*  Type 1 synchronization */
-/* Please note that this operation assumes that there's already existing data in the designated storage place. It still cannot perform real-time live streaming dynamically
-when there is a streaming occuring.  */
 source_synchronize.init(settings).then(() => {
-    source_synchronize.subscribe(source_payload_string).subscribe({
-        next: (msgToBeSynchronized) => {
-            target_payload_subject.next(msgToBeSynchronized)
-            // console.log(`Synchronizing ${msgToBeSynchronized.header.messageID}`)
+    let sync = source_synchronize.subscribe(triggerSync)
+    sync.subscribe({
+        next: (msgToBeSynched) => {
+            // console.log(`synching ... ${msgToBeSynched.header.messageID}`)
+            target_payload_subject.next(msgToBeSynched)
         }
     })
-})
+})