// Test Observable // import mongoose, { Model } from "mongoose"; import { Subject, from, map, of, interval } from "rxjs"; const used = process.memoryUsage(); let MongooseConnection: mongoose.Connection let connectionStatus = 0 let mongoStorage: any = { type: `MongoDB`, url: `mongodb://192.168.100.59:27017/fromEnzo` } let testSubject: Subject = new Subject() let count = 0 let data: any[] = [] // Connect to designated storage destination async function connectMongo(storage: Storage) { return new Promise((resolve, reject) => { try { console.log(`Connecting to ${storage.url}`) MongooseConnection = mongoose.createConnection(storage.url) connectionStatus = 1 resolve(connectionStatus) } catch (error) { connectionStatus = 0 console.error('An error occurred while connecting to the database:', error); setTimeout(() => { connectMongo(storage).then(() => { resolve(connectionStatus) }) console.log(`Reconnecting...`) }, 3000); } }) } // Acquire data from Mongo function streamMongoData(storage: Storage, subjectStream: Subject) { connectMongo(storage).then(() => { let message: Model = MongooseConnection.model('Message', require('../types/message.schema')) let stream = message.find().limit(10).cursor() stream.on('data', (data: any) => subjectStream.next(data)); stream.on('error', (error) => subjectStream.error(error)); stream.on('end', () => subjectStream.complete()); }) } /* --------------- TEST --------------- */ // testSubject.subscribe({ // next: (e) => { // setTimeout(() => { // count++ // console.log(count + '. ' + e.appData.msgId) // No problem streaming all the elements from mongo streaming (2.9 milliion) // }, 10000) // But it doesn't wait 10 seconds // } // }); function doThis(element: any) { const min = 1; const max = 10; const randomInt = Math.floor(Math.random() * (max - min + 1)) + min; setTimeout(() => { // console.log(`Doing random task for 3 seconds for ${element.appData.msgId}`) console.log(`Doing random task for ${randomInt} seconds for "${element}"`) }, randomInt * 1000) } async function doThat(element): Promise { return new Promise((resolve, reject) => { const min = 1; const max = 10; const randomInt = Math.floor(Math.random() * (max - min + 1)) + min; setTimeout(() => { // console.log(`Processing ${element.appData.msgId} for 3 seconds`) console.log(`Processing "${element}" for ${randomInt} seconds`) resolve(element) }, randomInt * 1000) }) } /* -------------- ACTION ------------------- */ // streamMongoData(mongoStorage, testSubject) // testSubject.subscribe({ // next(element) { // doThat(element).then((data) => { // doThis(data) // }) // }, // error(err) { // console.error('something wrong occurred: ' + err); // }, // complete() { // console.log('done'); // }, // }) /* --------- Just TESTING and understanding behaviour ---------- */ /* Explanation: Producer streams at a constant rate of broadcasting 1 data per second. The first task of emitting the value will continue without interruptioon, where as the callbacks for doThis() and doThat() will be registered and be called. If the callback is asynchronous, it will register it to the event stack to be executed. And it doesn't have to wait for the previous tasks to complete before it can start it's own tasks. If the callback is synchronous, it will register the callbacks to the event stack to be executed. It doesn't wait for previous tasks to be completed before it can start it's own tasks. */ let waitForOneSecond = interval(1000) waitForOneSecond.subscribe({ next(element) { console.log(`Broadcasting: ${element}`) //Asynchrounous Code // doThat(element).then((data) => { // doThis(data) // }) // Synchrounous doThis(element) } }) /* Ignore this for now, just wanted to see how it behaves if the observable is created from arrays. The value publishing was too fast, I could derive meaningful comprehension from the data. Plese refer to the study case above. */ // let obsArray = from([1, 2, 3, 4, 5]) // obsArray.subscribe({ // next(element) { // console.log(`Emmitting: ${element}`) // doThat(element).then((data) => { // doThis(data) // }) // } // }) // of(1, 2, 3, 4, 5).subscribe({ // next(element) { // console.log(`Streaming: ${element}`) // doThat(element).then((data) => { // doThis(data) // }) // } // })