// socket.service.ts (7.3 KB)

  1. import { BehaviorSubject, Observable, Subject } from "rxjs"
  2. import { RetransmissionService } from "../../services/retransmission.service"
  3. import { BaseMessage } from "../../dependencies/logging/services/logging-service"
  4. import { v4 as uuidV4 } from 'uuid';
  5. import { Socket } from "socket.io-client";
  6. const express = require('express');
  7. const http = require('http');
  8. const { Server } = require('socket.io');
  /* This is for demonstration purposes only; the actual NestJS socket implementation may differ. */
  /**
   * Demonstration socket.io server that relays application responses to
   * connected clients.
   *
   * Flow, as implemented below:
   *  - A client connects and sends a `'notification'` whose `agenda` is either
   *    `'newClient'` (register and receive credentials) or `'existingClient'`
   *    (re-attach to a previous registration by `clientName`).
   *  - Each `'request'` from a registered client is published on the stream
   *    returned by {@link getIncomingRequest}. Messages arriving on the
   *    `response` subject passed to the constructor that carry the same
   *    `header.messageID` are pushed into that client's `responseObs`.
   *  - The client's `responseObs` is piped through its `RetransmissionService`
   *    together with its connection state, and whatever that emits is sent to
   *    the client as a `'response'` event.
   *    NOTE(review): presumably the retransmission service buffers while the
   *    state is 'OFFLINE' and flushes on 'ONLINE' - confirm against that class.
   *
   * NOTE(review): `Socket` is imported from "socket.io-client" although the
   * sockets handled here are server-side; consider importing the server type.
   */
  export class SocketService {
    private connectedClients: ClientInfo[] = [];
    private announcements: Subject<string> = new Subject();
    private app = express();
    private server = http.createServer(this.app);
    private io = new Server(this.server);
    private responseFromApp: Subject<BaseMessage>;
    private incomingRequest: Subject<BaseMessage> = new Subject();
    /**
     * Socket currently bound to each registered client, keyed by `clientName`.
     * Responses are emitted through this lookup so that, after a reconnect,
     * they go to the new socket instead of the one that originally registered.
     */
    private activeSockets = new Map<string, Socket>();

    /**
     * @param response Stream of messages produced by the application; each one
     *   is matched to a pending request by `header.messageID`.
     */
    constructor(response: Subject<BaseMessage>) {
      this.responseFromApp = response;
      this.announcements.subscribe(announcement => {
        console.log(`Server Announcement: ${announcement}`);
      });
    }

    /** @returns A read-only view of every request received from registered clients. */
    public getIncomingRequest(): Observable<BaseMessage> {
      return this.incomingRequest.asObservable();
    }

    /**
     * Registers the socket.io event handlers and starts the HTTP server on
     * `process.env.PORT`, falling back to 3000.
     */
    public async setUpConnection() {
      this.io.on('connection', (socket) => {
        this.announcements.next('a client is connected:' + socket.id);
        // Stays null until this socket identifies itself via a 'notification'.
        let clientInfo: ClientInfo | null = null;

        socket.on('connect', (msg) => {
          // this is reserved....
        });

        socket.on('notification', (msg) => {
          console.log(msg);
          clientInfo = this.handleNotification(msg, socket, clientInfo);
        });

        // Listen for messages from the client
        socket.on('request', (request: BaseMessage) => {
          // Capture the current value so the callbacks below keep a non-null reference.
          const client = clientInfo;
          if (!client) {
            console.log(`Client is still not defined. Please have this client set up the credentials`);
            socket.emit('notification', {
              notification: 'Failed Request',
              data: request,
              message: 'Client Credentials is not properply set up! Cannot process requests at the moment.'
            });
            return;
          }

          this.announcements.next(`Received Message: ${request.header.messageID} from ${client.clientName}`);
          // Subscribe to the response stream BEFORE publishing the request:
          // `responseFromApp` is typed as a plain Subject, so a reply emitted
          // synchronously while the request is being published would
          // otherwise be missed.
          this.processRequest(request).subscribe({
            next: message => client.responseObs.next(message),
            error: err => console.error(err),
          });
          this.incomingRequest.next(request);
        });

        // Handle disconnection
        socket.on('disconnect', () => {
          if (clientInfo) {
            clientInfo.clientConnectionState.next('OFFLINE'); // signal to start buffering
            this.announcements.next(`Client ${clientInfo.id} disconnected`);
            // The registration is intentionally kept so the client can come
            // back with an 'existingClient' notification.
          }
        });
      });

      this.io.engine.on("connection_error", (err) => {
        console.log(err.req); // the request object
        console.log(err.code); // the error code, for example 1
        console.log(err.message); // the error message, for example "Session ID unknown"
        console.log(err.context); // some additional error context
      });

      // Start the server
      const PORT = process.env.PORT || 3000;
      this.server.listen(PORT, () => {
        console.log(`Server listening on port ${PORT}`);
      });
    }

    // Utils

    /**
     * Removes the registration whose current socket id equals `id`, mutating
     * `connectedClients` in place. Does nothing when no such client exists.
     */
    private deleteClientById(id: string) {
      const index = this.connectedClients.findIndex(item => item.id === id);
      if (index !== -1) {
        const [removed] = this.connectedClients.splice(index, 1);
        this.activeSockets.delete(removed.clientName);
      }
    }

    /**
     * Builds a stream of the application responses that belong to `request`.
     *
     * Every message on `responseFromApp` whose `header.messageID` equals the
     * request's is forwarded; the stream completes right after forwarding the
     * one whose `header.messageName` is `'Complete'`. The inner subscription is
     * released when the returned observable completes, errors, or is
     * unsubscribed, so finished requests do not stay attached to
     * `responseFromApp`.
     */
    private processRequest(request: BaseMessage): Observable<BaseMessage> {
      return new Observable<BaseMessage>((observer) => {
        const targetId = request.header.messageID;
        const upstream = this.responseFromApp.subscribe({
          next: message => {
            if (message.header.messageID !== targetId) {
              return;
            }
            observer.next(message);
            if (message.header.messageName === 'Complete') {
              observer.complete();
            }
          },
          error: err => observer.error(err),
          complete: () => observer.complete(),
        });
        // Teardown: detach from the shared response subject.
        return () => upstream.unsubscribe();
      });
    }

    /**
     * Handles a `'notification'` sent by a client.
     *
     * - `agenda === 'newClient'`: creates a registration, sends the client its
     *   credentials, and starts relaying its responses.
     * - `agenda === 'existingClient'`: looks the client up by
     *   `msg.data.clientName`, rebinds it to `socket`, and marks it 'ONLINE'.
     *
     * @param msg Client-supplied payload; it is not trusted to have any shape.
     * @param socket The socket the notification arrived on.
     * @param clientInfo The registration currently associated with this socket, if any.
     * @returns The registration now associated with this socket; `clientInfo`
     *   unchanged when the agenda is unrecognised or the client is not found.
     */
    private handleNotification(msg: any, socket: Socket, clientInfo: ClientInfo | null): ClientInfo | null {
      const agenda = msg?.agenda;

      if (agenda === 'newClient') {
        const created: ClientInfo = {
          id: socket.id,
          clientName: uuidV4(),
          connectedAt: new Date(),
          clientConnectionState: new BehaviorSubject<'ONLINE' | 'OFFLINE'>('ONLINE'),
          requests: [],
          buffer: new RetransmissionService(),
          responseObs: new Subject<BaseMessage>()
        };
        this.connectedClients.push(created);
        this.activeSockets.set(created.clientName, socket);

        // Send data over for client to persist
        socket.emit('notification', {
          notification: 'Your credentials',
          createdAt: new Date(),
          socketInfo: this.toWireInfo(created)
        });

        // Responses destined for this client. The socket is looked up at emit
        // time rather than captured, so output produced after a reconnect goes
        // to the socket the client is currently using.
        created.buffer.retransmission(created.responseObs, created.clientConnectionState).subscribe(output => {
          this.activeSockets.get(created.clientName)?.emit('response', output);
        });
        return created;
      }

      if (agenda === 'existingClient') {
        const requestedName = msg?.data?.clientName;
        const existing = this.connectedClients.find(obj => obj.clientName === requestedName);
        if (!existing) {
          console.log(this.connectedClients);
          console.log(`Existing Client is not found`);
          return clientInfo;
        }

        console.log('Existing client found');
        // Rebind the registration to the new socket before resuming, so
        // anything released by the 'ONLINE' signal goes to this socket.
        existing.id = socket.id;
        this.activeSockets.set(existing.clientName, socket);

        // Send data over for client to persist
        socket.emit('notification', {
          notification: 'Your updated credentials',
          connectedAt: new Date(),
          socketInfo: this.toWireInfo(existing)
        });
        socket.emit('notification', `Hello from server. You have been assigned ${socket.id}`);

        // resume operation
        existing.clientConnectionState.next('ONLINE');
        return existing;
      }

      return clientInfo;
    }

    /**
     * Returns the plain-data part of a registration for sending to the client.
     * The RxJS subjects and the retransmission buffer held by `ClientInfo` are
     * server-side state and are deliberately left out of the payload.
     */
    private toWireInfo(client: ClientInfo): Pick<ClientInfo, 'id' | 'clientName' | 'connectedAt' | 'requests'> {
      return {
        id: client.id,
        clientName: client.clientName,
        connectedAt: client.connectedAt,
        requests: client.requests
      };
    }
  }
  /** Server-side record kept for each client registered with `SocketService`. */
  export interface ClientInfo {
    /** Id of the socket this client is bound to; overwritten when the client re-attaches on a new socket. */
    id: string;
    /** Identifier generated with uuid v4 at registration; used to look the client up when it reconnects. */
    clientName: string;
    /** Timestamp taken when the registration was created. */
    connectedAt: Date;
    /** Starts as 'ONLINE'; set to 'OFFLINE' on disconnect and back to 'ONLINE' when the client re-attaches. */
    clientConnectionState: BehaviorSubject<'ONLINE' | 'OFFLINE'>;
    /** Created empty; nothing in this file appends to it (the push is commented out). */
    requests: { message: any; completed: boolean }[];
    /** Retransmission helper whose output is emitted to the client as 'response' events. */
    buffer: RetransmissionService;
    /** Receives every application response that matches one of this client's requests. */
    responseObs: Subject<BaseMessage>;
  }