test3.ts 10.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224
  1. // Test Observable //
  2. import mongoose, { Model } from "mongoose";
  3. import { Subject, from, map, of, interval, buffer, asyncScheduler, observeOn, takeUntil, delay, queueScheduler, bufferWhen } from "rxjs";
  4. import { queryService } from "../services/query.service";
  5. const used = process.memoryUsage();
  6. let MongooseConnection: mongoose.Connection
  7. let connectionStatus = 0
  8. let mongoStorage: any = {
  9. type: `MongoDB`,
  10. url: `mongodb://192.168.100.59:27017/fromEnzo`
  11. }
  12. let testSubject: Subject<any> = new Subject()
  13. let count = 0
  14. // Connect to designated storage destination
  15. async function connectMongo(storage: Storage) {
  16. return new Promise((resolve, reject) => {
  17. try {
  18. console.log(`Connecting to ${storage.url}`)
  19. MongooseConnection = mongoose.createConnection(storage.url)
  20. connectionStatus = 1
  21. resolve(connectionStatus)
  22. }
  23. catch (error) {
  24. connectionStatus = 0
  25. console.error('An error occurred while connecting to the database:', error);
  26. setTimeout(() => {
  27. connectMongo(storage).then(() => {
  28. resolve(connectionStatus)
  29. })
  30. console.log(`Reconnecting...`)
  31. }, 3000);
  32. }
  33. })
  34. }
  35. // Acquire data from Mongo
  36. function streamMongoData(storage: Storage, subjectStream: Subject<any>) {
  37. connectMongo(storage).then(() => {
  38. let message: Model<any> = MongooseConnection.model('Message', require('../types/message.schema'))
  39. let stream = message.find().limit(10).cursor()
  40. stream.on('data', (data: any) => subjectStream.next(data));
  41. stream.on('error', (error) => subjectStream.error(error));
  42. stream.on('end', () => subjectStream.complete());
  43. })
  44. }
  45. /* --------------- Understanding the Concepts && Behaviour --------------- */
  46. // Callbacks to be used to emulate high traffic observables subscription to observe it's behaviour.
  47. // What happens if the data received is used to call other functions that may take a while to finish, observe what happens to the stream and event stack.
  48. async function handler1(element): Promise<any> {
  49. return new Promise((resolve, reject) => {
  50. const min = 1;
  51. const max = 5;
  52. const randomInt = Math.floor(Math.random() * (max - min + 1)) + min;
  53. setTimeout(() => {
  54. console.log(`Handler 1: Processing data ${element} for ${randomInt} seconds.`)
  55. resolve(element)
  56. }, randomInt * 1000)
  57. })
  58. }
  59. function handler2(element: any) {
  60. const min = 1;
  61. const max = 5;
  62. const randomInt = Math.floor(Math.random() * (max - min + 1)) + min;
  63. setTimeout(() => {
  64. console.log(`Handler 2: Processing data ${element} for ${randomInt} seconds.`)
  65. handlers(element)
  66. }, randomInt * 1000)
  67. }
  68. function handlers(element: any) {
  69. const min = 1;
  70. const max = 5;
  71. const randomInt = Math.floor(Math.random() * (max - min + 1)) + min;
  72. setTimeout(() => {
  73. console.log(`Callback Handlers: Processing data ${element} for ${randomInt} seconds.`)
  74. }, randomInt * 1000)
  75. }
  76. function printLog() {
  77. let service = new queryService()
  78. service.callFromOtherClass()
  79. }
  80. /* Explanation: So the producer will emit 1 data very 1 second indefinitely. For the consumer, when they subscribes to the producer, the data will be consumed.
  81. So when the producer emits data, the next method of the consumer is called, and the following tasks are registered into the call stack or event queue:
  82. 1.The console.log() statement logs the received data to the console. This task is synchronous and is executed immediately.
  83. 2.The handler1() function is called with the received data as an argument. This task is asynchronous and is added to the event queue.
  84. The then() method of the Promise returned by handler1() is called with a callback function as an argument. This task is also asynchronous and is added to the event queue.
  85. 3.The printLog() function is called. This task is synchronous and is executed immediately.
  86. After all synchronous tasks in the call stack are completed, the asynchronous tasks in the event queue are executed one by one, starting with the handler1() function call
  87. and followed by the then() callback function call.*/
  88. let publishDataEverySecond = interval(1000)
  89. let control = interval(5000)
  90. function understandingOBS() {
  91. publishDataEverySecond.subscribe({
  92. next: element => {
  93. console.log(`Data received: ${element}`)
  94. handler1(element).then((data) => { // asynchronous
  95. handler2(data) // setTimeout will put the call into the call stack
  96. })
  97. printLog()
  98. console.trace() // Checking what's in the stack call
  99. const used = process.memoryUsage().heapUsed / 1024 / 1024;
  100. console.log(`Heap memory used: ${Math.round(used * 100) / 100} MB`);
  101. }
  102. })
  103. }
  104. understandingOBS()
  105. /* Buffer */
  106. function bufferOBS() {
  107. /* This code defines a function called bufferOBS that creates a new observable called bufferring using
  108. the buffer operator. The buffer operator collects all the emissions from the source observable (publishDataEverySecond)
  109. into an array and emits the array each time a notification is emitted from the control observable (control).
  110. The buffer operator takes the control observable as a parameter. In this case, control is not defined in
  111. the code snippet you provided, but it should be an observable that emits a notification to trigger the buffer emission.
  112. After creating the bufferring observable, the code subscribes to it using the subscribe method. The subscribe
  113. method takes a callback function that is invoked each time an element is emitted from the bufferring observable.
  114. Inside the callback function, we log the received data using console.log. We then call handler1 with the received
  115. data and chain a then operator to handle the result. In the then block, we call handler2 with the transformed data. */
  116. let bufferring = publishDataEverySecond.pipe(buffer(control)) // standard buffer
  117. bufferring.subscribe((element) => {
  118. console.log(`Data received: ${element}`)
  119. handler1(element).then((data) => {
  120. handler2(data)
  121. })
  122. })
  123. }
  124. function bufferWhenOBS() {
  125. let buffered = publishDataEverySecond.pipe(
  126. /* This code creates a buffered observable that buffers emissions from publishDataEverySecond.
  127. The bufferWhen operator is used to determine when to buffer data. In this case, it uses the
  128. interval operator to emit a value every random time between 1000 and 5000 milliseconds. This
  129. means that every 1 to 5 seconds, the buffered observable will emit an array of the buffered values. */
  130. bufferWhen(() => interval(1000 + Math.random() * 4000))
  131. );
  132. /* This code defines a function bufferOBS that subscribes to the buffered observable and processes the
  133. buffered data by calling handler1 and handler2 on each element. When new data arrives, the next callback
  134. is invoked with an array of the buffered values.
  135. Inside the next callback, we log the received data using console.log. We then call handler1 with the
  136. received data and chain a then operator to handle the result. In the then block, we call handler2
  137. with the transformed data.
  138. Overall, this code sets up a pipeline of RxJS operators to create a buffered observable that buffers
  139. emissions from the source observable for a certain amount of time. It then processes the buffered
  140. data using the handler1 and handler2 functions. This is useful when you want to group emissions
  141. from a source observable and process them together, for example, to reduce network traffic or to
  142. process data in batches.
  143. Please not that the handler handles the array, not the individual data. A separate function will
  144. need to be designed in order to process each of the individual values from the emitted array. */
  145. buffered.subscribe({
  146. next(element) {
  147. console.log(`Data received: ${element}`)
  148. handler1(element).then((data) => {
  149. handler2(data)
  150. })
  151. }
  152. })
  153. }
  154. bufferWhenOBS()
  155. /* Scheduler */
  156. function asyncScheduleOBS() {
  157. const scheduleObservable = of(`Hello Observable passing through`)
  158. const delayTime = 3000;
  159. /* In this example, the source$ observable emits the value "Hello Observable passing through". We pass the source$ observable\
  160. as a parameter to the task function, which is executed after a delay of 3 second using the asyncScheduler.
  161. Inside the task function, we subscribe to the source$ observable and log its emitted values to the console.
  162. By using an observable as the task parameter, you can create complex logic that can be executed at a specific time or interval. */
  163. let subscription = asyncScheduler.schedule(() => {
  164. scheduleObservable.subscribe((element) => {
  165. console.log(element)
  166. })
  167. }, delayTime);
  168. }
  169. function queue_Scheduler() {
  170. /* In this example, we use the observeOn operator to apply the queueScheduler to the observable stream.
  171. We then subscribe to the stream and execute three tasks (task1, task2, and task3) in order using the queueScheduler.
  172. The queueScheduler ensures that the tasks are executed in the order they were added to the queue, waiting for each task to
  173. complete before executing the next one. This is useful for tasks that need to run in order and should not be interrupted by other tasks.
  174. Note that the queueScheduler is a synchronous scheduler, which means that tasks scheduled using this scheduler will be executed
  175. synchronously. If you need to schedule tasks asynchronously, you can use the asyncScheduler or other schedulers that provide asynchronous execution.*/
  176. let task1 = () => console.log('Task 1');
  177. let task2 = () => console.log('Task 2');
  178. let task3 = () => console.log('Task 3');
  179. of(null)
  180. .pipe(
  181. observeOn(queueScheduler)
  182. )
  183. .subscribe(() => {
  184. task1();
  185. task2();
  186. task3();
  187. });
  188. }