Browse Source

updated behaviour comprehension

Enzo 1 year ago
parent
commit
67f24fdd13
1 changed files with 89 additions and 81 deletions
  1. 89 81
      test/test3.ts

+ 89 - 81
test/test3.ts

@@ -1,7 +1,7 @@
 // Test Observable //
 // Test Observable //
 
 
 import mongoose, { Model } from "mongoose";
 import mongoose, { Model } from "mongoose";
-import { Subject, from, map, of, interval } from "rxjs";
+import { Subject, from, map, of, interval, buffer, asyncScheduler, observeOn, takeUntil, delay, queueScheduler, bufferWhen } from "rxjs";
 const used = process.memoryUsage();
 const used = process.memoryUsage();
 
 
 let MongooseConnection: mongoose.Connection
 let MongooseConnection: mongoose.Connection
@@ -14,7 +14,6 @@ let mongoStorage: any = {
 
 
 let testSubject: Subject<any> = new Subject()
 let testSubject: Subject<any> = new Subject()
 let count = 0
 let count = 0
-let data: any[] = []
 
 
 // Connect to designated storage destination
 // Connect to designated storage destination
 async function connectMongo(storage: Storage) {
 async function connectMongo(storage: Storage) {
@@ -52,93 +51,102 @@ function streamMongoData(storage: Storage, subjectStream: Subject<any>) {
 }
 }
 
 
 
 
-/* --------------- TEST --------------- */
 
 
-// testSubject.subscribe({
-//     next: (e) => {
-//         setTimeout(() => {
-//             count++
-//             console.log(count + '. ' + e.appData.msgId) // No problem streaming all the elements from mongo streaming (2.9 milliion)
-//         }, 10000) // But it doesn't wait 10 seconds
-//     }
-// });
 
 
-function doThis(element: any) {
-    const min = 1;
-    const max = 10;
-    const randomInt = Math.floor(Math.random() * (max - min + 1)) + min;
-    setTimeout(() => {
-        // console.log(`Doing random task for 3 seconds for ${element.appData.msgId}`)
-        console.log(`Doing random task for ${randomInt} seconds for "${element}"`)
-    }, randomInt * 1000)
-}
-async function doThat(element): Promise<any> {
+
+
+
+/* --------------- Understanding the Concepts && Behaviour --------------- */
+// Callbacks to be used to emulate high traffic observables subscription to observe it's behaviour.
+// What happens if the data received is used to call other functions that may take a while to finish, observe what happens to the stream and event stack.
+async function handler1(element): Promise<any> {
     return new Promise((resolve, reject) => {
     return new Promise((resolve, reject) => {
         const min = 1;
         const min = 1;
-        const max = 10;
+        const max = 5;
         const randomInt = Math.floor(Math.random() * (max - min + 1)) + min;
         const randomInt = Math.floor(Math.random() * (max - min + 1)) + min;
         setTimeout(() => {
         setTimeout(() => {
-            // console.log(`Processing ${element.appData.msgId} for 3 seconds`)
-            console.log(`Processing "${element}" for  ${randomInt} seconds`)
+            console.log(`Handler 1: Processing data ${element} for ${randomInt} seconds.`)
             resolve(element)
             resolve(element)
         }, randomInt * 1000)
         }, randomInt * 1000)
     })
     })
 }
 }
-
-/* -------------- ACTION ------------------- */
-// streamMongoData(mongoStorage, testSubject)
-// testSubject.subscribe({
-//     next(element) {
-//         doThat(element).then((data) => {
-//             doThis(data)
-//         })
-//     },
-//     error(err) {
-//         console.error('something wrong occurred: ' + err);
-//     },
-//     complete() {
-//         console.log('done');
-//     },
-// })
-
-/* --------- Just TESTING and understanding behaviour ---------- */
-/* Explanation: Producer streams at a constant rate of broadcasting 1 data per second. The first task of emitting the value will continue
-without interruptioon, where as the callbacks for doThis() and doThat() will be registered and be called. 
-If the callback is asynchronous, it will register it to the event stack to be executed. And it doesn't have to wait for the previous 
-tasks to complete before it can start it's own tasks. 
-If the callback is synchronous, it will register the callbacks to the event stack to be executed. It doesn't wait for previous 
-tasks to be completed before it can start it's own tasks. */
-let waitForOneSecond = interval(1000)
-waitForOneSecond.subscribe({
-    next(element) {
-        console.log(`Broadcasting: ${element}`)
-        //Asynchrounous Code
-        // doThat(element).then((data) => {
-        //     doThis(data)
-        // })
-        // Synchrounous
-        doThis(element)
+function handler2(element: any) {
+    const min = 1;
+    const max = 5;
+    const randomInt = Math.floor(Math.random() * (max - min + 1)) + min;
+    setTimeout(() => {
+        console.log(`Handler 2: Processing data ${element} for ${randomInt} seconds.`)
+        handlers(element)
+    }, randomInt * 1000)
+}
+function handlers(element: any) {
+    const min = 1;
+    const max = 5;
+    const randomInt = Math.floor(Math.random() * (max - min + 1)) + min;
+    setTimeout(() => {
+        console.log(`Callback Handlers: Processing data ${element} for ${randomInt} seconds.`)
+    }, randomInt * 1000)
+}
+function printLog() {
+    const t0 = performance.now()
+    let i
+    for (i = 0; i <= 6000000000; i++) {
     }
     }
-})
-
-/* Ignore this for now, just wanted to see how it behaves if the observable is created from arrays. The
-value publishing was too fast, I could derive meaningful comprehension from the data. Plese refer to the
-study case above. */
-// let obsArray = from([1, 2, 3, 4, 5])
-// obsArray.subscribe({
-//     next(element) {
-//         console.log(`Emmitting: ${element}`)
-//         doThat(element).then((data) => {
-//             doThis(data)
-//         })
-//     }
-// })
-
-// of(1, 2, 3, 4, 5).subscribe({
-//     next(element) {
-//         console.log(`Streaming: ${element}`)
-//         doThat(element).then((data) => {
-//             doThis(data)
-//         })
-//     }
-// })
+    const t1 = performance.now()
+    const timeTakenInSeconds = (t1 - t0) / 1000;
+    console.log(`Time taken: ${timeTakenInSeconds} seconds to run this printLog()`);
+}
+
+/* Explanation: So the producer will emit 1 data very 1 second indefinitely. For the consumer, when they subscribes to the producer, the data will be consumed.
+So when the producer emits data, the next method of the consumer is called, and the following tasks are registered into the call stack or event queue:
+
+1.The console.log() statement logs the received data to the console. This task is synchronous and is executed immediately.
+2.The handler1() function is called with the received data as an argument. This task is asynchronous and is added to the event queue.
+3.The then() method of the Promise returned by handler1() is called with a callback function as an argument. This task is also asynchronous and is added to the event queue.
+4.The printLog() function is called. This task is synchronous and is executed immediately.
+
+After all synchronous tasks in the call stack are completed, the asynchronous tasks in the event queue are executed one by one, starting with the handler1() function call
+and followed by the then() callback function call.*/
+let publishDataEverySecond = interval(1000)
+let control = interval(5000)
+function understandingOBS() {
+    publishDataEverySecond.subscribe({
+        next: element => {
+            console.log(`Data received: ${element}`)
+            handler1(element).then((data) => { // asynchronous
+                handler2(data) // setTimeout will put the call into the call stack
+            })
+            printLog() // synchronous: this must complete before the next data to be received
+        }
+    })
+}
+
+/* Buffer */
+let bufferring = publishDataEverySecond.pipe(buffer(control)) // standard buffer 
+let buffered = publishDataEverySecond.pipe( // using buffer when
+    bufferWhen(() => interval(1000 + Math.random() * 4000))
+);
+function bufferOBS() {
+    buffered.subscribe({
+        next(element) {
+            console.log(`Data received: ${element}`)
+            handler1(element).then((data) => {
+                handler2(data)
+            })
+        }
+    })
+}
+
+/*  Scheduler */
+let scheduler = publishDataEverySecond.pipe(observeOn(asyncScheduler)) //async scheduler
+let source$ = interval(1000, queueScheduler); // queue scheduler
+let result$ = source$.pipe(takeUntil(control))
+function scheduleOBS() {
+    result$.subscribe({
+        next: element => {
+            console.log(`Scheduler: ${element}`)
+        }
+    })
+}
+
+understandingOBS()