import { BehaviorSubject, Subject, buffer, bufferWhen, elementAt, filter } from 'rxjs';
import * as WebSocket from 'ws';
import { DataPrepService } from '../services/dataprep.service';
import { StorageLocation } from '../types/interface';

/** Port on which this process serves buffered payload batches to connecting clients. */
const SERVER_PORT = 8080;
/** Address of the consumer's WebSocket server, which reports its traffic level back to us. */
const CONSUMER_URL = 'ws://localhost:8081';
/** Delay before trying to reconnect to the consumer after the connection closes. */
const RECONNECT_DELAY_MS = 1000;
/** Reported traffic level at or above which outgoing messages are held in the buffer. */
const TRAFFIC_THRESHOLD = 1.5;

const msgData = new DataPrepService();

/* ---------------------- COMPLEX OPERATION ------------------------------ */
// Stream of payload messages filled by DataPrepService.loadObsData.
// The element type is whatever loadObsData pushes; it is not visible from this file,
// so it is left as `any` to stay assignable to that method's parameter type.
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const msgPayload: Subject<any> = new Subject<any>();
// Traffic levels reported by the consumer (see connectWebSocket).
const consumerTrafficStatus: Subject<number> = new Subject<number>();
// `true`  -> release the buffered messages to the clients,
// `false` -> keep buffering. Starts in the "release" state.
const bufferTrigger: BehaviorSubject<boolean> = new BehaviorSubject<boolean>(true);

// let mongoStorage: StorageLocation = {
//     type: `MongoDB`,
//     url: `mongodb://192.168.100.59:27017/default`
// }
// msgData.loadObsData(mongoStorage, msgPayload)

const storageAddress: StorageLocation = {
  type: "File",
  url: "payload.json"
};
msgData.loadObsData(storageAddress, msgPayload);

// Translate each reported traffic level into a buffer/release decision.
consumerTrafficStatus.subscribe((level: number) => {
  if (level >= TRAFFIC_THRESHOLD) {
    bufferTrigger.next(false);
    console.log(`Buffering.....`);
  } else {
    bufferTrigger.next(true);
    // console.log(`Releasing the buffer`)
  }
});

/**
 * Starts the WebSocket server that forwards payload messages to clients.
 *
 * Every connected client gets its own subscription to `msgPayload`. Messages are
 * collected with `buffer` and sent as one JSON array each time `bufferTrigger`
 * emits `true`. The subscription is torn down when the client disconnects.
 */
function createWebsocketServer() {
  const wss = new WebSocket.Server({ port: SERVER_PORT });

  // Listen for connections to the WebSocket server
  wss.on('connection', (ws: WebSocket) => {
    console.log('Client connected');

    // Subscribe to the subject when a client connects.
    // filter(Boolean) drops the `false` ("keep buffering") emissions so that only
    // a "release" signal closes the current buffer.
    const subscription = msgPayload.pipe(
      buffer(bufferTrigger.pipe(filter(Boolean)))
    ).subscribe((batch: unknown[]) => {
      // A batch can be released while the socket is already closing; sending then
      // would fail, so skip it instead of throwing inside the subscriber.
      if (ws.readyState !== WebSocket.OPEN) {
        return;
      }
      console.log(`Emitting ${batch.length} messages`);
      const messageString = JSON.stringify(batch);
      ws.send(messageString, (err?: Error) => {
        if (err) {
          console.error('Failed to send messages to client:', err);
        }
      });
    });

    // Listen for messages from the client
    ws.on('message', (message: unknown) => {
      console.log(`Received message from client: ${String(message)}`);
    });

    // Without an 'error' listener a single client's socket error would surface as an
    // unhandled 'error' event and terminate the whole process.
    ws.on('error', (err: Error) => {
      console.error('Client WebSocket error:', err);
    });

    // Unsubscribe from the subject when the client disconnects
    ws.on('close', () => {
      console.log('Client disconnected');
      subscription.unsubscribe();
    });
  });
}

/**
 * Connects to the consumer's WebSocket server and feeds the traffic levels it
 * reports into `consumerTrafficStatus`.
 *
 * Each incoming message is expected to be a JSON-encoded number. Malformed or
 * non-numeric messages are logged and ignored. When the connection closes, a
 * new connection attempt is scheduled after `RECONNECT_DELAY_MS`.
 */
function connectWebSocket() {
  // Create a new WebSocket client
  const ws = new WebSocket(CONSUMER_URL);

  // Listen for the WebSocket connection to open
  ws.on('open', () => {
    console.log('Connecting to Consumer WebSocket server');
    // Send a message to the server
    ws.send('Hello, consumer server!');
  });

  // Listen for messages from the server
  ws.on('message', (message: unknown) => {
    let parsed: unknown;
    try {
      // The payload may arrive as a Buffer; String() yields its text content.
      parsed = JSON.parse(String(message));
    } catch (err) {
      console.error('Ignoring malformed traffic status message:', err);
      return;
    }
    if (typeof parsed !== 'number') {
      console.error('Ignoring non-numeric traffic status message:', parsed);
      return;
    }
    consumerTrafficStatus.next(parsed);
  });

  // Listen for WebSocket errors
  ws.on('error', (err: Error) => {
    console.error('WebSocket error:', err);
  });

  // Listen for the WebSocket connection to close
  ws.on('close', () => {
    console.log('Disconnected from WebSocket server');
    setTimeout(connectWebSocket, RECONNECT_DELAY_MS); // Attempt to reconnect after 1 second
  });
}

createWebsocketServer();
connectWebSocket();