publisher.ts 2.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. import { BehaviorSubject, Subject, buffer, bufferWhen, elementAt, filter, interval, map } from 'rxjs';
  2. import { DataPrepService } from '../services/dataprep.service';
  3. import { StorageLocation } from '../types/interface';
  4. import { Server } from 'socket.io'
  5. import { io } from 'socket.io-client';
  6. import { UtilityService } from '../services/utility.service';
  7. let msgData = new DataPrepService()
  8. let util = new UtilityService()
  9. util.checkMaxHeap()
  10. /* ---------------------- COMPLEX OPERATION ------------------------------ */
  11. let msgPayload: Subject<any> = new Subject();
  12. let consumerTrafficStatus: Subject<any> = new Subject()
  13. let mongoStorage: StorageLocation = {
  14. type: `MongoDB`,
  15. url: `mongodb://192.168.100.59:27017/default`
  16. }
  17. msgData.loadObsData(mongoStorage, msgPayload)
  18. // Create a WebSocket server
  19. function createWebsocketServer() {
  20. const io = new Server({})
  21. io.on('connection', (socket) => {
  22. console.log(`Connected to Clients/Consumers`)
  23. let notifier = consumerTrafficStatus.pipe(filter(value => !value.pause))
  24. // Subscribe to the subject when a client connects
  25. const subscription = msgPayload.pipe(
  26. buffer(notifier)
  27. ).subscribe((element) => {
  28. console.log(`Emitting ${element.length} messages`)
  29. socket.emit(`payload`, element);
  30. });
  31. // Listen for the socket to be closed
  32. socket.on('disconnect', () => {
  33. console.log('Client/Consumer disconnected');
  34. subscription.unsubscribe();
  35. });
  36. })
  37. io.listen(8080)
  38. }
  39. // Create a new WebSocket client
  40. function connectWebSocket() {
  41. const socket = io('http://localhost:8081');
  42. socket.on('connect', () => {
  43. console.log(`Connected to Consumer'Server.`);
  44. });
  45. socket.on('trafficControl', (report: any) => {
  46. console.log(report)
  47. consumerTrafficStatus.next(report);
  48. });
  49. socket.on('disconnect', () => {
  50. console.log(`Disconnected from Consumer'Server`);
  51. // Attempt to reconnect every 3 seconds
  52. setTimeout(() => {
  53. console.log('Attempting to reconnect...');
  54. socket.connect();
  55. }, 3000);
  56. });
  57. }
  58. createWebsocketServer()
  59. connectWebSocket();