// test3.ts
// Test Observable //
import mongoose, { Model } from "mongoose";
import { Subject, from, map, of, interval, buffer, asyncScheduler, observeOn, takeUntil, delay, queueScheduler, bufferWhen } from "rxjs";
  4. const used = process.memoryUsage();
  5. let MongooseConnection: mongoose.Connection
  6. let connectionStatus = 0
  7. let mongoStorage: any = {
  8. type: `MongoDB`,
  9. url: `mongodb://192.168.100.59:27017/fromEnzo`
  10. }
  11. let testSubject: Subject<any> = new Subject()
  12. let count = 0
  13. // Connect to designated storage destination
  14. async function connectMongo(storage: Storage) {
  15. return new Promise((resolve, reject) => {
  16. try {
  17. console.log(`Connecting to ${storage.url}`)
  18. MongooseConnection = mongoose.createConnection(storage.url)
  19. connectionStatus = 1
  20. resolve(connectionStatus)
  21. }
  22. catch (error) {
  23. connectionStatus = 0
  24. console.error('An error occurred while connecting to the database:', error);
  25. setTimeout(() => {
  26. connectMongo(storage).then(() => {
  27. resolve(connectionStatus)
  28. })
  29. console.log(`Reconnecting...`)
  30. }, 3000);
  31. }
  32. })
  33. }
  34. // Acquire data from Mongo
  35. function streamMongoData(storage: Storage, subjectStream: Subject<any>) {
  36. connectMongo(storage).then(() => {
  37. let message: Model<any> = MongooseConnection.model('Message', require('../types/message.schema'))
  38. let stream = message.find().limit(10).cursor()
  39. stream.on('data', (data: any) => subjectStream.next(data));
  40. stream.on('error', (error) => subjectStream.error(error));
  41. stream.on('end', () => subjectStream.complete());
  42. })
  43. }
/* --------------- Understanding the Concepts && Behaviour --------------- */
// Callbacks used to emulate a high-traffic observable subscription in order to observe its behaviour.
// What happens if the received data is used to call other functions that may take a while to finish? Observe what happens to the stream and the event queue.
  47. async function handler1(element): Promise<any> {
  48. return new Promise((resolve, reject) => {
  49. const min = 1;
  50. const max = 5;
  51. const randomInt = Math.floor(Math.random() * (max - min + 1)) + min;
  52. setTimeout(() => {
  53. console.log(`Handler 1: Processing data ${element} for ${randomInt} seconds.`)
  54. resolve(element)
  55. }, randomInt * 1000)
  56. })
  57. }
  58. function handler2(element: any) {
  59. const min = 1;
  60. const max = 5;
  61. const randomInt = Math.floor(Math.random() * (max - min + 1)) + min;
  62. setTimeout(() => {
  63. console.log(`Handler 2: Processing data ${element} for ${randomInt} seconds.`)
  64. handlers(element)
  65. }, randomInt * 1000)
  66. }
  67. function handlers(element: any) {
  68. const min = 1;
  69. const max = 5;
  70. const randomInt = Math.floor(Math.random() * (max - min + 1)) + min;
  71. setTimeout(() => {
  72. console.log(`Callback Handlers: Processing data ${element} for ${randomInt} seconds.`)
  73. }, randomInt * 1000)
  74. }
  75. function printLog() {
  76. const t0 = performance.now()
  77. let i
  78. for (i = 0; i <= 6000000000; i++) {
  79. }
  80. const t1 = performance.now()
  81. const timeTakenInSeconds = (t1 - t0) / 1000;
  82. console.log(`Time taken: ${timeTakenInSeconds} seconds to run this printLog()`);
  83. }
/* Explanation: The producer emits one value every second, indefinitely. When a consumer subscribes to the producer, it starts consuming those values.
Each time the producer emits, the consumer's next() method is called, and the following tasks are placed on the call stack or in the event queue:
1. The console.log() statement logs the received data to the console. This task is synchronous and is executed immediately.
2. The handler1() function is called with the received data as an argument. The call itself runs synchronously, but the work it schedules with setTimeout() is asynchronous and is added to the event queue.
3. The then() method of the Promise returned by handler1() is called with a callback function as an argument. The callback is asynchronous: it is queued and runs only after the Promise has resolved.
4. The printLog() function is called. This task is synchronous and is executed immediately.
After all synchronous tasks on the call stack have completed, the asynchronous tasks in the event queue are executed one by one, starting with the timer callback scheduled by handler1()
and followed by the then() callback. */
  92. let publishDataEverySecond = interval(1000)
  93. let control = interval(5000)
  94. function understandingOBS() {
  95. publishDataEverySecond.subscribe({
  96. next: element => {
  97. console.log(`Data received: ${element}`)
  98. handler1(element).then((data) => { // asynchronous
  99. handler2(data) // setTimeout will put the call into the call stack
  100. })
  101. printLog() // synchronous: this must complete before the next data to be received
  102. }
  103. })
  104. }
  105. /* Buffer */
  106. let bufferring = publishDataEverySecond.pipe(buffer(control)) // standard buffer
  107. let buffered = publishDataEverySecond.pipe( // using buffer when
  108. bufferWhen(() => interval(1000 + Math.random() * 4000))
  109. );
  110. function bufferOBS() {
  111. buffered.subscribe({
  112. next(element) {
  113. console.log(`Data received: ${element}`)
  114. handler1(element).then((data) => {
  115. handler2(data)
  116. })
  117. }
  118. })
  119. }
  120. /* Scheduler */
  121. let scheduler = publishDataEverySecond.pipe(observeOn(asyncScheduler)) //async scheduler
  122. let source$ = interval(1000, queueScheduler); // queue scheduler
  123. let result$ = source$.pipe(takeUntil(control))
  124. function scheduleOBS() {
  125. result$.subscribe({
  126. next: element => {
  127. console.log(`Scheduler: ${element}`)
  128. }
  129. })
  130. }
  131. understandingOBS()