浏览代码

update some new comments

Enzo 1 年之前
父节点
当前提交
d09929b2e2
共有 3 个文件被更改,包括 66 次插入39 次删除
  1. 40 0
      services/utility.service.ts
  2. 14 29
      test/consumer_1.ts
  3. 12 10
      test/publisher.ts

+ 40 - 0
services/utility.service.ts

@@ -0,0 +1,40 @@
+import _ = require("lodash")
+
+export class UtilityService {
+    // Check Heap size
+    // const v8 = require('v8');
+    // const heapStats = v8.getHeapStatistics();
+    // const maxHeapSize = heapStats.heap_size_limit / (1024 * 1024); // Convert to MB
+    // console.log(`Current maximum heap size: ${maxHeapSize.toFixed(2)} MB`);
+    // const used = process.memoryUsage().heapUsed / 1024 / 1024;
+    // const total = process.memoryUsage().heapTotal / 1024 / 1024;
+    // console.log(`Heap memory usage: ${used.toFixed(2)} MB`);
+    // console.log(`Total heap size: ${total.toFixed(2)} MB`);
+
+    public callFromOtherClass() {
+        const t0 = performance.now()
+        let i
+        for (i = 0; i <= 6000000000; i++) {
+        }
+        const t1 = performance.now()
+        const timeTakenInSeconds = (t1 - t0) / 1000;
+        console.log(`Time taken: ${timeTakenInSeconds} seconds to run this function`);
+    }
+
+    public checkHeapSize(): any {
+        let currentHeapSize = process.memoryUsage().heapUsed / 1024 / 1024;
+        let allocatedHeapSize = 512;
+        let percentage = (currentHeapSize / allocatedHeapSize) * 100;
+        console.log(`Consumer_! Heap currentHeapSize: ${currentHeapSize.toFixed(2)} MB. Percentage: ${percentage} %`);
+
+        return percentage
+    }
+
+    public checkMaxHeap() {
+        let v8 = require('v8');
+        let heapStats = v8.getHeapStatistics();
+        let maxHeapSize = heapStats.heap_size_limit / (1024 * 1024); // Convert to MB
+        console.log(`Current maximum heap size: ${maxHeapSize.toFixed(2)} MB`);
+    }
+}
+

+ 14 - 29
test/consumer_1.ts

@@ -3,38 +3,23 @@ import _ = require("lodash")
 import { Subject, interval } from "rxjs";
 import { io } from "socket.io-client";
 import { Server } from "socket.io"
+import { UtilityService } from "../services/utility.service";
 
+const util = new UtilityService()
+util.checkMaxHeap()
 /* ---------------------- COMPLEX OPERATION ------------------------------ */
-// Check Heap size
-// const v8 = require('v8');
-// const heapStats = v8.getHeapStatistics();
-// const maxHeapSize = heapStats.heap_size_limit / (1024 * 1024); // Convert to MB
-// console.log(`Current maximum heap size: ${maxHeapSize.toFixed(2)} MB`);
-// const used = process.memoryUsage().heapUsed / 1024 / 1024;
-// const total = process.memoryUsage().heapTotal / 1024 / 1024;
-// console.log(`Heap memory usage: ${used.toFixed(2)} MB`);
-// console.log(`Total heap size: ${total.toFixed(2)} MB`);
-
-function checkHeapSize(): any {
-    let currentHeapSize = process.memoryUsage().heapUsed / 1024 / 1024;
-    let allocatedHeapSize = 512;
-    let percentage = (currentHeapSize / allocatedHeapSize) * 100;
-    console.log(`Consumer_! Heap currentHeapSize: ${currentHeapSize.toFixed(2)} MB. Percentage: ${percentage} %`);
-
-    return percentage
-}
 
 // Create new Subject to handle incoming data from remote subscription
 let payload: Subject<any> = new Subject()
 payload.subscribe((element) => {
-    console.log(`Received message from server: ${element.appData.msgId}`);
+    // console.log(`Received message from server: ${element.appData.msgId}`);
 })
 
 // Create new Subject to monitor and broadcast heap size 
 let trafficControl: Subject<any> = new Subject()
-let intervalChecking = interval(1000)
+let intervalChecking = interval(5000)
 intervalChecking.subscribe(() => {
-    trafficControl.next(checkHeapSize())
+    trafficControl.next(util.checkHeapSize())
 })
 
 // Create a WebSocket server
