123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250 |
- import { BehaviorSubject, buffer, concatMap, distinctUntilChanged, from, interval, Observable, Subject, Subscriber, take, takeUntil, takeWhile } from "rxjs";
- import { io, Socket } from "socket.io-client";
- import { prepareResponseMessages } from "../services/utility/prepareFISmessage";
- import { BaseMessage } from "../dependencies/logging/services/logging-service";
- import { rejects } from "assert";
// Socket.IO client connection to the receiving server.
const socket: Socket = io('http://localhost:3000');

// In-memory stand-in for persistent storage of messages that could not be sent.
export let abstractStorage: WrappedMessage[] = [];

// Each emission flushes whatever `wrappedMessage` has buffered so far.
export let bufferReleaseSignal = new Subject<void>();

// Source of outgoing messages (produced by prepareResponseMessages; the meaning
// of its two numeric arguments is defined in that helper).
export let sender: Subject<BaseMessage> = prepareResponseMessages(3000, 500);

// Last known state of the socket connection; starts OFFLINE until 'connect' fires.
export let receiverConnectionState = new BehaviorSubject<'OFFLINE' | 'ONLINE'>('OFFLINE');

// What the transmission pipeline is currently doing with the released batch.
export let transmissionState = new BehaviorSubject<
  'TRANSMITTING' | 'IDLE' | 'ARRAY EMPTY' | 'STORING DATA' | 'GETTING STORED DATA'
>('ARRAY EMPTY');

// A released (and sorted) batch of messages, ready to be sent or stored.
export let arrayToBeTransmitted = new Subject<WrappedMessage[]>();

// Raw messages waiting to be wrapped with a receive timestamp.
export let toBeWrapped = new Subject<any>();

// Wrapped messages; this stream is what gets buffered until a release signal.
export let wrappedMessage = new Subject<WrappedMessage>();

export let transportLayerMessages = new Subject<any>();
// Activate the release mechanism that reacts to receiver connection changes.
releaseSignalManager();

// Every message produced by `sender` is forwarded to `toBeWrapped`.
sender.subscribe({
  next: (message) => toBeWrapped.next(message),
});

// Holds the ID of the most recently wrapped message, so the next message can
// reference it as its predecessor.
let currentMessageId: string | null;

// Wrap each raw message (timestamp + previous message ID) and push the result
// to `wrappedMessage`, then remember this message's ID for the next one.
toBeWrapped.subscribe((message) => {
  const previousId = currentMessageId || null;
  wrappedMessage.next(wrapMessageWithTimeReceived(message, previousId));
  currentMessageId = message.header.messageID;
});
// Wrapped messages accumulate in the buffer until `bufferReleaseSignal` emits.
// Each released batch is sorted (when non-empty) and handed to
// `arrayToBeTransmitted`; an empty release is forwarded as an empty array.
wrappedMessage
  .pipe(buffer(bufferReleaseSignal))
  .subscribe((released: WrappedMessage[]) => {
    const count = released.length;
    console.log(count + ' buffered messages');
    console.log(`Released buffered message: ${count} total messages. To Be sorted.`);
    if (count > 0) {
      arrayToBeTransmitted.next(sortMessage(released));
    } else {
      arrayToBeTransmitted.next([]);
    }
  });
