123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166 |
- import { Observable, Observer, Subject } from "rxjs";
- import dotenv from 'dotenv';
- import { v4 as uuidv4 } from 'uuid';
- import { WebsocketTransportService } from "../transport/websocket";
- import { ITransportReceiving, ITransportSuper, ITransportTransmitting, ReceiverProfile, TransportEventNotification, TransportManagerEventNotification, TransportManagerInterface, TransportMessage, TransportSettings, TransportType } from "../interface/ITransport.interface";
- dotenv.config();
/* This transport manager instantiates the transports needed to handle transmission to, and reception from, different receivers.
   How it does so: see the ConnectionManager class below. */
/**
 * Instantiates the transport selected through `process.env.Transport` and relays
 * messages and connection events between that transport and the application.
 *
 * - Constructed with a `port`: sets up a transmitting service and starts tracking
 *   the receivers that connect to it.
 * - Constructed with a `url`: sets up a receiving service.
 * - Both may be supplied; the receiving side then reuses the transport instance
 *   created for the transmitting side.
 */
export class ConnectionManager {
  // Only for client-role usage. Servers listen to event notifications for incoming requests.
  private incomingMessage: Subject<TransportMessage> = new Subject()
  private outGoingMessage: Subject<FisMessage> = new Subject()
  private connectedClients: ReceiverProfile[] = []
  private transportEventNotification: Subject<TransportEventNotification> = new Subject()
  // This assumes the application does request/response; otherwise a single service would do.
  private transportService!: ITransportSuper
  private transmittingService!: ITransportTransmitting
  private receivingService!: ITransportReceiving
  private serverPort!: number
  private serverUrl!: string

  /**
   * @param port When provided (and truthy), a transmitting service is established on this port.
   * @param url When provided (and truthy), a receiving service is established against this url.
   */
  constructor(port?: number, url?: string) {
    if (port) {
      this.serverPort = port
      this.establishTransportTransmitting(process.env.Transport as TransportType)
        .then((transmissionService: ITransportTransmitting) => {
          transmissionService.getTransportEventNotification().subscribe((notification: TransportEventNotification) => {
            // Collect client information when they are connected
            if (notification.event === 'Connection' && notification.description === 'New Client Connected') {
              const receiver = notification.data?.payload
              if (receiver) {
                this.connectedClients.push(receiver)
                this.handleNewReceiver(receiver)
              } else {
                // Without a profile there is nothing to track or subscribe to.
                console.error(`Connection notification received without a receiver profile.`)
              }
            }
            // Relay to the manager-level stream; without this, subscribers of
            // getTransportManagerEventNotification() would never receive anything.
            this.transportEventNotification.next(notification)
          })
        })
        .catch((error) => console.error(error))
    }
    /* For web browser ui, because this server can also act as a receiver, not just transmitter as well. */
    if (url) {
      this.serverUrl = url
      this.establishTransportReceiving(process.env.Transport as TransportType)
        .then((receivingService: ITransportReceiving) => {
          // logic here
        })
        .catch((error) => console.error(error))
    }
    if (!port && !url) console.error(`No role has assigned for transport service due to lack of information provided!`)
    this.processOutputMessage()
  }

  /**
   * Transmitter-side send: hands the message to the transmitting service.
   * The message is silently dropped when no transmitting service has been established.
   */
  public emit(message: TransportMessage): void {
    if (this.transmittingService) {
      this.transmittingService.transmit(message)
    }
  }

  /**
   * Receiver-side send. Not implemented yet: the returned observable never emits,
   * errors or completes.
   */
  public send(message: TransportMessage): Observable<TransportMessage> {
    return new Observable((response: Observer<TransportMessage>) => {
      // logic here
    })
  }

  /**
   * Stream of manager-level events. Every relayed transport notification whose
   * event is `Connection` is emitted as a `New Client Connection` event carrying
   * the same data.
   *
   * @returns A cold observable; unsubscribing from it also releases the internal subscription.
   */
  public getTransportManagerEventNotification(): Observable<TransportManagerEventNotification> {
    return new Observable((notification: Observer<TransportManagerEventNotification>) => {
      const subscription = this.transportEventNotification.subscribe((transportNotification: TransportEventNotification) => {
        if (transportNotification.event === 'Connection') {
          notification.next({
            event: 'New Client Connection',
            data: transportNotification.data
          })
        }
      })
      // Teardown: do not leave the inner subscription dangling once the consumer unsubscribes.
      return () => subscription.unsubscribe()
    })
  }

