consumer_1.ts 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. import { SearchService } from "../services/query.service"
  2. import _ = require("lodash")
  3. import { Subject, interval } from "rxjs";
  4. import { io } from "socket.io-client";
  5. import { Server } from "socket.io"
  6. import { UtilityService } from "../services/utility.service";
  7. const util = new UtilityService()
  8. util.checkMaxHeap()
  9. /* ---------------------- COMPLEX OPERATION ------------------------------ */
  10. // Create new Subject to handle incoming data from remote subscription
  11. let payload: Subject<any> = new Subject()
  12. payload.subscribe((element) => {
  13. // console.log(`Received message from server: ${element.appData.msgId}`);
  14. })
  15. // Create new Subject to monitor and broadcast heap size
  16. let trafficControl: Subject<any> = new Subject()
  17. let intervalChecking = interval(1000)
  18. intervalChecking.subscribe(() => {
  19. trafficControl.next(util.checkHeapSize())
  20. })
  21. // Create a WebSocket server
  22. function createWebsocketServer() {
  23. const io = new Server({})
  24. // Listen for connections to the WebSocket server
  25. io.on(`connection`, (socket) => {
  26. console.log(`Connected to clients`);
  27. // Subscribe to the subject when a client connects
  28. const subscription = trafficControl.subscribe((heapUsagePercentage: number) => {
  29. // Boolean to be emitted to server to tell them to buffer the incoming
  30. let toBuffer = false;
  31. // Set to 3% at the moment
  32. if (heapUsagePercentage >= 3) {
  33. toBuffer = true
  34. } else {
  35. toBuffer = false
  36. }
  37. // Stream traffic control obs data over to designated client/consumer.
  38. socket.emit('trafficControl', {
  39. "consumerHeapUsage": heapUsagePercentage.toFixed(2),
  40. "buffer": toBuffer
  41. });
  42. });
  43. socket.on(`disconnect`, () => {
  44. console.log(`Clients Disconnected`)
  45. subscription.unsubscribe()
  46. })
  47. })
  48. // create localhost server port 8080
  49. io.listen(8081);
  50. }
  51. // Create a new WebSocket client
  52. function connectWebSocket() {
  53. const socket = io('http://localhost:8080')
  54. // Listen for the WebSocket connection to open
  55. socket.on(`connect`, () => {
  56. console.log(`Connected to publisher'server`)
  57. // Listen for messages from the server
  58. socket.on('message', (message) => {
  59. trafficControl.next(message);
  60. });
  61. })
  62. // Receive payload from publisher and push them into local Subject
  63. socket.on('payload', (data: any[]) => {
  64. if(data.length > 0){
  65. // just to check if there's any data
  66. console.log(`Message received from publisher: ${data.length}`)
  67. } else {
  68. console.log(`Publisher is buffering. Data received = ${data.length}`)
  69. }
  70. data.forEach(element => {
  71. payload.next(element)
  72. // console.log(element.appData.msgId)
  73. // trafficControl.next(util.checkHeapSize())
  74. });
  75. })
  76. // Listen for the disconnect event
  77. socket.on('disconnect', () => {
  78. console.log(`Disconnected from publisher'server`);
  79. // Attempt to reconnect every 1 second
  80. setTimeout(() => {
  81. console.log('Attempting to reconnect...');
  82. socket.connect();
  83. }, 1000);
  84. });
  85. // Listen for errors
  86. socket.on('error', (error) => {
  87. console.error('Socket error:', error);
  88. });
  89. }
  90. createWebsocketServer()
  91. connectWebSocket();