import {
  BehaviorSubject,
  buffer,
  concatMap,
  distinctUntilChanged,
  filter,
  from,
  interval,
  Observable,
  Subject,
  Subscriber,
  take,
  takeUntil,
  takeWhile,
} from "rxjs";
import { io, Socket } from "socket.io-client";
import { prepareResponseMessages } from "../services/utility/prepareFISmessage";
import { BaseMessage } from "../dependencies/logging/services/logging-service";
import { rejects } from "assert";

/*
 * Buffered message transmitter.
 *
 * Flow: sender -> toBeWrapped -> wrappedMessage -> buffer(bufferReleaseSignal)
 *       -> arrayToBeTransmitted -> sendMessage() / storeMessage().
 *
 * Messages are time-stamped, collected in a buffer, and released in batches.
 * Each batch is sorted by time received and processed one message at a time.
 */

/** Connection state of the receiving side, driven by the socket events below. */
type ConnectionState = 'OFFLINE' | 'ONLINE';

/** What the transmitter is currently doing with the released batch. */
type TransmissionState =
  | 'TRANSMITTING'
  | 'IDLE'
  | 'ARRAY EMPTY'
  | 'STORING DATA'
  | 'GETTING STORED DATA';

/** Envelope placed around every outgoing message. */
export interface WrappedMessage {
  timeReceived: any, // this property is for sender to sort
  payload: BaseMessage,
  // This property is for receiver to sort. It is null for the first message,
  // because there is no previous message ID to refer to.
  previousMessageID?: string | null
}

// Connect to the server
const socket: Socket = io('http://localhost:3000');

/** In-memory stand-in for persistent storage of messages that could not be sent. */
export let abstractStorage: WrappedMessage[] = [];
/** Every emission flushes the current buffer into `arrayToBeTransmitted`. */
export const bufferReleaseSignal: Subject<void> = new Subject<void>();
export const sender: Subject<any> = prepareResponseMessages(3000, 500);
export const receiverConnectionState: BehaviorSubject<ConnectionState> =
  new BehaviorSubject<ConnectionState>('OFFLINE');
export const transmissionState: BehaviorSubject<TransmissionState> =
  new BehaviorSubject<TransmissionState>('ARRAY EMPTY');
export const arrayToBeTransmitted: Subject<WrappedMessage[]> = new Subject<WrappedMessage[]>();
export const toBeWrapped: Subject<any> = new Subject<any>();
export const wrappedMessage: Subject<WrappedMessage> = new Subject<WrappedMessage>();
export const transportLayerMessages = new Subject();

// Run this to activate the release mechanism.
releaseSignalManager();

// Everything the sender produces goes to toBeWrapped.
sender.subscribe(message => toBeWrapped.next(message));

// toBeWrapped wraps the message with timeReceived (plus the ID of the message
// seen before it) and pushes the result to the wrappedMessage subject.
let currentMessageId: string | null = null;
toBeWrapped.subscribe(message => {
  wrappedMessage.next(wrapMessageWithTimeReceived(message, currentMessageId ? currentMessageId : null));
  currentMessageId = message.header.messageID;
});

// Wrapped messages are held in the buffer until bufferReleaseSignal fires.
wrappedMessage.pipe(buffer(bufferReleaseSignal)).subscribe((bufferedMessages: WrappedMessage[]) => {
  console.log(bufferedMessages.length + ' buffered messages');
  console.log(`Released buffered message: ${bufferedMessages.length} total messages. To Be sorted.`);
  arrayToBeTransmitted.next(bufferedMessages.length > 0 ? sortMessage(bufferedMessages) : []);
});

