import { Observable, Subject, takeWhile } from "rxjs";
import { prepareResponseMessages } from "../services/utility/prepareFISmessage";
import { BaseMessage } from "../dependencies/logging/interface/export";
import { WrappedMessage } from "../services/retransmission.service";
import { io, Socket } from "socket.io-client";

/** Every wrapped message received from the server, emitted in arrival order. */
const onHoldMessagesSubject: Subject<WrappedMessage> = new Subject<WrappedMessage>();

/** Payloads that passed the predecessor check and may be handed to the application. */
const toBePassedOverToApp: Subject<BaseMessage> = new Subject<BaseMessage>();

/**
 * `payload.header.messageID` of every message seen on `onHoldMessagesSubject`.
 * A plain Subject does not replay, so this set is what lets `checkMessage`
 * recognise a predecessor that arrived before the check started.
 * NOTE(review): the set grows for the lifetime of the process; prune it if this
 * client is ever run long-term.
 */
const receivedMessageIDs = new Set<unknown>();
onHoldMessagesSubject.subscribe((received) => {
  receivedMessageIDs.add(received.payload.header.messageID);
});

const sender = prepareResponseMessages(1, 2000);
const serverSocketUrl = 'http://localhost:3000';
let socket: Socket;
establishSocketConnection(serverSocketUrl);

// interval(1000).subscribe(value => { // just to test if the emission is in sequence after reconnection
//   console.log(value)
//   socket.emit('interval', value)
// })

// Turn every generated message into a request and log when its response stream ends.
sender.subscribe({
  next: (message) => {
    makeRequest(message).subscribe({
      error: (err) => console.error(`Request ${message.header.messageID} failed:`, err),
      complete: () => console.log(`Request ${message.header.messageID} has acquired all responses.`),
    });
  },
});

/**
 * The interface the client program uses to issue a request without having to
 * decide on a transport protocol.
 *
 * Responses are demultiplexed from `toBePassedOverToApp` by comparing
 * `header.messageID` with the request's ID: a `ResponseData` message is
 * forwarded to the subscriber, a `Complete` message completes the stream.
 * The identification of responses may need to be adjusted later; for now it is
 * a simple demultiplexing.
 *
 * @param request The message to send to the server.
 * @returns An observable of the `ResponseData` messages belonging to `request`.
 *          It errors if emitting the request on the socket fails.
 */
function makeRequest(request: BaseMessage): Observable<BaseMessage> {
  return new Observable<BaseMessage>((response) => {
    const requestID = request.header.messageID;

    // Listen before sending so no response can slip past.
    const inbound = toBePassedOverToApp.subscribe({
      next: (message: BaseMessage) => {
        if (message.header.messageID !== requestID) {
          return;
        }
        if (message.header.messageName === 'ResponseData') {
          response.next(message);
        } else if (message.header.messageName === 'Complete') {
          response.complete();
        }
      },
      error: (err) => console.error(err),
    });

    // Surface a failed emit to the subscriber instead of leaving a floating promise.
    sendMessage(request).catch((err: unknown) => response.error(err));

    // Teardown: without this every request would leave a permanent subscriber
    // on toBePassedOverToApp, even after its stream completed.
    return () => inbound.unsubscribe();
  });
}

/**
 * Socket util: assuming that the client program would already have something
 * like this in place.
 *
 * Opens the socket.io connection, stores it in the module-level `socket`, and
 * registers the `connect`, `message`, `notification` and `disconnect` handlers.
 *
 * @param serverUrl URL of the socket.io server to connect to.
 */
function establishSocketConnection(serverUrl: string) {
  socket = io(serverUrl, {
    reconnection: true, // Enable automatic reconnections
    reconnectionAttempts: 100, // Retry up to 100 times
    reconnectionDelay: 500, // Start with a 500ms delay
    reconnectionDelayMax: 10000, // Delay can grow to a max of 10 seconds
    randomizationFactor: 0.3,
  });

  // Listen for a connection event
  socket.on('connect', () => {
    // socket.emit('Hello from the client!')
    console.log('Connected to the server:', socket.id);
    // receiverConnectionState.next('ONLINE')
  });

  // Listen for messages from the server
  socket.on('message', (msg: WrappedMessage) => {
    console.log('Message from server:', msg.payload.header.messageID);
    // Check the sequence by ensuring the message before the current one has
    // been received, then pass the payload over to the "App".
    // The check (and its continuation) is registered BEFORE the message is
    // announced on onHoldMessagesSubject: if this message is the predecessor
    // some earlier arrival is waiting for, this one is then released first.
    checkMessage(msg)
      .then(() => {
        toBePassedOverToApp.next(msg.payload as BaseMessage);
      })
      .catch((err) => console.error(err));
    onHoldMessagesSubject.next(msg);
  });

  socket.on('notification', (msg: string) => {
    console.log(msg);
  });

  // Handle disconnection
  socket.on('disconnect', () => {
    console.log('Disconnected from the server');
    // receiverConnectionState.next('OFFLINE')
  });
}

/**
 * Emits `message` on the module-level socket as a `message` event.
 *
 * Per https://socket.io/docs/v4/client-offline-behavior/ events emitted while
 * disconnected are buffered by the client until the connection is restored,
 * so resolving only means the emit call itself did not throw.
 *
 * @param message The message to emit.
 * @returns A promise resolving to an empty string once `socket.emit` returned.
 * @throws Rejects with whatever `socket.emit` threw.
 */
async function sendMessage(message: BaseMessage): Promise<string> {
  try {
    socket.emit('message', message);
    console.log(`SocketEmit() for message to event queue ${message.header.messageID}`);
    return '';
  } catch (error) {
    console.error('Error emitting message:', error);
    throw error;
  }
}

/**
 * Resolves once the message that precedes `message` has been received.
 *
 * - No `previousMessageID`: treated as the first message, resolves immediately.
 * - Predecessor already recorded in `receivedMessageIDs`: resolves immediately.
 * - Otherwise: waits until a message whose `payload.header.messageID` equals
 *   `previousMessageID` is emitted on `onHoldMessagesSubject`.
 *
 * NOTE(review): the check is "predecessor received", not "predecessor released
 * to the app"; confirm that is sufficient when several messages arrive out of
 * order in a row.
 *
 * @param message The wrapped message whose predecessor is awaited.
 * @returns A promise resolving to a short status string; it never rejects.
 */
function checkMessage(message: WrappedMessage): Promise<string> {
  return new Promise<string>((resolve) => {
    const previousID = message.previousMessageID;
    if (!previousID) {
      console.log('No previous messageID. This should be the first message');
      resolve('No previous message ID. Please Proceed.');
      return;
    }

    const announceMatch = () => {
      resolve('previousMessageID matched');
      // console.log(`${message.payload.header.messageID} : Previous messageID(${message.previousMessageID}) matched`)
      console.log(`matched`);
    };

    if (receivedMessageIDs.has(previousID)) {
      announceMatch();
      return;
    }

    // takeWhile completes on the first item for which the predicate is false,
    // i.e. exactly when the awaited predecessor shows up.
    onHoldMessagesSubject
      .pipe(takeWhile((item) => item.payload.header.messageID !== previousID))
      .subscribe({ complete: announceMatch });
  });
}