import { Observable, Subject, takeWhile } from "rxjs"; import { prepareResponseMessages } from "../../services/utility/prepareFISmessage"; import { BaseMessage } from "../../dependencies/logging/interface/export"; import { io, Socket } from "socket.io-client"; import { WrappedMessage } from "../../interfaces/general.interface"; import * as fs from 'fs' import { ClientInfo } from "./socket.service"; import { MongoService } from "./temp-log-service"; let onHoldMessagesSubject: Subject = new Subject() let toBePassedOverToApp: Subject = new Subject() // Serve static files (optional) let sender: Subject = prepareResponseMessages(2, 1000) let serverSocketUrl: string = 'http://192.168.100.96:3000' // let serverSocketUrl: string = 'http://192.168.100.96:4047' let socket: Socket let client: string = 'client1' let mongoService: MongoService = new MongoService(client) establishSocketConnection(serverSocketUrl).then(() => { // sender.sub`scribe({ // next: (message: BaseMessage) => { // makeRequest(message).subscribe({ // next: (message: BaseMessage) => { // }, // complete: () => console.log(`Request ${message.header.messageID} has acquired all responses.`) // }) // } // })` }) // the interface the client Program will make without having to decide transport protocol function makeRequest(request: BaseMessage): Observable { return new Observable((response) => { sendMessage(request) toBePassedOverToApp.subscribe({ next: (message: BaseMessage | any) => { // console.log(message.header.messageName) // The identification of responses mapping to the request be adjusted accordingly // For now it's a simple demulti-plexing if (message.complete) { response.complete() } else { response.next(message) } }, error: err => console.error(err), complete: () => { } }) }) } // socket util: Assuming that the client program would already have something like this in place async function establishSocketConnection(serverUrl: string): Promise { return new Promise((resolve, reject) => { try { socket = io(serverUrl, { reconnection: true, // Enable automatic reconnections reconnectionAttempts: 100, // Retry up to 10 times reconnectionDelay: 500, // Start with a 500ms delay reconnectionDelayMax: 10000, // Delay can grow to a max of 10 seconds randomizationFactor: 0.3, }) // Check if it's a previuos client. let data: ClientInfo | null = checkOwnClientInfo(client) if (data) { socket.emit('notification', { agenda: 'existingClient', data: data }) } else { socket.emit('notification', { agenda: 'newClient' }) } // Listen for a connection event socket.on('connect', () => { // socket.emit('Hello from the client!') console.log('Connected to the server:', socket.id) }); // Listen for messages from the server socket.on('response', (msg: WrappedMessage) => { console.log('Message from server:', msg.payload.header.messageName ?? 'null', ' for ', msg.payload.header.messageID ?? 'complete'); if (!msg.payload.complete) { mongoService.write(msg.payload, msg.payload.header.messageID, () => console.log(`Error function doesn't exist.`)) } // Check the sequence by ensuring the message value before the current message exists, then pass them over to "App" // onHoldMessagesSubject.next(msg) // checkMessage(msg, onHoldMessageSubject).then(() => [ // toBePassedOverToApp.next(msg.payload as BaseMessage) // ]).catch((err) => console.error(err)) toBePassedOverToApp.next(msg.payload) }) socket.on('notification', (msg: any) => { if (msg.notification == 'Your credentials') { console.log(`Assigned client Name: ${msg.socketInfo.clientName}`) writeFile(msg.socketInfo as ClientInfo, client) } if (msg.notification == 'Your updated credentials') { console.log(`Updated socket ID: `, msg) // writeFile(msg.socketInfo as ClientInfo) } if (msg.notification == 'Failed Request') { console.log(`Resending request...`, msg.data.header.messageID) setTimeout(() => { sender.next(msg.data) }, 1000) } }) resolve('') // Handle disconnection socket.on('disconnect', () => { console.log('Disconnected from the server'); // receiverConnectionState.next('OFFLINE') }); } catch (error) { reject(error) } }) } function checkOwnClientInfo(filename: string): ClientInfo | null { // Check if the file exists if (fs.existsSync(`${filename}.json`)) { try { // Read the file contents const fileData = fs.readFileSync(`${filename}.json`, 'utf8'); // If the file is empty, return an error if (fileData.trim() === "") { throw new Error("File is empty"); } // Parse and return the data if present const jsonData = JSON.parse(fileData); return jsonData; } catch (err) { // Handle parsing errors or other file-related errors console.error("Error reading or parsing file:", err.message); return null; } } else { console.error("File does not exist"); return null; } } function writeFile(data: ClientInfo, filename: string) { // Write JSON data to a file fs.writeFile(`${filename}.json`, JSON.stringify(data, null, 2), (err) => { if (err) { console.error('Error writing file', err); } else { console.log('File has been written'); } }); } async function sendMessage(message: BaseMessage): Promise { return new Promise((resolve, reject) => { try { // extra precaution: According to chatgpt, if disconnected, then the payload will be loaded back in event queue whilst the socket will try to reestablish connection // https://socket.io/docs/v4/client-offline-behavior/ socket.emit('request', message); // inherently an aysnc console.log(`SocketEmit() for message to event queue ${message.header.messageID}`) resolve('') } catch (error) { console.error('Error emitting message:', error); sender.next(message) reject(error) } }) }