transport.service.ts 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. import { Observable, Subject } from "rxjs";
  2. import dotenv from 'dotenv';
  3. import { v4 as uuidv4 } from 'uuid';
  4. import { WebsocketTransportService } from "./websocket";
  5. import { ITransport, TransportSettings } from "../interface/ITransport.interface";
  6. dotenv.config();
  7. /* At this point the application doesn't care which transport is used or in what role. It just wants to send requests and receive
  8. responses, as well as emit notifications to and receive notifications from concerned parties. All the managing parts should be here. */
  /**
   * Facade that hides the concrete transport from the application.
   *
   * On construction it sets up one transport for transmitting and one for
   * receiving, selected by the `Transport` environment variable. Callers use
   * `send` for request/response traffic and `emit` / `getEventNotification`
   * for notifications.
   */
  export class TransportService {
    private incomingResponse: Subject<any> = new Subject()
    private eventNotification: Subject<any> = new Subject()
    // Two services are kept on the assumption that the application does
    // request/response; otherwise a single one would do.
    private transmittingService!: ITransport
    private receivingService!: ITransport
    private serverPort!: number
    private serverUrl!: string

    /**
     * @param port Port handed to the transmitting transport; 3000 is used when omitted.
     * @param url URL handed to the receiving transport; 'http://localhost:3000' is used when omitted.
     */
    constructor(port?: number, url?: string) {
      // just putting it here for testing for now.
      if (port) this.serverPort = port
      if (url) this.serverUrl = url

      const transportType: TranportType = { transportName: process.env.Transport?.toString() }

      // Both setup calls run their synchronous part immediately, so the service
      // fields are assigned before the constructor returns. Failures are logged
      // here because a constructor cannot be awaited by the caller.
      this.establishTransportTransmission(transportType)
        .catch((error: unknown) => {
          console.error('Failed to set up transmitter', error)
        })

      this.establishTransportReceiving(transportType)
        .then(() => {
          // pipe the notification from the server into local notification
          this.receivingService.getEventNotification().subscribe(this.eventNotification)
        })
        .catch((error: unknown) => {
          console.error('Failed to set up receiver', error)
        })
    }

    /** @returns A read-only view of the response stream. */
    public getIncomingResponse(): Observable<any> {
      return this.incomingResponse.asObservable()
    }

    /** @returns A read-only view of the notification stream. */
    public getEventNotification(): Observable<any> {
      return this.eventNotification.asObservable()
    }

    /**
     * Sends a request and streams back the responses whose `id` matches it.
     *
     * Nothing is transmitted until the returned observable is subscribed to.
     * Unsubscribing releases the internal listener on the response stream.
     *
     * @param message Request to transmit; its `id` is used to match responses.
     * @returns Observable of the matching responses. It errors if no transmitter
     *          is available or if transmitting throws.
     */
    public send(message: any): Observable<any> {
      return new Observable((response) => {
        // Listen before transmitting so that a response delivered synchronously
        // is not missed.
        /// demultiplex here, assuming all responses are piped into this.incomingResponse
        const matching = this.incomingResponse.subscribe(responseMsg => {
          // NOTE(review): loose equality is retained from the original; switch
          // to === once both ids are confirmed to share a type.
          if (responseMsg.id == message.id) {
            response.next(responseMsg)
          } // don't forget to add complete message to close out this function
        })

        try {
          if (!this.transmittingService) {
            throw new Error('No transmitting service has been established')
          }
          this.transmittingService.send(message)
        } catch (error: unknown) {
          response.error(error)
        }

        // Returned as teardown so the listener is removed on unsubscribe/error.
        return matching
      })
    }

    /**
     * Emits a notification through the transmitting transport.
     *
     * @param message Payload sent under the 'notification' event name.
     * @throws Error when no transmitting service has been established.
     */
    public emit(message: any): void {
      if (!this.transmittingService) {
        throw new Error('No transmitting service has been established')
      }
      this.transmittingService.emit('notification', message)
    }

    /**
     * Creates the transmitting transport (role 'Server').
     *
     * @param tranportType Selects the transport; only 'WEBSOCKET' is handled.
     * @returns The created transport.
     * @throws Error (as a rejection) when the transport name is not supported,
     *         instead of leaving the promise pending forever.
     */
    private async establishTransportTransmission(tranportType: TranportType): Promise<any> {
      console.log(`Setting up ${tranportType.transportName} transmitter`)
      const setting: TransportSettings = {
        role: 'Server',
        profileInfo: {
          id: uuidv4(),
          name: 'Server',
          port: this.serverPort ?? 3000
        }
      }
      if (tranportType.transportName === 'WEBSOCKET') {
        this.transmittingService = new WebsocketTransportService(setting)
        return this.transmittingService
      }
      throw new Error(`Unsupported transport for transmitter: ${tranportType.transportName}`)
    }

    /**
     * Creates the receiving transport (role 'Client').
     *
     * @param tranportType Selects the transport; only 'WEBSOCKET' is handled.
     * @returns The created transport.
     * @throws Error (as a rejection) when the transport name is not supported,
     *         instead of leaving the promise pending forever.
     */
    private async establishTransportReceiving(tranportType: TranportType): Promise<any> {
      console.log(`Setting up ${tranportType.transportName} Receiver`)
      const setting: TransportSettings = {
        role: 'Client',
        profileInfo: {
          id: uuidv4(),
          name: 'Client',
          url: this.serverUrl ?? 'http://localhost:3000'
        }
      }
      if (tranportType.transportName === 'WEBSOCKET') {
        this.receivingService = new WebsocketTransportService(setting)
        return this.receivingService
      }
      throw new Error(`Unsupported transport for receiver: ${tranportType.transportName}`)
    }
  }
  /**
   * Names the transport a service should be built on.
   */
  export interface TranportType {
    /**
     * Transport identifier. The listed literals are the known names; any other
     * string, or `undefined`, is also accepted by the type.
     */
    transportName:
      | 'WEBSOCKET'
      | 'GRPC'
      | 'HTTP'
      | 'KAFKA'
      | string
      | undefined
  }