|
@@ -120,17 +120,17 @@ function understandingOBS() {
|
|
|
/* Buffer */
|
|
|
function bufferOBS() {
|
|
|
/* This code defines a function called bufferOBS that creates a new observable called bufferring using
|
|
|
- the buffer operator. The buffer operator collects all the emissions from the source observable (publishDataEverySecond)
|
|
|
- into an array and emits the array each time a notification is emitted from the control observable (control).
|
|
|
+ the buffer operator. The buffer operator collects all the emissions from the source observable (publishDataEverySecond)
|
|
|
+ into an array and emits the array each time a notification is emitted from the control observable (control).
|
|
|
|
|
|
The buffer operator takes the control observable as a parameter. In this case, control is not defined in
|
|
|
- the code snippet you provided, but it should be an observable that emits a notification to trigger the buffer emission.
|
|
|
+ the code snippet you provided, but it should be an observable that emits a notification to trigger the buffer emission.
|
|
|
|
|
|
After creating the bufferring observable, the code subscribes to it using the subscribe method. The subscribe
|
|
|
method takes a callback function that is invoked each time an element is emitted from the bufferring observable.
|
|
|
|
|
|
Inside the callback function, we log the received data using console.log. We then call handler1 with the received
|
|
|
- data and chain a then operator to handle the result. In the then block, we call handler2 with the transformed data. */
|
|
|
+ data and chain a then operator to handle the result. In the then block, we call handler2 with the transformed data. */
|
|
|
let bufferring = publishDataEverySecond.pipe(buffer(control)) // standard buffer
|
|
|
bufferring.subscribe((element) => {
|
|
|
console.log(`Data received: ${element}`)
|
|
@@ -158,10 +158,13 @@ function bufferWhenOBS() {
|
|
|
with the transformed data.
|
|
|
|
|
|
Overall, this code sets up a pipeline of RxJS operators to create a buffered observable that buffers
|
|
|
- emissions from the source observable for a certain amount of time. It then processes the buffered
|
|
|
- data using the handler1 and handler2 functions. This is useful when you want to group emissions
|
|
|
- from a source observable and process them together, for example, to reduce network traffic or to
|
|
|
- process data in batches. */
|
|
|
+ emissions from the source observable for a certain amount of time. It then processes the buffered
|
|
|
+ data using the handler1 and handler2 functions. This is useful when you want to group emissions
|
|
|
+ from a source observable and process them together, for example, to reduce network traffic or to
|
|
|
+ process data in batches.
|
|
|
+
|
|
|
+ Please not that the handler handles the array, not the individual data. A separate function will
|
|
|
+ need to be designed in order to process each of the individual values from the emitted array. */
|
|
|
buffered.subscribe({
|
|
|
next(element) {
|
|
|
console.log(`Data received: ${element}`)
|
|
@@ -172,12 +175,14 @@ function bufferWhenOBS() {
|
|
|
})
|
|
|
}
|
|
|
|
|
|
+bufferWhenOBS()
|
|
|
+
|
|
|
/* Scheduler */
|
|
|
function asyncScheduleOBS() {
|
|
|
const scheduleObservable = of(`Hello Observable passing through`)
|
|
|
const delayTime = 3000;
|
|
|
- /* In this example, the source$ observable emits the value "Hello, RxJS!". We pass the source$ observable\
|
|
|
- as a parameter to the task function, which is executed after a delay of 1 second using the asyncScheduler.
|
|
|
+ /* In this example, the source$ observable emits the value "Hello Observable passing through". We pass the source$ observable\
|
|
|
+ as a parameter to the task function, which is executed after a delay of 3 second using the asyncScheduler.
|
|
|
Inside the task function, we subscribe to the source$ observable and log its emitted values to the console.
|
|
|
By using an observable as the task parameter, you can create complex logic that can be executed at a specific time or interval. */
|
|
|
let subscription = asyncScheduler.schedule(() => {
|
|
@@ -189,7 +194,7 @@ function asyncScheduleOBS() {
|
|
|
|
|
|
function queue_Scheduler() {
|
|
|
/* In this example, we use the observeOn operator to apply the queueScheduler to the observable stream.
|
|
|
- We then subscribe to the stream and execute three tasks (task1, task2, and task3) in order using the queueScheduler.
|
|
|
+ We then subscribe to the stream and execute three tasks (task1, task2, and task3) in order using the queueScheduler.
|
|
|
|
|
|
The queueScheduler ensures that the tasks are executed in the order they were added to the queue, waiting for each task to
|
|
|
complete before executing the next one. This is useful for tasks that need to run in order and should not be interrupted by other tasks.
|