import { resolve } from "path"; import { Observable, of, Subject } from "rxjs"; import { BaseMessage } from "../dependencies/fisappmessagejsutilty/dependencies/dependencies"; import { LoggingService } from "../dependencies/fisloggingservice/services/logging-service"; import { MessageLog } from "../dependencies/fisloggingservice/type/datatype"; import { MessageSynchronisationServiceInterface, MessageSynchronisationServiceSetting } from "../type/datatype"; export class MessageSyncrhonizationService implements MessageSynchronisationServiceInterface { settings: MessageSynchronisationServiceSetting sourceSrc: LoggingService = new LoggingService() targetSrc: LoggingService = new LoggingService() public async init(settings: MessageSynchronisationServiceSetting): Promise { this.settings = settings; // Wrap promise so that init must be instantiated with resolve before proceeding with subscribe method let promiseInit: Promise = new Promise((resolve, reject) => { try { this.sourceSrc.init(settings.incomingSource).then((data) => { if (!data) reject() }).then(() => { this.targetSrc.init(settings.target).then((data) => { if (!data) reject() resolve() }) }) } catch (e) { console.error(e) } }) return promiseInit } // Incoming obstriger serves as a trigger point to perform another synchronization public subscribe(obsTrigger: Observable): Observable { let subjectOutput = this.syncrhonize() this.targetSrc.subscribe(subjectOutput) subjectOutput.subscribe({ next: missingMsg => console.log(`Synchronizing ${missingMsg.appData.msgId}`) }) // obs.subscribe({ // next: element => { // subjectOutput = this.syncrhonize() // } // }) return subjectOutput.asObservable() } private syncrhonize(): Subject { let subjectOutput = new Subject() this.acquireData().then((data: { arr1: MessageLog[], arr2: MessageLog[] }) => { if (data.arr1.length === data.arr2.length) { console.log(`No synchronization needed`) } else { this.checkArrayDifferences(data).then((data: MessageLog[]) => { data.forEach(msgElement => { subjectOutput.next(msgElement) }) }) } }).catch((e) => console.error(e)) return subjectOutput } // Acquires the available data from designated storage private async acquireData(): Promise { const promiseQuery: Promise = new Promise((resolve, reject) => { let allSets: { arr1: MessageLog[], arr2: MessageLog[] } = { arr1: [], arr2: [] } let set1 let set2 this.sourceSrc.filter({ msgTag: this.settings.incomingSource.tags[0] }).then((data: MessageLog[]) => { set1 = data }).then(() => { this.targetSrc.filter({ msgTag: this.settings.target.tags[0] }).then((data: MessageLog[]) => { set2 = data allSets.arr1 = set1 allSets.arr2 = set2 resolve(allSets) }) }) }) return promiseQuery } // compare results and return differences` private async checkArrayDifferences(args: { arr1?: any[], arr2?: any[] }): Promise { return new Promise((resolve, reject) => { let missingMsg: MessageLog[] = [] args.arr1.forEach((msgElement: MessageLog) => { if (args.arr2.some(obj => obj.appData.msgId === msgElement.appData.msgId)) { console.log(`Item Found!`) } else { console.log(`This ${msgElement.appData.msgId} is not found`) missingMsg.push(msgElement) resolve(missingMsg) } }) }) } }