publisher.ts 3.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. import { BehaviorSubject, Subject, buffer, bufferWhen, elementAt, filter, interval } from 'rxjs';
  2. import { DataPrepService } from '../services/dataprep.service';
  3. import { StorageLocation } from '../types/interface';
  4. import { Server } from 'socket.io'
  5. import { io } from 'socket.io-client';
  6. import { UtilityService } from '../services/utility.service';
  7. let msgData = new DataPrepService()
  8. let util = new UtilityService()
  9. util.checkMaxHeap()
  10. /* ---------------------- COMPLEX OPERATION ------------------------------ */
  11. let msgPayload: Subject<any> = new Subject();
  12. let consumerTrafficStatus: Subject<any> = new Subject()
  13. let bufferTrigger: BehaviorSubject<boolean> = new BehaviorSubject(false)
  14. let mongoStorage: StorageLocation = {
  15. type: `MongoDB`,
  16. url: `mongodb://192.168.100.59:27017/default`
  17. }
  18. msgData.loadObsData(mongoStorage, msgPayload)
  19. consumerTrafficStatus.subscribe((consumerHeapUsage) => {
  20. if (consumerHeapUsage >= 2) {
  21. // If consumerHeapUsage is over 2 %, it will trigger buffer
  22. bufferTrigger.next(true)
  23. } else {
  24. bufferTrigger.next(false)
  25. }
  26. })
  27. bufferTrigger.subscribe((element) => {
  28. if(element){
  29. console.log(`Heap Load Exceeded on client side. Buffering.....`)
  30. }
  31. })
  32. // Create a WebSocket server
  33. function createWebsocketServer() {
  34. const io = new Server({})
  35. io.on('connection', (socket) => {
  36. console.log(`Connected to Clients/Consumers`)
  37. // Subscribe to the subject when a client connects
  38. const subscription = msgPayload.pipe(
  39. buffer(bufferTrigger.pipe(filter(value => !value)))
  40. ).subscribe((element) => {
  41. console.log(`Emitting ${element.length} messages`)
  42. socket.emit(`payload`, element);
  43. });
  44. //Listen for messages from consumer/client
  45. socket.on(`message`, (message) => {
  46. console.log(`Received message from client: ${message}`)
  47. util.checkHeapSize()
  48. // Send a message back to the client
  49. socket.send(`${message} received!`);
  50. })
  51. // Listen for the socket to be closed
  52. socket.on('disconnect', () => {
  53. console.log('Client/Consumer disconnected');
  54. // Need to put next(true) trigger the buffer due to disconnection from the other side.
  55. bufferTrigger.next(true)
  56. subscription.unsubscribe();
  57. });
  58. })
  59. io.listen(8080)
  60. }
  61. // Create a new WebSocket client
  62. function connectWebSocket() {
  63. const socket = io('http://localhost:8081');
  64. socket.on('connect', () => {
  65. console.log(`Connected to Consumer'Server.`);
  66. });
  67. socket.on('trafficControl', (report: number) => {
  68. consumerTrafficStatus.next(report);
  69. });
  70. socket.on('disconnect', () => {
  71. console.log(`Disconnected from Consumer'Server`);
  72. // Attempt to reconnect every 3 seconds
  73. setTimeout(() => {
  74. console.log('Attempting to reconnect...');
  75. socket.connect();
  76. }, 3000);
  77. });
  78. }
  79. createWebsocketServer()
  80. connectWebSocket();