// Test Observable //
//
// Scratch script: loads messages from MongoDB and pushes them through an RxJS
// Subject one element at a time, logging a running counter for each emission.

import mongoose, { Model } from "mongoose";
import { Subject, from, map } from "rxjs";

const used = process.memoryUsage();

/** Connection settings for a storage backend. Only `url` is read by this script. */
interface StorageConfig {
    type: string;
    url: string;
}

/** Delay between two connection attempts, in milliseconds. */
const RECONNECT_DELAY_MS = 3000;

let MongooseConnection: mongoose.Connection;
// 0 = not connected, 1 = connected (set by connectMongo).
let connectionStatus = 0;
const mongoStorage: StorageConfig = {
    type: `MongoDB`,
    url: `mongodb://192.168.100.59:27017/fromEnzo`
};
// Last result set fetched by getMongoData.
let data: any[] = [];

/** Resolves after `ms` milliseconds. */
function delay(ms: number): Promise<void> {
    return new Promise<void>((resolve) => setTimeout(resolve, ms));
}

/**
 * Connect to the designated storage destination, retrying until it succeeds.
 *
 * `mongoose.createConnection()` returns immediately and reports connection
 * failures asynchronously, so a synchronous try/catch around it never sees
 * them. The connection's `asPromise()` is awaited instead, which makes the
 * retry branch reachable and guarantees the connection is open on return.
 *
 * @param storage Settings whose `url` is the MongoDB connection string.
 * @returns The connection status, which is `1` once connected.
 */
async function connectMongo(storage: StorageConfig): Promise<number> {
    for (;;) {
        let candidate: mongoose.Connection | undefined;
        try {
            console.log(`Connecting to ${storage.url}`);
            candidate = mongoose.createConnection(storage.url);
            MongooseConnection = await candidate.asPromise();
            connectionStatus = 1;
            return connectionStatus;
        } catch (error: unknown) {
            connectionStatus = 0;
            console.error('An error occurred while connecting to the database:', error);
            if (candidate !== undefined) {
                // Best effort: release the failed attempt before creating a new one.
                try {
                    await candidate.close();
                } catch {
                    // Nothing useful to do if closing a never-opened connection fails.
                }
            }
            await delay(RECONNECT_DELAY_MS);
            console.log(`Reconnecting...`);
        }
    }
}

/**
 * Acquire data from Mongo: connects, then fetches up to 100000 `Message`
 * documents and stores them in the module-level `data` array.
 *
 * Errors (model registration or query failures) reject the returned promise
 * instead of leaving it pending forever, so callers must handle rejection.
 *
 * @param storage Settings passed through to `connectMongo`.
 * @returns The fetched documents.
 */
async function getMongoData(storage: StorageConfig): Promise<any[]> {
    await connectMongo(storage);
    // NOTE(review): assumes the required module's export is the schema itself - confirm.
    const Message: Model<any> = MongooseConnection.model('Message', require('../types/message.schema'));
    data = await Message.find().limit(100000);
    return data;
}

/* --------------- TEST --------------- */
// The message shape is not typed in this file, hence `any`.
let testSubject: Subject<any> = new Subject<any>();
let count = 0;

// Using Subjects
// getMongoData(mongoStorage).then((data) => {
//     /* putting array into next */
//     testSubject.next(data) // returns the entire array as a single value
// })

// Subscribe before loading starts: a Subject does not replay values to late subscribers.
testSubject.subscribe({
    next: (e) => {
        count++;
        // console.log(count + '. ' + e.appData.msgId)
        // NOTE(review): `e[0]` indexes the emitted element; the commented line above
        // reads `e.appData.msgId` instead - confirm which field is intended.
        console.log(count + '. ' + e[0]);
    }
});

/* Letting the system load the array 1 by 1 into the subject. Speed subject to system/machine */
getMongoData(mongoStorage)
    .then((messages) => {
        for (const element of messages) {
            testSubject.next(element);
        }
    })
    .catch((err: unknown) => {
        console.error(err);
    });

/* -------------- TEST 2 ------------- */
// Just making Observable from an array
// getMongoData(mongoStorage).then((data) => {
//     let observableArray = from(data)
//     observableArray.subscribe({
//         next(x: any) {
//             count ++
//             console.log(count + '. ' + x.appData.msgId)
//         },
//         error(err) {
//             console.error('something wrong occurred: ' + err);
//         },
//         complete() {
//             console.log('done');
//         },
//     })
// })

/* Additional Test */
// let pipedObs = testSubject.pipe(
//     map((element) => {
//         let finalResponse: any = {
//             message: 'Piped element'
//         }
//         return finalResponse
//     })
// )
// pipedObs.subscribe(element => {
//     count ++
//     console.log(element.message + count)
// })

// function doThis() {
//     count++
//     console.log(`Task ${count} completed.`)
//     return count
// }

// // for (let i = 0; i < 500; i++) {
// //     setTimeout(() => {
// //         testSubject.next(doThis())
// //     }, 500)
// // }

// function callFunctionRepeatedlyWithFixedDelay() {
//     let count = 0;
//     const intervalId = setInterval(() => {
//         testSubject.next(doThis())
//         count++;
//         if (count === 500) {
//             clearInterval(intervalId);
//         }
//     }, 1000);
// }

// callFunctionRepeatedlyWithFixedDelay()

// testSubject.subscribe((value) => {
//     console.log(`Task ${value} acknowledged!`)
// })