| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293 | import { Observable, Subject } from "rxjs";import dotenv from 'dotenv';import { v4 as uuidv4 } from 'uuid';import { WebsocketTransportService } from "./websocket";import { ITransport, TransportSettings } from "../interface/ITransport.interface";dotenv.config();/* The application doesn't care at this point whether or not what transport to use or what role. it just wants to send request and expectingresponse as well as emitting notification and receiving notification from concerned parties. All the managing parts should be here. */export class TransportService {    private incomingResponse: Subject<any> = new Subject()    private eventNotification: Subject<any> = new Subject()    // this is assuming that the application will do request response, otherwise, just one will do.    private transmittingService!: ITransport    private receivingService!: ITransport    private serverPort!: number    private serverUrl!: string    constructor(port?: number, url?: string) {        // just putting it here for testing for now.        if (port) this.serverPort = port        if (url) this.serverUrl = url        // logic here        this.establishTransportTransmission({ transportName: process.env.Transport?.toString() })        this.establishTransportReceiving({ transportName: process.env.Transport?.toString() }).then(() => {            // pipe the notification from the server into local notification            this.receivingService.getEventNotification().subscribe(this.eventNotification)        })    }    public getIncomingResponse(): Observable<any> {        return this.incomingResponse.asObservable()    }    public getEventNotification(): Observable<any> {        return this.eventNotification.asObservable()    }    public send(message: any): Observable<any> {        return new Observable((response) => {            this.transmittingService.send(message)            /// demultiplex here, assuming all responses are piped into this.incomingResponse            this.incomingResponse.subscribe(responseMsg => {                if (responseMsg.id == message.id) {                    response.next(responseMsg)                } // don't forget to add complete message to close out this function            })        })    }    public emit(message: any): void {        this.transmittingService.emit('notification', message)    }    private async establishTransportTransmission(tranportType: TranportType): Promise<any> {        console.log(`Setting up ${tranportType.transportName} transmitter`)        return new Promise((resolve, reject) => {            let setting: TransportSettings = {                role: 'Server',                profileInfo: {                    id: uuidv4(),                    name: 'Server',                    port: this.serverPort ?? 3000                }            }            if (tranportType.transportName == 'WEBSOCKET') {                this.transmittingService = new WebsocketTransportService(setting)                resolve(this.transmittingService)            }        })    }    private async establishTransportReceiving(tranportType: TranportType): Promise<any> {        console.log(`Setting up ${tranportType.transportName} Receiver`)        return new Promise((resolve, reject) => {            let setting: TransportSettings = {                role: 'Client',                profileInfo: {                    id: uuidv4(),                    name: 'Client',                    url: this.serverUrl ?? 'http://localhost:3000'                }            }            if (tranportType.transportName == 'WEBSOCKET') {                this.receivingService = new WebsocketTransportService(setting)                resolve(this.receivingService)            }        })    }}export interface TranportType {    transportName: 'WEBSOCKET' | 'GRPC' | 'HTTP' | 'KAFKA' | string | undefined}
 |