|
@@ -1,178 +0,0 @@
|
|
|
-import { Observable, Observer, Subject, Subscriber, Unsubscribable } from "rxjs";
|
|
|
-import dotenv from 'dotenv';
|
|
|
-import { v4 as uuidv4 } from 'uuid';
|
|
|
-import { ITransportReceiving, ITransportSuper, ITransportTransmitting, ReceiverProfile, TransportEventNotification, TransportManagerEventNotification, TransportManagerInterface, TransportMessage, TransportSettings, TransportType } from "../interface/ITransport.interface";
|
|
|
-import { AdaptorTransmissionRole, ConnectionAdaptorBase, ConnectionState } from "../interface/connector.interface";
|
|
|
-import { FisAppActor, FisMessage } from "../interface/transport.interface";
|
|
|
-
|
|
|
-dotenv.config();
|
|
|
-/* This transport manager will be instantiating the necessary transport to deal with tranmission and receiving from different receivers
|
|
|
-So how?: */
|
|
|
-export class ConnectionAdapter implements FisAppActor, ConnectionAdaptorBase<any> {
|
|
|
- incomingMessageBus: Subject<any> = new Subject()
|
|
|
- outgoingMessageBus: Subject<any> = new Subject()
|
|
|
- connector: any;
|
|
|
- connectorProfile: any;
|
|
|
- connectionState!: ConnectionState;
|
|
|
- connectionStateBus!: Observable<ConnectionState>;
|
|
|
- adaptorTransmissionRole!: AdaptorTransmissionRole;
|
|
|
-
|
|
|
- // For Manager, TBD
|
|
|
- private connectedClients: ReceiverProfile[] = []
|
|
|
- private transportEventNotification: Subject<TransportEventNotification> = new Subject()
|
|
|
- // this is assuming that the application will do request response, otherwise, just one will do.
|
|
|
- private serverPort!: number
|
|
|
- private serverUrl!: string
|
|
|
-
|
|
|
- constructor(port?: number, url?: string) {
|
|
|
- if (port) {
|
|
|
- // instantiate the transport service. Eg: socket, http. Try to use the same service. Then return TransmitterConnectionAdapter
|
|
|
-
|
|
|
- }
|
|
|
- /* For web browser ui, because this server can also act as a receiver, not just transmitter as well. */
|
|
|
- if (url) {
|
|
|
- // if no transport service is instantiated, then instantiate a new one, otherwise use existying one.
|
|
|
- }
|
|
|
-
|
|
|
- if (!port && !url) console.error(`No role has assigned for transport service due to lack of information provided!`)
|
|
|
- this.processOutputMessage()
|
|
|
- }
|
|
|
-
|
|
|
- subscribeConnectionState(): Observable<ConnectionState> {
|
|
|
- throw new Error("Method not implemented.");
|
|
|
- }
|
|
|
- publishConnectionState(): void {
|
|
|
- throw new Error("Method not implemented.");
|
|
|
- }
|
|
|
- connect(): void {
|
|
|
- throw new Error("Method not implemented.");
|
|
|
- }
|
|
|
- disconnect(): void {
|
|
|
- throw new Error("Method not implemented.");
|
|
|
- }
|
|
|
- send(message: any): Observable<any> {
|
|
|
- return new Observable((response) => {
|
|
|
- // logic here
|
|
|
- })
|
|
|
-
|
|
|
- }
|
|
|
- emit(message: any): void {
|
|
|
- // logic here
|
|
|
- }
|
|
|
- emitStream(message: any): void {
|
|
|
- throw new Error("Method not implemented.");
|
|
|
- }
|
|
|
- subscribeMessages(messageFilter: any): Observable<any> {
|
|
|
- throw new Error("Method not implemented.");
|
|
|
- }
|
|
|
- subscribe(subscriber: Subscriber<any>): Unsubscribable {
|
|
|
- // Emit some value based on T (here for demonstration)
|
|
|
- subscriber.next({} as any);
|
|
|
- subscriber.complete();
|
|
|
-
|
|
|
- // Return an Unsubscribable object to allow unsubscription
|
|
|
- return {
|
|
|
- unsubscribe: () => {
|
|
|
- console.log('Unsubscribed');
|
|
|
- }
|
|
|
- };
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- public getTransportManagerEventNotification(): Observable<TransportManagerEventNotification> {
|
|
|
- return new Observable((notification: Observer<TransportManagerEventNotification>) => {
|
|
|
- this.transportEventNotification.subscribe((transportNotification: TransportEventNotification) => {
|
|
|
- if (transportNotification.event == 'Connection') {
|
|
|
- notification.next({
|
|
|
- event: 'New Client Connection',
|
|
|
- data: transportNotification.data
|
|
|
- })
|
|
|
- }
|
|
|
- })
|
|
|
- })
|
|
|
- }
|
|
|
-
|
|
|
- private async establishTransportTransmitting(tranportType: TransportType): Promise<ITransportTransmitting> {
|
|
|
- console.log(`Setting up ${tranportType} transmitter`)
|
|
|
- return new Promise((resolve, reject) => {
|
|
|
- let setting: TransportSettings = {
|
|
|
- profileInfo: {
|
|
|
- id: uuidv4(),
|
|
|
- name: 'For Server',
|
|
|
- port: this.serverPort,
|
|
|
- transport: tranportType
|
|
|
- }
|
|
|
- }
|
|
|
- if (tranportType == 'WEBSOCKET') {
|
|
|
- // logic here
|
|
|
- } else {
|
|
|
- reject(`No such Transport Type Exist...`)
|
|
|
- }
|
|
|
- })
|
|
|
- }
|
|
|
-
|
|
|
- private async establishTransportReceiving(tranportType: TransportType): Promise<ITransportReceiving> {
|
|
|
- return new Promise((resolve, reject) => {
|
|
|
- let setting: TransportSettings = {
|
|
|
- profileInfo: {
|
|
|
- id: uuidv4(),
|
|
|
- name: 'For Client',
|
|
|
- url: this.serverUrl,
|
|
|
- transport: tranportType
|
|
|
- }
|
|
|
- }
|
|
|
- // will be sharing one instance, This part may break the code
|
|
|
- if (tranportType == 'WEBSOCKET') {
|
|
|
- // logic here
|
|
|
- }
|
|
|
- })
|
|
|
- }
|
|
|
-
|
|
|
- // This function will wrap the message to be delivered over to established transport to be sent over
|
|
|
- private processOutputMessage(): void {
|
|
|
- this.outgoingMessageBus.subscribe({
|
|
|
- next: (message: FisMessage) => {
|
|
|
- if (message.header.messageName == 'NotificationMessage') {
|
|
|
- // This is just here temporarily. Because The application will be concerned with whom the party is subcribed to, not this transport manager
|
|
|
- // this.transmittingService.transmit({
|
|
|
- // id: message.header.messageID,
|
|
|
- // receiverID: '',
|
|
|
- // payload: message,
|
|
|
- // event: `notification`
|
|
|
- // })
|
|
|
- }
|
|
|
- if (message.header.messageName == 'ResponseMessage') {
|
|
|
- // Map responses according to the requestID???
|
|
|
- // this.transmittingService.transmit({
|
|
|
- // id: message.header.messageID,
|
|
|
- // receiverID: '',
|
|
|
- // payload: message,
|
|
|
- // event: `resposne`
|
|
|
- // })
|
|
|
- }
|
|
|
- }
|
|
|
- })
|
|
|
- }
|
|
|
-
|
|
|
- // Responsibility include subscribing to event notification as well as tranpport instance for message transmission
|
|
|
- private handleNewReceiver(receiver: ReceiverProfile) {
|
|
|
- receiver.eventNotification.subscribe({
|
|
|
- next: event => {
|
|
|
- // new request will be handled. And then manager will here will pick it up from eventNotification and respond accordingly if there's a need for it
|
|
|
- if (event.event == `Disconnection`) {
|
|
|
- console.error(`Client ${event.data?.receiverID ?? 'undefined'} disconnected.`)
|
|
|
- receiver.eventNotification.complete() // no need to listen since it's disconnected. A new one will be established when it reconnects again
|
|
|
- }
|
|
|
- if (event.event == `New Message`) {
|
|
|
- console.log(`New Client request ${event.data?.payload.header.messageID} at receiverID: ${receiver.uuid}`)
|
|
|
- }
|
|
|
- },
|
|
|
- error: error => {
|
|
|
- console.error(error)
|
|
|
- receiver.eventNotification.error(error)
|
|
|
- }
|
|
|
- })
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-
|