|
@@ -2,6 +2,7 @@
|
|
|
|
|
|
import mongoose, { Model } from "mongoose";
|
|
import mongoose, { Model } from "mongoose";
|
|
import { Subject, from, map, of, interval, buffer, asyncScheduler, observeOn, takeUntil, delay, queueScheduler, bufferWhen } from "rxjs";
|
|
import { Subject, from, map, of, interval, buffer, asyncScheduler, observeOn, takeUntil, delay, queueScheduler, bufferWhen } from "rxjs";
|
|
|
|
+import { queryService } from "../services/query.service";
|
|
const used = process.memoryUsage();
|
|
const used = process.memoryUsage();
|
|
|
|
|
|
let MongooseConnection: mongoose.Connection
|
|
let MongooseConnection: mongoose.Connection
|
|
@@ -88,13 +89,8 @@ function handlers(element: any) {
|
|
}, randomInt * 1000)
|
|
}, randomInt * 1000)
|
|
}
|
|
}
|
|
function printLog() {
|
|
function printLog() {
|
|
- const t0 = performance.now()
|
|
|
|
- let i
|
|
|
|
- for (i = 0; i <= 6000000000; i++) {
|
|
|
|
- }
|
|
|
|
- const t1 = performance.now()
|
|
|
|
- const timeTakenInSeconds = (t1 - t0) / 1000;
|
|
|
|
- console.log(`Time taken: ${timeTakenInSeconds} seconds to run this printLog()`);
|
|
|
|
|
|
+ let service = new queryService()
|
|
|
|
+ service.callFromOtherClass()
|
|
}
|
|
}
|
|
|
|
|
|
/* Explanation: So the producer will emit 1 data very 1 second indefinitely. For the consumer, when they subscribes to the producer, the data will be consumed.
|
|
/* Explanation: So the producer will emit 1 data very 1 second indefinitely. For the consumer, when they subscribes to the producer, the data will be consumed.
|
|
@@ -102,8 +98,8 @@ So when the producer emits data, the next method of the consumer is called, and
|
|
|
|
|
|
1.The console.log() statement logs the received data to the console. This task is synchronous and is executed immediately.
|
|
1.The console.log() statement logs the received data to the console. This task is synchronous and is executed immediately.
|
|
2.The handler1() function is called with the received data as an argument. This task is asynchronous and is added to the event queue.
|
|
2.The handler1() function is called with the received data as an argument. This task is asynchronous and is added to the event queue.
|
|
-3.The then() method of the Promise returned by handler1() is called with a callback function as an argument. This task is also asynchronous and is added to the event queue.
|
|
|
|
-4.The printLog() function is called. This task is synchronous and is executed immediately.
|
|
|
|
|
|
+The then() method of the Promise returned by handler1() is called with a callback function as an argument. This task is also asynchronous and is added to the event queue.
|
|
|
|
+3.The printLog() function is called. This task is synchronous and is executed immediately.
|
|
|
|
|
|
After all synchronous tasks in the call stack are completed, the asynchronous tasks in the event queue are executed one by one, starting with the handler1() function call
|
|
After all synchronous tasks in the call stack are completed, the asynchronous tasks in the event queue are executed one by one, starting with the handler1() function call
|
|
and followed by the then() callback function call.*/
|
|
and followed by the then() callback function call.*/
|
|
@@ -116,17 +112,56 @@ function understandingOBS() {
|
|
handler1(element).then((data) => { // asynchronous
|
|
handler1(element).then((data) => { // asynchronous
|
|
handler2(data) // setTimeout will put the call into the call stack
|
|
handler2(data) // setTimeout will put the call into the call stack
|
|
})
|
|
})
|
|
- printLog() // synchronous: this must complete before the next data to be received
|
|
|
|
|
|
+ printLog()
|
|
}
|
|
}
|
|
})
|
|
})
|
|
}
|
|
}
|
|
|
|
|
|
/* Buffer */
|
|
/* Buffer */
|
|
-let bufferring = publishDataEverySecond.pipe(buffer(control)) // standard buffer
|
|
|
|
-let buffered = publishDataEverySecond.pipe( // using buffer when
|
|
|
|
- bufferWhen(() => interval(1000 + Math.random() * 4000))
|
|
|
|
-);
|
|
|
|
function bufferOBS() {
|
|
function bufferOBS() {
|
|
|
|
+ /* This code defines a function called bufferOBS that creates a new observable called bufferring using
|
|
|
|
+ the buffer operator. The buffer operator collects all the emissions from the source observable (publishDataEverySecond)
|
|
|
|
+ into an array and emits the array each time a notification is emitted from the control observable (control).
|
|
|
|
+
|
|
|
|
+ The buffer operator takes the control observable as a parameter. In this case, control is not defined in
|
|
|
|
+ the code snippet you provided, but it should be an observable that emits a notification to trigger the buffer emission.
|
|
|
|
+
|
|
|
|
+ After creating the bufferring observable, the code subscribes to it using the subscribe method. The subscribe
|
|
|
|
+ method takes a callback function that is invoked each time an element is emitted from the bufferring observable.
|
|
|
|
+
|
|
|
|
+ Inside the callback function, we log the received data using console.log. We then call handler1 with the received
|
|
|
|
+ data and chain a then operator to handle the result. In the then block, we call handler2 with the transformed data. */
|
|
|
|
+ let bufferring = publishDataEverySecond.pipe(buffer(control)) // standard buffer
|
|
|
|
+ bufferring.subscribe((element) => {
|
|
|
|
+ console.log(`Data received: ${element}`)
|
|
|
|
+ handler1(element).then((data) => {
|
|
|
|
+ handler2(data)
|
|
|
|
+ })
|
|
|
|
+ })
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+function bufferWhenOBS() {
|
|
|
|
+ let buffered = publishDataEverySecond.pipe(
|
|
|
|
+ /* This code creates a buffered observable that buffers emissions from publishDataEverySecond.
|
|
|
|
+ The bufferWhen operator is used to determine when to buffer data. In this case, it uses the
|
|
|
|
+ interval operator to emit a value every random time between 1000 and 5000 milliseconds. This
|
|
|
|
+ means that every 1 to 5 seconds, the buffered observable will emit an array of the buffered values. */
|
|
|
|
+ bufferWhen(() => interval(1000 + Math.random() * 4000))
|
|
|
|
+ );
|
|
|
|
+
|
|
|
|
+ /* This code defines a function bufferOBS that subscribes to the buffered observable and processes the
|
|
|
|
+ buffered data by calling handler1 and handler2 on each element. When new data arrives, the next callback
|
|
|
|
+ is invoked with an array of the buffered values.
|
|
|
|
+
|
|
|
|
+ Inside the next callback, we log the received data using console.log. We then call handler1 with the
|
|
|
|
+ received data and chain a then operator to handle the result. In the then block, we call handler2
|
|
|
|
+ with the transformed data.
|
|
|
|
+
|
|
|
|
+ Overall, this code sets up a pipeline of RxJS operators to create a buffered observable that buffers
|
|
|
|
+ emissions from the source observable for a certain amount of time. It then processes the buffered
|
|
|
|
+ data using the handler1 and handler2 functions. This is useful when you want to group emissions
|
|
|
|
+ from a source observable and process them together, for example, to reduce network traffic or to
|
|
|
|
+ process data in batches. */
|
|
buffered.subscribe({
|
|
buffered.subscribe({
|
|
next(element) {
|
|
next(element) {
|
|
console.log(`Data received: ${element}`)
|
|
console.log(`Data received: ${element}`)
|
|
@@ -138,15 +173,41 @@ function bufferOBS() {
|
|
}
|
|
}
|
|
|
|
|
|
/* Scheduler */
|
|
/* Scheduler */
|
|
-let scheduler = publishDataEverySecond.pipe(observeOn(asyncScheduler)) //async scheduler
|
|
|
|
-let source$ = interval(1000, queueScheduler); // queue scheduler
|
|
|
|
-let result$ = source$.pipe(takeUntil(control))
|
|
|
|
-function scheduleOBS() {
|
|
|
|
- result$.subscribe({
|
|
|
|
- next: element => {
|
|
|
|
- console.log(`Scheduler: ${element}`)
|
|
|
|
- }
|
|
|
|
- })
|
|
|
|
|
|
+function asyncScheduleOBS() {
|
|
|
|
+ const scheduleObservable = of(`Hello Observable passing through`)
|
|
|
|
+ const delayTime = 3000;
|
|
|
|
+ /* In this example, the source$ observable emits the value "Hello, RxJS!". We pass the source$ observable\
|
|
|
|
+ as a parameter to the task function, which is executed after a delay of 1 second using the asyncScheduler.
|
|
|
|
+ Inside the task function, we subscribe to the source$ observable and log its emitted values to the console.
|
|
|
|
+ By using an observable as the task parameter, you can create complex logic that can be executed at a specific time or interval. */
|
|
|
|
+ let subscription = asyncScheduler.schedule(() => {
|
|
|
|
+ scheduleObservable.subscribe((element) => {
|
|
|
|
+ console.log(element)
|
|
|
|
+ })
|
|
|
|
+ }, delayTime);
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+function queue_Scheduler() {
|
|
|
|
+ /* In this example, we use the observeOn operator to apply the queueScheduler to the observable stream.
|
|
|
|
+ We then subscribe to the stream and execute three tasks (task1, task2, and task3) in order using the queueScheduler.
|
|
|
|
+
|
|
|
|
+ The queueScheduler ensures that the tasks are executed in the order they were added to the queue, waiting for each task to
|
|
|
|
+ complete before executing the next one. This is useful for tasks that need to run in order and should not be interrupted by other tasks.
|
|
|
|
+
|
|
|
|
+ Note that the queueScheduler is a synchronous scheduler, which means that tasks scheduled using this scheduler will be executed
|
|
|
|
+ synchronously. If you need to schedule tasks asynchronously, you can use the asyncScheduler or other schedulers that provide asynchronous execution.*/
|
|
|
|
+ let task1 = () => console.log('Task 1');
|
|
|
|
+ let task2 = () => console.log('Task 2');
|
|
|
|
+ let task3 = () => console.log('Task 3');
|
|
|
|
+
|
|
|
|
+ of(null)
|
|
|
|
+ .pipe(
|
|
|
|
+ observeOn(queueScheduler)
|
|
|
|
+ )
|
|
|
|
+ .subscribe(() => {
|
|
|
|
+ task1();
|
|
|
|
+ task2();
|
|
|
|
+ task3();
|
|
|
|
+ });
|
|
}
|
|
}
|
|
|
|
|
|
-understandingOBS()
|
|
|