Enzo преди 1 година
родител
ревизия
9954cb715b
променени са 2 файла, в които са добавени 21 реда и са изтрити 18 реда
  1. 7 4
      test/consumer_1.ts
  2. 14 14
      test/publisher.ts

+ 7 - 4
test/consumer_1.ts

@@ -30,19 +30,22 @@ function createWebsocketServer() {
         console.log(`Connected to clients`);
 
         // Subscribe to the subject when a client connects
-        const subscription = trafficControl.subscribe((element: number) => {
+        const subscription = trafficControl.subscribe((heapUsagePercentage: number) => {
 
+            // Boolean to be emitted to server to tell them to buffer the incoming 
             let toBuffer = false;
 
-            if (element >= 3) {
+            // Set to 3% at the moment
+            if (heapUsagePercentage >= 3) {
                 toBuffer = true
             } else {
                 toBuffer = false
             }
 
+            // Stream traffic control obs data over to designated client/consumer.
             socket.emit('trafficControl', {
-                "consumerHeapUsage": element.toFixed(2),
-                "pause": toBuffer
+                "consumerHeapUsage": heapUsagePercentage.toFixed(2),
+                "buffer": toBuffer
             });
         });
 

+ 14 - 14
test/publisher.ts

@@ -4,7 +4,6 @@ import { StorageLocation } from '../types/interface';
 import { Server } from 'socket.io'
 import { io } from 'socket.io-client';
 import { UtilityService } from '../services/utility.service';
-import { Worker } from "worker_threads"
 
 
 let msgData = new DataPrepService()
@@ -14,6 +13,8 @@ util.checkMaxHeap()
 /* ---------------------- COMPLEX OPERATION ------------------------------ */
 let msgPayload: Subject<any> = new Subject();
 let consumerTrafficStatus: Subject<any> = new Subject()
+let inputliveSubject = new Subject();
+let OutputliveSubject = new Subject();
 
 let mongoStorage: StorageLocation = {
     type: `MongoDB`,
@@ -70,6 +71,7 @@ function connectWebSocket() {
         console.log(`Connected to Consumer'Server.`);
     });
 
+    // Subsribe to trafficControl from consumer's side
     socket.on('trafficControl', (report: any) => {
         console.log(report)
         consumerTrafficStatus.next(report);
@@ -86,30 +88,28 @@ function connectWebSocket() {
     });
 }
 
-createWebsocketServer()
-connectWebSocket();
-
-let inputliveSubject = new Subject();
-let OutputliveSubject = new Subject();
 
 function backlogBuffer(msgPayload: Subject<any>, notifier: Observable<any>) {
-    // Pulse by each message
+    // Pulse by each message to tell output to keep releasing the buffer unless requested by client/consumer not to do so.
     msgPayload.subscribe(inputliveSubject)
 
-    // Notifier subscription
+    // Notfier act as a input from client/consumer's side to listen to request for buffering or releasing for the buffer
     notifier.subscribe(inputliveSubject)
 
-    let pause = false // true or false
+    let toBuffer = false // true or false
     inputliveSubject.subscribe((element: any) => {
-        if('pause' in element && element.pause == true){
+        if ('buffer' in element && element.buffer == true) {
             // Start the buffer
-            pause = element.pause
-            console.log(`buffering`)
-        }  
-        if(element && pause == false){
+            toBuffer = element.buffer
+            console.log(`Buffering....`)
+        }
+        if (element && toBuffer == false) {
             // Continue to release the buffer
             OutputliveSubject.next(element)
         }
     })
     return buffer(OutputliveSubject);
 }
+
+createWebsocketServer()
+connectWebSocket();