publisher.ts 2.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. import { BehaviorSubject, Subject, buffer, bufferWhen, elementAt, filter, interval } from 'rxjs';
  2. import { DataPrepService } from '../services/dataprep.service';
  3. import { StorageLocation } from '../types/interface';
  4. import { Server } from 'socket.io'
  5. import { io } from 'socket.io-client';
  6. let msgData = new DataPrepService()
  7. /* ---------------------- COMPLEX OPERATION ------------------------------ */
  8. let msgPayload: Subject<any> = new Subject();
  9. let consumerTrafficStatus: Subject<any> = new Subject()
  10. let bufferTrigger: BehaviorSubject<boolean> = new BehaviorSubject(true)
  11. let mongoStorage: StorageLocation = {
  12. type: `MongoDB`,
  13. url: `mongodb://192.168.100.59:27017/default`
  14. }
  15. msgData.loadObsData(mongoStorage, msgPayload)
  16. consumerTrafficStatus.subscribe((element) => {
  17. if (element >= 2) {
  18. let warning: boolean = false
  19. bufferTrigger.next(warning)
  20. console.log(`Heap Load Exceeded on client side. Buffering.....`)
  21. } else {
  22. let warning: boolean = true
  23. bufferTrigger.next(warning)
  24. // console.log(`Releasing the buffer`)
  25. }
  26. })
  27. // Create a WebSocket server
  28. function createWebsocketServer() {
  29. const io = new Server({})
  30. io.on('connection', (socket) => {
  31. console.log(`Consumer Connected`)
  32. // Subscribe to the subject when a client connects
  33. const subscription = msgPayload.pipe(
  34. buffer(bufferTrigger.pipe(filter(Boolean)))
  35. ).subscribe((element) => {
  36. console.log(`Emitting ${element.length} messages`)
  37. // let messageString = JSON.stringify(element)
  38. socket.emit(`payload`, element);
  39. });
  40. //Listen for messages from consumer/client
  41. socket.on(`message`, (message) => {
  42. console.log(`Received message from client: ${message}`)
  43. // Send a message back to the client
  44. socket.send(`${message} received!`);
  45. })
  46. // Listen for the socket to be closed
  47. socket.on('disconnect', () => {
  48. console.log('Client disconnected');
  49. subscription.unsubscribe();
  50. consumerTrafficStatus.next(false)
  51. });
  52. })
  53. io.listen(8080)
  54. }
  55. // Create a new WebSocket client
  56. function connectWebSocket() {
  57. const socket = io('http://localhost:8081');
  58. socket.on('connect', () => {
  59. console.log('Connected to publisher');
  60. });
  61. socket.on('trafficControl', (report) => {
  62. consumerTrafficStatus.next(report);
  63. });
  64. socket.on('disconnect', () => {
  65. console.log('Disconnected from server');
  66. // Attempt to reconnect every 3 seconds
  67. setTimeout(() => {
  68. console.log('Attempting to reconnect...');
  69. socket.connect();
  70. }, 3000);
  71. });
  72. }
  73. createWebsocketServer()
  74. connectWebSocket();