synchronization.service.ts 3.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
  1. import { resolve } from "path";
  2. import { Observable, of, Subject } from "rxjs";
  3. import { BaseMessage } from "../dependencies/fisappmessagejsutilty/dependencies/dependencies";
  4. import { LoggingService } from "../dependencies/fisloggingservice/services/logging-service";
  5. import { MessageSynchronisationServiceInterface, MessageSynchronisationServiceSetting } from "../type/datatype";
  6. export class MessageSyncrhonizationService implements MessageSynchronisationServiceInterface {
  7. settings: MessageSynchronisationServiceSetting
  8. sourceSrc: LoggingService = new LoggingService()
  9. targetSrc: LoggingService = new LoggingService()
  10. public async init(settings: MessageSynchronisationServiceSetting): Promise<void> {
  11. this.settings = settings;
  12. // Wrap promise so that init must be instantiated with resolve before proceeding with subscribe method
  13. let promiseInit: Promise<void> = new Promise((resolve, reject) => {
  14. try {
  15. this.sourceSrc.init(settings.incomingSource).then((data) => {
  16. if (!data) reject()
  17. // console.log(`File Storage:`)
  18. // data.forEach(e => console.log(e.appLogLocId))
  19. }).then(() => {
  20. this.targetSrc.init(settings.target).then((data) => {
  21. if (!data) reject()
  22. // console.log(`Mongo Storage`)
  23. // data.forEach(e => console.log(e.appLogLocId))
  24. resolve()
  25. })
  26. })
  27. }
  28. catch (e) {
  29. console.error(e)
  30. }
  31. })
  32. return promiseInit
  33. }
  34. public async subscribe(obs: Observable<string>): Promise<Observable<any>> {
  35. let subjectOutput = new Subject()
  36. this.acquireData().then((data) => {
  37. this.compareResult(data).then((data) => {
  38. const obs : Observable<any> = of(...data)
  39. obs.subscribe(subjectOutput)
  40. // log remaining data?
  41. this.targetSrc.subscribe(obs)
  42. })
  43. }).catch((e) => console.error(e))
  44. return subjectOutput.asObservable()
  45. }
  46. // Acquires the available data from designated storage
  47. private async acquireData(): Promise<any> {
  48. const promiseQuery: Promise<any> = new Promise((resolve, reject) => {
  49. let allSets: any = {}
  50. let set1
  51. let set2
  52. this.sourceSrc.filter({ msgTag: this.settings.incomingSource.tags[0] }).then((data) => {
  53. set1 = data
  54. }).then(() => {
  55. this.targetSrc.filter({ msgTag: this.settings.target.tags[0] }).then((data) => {
  56. set2 = data
  57. allSets.arr1 = set1
  58. allSets.arr2 = set2
  59. resolve(allSets)
  60. })
  61. })
  62. })
  63. return promiseQuery
  64. }
  65. // compare results and return differences
  66. private async compareResult(args: any): Promise<any> {
  67. return new Promise((resolve, reject) => {
  68. let data = []
  69. args.arr1.forEach((element) => {
  70. if (args.arr2.some(obj => obj.appData.msgId === element.appData.msgId)) {
  71. console.log(`Item Found!`)
  72. } else {
  73. console.log(`This ${element.appData.msgId} is not found`)
  74. data.push(element)
  75. resolve(data)
  76. }
  77. })
  78. })
  79. }
  80. }