// message-auditor.service.ts (11 KB)
  1. import { map, Observable, of, Subject } from "rxjs";
  2. import { ErrorTrigger, MessageAuditorServiceInterface, MessageSynchronisationServiceSetting } from "../type/datatype";
  3. import { MessageLog } from "../dependencies/log/type/datatype";
  4. import * as _ from 'lodash'
  5. import { LoggingService } from "../dependencies/log/interface/export";
  6. import { BaseMessage } from "../dependencies/log/dependencies/msgutil/interface/export";
  // Module-level set of message ids. Nothing in the visible part of this file
  // reads or writes it.
  // NOTE(review): presumably intended to remember msgIds that were already
  // emitted so they are not reported twice - confirm the intent or remove it.
  const processedMsgIds = new Set<unknown>();
  /**
   * Audits two message-log locations against each other.
   *
   * The service does not synchronize anything itself: on every trigger it
   * queries the incoming source and the target, works out which messages of
   * the source are absent from the target (compared by `appData.msgId`) and
   * pushes those messages into a shared output stream. What happens to the
   * missing messages is up to whoever consumes that stream.
   */
  export class MessageAuditorService implements MessageAuditorServiceInterface {
    // Assigned by init(); subscribe() dereferences it, so init() has to be
    // called before the first trigger arrives.
    private settings!: MessageSynchronisationServiceSetting;
    private sourceSrc: LoggingService = new LoggingService();
    private targetSrc: LoggingService = new LoggingService();
    // Single output stream shared by every subscribe() call.
    private missingMessageSubject: Subject<MessageLog> = new Subject();

    /**
     * Registers the two locations whose data is to be compared, plus the
     * optional payload filters.
     *
     * @param settings Source/target descriptors and optional filters.
     */
    public init(settings: MessageSynchronisationServiceSetting): void {
      this.settings = settings;
      if (settings.filters) {
        console.log(`Integrating filters: ${Object.keys(this.settings.filters)} in AuditMessage service`);
      }
    }

    /**
     * Main entry point. Every notification emitted by `obsTrigger` starts one
     * audit run; each message found missing in that run is forwarded to the
     * returned stream.
     *
     * Each call adds another subscription to `obsTrigger`, while all calls
     * return the same underlying subject.
     *
     * @param obsTrigger Stream of error notifications that prompt an audit.
     * @returns Stream of messages present in the source but not in the target.
     */
    public subscribe(obsTrigger: Observable<ErrorTrigger>): Observable<MessageLog> {
      obsTrigger.subscribe({
        next: (trigger: ErrorTrigger) => {
          console.log(trigger.message); // just checking the message
          if (!this.settings.filters) {
            console.log(`No filters applies`);
          } else {
            console.log(`Synchronizating with filters: '${Object.keys(this.settings.filters)}': '${Object.values(this.settings.filters)}'`);
          }
          // synchronize() only emits after its queries have resolved, so
          // subscribing right after the call does not miss any element.
          this.synchronize().subscribe({
            next: (missing: MessageLog) => {
              this.missingMessageSubject.next(missing);
              console.log(`AuditService: Returning missing messages ${missing.appData.msgId} ....`);
            },
          });
        },
        // Log a failing trigger stream instead of leaving the error unhandled.
        error: (err: unknown) => console.error(err),
      });
      return this.missingMessageSubject;
    }

    /* ________________ Private Functions _________________ */

    /**
     * Decides whether a message passes the configured filters.
     *
     * @param filters Map of payload path to accepted value (or array of
     *   accepted values). A falsy value or an empty object means "no
     *   filtering" and lets every message through.
     * @param message Message log whose `appData.msgPayload` is inspected.
     * @returns `true` when every filter entry matches the payload. A payload
     *   that cannot be parsed never matches.
     */
    private filterData(filters: any, message: MessageLog): boolean {
      // Without filters there is nothing to check, so the payload is not parsed.
      if (!filters) {
        return true;
      }

      let payload: unknown;
      try {
        const raw: unknown = message.appData.msgPayload;
        // Payloads are stored stringified; anything else is used as it is.
        payload = typeof raw === 'string' ? JSON.parse(raw) : raw;
      } catch (err) {
        console.error(`Unable to parse payload of message ${message.appData.msgId}`, err);
        return false;
      }

      // Expands a stringified nested payload in place, if there is one.
      this.checkIfIsInPayloadDataFormat(payload);

      // All criteria have to be met; each entry is checked on its own.
      return Object.entries(filters).every(([key, value]) => this.checkValues(payload, { [key]: value }));
    }

    /**
     * Runs one audit: fetches both data sets, determines the messages missing
     * from the target and emits one deep copy per distinct `msgId` (the first
     * record wins when the same id occurs more than once).
     *
     * Failures are logged and end the run without emitting anything. The
     * returned subject is completed once the run is over.
     *
     * @returns Subject that emits the missing messages asynchronously.
     */
    private synchronize(): Subject<MessageLog> {
      const subjectOutput = new Subject<MessageLog>();
      const auditEnabled = Boolean(process.env.CheckAudit);
      const countDistinctIds = (logs: MessageLog[]): number =>
        new Set(logs.map((log) => log.appData.msgId)).size;

      this.acquireData()
        .then((data) => {
          if (auditEnabled) {
            console.log("[CheckAudit] Record set 1: ", countDistinctIds(data.arr1));
            console.log("[CheckAudit] Record set 2: ", countDistinctIds(data.arr2));
          }
          // Returned so that a failure here reaches the catch below.
          return this.checkArrayDifferences(data);
        })
        .then((missing: MessageLog[]) => {
          // Keep only the first record for every msgId.
          const firstById = new Map<unknown, MessageLog>();
          for (const msg of missing) {
            if (!firstById.has(msg.appData.msgId)) {
              firstById.set(msg.appData.msgId, msg);
            }
          }
          if (auditEnabled) {
            console.log("[CheckAudit] Difference: ", firstById.size);
            console.log("[CheckAudit] Missing msgId: ", Array.from(firstById.keys()));
          }
          firstById.forEach((msg) => {
            // Hand out a detached copy so consumers cannot alter the queried record.
            subjectOutput.next(JSON.parse(JSON.stringify(msg)));
          });
          subjectOutput.complete();
        })
        .catch((e: unknown) => {
          console.error(e);
          subjectOutput.complete();
        });

      return subjectOutput;
    }

    /**
     * Queries both locations and keeps the messages that pass the filters.
     *
     * The steps run strictly one after another: initialise source, initialise
     * target, query source, query target. A failing source query is logged and
     * treated as an empty source set; any other failure rejects the promise.
     *
     * @returns `arr1` holds the filtered source messages, `arr2` the filtered
     *   target messages.
     */
    private async acquireData(): Promise<{ arr1: MessageLog[], arr2: MessageLog[] }> {
      // Initiate both ends to locate the data that is to be compared.
      await this.sourceSrc.init(this.settings.incomingSource);
      await this.targetSrc.init(this.settings.target);

      const passesFilters = (message: MessageLog): boolean =>
        this.filterData(this.settings.filters, message);

      // filter() also performs the query; only the first tag is used.
      let arr1: MessageLog[] = [];
      try {
        const sourceLogs: MessageLog[] = await this.sourceSrc.filter({ msgTag: this.settings.incomingSource.tags[0] });
        arr1 = sourceLogs.filter(passesFilters);
      } catch (err) {
        // Best effort: carry on with an empty source set.
        console.error(err instanceof Error ? err.message : err);
      }

      const targetLogs: MessageLog[] = await this.targetSrc.filter({ msgTag: this.settings.target.tags[0] });
      return { arr1, arr2: targetLogs.filter(passesFilters) };
    }

    /**
     * Compares both sets and returns the differences.
     *
     * Only `appData.msgId` is compared; the rest of the message structure is
     * deliberately not inspected.
     *
     * @param args `arr1` is the reference set, `arr2` the set checked against it.
     * @returns The elements of `arr1` whose `msgId` does not occur in `arr2`,
     *   in their original order. Resolves with an empty array when nothing is
     *   missing.
     */
    private async checkArrayDifferences(args: { arr1: MessageLog[], arr2: MessageLog[] }): Promise<MessageLog[]> {
      const targetIds = new Set(args.arr2.map((log) => log.appData.msgId));
      const missingMsg: MessageLog[] = [];
      for (const msgElement of args.arr1) {
        if (targetIds.has(msgElement.appData.msgId)) {
          console.log(`Item Found!`);
        } else {
          console.log(`This ${msgElement.appData.msgId} is missing`);
          missingMsg.push(msgElement);
        }
      }
      return missingMsg;
    }

    /**
     * Used by filterData to check a payload value against one filter condition.
     * Only the first entry of `filters` is evaluated.
     *
     * @param payload Parsed message payload.
     * @param filters Object whose first key is a payload path and whose value
     *   is the accepted value or an array of accepted values.
     * @returns `true` when the path exists in the payload and its value is one
     *   of the accepted values.
     */
    private checkValues(payload: unknown, filters: Record<string, unknown>): boolean {
      const keys = Object.keys(filters);
      const path = keys[0];
      // Check first if the payload has the filtering property/path at all.
      if (!_.has(payload, path)) {
        console.log(`${keys.join(',')} does not exists in payload`);
        return false;
      }
      const expected = filters[path];
      const accepted: unknown[] = Array.isArray(expected) ? expected : [expected];
      return accepted.includes(_.get(payload, path));
    }

    /**
     * Notification messages may carry a nested `data.data.data` property in
     * stringified form. When that is the case the string is parsed and the
     * payload is updated in place; a string that is not valid JSON is left as
     * it is.
     *
     * @param payload Parsed message payload; may be altered.
     * @returns The same payload object that was passed in.
     */
    private checkIfIsInPayloadDataFormat(payload: BaseMessage | any) {
      const holder = payload && payload.data && payload.data.data;
      if (holder && typeof holder.data === 'string' && holder.data.length > 0) {
        try {
          holder.data = JSON.parse(holder.data);
        } catch (err) {
          // Not JSON after all: keep the original string.
        }
      }
      return payload;
    }
  }