Pārlūkot izejas kodu

scheduler showcase

Enzo 1 gadu atpakaļ
vecāks
revīzija
e152f39aba
5 mainītis faili ar 51 papildinājumiem un 17 dzēšanām
  1. 2 1
      package.json
  2. 1 1
      test/consumer_1.ts
  3. 4 10
      test/publisher.ts
  4. 40 0
      test/scheduler_showcase.ts
  5. 4 5
      test/test3.ts

+ 2 - 1
package.json

@@ -14,7 +14,8 @@
     "test4": "node test/test4.js",
     "publish": "node test/publisher.js",
     "consume": "node --max-old-space-size=512 test/consumer_1.js",
-    "buffer": "node test/buffer_showcase.js"
+    "buffer": "node test/buffer_showcase.js",
+    "schedule": "node test/scheduler_showcase.js"
   },
   "repository": {
     "type": "git",

+ 1 - 1
test/consumer_1.ts

@@ -26,7 +26,7 @@ function checkHeapSize(): any {
 // Create new Subject to handle incoming data from remote subscription
 let payload: Subject<any> = new Subject()
 payload.subscribe((element) => {
-    console.log(`Received message from server: ${element.header.messageID}`);
+    console.log(`Received message from server: ${element.appData.msgId}`);
     // tell traffic control to check heap size
     // trafficControl.next(checkHeapSize())
 })

+ 4 - 10
test/publisher.ts

@@ -9,17 +9,11 @@ let msgPayload: Subject<any> = new Subject();
 let consumerTrafficStatus: Subject<any> = new Subject()
 let bufferTrigger: BehaviorSubject<boolean> = new BehaviorSubject(true)
 
-// let mongoStorage: StorageLocation = {
-//     type: `MongoDB`,
-//     url: `mongodb://192.168.100.59:27017/default`
-// }
-// msgData.loadObsData(mongoStorage, msgPayload)
-
-let storageAddress: StorageLocation = {
-    type: "File",
-    url: "payload.json"
+let mongoStorage: StorageLocation = {
+    type: `MongoDB`,
+    url: `mongodb://192.168.100.59:27017/default`
 }
-msgData.loadObsData(storageAddress, msgPayload)
+msgData.loadObsData(mongoStorage, msgPayload)
 
 consumerTrafficStatus.subscribe((element) => {
     if (element >= 1.5) {

+ 40 - 0
test/scheduler_showcase.ts

@@ -0,0 +1,40 @@
+import { asyncScheduler, interval, Observable } from 'rxjs';
+import { delay, filter, map, observeOn, tap } from 'rxjs/operators';
+
+// Create an observable that emits a number every second
+const source$: Observable<number> = interval(1000);
+
+// Create an observable that emits a random boolean value every second
+const trigger$: Observable<boolean> = interval(1000).pipe(
+    // tap(() => console.log('Trigger emitted')),
+    map(() => Math.random() < 0.5),
+    tap(triggered => {
+        console.log('Scheduler triggered by:', triggered);
+    })
+);
+
+let triggerSubscription = trigger$.subscribe((val) => {
+    if (val) { // if it's true
+        console.log(`Starting scheduler... ${val}`);
+
+        source$.pipe(
+            observeOn(asyncScheduler)
+        ).subscribe(() => console.log('Scheduler completed. Starting source...'));
+
+        // Pause trigger emissions while waiting for the scheduler to complete
+        triggerSubscription.unsubscribe();
+        setTimeout(() => {
+            console.log('Resuming trigger emissions...');
+            triggerSubscription = trigger$.subscribe();
+        }, 5000);
+    }
+});
+
+source$.subscribe(e=>{
+    console.log(e)
+})
+
+// Not applicable to the our solution. There's no way to dynamically activate scheduler or assign it to existing observable emission.
+// Going back to test3 asyncScheduler example, it can only delay the amount of time (prefined), and then subscribe. There 's no way 
+// to delay an existing observable unless we pipe it, which is not something we want to do since we want to save space in the first place.
+// Piping means creating another observable

+ 4 - 5
test/test3.ts

@@ -121,8 +121,6 @@ function understandingOBS() {
     })
 }
 
-understandingOBS()
-
 /* Buffer */
 function bufferOBS() {
     /* This code defines a function called bufferOBS that creates a new observable called bufferring using
@@ -181,11 +179,10 @@ function bufferWhenOBS() {
     })
 }
 
-bufferWhenOBS()
-
 /*  Scheduler */
 function asyncScheduleOBS() {
-    const scheduleObservable = of(`Hello Observable passing through`)
+    // const scheduleObservable = of(`Hello Observable passing through`)
+    const scheduleObservable = interval(1000)
     const delayTime = 3000;
     /* In this example, the source$ observable emits the value "Hello Observable passing through". We pass the source$ observable\
     as a parameter to the task function, which is executed after a delay of 3 second using the asyncScheduler.
@@ -198,6 +195,8 @@ function asyncScheduleOBS() {
     }, delayTime);
 }
 
+asyncScheduleOBS()
+
 function queue_Scheduler() {
     /* In this example, we use the observeOn operator to apply the queueScheduler to the observable stream.
       We then subscribe to the stream and execute three tasks (task1, task2, and task3) in order using the queueScheduler.