connector.ts 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166
  1. import { Observable, Observer, Subject } from "rxjs";
  2. import dotenv from 'dotenv';
  3. import { v4 as uuidv4 } from 'uuid';
  4. import { WebsocketTransportService } from "../transport/websocket";
  5. import { ITransportReceiving, ITransportSuper, ITransportTransmitting, ReceiverProfile, TransportEventNotification, TransportManagerEventNotification, TransportManagerInterface, TransportMessage, TransportSettings, TransportType } from "../interface/ITransport.interface";
  6. dotenv.config();
  7. /* This transport manager will be instantiating the necessary transport to deal with tranmission and receiving from different receivers
  8. So how?: */
  9. export class ConnectionManager {
  10. private incomingMessage: Subject<TransportMessage> = new Subject() // only for client roles usage. Servers will listen to event notification for incoming requests
  11. private outGoingMessage: Subject<FisMessage> = new Subject()
  12. private connectedClients: ReceiverProfile[] = []
  13. private transportEventNotification: Subject<TransportEventNotification> = new Subject()
  14. // this is assuming that the application will do request response, otherwise, just one will do.
  15. private transportService!: ITransportSuper
  16. private transmittingService!: ITransportTransmitting
  17. private receivingService!: ITransportReceiving
  18. private serverPort!: number
  19. private serverUrl!: string
  20. constructor(port?: number, url?: string) {
  21. if (port) {
  22. this.serverPort = port
  23. this.establishTransportTransmitting(process.env.Transport as TransportType).then((transmissionService: ITransportTransmitting) => {
  24. transmissionService.getTransportEventNotification().subscribe((notification: TransportEventNotification) => {
  25. // Collect client information when they are connected
  26. if (notification.event == `Connection` && notification.description == 'New Client Connected') {
  27. this.connectedClients.push(notification.data?.payload)
  28. this.handleNewReceiver(notification.data?.payload)
  29. }
  30. })
  31. }).catch((error) => console.error(error))
  32. }
  33. /* For web browser ui, because this server can also act as a receiver, not just transmitter as well. */
  34. if (url) {
  35. this.serverUrl = url
  36. this.establishTransportReceiving(process.env.Transport as TransportType).then((receivingService: ITransportReceiving) => {
  37. // logic here
  38. })
  39. }
  40. if (!port && !url) console.error(`No role has assigned for transport service due to lack of information provided!`)
  41. this.processOutputMessage()
  42. }
  43. // use for as a transmitter
  44. public emit(message: TransportMessage): void {
  45. if(this.transmittingService) {
  46. this.transmittingService.transmit(message)
  47. }
  48. }
  49. // use for as a receiver
  50. public send(message: TransportMessage): Observable<TransportMessage> {
  51. return new Observable((response: Observer<TransportMessage>) => {
  52. // logic here
  53. })
  54. }
  55. public getTransportManagerEventNotification(): Observable<TransportManagerEventNotification> {
  56. return new Observable((notification: Observer<TransportManagerEventNotification>) => {
  57. this.transportEventNotification.subscribe((transportNotification: TransportEventNotification) => {
  58. if (transportNotification.event == 'Connection') {
  59. notification.next({
  60. event: 'New Client Connection',
  61. data: transportNotification.data
  62. })
  63. }
  64. })
  65. })
  66. }
  67. private async establishTransportTransmitting(tranportType: TransportType): Promise<ITransportTransmitting> {
  68. console.log(`Setting up ${tranportType} transmitter`)
  69. return new Promise((resolve, reject) => {
  70. let setting: TransportSettings = {
  71. profileInfo: {
  72. id: uuidv4(),
  73. name: 'For Server',
  74. port: this.serverPort,
  75. transport: tranportType
  76. }
  77. }
  78. if (tranportType == 'WEBSOCKET') {
  79. this.transportService = this.transportService ? this.transportService : new WebsocketTransportService(setting)
  80. this.transmittingService = this.transportService
  81. resolve(this.transmittingService)
  82. } else {
  83. reject(`No such Transport Type Exist...`)
  84. }
  85. })
  86. }
  87. private async establishTransportReceiving(tranportType: TransportType): Promise<ITransportReceiving> {
  88. return new Promise((resolve, reject) => {
  89. let setting: TransportSettings = {
  90. profileInfo: {
  91. id: uuidv4(),
  92. name: 'For Client',
  93. url: this.serverUrl,
  94. transport: tranportType
  95. }
  96. }
  97. // will be sharing one instance, This part may break the code
  98. if (tranportType == 'WEBSOCKET') {
  99. this.receivingService = this.transportService ? this.transportService : new WebsocketTransportService(setting)
  100. resolve(this.receivingService)
  101. }
  102. })
  103. }
  104. // This function will wrap the message to be delivered over to established transport to be sent over
  105. private processOutputMessage(): void {
  106. this.outGoingMessage.subscribe({
  107. next: (message: FisMessage) => {
  108. if (message.header.messageName == 'NotificationMessage') {
  109. // This is just here temporarily. Because The application will be concerned with whom the party is subcribed to, not this transport manager
  110. this.transmittingService.transmit({
  111. id: message.header.messageID,
  112. receiverID: '',
  113. payload: message,
  114. event: `notification`
  115. })
  116. }
  117. if (message.header.messageName == 'ResponseMessage') {
  118. // Map responses according to the requestID???
  119. this.transmittingService.transmit({
  120. id: message.header.messageID,
  121. receiverID: '',
  122. payload: message,
  123. event: `resposne`
  124. })
  125. }
  126. }
  127. })
  128. }
  129. // Responsibility include subscribing to event notification as well as tranpport instance for message transmission
  130. private handleNewReceiver(receiver: ReceiverProfile) {
  131. receiver.eventNotification.subscribe({
  132. next: event => {
  133. // new request will be handled. And then manager will here will pick it up from eventNotification and respond accordingly if there's a need for it
  134. if (event.event == `Disconnection`) {
  135. console.error(`Client ${event.data?.receiverID ?? 'undefined'} disconnected.`)
  136. receiver.eventNotification.complete() // no need to listen since it's disconnected. A new one will be established when it reconnects again
  137. }
  138. if (event.event == `New Message`) {
  139. console.log(`New Client request ${event.data?.payload.header.messageID} at receiverID: ${receiver.uuid}`)
  140. }
  141. },
  142. error: error => {
  143. console.error(error)
  144. receiver.eventNotification.error(error)
  145. }
  146. })
  147. }
  148. }
  149. export interface FisMessage {
  150. header: {
  151. messageName: 'RequestMessage' | 'ResponseMessage' | 'NotificationMessage',
  152. messageID: string
  153. },
  154. data: any
  155. }