msg.transmission.receiver.ts 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445
  1. import { filter, map, Observable, Observer, PartialObserver, Subject, Subscriber, Subscription, takeWhile, Unsubscribable } from 'rxjs';
  2. import { AdapterProfile, AdapterSet, AdaptorTransmissionRole, TransportEvent, TransportMessage } from '../interface/connector.interface';
  3. import { MessageTransmissionBase } from './msg.transmission.base';
  4. import { Bus, EventMessage, FisMessage, MessageReceiver as MessageReceiverInterface, ReceiverProfile, TransmissionMessage, TransmitterProfile } from '../interface/transport.interface'
  5. import { ConnectionAdapter } from '../connector/connector.base';
  6. import { v4 as uuidv4 } from 'uuid'
  7. import { ReceiverConnectionAdapter } from '../connector/connector.receiver';
  8. import { ConnectionManager } from '../connector/connector.manager';
  9. export class MessageTransmissionReceiver extends MessageTransmissionBase implements MessageReceiverInterface {
  10. receiverProfile!: ReceiverProfile;
  11. private incomingTransportMessage: Subject<TransportMessage> = new Subject()
  12. constructor(profile: ReceiverProfile, adapterSets: AdapterSet[]) {
  13. super();
  14. this.setReceiver(profile)
  15. }
  16. setReceiver(receiverProfile: ReceiverProfile): void {
  17. this.receiverProfile = receiverProfile
  18. }
  19. getMessageBus(bus: Bus): Observable<TransmissionMessage> {
  20. return new Observable((observable: Observer<TransmissionMessage>) => {
  21. // logic here
  22. if (bus == Bus.GeneralBus) {
  23. const subscription: Subscription = this.incomingTransportMessage.pipe(
  24. filter((message: TransportMessage) => message.payload === `Incoming Message`),
  25. map(message => message.payload as TransportMessage)
  26. ).subscribe((message: TransportMessage) => {
  27. observable.next({
  28. adapterId: message.target as string, // assign adapter id so that it knows who to reply back
  29. payload: message.payload
  30. });
  31. });
  32. // Clean up on unsubscription
  33. return () => {
  34. subscription.unsubscribe();
  35. };
  36. }
  37. })
  38. }
  39. }