// Test Observable // import mongoose, { Model } from "mongoose"; import { Subject, map } from "rxjs"; const used = process.memoryUsage(); let MongooseConnection: mongoose.Connection let connectionStatus = 0 let mongoStorage: any = { type: `MongoDB`, url: `mongodb://192.168.100.59:27017/fromEnzo` } let data: any[] = [] // Conect to designated storage destination async function connectMongo(storage: Storage) { return new Promise((resolve, reject) => { try { console.log(`Connecting to ${storage.url}`) MongooseConnection = mongoose.createConnection(storage.url) connectionStatus = 1 resolve(connectionStatus) } catch (error) { connectionStatus = 0 console.error('An error occurred while connecting to the database:', error); setTimeout(() => { connectMongo(storage).then(() => { resolve(connectionStatus) }) console.log(`Reconnecting...`) }, 3000); } }) } async function getMongoData(storage: Storage) { return new Promise(async (resolve, reject) => { await connectMongo(storage); const Message: Model = MongooseConnection.model('Message', require('../types/message.schema')); try { data = await Message.find().limit(10000) resolve(data) } catch (err) { console.error(err); } }) } let testSubject: Subject = new Subject() getMongoData(mongoStorage).then((data) => { data.forEach(element => { testSubject.next(element) // console.log(`Memory usage: ${used.heapUsed / 1024 / 1024} MB`); }) }) let count = 0 testSubject.subscribe({ next: (e) => { count++ console.log(count) } }); let pipedObs = testSubject.pipe( map((element) => { let finalResponse: any = { message: 'Piped element' } return finalResponse }) ) pipedObs.subscribe(element => { count ++ console.log(element.message + count) })