socket-test-server.ts 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120
  1. import { BehaviorSubject, from, Observable, Subject, takeUntil, takeWhile } from "rxjs";
  2. import { RetransmissionService } from "../services/retransmission.service";
  3. import { prepareResponseMessages } from "../services/utility/prepareFISmessage";
  4. import { BaseMessage } from "../dependencies/logging/services/logging-service";
  // HTTP + Socket.IO bootstrap: one Express app served by a Node HTTP server,
  // with the Socket.IO server attached to that same HTTP server.
  const express = require('express');
  const http = require('http');
  const Server = require('socket.io').Server;

  const app = express();
  const server = http.createServer(app);
  const io = new Server(server);

  // Registry of the clients that are currently connected.
  const clients: any[] = [];

  // Stream of server-side announcements; every value pushed into it is logged.
  let announcements: Subject<any> = new Subject();
  announcements.subscribe((text) => {
    console.log(`Server Announcement: ${text}`);
  });

  // Static assets are served from the "public" directory.
  app.use(express.static('public'));
  /**
   * Per-connection setup. For every connecting socket this:
   *  - registers a ClientInfo record in `clients`,
   *  - pipes `serverBuffer` through the client's RetransmissionService and emits
   *    each resulting value to the socket as a 'message' event,
   *  - greets the client with a 'notification' event,
   *  - answers every incoming 'message' with the stream from returnResponse(),
   *  - removes the record and releases the outbound pipe on 'disconnect'.
   */
  io.on('connection', (socket) => {
    announcements.next('a client connected:' + socket.id);

    const clientInfo: ClientInfo = {
      id: socket.id,
      connectedAt: new Date(),
      clientConnectionState: new BehaviorSubject<'ONLINE' | 'OFFLINE'>('ONLINE'),
      requests: [],
      buffer: new RetransmissionService()
    };
    // Every response destined for this client is pushed here first.
    const serverBuffer = new Subject<any>();
    clients.push(clientInfo);

    // Keep the subscription handle so it can be released when the socket goes away.
    const outbound = clientInfo.buffer
      .retransmission(serverBuffer, clientInfo.clientConnectionState)
      .subscribe(output => socket.emit('message', output));

    // Greet the client right away. This used to live in a socket.on('connect')
    // listener, but a server-side socket is already connected when this handler
    // runs and never receives a 'connect' event, so the greeting was never sent.
    socket.emit('notification', `Hello from server. You have been assigned ${socket.id}`);

    // Listen for messages from the client
    socket.on('message', (request) => {
      // The payload comes from the network: without a header there is no
      // messageID to correlate on, and dereferencing it would throw.
      if (request?.header == null) {
        announcements.next(`Ignored malformed message from ${clientInfo.id}`);
        return;
      }
      announcements.next(`Received Message: ${request.header.messageID} from ${clientInfo.id}`);

      // Hold on to the bookkeeping entry itself instead of searching for it by
      // messageID later: a lookup can miss or hit an older entry with the same ID.
      const entry = { message: request, completed: false };
      clientInfo.requests.push(entry);

      returnResponse(request).subscribe({
        next: message => serverBuffer.next(message),
        error: err => console.error(err),
        complete: () => {
          entry.completed = true;
          console.log('Current Array', clients);
        }
      });
    });

    socket.on('interval', (value) => {
      console.log(socket.id, value); // okay so it does receive in sequence after reconnection
    });

    // Handle disconnection
    socket.on('disconnect', () => {
      announcements.next(`Client ${clientInfo.id} disconnected`);
      // Nothing can be delivered to this socket any more; stop the outbound pipe.
      outbound.unsubscribe();
      deleteClientById(socket.id);
    });
  });
  // Log every detail the engine reports about a failed connection attempt.
  io.engine.on("connection_error", (failure) => {
    const { req, code, message, context } = failure;
    console.log(req);     // the request object
    console.log(code);    // the error code, for example 1
    console.log(message); // the error message, for example "Session ID unknown"
    console.log(context); // some additional error context
  });

  // Start the server on the port given by the environment, falling back to 3000.
  const PORT = process.env.PORT || 3000;
  server.listen(PORT, () => {
    console.log(`Server listening on port ${PORT}`);
  });
  // Utils

  /**
   * Removes the client whose `id` matches from the module-level `clients`
   * array. The array is mutated in place; when no client matches, nothing
   * happens.
   *
   * @param id Identifier to look for (the connection handler passes socket.id).
   */
  function deleteClientById(id: string): void {
    const index = clients.findIndex(item => item.id === id);
    if (index !== -1) {
      clients.splice(index, 1);
    }
  }
  /**
   * Builds the response stream for one request.
   *
   * The stream has two phases:
   *  1. every message produced by `prepareResponseMessages(10, 1000)`
   *     (presumably a count and an interval; confirm against that helper),
   *  2. once phase 1 completes, the message(s) produced by
   *     `prepareResponseMessages(1)`, with `header.messageName` set to 'Complete'.
   * Every emitted message has its `header.messageID` overwritten with the
   * request's messageID so the receiver can correlate it.
   *
   * An error in either phase is forwarded to the subscriber; unsubscribing from
   * the returned Observable also unsubscribes from whichever phase is running.
   *
   * @param request The message being answered; only `header.messageID` is read.
   * @returns A cold Observable of the response messages described above.
   */
  function returnResponse(request: BaseMessage): Observable<BaseMessage> {
    return new Observable<BaseMessage>((observer) => {
      // Handle of the second phase, set only once the first phase has completed.
      let closing: { unsubscribe(): void } | undefined;

      const body = prepareResponseMessages(10, 1000).subscribe({
        next: (message: BaseMessage) => {
          message.header.messageID = request.header.messageID;
          observer.next(message);
        },
        // Forward instead of only logging, otherwise the subscriber would wait forever.
        error: err => observer.error(err),
        complete: () => {
          closing = prepareResponseMessages(1).subscribe({
            next: message => {
              message.header.messageID = request.header.messageID;
              message.header.messageName = 'Complete';
              observer.next(message);
            },
            error: err => observer.error(err),
            complete: () => {
              observer.complete();
            }
          });
        }
      });

      // Teardown: release both phases when the subscriber goes away.
      return () => {
        body.unsubscribe();
        closing?.unsubscribe();
      };
    });
  }
  /** Bookkeeping the server keeps for one connected socket. */
  export interface ClientInfo {
    /** Identifier of the client; the connection handler stores socket.id here. */
    id: string;
    /** Timestamp taken when the record was created for the connection. */
    connectedAt: Date;
    /** Current connection state of the client, either 'ONLINE' or 'OFFLINE'. */
    clientConnectionState: BehaviorSubject<'ONLINE' | 'OFFLINE'>;
    /** Requests received from the client, each flagged once its response stream has completed. */
    requests: Array<{ message: any; completed: boolean }>;
    /** Retransmission service instance owned by this client. */
    buffer: RetransmissionService;
  }