import { MessageTransmissionBase } from "../base/msg.transmission.base";
import { v4 as uuidv4 } from 'uuid'
import { BehaviorSubject, distinct, distinctUntilChanged, filter, map, Observable, Subject, Subscription } from "rxjs";
import { RetransmissionService } from "../utils/retransmission.service";
import { WrappedMessage } from "../utils/message.ordering";
import ConsoleLogger from "../utils/log.utils";
import { AdapterInterface, ConnectionState, FisMessage, GeneralEvent, MessageTransmitterInterface, TransmitterAdapterInterface, TransportMessage } from "../interface/interface";
import { error } from "console";

/**
 * Transmitting side of a client's message transmission.
 *
 * Takes in all the messages that need to be transported and routes them through
 * the retransmission buffer before handing them to a transmitter adapter.
 * Connectors/adapters carry their own identifiers; this class currently keeps a
 * single "current" transmitter adapter.
 *
 * Flow visible in this class:
 *  - `emit()` pushes every outgoing message into `messageToBeBuffered`.
 *  - `setupBuffer()` derives an ONLINE/OFFLINE signal from connection events on the
 *    shared event subject and hands both streams to the `RetransmissionService`.
 *  - Messages released by the buffer are forwarded to `messageToBeTransmitted`
 *    when an adapter is set, otherwise they are pushed back for buffering.
 */
export class MessageTransmissionTransmitter extends MessageTransmissionBase implements MessageTransmitterInterface {
    /** Latest connection state for this client; starts as 'OFFLINE'. */
    private connectionStateEvent: BehaviorSubject<ConnectionState> = new BehaviorSubject<ConnectionState>('OFFLINE')
    private console: ConsoleLogger = new ConsoleLogger(`MessageTransmissionTransmitter`, ['transmission'])
    /** Entry point of the retransmission buffer (fresh messages and re-queued wrapped ones). */
    private messageToBeBuffered!: Subject<FisMessage | WrappedMessage>
    /** Messages released by the buffer that are ready to go out. */
    private messageToBeTransmitted!: Subject<WrappedMessage>
    private buffer!: RetransmissionService;
    /** The single transmitter adapter currently in use (only one is supported for now). */
    private currentAdapter!: TransmitterAdapterInterface

    /**
     * @param clientId identifier of the client this transmitter serves; used to filter shared events.
     * @param event shared event subject carrying adapter and connection events.
     */
    constructor(clientId: string, event: Subject<GeneralEvent<any>>) {
        super()
        // The original never stored the id, so every `this.clientId` filter below
        // compared against whatever the base class left there.
        this.clientId = clientId
        this.console.log({ message: `Constructing Transmitter Transmission with ${clientId}` })
        this.event = event
        this.messageToBeTransmitted = new Subject()
        this.messageToBeBuffered = new Subject()
        this.buffer = new RetransmissionService()
        this.handleAdapters(this.event)
        this.setupBuffer()

        // special case just for http in case of server/client disconnected, the unsent msg will be flushed back into messageToBeBuffered
        // logic here
    }

    /**
     * Queue a message for transmission. The message always goes through the buffer;
     * the log line only reports whether the connection is currently ONLINE.
     *
     * @param message message to send.
     */
    public emit(message: FisMessage): void {
        const isOnline = this.connectionStateEvent.getValue() == 'ONLINE'
        this.console.log({ message: isOnline ? `Transmitting message` : `Buffering message` })
        this.messageToBeBuffered.next(message)
    }

    /**
     * Wire the connection-state signal and the retransmission buffer together,
     * then subscribe to the messages the buffer releases.
     */
    private setupBuffer(): void {
        this.console.log({ message: `Setting up Retransmission Service...` })

        this.event.pipe(
            // only events that concern this client
            filter(event => event.data.clientId == this.clientId),
            // only connection life-cycle events
            filter(event =>
                event.event === 'Client Disconnected'
                || event.event === 'Client Re-connected'
                || event.event === 'Client Connected'
                || event.event === 'Server Disconnected'
                || event.event === 'Server Connected'),
            // any disconnect means OFFLINE, everything else means ONLINE
            map(event => {
                if (event.event === 'Client Disconnected' || event.event === 'Server Disconnected') {
                    return 'OFFLINE'
                } else {
                    return `ONLINE`
                }
            }),
            // do not re-announce an unchanged state
            distinctUntilChanged()
        ).subscribe((signal: ConnectionState) => {
            this.connectionStateEvent.next(signal)
            if (signal == 'OFFLINE') this.console.error({ message: `${this.clientId} disconnected` })
            if (signal == 'ONLINE') this.console.log({ message: `${this.clientId} connected` })
        })

        // NOTE(review): the meaning of the third argument (`true`) is defined by
        // RetransmissionService and is not visible here - confirm.
        this.buffer.implementRetransmission(this.messageToBeBuffered, this.connectionStateEvent.asObservable(), true)

        // automatically subscribe so that buffered messages are released
        this.buffer.returnSubjectForBufferedItems().subscribe((bufferedMessage: WrappedMessage) => {
            // need to work with wrapped messages
            this.console.log({ message: `Releasing ${bufferedMessage.thisMessageID}` });
            if (this.currentAdapter) {
                // this.currentAdapter.emit(bufferedMessage)
                this.messageToBeTransmitted.next(bufferedMessage)
            } else {
                // NOTE(review): re-queuing while no adapter is set may bounce the same
                // message straight back here if the buffer releases immediately when
                // ONLINE - verify against RetransmissionService.
                this.messageToBeBuffered.next(bufferedMessage)
                this.console.error({ message: `Adapter is not set. Please ensure adapters are ready.` })
            }
        })
    }

