socket.utils.ts 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324
  1. import { BehaviorSubject, Observable, Observer, Subject } from 'rxjs';
  2. import { createServer } from 'http';
  3. import { Server, Socket as SocketForConnectedClient } from 'socket.io';
  4. import { io, Socket as ClientSocket } from 'socket.io-client';
  5. import * as fs from 'fs'
  6. import { v4 as uuidv4 } from 'uuid'
  7. import { ConnectionState, Transport, TransportEvent, TransportMessage } from '../interface/connector.interface';
  8. import { ConnectedClientSocket, ConnectedServerSocket } from '../transport/websocket';
  9. import { EventMessage } from '../interface/transport.interface';
  10. export function startSocketServer(port: number): Observable<SocketForConnectedClient> {
  11. return new Observable((observer) => {
  12. try {
  13. console.log(`Socket Server ${port} Started....`)
  14. let httpServer = createServer();
  15. let socketServer = new Server(httpServer)
  16. // something wrong here
  17. socketServer.on('connection', (socket) => {
  18. observer.next(socket)
  19. })
  20. socketServer.engine.on("connection_error", (err) => {
  21. console.log(err.req); // the request object
  22. console.log(err.code); // the error code, for example 1
  23. console.log(err.message); // the error message, for example "Session ID unknown"
  24. console.log(err.context); // some additional error context
  25. });
  26. // Start the socketServer
  27. httpServer.listen(port)
  28. } catch (error) {
  29. observer.error(error)
  30. }
  31. })
  32. }
  33. export async function startClientSocketConnection(serverUrl: string): Promise<ClientSocket> {
  34. return new Promise((resolve, reject) => {
  35. try {
  36. // let clientSocket = io(serverUrl)
  37. let clientSocket: ClientSocket = io(serverUrl, {
  38. reconnection: true, // Enable automatic reconnections
  39. reconnectionAttempts: 1000, // Retry up to 10 times
  40. reconnectionDelay: 500, // Start with a 500ms delay
  41. reconnectionDelayMax: 10000, // Delay can grow to a max of 10 seconds
  42. randomizationFactor: 0.3,
  43. })
  44. resolve(clientSocket)
  45. }
  46. catch (error) {
  47. reject(error)
  48. }
  49. })
  50. }
  51. // After establishing connection to the server, set up the credentials, confirm whether or not if there's any credentials, if not ask for one from the server
  52. export function handleClientSocketConnection(socket: ClientSocket, serversConnected: ConnectedServerSocket[]): Observable<TransportEvent> {
  53. return new Observable((eventNotification: Observer<TransportEvent>) => {
  54. let clientName!: string
  55. let buffer: any[] = []
  56. let receiverProfileInfo!: ConnectedServerSocket
  57. // Listen for a connection event
  58. socket.on('connect', () => {
  59. console.log('Connected to the server:', socket.id)
  60. if (clientName) {
  61. checkOwnClientInfo(clientName).then((profile: ConnectedServerSocket) => {
  62. receiverProfileInfo = profile
  63. socket.emit('profile', {
  64. name: 'Old Client',
  65. data: profile
  66. })
  67. }).catch((error) => {
  68. socket.emit('profile', {
  69. name: 'New Client',
  70. data: null
  71. })
  72. })
  73. } else {
  74. socket.emit('profile', {
  75. name: 'New Client',
  76. data: null
  77. })
  78. }
  79. });
  80. // Listen for messages from the server. Generally here's the responses
  81. socket.on('message', (msg: any) => {
  82. console.log(`Websocket Client Transport Receieve Msg`, msg.id)
  83. if (receiverProfileInfo) {
  84. // publish to event
  85. eventNotification.next({
  86. id: uuidv4(),
  87. event: 'New Message',
  88. data: msg
  89. })
  90. } else {
  91. // Do nothing. just store in local array first. Cannot process without information. but then again, don['t need information if acting as client
  92. // but for consistency sake, will impose the standard
  93. buffer.push(msg) // store locally for now
  94. }
  95. })
  96. socket.on('profile', (data: { name: string, message: any }) => {
  97. console.log(data)
  98. if (data.name == 'New Profile') {
  99. console.log(`Assigned client Name: ${(data.message as ConnectedServerSocket).id}`)
  100. receiverProfileInfo = data.message as ConnectedServerSocket
  101. writeFile(data.message as ConnectedServerSocket, (data.message as ConnectedServerSocket).id).then(() => {
  102. clientName = receiverProfileInfo.id
  103. // broadcast event to allow retransmission to release buffer
  104. eventNotification.next({
  105. id: uuidv4(),
  106. event: `New Server`,
  107. data: {
  108. clientId: (data.message as ConnectedServerSocket).id,
  109. message: `New Websocket Channel ${(data.message as ConnectedServerSocket).id} established.`
  110. } as EventMessage
  111. })
  112. }).catch((error) => { }) // do nothing at the moment.
  113. // Update websocket instance record
  114. receiverProfileInfo = {
  115. id: (data.message as ConnectedServerSocket).id,
  116. dateCreated: new Date(),
  117. socketInstance: socket,
  118. connectionState: new BehaviorSubject<ConnectionState>(`ONLINE`)
  119. }
  120. serversConnected.push(receiverProfileInfo)
  121. }
  122. if (data.name == 'Adjusted Profile') {
  123. console.log(`Assigned client Name: ${(data.message as ConnectedServerSocket).id}`)
  124. receiverProfileInfo = data.message as ConnectedServerSocket
  125. writeFile(data.message as ConnectedServerSocket, (data.message as ConnectedServerSocket).id).then(() => {
  126. // broadcast event to allow retransmission to release buffer
  127. eventNotification.next({
  128. id: uuidv4(),
  129. event: 'Client Reconnected',
  130. data: {
  131. clientId: (data.message as ConnectedServerSocket).id,
  132. message: `Existing Websocket Channel ${(data.message as ConnectedServerSocket).id} re-established.`
  133. } as EventMessage
  134. })
  135. }).catch((error) => { }) // do nothing at the moment.
  136. // Update websocket instance record
  137. let clientObj: ConnectedServerSocket | undefined = serversConnected.find(obj => obj.id === (data.message as ConnectedServerSocket).id)
  138. if (clientObj) {
  139. clientObj.socketInstance = receiverProfileInfo.socketInstance
  140. clientObj.connectionState.next('ONLINE')
  141. }
  142. }
  143. if (data.name == 'Error') {
  144. console.log(`Server cannot find credentials`, data.message)
  145. // logic to request for new credentials
  146. setTimeout(() => {
  147. socket.emit('profile', {
  148. name: 'New Client',
  149. data: null
  150. })
  151. }, 2000)
  152. }
  153. })
  154. // Handle disconnection
  155. socket.on('disconnect', () => {
  156. console.log('Websocket Client disconnected from the server');
  157. if (receiverProfileInfo) {
  158. eventNotification.next({
  159. id: uuidv4(),
  160. event: `Client Disconnected`,
  161. data: {
  162. clientId: receiverProfileInfo.id,
  163. message: `Server for Channel ${receiverProfileInfo.id} disconnected.`
  164. } as EventMessage
  165. })
  166. receiverProfileInfo.connectionState.next(`OFFLINE`)
  167. }
  168. });
  169. })
  170. }
  171. // For SERVER Usage: set up socket listeners to start listening for different events
  172. export function handleNewSocketClient(socket: SocketForConnectedClient, connectedClientSocket: ConnectedClientSocket[]): Observable<TransportEvent> {
  173. return new Observable((event: Observer<TransportEvent>) => {
  174. console.log(`Setting up listeners for socket:${socket.id}`)
  175. // returns the socket client instance
  176. // listen to receiver's initiotion first before assigning 'credentials'
  177. socket.on(`profile`, (message: { name: string, data: any }) => {
  178. if (message.name == 'New Client') {
  179. let clientInstance: ConnectedClientSocket = {
  180. id: uuidv4(), // client should only be assigned at this level. And is passed around for reference pointing
  181. dateCreated: new Date(),
  182. socketInstance: socket,
  183. connectionState: new BehaviorSubject<ConnectionState>(`ONLINE`)
  184. }
  185. // send to receiver for reference
  186. socket.emit('profile', {
  187. name: `New Profile`, message: { id: clientInstance.id }
  188. })
  189. // publish first event notification
  190. event.next({
  191. id: uuidv4(),
  192. event: `New Client`,
  193. data: {
  194. clientId: clientInstance.id,
  195. message: `New Client Connected. Adapter ID assigned: ${clientInstance.id}`,
  196. payload: clientInstance
  197. } as EventMessage
  198. })
  199. // Update connected clientInstance info to adapter
  200. connectedClientSocket.push(clientInstance)
  201. startListening(socket, clientInstance, event)
  202. } else {
  203. // update first
  204. let clientInstance: ConnectedClientSocket | undefined = connectedClientSocket.find(obj => obj.id === message.data.id)
  205. if (clientInstance) {
  206. console.log(`Socket Client ${clientInstance.id} Found`)
  207. socket.emit('profile', { name: 'Adjusted Profile', message: { id: clientInstance.id } })
  208. // replace socket instance since the previous has been terminated
  209. clientInstance.socketInstance = socket
  210. // need to start listening again, because it's assigned a different socket instance this time round
  211. startListening(socket, clientInstance, event)
  212. event.next({
  213. id: uuidv4(),
  214. event: 'Client Reconnected',
  215. data: {
  216. clientId: clientInstance.id,
  217. message: `Client ${clientInstance.id} connection re-established`,
  218. payload: clientInstance
  219. } as EventMessage
  220. })
  221. // Resume operation
  222. if (clientInstance.connectionState.getValue() == 'OFFLINE') {
  223. clientInstance.connectionState.next(`ONLINE`)
  224. }
  225. } else {
  226. console.log(`Profile Not Found`)
  227. socket.emit('profile', { name: 'Error', message: 'Receiver Profile Not found' })
  228. }
  229. }
  230. })
  231. })
  232. }
  233. // Specifically to write receiver profile information
  234. export async function writeFile(data: ConnectedServerSocket, filename: string): Promise<boolean> {
  235. return new Promise((resolve, reject) => {
  236. // Write JSON data to a file
  237. fs.writeFile(`${filename}.json`, JSON.stringify(data, null, 2), (err) => {
  238. if (err) {
  239. console.error('Error writing file', err);
  240. reject(false)
  241. } else {
  242. console.log('File has been written');
  243. resolve(true)
  244. }
  245. });
  246. })
  247. }
  248. // Check if filename exists. Return profile information if there's any
  249. export async function checkOwnClientInfo(filename?: string): Promise<ConnectedServerSocket> {
  250. return new Promise((resolve, reject) => {
  251. // Check if the file exists
  252. if (fs.existsSync(`${filename}.json`)) {
  253. try {
  254. // Read the file contents
  255. const fileData = fs.readFileSync(`${filename}.json`, 'utf8');
  256. // If the file is empty, return an error
  257. if (fileData.trim() === "") {
  258. throw new Error("File is empty");
  259. }
  260. // Parse and return the data if present
  261. const jsonData = JSON.parse(fileData);
  262. resolve(jsonData)
  263. } catch (err) {
  264. // Handle parsing errors or other file-related errors
  265. console.error("Error reading or parsing file:", err);
  266. reject('');
  267. }
  268. } else {
  269. console.error("File does not exist");
  270. reject('');
  271. }
  272. })
  273. }
  274. export function startListening(socket: SocketForConnectedClient, client: ConnectedClientSocket, eventListener: Observer<TransportEvent>): void {
  275. /* Generally, we don't need this unless in the case of being the receiver */
  276. socket.on('message', (message: any) => {
  277. eventListener.next({
  278. id: uuidv4(),
  279. event: 'New Message',
  280. data: {
  281. id: uuidv4(),
  282. dateCreated: new Date(),
  283. transport: Transport.Websocket,
  284. target: client.id, // this ref to be associated with the client/channel
  285. payload: message
  286. } as TransportMessage
  287. })
  288. })
  289. socket.on('disconnect', () => {
  290. eventListener.next({
  291. id: uuidv4(),
  292. event: 'Server Disconnected',
  293. data: {
  294. clientID: client.id,
  295. time: new Date()
  296. }
  297. })
  298. eventListener.error(`Client ${client.id} disconnected. Terminating this observable event for this client socket...`)
  299. eventListener.complete()
  300. })
  301. }