arrayToBeTransmitted.subscribe(array => {
  if (array.length === 0) {
    // Without the delay the release signal would re-trigger itself in a tight loop.
    setTimeout(() => {
      bufferReleaseSignal.next();
    }, 3000);
    return;
  }

  /* Note: the receiver's connection state is deliberately not checked here.
     Once a message is handed to the socket it is queued to be executed later,
     and the connection state may have changed by then. */

  // Mark this batch as being processed.
  transmissionState.next('TRANSMITTING');
  from(array).pipe(
    // concatMap keeps the batch strictly ordered: one message at a time.
    concatMap((message: WrappedMessage) => {
      const state = transmissionState.getValue();
      if (state === 'TRANSMITTING') {
        console.log(message.timeReceived);
        // If sending fails, fall back to storing the message.
        return sendMessage(message).catch(() => storeAndLog(message));
      }
      if (state === 'STORING DATA') {
        return storeAndLog(message);
      }
      if (receiverConnectionState.getValue() === 'OFFLINE') {
        transmissionState.next('STORING DATA'); // fired on every message processed
        return storeAndLog(message);
      }
      // NOTE(review): a message that reaches this branch (state changed mid-batch
      // while the receiver is ONLINE) is neither sent nor stored. Confirm that
      // dropping it is intended.
      return Promise.resolve(); // No async work, but concatMap needs a promise.
    })
  ).subscribe({
    error: err => console.error(err),
    complete: () => {
      // Mark this batch as completed.
      console.log(`Processing buffered array completed. Changing transmission state to ARRAY EMPTY`);
      transmissionState.next('ARRAY EMPTY');
      // The state is re-read on purpose: a subscriber reacting to 'ARRAY EMPTY'
      // may already have moved it on (e.g. to 'GETTING STORED DATA'), in which
      // case that subscriber is responsible for the next release.
      if (receiverConnectionState.getValue() === 'ONLINE' && transmissionState.getValue() === 'ARRAY EMPTY') {
        setTimeout(() => {
          bufferReleaseSignal.next();
        }, 1000);
      }
      // Do nothing if the receiver connection is offline.
    }
  });
});

/* Utils */

/**
 * Reacts to receiver connection changes. When the receiver comes ONLINE, the
 * stored messages are pumped back into the buffer: immediately if no batch is
 * in flight, or as soon as an ongoing 'STORING DATA' batch has finished.
 */
function releaseSignalManager(): void {
  receiverConnectionState.pipe(
    distinctUntilChanged()
  ).subscribe(clientState => {
    console.log(`Client is now ${clientState}`);
    console.log(`Current transmission state: ${transmissionState.getValue()}`);
    if (clientState !== 'ONLINE') {
      return; // OFFLINE: just keep buffering
    }

    const state = transmissionState.getValue();
    if (state === 'ARRAY EMPTY') {
      getDataAndUpdateState();
    } else if (state === 'STORING DATA') {
      // Wait for storing to finish (state becomes 'ARRAY EMPTY'), react exactly
      // once, then tear the subscription down. The previous takeWhile(== 'ARRAY EMPTY')
      // completed on the very first emission ('STORING DATA') and never fired.
      transmissionState.pipe(
        filter(value => value === 'ARRAY EMPTY'),
        take(1)
      ).subscribe({
        next: () => {
          getDataAndUpdateState();
        },
        error: err => console.error(err)
      });
    }
  });
}

/**
 * Returns the messages ordered by ascending `timeReceived`.
 * The input array is left untouched; a sorted copy is returned.
 */
function sortMessage(array: WrappedMessage[]): WrappedMessage[] {
  console.log(`Sorting ${array.length} messages....`);
  return [...array].sort((a, b) => {
    return new Date(a.timeReceived).getTime() - new Date(b.timeReceived).getTime();
  });
}

/**
 * Wraps a raw message with the time it was received and the ID of the message
 * that preceded it. A message that already carries `timeReceived` is returned
 * as it is.
 *
 * @param message raw message, or an already wrapped one
 * @param previousMessageID ID of the preceding message, or null if there is none
 */
function wrapMessageWithTimeReceived(message: any, previousMessageID: string | null): WrappedMessage {
  // A message that already has a time received needs no further wrapping.
  if (message.timeReceived) {
    return message as WrappedMessage;
  }
  const wrapped: WrappedMessage = {
    timeReceived: new Date(),
    payload: message as BaseMessage,
    previousMessageID: previousMessageID
  };
  return wrapped;
}

