Ver código fonte

buffering implementations

Enzo 1 ano atrás
pai
commit
63c1019dfe
5 arquivos alterados com 197 adições e 53 exclusões
  1. 2 1
      package.json
  2. 1 1
      services/dataprep.service.ts
  3. 28 0
      test/buffer_showcase.ts
  4. 81 20
      test/consumer_1.ts
  5. 85 31
      test/publisher.ts

+ 2 - 1
package.json

@@ -13,7 +13,8 @@
     "test3": "node test/test3.js",
     "test4": "node test/test4.js",
     "publish": "node test/publisher.js",
-    "consume": "node --max-old-space-size=512 test/consumer_1.js"
+    "consume": "node --max-old-space-size=512 test/consumer_1.js",
+    "buffer": "node test/buffer_showcase.js"
   },
   "repository": {
     "type": "git",

+ 1 - 1
services/dataprep.service.ts

@@ -40,7 +40,7 @@ export class DataPrepService {
                 clearInterval(intervalId);
                 dataFromStorage.complete();
             }
-        }, 250)
+        }, 100)
 
     }
 

+ 28 - 0
test/buffer_showcase.ts

@@ -0,0 +1,28 @@
+import { interval } from 'rxjs';
+import { buffer, filter, map, pairwise, tap } from 'rxjs/operators';
+
+// Create an observable that emits a number every second
+const source$ = interval(1000);
+
+// Create an observable that emits a random boolean value every 3 seconds
+const trigger$ = interval(1000).pipe(
+  map(() => Math.random() < 0.5),
+  tap(triggered => {
+    if (triggered) {
+      console.log('Buffering triggered by:', triggered);
+    } else {
+      console.log('Buffering not triggered by:', triggered);
+    }
+  })
+);
+
+// Buffer the values emitted by source$ between two true values emitted by trigger$
+source$.pipe(
+  buffer(trigger$.pipe(
+    pairwise(),
+    filter(([prev, curr]) => prev === false && curr === true),
+    map(([_, curr]) => curr)
+  ))
+).subscribe(bufferedValues => {
+  console.log('Buffered values:', bufferedValues);
+});

+ 81 - 20
test/consumer_1.ts

@@ -9,36 +9,97 @@ import * as WebSocket from 'ws';
 // const heapStats = v8.getHeapStatistics();
 // const maxHeapSize = heapStats.heap_size_limit / (1024 * 1024); // Convert to MB
 // console.log(`Current maximum heap size: ${maxHeapSize.toFixed(2)} MB`);
+// const used = process.memoryUsage().heapUsed / 1024 / 1024;
+// const total = process.memoryUsage().heapTotal / 1024 / 1024;
+// console.log(`Heap memory usage: ${used.toFixed(2)} MB`);
+// console.log(`Total heap size: ${total.toFixed(2)} MB`);
+
+function checkHeapSize(): any {
+    let currentHeapSize = process.memoryUsage().heapUsed / 1024 / 1024;
+    let allocatedHeapSize = 512;
+    let percentage = (currentHeapSize / allocatedHeapSize) * 100;
+    console.log(`Consumer_! Heap currentHeapSize: ${currentHeapSize} MB. Percentage: ${percentage}`);
+
+    return percentage
+}
 
 // Create new Subject to handle incoming data from remote subscription
 let payload: Subject<any> = new Subject()
-
 payload.subscribe((element) => {
-    console.log(`Received message from server: ${element.appData.msgId}`);
-    const used = process.memoryUsage().heapUsed / 1024 / 1024;
-    console.log(`Heap used: ${used} MB`);
+    console.log(`Received message from server: ${element.header.messageID}`);
+    // tell traffic control to check heap size
+    // trafficControl.next(checkHeapSize())
 })
 
+// Create new Subject to monitor and broadcast heap size 
+let trafficControl: Subject<any> = new Subject()
+let intervalChecking = interval(1000)
+intervalChecking.subscribe(() => {
+    trafficControl.next(checkHeapSize())
+})
+
+// Create a WebSocket server
+function createWebsocketServer() {
+    const wss = new WebSocket.Server({ port: 8081 });
+
+    // Listen for connections to the WebSocket server
+    wss.on('connection', (ws: WebSocket) => {
+        console.log('Client connected');
+
+        // Subscribe to the subject when a client connects
+        const subscription = trafficControl.subscribe((element) => {
+            // Stringify heap status and send data over to connected client
+            const messageString = JSON.stringify(element);
+            ws.send(messageString);
+        });
+
+        // Listen for messages from the client
+        ws.on('message', (message: any) => {
+            console.log(`Received message from client: ${message}`);
+        });
+
+        // Unsubscribe from the subject when the client disconnects
+        ws.on('close', () => {
+            console.log('Client disconnected');
+            subscription.unsubscribe();
+        });
+    });
+}
+
+
 // Create a new WebSocket client
