test3c.ts 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154
  1. /* ----------------------- TEST3A {Mongo to Mongo} ----------------------- */
  2. /* This test is focusing on comparing 2 different arrays of message logs from 2 different storage.
  3. Which is local file mongo as the control/source, and then comparing the data from cloud mongoDB
  4. server data, and then synchronizing them */
  5. import { Observable, map, Subject, takeUntil, take, of, timer, from } from "rxjs";
  6. import { BaseMessage } from "../dependencies/fisappmessagejsutilty/dependencies/dependencies";
  7. import { LogSetting, MessageLog } from "../dependencies/fisloggingservice/type/datatype";
  8. import { IncomingMessageService } from "../services/incomingMessage.service";
  9. import { MessageSyncrhonizationService } from "../services/synchronization.service";
  10. import { ErrorTrigger, MessageSynchronisationServiceSetting } from "../type/datatype";
  11. import { StreamingService } from "./test-streamOBS";
  12. /* Pre - Defined Data && Settings */
  13. // This service will stream the messages from the local testRequest.json messages
  14. // into the designated location that will be specified later.
  15. const stream = new StreamingService()
  16. /* Using the instance of the streaming declared earlier, we feed 4 messages into the
  17. subscribers that are going to subsscribe to this source_payload. Please note that
  18. source_payload will emite the messages stream from the instance of stream service
  19. and further feed them into the other Subject which is called source_payload_subject. */
  20. const source_synchronize = new MessageSyncrhonizationService()
  21. const source_incoming = new IncomingMessageService()
  22. const source_payload: Observable<BaseMessage> = stream.stream().pipe(take(4))
  23. const source_payload_subject: Subject<BaseMessage> = new Subject()
  24. source_payload.subscribe({
  25. next: (data) => {
  26. source_payload_subject.next(data)
  27. }
  28. })
  29. /* Same thing as the above. The only difference is the we feed only 2 messages
  30. to simulate streaming error. We want to see if it will sync the other 2 later
  31. on. But generall the declarative structure is the same as the above. */
  32. const target_incoming = new IncomingMessageService()
  33. const target_payload: Observable<BaseMessage> = stream.stream().pipe(take(2))
  34. const target_payload_subject: Subject<BaseMessage> = new Subject()
  35. target_payload.subscribe({
  36. next: (data) => {
  37. target_payload_subject.next(<BaseMessage>data)
  38. },
  39. error: e => console.error(e),
  40. complete: () => { `Target Payload Completed` }
  41. })
  42. /* Declare the designated database. I am using windev's mongo storage to store the data.
  43. Hence here, is the block that definte the target and it's associated specifications.
  44. This will be the target and will receive the predefined set of data to be logged as
  45. prepared earlier in the code above.s */
  46. let source_storage: LogSetting = {
  47. cacheMessageLimit: 0,
  48. storage: "MongoDB",
  49. setting: {
  50. appName: 'Default from client',
  51. appLocName: 'To be generated in client',
  52. logLocName: 'To be generated in client',
  53. },
  54. customSetting: {
  55. server: "192.168.100.59:27017",
  56. database: "test"
  57. }
  58. }
  59. // I forgot what this is for lol...
  60. let source_dataSet: LogSetting & { incomingObservable: Observable<BaseMessage> } = {
  61. storage: source_storage.storage,
  62. setting: source_storage.setting,
  63. customSetting: source_storage.customSetting,
  64. incomingObservable: source_payload_subject
  65. }
  66. /* Same as above. Also declaring another designated database. But this one will be used
  67. as the target for synching. For such I purposely push only half the of the completed
  68. dataset in order to test out the sync later. I am using my own cloud atlas mongo
  69. database on this. The address can always be changed. */
  70. let target_storage: LogSetting = {
  71. cacheMessageLimit: 0,
  72. storage: "MongoDB",
  73. setting: {
  74. appName: 'Default from client',
  75. appLocName: 'To be generated in client',
  76. logLocName: 'To be generated in client',
  77. },
  78. customSetting: {
  79. srv: true,
  80. user: "testDB",
  81. password: "h1nt1OyXw6QeUnzS",
  82. server: "cluster0.29sklte.mongodb.net",
  83. database: "log",
  84. }
  85. }
  86. let target_dataSet: LogSetting & { incomingObservable: Observable<BaseMessage> } = {
  87. storage: target_storage.storage,
  88. setting: target_storage.setting,
  89. customSetting: target_storage.customSetting,
  90. incomingObservable: target_payload_subject
  91. }
  92. // Combine source and target storage to form MessageSynchronisationServiceSetting
  93. let settings: MessageSynchronisationServiceSetting = {
  94. incomingSource: {
  95. //all of the settings to be combined here
  96. ...source_storage,
  97. tags: ['Incoming']
  98. }, //LogSetting & {tags:string[] },
  99. target: {
  100. ...target_storage,
  101. tags: ['Incoming']
  102. } //LogSetting & {tags:string[] }
  103. }
  104. /* -------- SYNCHRONIZATION --------- */
  105. // This is where the real test begin. THe process before this were merely just configuring
  106. // the settings of where to sync. Here the initial intialize data will first log the
  107. // messages into the designated database as specified earlier.
  108. function initializeData() { // To store the data into the designated databases.
  109. source_incoming.init(source_dataSet)
  110. target_incoming.init(target_dataSet)
  111. }
  112. // Done by appoximately 5-8 Seconds
  113. initializeData() // Call the function to store the data into the designated databases.
  114. source_synchronize.init(settings)
  115. /* This is where the synchronizatin happens. Upon initializin ghe target earlier, we
  116. will put in a trigger to execute the synching. It will check for missing datas and
  117. publish the missing messages accordingly. For this test purpose, we purposely set to
  118. 7 seconds to allow the logging process earlier to take place first before engaging
  119. in synching activity to prevent confusion and interference of the pre-logging stage. */
  120. setTimeout(() => {
  121. // This wil act as the trigger error. Although the definition of this error is
  122. // still subject for enhancements in the near future.
  123. let sampleError: ErrorTrigger = {
  124. status: 1,
  125. message: "NO. I dont want to work"
  126. }
  127. let triggerSync: Observable<ErrorTrigger> = from([sampleError])
  128. let sync = source_synchronize.subscribe(triggerSync)
  129. sync.subscribe({
  130. next: (msgToBeSynched) => {
  131. console.log(`synching ... ${msgToBeSynched.header.messageID}`)
  132. target_payload_subject.next(msgToBeSynched)
  133. }
  134. })
  135. }, 5000)