浏览代码

updates on observables behaviour

Enzo 1 年之前
父节点
当前提交
20b3fa7794
共有 1 个文件被更改,包括 91 次插入102 次删除
  1. 91 102
      test/test3.ts

+ 91 - 102
test/test3.ts

@@ -1,7 +1,7 @@
 // Test Observable //
 
 import mongoose, { Model } from "mongoose";
-import { Subject, from, map } from "rxjs";
+import { Subject, from, map, of, interval } from "rxjs";
 const used = process.memoryUsage();
 
 let MongooseConnection: mongoose.Connection
@@ -12,6 +12,8 @@ let mongoStorage: any = {
     url: `mongodb://192.168.100.59:27017/fromEnzo`
 }
 
+let testSubject: Subject<any> = new Subject()
+let count = 0
 let data: any[] = []
 
 // Connect to designated storage destination
@@ -37,119 +39,106 @@ async function connectMongo(storage: Storage) {
 }
 
 // Acquire data from Mongo
-async function getMongoData(storage: Storage) {
-    return new Promise<any>(async (resolve, reject) => {
-        await connectMongo(storage);
-        const Message: Model<any> = MongooseConnection.model('Message', require('../types/message.schema'));
-        try {
-            data = await Message.find().limit(100000)
-            resolve(data)
-        } catch (err) {
-            console.error(err);
-        }
+function streamMongoData(storage: Storage, subjectStream: Subject<any>) {
+    connectMongo(storage).then(() => {
+        let message: Model<any> = MongooseConnection.model('Message', require('../types/message.schema'))
+        let stream = message.find().limit(10).cursor()
+
+        stream.on('data', (data: any) => subjectStream.next(data));
+        stream.on('error', (error) => subjectStream.error(error));
+        stream.on('end', () => subjectStream.complete());
     })
+
 }
 
 
 /* --------------- TEST --------------- */
-let testSubject: Subject<any> = new Subject()
-let count = 0
 
-// Using Subjects
-// getMongoData(mongoStorage).then((data) => {
-//     /* putting array into next */
+// testSubject.subscribe({
+//     next: (e) => {
+//         setTimeout(() => {
+//             count++
+//             console.log(count + '. ' + e.appData.msgId) // No problem streaming all the elements from mongo streaming (2.9 milliion)
+//         }, 10000) // But it doesn't wait 10 seconds
+//     }
+// });
+
+function doThis(element: any) {
+    const min = 1;
+    const max = 10;
+    const randomInt = Math.floor(Math.random() * (max - min + 1)) + min;
+    setTimeout(() => {
+        // console.log(`Doing random task for 3 seconds for ${element.appData.msgId}`)
+        console.log(`Doing random task for ${randomInt} seconds for "${element}"`)
+    }, randomInt * 1000)
+}
+async function doThat(element): Promise<any> {
+    return new Promise((resolve, reject) => {
+        const min = 1;
+        const max = 10;
+        const randomInt = Math.floor(Math.random() * (max - min + 1)) + min;
+        setTimeout(() => {
+            // console.log(`Processing ${element.appData.msgId} for 3 seconds`)
+            console.log(`Processing "${element}" for  ${randomInt} seconds`)
+            resolve(element)
+        }, randomInt * 1000)
+    })
+}
 
-//     testSubject.next(data) // returns the entire array as a single value
+/* -------------- ACTION ------------------- */
+// streamMongoData(mongoStorage, testSubject)
+// testSubject.subscribe({
+//     next(element) {
+//         doThat(element).then((data) => {
+//             doThis(data)
+//         })
+//     },
+//     error(err) {
+//         console.error('something wrong occurred: ' + err);
+//     },
+//     complete() {
+//         console.log('done');
+//     },
 // })
 
-/* Letting the system to load the array 1 by 1 into the subject. Speed subject to system/machine */
-getMongoData(mongoStorage).then((data) => {
-    data.forEach(element => {
-        testSubject.next(element)
-    })
+/* --------- Just TESTING and understanding behaviour ---------- */
+/* Explanation: Producer streams at a constant rate of broadcasting 1 data per second. The first task of emitting the value will continue
+without interruptioon, where as the callbacks for doThis() and doThat() will be registered and be called. 
+If the callback is asynchronous, it will register it to the event stack to be executed. And it doesn't have to wait for the previous 
+tasks to complete before it can start it's own tasks. 
+If the callback is synchronous, it will register the callbacks to the event stack to be executed. It doesn't wait for previous 
+tasks to be completed before it can start it's own tasks. */
+let waitForOneSecond = interval(1000)
+waitForOneSecond.subscribe({
+    next(element) {
+        console.log(`Broadcasting: ${element}`)
+        //Asynchrounous Code
+        // doThat(element).then((data) => {
+        //     doThis(data)
+        // })
+        // Synchrounous
+        doThis(element)
+    }
 })
 
-testSubject.subscribe({
-    next: (e) => {
-        count++
-        // console.log(count + '. ' + e.appData.msgId)
-        console.log(count + '. ' + e[0])
-    }
-});
-
-/* -------------- TEST 2 ------------- */
-// Just making Observable from an array
-// getMongoData(mongoStorage).then((data) => {
-//     let observableArray = from(data)
-//     observableArray.subscribe({
-//             next(x: any) {
-//                 count ++
-//                 console.log(count + '. ' + x.appData.msgId)
-//             },
-//             error(err) {
-//                 console.error('something wrong occurred: ' + err);
-//             },
-//             complete() {
-//                 console.log('done');
-//             },
+/* Ignore this for now, just wanted to see how it behaves if the observable is created from arrays. The
+value publishing was too fast, I could derive meaningful comprehension from the data. Plese refer to the
+study case above. */
+// let obsArray = from([1, 2, 3, 4, 5])
+// obsArray.subscribe({
+//     next(element) {
+//         console.log(`Emmitting: ${element}`)
+//         doThat(element).then((data) => {
+//             doThis(data)
 //         })
+//     }
 // })
 
-
-
-
-
-
-
-
-
-
-
-
-/*  Additional Test */
-
-
-// let pipedObs = testSubject.pipe(
-//     map((element) => {
-//         let finalResponse: any = {
-//             message: 'Piped element'
-//         }
-//         return finalResponse
-//     })
-// )
-
-// pipedObs.subscribe(element => {
-//     count ++
-//     console.log(element.message + count)
+// of(1, 2, 3, 4, 5).subscribe({
+//     next(element) {
+//         console.log(`Streaming: ${element}`)
+//         doThat(element).then((data) => {
+//             doThis(data)
+//         })
+//     }
 // })
-
-
-// function doThis() {
-//     count++
-//     console.log(`Task ${count} completed.`)
-//     return count
-// }
-
-// // for (let i = 0; i < 500; i++) {
-// //     setTimeout(() => {
-// //         testSubject.next(doThis())
-// //     }, 500)
-// // }
-
-// function callFunctionRepeatedlyWithFixedDelay() {
-//     let count = 0;
-//     const intervalId = setInterval(() => {
-//         testSubject.next(doThis())
-//         count++;
-//         if (count === 500) {
-//             clearInterval(intervalId);
-//         }
-//     }, 1000);
-// }
-
-// callFunctionRepeatedlyWithFixedDelay()
-// testSubject.subscribe((value) => {
-//     console.log(`Task ${value} acknowledged!`)
-// })  
-
-