  /**
   * Creates (or reuses) the transport for the transmitting role and stores it as
   * both `transportService` and `transmittingService`.
   *
   * @throws Error (as a rejected promise) when the transport type is not supported.
   */
  private async establishTransportTransmitting(transportType: TransportType): Promise<ITransportTransmitting> {
    console.log(`Setting up ${transportType} transmitter`)
    if (transportType !== 'WEBSOCKET') {
      throw new Error(`No such Transport Type Exist...`)
    }
    const setting: TransportSettings = {
      profileInfo: {
        id: uuidv4(),
        name: 'For Server',
        port: this.serverPort,
        transport: transportType
      }
    }
    this.transportService = this.transportService ?? new WebsocketTransportService(setting)
    this.transmittingService = this.transportService
    return this.transmittingService
  }

  /**
   * Creates the transport for the receiving role, reusing the already established
   * transport instance when there is one.
   *
   * @throws Error (as a rejected promise) when the transport type is not supported,
   *         instead of leaving the promise pending forever.
   */
  private async establishTransportReceiving(transportType: TransportType): Promise<ITransportReceiving> {
    if (transportType !== 'WEBSOCKET') {
      throw new Error(`No such Transport Type Exist...`)
    }
    const setting: TransportSettings = {
      profileInfo: {
        id: uuidv4(),
        name: 'For Client',
        url: this.serverUrl,
        transport: transportType
      }
    }
    // will be sharing one instance, This part may break the code
    this.receivingService = this.transportService ?? new WebsocketTransportService(setting)
    return this.receivingService
  }

  /**
   * Wraps outgoing application messages into transport messages and hands them to
   * the transmitting service. Only `NotificationMessage` and `ResponseMessage` are
   * forwarded; anything else is ignored.
   */
  private processOutputMessage(): void {
    this.outGoingMessage.subscribe({
      next: (message: FisMessage) => {
        // A client-only manager has no transmitting service; mirror the guard used by emit().
        if (!this.transmittingService) {
          console.error(`Unable to transmit message ${message.header.messageID}: no transmitting service established.`)
          return
        }
        if (message.header.messageName === 'NotificationMessage') {
          // This is just here temporarily. The application is concerned with which party is subscribed, not this transport manager
          this.transmittingService.transmit({
            id: message.header.messageID,
            receiverID: '',
            payload: message,
            event: `notification`
          })
        }
        if (message.header.messageName === 'ResponseMessage') {
          // Map responses according to the requestID???
          // NOTE(review): `resposne` looks like a typo for `response`; kept as-is because the
          // receiving side is not visible here - confirm before renaming.
          this.transmittingService.transmit({
            id: message.header.messageID,
            receiverID: '',
            payload: message,
            event: `resposne`
          })
        }
      }
    })
  }

  /**
   * Subscribes to a newly connected receiver's event notifications: logs new
   * messages, and on disconnection forgets the receiver and completes its stream.
   */
  private handleNewReceiver(receiver: ReceiverProfile) {
    receiver.eventNotification.subscribe({
      next: event => {
        // New requests are handled elsewhere; the manager picks them up from eventNotification and responds if needed
        if (event.event === `Disconnection`) {
          console.error(`Client ${event.data?.receiverID ?? 'undefined'} disconnected.`)
          // Drop the profile so the list does not grow with every reconnect.
          this.connectedClients = this.connectedClients.filter(client => client !== receiver)
          receiver.eventNotification.complete() // no need to listen since it's disconnected. A new one will be established when it reconnects again
        }
        if (event.event === `New Message`) {
          console.log(`New Client request ${event.data?.payload?.header?.messageID} at receiverID: ${receiver.uuid}`)
        }
      },
      error: error => {
        console.error(error)
        // NOTE(review): the stream has already errored at this point, so this is presumably a no-op - confirm.
        receiver.eventNotification.error(error)
      }
    })
  }
}
/**
 * Application-level message envelope exchanged through the connection manager.
 */
export interface FisMessage {
  /** Routing metadata: the kind of message and its identifier. */
  header: {
    messageName:
      | 'RequestMessage'
      | 'ResponseMessage'
      | 'NotificationMessage';
    messageID: string;
  };
  /** Message body; its shape is not constrained by this interface. */
  data: any;
}
|