message-auditor.service.ts 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139
  1. import { map, Observable, of, Subject } from "rxjs";
  2. import { BaseMessage } from "../dependencies/msgutil/dependencies/dependencies";
  3. import { LoggingService } from "../dependencies/log/services/logging-service";
  4. import { LogSetting, MessageLog } from "../dependencies/log/type/datatype";
  5. import { ErrorTrigger, MessageAuditorServiceInterface, MessageSynchronisationServiceSetting } from "../type/datatype";
  /**
   * Compares two message logs (an "incoming source" and a "target") and reports
   * the messages that are present in the source but absent from the target.
   *
   * The service does not write anything back to either log: it only detects the
   * gap and hands the missing messages to the caller, who decides what to do
   * with them.
   */
  export class MessageAuditorService implements MessageAuditorServiceInterface {
    // Remains undefined until init() is called; acquireData() guards against that.
    private settings: MessageSynchronisationServiceSetting | undefined
    private sourceSrc: LoggingService = new LoggingService()
    private targetSrc: LoggingService = new LoggingService()

    /**
     * Registers the two locations whose data will be compared.
     *
     * @param settings Describes the incoming source and the target log.
     */
    public init(settings: MessageSynchronisationServiceSetting): void {
      this.settings = settings
    }

    /**
     * Main entry point. Every notification on `obsTrigger` starts one audit run;
     * each message found missing from the target is pushed onto the returned
     * stream.
     *
     * The trigger is subscribed to immediately (not lazily), and the returned
     * stream is backed by a Subject that this class never completes, so late
     * subscribers only see messages emitted after they subscribe.
     *
     * @param obsTrigger Stream of error notifications that prompt an audit.
     * @returns Stream of the messages that are missing from the target.
     */
    public subscribe(obsTrigger: Observable<ErrorTrigger>): Observable<BaseMessage> {
      // Single long-lived output shared by every audit run.
      const missingMessages = new Subject<BaseMessage>()
      obsTrigger.subscribe({
        next: (trigger: ErrorTrigger) => {
          console.log(trigger.message)// just checking the message
          // One audit per trigger. The per-run stream completes on its own
          // (see synchronize), which releases this inner subscription.
          this.dataConversion().subscribe({
            next: (message: BaseMessage) => {
              missingMessages.next(message)
            }
          })
        },
        // Without a handler RxJS re-throws trigger errors asynchronously as
        // unhandled; log them instead, as the rest of this class does.
        error: (e: unknown) => console.error(e)
      })
      return missingMessages.asObservable()
    }

    /**
     * Runs one audit and converts every missing log entry into the message it
     * carries by JSON-parsing the entry's payload.
     *
     * A payload that fails to parse is logged and skipped so that one bad entry
     * does not discard the remaining missing messages.
     */
    private dataConversion(): Observable<BaseMessage> {
      const converted = new Subject<BaseMessage>()
      this.synchronize().subscribe({
        next: (entry: MessageLog) => {
          console.log(`Converting this ${entry.appData.msgId}`)
          let parsed: BaseMessage
          try {
            parsed = JSON.parse(entry.appData.msgPayload as string)
          } catch (e: unknown) {
            console.error(`Unable to convert ${entry.appData.msgId}: payload is not valid JSON`, e)
            return
          }
          converted.next(parsed)
        },
        complete: () => converted.complete()
      })
      return converted.asObservable()
    }

    /**
     * This is where the 'synching' operation takes place: both logs are loaded,
     * compared, and every entry missing from the target is emitted on the
     * returned Subject.
     *
     * The Subject completes once the run is over, whether it found something,
     * found nothing, or failed. Failures are logged, not forwarded, because the
     * consumers of this stream install no error handler.
     */
    private synchronize(): Subject<MessageLog> {
      const subjectOutput = new Subject<MessageLog>()
      this.acquireData()
        .then((data) => {
          // Check for length first. If the length matches, the logs are treated
          // as being in sync and the comparison is skipped.
          // NOTE(review): equal lengths do not prove equal content; this is
          // only safe if the target can never hold entries the source lacks.
          if (data.arr1.length === data.arr2.length) {
            console.log(`No synchronization needed`)
            return []
          }
          // Returned (not fire-and-forget) so a failure reaches the catch below.
          return this.checkArrayDifferences(data)
        })
        .then((missing: MessageLog[]) => {
          missing.forEach(msgElement => subjectOutput.next(msgElement))
        })
        .catch((e: unknown) => console.error(e))
        // catch() always yields a fulfilled promise, so this runs on every path.
        .then(() => subjectOutput.complete())
      return subjectOutput
    }

    /**
     * Queries both logs for the entries to compare.
     *
     * Both logging services are (re-)initialised on every call, source first,
     * then each is filtered by the first tag of its own settings. Any failure
     * rejects the returned promise.
     *
     * @returns `arr1` holds the source entries, `arr2` the target entries.
     * @throws Error when init() has not been called yet.
     */
    private async acquireData(): Promise<{ arr1: MessageLog[], arr2: MessageLog[] }> {
      const settings = this.settings
      if (!settings) {
        throw new Error(`MessageAuditorService.init() must be called before subscribe()`)
      }
      // Initiate the sources to find the location of the targeted data.
      await this.sourceSrc.init(settings.incomingSource)
      await this.targetSrc.init(settings.target)
      // Per the original author, filter also carries out the query itself.
      const arr1: MessageLog[] = await this.sourceSrc.filter({ msgTag: settings.incomingSource.tags[0] })
      const arr2: MessageLog[] = await this.targetSrc.filter({ msgTag: settings.target.tags[0] })
      return { arr1, arr2 }
    }

    /**
     * Returns the entries of `arr1` whose msgId does not occur in `arr2`.
     *
     * Only `appData.msgId` is compared, not the whole message structure. The
     * comparison is one-directional: entries that exist only in `arr2` are not
     * reported. A missing array is treated as empty, and the promise always
     * resolves, with an empty array when nothing is missing.
     *
     * @param args `arr1` is the reference (source) list, `arr2` the list to check.
     * @returns The entries of `arr1` that are missing from `arr2`, in order.
     */
    private async checkArrayDifferences(args: { arr1?: any[], arr2?: any[] }): Promise<any[]> {
      const sourceEntries: MessageLog[] = args.arr1 ?? []
      const targetEntries: MessageLog[] = args.arr2 ?? []
      // Index the target ids once instead of rescanning the target per entry.
      const targetIds = new Set(targetEntries.map(entry => entry.appData.msgId))
      const missingMsg: MessageLog[] = []
      sourceEntries.forEach((msgElement: MessageLog) => {
        if (targetIds.has(msgElement.appData.msgId)) {
          console.log(`Item Found!`)
        } else {
          console.log(`This ${msgElement.appData.msgId} is not found`)
          missingMsg.push(msgElement)
        }
      })
      return missingMsg
    }
  }