import { Observable, Subject } from "rxjs"; import dotenv from 'dotenv'; import { v4 as uuidv4 } from 'uuid'; import { WebsocketTransportService } from "./websocket"; import { ITransport, TransportSettings } from "../interface/ITransport.interface"; dotenv.config(); /* The application doesn't care at this point whether or not what transport to use or what role. it just wants to send request and expecting response as well as emitting notification and receiving notification from concerned parties. All the managing parts should be here. */ export class TransportService { private incomingResponse: Subject = new Subject() private eventNotification: Subject = new Subject() // this is assuming that the application will do request response, otherwise, just one will do. private transmittingService!: ITransport private receivingService!: ITransport private serverPort!: number private serverUrl!: string constructor(port?: number, url?: string) { // just putting it here for testing for now. if (port) this.serverPort = port if (url) this.serverUrl = url // logic here this.establishTransportTransmission({ transportName: process.env.Transport?.toString() }) this.establishTransportReceiving({ transportName: process.env.Transport?.toString() }).then(() => { // pipe the notification from the server into local notification this.receivingService.getEventNotification().subscribe(this.eventNotification) }) } public getIncomingResponse(): Observable { return this.incomingResponse.asObservable() } public getEventNotification(): Observable { return this.eventNotification.asObservable() } public send(message: any): Observable { return new Observable((response) => { this.transmittingService.send(message) /// demultiplex here, assuming all responses are piped into this.incomingResponse this.incomingResponse.subscribe(responseMsg => { if (responseMsg.id == message.id) { response.next(responseMsg) } // don't forget to add complete message to close out this function }) }) } public emit(message: any): void { this.transmittingService.emit('notification', message) } private async establishTransportTransmission(tranportType: TranportType): Promise { console.log(`Setting up ${tranportType.transportName} transmitter`) return new Promise((resolve, reject) => { let setting: TransportSettings = { role: 'Server', profileInfo: { id: uuidv4(), name: 'Server', port: this.serverPort ?? 3000 } } if (tranportType.transportName == 'WEBSOCKET') { this.transmittingService = new WebsocketTransportService(setting) resolve(this.transmittingService) } }) } private async establishTransportReceiving(tranportType: TranportType): Promise { console.log(`Setting up ${tranportType.transportName} Receiver`) return new Promise((resolve, reject) => { let setting: TransportSettings = { role: 'Client', profileInfo: { id: uuidv4(), name: 'Client', url: this.serverUrl ?? 'http://localhost:3000' } } if (tranportType.transportName == 'WEBSOCKET') { this.receivingService = new WebsocketTransportService(setting) resolve(this.receivingService) } }) } } export interface TranportType { transportName: 'WEBSOCKET' | 'GRPC' | 'HTTP' | 'KAFKA' | string | undefined }