import { Observable, Subject, takeWhile } from "rxjs";
import { prepareResponseMessages } from "../../services/utility/prepareFISmessage";
import { BaseMessage } from "../../dependencies/logging/interface/export";
import { io, Socket } from "socket.io-client";
import { WrappedMessage } from "../../interfaces/general.interface";
import * as fs from 'fs'
import { ClientInfo } from "./socket.service";

// Wrapped messages parked for sequence checking. Only checkMessage() reads this stream;
// nothing in this file currently publishes to it (the publishing call is disabled below).
const onHoldMessagesSubject: Subject<WrappedMessage> = new Subject<WrappedMessage>()

// Every response payload received from the server. makeRequest() filters this stream per request.
const toBePassedOverToApp: Subject<BaseMessage> = new Subject<BaseMessage>()

// Stream of outgoing requests.
// NOTE(review): the meaning of the arguments (1, 2000) is defined in prepareFISmessage — confirm there.
const sender: Subject<BaseMessage> = prepareResponseMessages(1, 2000)
const serverSocketUrl = 'http://localhost:3000'
let socket: Socket

// Bootstrap: set up the socket, then turn every message emitted by `sender` into a request.
establishSocketConnection(serverSocketUrl)
    .then(() => {
        sender.subscribe({
            next: message => {
                makeRequest(message).subscribe({
                    // Without an error handler RxJS would rethrow the failure asynchronously.
                    error: err => console.error(`Request ${message.header.messageID} failed:`, err),
                    complete: () => console.log(`Request ${message.header.messageID} has acquired all responses.`)
                })
            }
        })
    })
    .catch(err => console.error('Failed to set up the socket connection:', err))

/**
 * The interface the client program uses to issue a request without having to
 * decide on a transport protocol.
 *
 * The request is emitted on subscription. Responses are matched to the request
 * by `header.messageID` (a simple demultiplexing that may need adjusting later).
 *
 * @param request - The message to send to the server.
 * @returns A cold observable that emits each matching response, completes when
 *          a matching message named 'Complete' arrives, and errors if the
 *          request could not be emitted.
 */
function makeRequest(request: BaseMessage): Observable<BaseMessage> {
    return new Observable<BaseMessage>((response) => {
        // Listen first so that no response can slip past between sending and subscribing.
        const subscription = toBePassedOverToApp.subscribe({
            next: (message: BaseMessage) => {
                if (message.header.messageID !== request.header.messageID) {
                    return
                }
                if (message.header.messageName === 'Complete') {
                    response.complete()
                } else {
                    response.next(message)
                }
            },
            error: err => console.error(err),
        })

        // sendMessage() has already logged the failure and re-queued the request on `sender`;
        // end this stream so the retry does not deliver its responses here a second time.
        sendMessage(request).catch(err => response.error(err))

        // Teardown: runs on completion, error, or when the consumer unsubscribes.
        // Without it every request would leave a live subscriber on the shared subject.
        return () => subscription.unsubscribe()
    })
}

/**
 * Socket util: creates the socket.io client, announces this client to the
 * server, and registers the event handlers. It is assumed the client program
 * would already have something like this in place.
 *
 * The returned promise resolves as soon as the handlers are registered; it
 * does NOT wait for the 'connect' event.
 *
 * @param serverUrl - URL of the socket.io server.
 * @returns A promise resolving to an empty string once setup is done; it
 *          rejects if setup throws.
 */
