import { BehaviorSubject, filter, map, Observable, Observer, Subject, Subscription } from 'rxjs';
import { v4 as uuidv4 } from 'uuid'
import { ReceiverAdapter } from '../adapters/adapter.receiver';
import { checkMessage, WrappedMessage } from '../utils/message.ordering';
import ConsoleLogger from '../utils/log.utils';
import { MessageTransmissionBase } from '../base/msg.transmission.base';
import { AdapterInterface, AdapterManagerInterface, ConnectionState, FisMessage, GeneralEvent, MessageReceiverInterface, ReceiverAdapterInterface, TransmissionProfile, TransportMessage } from '../interface/interface';
import { error } from 'console';

/**
 * Receiving side of a transmission: collects events from the receiver adapters
 * registered for `profile.target` and exposes them as a single stream through
 * {@link MessageTransmissionReceiver.getReceivables}.
 *
 * NOTE(review): the generic type arguments below (`ConnectionState`,
 * `WrappedMessage`, `GeneralEvent<any>`) were reconstructed from how the values
 * are used in this file — confirm them against `../interface/interface`.
 */
export class MessageTransmissionReceiver extends MessageTransmissionBase implements MessageReceiverInterface {
    /** Mirrors the connection state reported by the current adapter; starts as `OFFLINE`. */
    private connectionStateEvent: BehaviorSubject<ConnectionState> = new BehaviorSubject<ConnectionState>(`OFFLINE`)
    private console: ConsoleLogger = new ConsoleLogger(`MessageTransmissionReceiver`, ['transmission'])
    /** Every received wrapped message is published here and handed to `checkMessage`. */
    private onHoldMessage: Subject<WrappedMessage> = new Subject<WrappedMessage>()
    /** First adapter delivered by the adapter manager; undefined until one arrives. */
    private currentAdapter: ReceiverAdapterInterface | undefined
    /** Raw events forwarded from the subscribed adapters. */
    private incomingMessage: Subject<GeneralEvent<any>> = new Subject<GeneralEvent<any>>()
    // private toBePassedOver: Subject = new Subject()

    /**
     * @param profile Transmission profile; `profile.target` selects which adapters to listen to.
     * @param adapterManager Source of receiver adapters for that target.
     */
    constructor(profile: TransmissionProfile, adapterManager: AdapterManagerInterface) {
        super()
        this.profile = profile
        this.console.log({ message: `Constructing Receiver Transmission for Receiving target: ${this.profile.target}` })
        this.initializeReceiverComponents(adapterManager)
    }

    /**
     * Streams the incoming `'New Message'` events.
     *
     * Each event's wrapped payload is first published on `onHoldMessage` and then
     * passed to `checkMessage`. The event is emitted to the subscriber only once
     * that promise resolves; if it rejects, the rejection is logged and the event
     * is not emitted.
     *
     * @returns A cold observable; unsubscribing detaches it from the incoming stream.
     */
    public getReceivables(): Observable<GeneralEvent<any>> {
        return new Observable((receivable: Observer<GeneralEvent<any>>) => {
            this.console.log({ message: `Transmission streaming messages from ${this.profile.target}` })

            const subscription: Subscription = this.incomingMessage.pipe(
                filter((event: GeneralEvent<any>) => event.event === 'New Message'),
            ).subscribe((event: GeneralEvent<any>) => {
                // `data` carries a TransportMessage (not an event message); its payload is the wrapped message.
                const wrapped = (event.data as TransportMessage).payload as WrappedMessage
                this.onHoldMessage.next(wrapped)
                checkMessage(wrapped, this.onHoldMessage).then(() => {
                    // Released by the ordering check: pass it on to the consumer.
                    this.console.log({ message: `This one passes. Does have previousID. Case for message ordering` })
                    receivable.next(event)
                }).catch((reason: unknown) => {
                    this.console.log({ message: `Observer Error`, details: reason })
                })
            })

            // Clean up on unsubscription
            return () => {
                subscription.unsubscribe()
            }
        })
    }

    /**
     * Records every adapter announced by the adapter manager and starts forwarding
     * its incoming events. The first adapter becomes `currentAdapter` and also
     * drives `connectionStateEvent`.
     *
     * There is currently no logic to switch adapters (e.g. based on performance);
     * that is left for a future change.
     */
    private initializeReceiverComponents(adapterManager: AdapterManagerInterface): void {
        adapterManager.subscribeForAdapters(this.profile.target, `Receiver`).subscribe((adapter: AdapterInterface) => {
            this.adapters.push(adapter)
            this.console.log({ message: `Adding new ${adapter.getAdapterProfile(`transportType`)} receiving adapter. Current adapter length: ${this.adapters.length}` })

            const receiver = adapter as ReceiverAdapterInterface
            if (!this.currentAdapter) {
                this.console.log({ message: `Setting this ${adapter.getAdapterProfile(`role`)} as current adapter.` })
                this.currentAdapter = receiver
                this.forwardIncoming(receiver)
                const connectionState = receiver.getAdapterProfile(`connectionState`) as Observable<ConnectionState>
                connectionState.subscribe(this.connectionStateEvent)
            } else {
                // Subscribe to the NEW adapter. Re-subscribing to the current adapter
                // (the previous behaviour) added a second subscription to the same
                // source and never consumed the additional adapter's events.
                this.forwardIncoming(receiver)
            }
        })
    }

    /** Pipes one adapter's incoming events into `incomingMessage`, logging receipts and stream errors. */
    private forwardIncoming(adapter: ReceiverAdapterInterface): void {
        adapter.subscribeForIncoming().subscribe({
            next: (message: GeneralEvent<any>) => {
                // Read defensively: an event without the expected nesting must not
                // throw here, otherwise it would never reach `incomingMessage`.
                const transport = message.data as TransportMessage | undefined
                const wrapped = transport?.payload as WrappedMessage | undefined
                const fisMessage = wrapped?.payload as FisMessage | undefined
                this.console.log({ message: `Received ${fisMessage?.header?.messageID} from ${transport?.source}`, details: message })
                this.incomingMessage.next(message)
            },
            error: (cause: unknown) => {
                // TODO: ideally switch over to another adapter here. Until then, at
                // least surface the failure instead of swallowing it.
                this.console.log({ message: `Incoming stream error on ${adapter.getAdapterProfile(`transportType`)} adapter`, details: cause })
            }
        })
    }
}