123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293 |
- import { Observable, Subject } from "rxjs";
- import dotenv from 'dotenv';
- import { v4 as uuidv4 } from 'uuid';
- import { WebsocketTransportService } from "./websocket";
- import { ITransport, TransportSettings } from "../interface/ITransport.interface";
- dotenv.config();
- /* The application doesn't care at this point whether or not what transport to use or what role. it just wants to send request and expecting
- response as well as emitting notification and receiving notification from concerned parties. All the managing parts should be here. */
- export class TransportService {
- private incomingResponse: Subject<any> = new Subject()
- private eventNotification: Subject<any> = new Subject()
- // this is assuming that the application will do request response, otherwise, just one will do.
- private transmittingService!: ITransport
- private receivingService!: ITransport
- private serverPort!: number
- private serverUrl!: string
- constructor(port?: number, url?: string) {
- // just putting it here for testing for now.
- if (port) this.serverPort = port
- if (url) this.serverUrl = url
- // logic here
- this.establishTransportTransmission({ transportName: process.env.Transport?.toString() })
- this.establishTransportReceiving({ transportName: process.env.Transport?.toString() }).then(() => {
- // pipe the notification from the server into local notification
- this.receivingService.getEventNotification().subscribe(this.eventNotification)
- })
- }
- public getIncomingResponse(): Observable<any> {
- return this.incomingResponse.asObservable()
- }
- public getEventNotification(): Observable<any> {
- return this.eventNotification.asObservable()
- }
- public send(message: any): Observable<any> {
- return new Observable((response) => {
- this.transmittingService.send(message)
- /// demultiplex here, assuming all responses are piped into this.incomingResponse
- this.incomingResponse.subscribe(responseMsg => {
- if (responseMsg.id == message.id) {
- response.next(responseMsg)
- } // don't forget to add complete message to close out this function
- })
- })
- }
- public emit(message: any): void {
- this.transmittingService.emit('notification', message)
- }
- private async establishTransportTransmission(tranportType: TranportType): Promise<any> {
- console.log(`Setting up ${tranportType.transportName} transmitter`)
- return new Promise((resolve, reject) => {
- let setting: TransportSettings = {
- role: 'Server',
- profileInfo: {
- id: uuidv4(),
- name: 'Server',
- port: this.serverPort ?? 3000
- }
- }
- if (tranportType.transportName == 'WEBSOCKET') {
- this.transmittingService = new WebsocketTransportService(setting)
- resolve(this.transmittingService)
- }
- })
- }
- private async establishTransportReceiving(tranportType: TranportType): Promise<any> {
- console.log(`Setting up ${tranportType.transportName} Receiver`)
- return new Promise((resolve, reject) => {
- let setting: TransportSettings = {
- role: 'Client',
- profileInfo: {
- id: uuidv4(),
- name: 'Client',
- url: this.serverUrl ?? 'http://localhost:3000'
- }
- }
- if (tranportType.transportName == 'WEBSOCKET') {
- this.receivingService = new WebsocketTransportService(setting)
- resolve(this.receivingService)
- }
- })
- }
- }
- export interface TranportType {
- transportName: 'WEBSOCKET' | 'GRPC' | 'HTTP' | 'KAFKA' | string | undefined
- }
|