publisher.ts 2.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. import { BehaviorSubject, Subject, buffer, bufferWhen, elementAt, filter, interval } from 'rxjs';
  2. import { DataPrepService } from '../services/dataprep.service';
  3. import { StorageLocation } from '../types/interface';
  4. import { Server } from 'socket.io'
  5. import { io } from 'socket.io-client';
  6. import { UtilityService } from '../services/utility.service';
  // Service instances used by the rest of this module. They are never
  // reassigned, so they are declared `const`.
  const msgData = new DataPrepService()
  const util = new UtilityService()
  // Called once at module load, before any stream is wired up.
  util.checkMaxHeap()
  /* ---------------------- COMPLEX OPERATION ------------------------------ */
  // Outbound message stream. It is handed to `loadObsData` below, which
  // presumably pushes stored records into it — confirm in DataPrepService.
  // The element type stays `any` because the payload shape is defined outside
  // this file.
  const msgPayload: Subject<any> = new Subject<any>()
  // Numeric load reports about the consumer; every producer visible in this
  // file pushes a number.
  const consumerTrafficStatus: Subject<number> = new Subject<number>()
  // Flush gate for the per-client buffers: `true` releases buffered messages,
  // `false` keeps them buffered. Starts open.
  const bufferTrigger: BehaviorSubject<boolean> = new BehaviorSubject<boolean>(true)
  // Storage descriptor passed to the data-prep service.
  const mongoStorage: StorageLocation = {
    type: `MongoDB`,
    url: `mongodb://192.168.100.59:27017/default`
  }
  msgData.loadObsData(mongoStorage, msgPayload)
  // Turn each consumer load report into a flush-gate state: a report of 2 or
  // more closes the gate (`false`), anything else opens it (`true`).
  consumerTrafficStatus.subscribe((report) => {
    const overloaded = report >= 2
    bufferTrigger.next(!overloaded)
    if (overloaded) {
      console.log(`Heap Load Exceeded on client side. Buffering.....`)
    }
  })
  /**
   * Starts a Socket.IO server on port 8080 and streams `msgPayload` to every
   * client that connects.
   *
   * Each connection gets its own subscription: messages are collected by
   * `buffer` and released as one array whenever `bufferTrigger` emits `true`.
   * Empty batches are dropped, so clients no longer receive a `payload` event
   * carrying `[]` (previously sent right after connecting while the gate was
   * open, because `bufferTrigger` replays its current value on subscribe).
   */
  function createWebsocketServer(): void {
    // Named `server` so it does not shadow the socket.io-client `io` import.
    const server = new Server({})
    server.on('connection', (socket) => {
      console.log(`Connected to Clients/Consumers`)
      // Subscribe to the subject when a client connects
      const subscription = msgPayload.pipe(
        // Only `true` values of the gate flush the buffer; `false` is ignored.
        buffer(bufferTrigger.pipe(filter(Boolean))),
        // Skip flushes that happened while nothing was queued.
        filter((batch) => batch.length > 0)
      ).subscribe((batch) => {
        console.log(`Emitting ${batch.length} messages`)
        socket.emit(`payload`, batch)
      })
      // Listen for messages from consumer/client
      socket.on(`message`, (message) => {
        console.log(`Received message from client: ${message}`)
        util.checkHeapSize()
        // Send a message back to the client
        socket.send(`${message} received!`)
      })
      // Listen for the socket to be closed
      socket.on('disconnect', () => {
        console.log('Client/Consumer disconnected')
        // Report a load of 2 so the traffic-status subscription closes the
        // flush gate; calling bufferTrigger.next(false) directly would have
        // the same effect. The gate is module-level, so this pauses flushing
        // for every connection until a report below 2 arrives.
        consumerTrafficStatus.next(2)
        subscription.unsubscribe()
      })
    })
    server.listen(8080)
  }
  /**
   * Opens a Socket.IO client connection to http://localhost:8081 and forwards
   * every `trafficControl` report into `consumerTrafficStatus`.
   *
   * After a disconnect it waits 3 seconds and calls `connect()` again.
   * NOTE(review): socket.io-client normally reconnects on its own unless the
   * server closed the connection — confirm whether this manual retry is still
   * needed.
   */
  function connectWebSocket() {
    const retryDelayMs = 3000
    const client = io('http://localhost:8081')

    // Schedules a single delayed reconnection attempt.
    const scheduleReconnect = () => {
      setTimeout(() => {
        console.log('Attempting to reconnect...')
        client.connect()
      }, retryDelayMs)
    }

    client.on('connect', () => {
      console.log(`Connected to Consumer'Server.`)
    })
    client.on('trafficControl', (report: number) => {
      consumerTrafficStatus.next(report)
    })
    client.on('disconnect', () => {
      console.log(`Disconnected from Consumer'Server`)
      scheduleReconnect()
    })
  }
  // Entry point: start the outbound server first, then connect to the
  // consumer's traffic-control endpoint.
  createWebsocketServer();
  connectWebSocket();