import { resolve } from "path"; import { map, Observable, of, Subject } from "rxjs"; import { BaseMessage } from "../dependencies/fisappmessagejsutilty/dependencies/dependencies"; import { LoggingService } from "../dependencies/fisloggingservice/services/logging-service"; import { MessageLog } from "../dependencies/fisloggingservice/type/datatype"; import { MessageSynchronisationServiceInterface, MessageSynchronisationServiceSetting } from "../type/datatype"; export class MessageSyncrhonizationService implements MessageSynchronisationServiceInterface { private settings: MessageSynchronisationServiceSetting private sourceSrc: LoggingService = new LoggingService() private targetSrc: LoggingService = new LoggingService() public async init(settings: MessageSynchronisationServiceSetting): Promise { this.settings = settings; // Wrap promise so that init must be instantiated with resolve before proceeding with subscribe method let promiseInit: Promise = new Promise((resolve, reject) => { this.sourceSrc.init(settings.incomingSource).then((data) => { if (!data) reject() }).then(() => { this.targetSrc.init(settings.target).then((data) => { if (!data) reject() resolve() }) }) }) return promiseInit } // Incoming obstriger serves as a trigger point to perform another synchronization public subscribe(obsTrigger: Observable): Observable { let msg : Subject = new Subject() obsTrigger.subscribe({ next: obs => { let missingMsg = this.dataConversion() missingMsg.subscribe({ next: element => { msg.next(element) } }) } }) // trigger by timer if(!obsTrigger) { this.dataConversion() } let result: Observable = msg.asObservable() return result } private dataConversion(): Observable { // let subjectOutput = this.syncrhonize() let obsOutput: Observable = this.syncrhonize().pipe( map((msg: MessageLog) => { // console.log(`Converting this ${msg.appData.msgId}`) return JSON.parse(msg.appData.msgPayload) }) ) return obsOutput } private syncrhonize(): Subject { let subjectOutput = new Subject() this.acquireData().then((data: { arr1: MessageLog[], arr2: MessageLog[] }) => { if (data.arr1.length === data.arr2.length) { console.log(`No synchronization needed`) } else { this.checkArrayDifferences(data).then((data: MessageLog[]) => { data.forEach(msgElement => { subjectOutput.next(msgElement) }) }) } }).catch((e) => console.error(e)) return subjectOutput } // Acquires the available data from designated storage private async acquireData(): Promise { const promiseQuery: Promise = new Promise((resolve, reject) => { let allSets: { arr1: MessageLog[], arr2: MessageLog[] } = { arr1: [], arr2: [] } let set1 let set2 this.sourceSrc.filter({ msgTag: this.settings.incomingSource.tags[0] }).then((data: MessageLog[]) => { set1 = data }).then(() => { this.targetSrc.filter({ msgTag: this.settings.target.tags[0] }).then((data: MessageLog[]) => { set2 = data allSets.arr1 = set1 allSets.arr2 = set2 resolve(allSets) }) }) }) return promiseQuery } // compare results and return differences` private async checkArrayDifferences(args: { arr1?: any[], arr2?: any[] }): Promise { return new Promise((resolve, reject) => { let missingMsg: MessageLog[] = [] args.arr1.forEach((msgElement: MessageLog) => { if (args.arr2.some(obj => obj.appData.msgId === msgElement.appData.msgId)) { console.log(`Item Found!`) } else { console.log(`This ${msgElement.appData.msgId} is not found`) missingMsg.push(msgElement) resolve(missingMsg) } }) }) } }