@@ -42,21 +27,21 @@ function createWebsocketServer() {
     const io = new Server({})
     // Listen for connections to the WebSocket server
     io.on(`connection`, (socket) => {
-        console.log(`Connected to client`);
+        console.log(`Connected to clients`);
 
         // Subscribe to the subject when a client connects
         const subscription = trafficControl.subscribe((element) => {
-            // Stringify heap status and send data over to connected client
             socket.emit('trafficControl', element);
         });
 
         socket.on(`disconnect`, () => {
-            console.log(`Client Disconnected`)
+            console.log(`Clients Disconnected`)
             subscription.unsubscribe()
         })
 
     })
 
+    // create localhost server port 8080
     io.listen(8081);
 }
 
@@ -67,7 +52,7 @@ function connectWebSocket() {
 
     // Listen for the WebSocket connection to open
     socket.on(`connect`, () => {
-        console.log(`Connected to publisher`)
+        console.log(`Connected to publisher'server`)
 
 
         // Listen for messages from the server
@@ -76,9 +61,9 @@ function connectWebSocket() {
         });
     })
 
-    socket.emit('subscribe', 'traffic-status')
-
-    socket.on('payload', (data) => {
+    // Receive payload from publisher and push them into local Subject
+    socket.on('payload', (data: any[]) => {
+        console.log(data.length)
         data.forEach(element => {
             payload.next(element)
         });
@@ -86,7 +71,7 @@ function connectWebSocket() {
 
     // Listen for the disconnect event
     socket.on('disconnect', () => {
-        console.log('Disconnected from server');
+        console.log(`Disconnected from publisher'server`);
 
         // Attempt to reconnect every 3 seconds
         setTimeout(() => {

+ 12 - 10
test/publisher.ts

@@ -3,8 +3,11 @@ import { DataPrepService } from '../services/dataprep.service';
 import { StorageLocation } from '../types/interface';
 import { Server } from 'socket.io'
 import { io } from 'socket.io-client';
+import { UtilityService } from '../services/utility.service';
 
 let msgData = new DataPrepService()
+let util = new UtilityService()
+util.checkMaxHeap()
 
 /* ---------------------- COMPLEX OPERATION ------------------------------ */
 let msgPayload: Subject<any> = new Subject();
@@ -25,7 +28,6 @@ consumerTrafficStatus.subscribe((element) => {
     } else {
         let warning: boolean = true
         bufferTrigger.next(warning)
-        // console.log(`Releasing the buffer`)
     }
 })
 
@@ -34,51 +36,51 @@ consumerTrafficStatus.subscribe((element) => {
 function createWebsocketServer() {
     const io = new Server({})
     io.on('connection', (socket) => {
-        console.log(`Consumer Connected`)
+        console.log(`Connected to Clients/Consumers`)
 
         // Subscribe to the subject when a client connects
         const subscription = msgPayload.pipe(
             buffer(bufferTrigger.pipe(filter(Boolean)))
         ).subscribe((element) => {
             console.log(`Emitting ${element.length} messages`)
-            // let messageString = JSON.stringify(element)
             socket.emit(`payload`, element);
+
         });
 
         //Listen for messages from consumer/client
         socket.on(`message`, (message) => {
             console.log(`Received message from client: ${message}`)
-
+            util.checkHeapSize()
             // Send a message back to the client
             socket.send(`${message} received!`);
         })
 
         // Listen for the socket to be closed
         socket.on('disconnect', () => {
-            console.log('Client disconnected');
+            console.log('Client/Consumer disconnected');
+            // Need to put next trigger the buffer. Alternavtively, can use bufferTrigfer.next(false)
+            consumerTrafficStatus.next(2)
             subscription.unsubscribe();
-            consumerTrafficStatus.next(false)
         });
     })
 
     io.listen(8080)
 }
 
-
 // Create a new WebSocket client
 function connectWebSocket() {
     const socket = io('http://localhost:8081');
 
     socket.on('connect', () => {
-        console.log('Connected to publisher');
+        console.log(`Connected to Consumer'Server.`);
     });
 
-    socket.on('trafficControl', (report) => {
+    socket.on('trafficControl', (report: number) => {
         consumerTrafficStatus.next(report);
     });
 
     socket.on('disconnect', () => {
-        console.log('Disconnected from server');
+        console.log(`Disconnected from Consumer'Server`);
 
         // Attempt to reconnect every 3 seconds
         setTimeout(() => {