publisher.ts 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. import { BehaviorSubject, Observable, Subject, buffer, bufferWhen, elementAt, filter, interval, map } from 'rxjs';
  2. import { DataPrepService } from '../services/dataprep.service';
  3. import { StorageLocation } from '../types/interface';
  4. import { Server } from 'socket.io'
  5. import { io } from 'socket.io-client';
  6. import { UtilityService } from '../services/utility.service';
  7. let msgData = new DataPrepService()
  8. let util = new UtilityService()
  9. util.checkMaxHeap()
  10. /* ---------------------- COMPLEX OPERATION ------------------------------ */
  11. let msgPayload: Subject<any> = new Subject();
  12. let consumerTrafficStatus: Subject<any> = new Subject()
  13. let inputliveSubject = new Subject();
  14. let OutputliveSubject = new Subject();
  15. let mongoStorage: StorageLocation = {
  16. type: `MongoDB`,
  17. url: `mongodb://192.168.100.59:27017/default`
  18. }
  19. msgData.loadObsData(mongoStorage, msgPayload)
  20. // Create a WebSocket server
  21. function createWebsocketServer() {
  22. const io = new Server({})
  23. io.on('connection', (socket) => {
  24. console.log(`Connected to Clients/Consumers`)
  25. // let notifier = consumerTrafficStatus.pipe(filter(value => !value.pause))
  26. // Subscribe to the subject when a client connects
  27. const subscription = msgPayload.pipe(
  28. backlogBuffer(msgPayload, consumerTrafficStatus)
  29. ).subscribe((element) => {
  30. if (element.length >= 25000) {
  31. const chunkSize = 1024; // Specify the desired chunk size in bytes
  32. const totalChunks = Math.ceil(element.length / chunkSize);
  33. Array.from({ length: totalChunks }, (_, i) => {
  34. const start = i * chunkSize;
  35. const end = start + chunkSize;
  36. const chunk = element.slice(start, end);
  37. console.log(`Emitting ${element.length} messages`)
  38. socket.emit('payload', chunk);
  39. });
  40. socket.emit(`end`)
  41. } else {
  42. // console.log(`Emitting ${element.length} messages`)
  43. socket.emit(`payload`, element);
  44. }
  45. });
  46. // Listen for the socket to be closed
  47. socket.on('disconnect', () => {
  48. console.log('Client/Consumer disconnected');
  49. subscription.unsubscribe();
  50. });
  51. })
  52. io.listen(8080)
  53. }
  54. // Create a new WebSocket client
  55. function connectWebSocket() {
  56. const socket = io('http://localhost:8081');
  57. socket.on('connect', () => {
  58. console.log(`Connected to Consumer'Server.`);
  59. });
  60. // Subsribe to trafficControl from consumer's side
  61. socket.on('trafficControl', (report: any) => {
  62. console.log(report)
  63. consumerTrafficStatus.next(report);
  64. });
  65. socket.on('disconnect', () => {
  66. console.log(`Disconnected from Consumer'Server`);
  67. // Attempt to reconnect every 3 seconds
  68. setTimeout(() => {
  69. console.log('Attempting to reconnect...');
  70. socket.connect();
  71. }, 3000);
  72. });
  73. }
  74. function backlogBuffer(msgPayload: Subject<any>, notifier: Observable<any>) {
  75. // Pulse by each message to tell output to keep releasing the buffer unless requested by client/consumer not to do so.
  76. msgPayload.subscribe(inputliveSubject)
  77. // Notfier act as a input from client/consumer's side to listen to request for buffering or releasing for the buffer
  78. notifier.subscribe(inputliveSubject)
  79. let toBuffer = false // true or false
  80. inputliveSubject.subscribe((element: any) => {
  81. if ('buffer' in element && element.buffer == true) {
  82. // Start the buffer
  83. toBuffer = element.buffer
  84. console.log(`Buffering....`)
  85. }
  86. if (element && toBuffer == false) {
  87. // Continue to release the buffer
  88. OutputliveSubject.next(element)
  89. }
  90. })
  91. return buffer(OutputliveSubject);
  92. }
  93. createWebsocketServer()
  94. connectWebSocket();