import { BehaviorSubject, from, Observable, Subject, Subscription, takeUntil, takeWhile } from "rxjs";
import { RetransmissionService } from "../services/retransmission.service";
import { prepareResponseMessages } from "../services/utility/prepareFISmessage";
import { BaseMessage } from "../dependencies/logging/services/logging-service";

const express = require('express');
const http = require('http');
const { Server } = require('socket.io');

const app = express();
const server = http.createServer(app);
const io = new Server(server);

// Keep track of connected clients
const clients: ClientInfo[] = [];

// Central stream of human-readable server events; every announcement is logged to the console.
const announcements: Subject<string> = new Subject<string>();
announcements.subscribe(announcement => {
  console.log(`Server Announcement: ${announcement}`);
});

app.use(express.static('public'));

io.on('connection', (socket) => {
  announcements.next('a client connected:' + socket.id);

  const clientInfo: ClientInfo = {
    id: socket.id,
    connectedAt: new Date(),
    clientConnectionState: new BehaviorSubject<'ONLINE' | 'OFFLINE'>('ONLINE'),
    requests: [],
    buffer: new RetransmissionService()
  };
  // Every response produced for this client is pushed through this subject.
  const serverBuffer = new Subject<BaseMessage>();
  // Collects every subscription opened for this connection so they can be released on disconnect.
  const subscriptions = new Subscription();
  clients.push(clientInfo);

  // Whatever the retransmission service outputs is forwarded to the client as a 'message' event.
  subscriptions.add(
    clientInfo.buffer
      .retransmission(serverBuffer, clientInfo.clientConnectionState)
      .subscribe(output => socket.emit('message', output))
  );

  // Greet the client right away. A server-side socket never receives a 'connect'
  // event (it is reserved), so a listener for it would never run.
  socket.emit('notification', `Hello from server. \nYou have been assigned ${socket.id}`);

  // Listen for messages from the client
  socket.on('message', (request: BaseMessage) => {
    // The payload comes from the network: reject anything without a message ID
    // instead of throwing inside the listener.
    if (request?.header?.messageID == null) {
      announcements.next(`Ignored malformed message from ${clientInfo.id}`);
      return;
    }

    announcements.next(`Received Message: ${request.header.messageID} from ${clientInfo.id}`);

    // Keep a direct reference to the entry so completion marks exactly this request,
    // even when two requests share the same message ID.
    const pending = { message: request, completed: false };
    clientInfo.requests.push(pending);

    subscriptions.add(
      returnResponse(request).subscribe({
        next: message => serverBuffer.next(message),
        error: err => console.error(err),
        complete: () => {
          pending.completed = true;
          console.log('Current Array', clients);
        }
      })
    );
  });

  socket.on('interval', (value) => {
    console.log(socket.id, value); // okay so it does receive in sequence after reconnection
  });

  // Handle disconnection
  socket.on('disconnect', () => {
    announcements.next(`Client ${clientInfo.id} disconnected`);
    // Stop the retransmission pipe and any in-flight response streams for this socket.
    subscriptions.unsubscribe();
    deleteClientById(socket.id);
  });
});

io.engine.on("connection_error", (err) => {
  console.log(err.req);      // the request object
  console.log(err.code);     // the error code, for example 1
  console.log(err.message);  // the error message, for example "Session ID unknown"
  console.log(err.context);  // some additional error context
});

// Start the server
const PORT = process.env.PORT || 3000;
server.listen(PORT, () => {
  console.log(`Server listening on port ${PORT}`);
});

// Utils

/**
 * Removes the client with the given id from `clients`, mutating the array in place.
 * Does nothing when no client has that id.
 *
 * @param id - Socket id of the client to remove.
 */
function deleteClientById(id: string): void {
  const index = clients.findIndex(item => item.id === id);
  if (index !== -1) {
    clients.splice(index, 1);
  }
}

/**
 * Builds the response stream for a single request.
 *
 * Forwards every message produced by `prepareResponseMessages(10, 1000)`, then every
 * message produced by `prepareResponseMessages(1)` renamed to 'Complete', and finally
 * completes. Each forwarded message has its `header.messageID` overwritten with the
 * request's ID.
 *
 * An error from either inner stream is passed on to the subscriber, and unsubscribing
 * from the returned observable also unsubscribes from the inner streams.
 *
 * @param request - The request being answered; only `header.messageID` is read.
 * @returns An observable of the response messages for `request`.
 */
function returnResponse(request: BaseMessage): Observable<BaseMessage> {
  return new Observable<BaseMessage>((observer) => {
    // Holds both inner subscriptions so one teardown releases everything.
    const teardown = new Subscription();

    teardown.add(
      prepareResponseMessages(10, 1000).subscribe({
        next: (message: BaseMessage) => {
          message.header.messageID = request.header.messageID;
          observer.next(message);
        },
        error: err => observer.error(err),
        complete: () => {
          // Only once the first stream has finished, send the closing 'Complete' message(s).
          teardown.add(
            prepareResponseMessages(1).subscribe({
              next: (message: BaseMessage) => {
                message.header.messageID = request.header.messageID;
                message.header.messageName = 'Complete';
                observer.next(message);
              },
              error: err => observer.error(err),
              complete: () => observer.complete()
            })
          );
        }
      })
    );

    return teardown;
  });
}

/** Per-connection bookkeeping kept in `clients` for as long as the socket is connected. */
export interface ClientInfo {
  /** The socket id assigned to the connection. */
  id: string,
  /** When the connection handler ran for this client. */
  connectedAt: Date,
  /** Connection state handed to the retransmission service; starts as 'ONLINE'. */
  clientConnectionState: BehaviorSubject<'ONLINE' | 'OFFLINE'>,
  /** Requests received from this client; `completed` flips to true once the response stream completes. */
  requests: { message: any, completed: boolean }[],
  /** Retransmission service instance that sits between the server's responses and the socket. */
  buffer: RetransmissionService
}