Browse Source

change to socket io

Enzo 1 năm trước cách đây
mục cha
commit
c5b15cb84c
4 tập tin đã thay đổi với 380 bổ sung532 xóa
  1. 299 449
      package-lock.json
  2. 3 3
      package.json
  3. 40 39
      test/consumer_1.ts
  4. 38 41
      test/publisher.ts

Những thai đổi đã bị hủy bỏ vì nó quá lớn
+ 299 - 449
package-lock.json


+ 3 - 3
package.json

@@ -27,7 +27,7 @@
     "@grpc/grpc-js": "^1.8.13",
     "@grpc/proto-loader": "^0.7.6",
     "@types/node": "^18.11.18",
-    "express": "^4.18.2",
+    "@types/socket.io-client": "^3.0.0",
     "google-protobuf": "^3.21.2",
     "grpc": "^1.24.11",
     "jsonschema": "^1.4.1",
@@ -36,9 +36,9 @@
     "protobufjs": "^7.2.3",
     "pug": "^3.0.2",
     "rxjs": "^7.8.0",
+    "socket.io": "^4.6.1",
     "typescript": "^5.0.2",
-    "typescript-collections": "^1.3.3",
-    "ws": "^8.13.0"
+    "typescript-collections": "^1.3.3"
   },
   "devDependencies": {
     "@faker-js/faker": "^7.6.0",

+ 40 - 39
test/consumer_1.ts

@@ -1,7 +1,8 @@
 import { SearchService } from "../services/query.service"
 import _ = require("lodash")
 import { Subject, interval } from "rxjs";
-import * as WebSocket from 'ws';
+import { io } from "socket.io-client";
+import { Server } from "socket.io"
 
 /* ---------------------- COMPLEX OPERATION ------------------------------ */
 // Check Heap size
@@ -18,7 +19,7 @@ function checkHeapSize(): any {
     let currentHeapSize = process.memoryUsage().heapUsed / 1024 / 1024;
     let allocatedHeapSize = 512;
     let percentage = (currentHeapSize / allocatedHeapSize) * 100;
-    console.log(`Consumer_! Heap currentHeapSize: ${currentHeapSize} MB. Percentage: ${percentage}`);
+    console.log(`Consumer_! Heap currentHeapSize: ${currentHeapSize.toFixed(2)} MB. Percentage: ${percentage} %`);
 
     return percentage
 }
@@ -27,8 +28,6 @@ function checkHeapSize(): any {
 let payload: Subject<any> = new Subject()
 payload.subscribe((element) => {
     console.log(`Received message from server: ${element.appData.msgId}`);
-    // tell traffic control to check heap size
-    // trafficControl.next(checkHeapSize())
 })
 
 // Create new Subject to monitor and broadcast heap size 
@@ -40,64 +39,66 @@ intervalChecking.subscribe(() => {
 
 // Create a WebSocket server
 function createWebsocketServer() {
-    const wss = new WebSocket.Server({ port: 8081 });
-
+    const io = new Server({})
     // Listen for connections to the WebSocket server
-    wss.on('connection', (ws: WebSocket) => {
-        console.log('Client connected');
+    io.on(`connection`, (socket) => {
+        console.log(`Connected to client`);
 
         // Subscribe to the subject when a client connects
         const subscription = trafficControl.subscribe((element) => {
             // Stringify heap status and send data over to connected client
-            const messageString = JSON.stringify(element);
-            ws.send(messageString);
+            socket.emit('trafficControl', element);
         });
 
-        // Listen for messages from the client
-        ws.on('message', (message: any) => {
-            console.log(`Received message from client: ${message}`);
-        });
+        socket.on(`disconnect`, () => {
+            console.log(`Client Disconnected`)
+            subscription.unsubscribe()
+        })
 
-        // Unsubscribe from the subject when the client disconnects
-        ws.on('close', () => {
-            console.log('Client disconnected');
-            subscription.unsubscribe();
-        });
-    });
+    })
+
+    io.listen(8081);
 }
 
 
 // Create a new WebSocket client
 function connectWebSocket() {
-    const ws = new WebSocket('ws://localhost:8080');
+    const socket = io('http://localhost:8080')
 
     // Listen for the WebSocket connection to open
-    ws.on('open', () => {
-        console.log('Connecting to Publisher WebSocket server');
+    socket.on(`connect`, () => {
+        console.log(`Connected to publisher`)
 
-        // Send a message to the server
-        ws.send('Hello, publisher server!');
+
+        // Listen for messages from the server
+        socket.on('message', (message) => {
+            trafficControl.next(message);
+        });
     })
 
-    // Listen for messages from the server
-    ws.on('message', (message: string) => {
-        let msgObj: any[] = JSON.parse(message)
-        msgObj.forEach(element => {
+    socket.emit('subscribe', 'traffic-status')
+
+    socket.on('payload', (data) => {
+        data.forEach(element => {
             payload.next(element)
         });
-    });
-
-    // Listen for WebSocket errors
-    ws.on('error', (element) => {
-        console.error('WebSocket error:', element);
     })
 
-    // Listen for the WebSocket connection to close
-    ws.on('close', () => {
-        console.log('Disconnected from WebSocket server');
-        setTimeout(connectWebSocket, 1000); // Attempt to reconnect after 1 second
+    // Listen for the disconnect event
+    socket.on('disconnect', () => {
+        console.log('Disconnected from server');
 
-    })
+        // Attempt to reconnect every 3 seconds
+        setTimeout(() => {
+            console.log('Attempting to reconnect...');
+            socket.connect();
+        }, 1000);
+    });
+
+    // Listen for errors
+    socket.on('error', (error) => {
+        console.error('Socket error:', error);
+    });
 
 }
 

+ 38 - 41
test/publisher.ts

@@ -1,7 +1,9 @@
-import { BehaviorSubject, Subject, buffer, bufferWhen, elementAt, filter } from 'rxjs';
-import * as WebSocket from 'ws';
+import { BehaviorSubject, Subject, buffer, bufferWhen, elementAt, filter, interval } from 'rxjs';
 import { DataPrepService } from '../services/dataprep.service';
 import { StorageLocation } from '../types/interface';
+import { Server } from 'socket.io'
+import { io } from 'socket.io-client';
+
 let msgData = new DataPrepService()
 
 /* ---------------------- COMPLEX OPERATION ------------------------------ */
@@ -16,10 +18,10 @@ let mongoStorage: StorageLocation = {
 msgData.loadObsData(mongoStorage, msgPayload)
 
 consumerTrafficStatus.subscribe((element) => {
-    if (element >= 1.5) {
+    if (element >= 2) {
         let warning: boolean = false
         bufferTrigger.next(warning)
-        console.log(`Buffering.....`)
+        console.log(`Heap Load Exceeded on client side. Buffering.....`)
     } else {
         let warning: boolean = true
         bufferTrigger.next(warning)
@@ -30,65 +32,60 @@ consumerTrafficStatus.subscribe((element) => {
 
 // Create a WebSocket server
 function createWebsocketServer() {
-    const wss = new WebSocket.Server({ port: 8080 });
-
-    // Listen for connections to the WebSocket server
-    wss.on('connection', (ws: WebSocket) => {
-        console.log('Client connected');
+    const io = new Server({})
+    io.on('connection', (socket) => {
+        console.log(`Consumer Connected`)
 
         // Subscribe to the subject when a client connects
         const subscription = msgPayload.pipe(
             buffer(bufferTrigger.pipe(filter(Boolean)))
         ).subscribe((element) => {
             console.log(`Emitting ${element.length} messages`)
-            let messageString = JSON.stringify(element)
-            ws.send(messageString);
+            // let messageString = JSON.stringify(element)
+            socket.emit(`payload`, element);
         });
 
-        // Listen for messages from the client
-        ws.on('message', (message: any) => {
-            console.log(`Received message from client: ${message}`);
-        });
+        //Listen for messages from consumer/client
+        socket.on(`message`, (message) => {
+            console.log(`Received message from client: ${message}`)
+
+            // Send a message back to the client
+            socket.send(`${message} received!`);
+        })
 
-        // Unsubscribe from the subject when the client disconnects
-        ws.on('close', () => {
+        // Listen for the socket to be closed
+        socket.on('disconnect', () => {
             console.log('Client disconnected');
             subscription.unsubscribe();
+            consumerTrafficStatus.next(false)
         });
-    });
+    })
+
+    io.listen(8080)
 }
 
 
 // Create a new WebSocket client
 function connectWebSocket() {
-    // Create a new WebSocket client
-    const ws = new WebSocket('ws://localhost:8081');
+    const socket = io('http://localhost:8081');
 
-    // Listen for the WebSocket connection to open
-    ws.on('open', () => {
-        console.log('Connecting to Consumer WebSocket server');
-
-        // Send a message to the server
-        ws.send('Hello, consumer server!');
-    })
-
-    // Listen for messages from the server
-    ws.on('message', (message: string) => {
-        let msgObj: number = JSON.parse(message)
-        consumerTrafficStatus.next(msgObj)
+    socket.on('connect', () => {
+        console.log('Connected to publisher');
     });
 
-    // Listen for WebSocket errors
-    ws.on('error', (element) => {
-        console.error('WebSocket error:', element);
-    })
+    socket.on('trafficControl', (report) => {
+        consumerTrafficStatus.next(report);
+    });
 
+    socket.on('disconnect', () => {
+        console.log('Disconnected from server');
 
-    // Listen for the WebSocket connection to close
-    ws.on('close', () => {
-        console.log('Disconnected from WebSocket server');
-        setTimeout(connectWebSocket, 1000); // Attempt to reconnect after 1 second
-    })
+        // Attempt to reconnect every 3 seconds
+        setTimeout(() => {
+            console.log('Attempting to reconnect...');
+            socket.connect();
+        }, 3000);
+    });
 }
 
 createWebsocketServer()

Một số tệp đã không được hiển thị bởi vì quá nhiều tập tin thay đổi trong này khác