|
@@ -1,7 +1,7 @@
|
|
// Test Observable //
|
|
// Test Observable //
|
|
|
|
|
|
import mongoose, { Model } from "mongoose";
|
|
import mongoose, { Model } from "mongoose";
|
|
-import { Subject, map } from "rxjs";
|
|
|
|
|
|
+import { Subject, from, map } from "rxjs";
|
|
const used = process.memoryUsage();
|
|
const used = process.memoryUsage();
|
|
|
|
|
|
let MongooseConnection: mongoose.Connection
|
|
let MongooseConnection: mongoose.Connection
|
|
@@ -14,7 +14,7 @@ let mongoStorage: any = {
|
|
|
|
|
|
let data: any[] = []
|
|
let data: any[] = []
|
|
|
|
|
|
-// Conect to designated storage destination
|
|
|
|
|
|
+// Connect to designated storage destination
|
|
async function connectMongo(storage: Storage) {
|
|
async function connectMongo(storage: Storage) {
|
|
return new Promise((resolve, reject) => {
|
|
return new Promise((resolve, reject) => {
|
|
try {
|
|
try {
|
|
@@ -36,12 +36,13 @@ async function connectMongo(storage: Storage) {
|
|
})
|
|
})
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+// Acquire data from Mongo
|
|
async function getMongoData(storage: Storage) {
|
|
async function getMongoData(storage: Storage) {
|
|
return new Promise<any>(async (resolve, reject) => {
|
|
return new Promise<any>(async (resolve, reject) => {
|
|
await connectMongo(storage);
|
|
await connectMongo(storage);
|
|
const Message: Model<any> = MongooseConnection.model('Message', require('../types/message.schema'));
|
|
const Message: Model<any> = MongooseConnection.model('Message', require('../types/message.schema'));
|
|
try {
|
|
try {
|
|
- data = await Message.find().limit(10000)
|
|
|
|
|
|
+ data = await Message.find().limit(100000)
|
|
resolve(data)
|
|
resolve(data)
|
|
} catch (err) {
|
|
} catch (err) {
|
|
console.error(err);
|
|
console.error(err);
|
|
@@ -50,35 +51,105 @@ async function getMongoData(storage: Storage) {
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
+/* --------------- TEST --------------- */
|
|
let testSubject: Subject<any> = new Subject()
|
|
let testSubject: Subject<any> = new Subject()
|
|
|
|
+let count = 0
|
|
|
|
+
|
|
|
|
+// Using Subjects
|
|
|
|
+// getMongoData(mongoStorage).then((data) => {
|
|
|
|
+// /* putting array into next */
|
|
|
|
|
|
|
|
+// testSubject.next(data) // returns the entire array as a single value
|
|
|
|
+// })
|
|
|
|
+
|
|
|
|
+/* Letting the system to load the array 1 by 1 into the subject. Speed subject to system/machine */
|
|
getMongoData(mongoStorage).then((data) => {
|
|
getMongoData(mongoStorage).then((data) => {
|
|
data.forEach(element => {
|
|
data.forEach(element => {
|
|
testSubject.next(element)
|
|
testSubject.next(element)
|
|
- // console.log(`Memory usage: ${used.heapUsed / 1024 / 1024} MB`);
|
|
|
|
})
|
|
})
|
|
})
|
|
})
|
|
|
|
|
|
-let count = 0
|
|
|
|
-
|
|
|
|
testSubject.subscribe({
|
|
testSubject.subscribe({
|
|
next: (e) => {
|
|
next: (e) => {
|
|
count++
|
|
count++
|
|
- console.log(count)
|
|
|
|
|
|
+ // console.log(count + '. ' + e.appData.msgId)
|
|
|
|
+ console.log(count + '. ' + e[0])
|
|
}
|
|
}
|
|
});
|
|
});
|
|
|
|
|
|
-let pipedObs = testSubject.pipe(
|
|
|
|
- map((element) => {
|
|
|
|
- let finalResponse: any = {
|
|
|
|
- message: 'Piped element'
|
|
|
|
- }
|
|
|
|
- return finalResponse
|
|
|
|
- })
|
|
|
|
-)
|
|
|
|
|
|
+/* -------------- TEST 2 ------------- */
|
|
|
|
+// Just making Observable from an array
|
|
|
|
+// getMongoData(mongoStorage).then((data) => {
|
|
|
|
+// let observableArray = from(data)
|
|
|
|
+// observableArray.subscribe({
|
|
|
|
+// next(x: any) {
|
|
|
|
+// count ++
|
|
|
|
+// console.log(count + '. ' + x.appData.msgId)
|
|
|
|
+// },
|
|
|
|
+// error(err) {
|
|
|
|
+// console.error('something wrong occurred: ' + err);
|
|
|
|
+// },
|
|
|
|
+// complete() {
|
|
|
|
+// console.log('done');
|
|
|
|
+// },
|
|
|
|
+// })
|
|
|
|
+// })
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+/* Additional Test */
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+// let pipedObs = testSubject.pipe(
|
|
|
|
+// map((element) => {
|
|
|
|
+// let finalResponse: any = {
|
|
|
|
+// message: 'Piped element'
|
|
|
|
+// }
|
|
|
|
+// return finalResponse
|
|
|
|
+// })
|
|
|
|
+// )
|
|
|
|
+
|
|
|
|
+// pipedObs.subscribe(element => {
|
|
|
|
+// count ++
|
|
|
|
+// console.log(element.message + count)
|
|
|
|
+// })
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+// function doThis() {
|
|
|
|
+// count++
|
|
|
|
+// console.log(`Task ${count} completed.`)
|
|
|
|
+// return count
|
|
|
|
+// }
|
|
|
|
+
|
|
|
|
+// // for (let i = 0; i < 500; i++) {
|
|
|
|
+// // setTimeout(() => {
|
|
|
|
+// // testSubject.next(doThis())
|
|
|
|
+// // }, 500)
|
|
|
|
+// // }
|
|
|
|
+
|
|
|
|
+// function callFunctionRepeatedlyWithFixedDelay() {
|
|
|
|
+// let count = 0;
|
|
|
|
+// const intervalId = setInterval(() => {
|
|
|
|
+// testSubject.next(doThis())
|
|
|
|
+// count++;
|
|
|
|
+// if (count === 500) {
|
|
|
|
+// clearInterval(intervalId);
|
|
|
|
+// }
|
|
|
|
+// }, 1000);
|
|
|
|
+// }
|
|
|
|
+
|
|
|
|
+// callFunctionRepeatedlyWithFixedDelay()
|
|
|
|
+// testSubject.subscribe((value) => {
|
|
|
|
+// console.log(`Task ${value} acknowledged!`)
|
|
|
|
+// })
|
|
|
|
|
|
-pipedObs.subscribe(element => {
|
|
|
|
- count ++
|
|
|
|
- console.log(element.message + count)
|
|
|
|
-})
|
|
|
|
|
|
|