Przeglądaj źródła

modify buffer to enable self buffering

Enzo 1 rok temu
rodzic
commit
39f60dc6fb
2 zmienionych plików z 41 dodań i 10 usunięć
  1. 12 6
      test/consumer_1.ts
  2. 29 4
      test/publisher.ts

+ 12 - 6
test/consumer_1.ts

@@ -32,17 +32,17 @@ function createWebsocketServer() {
         // Subscribe to the subject when a client connects
         const subscription = trafficControl.subscribe((element: number) => {
 
-            let toProceed = false;
+            let toBuffer = false;
 
-            if (element >= 50) {
-                toProceed = true
+            if (element >= 3) {
+                toBuffer = true
             } else {
-                toProceed = false
+                toBuffer = false
             }
 
             socket.emit('trafficControl', {
                 "consumerHeapUsage": element.toFixed(2),
-                "pause": toProceed
+                "pause": toBuffer
             });
         });
 
@@ -75,9 +75,15 @@ function connectWebSocket() {
 
     // Receive payload from publisher and push them into local Subject
     socket.on('payload', (data: any[]) => {
-        console.log(`Message received from publisher: ${data.length}`)
+        if(data.length > 0){
+            // just to check if there's any data
+            // console.log(`Message received from publisher: ${data.length}`)
+        } else {
+            console.log(`Publisher is buffering. Data received = ${data.length}`)
+        }
         data.forEach(element => {
             payload.next(element)
+            // console.log(element.appData.msgId)
             // trafficControl.next(util.checkHeapSize())
         });
     })

+ 29 - 4
test/publisher.ts

@@ -1,4 +1,4 @@
-import { BehaviorSubject, Subject, buffer, bufferWhen, elementAt, filter, interval, map } from 'rxjs';
+import { BehaviorSubject, Observable, Subject, buffer, bufferWhen, elementAt, filter, interval, map } from 'rxjs';
 import { DataPrepService } from '../services/dataprep.service';
 import { StorageLocation } from '../types/interface';
 import { Server } from 'socket.io'
@@ -26,11 +26,11 @@ function createWebsocketServer() {
     const io = new Server({})
     io.on('connection', (socket) => {
         console.log(`Connected to Clients/Consumers`)
-        let notifier = consumerTrafficStatus.pipe(filter(value => !value.pause))
+        // let notifier = consumerTrafficStatus.pipe(filter(value => !value.pause))
 
         // Subscribe to the subject when a client connects
         const subscription = msgPayload.pipe(
-            buffer(notifier)
+            backlogBuffer(msgPayload, consumerTrafficStatus)
         ).subscribe((element) => {
             if (element.length >= 25000) {
                 const chunkSize = 1024; // Specify the desired chunk size in bytes
@@ -46,7 +46,7 @@ function createWebsocketServer() {
                 });
                 socket.emit(`end`)
             } else {
-                console.log(`Emitting ${element.length} messages`)
+                // console.log(`Emitting ${element.length} messages`)
                 socket.emit(`payload`, element);
             }
 
@@ -88,3 +88,28 @@ function connectWebSocket() {
 
 createWebsocketServer()
 connectWebSocket();
+
+let inputliveSubject = new Subject();
+let OutputliveSubject = new Subject();
+
+function backlogBuffer(msgPayload: Subject<any>, notifier: Observable<any>) {
+    // Pulse by each message
+    msgPayload.subscribe(inputliveSubject)
+
+    // Notifier subscription
+    notifier.subscribe(inputliveSubject)
+
+    let pause = false // true or false
+    inputliveSubject.subscribe((element: any) => {
+        if('pause' in element && element.pause == true){
+            // Start the buffer
+            pause = element.pause
+            console.log(`buffering`)
+        }  
+        if(element && pause == false){
+            // Continue to release the buffer
+            OutputliveSubject.next(element)
+        }
+    })
+    return buffer(OutputliveSubject);
+}