synchronization.service.ts 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687
  1. import { resolve } from "path";
  2. import { Observable, of, Subject } from "rxjs";
  3. import { BaseMessage } from "../dependencies/fisappmessagejsutilty/dependencies/dependencies";
  4. import { LoggingService } from "../dependencies/fisloggingservice/services/logging-service";
  5. import { MessageSynchronisationServiceInterface, MessageSynchronisationServiceSetting } from "../type/datatype";
  6. export class MessageSyncrhonizationService implements MessageSynchronisationServiceInterface {
  7. settings: MessageSynchronisationServiceSetting
  8. sourceSrc: LoggingService = new LoggingService()
  9. targetSrc: LoggingService = new LoggingService()
  10. public async init(settings: MessageSynchronisationServiceSetting): Promise<void> {
  11. this.settings = settings;
  12. // Wrap promise so that init must be instantiated with resolve before proceeding with subscribe method
  13. let promiseInit: Promise<void> = new Promise((resolve, reject) => {
  14. try {
  15. this.sourceSrc.init(settings.incomingSource).then((data) => {
  16. if (!data) reject()
  17. // console.log(`File Storage:`)
  18. // data.forEach(e => console.log(e.appLogLocId))
  19. }).then(() => {
  20. this.targetSrc.init(settings.target).then((data) => {
  21. if (!data) reject()
  22. // console.log(`Mongo Storage`)
  23. // data.forEach(e => console.log(e.appLogLocId))
  24. resolve()
  25. })
  26. })
  27. }
  28. catch (e) {
  29. console.error(e)
  30. }
  31. })
  32. return promiseInit
  33. }
  34. public async subscribe(obs: Observable<string>): Promise<Observable<any>> {
  35. let subjectOutput = new Subject()
  36. this.acquireData().then((data) => {
  37. this.compareResult(data).then((data) => {
  38. const obs : Observable<any> = of(...data)
  39. obs.subscribe(subjectOutput)
  40. })
  41. }).catch((e) => console.error(e))
  42. return subjectOutput.asObservable()
  43. }
  44. // Acquires the available data from designated storage
  45. private async acquireData(): Promise<any> {
  46. const promiseQuery: Promise<any> = new Promise((resolve, reject) => {
  47. let allSets: any = {}
  48. let set1
  49. let set2
  50. this.sourceSrc.filter({ msgTag: this.settings.incomingSource.tags[0] }).then((data) => {
  51. set1 = data
  52. }).then(() => {
  53. this.targetSrc.filter({ msgTag: this.settings.target.tags[0] }).then((data) => {
  54. set2 = data
  55. allSets.arr1 = set1
  56. allSets.arr2 = set2
  57. resolve(allSets)
  58. })
  59. })
  60. })
  61. return promiseQuery
  62. }
  63. // compare results and return differences
  64. private async compareResult(args: any): Promise<any> {
  65. return new Promise((resolve, reject) => {
  66. let data = []
  67. args.arr1.forEach((element) => {
  68. if (args.arr2.some(obj => obj.appData.msgId === element.appData.msgId)) {
  69. console.log(`Item Found!`)
  70. } else {
  71. console.log(`This ${element.appData.msgId} is not found`)
  72. data.push(element)
  73. resolve(data)
  74. }
  75. })
  76. })
  77. }
  78. }