socket.ts 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. import { Observable, Subject, takeWhile } from "rxjs";
  2. import { prepareResponseMessages } from "../services/utility/prepareFISmessage";
  3. import { BaseMessage } from "../dependencies/logging/interface/export";
  4. import { WrappedMessage } from "../services/retransmission.service";
  5. import { io, Socket } from "socket.io-client";
  6. let onHoldMessagesSubject: Subject<WrappedMessage> = new Subject()
  7. let toBePassedOverToApp: Subject<BaseMessage> = new Subject()
  8. // Serve static files (optional)
  9. let sender: Subject<BaseMessage> = prepareResponseMessages(1, 2000)
  10. let serverSocketUrl: string = 'http://localhost:3000'
  11. let socket: Socket
  12. establishSocketConnection(serverSocketUrl)
  13. // interval(1000).subscribe(value => { // just to test if the emission is in sequence after reconnection
  14. // console.log(value)
  15. // socket.emit('interval', value)
  16. // })
  17. sender.subscribe({
  18. next: message => {
  19. makeRequest(message).subscribe({
  20. complete: () => console.log(`Request ${message.header.messageID} has acquired all responses.`)
  21. })
  22. }
  23. })
  24. // the interface the client Program will make without having to decide transport protocol
  25. function makeRequest(request: BaseMessage): Observable<any> {
  26. return new Observable((response) => {
  27. sendMessage(request)
  28. toBePassedOverToApp.subscribe({
  29. next: (message: BaseMessage) => {
  30. // The identification of responses mapping to the request be adjusted accordingly
  31. // For now it's a simple demulti-plexing
  32. if (message.header.messageID == request.header.messageID && message.header.messageName == 'ResponseData') {
  33. response.next(message)
  34. }
  35. if (message.header.messageID == request.header.messageID && message.header.messageName == 'Complete') {
  36. response.complete()
  37. }
  38. },
  39. error: err => console.error(err),
  40. complete: () => { }
  41. })
  42. })
  43. }
  44. // socket util: Assuming that the client program would already have something like this in place
  45. function establishSocketConnection(serverUrl: string) {
  46. socket = io(serverUrl, {
  47. reconnection: true, // Enable automatic reconnections
  48. reconnectionAttempts: 100, // Retry up to 10 times
  49. reconnectionDelay: 500, // Start with a 500ms delay
  50. reconnectionDelayMax: 10000, // Delay can grow to a max of 10 seconds
  51. randomizationFactor: 0.3,
  52. })
  53. // Listen for a connection event
  54. socket.on('connect', () => {
  55. // socket.emit('Hello from the client!')
  56. console.log('Connected to the server:', socket.id)
  57. // receiverConnectionState.next('ONLINE')
  58. });
  59. // Listen for messages from the server
  60. socket.on('message', (msg: WrappedMessage) => {
  61. console.log('Message from server:', msg.payload.header.messageID);
  62. // Check the sequence by ensuring the message value before the current message exists, then pass them over to "App"
  63. onHoldMessagesSubject.next(msg)
  64. checkMessage(msg).then(() => [
  65. toBePassedOverToApp.next(msg.payload as BaseMessage)
  66. ]).catch((err) => console.error(err))
  67. })
  68. socket.on('notification', (msg: string) => {
  69. console.log(msg)
  70. })
  71. // Handle disconnection
  72. socket.on('disconnect', () => {
  73. console.log('Disconnected from the server');
  74. // receiverConnectionState.next('OFFLINE')
  75. });
  76. }
  77. async function sendMessage(message: BaseMessage): Promise<any> {
  78. return new Promise((resolve, reject) => {
  79. try {
  80. // extra precaution: According to chatgpt, if disconnected, then the payload will be loaded back in event queue whilst the socket will try to reestablish connection
  81. // https://socket.io/docs/v4/client-offline-behavior/
  82. socket.emit('message', message); // inherently an aysnc
  83. console.log(`SocketEmit() for message to event queue ${message.header.messageID}`)
  84. resolve('')
  85. } catch (error) {
  86. console.error('Error emitting message:', error);
  87. this.wrappedMessage.next(message)
  88. reject(error)
  89. } ``
  90. })
  91. }
  92. // SO concept will be that if the message behind it is received, then
  93. async function checkMessage(message: WrappedMessage): Promise<any> {
  94. return new Promise((resolve, reject) => {
  95. if (message.previousMessageID) {
  96. onHoldMessagesSubject.pipe(
  97. takeWhile(item => item.payload.header.messageID == message.previousMessageID)
  98. ).subscribe({
  99. complete: () => {
  100. resolve('previousMessageID matched')
  101. // console.log(`${message.payload.header.messageID} : Previous messageID(${message.previousMessageID}) matched`)
  102. console.log(`matched`)
  103. }
  104. })
  105. } else {
  106. console.log('No previous messageID. This should be the first message')
  107. resolve('No previous message ID. Please Proceed.')
  108. }
  109. })
  110. }