publisher.ts 2.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. import { BehaviorSubject, Subject, buffer, bufferWhen, elementAt, filter } from 'rxjs';
  2. import * as WebSocket from 'ws';
  3. import { DataPrepService } from '../services/dataprep.service';
  4. import { StorageLocation } from '../types/interface';
  /* ---------------------- Shared streams and back-pressure wiring ------------------------------ */
  // Data preparation service; presumably it reads from the storage location below
  // and pushes what it finds into `msgPayload` — confirm against DataPrepService.
  const msgData = new DataPrepService();

  // Stream of outgoing messages that connected clients subscribe to.
  const msgPayload = new Subject<any>();
  // Stream of traffic/load figures reported by the consumer side.
  const consumerTrafficStatus = new Subject<any>();
  // `true` lets buffered messages be flushed, `false` keeps them buffered.
  // Starts at `true`, i.e. not buffering.
  const bufferTrigger = new BehaviorSubject<boolean>(true);

  const mongoStorage: StorageLocation = {
    type: `MongoDB`,
    url: `mongodb://192.168.100.59:27017/default`,
  };
  msgData.loadObsData(mongoStorage, msgPayload);

  // Translate each reported load figure into a buffer/release decision.
  consumerTrafficStatus.subscribe((load) => {
    // Anything that is not ">= 1.5" (including non-numeric values) releases the buffer.
    if (!(load >= 1.5)) {
      bufferTrigger.next(true);
      return;
    }
    // The consumer reports a load of 1.5 or more: hold messages back.
    bufferTrigger.next(false);
    console.log(`Buffering.....`);
  });
  /**
   * Starts the publisher's WebSocket server on port 8080.
   *
   * Every client that connects gets its own subscription to `msgPayload`.
   * Messages are collected by `buffer` and sent as a single JSON array each
   * time `bufferTrigger` emits `true`; `false` emissions are filtered out, so
   * messages keep accumulating while the trigger is `false`. Because
   * `bufferTrigger` is a `BehaviorSubject`, its current value is replayed to
   * each new subscription.
   *
   * The subscription is released when the client's socket closes.
   */
  function createWebsocketServer(): void {
    const wss = new WebSocket.Server({ port: 8080 });

    // Listen for connections to the WebSocket server
    wss.on('connection', (ws: WebSocket) => {
      console.log('Client connected');

      // Subscribe to the subject when a client connects
      const subscription = msgPayload
        .pipe(buffer(bufferTrigger.pipe(filter(Boolean))))
        .subscribe((element) => {
          console.log(`Emitting ${element.length} messages`);
          const messageString = JSON.stringify(element);
          // Supplying a callback makes a failed send (e.g. socket no longer
          // open) surface here instead of being thrown or silently dropped.
          ws.send(messageString, (err?: Error) => {
            if (err) {
              console.error('Failed to send messages to client:', err);
            }
          });
        });

      // Listen for messages from the client
      ws.on('message', (message: unknown) => {
        console.log(`Received message from client: ${message}`);
      });

      // An 'error' event with no listener is rethrown by the EventEmitter and
      // would crash the whole publisher; log it instead. Cleanup stays in the
      // 'close' handler below.
      ws.on('error', (err: Error) => {
        console.error('Client socket error:', err);
      });

      // Unsubscribe from the subject when the client disconnects
      ws.on('close', () => {
        console.log('Client disconnected');
        subscription.unsubscribe();
      });
    });
  }
  /**
   * Opens a WebSocket client connection to the consumer at
   * `ws://localhost:8081` and forwards every traffic-status figure it reports
   * to `consumerTrafficStatus`.
   *
   * Incoming frames are expected to be JSON-encoded numbers. Frames that are
   * not valid JSON, or that decode to something other than a number, are
   * logged and ignored rather than being allowed to throw inside the event
   * handler.
   *
   * When the connection closes, a new attempt is scheduled after one second by
   * calling this function again.
   */
  function connectWebSocket(): void {
    const ws = new WebSocket('ws://localhost:8081');

    // Listen for the WebSocket connection to open
    ws.on('open', () => {
      console.log('Connecting to Consumer WebSocket server');
      // Send a message to the server
      ws.send('Hello, consumer server!');
    });

    // Listen for messages from the server
    ws.on('message', (message: unknown) => {
      let status: unknown;
      try {
        // String(...) is the same coercion JSON.parse would apply implicitly.
        status = JSON.parse(String(message));
      } catch (err) {
        console.error('Ignoring malformed traffic status from consumer:', err);
        return;
      }
      if (typeof status !== 'number') {
        console.error('Ignoring non-numeric traffic status from consumer:', status);
        return;
      }
      consumerTrafficStatus.next(status);
    });

    // Listen for WebSocket errors
    ws.on('error', (element) => {
      console.error('WebSocket error:', element);
    });

    // Listen for the WebSocket connection to close
    ws.on('close', () => {
      console.log('Disconnected from WebSocket server');
      setTimeout(connectWebSocket, 1000); // Attempt to reconnect after 1 second
    });
  }
  // Bring up the local server for clients, then dial out to the consumer
  // whose traffic reports drive the buffering decision.
  createWebsocketServer();
  connectWebSocket();