Bläddra i källkod

fix buffer issue

Enzo 1 år sedan
förälder
incheckning
2ff74cccb2
3 ändrade filer med 41 tillägg och 47 borttagningar
  1. 19 13
      test/buffer_showcase.ts
  2. 17 4
      test/consumer_1.ts
  3. 5 30
      test/publisher.ts

+ 19 - 13
test/buffer_showcase.ts

@@ -1,10 +1,10 @@
 import { interval } from 'rxjs';
 import { buffer, filter, map, pairwise, tap } from 'rxjs/operators';
 
-// Create an observable that emits a number every second
-const source$ = interval(1000);
+// // Create an observable that emits a number every second
+// const source$ = interval(1000);
 
-// Create an observable that emits a random boolean value every 3 seconds
+// // Create an observable that emits a random boolean value every 3 seconds
 const trigger$ = interval(1000).pipe(
   map(() => Math.random() < 0.5),
   tap(triggered => {
@@ -16,13 +16,19 @@ const trigger$ = interval(1000).pipe(
   })
 );
 
-// Buffer the values emitted by source$ between two true values emitted by trigger$
-source$.pipe(
-  buffer(trigger$.pipe(
-    pairwise(),
-    filter(([prev, curr]) => prev === false && curr === true),
-    map(([_, curr]) => curr)
-  ))
-).subscribe(bufferedValues => {
-  console.log('Buffered values:', bufferedValues);
-});
+// // Buffer the values emitted by source$ between two true values emitted by trigger$
+// source$.pipe(
+//   buffer(trigger$.pipe(
+//     pairwise(),
+//     filter(([prev, curr]) => prev === false && curr === true),
+//     map(([_, curr]) => curr)
+//   ))
+// ).subscribe(bufferedValues => {
+//   console.log('Buffered values:', bufferedValues);
+// });
+
+
+const intervalEvents = interval(1000);
+const buffered = intervalEvents.pipe(buffer(trigger$.pipe(filter(value => !value))));
+// const buffered = intervalEvents.pipe(buffer(trigger$.pipe(map(val => !val))));
+buffered.subscribe(x => console.log(x));

+ 17 - 4
test/consumer_1.ts

@@ -17,7 +17,7 @@ payload.subscribe((element) => {
 
 // Create new Subject to monitor and broadcast heap size 
 let trafficControl: Subject<any> = new Subject()
-let intervalChecking = interval(5000)
+let intervalChecking = interval(1000)
 intervalChecking.subscribe(() => {
     trafficControl.next(util.checkHeapSize())
 })
@@ -30,8 +30,20 @@ function createWebsocketServer() {
         console.log(`Connected to clients`);
 
         // Subscribe to the subject when a client connects
-        const subscription = trafficControl.subscribe((element) => {
-            socket.emit('trafficControl', element);
+        const subscription = trafficControl.subscribe((element: number) => {
+
+            let toProceed = false;
+
+            if (element >= 2) {
+                toProceed = true
+            } else {
+                toProceed = false
+            }
+
+            socket.emit('trafficControl', {
+                "consumerHeapUsage": element.toFixed(2),
+                "pause": toProceed
+            });
         });
 
         socket.on(`disconnect`, () => {
@@ -66,6 +78,7 @@ function connectWebSocket() {
         console.log(data.length)
         data.forEach(element => {
             payload.next(element)
+            // trafficControl.next(util.checkHeapSize())
         });
     })
 
@@ -73,7 +86,7 @@ function connectWebSocket() {
     socket.on('disconnect', () => {
         console.log(`Disconnected from publisher'server`);
 
-        // Attempt to reconnect every 3 seconds
+        // Attempt to reconnect every 1 second
         setTimeout(() => {
             console.log('Attempting to reconnect...');
             socket.connect();

+ 5 - 30
test/publisher.ts

@@ -1,4 +1,4 @@
-import { BehaviorSubject, Subject, buffer, bufferWhen, elementAt, filter, interval } from 'rxjs';
+import { BehaviorSubject, Subject, buffer, bufferWhen, elementAt, filter, interval, map } from 'rxjs';
 import { DataPrepService } from '../services/dataprep.service';
 import { StorageLocation } from '../types/interface';
 import { Server } from 'socket.io'
@@ -12,7 +12,6 @@ util.checkMaxHeap()
 /* ---------------------- COMPLEX OPERATION ------------------------------ */
 let msgPayload: Subject<any> = new Subject();
 let consumerTrafficStatus: Subject<any> = new Subject()
-let bufferTrigger: BehaviorSubject<boolean> = new BehaviorSubject(false)
 
 let mongoStorage: StorageLocation = {
     type: `MongoDB`,
@@ -20,50 +19,25 @@ let mongoStorage: StorageLocation = {
 }
 msgData.loadObsData(mongoStorage, msgPayload)
 
-consumerTrafficStatus.subscribe((consumerHeapUsage) => {
-    if (consumerHeapUsage >= 2) {
-        // If consumerHeapUsage is over 2 %, it will trigger buffer
-        bufferTrigger.next(true)
-    } else {
-        bufferTrigger.next(false)
-    }
-})
-
-bufferTrigger.subscribe((element) => {
-    if(element){
-        console.log(`Heap Load Exceeded on client side. Buffering.....`)
-    }
-})
-
-
 // Create a WebSocket server
 function createWebsocketServer() {
     const io = new Server({})
     io.on('connection', (socket) => {
         console.log(`Connected to Clients/Consumers`)
+        let notifier = consumerTrafficStatus.pipe(filter(value => !value.pause))
 
         // Subscribe to the subject when a client connects
         const subscription = msgPayload.pipe(
-            buffer(bufferTrigger.pipe(filter(value => !value)))
+            buffer(notifier)
         ).subscribe((element) => {
             console.log(`Emitting ${element.length} messages`)
             socket.emit(`payload`, element);
 
         });
 
-        //Listen for messages from consumer/client
-        socket.on(`message`, (message) => {
-            console.log(`Received message from client: ${message}`)
-            util.checkHeapSize()
-            // Send a message back to the client
-            socket.send(`${message} received!`);
-        })
-
         // Listen for the socket to be closed
         socket.on('disconnect', () => {
             console.log('Client/Consumer disconnected');
-            // Need to put next(true) trigger the buffer due to disconnection from the other side.
-            bufferTrigger.next(true)
             subscription.unsubscribe();
         });
     })
@@ -79,7 +53,8 @@ function connectWebSocket() {
         console.log(`Connected to Consumer'Server.`);
     });
 
-    socket.on('trafficControl', (report: number) => {
+    socket.on('trafficControl', (report: any) => {
+        console.log(report)
         consumerTrafficStatus.next(report);
     });