-const ws = new WebSocket('ws://localhost:8080');
+function connectWebSocket() {
+    const ws = new WebSocket('ws://localhost:8080');
+
+    // Listen for the WebSocket connection to open
+    ws.on('open', () => {
+        console.log('Connecting to Publisher WebSocket server');
+
+        // Send a message to the server
+        ws.send('Hello, publisher server!');
+    })
 
-// Listen for the WebSocket connection to open
-ws.on('open', () => {
-    console.log('Connected to WebSocket server');
+    // Listen for messages from the server
+    ws.on('message', (message: string) => {
+        let msgObj: any[] = JSON.parse(message)
+        msgObj.forEach(element => {
+            payload.next(element)
+        });
+    });
 
-    // Send a message to the server
-    ws.send('Hello, server!');
-});
+    // Listen for WebSocket errors
+    ws.on('error', (element) => {
+        console.error('WebSocket error:', element);
+    })
 
-// Listen for messages from the server
-ws.on('message', (message: string) => {
-    let msgObj = JSON.parse(message)
-    payload.next(msgObj)
-});
+    // Listen for the WebSocket connection to close
+    ws.on('close', () => {
+        console.log('Disconnected from WebSocket server');
+        setTimeout(connectWebSocket, 1000); // Attempt to reconnect after 1 second
 
-// Listen for the WebSocket connection to close
-ws.on('close', () => {
-    console.log('Disconnected from WebSocket server');
-});
+    })
 
+}
 
+createWebsocketServer()
+connectWebSocket();

+ 85 - 31
test/publisher.ts

@@ -1,47 +1,101 @@
-import { Subject } from 'rxjs';
+import { BehaviorSubject, Subject, buffer, bufferWhen, elementAt, filter } from 'rxjs';
 import * as WebSocket from 'ws';
 import { DataPrepService } from '../services/dataprep.service';
 import { StorageLocation } from '../types/interface';
 let msgData = new DataPrepService()
 
 /* ---------------------- COMPLEX OPERATION ------------------------------ */
-const msgPayload: Subject<any> = new Subject();
+let msgPayload: Subject<any> = new Subject();
+let consumerTrafficStatus: Subject<any> = new Subject()
+let bufferTrigger: BehaviorSubject<boolean> = new BehaviorSubject(true)
 
-let mongoStorage: StorageLocation = {
-    type: `MongoDB`,
-    url: `mongodb://192.168.100.59:27017/default`
+// let mongoStorage: StorageLocation = {
+//     type: `MongoDB`,
+//     url: `mongodb://192.168.100.59:27017/default`
+// }
+// msgData.loadObsData(mongoStorage, msgPayload)
+
+let storageAddress: StorageLocation = {
+    type: "File",
+    url: "payload.json"
 }
-msgData.loadObsData(mongoStorage, msgPayload)
+msgData.loadObsData(storageAddress, msgPayload)
+
+consumerTrafficStatus.subscribe((element) => {
+    if (element >= 1.5) {
+        let warning: boolean = false
+        bufferTrigger.next(warning)
+        console.log(`Buffering.....`)
+    } else {
+        let warning: boolean = true
+        bufferTrigger.next(warning)
+        // console.log(`Releasing the buffer`)
+    }
+})
+
 
 // Create a WebSocket server
-const wss = new WebSocket.Server({ port: 8080 });
-
-// Listen for connections to the WebSocket server
-wss.on('connection', (ws: WebSocket) => {
-    console.log('Client connected');
-
-    // Subscribe to the subject when a client connects
-    const subscription = msgPayload.subscribe((element) => {
-        const used = process.memoryUsage().heapUsed / 1024 / 1024;
-        const total = process.memoryUsage().heapTotal / 1024 / 1024;
-        console.log(`Heap memory usage: ${used.toFixed(2)} MB`);
-        console.log(`Total heap size: ${total.toFixed(2)} MB`);
-        console.log(`Emitting value ${element.appData.msgId} to client`);
-
-        const messageString = JSON.stringify(element);
-        ws.send(messageString);
-    });
+function createWebsocketServer() {
+    const wss = new WebSocket.Server({ port: 8080 });
+
+    // Listen for connections to the WebSocket server
+    wss.on('connection', (ws: WebSocket) => {
+        console.log('Client connected');
 
-    // Listen for messages from the client
-    ws.on('message', (message: any) => {
-        console.log(`Received message from client: ${message}`);
+        // Subscribe to the subject when a client connects
+        const subscription = msgPayload.pipe(
+            buffer(bufferTrigger.pipe(filter(Boolean)))
+        ).subscribe((element) => {
+            console.log(`Emitting ${element.length} messages`)
+            let messageString = JSON.stringify(element)
+            ws.send(messageString);
+        });
+
+        // Listen for messages from the client
+        ws.on('message', (message: any) => {
+            console.log(`Received message from client: ${message}`);
+        });
+
+        // Unsubscribe from the subject when the client disconnects
+        ws.on('close', () => {
+            console.log('Client disconnected');
+            subscription.unsubscribe();
+        });
     });
+}
 
-    // Unsubscribe from the subject when the client disconnects
-    ws.on('close', () => {
-        console.log('Client disconnected');
-        subscription.unsubscribe();
+
+// Create a new WebSocket client
+function connectWebSocket() {
+    // Create a new WebSocket client
+    const ws = new WebSocket('ws://localhost:8081');
+
+    // Listen for the WebSocket connection to open
+    ws.on('open', () => {
+        console.log('Connecting to Consumer WebSocket server');
+
+        // Send a message to the server
+        ws.send('Hello, consumer server!');
+    })
+
+    // Listen for messages from the server
+    ws.on('message', (message: string) => {
+        let msgObj: number = JSON.parse(message)
+        consumerTrafficStatus.next(msgObj)
     });
-});
 
+    // Listen for WebSocket errors
+    ws.on('error', (element) => {
+        console.error('WebSocket error:', element);
+    })
+
+
+    // Listen for the WebSocket connection to close
+    ws.on('close', () => {
+        console.log('Disconnected from WebSocket server');
+        setTimeout(connectWebSocket, 1000); // Attempt to reconnect after 1 second
+    })
+}
 
+createWebsocketServer()
+connectWebSocket();