import { Observable, Observer, Subject } from "rxjs";
import dotenv from 'dotenv';
import { v4 as uuidv4 } from 'uuid';
import { WebsocketTransportService } from "../transport/websocket";
import { ITransportReceiving, ITransportSuper, ITransportTransmitting, ReceiverProfile, TransportEventNotification, TransportManagerEventNotification, TransportManagerInterface, TransportMessage, TransportSettings, TransportType } from "../interface/ITransport.interface";

dotenv.config();

/**
 * Instantiates the transport needed to transmit to, and receive from, remote parties.
 *
 * - When constructed with a `port`, a transmitting transport is set up and the clients it
 *   reports as connected are tracked.
 * - When constructed with a `url`, a receiving transport is set up.
 * - Both roles can be active at once; in that case they share one underlying transport instance.
 *
 * The transport implementation is selected by the `Transport` environment variable.
 */
export class ConnectionManager {
    // Only for client-role usage. Servers listen to event notifications for incoming requests.
    // NOTE(review): nothing in this class reads or writes this subject yet, so its element
    // type cannot be determined from here.
    private incomingMessage: Subject<unknown> = new Subject<unknown>()
    // Messages queued for delivery; drained by processOutputMessage().
    private outGoingMessage: Subject<FisMessage> = new Subject<FisMessage>()
    // Profiles collected from 'New Client Connected' notifications.
    // NOTE(review): entries are never removed on disconnection — confirm whether that is intended.
    private connectedClients: ReceiverProfile[] = []
    // Fan-out of every notification received from the transmitting transport.
    private transportEventNotification: Subject<TransportEventNotification> = new Subject<TransportEventNotification>()
    // Separate transmitting/receiving handles assume a request-response application;
    // otherwise a single one would do.
    private transportService!: ITransportSuper
    private transmittingService!: ITransportTransmitting
    private receivingService!: ITransportReceiving
    private serverPort!: number
    private serverUrl!: string

    /**
     * @param port When truthy, sets up the transmitting role using this port.
     * @param url When truthy, sets up the receiving role against this url.
     *
     * If neither is provided an error is logged and no transport is created.
     */
    constructor(port?: number, url?: string) {
        if (port) {
            this.serverPort = port
            this.establishTransportTransmitting(process.env.Transport as TransportType)
                .then((transmissionService: ITransportTransmitting) => {
                    transmissionService.getTransportEventNotification().subscribe((notification: TransportEventNotification) => {
                        // Collect client information when they are connected
                        if (notification.event === `Connection` && notification.description === 'New Client Connected') {
                            const receiver: ReceiverProfile | undefined = notification.data?.payload
                            if (receiver) {
                                this.connectedClients.push(receiver)
                                this.handleNewReceiver(receiver)
                            } else {
                                // Without a profile there is nothing to track or subscribe to.
                                console.error(`Connection notification received without a receiver profile.`)
                            }
                        }
                        // Feed the manager-level stream; without this,
                        // getTransportManagerEventNotification() would never emit.
                        this.transportEventNotification.next(notification)
                    })
                })
                .catch((error: unknown) => console.error(error))
        }
        /* For web browser UI: this server can also act as a receiver, not just a transmitter. */
        if (url) {
            this.serverUrl = url
            this.establishTransportReceiving(process.env.Transport as TransportType)
                .then((_receivingService: ITransportReceiving) => {
                    // logic here
                })
                // establishTransportReceiving rejects for unsupported transport types.
                .catch((error: unknown) => console.error(error))
        }
        if (!port && !url) console.error(`No role has assigned for transport service due to lack of information provided!`)
        this.processOutputMessage()
    }

    /**
     * Transmitter role: hands `message` to the transmitting transport.
     * The message is dropped without notice when no transmitting transport has been set up.
     */
    public emit(message: TransportMessage): void {
        if (this.transmittingService) {
            this.transmittingService.transmit(message)
        }
    }

    /**
     * Receiver role: placeholder. The returned observable currently never emits or completes.
     */
    // The element type was lost from the original signature; `any` keeps existing
    // callers compiling until the response shape is defined.
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    public send(message: TransportMessage): Observable<any> {
        // eslint-disable-next-line @typescript-eslint/no-explicit-any
        return new Observable((response: Observer<any>) => {
            // logic here
        })
    }

