// consumer_1.ts (3.5 KB)
  1. import { SearchService } from "../services/query.service"
  2. import _ = require("lodash")
  3. import { Subject, interval } from "rxjs";
  4. import * as WebSocket from 'ws';
  5. /* ---------------------- COMPLEX OPERATION ------------------------------ */
  6. // Check Heap size
  7. // const v8 = require('v8');
  8. // const heapStats = v8.getHeapStatistics();
  9. // const maxHeapSize = heapStats.heap_size_limit / (1024 * 1024); // Convert to MB
  10. // console.log(`Current maximum heap size: ${maxHeapSize.toFixed(2)} MB`);
  11. // const used = process.memoryUsage().heapUsed / 1024 / 1024;
  12. // const total = process.memoryUsage().heapTotal / 1024 / 1024;
  13. // console.log(`Heap memory usage: ${used.toFixed(2)} MB`);
  14. // console.log(`Total heap size: ${total.toFixed(2)} MB`);
  /**
   * Reports how much of an assumed heap budget is currently in use.
   *
   * Reads `process.memoryUsage().heapUsed`, converts it from bytes to MB,
   * logs the reading, and returns it as a percentage of `allocatedHeapSize`.
   * The budget is a plain number supplied by the caller; it is not read
   * from V8, so the result can exceed 100.
   *
   * @param allocatedHeapSize Heap budget in MB used as the denominator.
   *   Defaults to 512, the value that was previously hard-coded here.
   * @returns Used heap as a percentage of `allocatedHeapSize`.
   */
  function checkHeapSize(allocatedHeapSize: number = 512): number {
    // heapUsed is reported in bytes; two divisions by 1024 yield MB.
    const currentHeapSize = process.memoryUsage().heapUsed / 1024 / 1024;
    const percentage = (currentHeapSize / allocatedHeapSize) * 100;
    console.log(`Consumer_! Heap currentHeapSize: ${currentHeapSize} MB. Percentage: ${percentage}`);
    return percentage;
  }
  // Stream of individual elements received from the remote publisher; it is
  // fed by the 'message' handler of the WebSocket client in this file.
  const payload: Subject<any> = new Subject();
  payload.subscribe((element) => {
    // Elements are parsed off the wire, so `appData` may be missing. Read
    // msgId defensively rather than throwing inside the subscriber.
    const msgId = element && element.appData ? element.appData.msgId : undefined;
    console.log(`Received message from server: ${msgId}`);
    // tell traffic control to check heap size
    // trafficControl.next(checkHeapSize())
  });

  // Stream of heap-usage readings; the WebSocket server in this file forwards
  // each emission to its connected clients.
  const trafficControl: Subject<any> = new Subject();

  // Sample the heap once per second and publish the reading.
  const intervalChecking = interval(1000);
  intervalChecking.subscribe(() => {
    trafficControl.next(checkHeapSize());
  });
  /**
   * Starts a WebSocket server that forwards every `trafficControl` emission
   * to each connected client as a JSON string.
   *
   * Each connection gets its own subscription to `trafficControl`, released
   * when that socket closes or errors.
   *
   * @param port TCP port to listen on. Defaults to 8081, the value that was
   *   previously hard-coded here.
   */
  function createWebsocketServer(port: number = 8081): void {
    const wss = new WebSocket.Server({ port });

    // An 'error' event with no listener is thrown by the emitter (for example
    // when the port is already in use), so log it instead.
    wss.on('error', (err: Error) => {
      console.error('WebSocket server error:', err);
    });

    // Listen for connections to the WebSocket server
    wss.on('connection', (ws: WebSocket) => {
      console.log('Client connected');

      // Subscribe to the subject when a client connects
      const subscription = trafficControl.subscribe((element) => {
        // Only write to sockets that are still open; a reading can arrive
        // between the socket starting to close and 'close' firing.
        if (ws.readyState !== WebSocket.OPEN) {
          return;
        }
        // Stringify heap status and send data over to connected client
        ws.send(JSON.stringify(element));
      });

      // Listen for messages from the client
      ws.on('message', (message: any) => {
        console.log(`Received message from client: ${message}`);
      });

      // Log per-socket errors and stop forwarding readings to this client.
      ws.on('error', (err: Error) => {
        console.error('WebSocket client error:', err);
        subscription.unsubscribe();
      });

      // Unsubscribe from the subject when the client disconnects
      ws.on('close', () => {
        console.log('Client disconnected');
        subscription.unsubscribe();
      });
    });
  }
  /**
   * Opens a WebSocket client connection to the publisher and pushes every
   * element of each received JSON array into the `payload` subject.
   *
   * When the socket closes, a new connection attempt is scheduled after
   * `reconnectDelayMs`, using the same arguments.
   *
   * @param url Publisher endpoint. Defaults to 'ws://localhost:8080', the
   *   value that was previously hard-coded here.
   * @param reconnectDelayMs Delay before reconnecting after a close, in
   *   milliseconds. Defaults to 1000, the previously hard-coded value.
   */
  function connectWebSocket(url: string = 'ws://localhost:8080', reconnectDelayMs: number = 1000): void {
    const ws = new WebSocket(url);

    // Listen for the WebSocket connection to open
    ws.on('open', () => {
      console.log('Connecting to Publisher WebSocket server');
      // Send a message to the server
      ws.send('Hello, publisher server!');
    });

    // Listen for messages from the server
    ws.on('message', (message: string) => {
      // The frame is external input: a parse failure or a non-array payload
      // is logged and discarded rather than thrown from the event handler.
      let parsed: unknown;
      try {
        parsed = JSON.parse(message.toString());
      } catch (err) {
        console.error('Discarding non-JSON message from publisher:', err);
        return;
      }
      if (!Array.isArray(parsed)) {
        console.error('Discarding message from publisher: expected a JSON array');
        return;
      }
      parsed.forEach((element) => {
        payload.next(element);
      });
    });

    // Listen for WebSocket errors
    ws.on('error', (element) => {
      console.error('WebSocket error:', element);
    });

    // Listen for the WebSocket connection to close
    ws.on('close', () => {
      console.log('Disconnected from WebSocket server');
      // Reconnect with the same endpoint and delay this call was given.
      setTimeout(() => connectWebSocket(url, reconnectDelayMs), reconnectDelayMs);
    });
  }
  // Entry point: bring up the local heap-status server, then connect to the
  // publisher as a client.
  createWebsocketServer();
  connectWebSocket();