test3a.ts 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. /* ----------------------- TEST3A {File to Mongo} ----------------------- */
/* This test compares two sets of message logs held in two different storages.
The local file storage acts as the control/source; its data is compared against
the data held on the cloud MongoDB server, and the two are then synchronized. */
  5. import { Observable, map, Subject, takeUntil, take, of, timer, from } from "rxjs";
  6. import { IncomingMessageService } from "../services/incomingMessage.service";
  7. import { MessageAuditorService } from "../services/message-auditor.service";
  8. import { ErrorTrigger, MessageSynchronisationServiceSetting } from "../type/datatype";
  9. import { StreamingService } from "./test-streamOBS";
  10. import { BaseMessage } from "../dependencies/msgutil/interface/export";
  11. import { LogSetting } from "../dependencies/log/type/datatype";
  12. /* Pre - Defined Data && Settings */
  13. const stream = new StreamingService()
  14. // Declare source Services && Observables (Using File Storage) Simulating Full Logs
  15. const source_synchronize = new MessageAuditorService()
  16. const source_payload: Observable<BaseMessage> = stream.stream().pipe(take(4))
  17. const source_incoming = new IncomingMessageService()
  18. const source_payload_subject: Subject<BaseMessage> = new Subject()
  19. source_payload.subscribe({
  20. next: (data) => {
  21. source_payload_subject.next(data)
  22. // console.log(data)
  23. }
  24. })
  25. // Declare target Services && Observables (Using MongoDB Storage) Simulating Partial Logs
  26. const target_payload: Observable<BaseMessage> = stream.stream().pipe(take(2))
  27. const target_payload_subject: Subject<BaseMessage> = new Subject()
  28. const target_incoming = new IncomingMessageService()
  29. target_payload.subscribe({
  30. next: (data) => {
  31. target_payload_subject.next(<BaseMessage>data)
  32. },
  33. error: e => console.error(e),
  34. complete: () => { `Target Payload Completed` }
  35. })
  36. // testing to see if data is sent in
  37. target_payload_subject.subscribe({
  38. next: element => {
  39. console.log(`target_payload_subject emits :00 ${element.header.messageID}`)
  40. }
  41. })
  42. // Declare Source Storage
  43. let source_storage: LogSetting = {
  44. storage: "File",
  45. setting: {
  46. appName: 'Default from client',
  47. appLocName: 'To be generated in client',
  48. logLocName: 'To be generated in client',
  49. }
  50. }
  51. let source_dataSet: LogSetting & { incomingObservable: Observable<BaseMessage> } = {
  52. storage: source_storage.storage,
  53. setting: source_storage.setting,
  54. customSetting: source_storage.customSetting,
  55. incomingObservable: source_payload_subject
  56. }
  57. //Declare Target Storage
  58. let target_storage: LogSetting = {
  59. storage: "MongoDB",
  60. cacheMessageLimit: 0,
  61. setting: {
  62. appName: 'Default from client',
  63. appLocName: 'To be generated in client',
  64. logLocName: 'To be generated in client',
  65. },
  66. customSetting: {
  67. srv: true,
  68. user: "testDB",
  69. password: "h1nt1OyXw6QeUnzS",
  70. server: "cluster0.29sklte.mongodb.net",
  71. database: "log",
  72. }
  73. }
  74. let target_dataSet: LogSetting & { incomingObservable: Observable<BaseMessage> } = {
  75. storage: target_storage.storage,
  76. setting: target_storage.setting,
  77. customSetting: target_storage.customSetting,
  78. incomingObservable: target_payload_subject
  79. }
  80. // Combine source and target storage to form MessageSynchronisationServiceSetting
  81. let settings: MessageSynchronisationServiceSetting = {
  82. incomingSource: {
  83. //all of the settings to be combined here
  84. ...source_storage,
  85. tags: ['Incoming']
  86. }, //LogSetting & {tags:string[] },
  87. target: {
  88. ...target_storage,
  89. tags: ['Incoming']
  90. } //LogSetting & {tags:string[] }
  91. }
  92. /* -------- SYNCHRONIZATION --------- */
  93. function initializeData() {
  94. source_incoming.init(source_dataSet)
  95. target_incoming.init(target_dataSet)
  96. }
  97. // Done by appoximately 5-8 Seconds
  98. initializeData()
  99. source_synchronize.init(settings)
  100. // by 10th second
  101. setTimeout(() => {
  102. let sampleError: ErrorTrigger = {
  103. status: 1,
  104. message: "NO. I dont want to work"
  105. }
  106. let triggerSync = from([sampleError])
  107. let sync = source_synchronize.subscribe(triggerSync)
  108. sync.subscribe({
  109. next: (msgToBeSynchronized) => {
  110. let raw = msgToBeSynchronized.appData.msgPayload
  111. let data: BaseMessage = JSON.parse(<string>raw)
  112. // console.log(`synching ... ${msgToBeSynchronized.header.messageID}`)
  113. target_payload_subject.next(data)
  114. }
  115. })
  116. }, 7000)//30s