import { BehaviorSubject, buffer, concatMap, distinctUntilChanged, from, Observable, Subject, Subscription, takeWhile } from "rxjs";
import { BaseMessage } from "../dependencies/logging/dependencies/msgutil/dependencies/dependencies";
import { RetransmissionInterface } from "../interfaces/retransmission.interface";
import { WrappedMessage } from "../interfaces/general.interface";
import { sortMessageBasedOnDate } from "./utility/message-ordering";

/**
 * Buffers outgoing messages and forwards them only while the receiver is
 * reported as 'ONLINE'.
 *
 * Flow: incoming payloads are wrapped with a reception timestamp, collected by
 * an RxJS `buffer` that is flushed by `bufferReleaseSignal`, optionally sorted,
 * and then either emitted to the consumer (receiver 'ONLINE') or pushed back
 * into the buffer (receiver 'OFFLINE').
 *
 * NOTE(review): all pipeline subjects are instance fields, so subscribing more
 * than once to the Observable returned by `retransmission` wires several
 * pipelines onto the same subjects. The class appears to be designed for a
 * single subscriber per instance; confirm with callers.
 */
export class RetransmissionService implements RetransmissionInterface {
    /** Delay before the next flush after a non-empty batch was processed while online. */
    private readonly releaseDelayAfterBatchMs = 1000
    /** Delay before polling the buffer again after an empty flush. */
    private readonly releaseDelayWhenEmptyMs = 3000

    private sortMessage: boolean = false
    private bufferReleaseSignal: Subject<void> = new Subject<void>()
    private receiverConnectionState: BehaviorSubject<'OFFLINE' | 'ONLINE'> = new BehaviorSubject<'OFFLINE' | 'ONLINE'>('OFFLINE')
    private transmissionState: BehaviorSubject<'TRANSMITTING' | 'IDLE' | 'ARRAY EMPTY' | 'STORING DATA' | 'GETTING STORED DATA'> = new BehaviorSubject<'TRANSMITTING' | 'IDLE' | 'ARRAY EMPTY' | 'STORING DATA' | 'GETTING STORED DATA'>('ARRAY EMPTY')
    private arrayToBeTransmitted: Subject<WrappedMessage[]> = new Subject<WrappedMessage[]>()
    private toBeWrapped: Subject<any> = new Subject<any>()
    private wrappedMessageToBeBuffered: Subject<WrappedMessage> = new Subject<WrappedMessage>()
    private messageToBeTransmitted: Subject<WrappedMessage> = new Subject<WrappedMessage>()

    // Interface
    /**
     * Wires the buffering pipeline and returns the stream of messages that are
     * cleared for transmission.
     *
     * The pipeline is only set up when the returned Observable is subscribed to,
     * and every inner subscription is released when that subscriber unsubscribes.
     *
     * @param payloadToBeTransmitted Source of messages to send. Items without a
     *   `timeReceived` property are wrapped; items that already have one are
     *   passed through as-is.
     * @param eventListener Stream of receiver connection states.
     * @param messageOrdering When truthy, each released batch is passed through
     *   `sortMessageBasedOnDate` before transmission.
     * @returns Observable emitting each wrapped message while the receiver is 'ONLINE'.
     */
    public retransmission(payloadToBeTransmitted: Observable<any>, eventListener: Observable<'OFFLINE' | 'ONLINE'>, messageOrdering?: boolean): Observable<WrappedMessage> {
        return new Observable<WrappedMessage>((observer) => {
            if (messageOrdering) {
                this.sortMessage = true
                console.log(`Message ordering is set to ${this.sortMessage}`)
            }

            // Collect every inner subscription so the whole pipeline can be torn down together.
            const subscriptions = new Subscription()
            subscriptions.add(eventListener.subscribe(event => this.receiverConnectionState.next(event)))
            subscriptions.add(this.startWrappingOperation())
            subscriptions.add(this.startBufferTransmisionProcess())
            subscriptions.add(this.releaseSignalManager())
            subscriptions.add(payloadToBeTransmitted.subscribe((message) => {
                this.toBeWrapped.next(message)
            }))
            subscriptions.add(this.messageToBeTransmitted.subscribe(message => observer.next(message)))

            // Teardown: timers that are still pending will fire a release signal
            // that no longer has a listener, which is harmless.
            return () => subscriptions.unsubscribe()
        })
    }