    /**
     * Stream of manager-level events. Every transport notification whose event is
     * 'Connection' is re-emitted as a 'New Client Connection' event carrying the same data.
     * Unsubscribing from the returned observable also releases the internal subscription.
     */
    public getTransportManagerEventNotification(): Observable<TransportManagerEventNotification> {
        return new Observable((notification: Observer<TransportManagerEventNotification>) => {
            // Returning the subscription makes it the teardown for this observable.
            return this.transportEventNotification.subscribe((transportNotification: TransportEventNotification) => {
                if (transportNotification.event === 'Connection') {
                    notification.next({
                        event: 'New Client Connection',
                        data: transportNotification.data
                    })
                }
            })
        })
    }

    /**
     * Creates (or reuses) the transport instance and registers it as the transmitting service.
     *
     * @returns The transmitting service.
     * @throws Error (as a rejected promise) when the transport type is not 'WEBSOCKET'.
     */
    private async establishTransportTransmitting(transportType: TransportType): Promise<ITransportTransmitting> {
        console.log(`Setting up ${transportType} transmitter`)
        if (transportType !== 'WEBSOCKET') {
            throw new Error(`No such Transport Type Exist...`)
        }
        const setting: TransportSettings = {
            profileInfo: {
                id: uuidv4(),
                name: 'For Server',
                port: this.serverPort,
                transport: transportType
            }
        }
        this.transportService = this.transportService ?? new WebsocketTransportService(setting)
        this.transmittingService = this.transportService
        return this.transmittingService
    }

    /**
     * Creates (or reuses) the transport instance and registers it as the receiving service.
     *
     * @returns The receiving service.
     * @throws Error (as a rejected promise) when the transport type is not 'WEBSOCKET'.
     */
    private async establishTransportReceiving(transportType: TransportType): Promise<ITransportReceiving> {
        if (transportType !== 'WEBSOCKET') {
            // Previously this case left the promise pending forever.
            throw new Error(`No such Transport Type Exist...`)
        }
        const setting: TransportSettings = {
            profileInfo: {
                id: uuidv4(),
                name: 'For Client',
                url: this.serverUrl,
                transport: transportType
            }
        }
        // One instance is shared between both roles. NOTE(review): when a transmitting
        // instance already exists, the client `setting` built above is not applied to it.
        this.transportService = this.transportService ?? new WebsocketTransportService(setting)
        this.receivingService = this.transportService
        return this.receivingService
    }

    /**
     * Wraps each outgoing notification/response message in a transport envelope and
     * transmits it. Other message kinds are ignored.
     */
    private processOutputMessage(): void {
        this.outGoingMessage.subscribe({
            next: (message: FisMessage) => {
                const { messageName, messageID } = message.header
                if (messageName !== 'NotificationMessage' && messageName !== 'ResponseMessage') {
                    return
                }
                if (!this.transmittingService) {
                    // Receiver-only managers have no transmitter; avoid a TypeError.
                    console.error(`Unable to transmit message ${messageID}: no transmitting service established.`)
                    return
                }
                if (messageName === 'NotificationMessage') {
                    // Temporary: the application, not this manager, should decide which
                    // party is subscribed, hence the empty receiverID.
                    this.transmittingService.transmit({
                        id: messageID,
                        receiverID: '',
                        payload: message,
                        event: `notification`
                    })
                } else {
                    // Map responses according to the requestID???
                    // NOTE(review): `resposne` looks like a typo of `response`; left as-is
                    // because the receiving side may match on this exact string.
                    this.transmittingService.transmit({
                        id: messageID,
                        receiverID: '',
                        payload: message,
                        event: `resposne`
                    })
                }
            }
        })
    }

    /**
     * Subscribes to a newly connected receiver's event stream: logs new messages and,
     * on disconnection, logs it and completes the receiver's stream.
     */
    private handleNewReceiver(receiver: ReceiverProfile) {
        receiver.eventNotification.subscribe({
            next: event => {
                // New requests are only logged here; whoever listens on the event
                // notification is expected to pick them up and respond if needed.
                if (event.event === `Disconnection`) {
                    console.error(`Client ${event.data?.receiverID ?? 'undefined'} disconnected.`)
                    // No need to keep listening; a new stream is established on reconnect.
                    receiver.eventNotification.complete()
                }
                if (event.event === `New Message`) {
                    console.log(`New Client request ${event.data?.payload?.header?.messageID} at receiverID: ${receiver.uuid}`)
                }
            },
            error: error => {
                // The stream has already errored at this point, so it is only logged;
                // pushing the same error back into the same stream would be redundant.
                console.error(error)
            }
        })
    }
}

/** Application-level message envelope carried as the payload of a transport message. */
export interface FisMessage {
    header: {
        messageName: 'RequestMessage' | 'ResponseMessage' | 'NotificationMessage',
        messageID: string
    },
    data: any
}