/**
 * Simulated (5 s) retrieval of everything in `abstractStorage`. The storage is
 * emptied and its previous content is handed to the caller.
 *
 * The promise resolves inside the timer, after the data has been collected.
 * Resolving outside it handed callers an empty array and then discarded the
 * stored messages.
 */
function getStoredMessages(): Promise<WrappedMessage[]> {
  return new Promise((resolve) => {
    setTimeout(() => {
      const stored = abstractStorage;
      abstractStorage = [];
      resolve(stored);
    }, 5000);
  });
}

/**
 * Simulated (1 s) persistence of a message; just an abstraction over real storage.
 *
 * @returns the message ID of the stored message
 */
function storeMessage(message: WrappedMessage): Promise<string> {
  return new Promise((resolve, reject) => {
    setTimeout(() => {
      // The try/catch must live inside the timer callback, otherwise an error
      // thrown here would never reach reject().
      try {
        console.log(`Storing ${message.payload.header.messageID}....`);
        abstractStorage.push(message);
        resolve(message.payload.header.messageID);
      } catch (error) {
        reject(error);
      }
    }, 1000);
  });
}

/** Stores a message and logs the outcome; a storage failure is logged, never rethrown. */
function storeAndLog(message: WrappedMessage): Promise<void> {
  return storeMessage(message).then((msgId) => {
    console.log(`Message (${msgId}) stored Successfully. {TransmissionState: ${transmissionState.getValue()}}`);
  }).catch((error: unknown) => {
    console.error(error);
  });
}

/**
 * Fetches the stored messages, feeds them back into the buffer, marks the
 * transmitter as 'ARRAY EMPTY' and releases the buffer.
 */
function getDataAndUpdateState(): void {
  transmissionState.next('GETTING STORED DATA');
  console.log(`Current transmission state: ${transmissionState.getValue()}`);
  getStoredMessages().then((storedMessages: WrappedMessage[]) => {
    console.log(`${storedMessages.length} STORED messages.`);
    if (storedMessages.length > 0) {
      for (const message of storedMessages) {
        wrappedMessage.next(message);
      }
      console.log(`Flushed back ${storedMessages.length} messages back in buffer`);
    }
    transmissionState.next('ARRAY EMPTY');
    bufferReleaseSignal.next();
  }).catch((err: unknown) => {
    console.error(err);
  });
}

// Listen for a connection event
socket.on('connect', () => {
  // NOTE(review): this emits an event *named* 'Hello from the client!' with no
  // payload. Confirm the server listens for that event name.
  socket.emit('Hello from the client!');
  console.log('Connected to the server:', socket.id);
  receiverConnectionState.next('ONLINE');
});

// Listen for messages from the server
socket.on('message', (msg: string) => {
  console.log('Message from server:', msg);
});

/**
 * Hands a message to the socket.
 *
 * Per the Socket.IO docs (https://socket.io/docs/v4/client-offline-behavior/),
 * events emitted while disconnected are buffered by the client and sent once
 * the connection is re-established, so emit() normally does not fail here.
 *
 * On failure the promise rejects and the caller stores the message. The message
 * is not pushed back into `wrappedMessage` as well, because that would transmit
 * it twice.
 *
 * @returns an empty string once the message has been handed to the socket
 */
async function sendMessage(message: WrappedMessage): Promise<string> {
  try {
    socket.emit('message', message); // inherently async
    console.log(`SocketEmit() for message to event queue ${message.payload.header.messageID} current tranmission State: ${transmissionState.getValue()} current connection State: ${receiverConnectionState.getValue()} ${receiverConnectionState.getValue()=='OFFLINE'? 'Message in the event queue will be attempted again after connection is back' : 'Sent over'}`);
    return '';
  } catch (error) {
    console.error('Error emitting message:', error);
    throw error;
  }
}

// Handle disconnection
socket.on('disconnect', () => {
  console.log('Disconnected from the server');
  receiverConnectionState.next('OFFLINE');
});