socket.service.ts 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197
  1. import { BehaviorSubject, Observable, Subject } from "rxjs"
  2. import { RetransmissionService } from "../../services/retransmission.service"
  3. import { BaseMessage } from "../../dependencies/logging/services/logging-service"
  4. import { v4 as uuidV4 } from 'uuid';
  5. import { Socket } from "socket.io-client";
  6. const express = require('express');
  7. const http = require('http');
  8. const { Server } = require('socket.io');
  9. /* This is only for demonstration purposes. Because the actual nestjs socket implementation may differ. */
  10. export class SocketService {
  11. private connectedClients: ClientInfo[] = []
  12. private announcements: Subject<any> = new Subject()
  13. private app = express();
  14. private server = http.createServer(this.app);
  15. private io = new Server(this.server);
  16. private responseFromApp: Subject<BaseMessage>
  17. private incomingRequest: Subject<BaseMessage> = new Subject()
  18. constructor(response: Subject<BaseMessage>) {
  19. this.responseFromApp = response
  20. this.announcements.subscribe(announcement => {
  21. console.log(`Server Announcement: ${announcement}`)
  22. })
  23. }
  24. public getIncomingRequest(): Observable<BaseMessage> {
  25. return this.incomingRequest.asObservable()
  26. }
  27. public async setUpConnection() {
  28. this.io.on('connection', (socket) => {
  29. this.announcements.next('a client is connected:' + socket.id);
  30. let clientIsOnline: BehaviorSubject<boolean> = new BehaviorSubject(true)
  31. let clientInfo: ClientInfo | null
  32. socket.on('connect', (msg) => {
  33. // this is reserved....
  34. });
  35. socket.on('notification', (msg) => {
  36. console.log(msg)
  37. if (msg.agenda == 'newClient') {
  38. clientInfo = {
  39. id: socket.id,
  40. clientName: uuidV4(),
  41. connectedAt: new Date(),
  42. clientConnectionState: new BehaviorSubject<'ONLINE' | 'OFFLINE'>('ONLINE'),
  43. requests: [],
  44. buffer: new RetransmissionService(),
  45. responseObs: new Subject<BaseMessage>()
  46. }
  47. this.connectedClients.push(clientInfo);
  48. // Send data over for client to persist
  49. socket.emit('notification', {
  50. notification: 'Your credentials',
  51. createdAt: new Date(),
  52. socketInfo: clientInfo
  53. })
  54. // this is the supposed responses to be pushed to this socket client
  55. clientInfo.buffer.retransmission(clientInfo.responseObs, clientInfo.clientConnectionState)
  56. let subscription = clientInfo.buffer.returnBufferedMessages().subscribe(output => {
  57. // console.log(output)
  58. if (clientIsOnline.getValue() === true) {
  59. socket.emit('response', output)
  60. } else {
  61. subscription.unsubscribe()
  62. }
  63. })
  64. }
  65. if (msg.agenda == 'existingClient') {
  66. // check if client exists
  67. let clientObj = this.connectedClients.find(obj => obj.clientName === msg.data.clientName)
  68. if (clientObj) {
  69. clientInfo = clientObj
  70. console.log('Existing client found')
  71. // but also update socketId
  72. clientObj.id = socket.id
  73. // Send data over for client to persist
  74. socket.emit('notification', {
  75. notification: 'Your updated credentials',
  76. connectedAt: new Date(),
  77. updatedId: socket.id
  78. })
  79. // resume operation Release them buffer
  80. /* local client isOnline need not be mutated, since this is a new connection. However the previous intance of client Connection State
  81. inside the retransmission needs to be updated to release the buffered values.*/
  82. clientObj.clientConnectionState.next('ONLINE')
  83. let subscription = clientObj.buffer.returnBufferedMessages().subscribe(output => {
  84. // console.log(output)
  85. if (clientIsOnline.getValue() === true) {
  86. socket.emit('response', output)
  87. } else {
  88. subscription.unsubscribe()
  89. }
  90. })
  91. } else {
  92. console.log(this.connectedClients)
  93. console.log(`Existing Client is not found`)
  94. }
  95. }
  96. })
  97. // Listen for messages from the client
  98. socket.on('request', (request: BaseMessage) => {
  99. if (clientInfo) {
  100. this.announcements.next(`Received Message: ${request.header.messageID} from ${clientInfo.clientName}`);
  101. // clientInfo.requests.push({ message: request, completed: false })
  102. this.incomingRequest.next(request)
  103. this.processRequest(request).subscribe({
  104. next: message => {
  105. // console.log(message.header.messageName) // it does receive
  106. clientInfo.responseObs.next(message)
  107. },
  108. error: err => console.error(err),
  109. complete: () => { }
  110. })
  111. } else {
  112. console.log(`Client is still not defined. Please have this client set up the credentials`)
  113. socket.emit('notification', {
  114. notification: 'Failed Request',
  115. data: request,
  116. message: 'Client Credentials is not properply set up! Cannot process requests at the moment.'
  117. })
  118. }
  119. });
  120. // Handle disconnection
  121. socket.on('disconnect', () => {
  122. if (clientInfo) {
  123. clientIsOnline.next(false)
  124. clientInfo.clientConnectionState.next('OFFLINE') // signal to start buffering\
  125. this.announcements.next(`Client ${clientInfo.id} disconnected`);
  126. // this.deleteClientById(socket.id)
  127. }
  128. });
  129. });
  130. this.io.engine.on("connection_error", (err) => {
  131. console.log(err.req); // the request object
  132. console.log(err.code); // the error code, for example 1
  133. console.log(err.message); // the error message, for example "Session ID unknown"
  134. console.log(err.context); // some additional error context
  135. });
  136. // Start the server
  137. const PORT = process.env.PORT || 3000;
  138. this.server.listen(PORT, () => {
  139. console.log(`Server listening on port ${PORT}`);
  140. });
  141. }
  142. // Utils
  143. // Function to delete an item by its id (mutating the array)
  144. private deleteClientById(id) {
  145. const index = this.connectedClients.findIndex(item => item.id === id);
  146. if (index !== -1) {
  147. this.connectedClients.splice(index, 1);
  148. }
  149. }
  150. private processRequest(request: BaseMessage): Observable<BaseMessage> {
  151. return new Observable((observer) => {
  152. this.responseFromApp.subscribe(message => {
  153. // console.log(message)
  154. if (message.header.messageID === request.header.messageID && message.header.messageName != 'Complete') {
  155. observer.next(message)
  156. }
  157. if (message.header.messageID === request.header.messageID && message.header.messageName == 'Complete') {
  158. observer.next(message)
  159. // console.log(message) // so it does show
  160. observer.complete()
  161. }
  162. })
  163. })
  164. }
  165. }
  166. export interface ClientInfo {
  167. id: string,
  168. clientName: string,
  169. connectedAt: Date,
  170. clientConnectionState: BehaviorSubject<'ONLINE' | 'OFFLINE'>,
  171. requests: { message: any, completed: boolean }[],
  172. buffer: RetransmissionService,
  173. responseObs: Subject<BaseMessage>
  174. }