123456789101112131415161718192021222324252627282930313233343536373839404142434445 |
- import { filter, map, Observable, Observer, PartialObserver, Subject, Subscriber, Subscription, takeWhile, Unsubscribable } from 'rxjs';
- import { AdapterProfile, AdapterSet, AdaptorTransmissionRole, TransportEvent, TransportMessage } from '../interface/connector.interface';
- import { MessageTransmissionBase } from './msg.transmission.base';
- import { Bus, EventMessage, FisMessage, MessageReceiver as MessageReceiverInterface, ReceiverProfile, TransmissionMessage, TransmitterProfile } from '../interface/transport.interface'
- import { ConnectionAdapter } from '../connector/connector.base';
- import { v4 as uuidv4 } from 'uuid'
- import { ReceiverConnectionAdapter } from '../connector/connector.receiver';
- import { ConnectionManager } from '../connector/connector.manager';
/**
 * Receiving side of the message transmission layer.
 *
 * Holds the receiver's profile and exposes incoming traffic as an RxJS
 * stream of `TransmissionMessage` through {@link getMessageBus}.
 */
export class MessageTransmissionReceiver extends MessageTransmissionBase implements MessageReceiverInterface {
  /** Profile of this receiver; assigned in the constructor via `setReceiver`. */
  receiverProfile!: ReceiverProfile;

  /**
   * Internal stream that `getMessageBus` reads from.
   *
   * NOTE(review): nothing in this class ever calls `next` on this subject, so
   * as written the general bus never emits — confirm where it is meant to be fed.
   */
  private readonly incomingTransportMessage: Subject<TransportMessage> = new Subject();

  /**
   * @param profile Profile describing this receiver; stored on the instance.
   * @param adapterSets Currently unused by this class.
   *   NOTE(review): presumably intended to wire adapters into the incoming
   *   stream — confirm.
   */
  constructor(profile: ReceiverProfile, adapterSets: AdapterSet[]) {
    super();
    this.setReceiver(profile);
  }

  /**
   * Replaces the stored receiver profile.
   *
   * @param receiverProfile The profile to store.
   */
  setReceiver(receiverProfile: ReceiverProfile): void {
    this.receiverProfile = receiverProfile;
  }

  /**
   * Builds a cold observable of messages for the requested bus.
   *
   * Only `Bus.GeneralBus` is wired up. For any other bus the returned
   * observable never emits and never completes (same as before this change).
   * Each subscription opens its own inner subscription to the incoming stream,
   * which is released when the caller unsubscribes.
   *
   * @param bus The bus to listen on.
   * @returns Observable emitting one `TransmissionMessage` per accepted incoming message.
   */
  getMessageBus(bus: Bus): Observable<TransmissionMessage> {
    return new Observable((observer: Observer<TransmissionMessage>) => {
      if (bus !== Bus.GeneralBus) {
        // No source is attached for other buses; there is nothing to tear down.
        return undefined;
      }

      // NOTE(review): the filter compares `payload` to a string and the map then
      // casts that same `payload` to TransportMessage. This looks like the filter
      // was meant to test an event/type field instead — confirm against the
      // TransportMessage declaration. Runtime behavior is left unchanged here.
      const subscription: Subscription = this.incomingTransportMessage.pipe(
        filter((message: TransportMessage) => message.payload === `Incoming Message`),
        map(message => message.payload as TransportMessage)
      ).subscribe({
        next: (message: TransportMessage) => {
          observer.next({
            adapterId: message.target as string, // assign adapter id so that it knows who to reply back
            payload: message.payload
          });
        },
        // Forward termination so downstream subscribers are not left hanging and
        // source errors are delivered to the caller instead of going unhandled.
        error: (err: unknown) => observer.error(err),
        complete: () => observer.complete()
      });

      // Clean up on unsubscription
      return () => {
        subscription.unsubscribe();
      };
    });
  }
}
|