message-auditor.service.ts 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183
  1. import { map, Observable, of, Subject } from "rxjs";
  2. import { ErrorTrigger, MessageAuditorServiceInterface, MessageSynchronisationServiceSetting } from "../type/datatype";
  3. import { LoggingService } from "../dependencies/log/interface/export";
  4. import { MessageLog } from "../dependencies/log/type/datatype";
  5. import { _ } from 'lodash'
  6. import { BaseMessage, RequestMessage, ResponseMessage } from "../dependencies/msgutil/interface/export";
  7. export class MessageAuditorService implements MessageAuditorServiceInterface {
  8. private settings: MessageSynchronisationServiceSetting
  9. private sourceSrc: LoggingService = new LoggingService()
  10. private targetSrc: LoggingService = new LoggingService()
  11. private missingMessageSubject: Subject<MessageLog> = new Subject()
  12. private filter: any
  13. /* Set up the targets or points of synchronization. This is where it will register the 2 different location of
  14. the data to be synchronized */
  15. public init(settings: MessageSynchronisationServiceSetting): void {
  16. this.settings = settings;
  17. }
  18. /* This is the main interface of the message sync service. The argument will take in an observable stream of
  19. error notifications, prompting it to perform the subscription of the targeted sources and it's corresponding
  20. target. Essentially, this does not synchronize, but rather it checks against the two sources and compare
  21. and return the missing data, which will then be passed into the targeted subject stream as specified by the
  22. respective client. They can choose how they want to handle the missing messages returned. */
  23. public subscribe(obsTrigger: Observable<ErrorTrigger>): Observable<MessageLog> {
  24. // Subsribe to the errorTrigger obs to listen to any notification.
  25. obsTrigger.subscribe({
  26. next: obsTrigger => {
  27. console.log(obsTrigger.message)// just checking the message
  28. if (!this.filter) {
  29. console.log(`No filter applies`)
  30. } else {
  31. console.log(`Synchronizating with filters: ${Object.keys(this.filter)}: ${Object.values(this.filter)}`)
  32. }
  33. let missingMsg: Observable<MessageLog> = this.synchronize()
  34. missingMsg.subscribe({
  35. next: element => {
  36. this.missingMessageSubject.next(element)
  37. console.log(`AuditService: Returning missing messages ${element.appData.msgId} ....`)
  38. }
  39. })
  40. }
  41. })
  42. return this.missingMessageSubject
  43. }
  44. public setFilter(filters: any) {
  45. this.filter = filters
  46. console.log(`Integrating filters: ${Object.keys(this.filter)} in AuditMessage service`)
  47. }
  48. /* ________________ Private Functions _________________ */
  49. // Filtering functions to filter out messages
  50. private filterData(filters: any, message: MessageLog): boolean {
  51. let response: boolean = true //Just using this like a statemanagement
  52. let payload: BaseMessage = JSON.parse(message.appData.msgPayload as string) // Extract the payload from the messageLog first
  53. // Making a separate function to cater to different multi filter conditions are coded below
  54. function checkValues(filter): boolean { //FYI, all parameters are string
  55. let key = Object.keys(filter)
  56. let value = Object.values(filter)
  57. let res = _.get(payload, key[0])
  58. // Check first if the payload has the filtering properties/path
  59. if (_.has(payload, key[0])) {
  60. // check if value is equal to fitler's
  61. if (value == res) {
  62. return true
  63. } else {
  64. return false
  65. }
  66. } else {
  67. console.log(`${key} does not exists in payload`)
  68. return false
  69. }
  70. }
  71. if (filters) { // if filters is not null
  72. if (Object.keys(filters).length > 1) {
  73. let totalCount = Object.keys(filters).length
  74. let matchedCount = 0
  75. Object.entries(filters).forEach(([key, value]) => {
  76. let filter = { [key]: value }
  77. // console.log(filter)
  78. if (checkValues(filter) == true) matchedCount++
  79. })
  80. if (totalCount == matchedCount) {
  81. response = true
  82. } else {
  83. response = false
  84. }
  85. } else {
  86. if (checkValues(filters) == true) {
  87. response = true
  88. } else {
  89. response = false
  90. }
  91. }
  92. } else {
  93. response = true
  94. }
  95. return response
  96. }
  97. /* This is where the 'synching' operation takes place. */
  98. private synchronize(): Subject<MessageLog> {
  99. let subjectOutput: Subject<MessageLog> = new Subject()
  100. // Acquire the data from both location and return them as an array respectively.
  101. this.acquireData().then((data: { arr1: MessageLog[], arr2: MessageLog[] }) => {
  102. // In the case where there are differences in the array length, then extensive comparison
  103. // will be carried out to filter out the differences. Differences are the missing data.
  104. this.checkArrayDifferences(data).then((data: MessageLog[]) => {
  105. data.forEach(msgElement => {
  106. let refined = JSON.parse(JSON.stringify(msgElement))
  107. // Once the missing data has been weeded out, it is then passed into the Subject
  108. // to be returned for the subscribe method.`
  109. subjectOutput.next(refined)
  110. })
  111. })
  112. }).catch((e) => console.error(e))
  113. return subjectOutput
  114. }
  115. /* This is where the targeted data is queried. The process is pretty straightforward. */
  116. private async acquireData(): Promise<any> {
  117. const promiseQuery: Promise<any> = new Promise((resolve, reject) => {
  118. // declare what to expect.
  119. let allSets: {
  120. arr1: MessageLog[],
  121. arr2: MessageLog[]
  122. } = {
  123. arr1: [],
  124. arr2: []
  125. }
  126. let set1: MessageLog[] = []
  127. let set2: MessageLog[] = []
  128. // Initiate the source to find the location of the targeted data to be synched.
  129. this.sourceSrc.init(this.settings.incomingSource).then(() => {
  130. this.targetSrc.init(this.settings.target).then(() => {
  131. // Filter also carries out the query aspect of the operation, allowing it to acquire all the relevant data.
  132. this.sourceSrc.filter({ msgTag: this.settings.incomingSource.tags[0] }).then((data: MessageLog[]) => {
  133. data.forEach((message: MessageLog) => {
  134. if (this.filterData(this.filter, message)) set1.push(message)
  135. })
  136. }).catch((err) => {
  137. console.error(err.message)
  138. }).then(() => {
  139. this.targetSrc.filter({ msgTag: this.settings.target.tags[0] }).then((data: MessageLog[]) => {
  140. data.forEach(message => {
  141. if (this.filterData(this.filter, message)) set2.push(message)
  142. })
  143. allSets.arr1 = set1
  144. allSets.arr2 = set2
  145. resolve(allSets)
  146. })
  147. })
  148. })
  149. })
  150. })
  151. return promiseQuery
  152. }
  153. // compare results and return differences
  154. private async checkArrayDifferences(args: { arr1: MessageLog[], arr2: MessageLog[] }): Promise<MessageLog[]> {
  155. return new Promise((resolve, reject) => {
  156. let missingMsg: MessageLog[] = []
  157. args.arr1.forEach((msgElement: MessageLog) => {
  158. // In this case, we are just checking if the msgId matches within the given the array.
  159. // Just to save time, there's no need to check the entire message structure unless
  160. // the circumstances necessitates it.
  161. if (args.arr2.some(obj => obj.appData.msgId === msgElement.appData.msgId)) {
  162. console.log(`Item Found!`)
  163. } else {
  164. console.log(`This ${msgElement.appData.msgId} is missing`)
  165. missingMsg.push(msgElement)
  166. resolve(missingMsg)
  167. }
  168. })
  169. })
  170. }
  171. }