import { Observable, map, Subject, takeUntil, take } from "rxjs";
import { BaseMessage } from "../dependencies/fisappmessagejsutilty/dependencies/dependencies";
import { LogSetting } from "../dependencies/fisloggingservice/type/datatype";
import { AcknowledgementService } from "../services/acknowledgement.service";
import { IncomingMessageService } from "../services/incomingMessage.service";
import { MessageSyncrhonizationService } from "../services/synchronization.service";
import { MessageSynchronisationServiceSetting } from "../type/datatype";
import { StreamingService } from "./test-streamOBS";

/*
 * Manual test driver for message synchronization.
 *
 * Builds a "source" log configuration (File storage) and a "target" log
 * configuration (MongoDB storage), combines them into a
 * MessageSynchronisationServiceSetting, and asks the synchronization service
 * for the messages the target is missing. Each missing message is logged and
 * pushed into `target_payload_subject`.
 *
 * NOTE(review): the element type of the streams is assumed to be BaseMessage
 * (it is imported here and not used for anything else) — confirm against
 * StreamingService.stream().
 */

/* Pre - Defined Data && Settings */
const stream = new StreamingService()

// Declare source Services && Observables (Using File Storage) Simulating Full Logs
const source_synchronize = new MessageSyncrhonizationService()
const source_payload: Observable<BaseMessage> = stream.stream()
const source_incoming = new IncomingMessageService()
const source_acknowledge = new AcknowledgementService()
// JSON-serialised view of the source stream; this is what gets handed to
// source_synchronize.subscribe() below.
const source_payload_string = source_payload.pipe(
    map((message) => JSON.stringify(message))
)

// Declare target Services && Observables (Using MongoDB Storage) Simulating Partial Logs
// take(2) limits the target to the first two emissions of the stream, so the
// target ends up with fewer messages than the source.
const target_payload: Observable<BaseMessage> = stream.stream().pipe(take(2))
const target_payload_subject: Subject<BaseMessage> = new Subject<BaseMessage>();
// Forward every target emission into the subject. No consumer of the subject
// is visible in this file.
target_payload.subscribe({
    next: (message) => {
        target_payload_subject.next(message)
    },
    // Without an error callback a failing stream would only surface as an
    // unhandled error; log it explicitly instead.
    error: (err: unknown) => {
        console.error(err)
    }
})
const target_incoming = new IncomingMessageService()
const target_syncrhonize = new MessageSyncrhonizationService()
const target_acknowledge = new AcknowledgementService()
const target_payload_string = target_payload.pipe(
    map((message) => JSON.stringify(message)),
)

// Declare Source Storage
const source_storage: LogSetting = {
    storage: "File",
    setting: {
        appName: 'Default from client',
        appLocName: 'To be generated in client',
        logLocName: 'To be generated in client',
    }
}
// Source storage settings plus the observable that feeds it.
// source_storage declares no customSetting, so that field is undefined here.
const source_dataSet: LogSetting & { incomingObservable: Observable<BaseMessage> } = {
    storage: source_storage.storage,
    setting: source_storage.setting,
    customSetting: source_storage.customSetting,
    incomingObservable: source_payload
}

//Declare Target Storage
// NOTE(review): the MongoDB user/password/server below are hard-coded in
// source. They are kept unchanged so this script behaves as before, but they
// should be moved to configuration/environment and the password rotated.
const target_storage: LogSetting = {
    storage: "MongoDB",
    setting: {
        appName: 'Default from client',
        appLocName: 'To be generated in client',
        logLocName: 'To be generated in client',
    },
    customSetting: {
        srv: true,
        user: "testDB",
        password: "h1nt1OyXw6QeUnzS",
        server: "cluster0.29sklte.mongodb.net",
        collection: "log",
    }
}
// Target storage settings plus the (two-message) observable that feeds it.
const target_dataSet: LogSetting & { incomingObservable: Observable<BaseMessage> } = {
    storage: target_storage.storage,
    setting: target_storage.setting,
    customSetting: target_storage.customSetting,
    incomingObservable: target_payload
}

// Combine source and target storage to form MessageSynchronisationServiceSetting
const settings: MessageSynchronisationServiceSetting = {
    incomingSource: {
        //all of the settings to be combined here
        ...source_storage,
        tags: ['Incoming']
    }, //LogSetting & {tags:string[] },
    target: {
        ...target_storage,
        tags: ['Incoming']
    } //LogSetting & {tags:string[] }
}

/* -------- SYNCHRONIZATION --------- */
/**
 * Initialises the source and target incoming-message services with their
 * respective data sets. Not called automatically; see the note at the bottom
 * of this file.
 */
function initializeData() {
    source_incoming.init(source_dataSet)
    target_incoming.init(target_dataSet)
}

/* Type 1 synchronization */
/* Please note that this operation assumes that there's already existing data in the designated storage place.
It still cannot perform real-time live streaming dynamically when there is a streaming occurring. */
source_synchronize.init(settings)
    // Return the inner promise so a failure in subscribe() reaches the
    // .catch() below instead of becoming an unhandled rejection.
    .then(() => source_synchronize.subscribe(source_payload_string))
    .then((missingMessages) => {
        missingMessages.subscribe({
            next: (missingMessage) => {
                console.log(`Here's the missing data to be synchronized to target`)
                console.log(missingMessage.appData.msgId)
                target_payload_subject.next(missingMessage)
            },
            error: (err: unknown) => {
                console.error(err)
            }
        })
    })
    .catch((err: unknown) => {
        console.error(err)
    })

/* Run this code to pluck some data into the database. 4 in File storage and 2 in Mongo */
// initializeData()