123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101 |
- import { BehaviorSubject, Subject, buffer, bufferWhen, elementAt, filter } from 'rxjs';
- import * as WebSocket from 'ws';
- import { DataPrepService } from '../services/dataprep.service';
- import { StorageLocation } from '../types/interface';
// Service used below to load stored observation data into `msgPayload`.
const msgData = new DataPrepService();

/* ---------------------- COMPLEX OPERATION ------------------------------ */

// Stream of payload messages that connected WebSocket clients receive in batches.
const msgPayload = new Subject<any>();

// Stream of traffic readings reported by the consumer over its WebSocket.
const consumerTrafficStatus = new Subject<any>();

// `true` lets buffered payloads be released, `false` holds them back. Starts as `true`.
const bufferTrigger = new BehaviorSubject<boolean>(true);

// Alternative storage location (disabled): read the payloads from MongoDB instead of a file.
// let mongoStorage: StorageLocation = {
//   type: `MongoDB`,
//   url: `mongodb://192.168.100.59:27017/default`
// }
// msgData.loadObsData(mongoStorage, msgPayload)

// Active storage location: a local JSON file.
const storageAddress: StorageLocation = { type: "File", url: "payload.json" };

// NOTE(review): presumably this pushes the stored records into `msgPayload` — confirm in DataPrepService.
msgData.loadObsData(storageAddress, msgPayload);
// Translate each consumer traffic reading into a buffer gate signal:
// a reading of 1.5 or more holds the buffer (false), anything else releases it (true).
consumerTrafficStatus.subscribe((trafficLevel) => {
  const shouldHold = trafficLevel >= 1.5;

  // Negating the `>=` test (rather than using `<`) keeps non-comparable readings
  // such as NaN or undefined on the "release" side.
  bufferTrigger.next(!shouldHold);

  if (shouldHold) {
    console.log(`Buffering.....`);
  }
});
- // Create a WebSocket server
/**
 * Starts a WebSocket server on port 8080 that streams batched payloads to every client.
 *
 * Each connection gets its own subscription to `msgPayload`. Messages are collected
 * with `buffer(...)` and flushed to the client as a single JSON array every time
 * `bufferTrigger` emits `true`; `false` emissions are filtered out, so nothing is
 * flushed while the gate is held.
 *
 * Errors on the server or on an individual client socket are logged instead of
 * being left unhandled.
 */
function createWebsocketServer() {
  const wss = new WebSocket.Server({ port: 8080 });

  // An 'error' event with no listener is thrown by EventEmitter and would
  // terminate the process (e.g. when the port is already in use).
  wss.on('error', (error: Error) => {
    console.error('WebSocket server error:', error);
  });

  // Listen for connections to the WebSocket server
  wss.on('connection', (ws: WebSocket) => {
    console.log('Client connected');

    // Subscribe to the subject when a client connects
    const subscription = msgPayload.pipe(
      buffer(bufferTrigger.pipe(filter(Boolean)))
    ).subscribe((element) => {
      // A flush can arrive while the socket is closing or already closed;
      // skip the send rather than writing to a socket that is not open.
      if (ws.readyState !== ws.OPEN) {
        return;
      }
      console.log(`Emitting ${element.length} messages`)
      ws.send(JSON.stringify(element));
    });

    // Listen for messages from the client
    ws.on('message', (message: any) => {
      console.log(`Received message from client: ${message}`);
    });

    // Same reasoning as for the server: keep a socket-level error from
    // becoming an uncaught exception.
    ws.on('error', (error: Error) => {
      console.error('Client socket error:', error);
    });

    // Unsubscribe from the subject when the client disconnects
    ws.on('close', () => {
      console.log('Client disconnected');
      subscription.unsubscribe();
    });
  });
}
- // Create a new WebSocket client
/**
 * Connects to the consumer's WebSocket server at ws://localhost:8081 and forwards
 * the traffic readings it sends into `consumerTrafficStatus`.
 *
 * Incoming messages are expected to be JSON-encoded numbers. Messages that are not
 * valid JSON, or that do not decode to a finite number, are logged and dropped so
 * that a single bad frame cannot crash the process or feed a non-numeric value to
 * subscribers.
 *
 * When the connection closes, a new attempt is scheduled after one second.
 */
function connectWebSocket() {
  // Create a new WebSocket client
  const ws = new WebSocket('ws://localhost:8081');

  // Listen for the WebSocket connection to open
  ws.on('open', () => {
    console.log('Connecting to Consumer WebSocket server');
    // Send a message to the server
    ws.send('Hello, consumer server!');
  })

  // Listen for messages from the server
  ws.on('message', (message: string) => {
    let reading: unknown;
    try {
      // toString() covers both string payloads and binary (Buffer) payloads.
      reading = JSON.parse(message.toString());
    } catch (error) {
      console.error('Ignoring malformed consumer status message:', error);
      return;
    }

    if (typeof reading !== 'number' || !Number.isFinite(reading)) {
      console.error('Ignoring non-numeric consumer status message:', reading);
      return;
    }

    consumerTrafficStatus.next(reading)
  });

  // Listen for WebSocket errors
  ws.on('error', (element) => {
    console.error('WebSocket error:', element);
  })

  // Listen for the WebSocket connection to close
  ws.on('close', () => {
    console.log('Disconnected from WebSocket server');
    setTimeout(connectWebSocket, 1000); // Attempt to reconnect after 1 second
  })
}
// Entry point: open the server that streams payloads to clients, then
// connect to the consumer's status feed that gates the buffering.
createWebsocketServer();
connectWebSocket();
|