Enzo 1 年間 前
コミット
f70f88c481
2 ファイル変更49 行追加35 行削除
  1. 27 16
      services/synchronization.service.ts
  2. 22 19
      test/test3a.ts

+ 27 - 16
services/synchronization.service.ts

@@ -1,5 +1,5 @@
 import { resolve } from "path";
-import { Observable, of, Subject } from "rxjs";
+import { map, Observable, of, Subject } from "rxjs";
 import { BaseMessage } from "../dependencies/fisappmessagejsutilty/dependencies/dependencies";
 import { LoggingService } from "../dependencies/fisloggingservice/services/logging-service";
 import { MessageLog } from "../dependencies/fisloggingservice/type/datatype";
@@ -7,9 +7,9 @@ import { MessageSynchronisationServiceInterface, MessageSynchronisationServiceSe
 
 export class MessageSyncrhonizationService implements MessageSynchronisationServiceInterface {
 
-    settings: MessageSynchronisationServiceSetting
-    sourceSrc: LoggingService = new LoggingService()
-    targetSrc: LoggingService = new LoggingService()
+    private settings: MessageSynchronisationServiceSetting
+    private sourceSrc: LoggingService = new LoggingService()
+    private targetSrc: LoggingService = new LoggingService()
 
     public async init(settings: MessageSynchronisationServiceSetting): Promise<void> {
         this.settings = settings;
@@ -33,22 +33,33 @@ export class MessageSyncrhonizationService implements MessageSynchronisationServ
     }
 
     // Incoming obstriger serves as a trigger point to perform another synchronization
-    public subscribe(obsTrigger: Observable<string>): Observable<any> {
-        let subjectOutput = this.syncrhonize()
-        this.targetSrc.subscribe(subjectOutput)
-        subjectOutput.subscribe({
-            next: missingMsg => console.log(`Synchronizing ${missingMsg.appData.msgId}`)
-        })
-        
-        // obs.subscribe({
-        //     next: element => {
-        //         subjectOutput = this.syncrhonize()
-        //     }
+    public subscribe(obsTrigger: Observable<string>): Observable<BaseMessage> {
+
+        // let subjectOutput = this.syncrhonize()
+        // subjectOutput.subscribe({
+        //     next: missingMsg => console.log(`Synchronizing ${missingMsg.appData.msgId}`)
         // })
 
-        return subjectOutput.asObservable()
+        let result = this.dataConversion()
+        // let result = this.dataConversion()
+        // result.subscribe({
+        //     next: e => console.log(e)
+        // })
+        return result
     }
 
+    private dataConversion(): Observable<BaseMessage>{
+        // let subjectOutput = this.syncrhonize()
+        let obsOutput: Observable<BaseMessage> = this.syncrhonize().pipe(
+            map((msg: MessageLog) => {
+                // console.log(`Converting this ${msg.appData.msgId}`)
+                return JSON.parse(<string>msg.appData.msgPayload)
+            })
+        )
+        return obsOutput
+    }
+
+
     private syncrhonize(): Subject<any> {
         let subjectOutput = new Subject()
         this.acquireData().then((data: { arr1: MessageLog[], arr2: MessageLog[] }) => {

+ 22 - 19
test/test3a.ts

@@ -47,7 +47,7 @@ const target_payload_string = target_payload.pipe(
 // testing to see if data is sent in
 target_payload_subject.subscribe({
     next: element => {
-        // console.log(element) 
+        console.log(`target_payload_subject emits : ${element.header.messageID}`)
         // Missing MessageLog Data is sent in
     }
 })
@@ -108,33 +108,36 @@ let settings: MessageSynchronisationServiceSetting = {
 
 
 /* -------- SYNCHRONIZATION --------- */
-function initializeData() {
+async function initializeData(): Promise<void> {
     source_incoming.init(source_dataSet)
     target_incoming.init(target_dataSet)
 }
 
+// initializeData().then(() => {
+//     source_synchronize.init(settings)
+// }).then(() => {
+//     let stream: Observable<BaseMessage> = new MessageSyncrhonizationService().subscribe(source_payload_string)
+//     stream.subscribe({
+//         next: (msgToBeSynced) => {
+//             target_payload_subject.next(msgToBeSynced)
+//             console.log(msgToBeSynced.header.messageID)
+//         }
+//     })
+// })
+
+
+
 /*  Run this code to put some data into the database. 4 in File storage and 2 in Mongo */
-// initializeData()
+initializeData()
 
 /*  Type 1 synchronization */
 /* Please note that this operation assumes that there's already existing data in the designated storage place. It still cannot perform real-time live streaming dynamically
 when there is a streaming occuring.  */
-// source_synchronize.init(settings).then(() => {
-//     source_synchronize.subscribe(source_payload_string).subscribe({
-//         next: (msgToBeSynchronized) => {
-//             target_payload_subject.next(msgToBeSynchronized)
-//             // console.log(msgToBeSynchronized) // It does log missing items
-//         }
-//     })
-// })
-
 source_synchronize.init(settings).then(() => {
-    let stream = source_synchronize.subscribe(source_payload_string) // this returns an observable declared with stream
-    stream.subscribe({
-        next: (messageToBeSync) => {
-            target_payload_subject.next(messageToBeSync)
-            // console.log(messageToBeSync)
+    source_synchronize.subscribe(source_payload_string).subscribe({
+        next: (msgToBeSynchronized) => {
+            target_payload_subject.next(msgToBeSynchronized)
+            // console.log(`Synchronizing ${msgToBeSynchronized.header.messageID}`)
         }
     })
-})
-
+})