import { resolve } from "path";
import { map, Observable, of, Subject } from "rxjs";
import { BaseMessage } from "../dependencies/fisappmessagejsutilty/dependencies/dependencies";
import { LoggingService } from "../dependencies/fisloggingservice/services/logging-service";
import { MessageLog } from "../dependencies/fisloggingservice/type/datatype";
import { MessageSynchronisationServiceInterface, MessageSynchronisationServiceSetting } from "../type/datatype";

/**
 * Compares the message logs held by a source store and a target store and
 * re-emits the payload of every source message whose `appData.msgId` is not
 * present in the target.
 *
 * Usage: call {@link init} with the storage settings, then {@link subscribe}
 * with a trigger observable; every trigger emission starts one comparison round.
 */
export class MessageSyncrhonizationService implements MessageSynchronisationServiceInterface {
    // Unset until init() is called; acquireData() guards against that.
    private settings?: MessageSynchronisationServiceSetting;
    private sourceSrc: LoggingService = new LoggingService();
    private targetSrc: LoggingService = new LoggingService();

    /**
     * Stores the settings that describe the source and target storages.
     *
     * @param settings Settings later passed to `LoggingService.init` for each side.
     */
    public init(settings: MessageSynchronisationServiceSetting): void {
        this.settings = settings;
    }

    /**
     * Starts a synchronization round on every emission of `obsTrigger` and
     * forwards the parsed payload of each missing message to the returned stream.
     *
     * If `obsTrigger` is null/undefined a single round is started immediately
     * (the original code intended this but dereferenced the trigger first).
     *
     * @param obsTrigger Trigger stream; the emitted values themselves are ignored.
     * @returns A hot stream of parsed message payloads. It is never completed by
     *          this method, because rounds still in flight may emit later.
     */
    // `any` is kept on the public signature because the implemented interface is
    // declared outside this file and its exact type arguments are not visible here.
    public subscribe(obsTrigger: Observable<any>): Observable<any> {
        const msg = new Subject<any>();

        const runRound = (): void => {
            this.dataConversion().subscribe({
                next: (element) => msg.next(element),
                // A payload that is not valid JSON makes `map` throw and ends this
                // round; log it instead of letting RxJS report an unhandled error.
                error: (e) => console.error(e),
            });
        };

        if (obsTrigger) {
            obsTrigger.subscribe({
                next: () => runRound(),
                error: (e) => console.error(e),
            });
        } else {
            runRound();
        }

        return msg.asObservable();
    }

    /**
     * Runs one synchronization round and converts each missing log entry into
     * its payload by `JSON.parse`-ing `appData.msgPayload`.
     *
     * @returns Stream of parsed payloads; errors if a payload is not valid JSON.
     */
    private dataConversion(): Observable<any> {
        return this.syncrhonize().pipe(
            map((msg: MessageLog) => JSON.parse(msg.appData.msgPayload))
        );
    }

    /**
     * Starts one comparison round right away (not on subscription) and returns
     * the subject on which the missing source messages will be emitted.
     *
     * Emissions happen asynchronously, so a caller that subscribes synchronously
     * after this call returns receives all of them. The subject completes once
     * the round is over, whether it succeeded or failed.
     */
    private syncrhonize(): Subject<MessageLog> {
        const subjectOutput = new Subject<MessageLog>();
        void this.emitMissingMessages(subjectOutput);
        return subjectOutput;
    }

    /** Loads both stores, diffs them and pushes every missing message to `output`. */
    private async emitMissingMessages(output: Subject<MessageLog>): Promise<void> {
        try {
            const data = await this.acquireData();
            // Always diff by id: equal lengths do not imply equal contents.
            const missing = await this.checkArrayDifferences(data);
            if (missing.length === 0) {
                console.log(`No synchronization needed`);
            }
            for (const msgElement of missing) {
                output.next(msgElement);
            }
        } catch (e) {
            // Best-effort round: a storage failure is logged, not propagated.
            console.error(e);
        } finally {
            // Release subscribers of this round so they do not pile up per trigger.
            output.complete();
        }
    }

    /**
     * Initialises both storages from the stored settings and reads the logs
     * tagged with the first configured tag of each side.
     *
     * The calls run strictly in the original order: source init, target init,
     * source filter, target filter.
     *
     * @returns `arr1` = logs from the source, `arr2` = logs from the target.
     * @throws Error if `init()` was not called; also rejects with whatever the
     *         underlying `LoggingService` calls reject with.
     */
    private async acquireData(): Promise<{ arr1: MessageLog[], arr2: MessageLog[] }> {
        const settings = this.settings;
        if (!settings) {
            throw new Error("MessageSyncrhonizationService: init() must be called before synchronizing");
        }

        await this.sourceSrc.init(settings.incomingSource);
        await this.targetSrc.init(settings.target);

        const arr1: MessageLog[] = await this.sourceSrc.filter({ msgTag: settings.incomingSource.tags[0] });
        const arr2: MessageLog[] = await this.targetSrc.filter({ msgTag: settings.target.tags[0] });

        return { arr1, arr2 };
    }

    /**
     * Returns the entries of `arr1` whose `appData.msgId` does not occur in `arr2`.
     *
     * @param args `arr1` is the reference set, `arr2` the set to look up in;
     *             a missing array is treated as empty.
     * @returns The missing entries in `arr1` order; an empty array if none are missing.
     */
    private async checkArrayDifferences(args: { arr1?: any[], arr2?: any[] }): Promise<MessageLog[]> {
        const sourceLogs: MessageLog[] = args.arr1 ?? [];
        const targetLogs: MessageLog[] = args.arr2 ?? [];

        // One pass to index the target ids, one pass to test the source ids.
        const knownIds = new Set(targetLogs.map((log) => log.appData.msgId));

        const missingMsg: MessageLog[] = [];
        for (const msgElement of sourceLogs) {
            if (knownIds.has(msgElement.appData.msgId)) {
                console.log(`Item Found!`);
            } else {
                console.log(`This ${msgElement.appData.msgId} is not found`);
                missingMsg.push(msgElement);
            }
        }
        return missingMsg;
    }
}