synchronization.service.ts 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. import { resolve } from "path";
  2. import { Observable, of, Subject } from "rxjs";
  3. import { BaseMessage } from "../dependencies/fisappmessagejsutilty/dependencies/dependencies";
  4. import { LoggingService } from "../dependencies/fisloggingservice/services/logging-service";
  5. import { MessageLog } from "../dependencies/fisloggingservice/type/datatype";
  6. import { MessageSynchronisationServiceInterface, MessageSynchronisationServiceSetting } from "../type/datatype";
  /**
   * Compares the messages held by a source logging store with those held by a
   * target logging store and emits every source message whose `appData.msgId`
   * is absent from the target.
   *
   * Usage: call {@link init} and wait for it to resolve, then call
   * {@link subscribe} to start one synchronisation pass.
   */
  export class MessageSyncrhonizationService implements MessageSynchronisationServiceInterface {
    /** Settings handed to {@link init}; unset until `init` has been called. */
    settings!: MessageSynchronisationServiceSetting;
    /** Store that is treated as the reference set of messages. */
    sourceSrc: LoggingService = new LoggingService();
    /** Store that is checked for (and subscribed to) the missing messages. */
    targetSrc: LoggingService = new LoggingService();

    /**
     * Initialises the source store and then the target store, in that order.
     *
     * @param settings Configuration providing `incomingSource` and `target`.
     * @returns A promise that resolves once both stores report a truthy
     *          initialisation result.
     * @throws Error when either store resolves with a falsy result; any
     *         rejection coming from the stores themselves is propagated as-is.
     */
    public async init(settings: MessageSynchronisationServiceSetting): Promise<void> {
      this.settings = settings;

      // The target is only initialised after the source succeeded, so a broken
      // source never leaves a half-initialised pair behind.
      const sourceReady = await this.sourceSrc.init(settings.incomingSource);
      if (!sourceReady) {
        throw new Error("Failed to initialise the source logging service");
      }

      const targetReady = await this.targetSrc.init(settings.target);
      if (!targetReady) {
        throw new Error("Failed to initialise the target logging service");
      }
    }

    /**
     * Starts one synchronisation pass and exposes the missing messages.
     *
     * The target store is subscribed to the stream of missing messages
     * (presumably so that it can persist them - confirm against
     * `LoggingService.subscribe`), and every emitted message id is logged.
     *
     * @param obsTrigger Intended as a trigger for further synchronisation
     *                   passes. NOTE(review): it is currently not consumed, so
     *                   only a single pass runs per call.
     * @returns An observable emitting each message missing from the target.
     */
    public subscribe(obsTrigger: Observable<string>): Observable<any> {
      const subjectOutput = this.syncrhonize();
      this.targetSrc.subscribe(subjectOutput);
      subjectOutput.subscribe({
        next: (missingMsg) => console.log(`Synchronizing ${missingMsg.appData.msgId}`),
      });
      // TODO: subscribe to obsTrigger and run another synchronisation per emission.
      return subjectOutput.asObservable();
    }

    /**
     * Loads both message sets, diffs them and pushes every message missing from
     * the target into the returned subject.
     *
     * The subject is returned synchronously; emissions happen later, once the
     * stores have answered. Failures are logged and produce no emission.
     */
    private syncrhonize(): Subject<any> {
      const subjectOutput = new Subject<any>();
      this.acquireData()
        // Returning the inner promise keeps its failures on this chain.
        .then((data) => this.checkArrayDifferences(data))
        .then((missingMessages) => {
          // Decide on the actual difference rather than on array lengths:
          // equally sized sets can still contain different messages.
          if (missingMessages.length === 0) {
            console.log(`No synchronization needed`);
            return;
          }
          missingMessages.forEach((msgElement) => subjectOutput.next(msgElement));
        })
        .catch((e) => console.error(e));
      return subjectOutput;
    }

    /**
     * Acquires the available data from the designated storages.
     *
     * Each store is queried by the first tag of its own settings entry; the
     * source is queried first, then the target.
     *
     * @returns `arr1` with the source messages and `arr2` with the target ones.
     *          Rejects when either query rejects.
     */
    private async acquireData(): Promise<{ arr1: MessageLog[], arr2: MessageLog[] }> {
      const arr1: MessageLog[] = await this.sourceSrc.filter({ msgTag: this.settings.incomingSource.tags[0] });
      const arr2: MessageLog[] = await this.targetSrc.filter({ msgTag: this.settings.target.tags[0] });
      return { arr1, arr2 };
    }

    /**
     * Compares the two sets and returns the differences.
     *
     * @param args `arr1` is the reference set, `arr2` the set to check; a
     *             missing array is treated as empty.
     * @returns Every element of `arr1` whose `appData.msgId` does not occur in
     *          `arr2`, in `arr1` order. Resolves with an empty array when
     *          nothing is missing.
     */
    private async checkArrayDifferences(args: { arr1?: any[], arr2?: any[] }): Promise<any[]> {
      const { arr1 = [], arr2 = [] } = args;
      // Index the target ids once so each lookup is O(1).
      const knownIds = new Set(arr2.map((obj) => obj.appData.msgId));
      const missingMsg: MessageLog[] = [];
      for (const msgElement of arr1) {
        if (knownIds.has(msgElement.appData.msgId)) {
          console.log(`Item Found!`);
        } else {
          console.log(`This ${msgElement.appData.msgId} is not found`);
          missingMsg.push(msgElement);
        }
      }
      return missingMsg;
    }
  }