123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226 |
- import { map, Observable, of, Subject } from "rxjs";
- import { ErrorTrigger, MessageAuditorServiceInterface, MessageSynchronisationServiceSetting } from "../type/datatype";
- import { MessageLog } from "../dependencies/log/type/datatype";
- import * as _ from 'lodash'
- import { LoggingService } from "../dependencies/log/interface/export";
- import { BaseMessage } from "../dependencies/log/dependencies/msgutil/interface/export";
- let processedMsgIds = new Set();
- require('dotenv').config();
- export class MessageAuditorService implements MessageAuditorServiceInterface {
- private settings: MessageSynchronisationServiceSetting
- private sourceSrc: LoggingService = new LoggingService()
- private targetSrc: LoggingService = new LoggingService()
- private missingMessageSubject: Subject<MessageLog> = new Subject()
- /* Set up the targets or points of synchronization. This is where it will register the 2 different location of
- the data to be synchronized */
- public init(settings: MessageSynchronisationServiceSetting): void {
- this.settings = settings;
- if (settings.filters) {
- console.log(`Integrating filters: ${Object.keys(this.settings.filters)} in AuditMessage service`)
- }
- }
- /* This is the main interface of the message sync service. The argument will take in an observable stream of
- error notifications, prompting it to perform the subscription of the targeted sources and it's corresponding
- target. Essentially, this does not synchronize, but rather it checks against the two sources and compare
- and return the missing data, which will then be passed into the targeted subject stream as specified by the
- respective client. They can choose how they want to handle the missing messages returned. */
- public subscribe(obsTrigger: Observable<ErrorTrigger>): Observable<MessageLog> {
- // Subsribe to the errorTrigger obs to listen to any notification.
- obsTrigger.subscribe({
- next: obsTrigger => {
- console.log(obsTrigger.message)// just checking the message
- if (!this.settings.filters) {
- console.log(`No filters applies`)
- } else {
- console.log(`Synchronizating with filters: '${Object.keys(this.settings.filters)}': '${Object.values(this.settings.filters)}'`)
- }
- let missingMsg: Observable<MessageLog> = this.synchronize()
- missingMsg.subscribe({
- next: element => {
- this.missingMessageSubject.next(element)
- console.log(`AuditService: Returning missing messages ${element.appData.msgId} ....`)
- }
- })
- }
- })
- return this.missingMessageSubject
- }
- /* ________________ Private Functions _________________ */
- // Filtering functions to filters out messages
- private filterData(filters: any, message: MessageLog): boolean {
- let response: boolean = true //Just using this like a statemanagement
- let payload: BaseMessage = JSON.parse(message.appData.msgPayload as string) // Extract the payload from the messageLog first
- this.checkIfIsInPayloadDataFormat(payload) // Convert stringified nested payload if there's any
- // Making a separate function to cater to different multi filters conditions are coded below
- if (filters) { // if filters is not null
- if (Object.keys(filters).length > 1) {
- let totalCount = Object.keys(filters).length
- let matchedCount = 0
- Object.entries(filters).forEach(([key, value]) => {
- let filters = { [key]: value }
- // console.log(filters)
- if (this.checkValues(payload, filters) == true) matchedCount++
- })
- if (totalCount == matchedCount) { // check if all the criterias are met
- response = true
- } else {
- response = false
- }
- } else {
- if (this.checkValues(payload, filters) == true) {
- response = true
- } else {
- response = false
- }
- }
- } else { // if not filters is provided. Then the just let response be true so that the data can be further processed
- response = true
- }
- return response
- }
- /* This is where the 'synching' operation takes place. */
- private synchronize(): Subject<MessageLog> {
- let subjectOutput: Subject<MessageLog> = new Subject()
- // Acquire the data from both location and return them as an array respectively.
- this.acquireData().then((data: { arr1: MessageLog[], arr2: MessageLog[] }) => {
- // In the case where there are differences in the array length, then extensive comparison
- // will be carried out to filters out the differences. Differences are the missing data.
- if(process.env.CheckAudit)
- {
- console.log("Record set 1: ", _.keys(_.countBy(data.arr1,function(data:MessageLog){return data.appData['msgId']})).length);
- console.log("Record set 2: ", _.keys(_.countBy(data.arr2,function(data:MessageLog){return data.appData['msgId']})).length);
- }
- if(process.env.CheckAudit)
- {
- console.log("[CheckAudit] Record set 1: ", _.keys(_.countBy(data.arr1,function(data:MessageLog){return data.appData['msgId']})).length);
- console.log("[CheckAudit] Record set 2: ", _.keys(_.countBy(data.arr2,function(data:MessageLog){return data.appData['msgId']})).length);
- }
- this.checkArrayDifferences(data).then((data: MessageLog[]) => {
- if(process.env.CheckAudit)
- {
- console.log("Difference: ",data.length);
- }
- data.forEach(msgElement => {
- let refined = JSON.parse(JSON.stringify(msgElement))
- // Once the missing data has been weeded out, it is then passed into the Subject
- // to be returned for the subscribe method.`
- subjectOutput.next(refined)
- })
- })
- }).catch((e) => console.error(e))
- return subjectOutput
- }
- /* This is where the targeted data is queried. The process is pretty straightforward. */
- private async acquireData(): Promise<any> {
- const promiseQuery: Promise<any> = new Promise((resolve, reject) => {
- // declare what to expect.
- let allSets: { arr1: MessageLog[], arr2: MessageLog[] } = {
- arr1: [],
- arr2: []
- }
- let set1: MessageLog[] = []
- let set2: MessageLog[] = []
- // Initiate the source to find the location of the targeted data to be synched.
- this.sourceSrc.init(this.settings.incomingSource).then(() => {
- this.targetSrc.init(this.settings.target).then(() => {
- // Filter also carries out the query aspect of the operation, allowing it to acquire all the relevant data.
- this.sourceSrc.filter({ msgTag: this.settings.incomingSource.tags[0] }).then((data: MessageLog[]) => {
- data.forEach((message: MessageLog) => {
- if (this.filterData(this.settings.filters, message)) set1.push(message)
- })
- }).catch((err) => {
- console.error(err.message)
- }).then(() => {
- this.targetSrc.filter({ msgTag: this.settings.target.tags[0] }).then((data: MessageLog[]) => {
- data.forEach(message => {
- if (this.filterData(this.settings.filters, message)) set2.push(message)
- })
- allSets.arr1 = set1
- allSets.arr2 = set2
- resolve(allSets)
- })
- })
- })
- })
- })
- return promiseQuery
- }
- // compare results and return differences
- private async checkArrayDifferences(args: { arr1: MessageLog[], arr2: MessageLog[] }): Promise<MessageLog[]> {
- return new Promise((resolve, reject) => {
- let missingMsg: MessageLog[] = []
- args.arr1.forEach((msgElement: MessageLog) => {
- // In this case, we are just checking if the msgId matches within the given the array.
- // Just to save time, there's no need to check the entire message structure unless
- // the circumstances necessitates it.
- if (args.arr2.some(obj => obj.appData.msgId === msgElement.appData.msgId)) {
- console.log(`Item Found!`)
- } else {
- console.log(`This ${msgElement.appData.msgId} is missing`)
- missingMsg.push(msgElement)
- resolve(missingMsg)
- }
- })
- })
- }
- // To be used by the filterData function to check between payload values and filter conditions
- private checkValues(payload, filters): boolean { //FYI, all parameters are string
- let key = Object.keys(filters)
- // console.log(Object.values(filters))
- let value = Object.values(filters)[0]
- let res = _.get(payload, key[0])
- // Check first if the payload has the filtering properties/path
- if (_.has(payload, key[0])) {
- let strarray: string[]
- // check array
- if (Array.isArray(value)) {
- strarray = value as string[]
- }
- else {
- strarray = [value as string]
- }
- // compare array with that string
- if (strarray.includes(res)) {
- return true
- } else {
- return false
- }
- } else {
- console.log(`${key} does not exists in payload`)
- return false
- }
- }
- // Check in the case of notification messages, for the nested data properties
- // Notification message may have multiple nested data properties that maybe in string format
- private checkIfIsInPayloadDataFormat(payload: BaseMessage | any) {
- let parsedData: any
- if (payload.data
- && payload.data.data
- && payload.data.data.data && typeof payload.data.data.data === 'string') {
- parsedData = JSON.parse(payload.data.data.data)
- // console.log(parsedData)
- payload.data.data.data = parsedData
- return payload
- } else {
- return payload
- }
- }
- }
|