123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139 |
import { catchError, EMPTY, from, map, mergeMap, Observable, of, Subject } from "rxjs";
import { BaseMessage } from "../dependencies/fisappmessagejsutilty/dependencies/dependencies";
import { LoggingService } from "../dependencies/log/services/logging-service";
import { LogSetting, MessageLog } from "../dependencies/log/type/datatype";
import { ErrorTrigger, MessageAuditorServiceInterface, MessageSynchronisationServiceSetting } from "../type/datatype";
/**
 * Audits two message logs and reports the messages that are present in the
 * incoming source but absent from the target.
 *
 * The service does not copy anything between the two logs. It only compares
 * them by `appData.msgId` and hands the missing messages back to the caller,
 * who decides what to do with them.
 */
export class MessageAuditorService implements MessageAuditorServiceInterface {
    // Stays undefined until init() is called; acquireData() checks for that.
    private settings?: MessageSynchronisationServiceSetting;
    private readonly sourceSrc: LoggingService = new LoggingService();
    private readonly targetSrc: LoggingService = new LoggingService();

    /**
     * Registers the two log locations (incoming source and target) that will
     * be compared on every audit pass.
     *
     * @param settings Locations and tags of the two logs to compare.
     */
    public init(settings: MessageSynchronisationServiceSetting): void {
        this.settings = settings;
    }

    /**
     * Main entry point. Every notification on `obsTrigger` starts one audit
     * pass; each message found missing from the target is emitted on the
     * returned stream.
     *
     * The trigger is subscribed to immediately, so the returned stream is hot:
     * messages emitted before the caller subscribes to it are not replayed.
     * A failing audit pass is logged and skipped so that later triggers are
     * still served. An error or completion of `obsTrigger` itself is forwarded
     * to the returned stream (completion waits for passes still in flight).
     *
     * @param obsTrigger Stream of error notifications that prompt an audit.
     * @returns Stream of the messages missing from the target.
     */
    public subscribe(obsTrigger: Observable<ErrorTrigger>): Observable<BaseMessage> {
        const missingMessages = new Subject<BaseMessage>();

        obsTrigger
            .pipe(
                mergeMap((trigger) => {
                    console.log(trigger.message);
                    return this.dataConversion().pipe(
                        // Contain the failure to this pass; the outer stream must stay alive.
                        catchError((err: unknown) => {
                            console.error(err);
                            return EMPTY;
                        }),
                    );
                }),
            )
            .subscribe(missingMessages);

        return missingMessages.asObservable();
    }

    /**
     * Runs one audit pass and parses the JSON payload of every missing log
     * entry. Entries whose payload cannot be parsed are logged and skipped so
     * that the remaining entries of the pass are still delivered.
     *
     * @returns Stream of parsed payloads; completes when the pass is done.
     */
    private dataConversion(): Observable<BaseMessage> {
        return this.synchronize().pipe(
            mergeMap((log: MessageLog) => {
                console.log(`Converting this ${log.appData.msgId}`);
                try {
                    return of(JSON.parse(log.appData.msgPayload as string) as BaseMessage);
                } catch (err: unknown) {
                    console.error(`Unable to parse the payload of ${log.appData.msgId}`, err);
                    return EMPTY;
                }
            }),
        );
    }

    /**
     * Queries both logs and emits every source entry that has no counterpart
     * in the target.
     *
     * The comparison is always performed: equal array lengths do not prove
     * that both logs hold the same messages.
     *
     * @returns Stream of missing log entries; completes after the last one and
     *          errors if either log cannot be queried.
     */
    private synchronize(): Observable<MessageLog> {
        const findMissing = async (): Promise<MessageLog[]> => {
            const data = await this.acquireData();
            const missing = await this.checkArrayDifferences(data);
            if (missing.length === 0) {
                console.log(`No synchronization needed`);
            }
            return missing;
        };

        return from(findMissing()).pipe(mergeMap((missing) => from(missing)));
    }

    /**
     * Initialises both logging services and queries each of them for the
     * entries carrying the first configured tag.
     *
     * @returns `arr1` holds the source entries, `arr2` the target entries.
     * @throws Error when init() has not been called; any rejection from the
     *         logging services is propagated unchanged.
     */
    private async acquireData(): Promise<{ arr1: MessageLog[]; arr2: MessageLog[] }> {
        const settings = this.settings;
        if (settings === undefined) {
            throw new Error("MessageAuditorService.init() must be called before subscribe()");
        }

        // Same order as before: both services are initialised, then queried in turn.
        await this.sourceSrc.init(settings.incomingSource);
        await this.targetSrc.init(settings.target);

        // filter() also performs the query, returning all entries matching the tag.
        const arr1: MessageLog[] = await this.sourceSrc.filter({ msgTag: settings.incomingSource.tags[0] });
        const arr2: MessageLog[] = await this.targetSrc.filter({ msgTag: settings.target.tags[0] });

        return { arr1: arr1 ?? [], arr2: arr2 ?? [] };
    }

    /**
     * Returns the entries of `arr1` whose `appData.msgId` does not occur in
     * `arr2`. Only the id is compared, not the whole message structure.
     *
     * @param args `arr1` is the reference list, `arr2` the list checked against
     *             it; a missing list is treated as empty.
     * @returns The entries missing from `arr2`, in `arr1` order (possibly empty).
     */
    private async checkArrayDifferences(args: { arr1?: MessageLog[]; arr2?: MessageLog[] }): Promise<MessageLog[]> {
        const { arr1 = [], arr2 = [] } = args;
        // One pass over arr2 up front makes each lookup O(1).
        const knownIds = new Set(arr2.map((log) => log.appData.msgId));

        return arr1.filter((log) => {
            if (knownIds.has(log.appData.msgId)) {
                console.log(`Item Found!`);
                return false;
            }
            console.log(`This ${log.appData.msgId} is not found`);
            return true;
        });
    }
}
|