import { Observable, Subject } from "rxjs";
import { BaseMessage } from "../dependencies/fisappmessagejsutilty/dependencies/dependencies";
import { LoggingService } from "../dependencies/fisloggingservice/services/logging-service";
import { MessageSynchronisationServiceInterface, MessageSynchronisationServiceSetting } from "../type/datatype";

/**
 * Compares the messages logged by a "source" logging service against those
 * logged by a "target" logging service and reports the differences.
 *
 * Usage: call `init(settings)` and wait for it to settle, then call
 * `subscribe(obs)` to obtain an observable of the detected differences.
 */
export class MessageSyncrhonizationService implements MessageSynchronisationServiceInterface {
  settings: MessageSynchronisationServiceSetting;
  sourceSrc: LoggingService = new LoggingService();
  targetSrc: LoggingService = new LoggingService();

  /**
   * Stores the settings and initialises both logging services, source first,
   * then target.
   *
   * @param settings Configuration holding the `incomingSource` and `target`
   *   logging-service settings.
   * @returns A promise that resolves only after BOTH services have finished
   *   initialising, so `subscribe` can safely be called afterwards.
   * @throws Rejects with the underlying error if either service fails to
   *   initialise (the failure is no longer swallowed, which previously left
   *   the returned promise pending forever).
   */
  public async init(settings: MessageSynchronisationServiceSetting): Promise<void> {
    this.settings = settings;
    await this.sourceSrc.init(settings.incomingSource);
    await this.targetSrc.init(settings.target);
  }

  /**
   * Logs the message ID of every incoming message and emits, on the returned
   * observable, each logged source entry that is missing from the target.
   *
   * The comparison is performed once, when this method is called; entries are
   * filtered by the first tag of the source and target settings respectively.
   *
   * @param obs Stream of incoming messages, each expected to be a JSON string
   *   with a `header.messageID` field.
   * @returns A hot observable of differing entries. It is not completed after
   *   the comparison; it errors if the comparison itself fails.
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any -- element type is dictated by the interface, which is not visible here
  public subscribe(obs: Observable<any>): Observable<any> {
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    const subjectOutput = new Subject<any>();

    obs.subscribe({
      next: (element) => {
        // A malformed message must not tear down the subscription.
        try {
          const data = JSON.parse(element);
          console.log(data.header.messageID);
        } catch (e) {
          console.error(e);
        }
      },
    });

    // Filter all log data carrying the first source tag / first target tag.
    const set1 = this.sourceSrc.filter({ msgTag: this.settings.incomingSource.tags[0] });
    const set2 = this.targetSrc.filter({ msgTag: this.settings.target.tags[0] });

    this.compareResult(set1, set2)
      .then((data) => {
        data.forEach((element) => {
          subjectOutput.next(element);
        });
      })
      // Surface comparison failures to subscribers instead of leaving an
      // unhandled promise rejection.
      .catch((err) => subjectOutput.error(err));

    return subjectOutput.asObservable();
  }

  /**
   * Compares two result sets and returns the differences: every entry of the
   * first set that has no counterpart in the second set ("missing messages").
   *
   * NOTE(review): entries are matched on `header.messageID` when present,
   * otherwise on their JSON serialisation. This assumes logged entries have
   * the same shape as the incoming messages - confirm against LoggingService.
   *
   * @param args Promise of the source entries.
   * @param args2 Promise of the target entries.
   * @returns The source entries missing from the target; an empty array when
   *   nothing is missing or when the source result is not an array.
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any -- result shape comes from LoggingService.filter, not visible here
  private async compareResult(args: Promise<any>, args2: Promise<any>): Promise<any[]> {
    const [sourceData, targetData] = await Promise.all([args, args2]);
    console.log(sourceData);

    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    const sourceList: any[] = Array.isArray(sourceData) ? sourceData : [];
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    const targetList: any[] = Array.isArray(targetData) ? targetData : [];

    // Derive a comparison key for an entry: prefer the message ID, fall back
    // to the serialised entry so entries without a header are still compared.
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    const keyOf = (entry: any): string => {
      const id = entry && entry.header ? entry.header.messageID : undefined;
      return id !== undefined && id !== null ? `id:${String(id)}` : `raw:${JSON.stringify(entry)}`;
    };

    // Set lookup keeps the comparison linear instead of the nested-loop O(n*m).
    const targetKeys = new Set(targetList.map(keyOf));
    return sourceList.filter((entry) => !targetKeys.has(keyOf(entry)));
  }
}