socket.service.ts 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198
  1. import { BehaviorSubject, Observable, Subject } from "rxjs"
  2. import { RetransmissionService } from "../../services/retransmission.service"
  3. import { BaseMessage } from "../../dependencies/logging/services/logging-service"
  4. import { v4 as uuidV4 } from 'uuid';
  5. import { Socket } from "socket.io-client";
  6. const express = require('express');
  7. const http = require('http');
  8. const { Server } = require('socket.io');
  9. /* This is only for demonstration purposes. Because the actual nestjs socket implementation may differ. */
  10. export class SocketService {
  11. private connectedClients: ClientInfo[] = []
  12. private announcements: Subject<any> = new Subject()
  13. private app = express();
  14. private server = http.createServer(this.app);
  15. private io = new Server(this.server);
  16. private responseFromApp: Subject<BaseMessage>
  17. private incomingRequest: Subject<BaseMessage> = new Subject()
  18. constructor(response: Subject<BaseMessage>) {
  19. this.responseFromApp = response
  20. this.announcements.subscribe(announcement => {
  21. console.log(`Server Announcement: ${announcement}`)
  22. })
  23. }
  24. public getIncomingRequest(): Observable<BaseMessage> {
  25. return this.incomingRequest.asObservable()
  26. }
  27. public async setUpConnection() {
  28. this.io.on('connection', (socket) => {
  29. this.announcements.next('a client is connected:' + socket.id);
  30. let clientIsOnline: BehaviorSubject<boolean> = new BehaviorSubject(true)
  31. let clientInfo: ClientInfo | null
  32. socket.on('connect', (msg) => {
  33. // this is reserved....
  34. });
  35. socket.on('notification', (msg) => {
  36. console.log(msg)
  37. if (msg.agenda == 'newClient') {
  38. clientInfo = {
  39. id: socket.id,
  40. clientName: uuidV4(),
  41. connectedAt: new Date(),
  42. clientConnectionState: new BehaviorSubject<'ONLINE' | 'OFFLINE'>('ONLINE'),
  43. requests: [],
  44. buffer: new RetransmissionService(),
  45. responseObs: new Subject<BaseMessage>()
  46. }
  47. this.connectedClients.push(clientInfo);
  48. // Send data over for client to persist
  49. socket.emit('notification', {
  50. notification: 'Your credentials',
  51. createdAt: new Date(),
  52. socketInfo: clientInfo
  53. })
  54. // this is the supposed responses to be pushed to this socket client
  55. clientInfo.buffer.retransmission(clientInfo.responseObs, clientInfo.clientConnectionState)
  56. let subscription = clientInfo.buffer.returnBufferedMessages().subscribe(output => {
  57. // console.log(output)
  58. if (clientIsOnline.getValue() === true) {
  59. socket.emit('response', output)
  60. } else {
  61. subscription.unsubscribe()
  62. }
  63. })
  64. }
  65. if (msg.agenda == 'existingClient') {
  66. // check if client exists
  67. let clientObj = this.connectedClients.find(obj => obj.clientName === msg.data.clientName)
  68. if (clientObj) {
  69. // clientInfo = clientObj
  70. console.log('Existing client found')
  71. // but also update socketId
  72. clientObj.id = socket.id
  73. // Send data over for client to persist
  74. socket.emit('notification', {
  75. notification: 'Your updated credentials',
  76. connectedAt: new Date(),
  77. updatedId: socket.id
  78. })
  79. socket.emit('notification', `Hello from server. You have been assigned ${socket.id}`);
  80. // resume operation Release them buffer
  81. /* local client isOnline need not be mutated, since this is a new connection. However the previous intance of client Connection State
  82. inside the retransmission needs to be updated to release the buffered values.*/
  83. clientObj.clientConnectionState.next('ONLINE')
  84. let subscription = clientObj.buffer.returnBufferedMessages().subscribe(output => {
  85. // console.log(output)
  86. if (clientIsOnline.getValue() === true) {
  87. socket.emit('response', output)
  88. } else {
  89. subscription.unsubscribe()
  90. }
  91. })
  92. } else {
  93. console.log(this.connectedClients)
  94. console.log(`Existing Client is not found`)
  95. }
  96. }
  97. })
  98. // Listen for messages from the client
  99. socket.on('request', (request: BaseMessage) => {
  100. if (clientInfo) {
  101. this.announcements.next(`Received Message: ${request.header.messageID} from ${clientInfo.clientName}`);
  102. // clientInfo.requests.push({ message: request, completed: false })
  103. this.incomingRequest.next(request)
  104. this.processRequest(request).subscribe({
  105. next: message => {
  106. // console.log(message.header.messageName) // it does receive
  107. clientInfo.responseObs.next(message)
  108. },
  109. error: err => console.error(err),
  110. complete: () => { }
  111. })
  112. } else {
  113. console.log(`Client is still not defined. Please have this client set up the credentials`)
  114. socket.emit('notification', {
  115. notification: 'Failed Request',
  116. data: request,
  117. message: 'Client Credentials is not properply set up! Cannot process requests at the moment.'
  118. })
  119. }
  120. });
  121. // Handle disconnection
  122. socket.on('disconnect', () => {
  123. if (clientInfo) {
  124. clientIsOnline.next(false)
  125. clientInfo.clientConnectionState.next('OFFLINE') // signal to start buffering\
  126. this.announcements.next(`Client ${clientInfo.id} disconnected`);
  127. // this.deleteClientById(socket.id)
  128. }
  129. });
  130. });
  131. this.io.engine.on("connection_error", (err) => {
  132. console.log(err.req); // the request object
  133. console.log(err.code); // the error code, for example 1
  134. console.log(err.message); // the error message, for example "Session ID unknown"
  135. console.log(err.context); // some additional error context
  136. });
  137. // Start the server
  138. const PORT = process.env.PORT || 3000;
  139. this.server.listen(PORT, () => {
  140. console.log(`Server listening on port ${PORT}`);
  141. });
  142. }
  143. // Utils
  144. // Function to delete an item by its id (mutating the array)
  145. private deleteClientById(id) {
  146. const index = this.connectedClients.findIndex(item => item.id === id);
  147. if (index !== -1) {
  148. this.connectedClients.splice(index, 1);
  149. }
  150. }
  151. private processRequest(request: BaseMessage): Observable<BaseMessage> {
  152. return new Observable((observer) => {
  153. this.responseFromApp.subscribe(message => {
  154. // console.log(message)
  155. if (message.header.messageID === request.header.messageID && message.header.messageName != 'Complete') {
  156. observer.next(message)
  157. }
  158. if (message.header.messageID === request.header.messageID && message.header.messageName == 'Complete') {
  159. observer.next(message)
  160. // console.log(message) // so it does show
  161. observer.complete()
  162. }
  163. })
  164. })
  165. }
  166. }
  167. export interface ClientInfo {
  168. id: string,
  169. clientName: string,
  170. connectedAt: Date,
  171. clientConnectionState: BehaviorSubject<'ONLINE' | 'OFFLINE'>,
  172. requests: { message: any, completed: boolean }[],
  173. buffer: RetransmissionService,
  174. responseObs: Subject<BaseMessage>
  175. }