synchronization.service.ts 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119
  1. import { resolve } from "path";
  2. import { map, Observable, of, Subject } from "rxjs";
  3. import { BaseMessage } from "../dependencies/fisappmessagejsutilty/dependencies/dependencies";
  4. import { LoggingService } from "../dependencies/fisloggingservice/services/logging-service";
  5. import { MessageLog } from "../dependencies/fisloggingservice/type/datatype";
  6. import { MessageSynchronisationServiceInterface, MessageSynchronisationServiceSetting, ErrorTrigger } from "../type/datatype";
  /**
   * Finds messages that exist in the "incoming source" log storage but are
   * absent from the "target" log storage, and re-emits their payloads whenever
   * an error trigger fires.
   *
   * `init()` must be called with the service settings before any trigger fires.
   */
  export class MessageSyncrhonizationService implements MessageSynchronisationServiceInterface {
    private settings: MessageSynchronisationServiceSetting
    private sourceSrc: LoggingService = new LoggingService()
    private targetSrc: LoggingService = new LoggingService()

    /**
     * Stores the settings that describe the source and target storages.
     *
     * @param settings Configuration read later by `acquireData()`.
     */
    public init(settings: MessageSynchronisationServiceSetting): void {
      this.settings = settings;
    }

    /**
     * Subscribes to the designated error triggers. Every notification starts a
     * synchronization pass, and each message found missing from the target is
     * emitted on the returned observable.
     *
     * @param obsTrigger Stream of error notifications that request a sync.
     * @returns Observable of the parsed payloads of the missing messages. It
     *          stays open so that later triggers can emit on it as well.
     */
    public subscribe(obsTrigger: Observable<ErrorTrigger>): Observable<BaseMessage> {
      // Subject used to hand the missing messages (if any) back to the caller.
      const msg: Subject<BaseMessage> = new Subject()

      // Runs one synchronization pass and forwards its results to `msg`.
      const forwardMissingMessages = (): void => {
        this.dataConversion().subscribe({
          next: element => msg.next(element),
          // A payload that fails to parse errors the inner stream; log it
          // instead of letting it surface as an unhandled error.
          error: err => console.error(err)
        })
      }

      if (!obsTrigger) {
        // The guard must run BEFORE touching obsTrigger, otherwise a missing
        // trigger throws. NOTE(review): the original appeared to intend a
        // single immediate sync pass in this case — confirm with callers.
        forwardMissingMessages()
      } else {
        // Listen to the error trigger; each notification starts a new pass.
        obsTrigger.subscribe({
          next: () => forwardMissingMessages(),
          error: err => console.error(err)
        })
      }

      return msg.asObservable()
    }

    /**
     * Runs a synchronization pass and converts each missing log entry into
     * the message it carries by JSON-parsing `appData.msgPayload`.
     */
    private dataConversion(): Observable<BaseMessage> {
      return this.synchronize().pipe(
        map((msg: MessageLog) => {
          console.log(`Converting this ${msg.appData.msgId}`)
          return JSON.parse(<string>msg.appData.msgPayload)
        })
      )
    }

    /**
     * Emits every log entry that is present in the source but missing from the
     * target, then completes.
     *
     * The Subject is returned synchronously and only emits after the storage
     * queries resolve, so a caller that subscribes right away sees every item.
     */
    private synchronize(): Subject<any> {
      const subjectOutput = new Subject<any>()
      this.acquireData()
        // Always diff by message id: two storages can hold the same NUMBER of
        // entries and still differ, so a length comparison is not sufficient.
        .then(data => this.checkArrayDifferences(data))
        .then((missing: MessageLog[]) => {
          if (missing.length === 0) {
            console.log(`No synchronization needed`)
          }
          missing.forEach(msgElement => subjectOutput.next(msgElement))
          // Complete so the per-pass subscription is released.
          subjectOutput.complete()
        })
        .catch((e) => {
          console.error(e)
          subjectOutput.complete()
        })
      return subjectOutput
    }

    /**
     * Initialises both storages and fetches the entries tagged with the first
     * configured tag of each.
     *
     * @returns `arr1` holds the source entries, `arr2` the target entries.
     * @throws Error when `init()` has not been called; any rejection from the
     *         logging services also propagates to the caller.
     */
    private async acquireData(): Promise<{ arr1: MessageLog[], arr2: MessageLog[] }> {
      if (!this.settings) {
        throw new Error(`MessageSyncrhonizationService.init() must be called before synchronizing`)
      }
      const { incomingSource, target } = this.settings

      // Same call order as before (init source, init target, query source,
      // query target), but awaited so a failure rejects instead of hanging.
      await this.sourceSrc.init(incomingSource)
      await this.targetSrc.init(target)
      const arr1: MessageLog[] = await this.sourceSrc.filter({ msgTag: incomingSource.tags[0] })
      const arr2: MessageLog[] = await this.targetSrc.filter({ msgTag: target.tags[0] })
      return { arr1, arr2 }
    }

    /**
     * Compares the two result sets by `appData.msgId`.
     *
     * @param args `arr1` is the reference set, `arr2` the set checked against
     *             it. A missing array is treated as empty.
     * @returns The entries of `arr1` whose id does not occur in `arr2`; an
     *          empty array when nothing is missing.
     */
    private async checkArrayDifferences(args: { arr1?: any[], arr2?: any[] }): Promise<any[]> {
      const referenceSet: MessageLog[] = args.arr1 ?? []
      const comparedSet: MessageLog[] = args.arr2 ?? []

      // Index the compared ids once instead of rescanning for every entry.
      const knownIds = new Set(comparedSet.map(entry => entry.appData.msgId))

      const missingMsg: MessageLog[] = []
      for (const msgElement of referenceSet) {
        if (knownIds.has(msgElement.appData.msgId)) {
          console.log(`Item Found!`)
        } else {
          console.log(`This ${msgElement.appData.msgId} is not found`)
          missingMsg.push(msgElement)
        }
      }
      // Returning after the loop settles the promise even when nothing is
      // missing.
      return missingMsg
    }
  }