// Processes each released batch: messages are handled strictly one at a time
// (concatMap), and what happens to each message depends on the transmission
// state at the moment that message is reached.
arrayToBeTransmitted.subscribe((batch) => {
  if (batch.length === 0) {
    // Nothing to send: schedule the next release instead of firing it right
    // away, otherwise the release signal would be emitted in a tight loop.
    setTimeout(() => {
      bufferReleaseSignal.next();
    }, 3000);
    return;
  }

  // Stores one message and logs the outcome; storage errors are logged, not rethrown.
  const storeAndReport = (message: WrappedMessage): Promise<void> =>
    storeMessage(message)
      .then((msgId) => {
        console.log(`Message (${msgId}) stored Successfully. {TransmissionState: ${transmissionState.getValue()}}`);
      })
      .catch((error) => {
        console.error(error);
      });

  // Decides, per message, whether to send it, store it, or skip it.
  const processMessage = (message: WrappedMessage): Promise<any> => {
    const state = transmissionState.getValue();

    if (state === 'TRANSMITTING') {
      console.log(message.timeReceived);
      // A failed send falls back to storing the message.
      return sendMessage(message).catch(() => storeAndReport(message));
    }

    if (state === 'STORING DATA') {
      return storeAndReport(message);
    }

    if (receiverConnectionState.getValue() === 'OFFLINE') {
      // Switch to storing mode; later messages in this batch take the branch above.
      transmissionState.next('STORING DATA');
      return storeAndReport(message);
    }

    // NOTE(review): a message reaching this point is neither sent nor stored.
    // Confirm that dropping it here is intended.
    return Promise.resolve();
  };

  /* The receiver's connection state is deliberately not checked before sending:
     once a message is handed over it sits in the event queue and is executed
     later, by which time the connection state may already have changed. */
  // Mark this batch as being processed.
  transmissionState.next('TRANSMITTING');

  from(batch)
    .pipe(concatMap((message: WrappedMessage) => processMessage(message)))
    .subscribe({
      error: (err) => console.error(err),
      complete: () => {
        // The whole batch has been handled.
        console.log(`Processing buffered array completed. Changing transmission state to ARRAY EMPTY`);
        transmissionState.next('ARRAY EMPTY');

        const receiverOnline = receiverConnectionState.getValue() === 'ONLINE';
        const stillEmpty = transmissionState.getValue() === 'ARRAY EMPTY';
        if (receiverOnline && stillEmpty) {
          setTimeout(() => {
            bufferReleaseSignal.next();
          }, 1000);
        }
        // When the receiver is offline nothing is scheduled here.
      },
    });
});
- /* Utils */
/**
 * Reacts to changes of the receiver's connection state.
 *
 * - OFFLINE: nothing is done; messages simply keep accumulating in the buffer.
 * - ONLINE: the stored messages are pumped back into the buffer via
 *   `getDataAndUpdateState()`. This happens immediately when the transmission
 *   state is 'ARRAY EMPTY', or as soon as an in-progress 'STORING DATA' phase
 *   ends with 'ARRAY EMPTY'.
 */
function releaseSignalManager() {
  receiverConnectionState.pipe(
    distinctUntilChanged()
  ).subscribe((clientState) => {
    console.log(`Client is now ${clientState}`);
    console.log(`Current transmission state: ${transmissionState.getValue()}`);

    if (clientState !== 'ONLINE') {
      // OFFLINE: just keep buffering.
      return;
    }

    const currentState = transmissionState.getValue();

    if (currentState === 'ARRAY EMPTY') {
      getDataAndUpdateState();
    } else if (currentState === 'STORING DATA') {
      // Wait for the storing phase to finish before reloading stored data.
      // `transmissionState` is a BehaviorSubject, so the current value
      // ('STORING DATA') is replayed on subscribe. The predicate therefore has
      // to keep the subscription alive while the state is NOT 'ARRAY EMPTY';
      // `inclusive = true` lets the terminating 'ARRAY EMPTY' value through
      // before the subscription completes.
      transmissionState.pipe(
        takeWhile((state) => state !== 'ARRAY EMPTY', true)
      ).subscribe({
        next: (state) => {
          if (state === 'ARRAY EMPTY') {
            getDataAndUpdateState();
          }
        },
        error: (err) => console.error(err),
      });
    }
  });
}
/**
 * Returns the given messages ordered by `timeReceived`, oldest first.
 *
 * The input array is left untouched; a sorted copy is returned.
 *
 * @param array - Messages to order. `timeReceived` may be anything the `Date`
 *   constructor accepts.
 * @returns A new array containing the same message objects in ascending time order.
 */
function sortMessage(array: WrappedMessage[]): WrappedMessage[] {
  console.log(`Sorting ${array.length} messages....`)
  const toEpochMs = (message: WrappedMessage): number => new Date(message.timeReceived).getTime();
  // Copy first: Array.prototype.sort sorts in place and would reorder the caller's array.
  return [...array].sort((a, b) => toEpochMs(a) - toEpochMs(b));
}
/**
 * Wraps a raw message in a `WrappedMessage` envelope.
 *
 * A message that already carries a truthy `timeReceived` is treated as already
 * wrapped and is returned as-is.
 *
 * @param message - Raw message, or an already wrapped one.
 * @param previousMessageID - ID of the message wrapped before this one.
 * @returns The wrapped message.
 */
function wrapMessageWithTimeReceived(message: any, previousMessageID: string): any {
  if (message.timeReceived) {
    // Already wrapped: nothing to add.
    return message as WrappedMessage;
  }

  const envelope: WrappedMessage = {
    timeReceived: new Date(),
    payload: message as BaseMessage,
    previousMessageID,
  };
  return envelope;
}
/**
 * Takes every message out of `abstractStorage` and returns them.
 *
 * The storage is emptied as part of the same step, so the returned array is the
 * only remaining reference to those messages.
 *
 * @returns A promise that resolves, after a 5 second delay, with the messages
 *   that were stored (an empty array when there were none).
 */
