123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120 |
- import { BehaviorSubject, from, Observable, Subject, takeUntil, takeWhile } from "rxjs";
- import { RetransmissionService } from "../services/retransmission.service";
- import { prepareResponseMessages } from "../services/utility/prepareFISmessage";
- import { BaseMessage } from "../dependencies/logging/services/logging-service";
// HTTP + Socket.IO bootstrap: one Express app wrapped in a Node HTTP server,
// with the Socket.IO server attached to that same HTTP server.
const express = require('express');
const http = require('http');
const { Server } = require('socket.io');
const app = express();
const server = http.createServer(app);
const io = new Server(server);

// Keep track of connected clients (records are added on 'connection' and
// removed again by deleteClientById on 'disconnect').
const clients: ClientInfo[] = [];

// Server-wide announcement channel; every announcement is echoed to the console.
// Declared const because the subject itself is never reassigned.
const announcements: Subject<any> = new Subject();
announcements.subscribe((announcement) => {
  console.log(`Server Announcement: ${announcement}`);
});

// Serve static assets from the ./public directory.
app.use(express.static('public'));
/**
 * Per-connection wiring.
 *
 * For every connecting socket this registers a ClientInfo record, routes the
 * responses produced for that client through its RetransmissionService back to
 * the socket, and tears everything down again when the socket disconnects.
 */
io.on('connection', (socket) => {
  announcements.next('a client connected:' + socket.id);

  const clientInfo: ClientInfo = {
    id: socket.id,
    connectedAt: new Date(),
    clientConnectionState: new BehaviorSubject<'ONLINE' | 'OFFLINE'>('ONLINE'),
    requests: [],
    buffer: new RetransmissionService(),
  };
  // Every response generated for this client is pushed here first; the
  // retransmission service decides when it actually reaches the socket.
  const serverBuffer = new Subject<any>();
  // All subscriptions opened on behalf of this socket, so they can be released
  // on disconnect instead of running on against a dead connection.
  const subscriptions: Array<{ unsubscribe(): void }> = [];
  clients.push(clientInfo);

  subscriptions.push(
    clientInfo.buffer
      .retransmission(serverBuffer, clientInfo.clientConnectionState)
      .subscribe((output) => socket.emit('message', output)),
  );

  // Greet the client right away. A server-side socket never receives a
  // 'connect' event (the 'connection' event on `io` is the server equivalent),
  // so a socket.on('connect') listener here would never run.
  socket.emit('notification', `Hello from server. You have been assigned ${socket.id}`);

  // Listen for messages from the client
  socket.on('message', (request) => {
    // Client payloads are untrusted: without this guard a message lacking
    // header.messageID would throw inside the handler.
    if (request?.header?.messageID == null) {
      announcements.next(`Ignored malformed message from ${clientInfo.id}`);
      return;
    }
    announcements.next(`Received Message: ${request.header.messageID} from ${clientInfo.id}`);

    // Keep a direct reference to the bookkeeping entry rather than looking it
    // up by messageID later: the lookup could miss, or hit an earlier request
    // that reused the same ID.
    const entry = { message: request, completed: false };
    clientInfo.requests.push(entry);

    subscriptions.push(
      returnResponse(request).subscribe({
        next: (message) => serverBuffer.next(message),
        error: (err) => console.error(err),
        complete: () => {
          entry.completed = true;
          console.log('Current Array', clients);
        },
      }),
    );
  });

  socket.on('interval', (value) => {
    console.log(socket.id, value); // okay so it does receive in sequence after reconnection
  });

  // Handle disconnection
  socket.on('disconnect', () => {
    announcements.next(`Client ${clientInfo.id} disconnected`);
    // Stop in-flight response streams and the retransmission output, then
    // close the buffer so nothing keeps referencing this socket.
    subscriptions.forEach((subscription) => subscription.unsubscribe());
    serverBuffer.complete();
    deleteClientById(socket.id);
  });
});
// Dump every field of an engine-level connection error to the console.
io.engine.on("connection_error", (err) => {
  const { req, code, message, context } = err;
  console.log(req); // the request object
  console.log(code); // the error code, for example 1
  console.log(message); // the error message, for example "Session ID unknown"
  console.log(context); // some additional error context
});
// Start the server on the port given by the PORT environment variable,
// falling back to 3000 when it is unset or empty.
const PORT = process.env.PORT || 3000;
function reportListening() {
  console.log(`Server listening on port ${PORT}`);
}
server.listen(PORT, reportListening);
// Utils

/**
 * Removes the first client whose `id` matches from the module-level `clients`
 * array. The array is mutated in place; nothing happens when no client matches.
 *
 * @param id - Identifier of the client record to remove.
 */
function deleteClientById(id: string): void {
  const index = clients.findIndex((item) => item.id === id);
  if (index !== -1) {
    clients.splice(index, 1);
  }
}
/**
 * Builds the response stream for one request.
 *
 * Forwards every message from `prepareResponseMessages(10, 1000)` stamped with
 * the request's messageID. Once that stream completes, it forwards the output
 * of `prepareResponseMessages(1)` with the same messageID and the messageName
 * 'Complete', and then completes.
 * (The exact meaning of the numeric arguments is defined by
 * prepareResponseMessages — presumably count and interval; confirm there.)
 *
 * Errors from either inner stream are forwarded to the subscriber, and
 * unsubscribing from the returned observable cancels whichever inner stream
 * is currently running.
 *
 * @param request - Incoming message whose `header.messageID` is copied onto
 *   every response.
 * @returns Observable of response messages, ending with the 'Complete' message.
 */
function returnResponse(request: BaseMessage): Observable<BaseMessage> {
  return new Observable<BaseMessage>((observer) => {
    // Created only once the main stream has completed.
    let closingSubscription: { unsubscribe(): void } | undefined;

    const mainSubscription = prepareResponseMessages(10, 1000).subscribe({
      next: (message: BaseMessage) => {
        message.header.messageID = request.header.messageID;
        observer.next(message);
      },
      // Forward the failure: merely logging it would leave the subscriber
      // waiting forever on a stream that neither errors nor completes.
      error: (err) => observer.error(err),
      complete: () => {
        closingSubscription = prepareResponseMessages(1).subscribe({
          next: (message: BaseMessage) => {
            message.header.messageID = request.header.messageID;
            message.header.messageName = 'Complete';
            observer.next(message);
          },
          error: (err) => observer.error(err),
          complete: () => observer.complete(),
        });
      },
    });

    // Teardown: release both inner subscriptions when the consumer unsubscribes.
    return () => {
      mainSubscription.unsubscribe();
      closingSubscription?.unsubscribe();
    };
  });
}
/**
 * Bookkeeping record kept for each connected client.
 */
export interface ClientInfo {
  /** Client identifier; the connection handler in this file fills it with the socket id. */
  id: string;
  /** Timestamp taken when the record was created. */
  connectedAt: Date;
  /** Current connection state of the client; created as 'ONLINE' in this file. */
  clientConnectionState: BehaviorSubject<'ONLINE' | 'OFFLINE'>;
  /** Messages received from the client, each flagged once its response stream has completed. */
  requests: Array<{ message: any; completed: boolean }>;
  /** Retransmission service whose output is emitted to the client's socket. */
  buffer: RetransmissionService;
}
|