    /**
     * Track transmitter adapters announced on the shared event subject.
     *
     * Two subscriptions are set up:
     *  1. `New Adapter` events carrying a LIST of adapters: every transmitter in the
     *     list is stored and re-announced individually as a `Transmission Event`.
     *  2. `Transmission Event` / `New Adapter` events carrying a SINGLE adapter: the
     *     first transmitter seen becomes the current adapter.
     *
     * @param adaptersEvent shared event subject (also used to re-publish adapters).
     */
    private handleAdapters(adaptersEvent: Subject<GeneralEvent<any>>): void {
        adaptersEvent.pipe(
            // The per-adapter event re-published below also has event === `New Adapter`
            // but carries a single adapter; without the array check it re-enters this
            // subscription and `forEach` throws on a non-array.
            filter(event => event.event === `New Adapter` && Array.isArray(event.data)),
            map(event => { return event.data }),
        ).subscribe({
            next: (adapters: AdapterInterface[]) => {
                adapters.forEach((adapter: AdapterInterface) => {
                    if (adapter.role === `Transmitter`) {
                        this.adapters.push(adapter as TransmitterAdapterInterface)
                        adaptersEvent.next({
                            id: uuidv4(),
                            type: 'Transmission Event',
                            event: `New Adapter`,
                            date: new Date(),
                            data: adapter,
                            transport: adapter.transport
                        })
                    }
                })
            },
            // parameter renamed so it no longer shadows the `error` import from "console"
            error: err => this.console.error({ message: 'Observer Error', details: err })
        })

        // listen to newly added adapters in transmission
        adaptersEvent.pipe(
            filter(event => event.type === `Transmission Event`),
            filter(event => event.event === `New Adapter`),
            // Only accept a single transmitter adapter: the subject is shared, so other
            // payloads with the same type/event must not be cast into the current adapter.
            filter(event => !Array.isArray(event.data) && event.data?.role === `Transmitter`),
            map(event => { return event.data })
        ).subscribe((adapter: AdapterInterface) => {
            if (!this.currentAdapter) {
                this.currentAdapter = adapter as TransmitterAdapterInterface
            } else {
                this.console.log({ message: `Already have existing transmitting adapter. Currently hardcode to use only 1` })
            }
        })
    }

    /**
     * Temporary logic for now: pick the first Websocket adapter as the current
     * adapter when none has been chosen yet. Logs an error only when no adapter
     * is set AND no Websocket adapter is available.
     */
    private setUpAdapter(): void {
        if (this.currentAdapter) {
            // already have an adapter - nothing to select and nothing to complain about
            return
        }
        const websocketAdapter = this.adapters.find(adapter => adapter.transport === `Websocket`)
        if (websocketAdapter) {
            this.currentAdapter = websocketAdapter as TransmitterAdapterInterface
        } else {
            this.console.error({ message: 'No websocket socket adapter avaiable' })
        }
    }

    /**
     * Push messages carried by `Re-Flush` events for this client back into the buffer.
     * The wrapped message is read from `event.data.payload.payload`.
     *
     * @param event stream of general events to watch for `Re-Flush`.
     */
    private uniqueHandlerToFlushUnsentMessages(event: Observable<GeneralEvent<any>>): void {
        event.pipe(
            filter(event => event.event == 'Re-Flush'),
            filter(event => event.data.clientId == this.clientId),
        ).subscribe((event: GeneralEvent<any>) => {
            const unsentMessage = (event.data.payload as TransportMessage).payload as WrappedMessage
            const isOnline = this.connectionStateEvent.getValue() == 'ONLINE'
            this.console.log({
                message: isOnline
                    ? `Transmitting ${unsentMessage.thisMessageID}`
                    : `Buffering ${unsentMessage.thisMessageID}`
            })
            this.messageToBeBuffered.next(unsentMessage)
        })
    }
}