import { filter, map, Observable, Observer, PartialObserver, Subject, Subscriber, Subscription, takeWhile, Unsubscribable } from 'rxjs'; import { AdapterProfile, AdapterSet, AdaptorTransmissionRole, TransportEvent, TransportMessage } from '../interface/connector.interface'; import { MessageTransmissionBase } from './msg.transmission.base'; import { Bus, EventMessage, FisMessage, MessageReceiver as MessageReceiverInterface, ReceiverProfile, TransmissionMessage, TransmitterProfile } from '../interface/transport.interface' import { ConnectionAdapter } from '../connector/connector.base'; import { v4 as uuidv4 } from 'uuid' import { ReceiverConnectionAdapter } from '../connector/connector.receiver'; import { ConnectionManager } from '../connector/connector.manager'; export class MessageTransmissionReceiver extends MessageTransmissionBase implements MessageReceiverInterface { receiverProfile!: ReceiverProfile; private incomingTransportMessage: Subject = new Subject() constructor(profile: ReceiverProfile, adapterSets: AdapterSet[]) { super(); this.setReceiver(profile) } setReceiver(receiverProfile: ReceiverProfile): void { this.receiverProfile = receiverProfile } getMessageBus(bus: Bus): Observable { return new Observable((observable: Observer) => { // logic here if (bus == Bus.GeneralBus) { const subscription: Subscription = this.incomingTransportMessage.pipe( filter((message: TransportMessage) => message.payload === `Incoming Message`), map(message => message.payload as TransportMessage) ).subscribe((message: TransportMessage) => { observable.next({ adapterId: message.target as string, // assign adapter id so that it knows who to reply back payload: message.payload }); }); // Clean up on unsubscription return () => { subscription.unsubscribe(); }; } }) } }