Enzo 1 年間 前
コミット
9707582e6b
4 ファイル変更181 行追加27 行削除
  1. 1 0
      package.json
  2. 50 3
      services/synchronization.service.ts
  3. 104 0
      test/test3a.ts
  4. 26 24
      type/datatype.ts

+ 1 - 0
package.json

@@ -10,6 +10,7 @@
     "start1b": "node test/test1b.js",
     "start2a": "node test/test2a.js",
     "start2b": "node test/test2b.js",
+    "start3a": "node test/test3a.js",
     "test": "echo \"Error: no test specified\" && exit 1"
   },
   "repository": {

+ 50 - 3
services/synchronization.service.ts

@@ -1,13 +1,60 @@
-import { Observable } from "rxjs";
+import { Observable, Subject } from "rxjs";
+import { BaseMessage } from "../dependencies/fisappmessagejsutilty/dependencies/dependencies";
+import { LoggingService } from "../dependencies/fisloggingservice/services/logging-service";
 import { MessageSynchronisationServiceInterface, MessageSynchronisationServiceSetting } from "../type/datatype";
 
-
 export class MessageSyncrhonizationService implements MessageSynchronisationServiceInterface {
     
-    public init(settings: MessageSynchronisationServiceSetting): void {
+    settings:MessageSynchronisationServiceSetting
+    sourceSrc:LoggingService = new LoggingService()
+    targetSrc:LoggingService = new LoggingService()
+
+    public async init(settings: MessageSynchronisationServiceSetting): Promise<void> {
         console.log(settings)
+        this.settings = settings;
+        this.sourceSrc.init(settings.incomingSource); 
+        this.targetSrc.init(settings.target); 
+
     }
     public subscribe(obs: Observable<string>): Observable<any> {
+
+        let returnObs:Subject<BaseMessage> = new Subject();
+
+        // filter all source tags[0] log data  
+        // this.logSrv1.filter(
+        //    this.settings.incomingSource.tags[0]
+        //    ) = set1
+        // filter all target tags[0] log data    
+        // this.logSrv2.filter(
+        //    this.settings.target.tags[0]
+        //    ) = set2
+
+        // Compare set1 and set2
+        // >> found missing message "4"
+
+        // Send missing messages "4" ( in a loop) 
+        //returnObs.next(message "4")
+
+        obs.subscribe(
+            {
+                next:(data)=>{
+                     // filter all source tags[0] log data  
+                    // this.logSrv1.filter(
+                    //    this.settings.incomingSource.tags[0]
+                    //    ) = set1
+                    // filter all target tags[0] log data    
+                    // this.logSrv2.filter(
+                    //    this.settings.target.tags[0]
+                    //    ) = set2
+
+                    // Compare set1 and set2
+                    // >> found missing message "5x"
+
+                    // Send missing messages "5x" ( in a loop) 
+                    //returnObs.next(message "5x") 
+                }
+            }
+        )
         console.log(obs)
         return
     }

+ 104 - 0
test/test3a.ts

@@ -0,0 +1,104 @@
+import { Observable, map, Subject, takeUntil, take } from "rxjs";
+import { BaseMessage } from "../dependencies/fisappmessagejsutilty/dependencies/dependencies";
+import { LogSetting } from "../dependencies/fisloggingservice/type/datatype";
+import { AcknowledgementService } from "../services/acknowledgement.service";
+import { IncomingMessageService } from "../services/incomingMessage.service";
+import { MessageSyncrhonizationService } from "../services/synchronization.service";
+import { MessageSynchronisationServiceSetting } from "../type/datatype";
+import { StreamingService } from "./test-streamOBS";
+
+
+/* Pre - Defined Data && Settings */
+const stream = new StreamingService()
+// Declare source Services && Observables (Using File Storage) Simulating Full Logs
+const source_synchronize = new MessageSyncrhonizationService()
+const source_payload: Observable<BaseMessage> = stream.stream()
+const source_incoming = new IncomingMessageService()
+const source_acknowledge = new AcknowledgementService()
+const source_payload_string = source_payload.pipe(
+    map((data) => {
+        return JSON.stringify(data);
+    })
+)
+
+// Declare target Services && Observables (Using MongoDB Storage) Simlluating Partial Logs
+const target_payload: Observable<BaseMessage> = stream.stream().pipe(take(2))
+const target_incoming = new IncomingMessageService()
+const target_syncrhonize = new MessageSyncrhonizationService()
+const target_acknowledge = new AcknowledgementService()
+const target_payload_string = target_payload.pipe(
+    map((data) => {
+        return JSON.stringify(data);
+    }),
+)
+
+// Decalre Source Storage
+let source_storage: LogSetting = {
+    storage: "File",
+    setting: {
+        appName: 'Default from client',
+        appLocName: 'To be generated in client',
+        logLocName: 'To be generated in client',
+    }
+}
+
+let source_dataSet: LogSetting & { incomingObservable: Observable<BaseMessage> } = {
+    storage: source_storage.storage,
+    setting: source_storage.setting,
+    customSetting: source_storage.customSetting,
+    incomingObservable: source_payload
+}
+
+//Declare Target Storage
+let target_storage: LogSetting = {
+    storage: "MongoDB",
+    setting: {
+        appName: 'Default from client',
+        appLocName: 'To be generated in client',
+        logLocName: 'To be generated in client',
+    },
+    customSetting: {
+        srv: true,
+        user: "testDB",
+        password: "h1nt1OyXw6QeUnzS",
+        server: "cluster0.29sklte.mongodb.net",
+        collection: "log",
+    }
+}
+let target_dataSet: LogSetting & { incomingObservable: Observable<BaseMessage> } = {
+    storage: target_storage.storage,
+    setting: target_storage.setting,
+    customSetting: target_storage.customSetting,
+    incomingObservable: target_payload
+}
+
+// Combine source and target storage to form MessageSynchronisationServiceSetting
+let settings: MessageSynchronisationServiceSetting = {
+    incomingSource: {
+        //all of the settings to be combined here
+        ...source_storage,
+        tags: ['Incoming']
+    }, //LogSetting & {tags:string[] },   
+    target: {
+        ...target_storage,
+        tags: ['Incoming']
+    }  //LogSetting & {tags:string[] }  
+}
+
+/* -------- SYNCHRONIZATION --------- */
+source_incoming.init(source_dataSet)
+target_incoming.init(target_dataSet)
+
+/*  Type 1 synchronization */
+source_synchronize.init(settings).then(() => {
+    source_synchronize.subscribe(source_payload_string)
+})
+
+// let sychnedObs = source_synchronize.subscribe(source_payload_string)
+// sychnedObs.subscribe(
+//     {
+//         next: (data) => {
+//             target_payload.next(data)
+//         }
+//     }
+// )

+ 26 - 24
type/datatype.ts

@@ -2,30 +2,32 @@ import { Observable } from "rxjs";
 import { BaseMessage } from "../dependencies/fisappmessagejsutilty/dependencies/dependencies";
 import { LogSetting, MessageLog } from "../dependencies/fisloggingservice/type/datatype";
 
-type IncomingMessageSetting =  LogSetting & {
-    incomingObservable:Observable<BaseMessage>
- }
- 
- // Incoming message Service Class
- export interface IncomingMessageServiceInterface  { 
- 
-     // Set default setting
-     init(settings: IncomingMessageSetting ): void;
- }
+type IncomingMessageSetting = LogSetting & {
+    incomingObservable: Observable<BaseMessage>
+}
 
- //  It can have an incoming and a target logging server. The tags are used to search for specific sets of messages from the logging server. 
+// Incoming message Service Class
+export interface IncomingMessageServiceInterface {
+    // Set default setting
+    init(settings: IncomingMessageSetting): void;
+}
+
+//  It can have an incoming and a target logging server. The tags are used to search for specific sets of messages from the logging server. 
 
 export interface MessageSynchronisationServiceSetting {
-    incomingSource:LogSetting & {tags:string[] },   
-    target:LogSetting & {tags:string[] }
- }
- 
- // Acknowledgement Service Class
- export interface MessageSynchronisationServiceInterface{ 
- 
-     // Set default setting
-     init(settings: MessageSynchronisationServiceSetting ): void;
- 
-     // Subscribe to trigger
-     subscribe(obs:Observable<string>): Observable<any>;
- }
+    incomingSource: LogSettingwTags,
+    target: LogSettingwTags
+}
+
+// Renew Structure To fix undefined issue at test3a.ts init()
+type LogSettingwTags = LogSetting & Tags
+interface Tags {
+    tags: string[]
+}
+// Acknowledgement Service Class
+export interface MessageSynchronisationServiceInterface {
+    // Set default setting
+    init(settings: MessageSynchronisationServiceSetting): void;
+    // Subscribe to trigger
+    subscribe(obs: Observable<string>): Observable<any>;
+}