import { stat } from "fs";
import { resolve } from "path";
import { Observable, map, Subject, takeUntil, take, of, timer, from } from "rxjs";
import { BaseMessage } from "../dependencies/fisappmessagejsutilty/dependencies/dependencies";
import { LogSetting, MessageLog } from "../dependencies/fisloggingservice/type/datatype";
import { AcknowledgementService } from "../services/acknowledgement.service";
import { IncomingMessageService } from "../services/incomingMessage.service";
import { MessageSyncrhonizationService } from "../services/synchronization.service";
import { MessageSynchronisationServiceSetting } from "../type/datatype";
import { StreamingService } from "./test-streamOBS";

/*
 * Manual test script for message synchronisation.
 *
 * A "source" side (file storage) receives every message from the test stream,
 * while a "target" side (MongoDB storage) receives only the first two. After a
 * delay the synchronisation service is triggered and whatever it emits is
 * pushed into the target subject.
 *
 * NOTE(review): the payload type arguments below are assumed to be BaseMessage
 * (the handlers read `header.messageID`) — confirm against StreamingService.
 */

/* Pre-defined data and settings */

/** Delay before the synchronisation is triggered, in milliseconds (30 s). */
const SYNC_DELAY_MS = 30000

const stream = new StreamingService()

// ---- Source side (file storage): simulates the full log ----
const source_synchronize = new MessageSyncrhonizationService()
const source_payload: Observable<BaseMessage> = stream.stream()
const source_incoming = new IncomingMessageService()
const source_payload_subject = new Subject<BaseMessage>()

// Relay every streamed message into the source subject.
source_payload.subscribe({
    next: (data) => {
        source_payload_subject.next(data)
        // console.log(data)
    },
    // Same error handling as the target subscription further down.
    error: e => console.error(e),
})

// JSON-stringified view of the source stream; not subscribed anywhere in the code shown.
const source_payload_string = source_payload.pipe(
    map((data) => {
        return JSON.stringify(data);
    })
)

// ---- Target side (MongoDB storage): simulates a partial log ----
const target_syncrhonize = new MessageSyncrhonizationService()
// take(2): the target only receives the first two messages of the stream.
const target_payload: Observable<BaseMessage> = stream.stream().pipe(take(2))
const target_payload_subject = new Subject<BaseMessage>()
const target_incoming = new IncomingMessageService()

// Relay the (truncated) stream into the target subject.
target_payload.subscribe({
    next: (data) => {
        target_payload_subject.next(data)
    },
    error: e => console.error(e),
    complete: () => {
        // Previously a bare template literal, so nothing was ever printed.
        console.log(`Target Payload Completed`)
    }
})

// JSON-stringified view of the target stream; not subscribed anywhere in the code shown.
const target_payload_string = target_payload.pipe(
    map((data) => {
        return JSON.stringify(data);
    }),
)

// Debug probe: log every message that reaches the target subject,
// including the missing messages pushed in by the synchronisation below.
target_payload_subject.subscribe({
    next: element => {
        console.log(`target_payload_subject \nemits : ${element.header.messageID}`)
    }
})

// ---- Source storage ----
const source_storage: LogSetting = {
    storage: "File",
    setting: {
        appName: 'Default from client',
        appLocName: 'To be generated in client',
        logLocName: 'To be generated in client',
    }
}

const source_dataSet: LogSetting & { incomingObservable: Observable<BaseMessage> } = {
    storage: source_storage.storage,
    setting: source_storage.setting,
    customSetting: source_storage.customSetting, // not set on source_storage above
    incomingObservable: source_payload_subject
}

// ---- Target storage ----
// NOTE(review): database credentials are hard-coded here; consider moving them
// to environment/config and rotating the password.
const target_storage: LogSetting = {
    storage: "MongoDB",
    setting: {
        appName: 'Default from client',
        appLocName: 'To be generated in client',
        logLocName: 'To be generated in client',
    },
    customSetting: {
        srv: true,
        user: "testDB",
        password: "h1nt1OyXw6QeUnzS",
        server: "cluster0.29sklte.mongodb.net",
        collection: "log",
    }
}

const target_dataSet: LogSetting & { incomingObservable: Observable<BaseMessage> } = {
    storage: target_storage.storage,
    setting: target_storage.setting,
    customSetting: target_storage.customSetting,
    incomingObservable: target_payload_subject
}

// Combine source and target storage into the synchronisation service setting.
const settings: MessageSynchronisationServiceSetting = {
    incomingSource: {
        ...source_storage,
        tags: ['Incoming']
    }, // LogSetting & { tags: string[] }
    target: {
        ...target_storage,
        tags: ['Incoming']
    } // LogSetting & { tags: string[] }
}

/* -------- SYNCHRONIZATION --------- */

/** Initialises both incoming-message services with their respective data sets. */
function initializeData(): void {
    source_incoming.init(source_dataSet)
    target_incoming.init(target_dataSet)
}

// Original note: initialisation is expected to be done by ~4 seconds.
initializeData()
source_synchronize.init(settings)

// After SYNC_DELAY_MS, trigger one synchronisation and feed every message it
// emits into the target subject.
setTimeout(() => {
    const syncTrigger = from(['Newsynch'])
    const sync = source_synchronize.subscribe(syncTrigger)
    sync.subscribe({
        next: (msgToBeSynched) => {
            // console.log(`synching ... ${msgToBeSynched.header.messageID}`)
            target_payload_subject.next(msgToBeSynched)
        },
        error: e => console.error(e),
    })
}, SYNC_DELAY_MS)

// TODO:
//  - more test files
//  - sync an additional source message at runtime (after 30 s)
//  - more comments to make the script more readable