test4.ts 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229
  /* ----------------------- TEST4 {Mongo to Mongo} ----------------------- */
  /* Same as test 3, but this one works with CDMS or any other potential data.
  It is highly advisable to refer to test3c for the overall explanation of the logic flow in these
  test cases. Test4 is an adjusted version of test3 that caters for the need to deal with
  different types of data aside from messageLogs. */
  /* Note: MessageAudit will not work if storage is File. Search does not work, as the logging service
  does not cater for File system storage. */
  import * as dns from "dns"
  import * as fs from "fs"
  import * as mongoose from 'mongoose'
  import { Observable, map, Subject, takeUntil, take, of, timer, from } from "rxjs";
  import { LoggingService } from '../dependencies/log/interface/export';
  import { LogSetting, MessageLog } from '../dependencies/log/type/datatype';
  import { BaseMessage } from '../dependencies/msgutil/interface/export';
  import { MessageAuditorService } from "../services/message-auditor.service";
  import { ErrorTrigger, MessageSynchronisationServiceSetting } from "../type/datatype";
  import { StreamingService } from "./test-streamOBS";
  /* Convert all the non-message data in the database into the MessageLog type. This ensures its
     compatibility with the interfaces of the logging and message-audit features. */
  const Schema = mongoose.Schema;
  // Use the existing message schema.
  // NOTE(review): `messageSchema` is not referenced anywhere in the visible code; it is kept
  // because the module is loaded at start-up and removing the load could change behaviour.
  const messageSchema = require('../dependencies/log/type/schemas/message.schema')
  // Fingerprint schema: the shape of the generic data that gets transformed into MessageLog.
  const fingerPrintSchema = new Schema({
    uuid: { type: String, required: true, lowercase: true, unique: true },
    fileName: { type: String, required: true, lowercase: true },
    fileType: { type: String, required: true, lowercase: true },
    entityName: { type: String, required: true, lowercase: true },
    fileData: { type: Object, required: true },
  });
  /* For the basic explanation, please refer to test3c. Here we instantiate the audit service and a
     logging service for each of the primary and the secondary sources, followed by the
     corresponding subjects. The idea is that a subject receives the missing data reported by the
     auditor, and that data is then logged in the designated database location. */
  const auditor = new MessageAuditorService()

  // Primary side: every message pushed into `primary` is announced on the console.
  const primary_Log = new LoggingService()
  const primary: Subject<MessageLog> = new Subject()
  primary.subscribe((element: MessageLog) => {
    console.log(`Primary Received ${element.appData.msgId}`)
  })

  // Secondary side: every message pushed into `secondary` is announced and then converted back
  // into its CDMS form and stored (see convertMessageLogToCDMS).
  const secondary_log = new LoggingService()
  const secondary: Subject<MessageLog> = new Subject()
  secondary.subscribe((element: MessageLog) => {
    console.log(`Secondary Received ${element.appData.msgId}`)
    convertMessageLogToCDMS(element)
  })
  /* For the basic explanation, please refer to test3c. Declaration of the source and target locations. */
  const primary_storage: LogSetting = {
    cacheMessageLimit: 0,
    storage: "MongoDB",
    setting: {
      appName: 'Default from client',
      appLocName: 'To be generated in client',
      logLocName: 'To be generated in client',
    },
    customSetting: {
      url: 'mongodb://192.168.100.59:27017/primary'
      // server: "192.168.100.59:27017",
      // database: "primary"
    }
  }
  // NOTE(review): the URL below embeds a database user name and password in plain text.
  // Credentials committed to source control should be rotated and loaded from configuration
  // or the environment instead; the literal is kept here only to preserve current behaviour.
  const secondary_storage: LogSetting = {
    cacheMessageLimit: 0,
    storage: "MongoDB",
    setting: {
      appName: 'Default from client',
      appLocName: 'To be generated in client',
      logLocName: 'To be generated in client',
    },
    customSetting: {
      url: 'mongodb+srv://testDB:h1nt1OyXw6QeUnzS@cluster0.29sklte.mongodb.net/secondary'
      // srv: true,
      // user: "testDB",
      // password: "h1nt1OyXw6QeUnzS",
      // server: "cluster0.29sklte.mongodb.net",
      // database: "secondary",
    }
  }
  // Combine the source and target storage into a MessageSynchronisationServiceSetting.
  // This is required to initialise the message auditor.
  const settings: MessageSynchronisationServiceSetting = {
    // Each side is its LogSetting plus a list of tags: LogSetting & { tags: string[] }
    incomingSource: {
      ...primary_storage,
      tags: ['default']
    },
    target: {
      ...secondary_storage,
      tags: ['default']
    }
  }
  /* -------- SYNCHRONIZATION --------- */
  // The primary side calls the synchronization service.
  auditor.init(settings)
  /* This is where the synchronization logic is wired up. The errorSubject acts as the trigger
     mechanism: each ErrorTrigger pushed into it is handed to the auditor. */
  const errorSubject: Subject<ErrorTrigger> = new Subject()
  // Let the auditor listen to errorSubject; it hands back a stream of messages to synchronize.
  const sync = auditor.subscribe(errorSubject)
  sync.subscribe({
    next: (msgToBeSynchronized: MessageLog) => {
      console.log(`passing missing message: ${msgToBeSynchronized.appData.msgId} into target/secondary subject.`)
      // The missing data returned is pushed into the target (secondary) subject.
      secondary.next(msgToBeSynchronized)
    }
  })
  // Wait 3 seconds so the initial logging stage can complete before the sync is triggered.
  setTimeout(() => {
    // This acts as the trigger error, although the definition of this error is
    // still subject to enhancement in the near future.
    const sampleError: ErrorTrigger = {
      status: 1,
      message: "NO. I dont want to work"
    }
    errorSubject.next(sampleError)
  }, 3000)
  // Print the elapsed seconds so the timing of the steps above can be followed on the console.
  countdown()
  // Convert all the existing CDMS data in both databases into message logs.
  convertDataInMongo(primary_storage.customSetting.url)
  convertDataInMongo(secondary_storage.customSetting.url)
  // Connection and model used by the secondary side to write the converted missing data back
  // into its own collection. The URL is taken from secondary_storage so that the connection
  // string is defined in exactly one place.
  const dbConnection = mongoose.createConnection(secondary_storage.customSetting.url)
  const dataModel = dbConnection.model('genericdata', fingerPrintSchema)
  // Initialise both logging services and, once ready, let each one listen to its subject.
  // Initialisation failures are reported instead of surfacing as unhandled promise rejections.
  primary_Log.init(settings.incomingSource).then(() => {
    primary_Log.subscribe(primary)
  }).catch((err: unknown) => {
    console.error(err instanceof Error ? err.message : err)
  })
  secondary_log.init(settings.target).then(() => {
    secondary_log.subscribe(secondary)
  }).catch((err: unknown) => {
    console.error(err instanceof Error ? err.message : err)
  })
  /**
   * Reads every document of the `genericdata` collection at the given MongoDB location,
   * converts each one into a MessageLog through the logging service, and pushes the result
   * into the `primary` subject.
   *
   * Conversion failures are logged per document and do not stop the remaining documents.
   * A failed query is logged as well.
   *
   * NOTE(review): converted data is always pushed into `primary` and tagged with
   * `settings.incomingSource.tags`, even when `url` points at the secondary database.
   * Confirm this is intended.
   * NOTE(review): the connection opened here is never closed.
   *
   * @param url MongoDB connection string of the database to read from.
   */
  function convertDataInMongo(url: string): void {
    const convertService = new LoggingService()
    const dbConnection = mongoose.createConnection(url)
    const dataModel = dbConnection.model('genericdata', fingerPrintSchema)

    // Log whatever was thrown; fall back to the raw value when it is not an Error.
    const report = (err: unknown): void => {
      console.error(err instanceof Error ? err.message : err)
    }

    dataModel.find().then((documents) => {
      // Start the conversion of every document; each one reports its own outcome.
      documents.forEach((element) => {
        convertService.convertCDMStoMessageLog(element, settings.incomingSource.tags).then((result: MessageLog) => {
          console.log(`Converting fingerprint .... ${result.appData.msgId}`)
          primary.next(result)
        }).catch(report)
      })
    }).catch(report)
  }
  /**
   * Used by the secondary subject to complete the synchronization: converts the received
   * message log back into its CDMS form and stores it through `dataModel`.
   *
   * The promise returned by `dataModel.create` is chained, so a failed write is reported by
   * the same error handler as a failed conversion.
   *
   * @param args The message log to convert and store.
   */
  function convertMessageLogToCDMS(args: MessageLog) {
    secondary_log.convertMessageLogtoCDMS(args).then((result) => {
      return dataModel.create(result)
    }).catch((err: unknown) => {
      console.error(err instanceof Error ? err.message : err)
    });
  }
  /* This part tests generating an error message that is fed into the error subject, acting as
     an additional trigger to execute the synchronization when there is no internet connection.
     It is not mandatory and can be commented out. */
  /**
   * Checks internet connectivity by resolving example.com with the built-in DNS module.
   * When the lookup fails with a resolver error, an ErrorTrigger with status 0 is pushed
   * into `errorSubject`. Nothing is emitted when the lookup succeeds.
   */
  function checkInternetConnectivity() {
    dns.lookup('example.com', (err) => {
      // ENOTFOUND: the name could not be resolved. EAI_AGAIN: the resolver itself could not
      // be reached, which is what is typically reported while the machine is offline.
      const code = err?.code
      if (code === 'ENOTFOUND' || code === 'EAI_AGAIN') {
        const errorMsg: ErrorTrigger = {
          status: 0,
          message: `No internet connection`
        }
        errorSubject.next(errorMsg)
      }
    });
  }
  // Interval (in milliseconds) between two connectivity checks: once every second.
  const intervalTime = 1000;
  // Start checking connectivity at that interval.
  const interval = setInterval(checkInternetConnectivity, intervalTime);
  // Total monitoring time (in milliseconds): one minute.
  const duration = 60000;
  // Once the monitoring time has passed, stop the periodic check and say so.
  setTimeout(() => {
    clearInterval(interval);
    console.log('Internet connectivity monitoring stopped');
  }, duration);
  /**
   * Prints the number of elapsed seconds to the console once per second, starting at 0.
   * Despite its name, the counter counts up.
   *
   * @returns The interval handle, so a caller can stop the counter with `clearInterval`.
   *          Callers that ignore the return value keep the previous behaviour.
   */
  function countdown(): ReturnType<typeof setInterval> {
    let seconds = 0;
    return setInterval(() => {
      console.log(`Elapsed seconds: ${seconds}`);
      seconds++;
    }, 1000); // Update every second (1000 milliseconds)
  }