socket-client.ts 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191
  1. import { Observable, Subject, takeWhile } from "rxjs";
  2. import { prepareResponseMessages } from "../../services/utility/prepareFISmessage";
  3. import { BaseMessage } from "../../dependencies/logging/interface/export";
  4. import { io, Socket } from "socket.io-client";
  5. import { WrappedMessage } from "../../interfaces/general.interface";
  6. import * as fs from 'fs'
  7. import { ClientInfo } from "./socket.service";
  // Stream of wrapped server messages that checkMessage() listens to when verifying ordering.
  const onHoldMessagesSubject = new Subject<WrappedMessage>()
  // Every response payload received from the server is pushed here; makeRequest() filters it per request.
  const toBePassedOverToApp = new Subject<BaseMessage>()
  // Source of outgoing requests. NOTE(review): the meaning of the (5, 2000) arguments is defined by
  // prepareResponseMessages and is not visible from this file — confirm before changing them.
  const sender: Subject<BaseMessage> = prepareResponseMessages(5, 2000)
  // Address of the Socket.IO server this client connects to.
  const serverSocketUrl = 'http://192.168.100.96:3000'
  // Shared connection handle; assigned inside establishSocketConnection().
  let socket: Socket
  // Entry point: set up the socket, then turn every message produced by `sender` into a request.
  // The individual responses are not consumed here; only completion and failure are logged.
  establishSocketConnection(serverSocketUrl)
    .then(() => {
      sender.subscribe({
        next: message => {
          makeRequest(message).subscribe({
            // Without an error handler RxJS rethrows the error asynchronously as an uncaught exception.
            error: err => console.error(`Request ${message.header.messageID} failed:`, err),
            complete: () => console.log(`Request ${message.header.messageID} has acquired all responses.`)
          })
        },
        error: err => console.error('Request source failed:', err)
      })
    })
    // Surface setup failures instead of leaving an unhandled promise rejection.
    .catch(err => console.error('Failed to establish socket connection:', err))
  /**
   * The interface the client program calls to make a request without having to
   * choose a transport protocol.
   *
   * Sends `request` through sendMessage() and returns a cold Observable that
   * emits every message from `toBePassedOverToApp` whose `header.messageID`
   * equals the request's, until a message named 'Complete' with that ID arrives,
   * at which point the Observable completes.
   *
   * The listener on `toBePassedOverToApp` is released when the Observable
   * completes, errors, or is unsubscribed, so finished requests do not leave
   * a subscriber behind.
   *
   * @param request The message to send; its `header.messageID` is the correlation key.
   * @returns Observable of the matching response messages. It errors if
   *          sendMessage() rejects, so subscribers should supply an `error` handler.
   */
  function makeRequest(request: BaseMessage): Observable<any> {
    return new Observable((response) => {
      const responses = toBePassedOverToApp.subscribe({
        next: (message: BaseMessage) => {
          // The mapping of responses to requests may need adjusting later;
          // for now it is a simple demultiplexing on messageID.
          // Loose comparison is kept on purpose: the IDs come off the wire and
          // their runtime type is not validated in this file.
          if (message.header.messageID != request.header.messageID) {
            return
          }
          if (message.header.messageName === 'Complete') {
            response.complete()
          } else {
            response.next(message)
          }
        },
        error: err => console.error(err),
        complete: () => { }
      })

      // Propagate a failed send to the subscriber instead of dropping the rejection.
      sendMessage(request).catch(err => response.error(err))

      // Teardown: runs on complete, error, or unsubscribe.
      return () => responses.unsubscribe()
    })
  }
  /**
   * Socket utility (the assumption is that the client program would already
   * have something like this in place).
   *
   * Creates the shared `socket`, announces this client to the server, and
   * registers the 'connect', 'response', 'notification' and 'disconnect'
   * handlers.
   *
   * The returned promise resolves as soon as the handlers are registered; it
   * does NOT wait for the 'connect' event. The identifying 'notification' is
   * emitted once per call, outside the 'connect' handler, so it is not
   * re-emitted by this code after a reconnect.
   *
   * @param serverUrl URL of the Socket.IO server.
   * @returns Promise resolving to an empty string once setup is done; rejects
   *          if any setup step throws.
   */
  async function establishSocketConnection(serverUrl: string): Promise<any> {
    socket = io(serverUrl, {
      reconnection: true, // Enable automatic reconnections
      reconnectionAttempts: 100, // Retry up to 100 times
      reconnectionDelay: 500, // Start with a 500ms delay
      reconnectionDelayMax: 10000, // Delay can grow to a max of 10 seconds
      randomizationFactor: 0.3,
    })

    // Check if this is a previous client: credentials saved by an earlier run live in info.json.
    const data: ClientInfo | null = checkOwnClientInfo('info.json')
    if (data) {
      socket.emit('notification', { agenda: 'existingClient', data: data })
    } else {
      socket.emit('notification', { agenda: 'newClient' })
    }

    // Listen for a connection event
    socket.on('connect', () => {
      console.log('Connected to the server:', socket.id)
    });

    // Listen for messages from the server and hand their payload to the app-facing stream.
    socket.on('response', (msg: WrappedMessage) => {
      // A malformed payload must not throw inside the event handler.
      if (!msg?.payload?.header) {
        console.error('Ignoring malformed response from server:', msg)
        return
      }
      console.log('Message from server:', msg.payload.header.messageName, ' for ', msg.payload.header.messageID);
      // Sequence checking via onHoldMessagesSubject / checkMessage() is currently disabled;
      // payloads are forwarded in arrival order.
      toBePassedOverToApp.next(msg.payload as BaseMessage)
    })

    socket.on('notification', (msg: any) => {
      switch (msg?.notification) {
        case 'Your credentials':
          if (!msg.socketInfo) {
            console.error('Credentials notification is missing socketInfo:', msg)
            break
          }
          console.log(`Assigned client Name: ${msg.socketInfo.clientName}`)
          // Persist the credentials so the next run can identify itself as an existing client.
          writeFile(msg.socketInfo as ClientInfo)
          break
        case 'Your updated credentials':
          console.log(`Updated socket ID: `, msg)
          break
        case 'Failed Request':
          if (!msg.data?.header) {
            console.error('Failed Request notification is missing the original message:', msg)
            break
          }
          console.log(`Resending request...`, msg.data.header.messageID)
          // Re-queue the failed request after a short pause.
          setTimeout(() => {
            sender.next(msg.data)
          }, 1000)
          break
        default:
          // Unknown notifications are ignored, as before.
          break
      }
    })

    // Handle disconnection
    socket.on('disconnect', () => {
      console.log('Disconnected from the server');
    });

    return ''
  }
  /**
   * Load previously saved client credentials from a JSON file.
   *
   * The file is read directly (no separate existence check, which would race
   * with the read). Every failure is logged with console.error and reported
   * as `null`; this function never throws.
   *
   * @param filename Path of the JSON file to read.
   * @returns The parsed object, or `null` when the file is missing, unreadable,
   *          empty/whitespace-only, not valid JSON, or not a JSON object.
   *          NOTE(review): the object's fields are not validated against ClientInfo.
   */
  function checkOwnClientInfo(filename: string): ClientInfo | null {
    let fileData: string
    try {
      fileData = fs.readFileSync(filename, 'utf8')
    } catch (err) {
      if ((err as { code?: unknown } | null)?.code === 'ENOENT') {
        console.error("File does not exist")
      } else {
        console.error("Error reading or parsing file:", err instanceof Error ? err.message : String(err))
      }
      return null
    }

    if (fileData.trim() === "") {
      console.error("Error reading or parsing file:", "File is empty")
      return null
    }

    try {
      const parsed: unknown = JSON.parse(fileData)
      // Scalars, null and arrays are valid JSON but cannot be client credentials.
      if (typeof parsed !== 'object' || parsed === null || Array.isArray(parsed)) {
        console.error("Error reading or parsing file:", "Expected a JSON object")
        return null
      }
      return parsed as ClientInfo
    } catch (err) {
      console.error("Error reading or parsing file:", err instanceof Error ? err.message : String(err))
      return null
    }
  }
  /**
   * Persist the given client credentials to `info.json` (relative to the
   * process working directory) as pretty-printed JSON.
   *
   * The write is asynchronous and fire-and-forget: the outcome is only logged.
   *
   * @param data Credentials to serialise.
   */
  function writeFile(data: ClientInfo) {
    const serialized = JSON.stringify(data, null, 2)
    fs.writeFile('info.json', serialized, (writeError) => {
      if (!writeError) {
        console.log('File has been written');
        return
      }
      console.error('Error writing file', writeError);
    });
  }
  /**
   * Emit `message` to the server as a 'request' event on the shared socket.
   *
   * If emitting (or the log line after it) throws, the error is logged, the
   * message is pushed back onto `sender` synchronously, and the returned
   * promise rejects with that error.
   *
   * @param message Request to send.
   * @returns Promise resolving to an empty string once the emit call has returned.
   */
  async function sendMessage(message: BaseMessage): Promise<any> {
    try {
      // Extra precaution: per the Socket.IO offline-behaviour docs, events emitted while
      // disconnected are buffered by the client and sent once the connection is back.
      // https://socket.io/docs/v4/client-offline-behavior/
      socket.emit('request', message); // only queues the event; delivery happens asynchronously
      console.log(`SocketEmit() for message to event queue ${message.header.messageID}`)
      return ''
    } catch (emitFailure) {
      console.error('Error emitting message:', emitFailure);
      sender.next(message)
      throw emitFailure
    }
  }
  /**
   * Sequencing concept: a message may proceed only once the message that
   * precedes it has been seen on `onHoldMessagesSubject`.
   *
   * - No `previousMessageID`: treated as the first message; resolves immediately.
   * - Otherwise: waits for an item whose `thisMessageID` equals
   *   `message.previousMessageID`, then resolves.
   *
   * NOTE(review): `onHoldMessagesSubject` is a plain Subject, so only items
   * emitted AFTER this call are observed. A predecessor that already passed
   * through will never match and the promise stays pending; a replaying source
   * or a seen-ID set would be needed to cover that case. If the source itself
   * completes, the promise also resolves.
   *
   * @param message Wrapped message whose predecessor should be awaited.
   * @returns Promise resolving to a status string; rejects if the source stream errors.
   */
  async function checkMessage(message: WrappedMessage): Promise<any> {
    return new Promise((resolve, reject) => {
      if (!message.previousMessageID) {
        console.log('No previous messageID. This should be the first message')
        resolve('No previous message ID. Please Proceed.')
        return
      }
      onHoldMessagesSubject.pipe(
        // takeWhile completes on the first item that FAILS the predicate, so keep
        // taking while the item is NOT the predecessor and complete when it shows up.
        takeWhile(item => message.previousMessageID !== item.thisMessageID)
      ).subscribe({
        error: err => reject(err),
        complete: () => {
          resolve('previousMessageID matched')
        }
      })
    })
  }
  171. }