async function establishSocketConnection(serverUrl: string): Promise<string> {
    socket = io(serverUrl, {
        reconnection: true, // Enable automatic reconnections
        reconnectionAttempts: 100, // Retry up to 100 times
        reconnectionDelay: 500, // Start with a 500ms delay
        reconnectionDelayMax: 10000, // Delay can grow to a max of 10 seconds
        randomizationFactor: 0.3,
    })

    // Check if it's a previous client and tell the server which case applies.
    const data: ClientInfo | null = checkOwnClientInfo('info.json')
    if (data) {
        socket.emit('notification', { agenda: 'existingClient', data: data })
    } else {
        socket.emit('notification', { agenda: 'newClient' })
    }

    // Listen for a connection event
    socket.on('connect', () => {
        console.log('Connected to the server:', socket.id)
    });

    // Listen for messages from the server and hand the payload straight to the app.
    // Sequence checking through onHoldMessagesSubject / checkMessage() is currently disabled.
    socket.on('response', (msg: WrappedMessage) => {
        toBePassedOverToApp.next(msg.payload as BaseMessage)
    })

    // Server notifications: persist credentials whenever the server (re)assigns them.
    socket.on('notification', (msg: any) => {
        if (msg.notification === 'Your credentials') {
            console.log(`Assigned client Name: ${msg.socketInfo.clientName}`)
            writeFile(msg.socketInfo as ClientInfo)
        }
        if (msg.notification === 'Your updated credentials') {
            console.log(`Updated socket ID: ${msg.socketInfo.id}`)
            writeFile(msg.socketInfo as ClientInfo)
        }
        if (msg.notification === 'Failed Request') {
            console.log(`Resending request...`, msg.data.header.messageID)
            // sender.next(msg.data)
        }
    })

    // Handle disconnection
    socket.on('disconnect', () => {
        console.log('Disconnected from the server');
        // receiverConnectionState.next('OFFLINE')
    });

    return ''
}

/**
 * Loads previously stored client credentials from disk.
 *
 * @param filename - Path of the JSON file to read.
 * @returns The parsed file content, or `null` when the file is missing, empty,
 *          unreadable, or not valid JSON. The parsed value is not validated
 *          against the `ClientInfo` shape.
 */
function checkOwnClientInfo(filename: string): ClientInfo | null {
    if (!fs.existsSync(filename)) {
        console.error("File does not exist");
        return null;
    }

    try {
        const fileData = fs.readFileSync(filename, 'utf8');

        // Treat an empty file as an error so it is reported like any other bad content.
        if (fileData.trim() === "") {
            throw new Error("File is empty");
        }

        return JSON.parse(fileData) as ClientInfo;
    } catch (err) {
        // The caught value is not guaranteed to be an Error; narrow before reading `.message`.
        console.error("Error reading or parsing file:", err instanceof Error ? err.message : err);
        return null;
    }
}

/**
 * Persists the client credentials to 'info.json' as pretty-printed JSON.
 * The write is asynchronous; the outcome is only logged.
 *
 * @param data - The credentials to store.
 */
function writeFile(data: ClientInfo) {
    fs.writeFile('info.json', JSON.stringify(data, null, 2), (err) => {
        if (err) {
            console.error('Error writing file', err);
        } else {
            console.log('File has been written');
        }
    });
}

/**
 * Emits a 'request' event carrying the message on the shared socket.
 *
 * socket.io buffers events emitted while the socket is disconnected and sends
 * them after reconnection, see
 * https://socket.io/docs/v4/client-offline-behavior/
 *
 * @param message - The request to emit.
 * @returns A promise resolving to an empty string once the event is handed to
 *          socket.io. If emitting throws, the message is pushed back onto
 *          `sender` and the promise rejects with the original error.
 */
async function sendMessage(message: BaseMessage): Promise<string> {
    try {
        socket.emit('request', message);
        console.log(`SocketEmit() for message to event queue ${message.header.messageID}`)
        return ''
    } catch (error) {
        console.error('Error emitting message:', error);
        // NOTE(review): if `sender` delivers synchronously and emit keeps throwing, this retries recursively — confirm.
        sender.next(message)
        throw error
    }
}

/**
 * Sequence check: resolves once the message preceding `message` has been seen
 * on `onHoldMessagesSubject`, or immediately when `message` has no
 * `previousMessageID`.
 *
 * Only items published after this call are inspected, so a predecessor that
 * was published earlier will not be detected.
 *
 * @param message - The wrapped message whose predecessor should be awaited.
 * @returns A promise resolving to a short status string; it rejects if the
 *          on-hold stream errors.
 */
function checkMessage(message: WrappedMessage): Promise<string> {
    return new Promise<string>((resolve, reject) => {
        if (!message.previousMessageID) {
            console.log('No previous messageID. This should be the first message')
            resolve('No previous message ID. Please Proceed.')
            return
        }

        onHoldMessagesSubject.pipe(
            // takeWhile completes on the first item that FAILS the predicate, so keep
            // taking while the item is NOT the predecessor and complete when it is.
            takeWhile(item => message.previousMessageID !== item.thisMessageID)
        ).subscribe({
            error: reject,
            complete: () => resolve('previousMessageID matched')
        })
    })
}