function getStoredMessages(): Promise<WrappedMessage[]> {
  return new Promise((resolve) => {
    setTimeout(() => {
      const storedMessages: WrappedMessage[] = [...abstractStorage];
      abstractStorage = [];
      // Resolve only once the messages have actually been collected; resolving
      // before the timer fires would hand the caller an empty array and the
      // stored messages would be discarded.
      resolve(storedMessages);
    }, 5000);
  });
}
/**
 * Stores a message in `abstractStorage` (an in-memory stand-in for real storage).
 *
 * @param message - The wrapped message to keep.
 * @returns A promise that resolves, after a 1 second delay, with the stored
 *   message's `messageID`, or rejects if the message cannot be stored
 *   (for example when its payload has no header).
 */
function storeMessage(message: WrappedMessage): Promise<any> {
  return new Promise((resolve, reject) => {
    setTimeout(() => {
      // The try/catch has to live inside the timer callback: an error thrown
      // here happens after the promise executor has returned, so a try/catch
      // around setTimeout would never see it and the promise would never settle.
      try {
        const messageId = message.payload.header.messageID;
        console.log(`Storing ${messageId}....`);
        abstractStorage.push(message);
        resolve(messageId);
      } catch (error) {
        reject(error);
      }
    }, 1000);
  });
}
/**
 * Reloads the stored messages into the buffer and then releases the buffer.
 *
 * The transmission state is set to 'GETTING STORED DATA' while the stored
 * messages are fetched. Once they have been pushed back into `wrappedMessage`
 * (or there were none), the state returns to 'ARRAY EMPTY' and
 * `bufferReleaseSignal` is fired. A failure while fetching is only logged.
 */
function getDataAndUpdateState() {
  transmissionState.next('GETTING STORED DATA');
  console.log(`Current transmission state: ${transmissionState.getValue()}`);

  // Common final step: mark the pipeline idle again and flush the buffer.
  const markEmptyAndRelease = (): void => {
    transmissionState.next('ARRAY EMPTY');
    bufferReleaseSignal.next();
  };

  getStoredMessages()
    .then((storedMessages: WrappedMessage[]) => {
      console.log(`${storedMessages.length} STORED messages.`);

      if (storedMessages.length === 0) {
        markEmptyAndRelease();
        return;
      }

      from(storedMessages).subscribe({
        next: (message) => {
          wrappedMessage.next(message);
        },
        error: (err) => console.error(err),
        complete: () => {
          console.log(`Flushed back ${storedMessages.length} messages back in buffer`);
          markEmptyAndRelease();
        },
      });
    })
    .catch((err) => {
      console.error(err);
    });
}
/**
 * Envelope placed around a message before it enters the buffer.
 */
export interface WrappedMessage {
  /** When the message was wrapped; the sender sorts released batches by this value. */
  timeReceived: any;
  /** The original message. */
  payload: BaseMessage;
  /** ID of the previously wrapped message; meant for the receiver to restore ordering. */
  previousMessageID?: string;
}
// On (re)connection: greet the server, log the socket id and mark the receiver ONLINE.
socket.on('connect', function handleConnect(): void {
  // NOTE(review): the greeting text is passed as the event name, with no
  // payload. Confirm the server really listens for an event with this name.
  socket.emit('Hello from the client!');
  console.log('Connected to the server:', socket.id);
  receiverConnectionState.next('ONLINE');
});

// Log every 'message' event pushed by the server.
socket.on('message', function handleServerMessage(msg: string): void {
  console.log('Message from server:', msg);
});
/**
 * Emits a wrapped message to the server as a 'message' event.
 *
 * `socket.emit` returns immediately, so a resolved promise only means the
 * message was handed to the socket, not that the server received it. While
 * disconnected, the Socket.IO client is expected to buffer the packet and send
 * it after reconnecting (see https://socket.io/docs/v4/client-offline-behavior/).
 *
 * On failure the error is logged and rethrown. The message is intentionally
 * NOT pushed back into `wrappedMessage` here: the caller already stores a
 * message whose send was rejected, so re-queuing it as well would deliver it twice.
 *
 * @param message - The wrapped message to send.
 * @returns A promise resolving to an empty string once the message has been emitted.
 * @throws Whatever error was raised while emitting or logging.
 */
async function sendMessage(message: WrappedMessage): Promise<any> {
  try {
    socket.emit('message', message);
    const connectionState = receiverConnectionState.getValue();
    console.log(`SocketEmit() for message to event queue ${message.payload.header.messageID}
            current tranmission State: ${transmissionState.getValue()}
            current connection State: ${connectionState}
            ${connectionState == 'OFFLINE' ? 'Message in the event queue will be attempted again after connection is back' : 'Sent over'}`);
    return '';
  } catch (error) {
    console.error('Error emitting message:', error);
    throw error;
  }
}
// When the socket drops, log it and mark the receiver OFFLINE.
socket.on('disconnect', function handleDisconnect(): void {
  console.log('Disconnected from the server');
  receiverConnectionState.next('OFFLINE');
});
|