    /**
     * Wraps incoming messages and feeds them into the buffer; on every release
     * signal the buffered batch is handed over for transmission.
     *
     * @returns Subscription covering both inner subscriptions.
     */
    private startWrappingOperation(): Subscription {
        const subscriptions = new Subscription()
        // ID of the message seen just before the current one; null for the first message.
        let previousMessageId: string | null = null

        subscriptions.add(this.toBeWrapped.subscribe(message => {
            // `|| null` also maps an empty or undefined ID to null, as before.
            this.wrappedMessageToBeBuffered.next(this.wrapMessageWithTimeReceived(message, previousMessageId || null))
            previousMessageId = message.header.messageID
        }))

        // wrappedMessageToBeBuffered is collected until bufferReleaseSignal fires
        subscriptions.add(this.wrappedMessageToBeBuffered.pipe(buffer(this.bufferReleaseSignal)).subscribe((bufferedMessages: WrappedMessage[]) => {
            console.log(bufferedMessages.length + ' buffered messages')
            console.log(`Released buffered message: ${bufferedMessages.length} total messages. To Be sorted.`)
            // Sort only when ordering was requested and there is something to sort.
            // Without ordering the batch is forwarded unchanged (previously it was
            // replaced by an empty array, which dropped every buffered message).
            const batch = this.sortMessage && bufferedMessages.length > 0
                ? sortMessageBasedOnDate(bufferedMessages)
                : bufferedMessages
            this.arrayToBeTransmitted.next(batch)
        }))

        return subscriptions
    }

    /**
     * Adds reception metadata to a raw message.
     *
     * @param message Raw message, or an already wrapped one.
     * @param previousMessageID ID of the message received before this one, or null.
     * @returns A new wrapper stamped with the current time, or `message` itself
     *   when it already carries a truthy `timeReceived`.
     */
    private wrapMessageWithTimeReceived(message: any, previousMessageID: string | null): WrappedMessage {
        // check if message has already a time received property if so no need to add anymore
        if (message.timeReceived) {
            return message as WrappedMessage
        }
        const wrapped: WrappedMessage = {
            timeReceived: new Date(),
            payload: message as BaseMessage,
            previousMessageID: previousMessageID
        }
        return wrapped
    }

    /**
     * Processes each released batch: messages are emitted while the receiver is
     * 'ONLINE' and pushed back into the buffer while it is 'OFFLINE'. Afterwards
     * the next release is scheduled.
     *
     * @returns Subscription to the batch stream.
     */
    private startBufferTransmisionProcess(): Subscription {
        console.log(`StartBufferTransmissionProcess`)
        return this.arrayToBeTransmitted.subscribe(array => {
            if (array.length === 0) {
                // Nothing buffered: poll again later. Releasing immediately would
                // loop back into this handler without pause.
                setTimeout(() => {
                    this.bufferReleaseSignal.next()
                }, this.releaseDelayWhenEmptyMs)
                return
            }

            this.transmissionState.next('TRANSMITTING')
            from(array).subscribe({
                next: (message: WrappedMessage) => {
                    if (this.receiverConnectionState.getValue() === 'ONLINE') {
                        this.messageToBeTransmitted.next(message)
                    } else {
                        // Receiver offline: flush the message back into the buffer
                        this.wrappedMessageToBeBuffered.next(message)
                    }
                },
                error: err => console.error(err),
                complete: () => {
                    // update transmission state to indicate this batch is completed
                    console.log(`Processing buffered array completed. \nChanging transmission state to ARRAY EMPTY`)
                    this.transmissionState.next('ARRAY EMPTY')
                    if (this.receiverConnectionState.getValue() === 'ONLINE') {
                        setTimeout(() => {
                            this.bufferReleaseSignal.next()
                        }, this.releaseDelayAfterBatchMs)
                    }
                    // Do nothing if the receiver connection is offline; the next
                    // release happens when the receiver is reported 'ONLINE' again.
                }
            })
        })
    }

    /**
     * Reacts to changes of the receiver connection state and releases the
     * buffer as soon as the receiver comes online while no batch is in flight.
     *
     * NOTE(review): a release triggered here runs in addition to any timer-driven
     * release that is still pending; confirm that the extra flushes are acceptable.
     *
     * @returns Subscription to the connection state stream.
     */
    private releaseSignalManager(): Subscription {
        return this.receiverConnectionState.pipe(
            distinctUntilChanged()
        ).subscribe(clientState => {
            console.log(`Client is now ${clientState}`)
            console.log(`Current transmission state: ${this.transmissionState.getValue()}`)
            // OFFLINE: just keep buffering.
            // ONLINE: flush whatever was buffered so it is processed immediately.
            if (clientState === 'ONLINE' && this.transmissionState.getValue() === 'ARRAY EMPTY') {
                this.bufferReleaseSignal.next()
            }
        })
    }
}