|
@@ -0,0 +1,250 @@
|
|
|
+import { BehaviorSubject, buffer, concatMap, distinctUntilChanged, from, interval, Observable, Subject, Subscriber, take, takeUntil, takeWhile } from "rxjs";
|
|
|
+import { io, Socket } from "socket.io-client";
|
|
|
+import { prepareResponseMessages } from "../services/utility/prepareFISmessage";
|
|
|
+import { BaseMessage } from "../dependencies/logging/services/logging-service";
|
|
|
+import { rejects } from "assert";
|
|
|
+// Connect to the server
|
|
|
+const socket: Socket = io('http://localhost:3000');
|
|
|
+
|
|
|
+export let abstractStorage: WrappedMessage[] = []
|
|
|
+export let bufferReleaseSignal: Subject<void> = new Subject()
|
|
|
+export let sender: Subject<BaseMessage> = prepareResponseMessages(3000, 500)
|
|
|
+export let receiverConnectionState: BehaviorSubject<'OFFLINE' | 'ONLINE'> = new BehaviorSubject('OFFLINE')
|
|
|
+export let transmissionState: BehaviorSubject<'TRANSMITTING' | 'IDLE' | 'ARRAY EMPTY' | 'STORING DATA' | 'GETTING STORED DATA'> = new BehaviorSubject('ARRAY EMPTY')
|
|
|
+export let arrayToBeTransmitted: Subject<WrappedMessage[]> = new Subject()
|
|
|
+export let toBeWrapped: Subject<any> = new Subject()
|
|
|
+export let wrappedMessage: Subject<WrappedMessage> = new Subject()
|
|
|
+export let transportLayerMessages = new Subject<any>()
|
|
|
+
|
|
|
+// run this to active the release mechanism
|
|
|
+releaseSignalManager()
|
|
|
+// sender goes to toBeWrapped
|
|
|
+sender.subscribe(message => toBeWrapped.next(message))
|
|
|
+// toBeWrapped will wrap the message with timeReceived and push next to wrappedMesasge subject
|
|
|
+let currentMessageId: string | null
|
|
|
+toBeWrapped.subscribe(message => {
|
|
|
+ wrappedMessage.next(wrapMessageWithTimeReceived(message, currentMessageId ? currentMessageId : null))
|
|
|
+ currentMessageId = message.header.messageID
|
|
|
+})
|
|
|
+//simulate connection test
|
|
|
+
|
|
|
+// wrappedMessage will then be pushed to buffer
|
|
|
+wrappedMessage.pipe(buffer(bufferReleaseSignal)).subscribe((bufferedMessages: WrappedMessage[]) => {
|
|
|
+ console.log(bufferedMessages.length + ' buffered messages')
|
|
|
+ console.log(`Released buffered message: ${bufferedMessages.length} total messages. To Be sorted.`)
|
|
|
+ // arrayToBeTransmitted.next(sortMessage(bufferedMessages))
|
|
|
+ arrayToBeTransmitted.next(bufferedMessages.length > 0 ? sortMessage(bufferedMessages) : [])
|
|
|
+});
|
|
|
+
|
|
|
+arrayToBeTransmitted.subscribe(array => {
|
|
|
+ if (array.length > 0) {
|
|
|
+ /* Note: Latest update, no point checking the receiver's connection state, since, once the message is pass on, it will
|
|
|
+ be flushed into the event queue to be executed at a later time, which the connnection state would be mutated by then. */
|
|
|
+ // update transmission to indicate that this batch of array is being processed
|
|
|
+ transmissionState.next('TRANSMITTING')
|
|
|
+ from(array).pipe(
|
|
|
+ concatMap((message: WrappedMessage) => {
|
|
|
+ if (transmissionState.getValue() === 'TRANSMITTING') {
|
|
|
+ console.log(message.timeReceived);
|
|
|
+ return sendMessage(message).catch((error) => {
|
|
|
+ return storeMessage(message).then((msgId) => {
|
|
|
+ console.log(`Message (${msgId}) stored Successfully. {TransmissionState: ${transmissionState.getValue()}}`);
|
|
|
+ }).catch((error) => {
|
|
|
+ console.error(error);
|
|
|
+ });
|
|
|
+ });
|
|
|
+ } else if (transmissionState.getValue() === 'STORING DATA') {
|
|
|
+ return storeMessage(message).then((msgId) => {
|
|
|
+ console.log(`Message (${msgId}) stored Successfully. {TransmissionState: ${transmissionState.getValue()}}`);
|
|
|
+ }).catch((error) => {
|
|
|
+ console.error(error);
|
|
|
+ });
|
|
|
+ } else if (receiverConnectionState.getValue() === 'OFFLINE') {
|
|
|
+ transmissionState.next('STORING DATA'); // to be fired every message processing
|
|
|
+ return storeMessage(message).then((msgId) => {
|
|
|
+ console.log(`Message (${msgId}) stored Successfully. {TransmissionState: ${transmissionState.getValue()}}`);
|
|
|
+ }).catch((error) => {
|
|
|
+ console.error(error);
|
|
|
+ });
|
|
|
+ } else {
|
|
|
+ return Promise.resolve(); // No async work, but need to return a resolved promise
|
|
|
+ }
|
|
|
+ })
|
|
|
+ ).subscribe({
|
|
|
+ error: err => console.error(err),
|
|
|
+ complete: () => {
|
|
|
+ // update transmission state to indicate this batch is completed
|
|
|
+ console.log(`Processing buffered array completed. Changing transmission state to ARRAY EMPTY`);
|
|
|
+ transmissionState.next('ARRAY EMPTY');
|
|
|
+
|
|
|
+ if (receiverConnectionState.getValue() === 'ONLINE' && transmissionState.getValue() === 'ARRAY EMPTY') {
|
|
|
+ setTimeout(() => {
|
|
|
+ bufferReleaseSignal.next()
|
|
|
+ }, 1000)
|
|
|
+ }
|
|
|
+ // Do nothing if the receiver connection is offline
|
|
|
+ }
|
|
|
+ });
|
|
|
+ } else {
|
|
|
+ // If I don't do setTimeout, then bufferrelasesignal will be overloaded
|
|
|
+ setTimeout(() => {
|
|
|
+ bufferReleaseSignal.next()
|
|
|
+ }, 3000)
|
|
|
+ }
|
|
|
+}
|
|
|
+)
|
|
|
+
|
|
|
+
|
|
|
+/* Utils */
|
|
|
+function releaseSignalManager() {
|
|
|
+ receiverConnectionState.pipe(
|
|
|
+ distinctUntilChanged()
|
|
|
+ ).subscribe(clientState => {
|
|
|
+ console.log(`Client is now ${clientState}`)
|
|
|
+ if (clientState == 'OFFLINE') {
|
|
|
+ console.log(`Current transmission state: ${transmissionState.getValue()}`)
|
|
|
+ // just keep buffering
|
|
|
+ }
|
|
|
+ if (clientState == 'ONLINE') {
|
|
|
+ console.log(`Current transmission state: ${transmissionState.getValue()}`)
|
|
|
+ // get the stored messages to pump it back into the buffer to be ready to be processed immediately
|
|
|
+ if (transmissionState.getValue() == 'ARRAY EMPTY') {
|
|
|
+ getDataAndUpdateState()
|
|
|
+ }
|
|
|
+ if (transmissionState.getValue() == 'STORING DATA') {
|
|
|
+ // have to wait for storing data to be completed before proceeding to the code above
|
|
|
+ transmissionState.pipe(
|
|
|
+ takeWhile(value => value == 'ARRAY EMPTY') //listen to this value and then destroy this observable
|
|
|
+ ).subscribe({
|
|
|
+ next: () => {
|
|
|
+ getDataAndUpdateState()
|
|
|
+ },
|
|
|
+ error: err => console.error(err),
|
|
|
+ complete: () => { }
|
|
|
+ })
|
|
|
+ }
|
|
|
+ }
|
|
|
+ })
|
|
|
+}
|
|
|
+
|
|
|
+function sortMessage(array: WrappedMessage[]): WrappedMessage[] {
|
|
|
+ console.log(`Sorting ${array.length} messages....`)
|
|
|
+ return array.sort((a, b) => {
|
|
|
+ return new Date(a.timeReceived).getTime() - new Date(b.timeReceived).getTime();
|
|
|
+ });
|
|
|
+}
|
|
|
+
|
|
|
+function wrapMessageWithTimeReceived(message: any, previousMessageID: string): any {
|
|
|
+ // check if message has already a time received property if so no need to add anymore
|
|
|
+ if (!message.timeReceived) {
|
|
|
+ let WrappedMessage: WrappedMessage = {
|
|
|
+ timeReceived: new Date(),
|
|
|
+ payload: message as BaseMessage,
|
|
|
+ previousMessageID: previousMessageID
|
|
|
+ }
|
|
|
+ return WrappedMessage
|
|
|
+ } else {
|
|
|
+ return message as WrappedMessage
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+async function getStoredMessages(): Promise<WrappedMessage[]> {
|
|
|
+ return new Promise((resolve, reject) => {
|
|
|
+ let array = []
|
|
|
+ setTimeout(() => {
|
|
|
+ abstractStorage.forEach(message => {
|
|
|
+ array.push(message)
|
|
|
+ })
|
|
|
+ abstractStorage = []
|
|
|
+ }, 5000)
|
|
|
+ resolve(array)
|
|
|
+ })
|
|
|
+}
|
|
|
+
|
|
|
+// just an abstraction
|
|
|
+async function storeMessage(message: WrappedMessage): Promise<any> {
|
|
|
+ return new Promise((resolve, reject) => {
|
|
|
+ try {
|
|
|
+ setTimeout(() => {
|
|
|
+ console.log(`Storing ${message.payload.header.messageID}....`)
|
|
|
+ abstractStorage.push(message)
|
|
|
+ resolve(message.payload.header.messageID)
|
|
|
+ }, 1000)
|
|
|
+ }
|
|
|
+ catch (error) {
|
|
|
+ reject(error)
|
|
|
+ }
|
|
|
+ })
|
|
|
+}
|
|
|
+
|
|
|
+function getDataAndUpdateState() {
|
|
|
+ transmissionState.next('GETTING STORED DATA')
|
|
|
+ console.log(`Current transmission state: ${transmissionState.getValue()}`)
|
|
|
+ getStoredMessages().then((storedMessages: WrappedMessage[]) => {
|
|
|
+ if (storedMessages.length > 0) {
|
|
|
+ console.log(`${storedMessages.length} STORED messages.`)
|
|
|
+ from(storedMessages).subscribe({
|
|
|
+ next: message => {
|
|
|
+ wrappedMessage.next(message)
|
|
|
+ },
|
|
|
+ error: err => console.error(err),
|
|
|
+ complete: () => {
|
|
|
+ console.log(`Flushed back ${storedMessages.length} messages back in buffer`)
|
|
|
+ transmissionState.next('ARRAY EMPTY')
|
|
|
+ bufferReleaseSignal.next()
|
|
|
+ }
|
|
|
+ })
|
|
|
+ } else {
|
|
|
+ console.log(`${storedMessages.length} STORED messages.`)
|
|
|
+ transmissionState.next('ARRAY EMPTY')
|
|
|
+ bufferReleaseSignal.next()
|
|
|
+ }
|
|
|
+ }).catch((err) => {
|
|
|
+ console.error(err)
|
|
|
+ })
|
|
|
+}
|
|
|
+
|
|
|
+export interface WrappedMessage {
|
|
|
+ timeReceived: any, // this property is for sender to sort
|
|
|
+ payload: BaseMessage,
|
|
|
+ previousMessageID?: string // this property is for receiver to sort
|
|
|
+}
|
|
|
+// Listen for a connection event
|
|
|
+socket.on('connect', () => {
|
|
|
+ socket.emit('Hello from the client!')
|
|
|
+ console.log('Connected to the server:', socket.id)
|
|
|
+ receiverConnectionState.next('ONLINE')
|
|
|
+});
|
|
|
+
|
|
|
+// Listen for messages from the server
|
|
|
+socket.on('message', (msg: string) => {
|
|
|
+ console.log('Message from server:', msg);
|
|
|
+})
|
|
|
+
|
|
|
+async function sendMessage(message: WrappedMessage): Promise<any> {
|
|
|
+ return new Promise((resolve, reject) => {
|
|
|
+ try {
|
|
|
+ // extra precaution: According to chatgpt, if disconnected, then the payload will be loaded back in event queue whilst the socket will try to reestablish connection
|
|
|
+ // https://socket.io/docs/v4/client-offline-behavior/
|
|
|
+ socket.emit('message', message); // inherently an aysnc
|
|
|
+ console.log(`SocketEmit() for message to event queue ${message.payload.header.messageID}
|
|
|
+ current tranmission State: ${transmissionState.getValue()}
|
|
|
+ current connection State: ${receiverConnectionState.getValue()}
|
|
|
+ ${receiverConnectionState.getValue()=='OFFLINE'? 'Message in the event queue will be attempted again after connection is back' : 'Sent over'}`);
|
|
|
+ resolve('')
|
|
|
+ } catch (error) {
|
|
|
+ console.error('Error emitting message:', error);
|
|
|
+ wrappedMessage.next(message)
|
|
|
+ reject(error)
|
|
|
+ }``
|
|
|
+ })
|
|
|
+}
|
|
|
+
|
|
|
+// Handle disconnection
|
|
|
+socket.on('disconnect', () => {
|
|
|
+ console.log('Disconnected from the server');
|
|
|
+ receiverConnectionState.next('OFFLINE')
|
|
|
+});
|
|
|
+
|
|
|
+
|
|
|
+
|