publisher.ts 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. import { BehaviorSubject, Subject, buffer, bufferWhen, elementAt, filter } from 'rxjs';
  2. import * as WebSocket from 'ws';
  3. import { DataPrepService } from '../services/dataprep.service';
  4. import { StorageLocation } from '../types/interface';
  5. let msgData = new DataPrepService()
  6. /* ---------------------- COMPLEX OPERATION ------------------------------ */
  7. let msgPayload: Subject<any> = new Subject();
  8. let consumerTrafficStatus: Subject<any> = new Subject()
  9. let bufferTrigger: BehaviorSubject<boolean> = new BehaviorSubject(true)
  10. // let mongoStorage: StorageLocation = {
  11. // type: `MongoDB`,
  12. // url: `mongodb://192.168.100.59:27017/default`
  13. // }
  14. // msgData.loadObsData(mongoStorage, msgPayload)
  15. let storageAddress: StorageLocation = {
  16. type: "File",
  17. url: "payload.json"
  18. }
  19. msgData.loadObsData(storageAddress, msgPayload)
  20. consumerTrafficStatus.subscribe((element) => {
  21. if (element >= 1.5) {
  22. let warning: boolean = false
  23. bufferTrigger.next(warning)
  24. console.log(`Buffering.....`)
  25. } else {
  26. let warning: boolean = true
  27. bufferTrigger.next(warning)
  28. // console.log(`Releasing the buffer`)
  29. }
  30. })
  31. // Create a WebSocket server
  32. function createWebsocketServer() {
  33. const wss = new WebSocket.Server({ port: 8080 });
  34. // Listen for connections to the WebSocket server
  35. wss.on('connection', (ws: WebSocket) => {
  36. console.log('Client connected');
  37. // Subscribe to the subject when a client connects
  38. const subscription = msgPayload.pipe(
  39. buffer(bufferTrigger.pipe(filter(Boolean)))
  40. ).subscribe((element) => {
  41. console.log(`Emitting ${element.length} messages`)
  42. let messageString = JSON.stringify(element)
  43. ws.send(messageString);
  44. });
  45. // Listen for messages from the client
  46. ws.on('message', (message: any) => {
  47. console.log(`Received message from client: ${message}`);
  48. });
  49. // Unsubscribe from the subject when the client disconnects
  50. ws.on('close', () => {
  51. console.log('Client disconnected');
  52. subscription.unsubscribe();
  53. });
  54. });
  55. }
  56. // Create a new WebSocket client
  57. function connectWebSocket() {
  58. // Create a new WebSocket client
  59. const ws = new WebSocket('ws://localhost:8081');
  60. // Listen for the WebSocket connection to open
  61. ws.on('open', () => {
  62. console.log('Connecting to Consumer WebSocket server');
  63. // Send a message to the server
  64. ws.send('Hello, consumer server!');
  65. })
  66. // Listen for messages from the server
  67. ws.on('message', (message: string) => {
  68. let msgObj: number = JSON.parse(message)
  69. consumerTrafficStatus.next(msgObj)
  70. });
  71. // Listen for WebSocket errors
  72. ws.on('error', (element) => {
  73. console.error('WebSocket error:', element);
  74. })
  75. // Listen for the WebSocket connection to close
  76. ws.on('close', () => {
  77. console.log('Disconnected from WebSocket server');
  78. setTimeout(connectWebSocket, 1000); // Attempt to reconnect after 1 second
  79. })
  80. }
  81. createWebsocketServer()
  82. connectWebSocket();