|
@@ -1,69 +1,133 @@
|
|
import { MessageTransmissionTransmitter } from "./msg.transmission.transmitter";
|
|
import { MessageTransmissionTransmitter } from "./msg.transmission.transmitter";
|
|
import { MessageTransmissionReceiver } from "./msg.transmission.receiver";
|
|
import { MessageTransmissionReceiver } from "./msg.transmission.receiver";
|
|
import { ConnectionManager } from "../connector/connector.manager";
|
|
import { ConnectionManager } from "../connector/connector.manager";
|
|
-import { MessageTransmission, MessageTransmissionManager as MessageTransmissionManagerInterface, ReceiverProfile, TransmissionProfile, TransmitterProfile } from "../interface/transport.interface";
|
|
|
|
|
|
+import { EventMessage, MessageTransmission, MessageTransmissionManager as MessageTransmissionManagerInterface, ReceiverProfile, TransmissionProfile, TransmitterProfile } from "../interface/transport.interface";
|
|
import { v4 as uuidv4 } from 'uuid'
|
|
import { v4 as uuidv4 } from 'uuid'
|
|
-import { AdaptorTransmissionRole, RequestResponseConnectionAdapter, TransportEvent } from "../interface/connector.interface";
|
|
|
|
|
|
+import { AdapterSet, AdaptorTransmissionRole, RequestResponseConnectionAdapter, Transport, TransportEvent, TransportService } from "../interface/connector.interface";
|
|
import { MessageTransmissionRequestResponse } from "./msg.transmission.request-response";
|
|
import { MessageTransmissionRequestResponse } from "./msg.transmission.request-response";
|
|
-/*OKay, the application don't give a shit whose request it comes from. That will be the responsibility of this manager in particular.
|
|
|
|
-So, from the transmtiter perspective, it would be as if it's only coming from one source, but the demultiplexing will occur here
|
|
|
|
-so to speak. The plan, for one type of transport, this manager will make sure all of the connector instances travel through here.
|
|
|
|
-But of course, the transmitter or fis app actor can always instaniate more transmitter and receiver if needed, and there will be
|
|
|
|
-options to cater for wanting to instante different types of transport services according to their needs. */
|
|
|
|
-export class MessageTransmissionManager implements MessageTransmissionManagerInterface {
|
|
|
|
- // transmission: MessageTransmission[] = []
|
|
|
|
|
|
+import { filter, Observable, Observer, Subject } from "rxjs";
|
|
|
|
+import { WebsocketTransportService } from "../transport/websocket";
|
|
|
|
+import { HttpTransportService } from "../transport/http";
|
|
|
|
+import { error } from "console";
|
|
|
|
+
|
|
|
|
+export class MessageTransmissionManager {
|
|
|
|
+ private transportServiceArray: TransportService[] = []
|
|
|
|
+ private transportSet: Set<TransportSet> = new Set()
|
|
|
|
+ transmission: MessageTransmission[] = []
|
|
connectionManager!: ConnectionManager
|
|
connectionManager!: ConnectionManager
|
|
- messageTransmissionTransmitters: MessageTransmissionTransmitter[] = []
|
|
|
|
- messageTransmissionReceiver: MessageTransmissionReceiver[] = []
|
|
|
|
- messageTransmissionRequestResponse: MessageTransmissionRequestResponse[] = []
|
|
|
|
-
|
|
|
|
- constructor() {
|
|
|
|
- // logic here
|
|
|
|
- console.log(`TransmissionManager: Contructing Transmission Manager...`)
|
|
|
|
- this.instantiateConnectionManager()
|
|
|
|
|
|
+ event!: Subject<TransportEvent>
|
|
|
|
+
|
|
|
|
+ constructor(event: Subject<TransportEvent>) {
|
|
|
|
+ // logic here
|
|
|
|
+ console.log(`TransmissionManager: Contructing Transmission Manager...`)
|
|
|
|
+ this.event = event
|
|
|
|
+ this.connectionManager = new ConnectionManager(this.event)
|
|
|
|
+
|
|
|
|
+ this.sort(this.transportSet)
|
|
|
|
+ this.transportSet.forEach(set => {
|
|
|
|
+ this.setUpTransportService(set, event)
|
|
|
|
+ })
|
|
}
|
|
}
|
|
|
|
|
|
/* so there will be some changes here. will nto be assigning just one, but all of them dynamically to pour into this boy
|
|
/* so there will be some changes here. will nto be assigning just one, but all of them dynamically to pour into this boy
|
|
Transmitter only have to call this once. */
|
|
Transmitter only have to call this once. */
|
|
- getTransmissionInstance(): MessageTransmission {
|
|
|
|
- let transmitter: MessageTransmissionTransmitter = this.getTransmitter()
|
|
|
|
- let receiver: MessageTransmissionReceiver = this.getReceiver()
|
|
|
|
- let requestResponse: MessageTransmissionRequestResponse = this.getRequestResponse(transmitter, receiver)
|
|
|
|
- let transmission: MessageTransmission = {
|
|
|
|
- id: uuidv4(),
|
|
|
|
- transmitter: transmitter,
|
|
|
|
- receiver: receiver,
|
|
|
|
- requestResponse: requestResponse,
|
|
|
|
- event: this.connectionManager.transportEvent.asObservable()
|
|
|
|
- }
|
|
|
|
- // this.transmission.push(transmission)
|
|
|
|
- return transmission
|
|
|
|
|
|
+ subscribe(): Observable<MessageTransmission> {
|
|
|
|
+ return new Observable((observer: Observer<MessageTransmission>) => {
|
|
|
|
+ this.event.pipe(
|
|
|
|
+ filter(event => event.event == 'New Client')
|
|
|
|
+ ).subscribe(event => {
|
|
|
|
+ // get all adapters for all the connection
|
|
|
|
+ let adapterSet: AdapterSet[] = []
|
|
|
|
+ let clientId: string = (event.data as EventMessage).clientId
|
|
|
|
+ if (this.transportServiceArray.length > 0) {
|
|
|
|
+ this.transportServiceArray.forEach(transport => {
|
|
|
|
+ adapterSet.push(this.connectionManager.getAdapter(clientId, transport))
|
|
|
|
+ })
|
|
|
|
+ } else {
|
|
|
|
+ observer.error('No Transport is instantiated.... ERROR!!!!')
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // 1 set only
|
|
|
|
+ let transmitter: MessageTransmissionTransmitter = this.getTransmitter(clientId, adapterSet)
|
|
|
|
+ let receiver: MessageTransmissionReceiver = this.getReceiver(clientId, adapterSet)
|
|
|
|
+ let requestResponse: MessageTransmissionRequestResponse = this.getRequestResponse(transmitter, receiver)
|
|
|
|
+ let transmission: MessageTransmission = {
|
|
|
|
+ id: clientId,
|
|
|
|
+ transmitter: transmitter,
|
|
|
|
+ receiver: receiver,
|
|
|
|
+ requestResponse: requestResponse,
|
|
|
|
+ event: this.event.asObservable()
|
|
|
|
+ }
|
|
|
|
+ this.transmission.push(transmission)
|
|
|
|
+
|
|
|
|
+ observer.next(transmission)
|
|
|
|
+ })
|
|
|
|
+ })
|
|
}
|
|
}
|
|
|
|
|
|
- private getTransmitter(): MessageTransmissionTransmitter {
|
|
|
|
|
|
+
|
|
|
|
+ private getTransmitter(transmissionId: string, adapterSets: AdapterSet[]): MessageTransmissionTransmitter {
|
|
let transmitterProfile: TransmitterProfile = {
|
|
let transmitterProfile: TransmitterProfile = {
|
|
- id: uuidv4(),
|
|
|
|
|
|
+ id: transmissionId,
|
|
name: '', // for now make it empty. We will use the assigned uuid here
|
|
name: '', // for now make it empty. We will use the assigned uuid here
|
|
dateCreated: new Date()
|
|
dateCreated: new Date()
|
|
}
|
|
}
|
|
- return new MessageTransmissionTransmitter(transmitterProfile, AdaptorTransmissionRole.Transmitter, this.connectionManager.transportEvent)
|
|
|
|
|
|
+ return new MessageTransmissionTransmitter(transmitterProfile, adapterSets)
|
|
}
|
|
}
|
|
|
|
|
|
- private getReceiver(): MessageTransmissionReceiver {
|
|
|
|
|
|
+ private getReceiver(transmissionId: string, adapterSets: AdapterSet[]): MessageTransmissionReceiver {
|
|
let receiverProfile: ReceiverProfile = {
|
|
let receiverProfile: ReceiverProfile = {
|
|
- id: uuidv4(),
|
|
|
|
|
|
+ id: transmissionId,
|
|
name: '', // for now make it empty. We will use the assigned uuid here
|
|
name: '', // for now make it empty. We will use the assigned uuid here
|
|
dateCreated: new Date()
|
|
dateCreated: new Date()
|
|
}
|
|
}
|
|
- return new MessageTransmissionReceiver(receiverProfile, AdaptorTransmissionRole.Receiver, this.connectionManager.transportEvent)
|
|
|
|
|
|
+ return new MessageTransmissionReceiver(receiverProfile, adapterSets)
|
|
}
|
|
}
|
|
|
|
|
|
private getRequestResponse(transmitterInstance: MessageTransmissionTransmitter, receiverInstance: MessageTransmissionReceiver): MessageTransmissionRequestResponse {
|
|
private getRequestResponse(transmitterInstance: MessageTransmissionTransmitter, receiverInstance: MessageTransmissionReceiver): MessageTransmissionRequestResponse {
|
|
- return new MessageTransmissionRequestResponse(transmitterInstance, receiverInstance, AdaptorTransmissionRole.RequestResponse, this.connectionManager.transportEvent)
|
|
|
|
|
|
+ return new MessageTransmissionRequestResponse(transmitterInstance, receiverInstance)
|
|
}
|
|
}
|
|
|
|
|
|
- private instantiateConnectionManager(): void {
|
|
|
|
- this.connectionManager = new ConnectionManager()
|
|
|
|
|
|
+
|
|
|
|
+ // Server to be set up as well as acquiring client information if needed. Like in the case for grpc and socket. Http not requ`ired.
|
|
|
|
+ private setUpTransportService(transportSet: TransportSet, event: Subject<TransportEvent>): void {
|
|
|
|
+ this.instantiateTransportService(transportSet.transport, event).then((transportService: TransportService) => {
|
|
|
|
+ this.transportServiceArray.push(transportService)
|
|
|
|
+ if (transportService instanceof WebsocketTransportService) {
|
|
|
|
+ console.log(`Just Double Checking... this is websocket`)
|
|
|
|
+ transportService.startServer(transportSet.port);
|
|
|
|
+ } else if (transportService instanceof HttpTransportService) {
|
|
|
|
+ console.log(`Just Double Checking... this is http`)
|
|
|
|
+ transportService.startServer(transportSet.port);
|
|
|
|
+ // Additional Http-specific setup if needed.
|
|
|
|
+ }
|
|
|
|
+ }).catch((error) => { throw new Error(error) })
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private async instantiateTransportService(transportType: Transport, event: Subject<TransportEvent>): Promise<TransportService> {
|
|
|
|
+ return new Promise((resolve, reject) => {
|
|
|
|
+ if (transportType == Transport.Websocket) {
|
|
|
|
+ resolve(new WebsocketTransportService(event))
|
|
|
|
+ }
|
|
|
|
+ else if (transportType == Transport.Http) {
|
|
|
|
+ resolve(new HttpTransportService(event))
|
|
|
|
+ } else {
|
|
|
|
+ reject(`No Transport Service is not properly instantiated`)
|
|
|
|
+ }
|
|
|
|
+ })
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ private sort(transportSet: Set<TransportSet>): void {
|
|
|
|
+ let transportList: string[] = process.env.Transport?.split(',') || []
|
|
|
|
+ let portList: number[] = (process.env.PORT?.split(',') || []).map(port => Number(port));
|
|
|
|
+ transportList.forEach((transport, index) => {
|
|
|
|
+ transportSet.add({ transport: transport, port: portList[index] } as unknown as TransportSet)
|
|
|
|
+ })
|
|
|
|
+ }
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+interface TransportSet {
|
|
|
|
+ transport: Transport,
|
|
|
|
+ port: number
|
|
}
|
|
}
|