synchronization.service.ts 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. import { resolve } from "path";
  2. import { map, Observable, of, Subject } from "rxjs";
  3. import { BaseMessage } from "../dependencies/fisappmessagejsutilty/dependencies/dependencies";
  4. import { LoggingService } from "../dependencies/fisloggingservice/services/logging-service";
  5. import { MessageLog } from "../dependencies/fisloggingservice/type/datatype";
  6. import { MessageSynchronisationServiceInterface, MessageSynchronisationServiceSetting } from "../type/datatype";
  /**
   * Compares the message logs held by an incoming-source storage with those held
   * by a target storage and republishes the payload of every message that the
   * target is missing.
   *
   * Usage: call {@link init} once and wait for it to resolve, then call
   * {@link subscribe} to obtain the stream of missing messages.
   */
  export class MessageSyncrhonizationService implements MessageSynchronisationServiceInterface {
    // Assigned in init(); acquireData() guards against use before that.
    private settings!: MessageSynchronisationServiceSetting;
    private readonly sourceSrc: LoggingService = new LoggingService();
    private readonly targetSrc: LoggingService = new LoggingService();

    /**
     * Stores the settings and initialises both storages, source first.
     *
     * @param settings Settings carrying the `incomingSource` and `target` storage configuration.
     * @returns A promise that resolves once both storages reported a truthy init result.
     * @throws Error when either storage reports a falsy init result; failures
     *         raised by the storages themselves are propagated unchanged.
     */
    public async init(settings: MessageSynchronisationServiceSetting): Promise<void> {
      this.settings = settings;

      // The target is only initialised after the source succeeded, so a failed
      // source no longer triggers a pointless target initialisation.
      const sourceReady = await this.sourceSrc.init(settings.incomingSource);
      if (!sourceReady) {
        throw new Error("Failed to initialise the incoming source storage");
      }

      const targetReady = await this.targetSrc.init(settings.target);
      if (!targetReady) {
        throw new Error("Failed to initialise the target storage");
      }
    }

    /**
     * Starts a synchronization run for every value emitted by `obsTrigger` and
     * publishes the messages found missing on the returned stream.
     *
     * When no trigger is supplied, a single synchronization run is started
     * immediately.
     *
     * @param obsTrigger Stream whose emissions each request one synchronization run.
     * @returns Stream of the parsed payloads of messages missing from the target.
     *          A failed run is logged and does not terminate this stream; an error
     *          of the trigger stream itself is forwarded to it.
     */
    public subscribe(obsTrigger: Observable<string>): Observable<BaseMessage> {
      const msg = new Subject<BaseMessage>();

      const runSynchronization = (): void => {
        this.dataConversion().subscribe({
          next: (element) => msg.next(element),
          // Keep the outgoing stream alive: a failed run must not prevent later runs.
          error: (e: unknown) => console.error(e),
        });
      };

      // This check has to come before obsTrigger is dereferenced.
      if (!obsTrigger) {
        runSynchronization();
      } else {
        obsTrigger.subscribe({
          next: (trigger) => {
            console.log(`${trigger} has trigged synchronization`);
            runSynchronization();
          },
          error: (e: unknown) => msg.error(e),
        });
      }

      return msg.asObservable();
    }

    /**
     * Runs one synchronization and converts every missing log entry into the
     * message stored in its payload.
     *
     * The payload is parsed with `JSON.parse` and is not validated any further;
     * a payload that fails to parse ends this run's stream with an error.
     */
    private dataConversion(): Observable<BaseMessage> {
      return this.syncrhonize().pipe(
        map((log: MessageLog): BaseMessage => JSON.parse(log.appData.msgPayload as string)),
      );
    }

    /**
     * Starts one synchronization run right away and reports its outcome on the
     * returned stream: each missing log entry is emitted, then the stream
     * completes; any failure is delivered as an error notification.
     *
     * The work starts when this method is called, not on subscription, so
     * callers must subscribe synchronously to avoid missing emissions.
     */
    private syncrhonize(): Observable<MessageLog> {
      const output = new Subject<MessageLog>();

      const run = async (): Promise<void> => {
        const data = await this.acquireData();
        // NOTE(review): equal lengths are taken to mean "already in sync". That
        // only holds if the target never contains entries the source lacks —
        // confirm, otherwise always run the comparison below.
        if (data.arr1.length === data.arr2.length) {
          console.log(`No synchronization needed`);
          return;
        }
        const missing: MessageLog[] = await this.checkArrayDifferences(data);
        missing.forEach((entry) => output.next(entry));
      };

      run().then(
        // Completing releases the per-run subscription instead of leaking it.
        () => output.complete(),
        (e: unknown) => output.error(e),
      );

      return output.asObservable();
    }

    /**
     * Acquires the available data from the designated storages, filtered by the
     * first tag configured for each of them.
     *
     * @returns `arr1` holds the source entries, `arr2` the target entries.
     * @throws Error when called before {@link init}; storage failures are propagated.
     */
    private async acquireData(): Promise<{ arr1: MessageLog[]; arr2: MessageLog[] }> {
      if (!this.settings) {
        throw new Error("init() must be called before a synchronization can run");
      }

      // Queried one after the other, in the same order as before.
      const arr1: MessageLog[] = await this.sourceSrc.filter({ msgTag: this.settings.incomingSource.tags[0] });
      const arr2: MessageLog[] = await this.targetSrc.filter({ msgTag: this.settings.target.tags[0] });
      return { arr1, arr2 };
    }

    /**
     * Compares both result sets by `appData.msgId` and returns the differences.
     *
     * @param args `arr1` is the reference set, `arr2` the set being checked; a
     *             missing array is treated as empty.
     * @returns Every entry of `arr1` whose message id does not occur in `arr2`,
     *          in original order. Resolves with an empty array when nothing is missing.
     */
    private async checkArrayDifferences(args: { arr1?: any[], arr2?: any[] }): Promise<any[]> {
      const reference: MessageLog[] = args.arr1 ?? [];
      const candidates: MessageLog[] = args.arr2 ?? [];

      // Index the ids once so each lookup is constant time.
      const knownIds = new Set(candidates.map((entry) => entry.appData.msgId));

      const missingMsg: MessageLog[] = [];
      for (const msgElement of reference) {
        if (knownIds.has(msgElement.appData.msgId)) {
          console.log(`Item Found!`);
        } else {
          console.log(`This ${msgElement.appData.msgId} is not found`);
          missingMsg.push(msgElement);
        }
      }
      return missingMsg;
    }
  }