Browse Source

adapter and transmission updates

Enzo 2 weeks ago
parent
commit
3fb0761748

+ 3 - 1
src/connector/connector.base.ts

@@ -15,7 +15,9 @@ export class ConnectionAdapter implements ConnectionAdaptorBase {
     constructor(port?: number, url?: string) {
         if (!port && !url) console.error(`No role has assigned for transport service due to lack of information provided!`)
     }
-
+    getInfo(): any {
+        throw new Error("Method not implemented.");
+    }
     subscribeConnectionState(): Observable<ConnectionState> {
         throw new Error("Method not implemented.");
     }

+ 39 - 11
src/connector/connector.manager.ts

@@ -1,5 +1,6 @@
 import { v4 as uuidv4 } from 'uuid'
-import { ConnectionEvent, ConnectionManager as ConnectionManagerInterface, ConnectionSet, Transport } from "../interface/connector.interface"
+import dotenv from 'dotenv';
+import { TransportEvent, ConnectionManager as ConnectionManagerInterface, ConnectionSet, Transport, TransportService } from "../interface/connector.interface"
 import { TransmitterConnectionAdapter } from './connector.transmitter'
 import { ReceiverConnectionAdapter } from './connector.receiver'
 import { TransmissionEvent } from '../interface/transport.interface'
@@ -7,19 +8,32 @@ import { Subject } from 'rxjs'
 import { WebsocketTransportService } from '../transport/websocket'
 import { RequestResponseConnectionAdapter } from './connector.request.response'
 import { HttpTransportService } from '../transport/http'
-
+/* now, this connection manager, after setting up the servers, will also need to start assigning receivers so to speak.
+From Provider's perspective: When instantiating a new server, there will be no doubt multiple clients connecting in. So that means, the connector instances that will be used from
+the transmission side, must already be configured and locked to their respective target so to speak. So for one client, there would only be one transmitter and receiver connector instances
+as well as request response. Let's go along with this line for now. */
 export class ConnectionManager implements ConnectionManagerInterface {
-    transportService: any;
+    transportService!: TransportService;
     transmissionEvent: Subject<TransmissionEvent> = new Subject()
-    connectionEvent: Subject<ConnectionEvent> = new Subject()
+    transportEvent: Subject<TransportEvent> = new Subject() // Every event goes through this boy, and it will be expose across differnet components
     transmissionSet: ConnectionSet[] = []
 
     constructor(messageTransmissionEvent: Subject<TransmissionEvent>) {
-        messageTransmissionEvent.subscribe(this.transmissionEvent)
+        this.transmissionEvent = messageTransmissionEvent 
         // logic here
-        this.getTransportService(process.env.Transport as unknown as Transport)
+        this.setUpServer(process.env.PORT as unknown as number, this.getTransportService(process.env.Transport as unknown as Transport))
+
+        // will also have information on events happening on transmission side
+        this.transmissionEvent.subscribe((event: TransmissionEvent) => {
+            console.log(`TransmissionEvent: `, event)
+        })
+        // Everything that happens will go through here. Can also plug in logging service here at a later time
+        this.transportEvent.subscribe((event: TransportEvent) => {
+            console.log(`TranpsortEvent: `, event)
+        })
     }
 
+    /* All three methods that return their respective instances will have it's activities reported through transportEvent */
     getTransmitterConnectionAdapter(): TransmitterConnectionAdapter {
         if (this.transportService) {
             let adapter: TransmitterConnectionAdapter = new TransmitterConnectionAdapter(this.transportService)
@@ -48,12 +62,12 @@ export class ConnectionManager implements ConnectionManagerInterface {
     }
 
     getTransportService(transportType: Transport): any {
-        console.log(`Getting this to work ${transportType}`)
+        console.log(`Instanting ${transportType} tranport service to be used.`)
         if (transportType == Transport.Websocket) {
             this.transportService = new WebsocketTransportService()
-            this.connectionEvent.next({
+            this.transportEvent.next({
                 id: uuidv4(),
-                event: 'Setup',
+                event: `New Transport`,
                 data: {
                     message: `Setting up Websocket Transport Service`
                 }
@@ -61,13 +75,27 @@ export class ConnectionManager implements ConnectionManagerInterface {
         }
         if (transportType == Transport.Http) {
             this.transportService = new HttpTransportService()
-            this.connectionEvent.next({
+            this.transportEvent.next({
                 id: uuidv4(),
-                event: 'Setup',
+                event: `New Transport`,
                 data: {
                     message: `Setting up Http Transport Service`
                 }
             })
         }
     }
+
+    // Server to be set up as well as acquiring client information if needed. Like in the case for grpc and socket. Http not required.
+    private setUpServer(port: number, transportService: TransportService): void {
+        if ((transportService as TransportService).getInfo().transport == Transport.Websocket) {
+            (transportService as WebsocketTransportService).startServer(port);
+            // this doesn't mean all the responses come through here, but rather to record and assign new instances based on the new clinets connected
+            (transportService as WebsocketTransportService).getTransportEvent().subscribe(this.transportEvent)
+        }
+
+        if ((transportService as TransportService).getInfo().transport == Transport.Http) {
+            (transportService as HttpTransportService).startServer(port)
+            //  To be Enhanced at a later time
+        }
+    }
 }

+ 40 - 6
src/connector/connector.receiver.ts

@@ -1,20 +1,54 @@
 import dotenv from 'dotenv';
-import { FisAppActor, FisMessage } from "../interface/transport.interface";
+import { Bus, FisAppActor, FisMessage } from "../interface/transport.interface";
 import { ConnectionAdapter } from "./connector.base";
-import { ConnectionState, ReceiverConnectionAdapter as ReceiverConnectionAdapterInterface } from "../interface/connector.interface";
-import { BehaviorSubject } from 'rxjs';
+import { AdaptorTransmissionRole, ConnectionState, ReceiverConnectionAdapter as ReceiverConnectionAdapterInterface, TransportEvent, TransportMessage, TransportService } from "../interface/connector.interface";
+import { BehaviorSubject, filter, map, Observable, Observer, Subject, Subscription, takeWhile } from 'rxjs';
 
 dotenv.config();
 /* This transport manager will be instantiating the necessary transport to deal with tranmission and receiving from different receivers
 So how?: */
-export class ReceiverConnectionAdapter extends ConnectionAdapter implements  ReceiverConnectionAdapterInterface {
-    connectionStateBus: BehaviorSubject<ConnectionState> = new BehaviorSubject('OFFLINE' as ConnectionState)
+export class ReceiverConnectionAdapter extends ConnectionAdapter implements ReceiverConnectionAdapterInterface {
+    event: Subject<TransportEvent> = new Subject()
 
-    constructor(url: string) {
+    constructor(adapter: TransportService) {
         super()
         // logic here
+        this.adaptorTransmissionRole = AdaptorTransmissionRole.Receiver
+        this.connector = adapter
     }
 
+    getMessageBus(bus: Bus): Observable<any> {
+        return new Observable((observable: Observer<any>) => {
+            if (bus == Bus.ResponseMessageBus) {
+                const subscription: Subscription = this.event.pipe(
+                    filter((message: TransportEvent) => message.event === 'New Message'),
+                    // take message only specific for this adapter. Although that itself wouldn't be necessary, considerng everything goes through transportEvent. I guess it's for better management
+                    filter((message:TransportEvent) => (message.data as TransportMessage).target == this.connectorProfile.id), 
+                    takeWhile((message: TransportEvent) => {
+                        const shouldTake = ((message.data as TransportMessage).payload as FisMessage).data !== 'Complete';
+                        if (!shouldTake) {
+                            observable.complete();  // Ensure the observer is completed
+                        }
+                        return shouldTake;
+                    }),
+                    map(message => (message.data as TransportMessage).payload as FisMessage)
+                ).subscribe((message: FisMessage) => {
+                    observable.next(message);
+                });
+    
+                // Clean up on unsubscription
+                return () => {
+                    subscription.unsubscribe();
+                };
+            }
+            if (bus == Bus.NotificationMessageBus) {
+                /// logic here
+            }
+            if (bus == Bus.ErrorMessageBus) {
+                /// logic here
+            }
+        });
+    }
 }
 
 

+ 8 - 6
src/connector/connector.request.response.ts

@@ -1,29 +1,31 @@
 import dotenv from 'dotenv';
-import { FisAppActor, FisMessage } from "../interface/transport.interface";
+import { FisAppActor, FisMessage, TransmissionMessage } from "../interface/transport.interface";
 import { ConnectionAdapter } from "./connector.base";
-import { RequestResponseConnectionAdapter as RequestResponseConnectionAdapterInterface, TransportMessage } from "../interface/connector.interface";
+import { RequestResponseConnectionAdapter as RequestResponseConnectionAdapterInterface, TransportEvent, TransportMessage } from "../interface/connector.interface";
 import { TransmitterConnectionAdapter } from './connector.transmitter';
 import { ReceiverConnectionAdapter } from './connector.receiver';
-import { Observable, Observer } from 'rxjs';
+import { Observable, Observer, Subject } from 'rxjs';
 
 dotenv.config();
 /* This transport manager will be instantiating the necessary transport to deal with tranmission and receiving from different receivers
 So how?: */
 export class RequestResponseConnectionAdapter extends ConnectionAdapter implements RequestResponseConnectionAdapterInterface {
+    event: Subject<TransportEvent> = new Subject()
+    incomingResponses: Subject<TransportMessage> = new Subject()
 
     constructor(transmitterAdapter: TransmitterConnectionAdapter, receiverAdapter: ReceiverConnectionAdapter) {
         super()
         // logic here
     }
     
-    emit(message: TransportMessage): void {
+    emit(message: TransmissionMessage): void {
         throw new Error('Method not implemented.');
     }
-    emitStream(message: TransportMessage): void {
+    emitStream(message: TransmissionMessage): void {
         throw new Error('Method not implemented.');
     }
 
-    send(message: TransportMessage): Observable<TransportMessage> {
+    send(message: TransmissionMessage): Observable<TransportMessage> {
         return new Observable((response: Observer<TransportMessage>) => {
             // logic here
         })

+ 6 - 5
src/connector/connector.transmitter.ts

@@ -5,20 +5,20 @@ import { AdaptorTransmissionRole, ConnectionState, TransmitterConnectionAdapter
 import { BehaviorSubject, Observable, Observer } from 'rxjs';
 import { v4 as uuidv4 } from 'uuid'
 import { WebsocketTransportService } from '../transport/websocket';
+import { HttpTransportService } from '../transport/http';
 
 dotenv.config();
 /* This transport manager will be instantiating the necessary transport to deal with tranmission and receiving from different receivers
 So how?: */
 export class TransmitterConnectionAdapter extends ConnectionAdapter implements TransmitterConnectionAdapterInterface {
-    connectionStateBus: BehaviorSubject<ConnectionState> = new BehaviorSubject('OFFLINE' as ConnectionState)
+    connectionStateBus: BehaviorSubject<ConnectionState> = new BehaviorSubject('OFFLINE' as ConnectionState) // this cannot work, because there will be alot of clients connected presumably
 
     constructor(transportService: TransportService) {
         super()
         // logic here
-        this.connector = transportService
     }
 
-    emitStream(message: TransportMessage): void {
+    emitStream(message: TransmissionMessage): void {
         throw new Error('Method not implemented.');
     }
 
@@ -27,10 +27,11 @@ export class TransmitterConnectionAdapter extends ConnectionAdapter implements T
             id: uuidv4(),
             dateCreated: new Date(),
             transport: Transport.Websocket,
-            target: message.receiver?.id,
-            payload: message
+            target: message.receiverId,
+            payload: message.payload
         } as TransportMessage)
     }
+    
 }
 
 

+ 19 - 15
src/interface/connector.interface.ts

@@ -1,5 +1,6 @@
 import { BehaviorSubject, Observable, Subject } from "rxjs"
-import { Bus, FisAppActor, ReceiverProfile, TransmissionEvent, TransmissionProfile, TransmitterProfile } from "./transport.interface"
+import { Bus, FisAppActor, ReceiverProfile, TransmissionEvent, TransmissionMessage, TransmissionProfile, TransmitterProfile } from "./transport.interface"
+import { ConnectionAdapter } from "../connector/connector.base"
 
 
 export type TYPE = {
@@ -11,15 +12,16 @@ export type TYPE = {
 
 export interface AdapterProfile {
     id: string,
-    type: Transport
+    adapter: ConnectionAdapter,
+    connectionState: BehaviorSubject<ConnectionState>
 }
 
 
 export interface ConnectionManager {
-    transportService: any
+    transportService: TransportService
     // to get notified on what's going on in Transmission Manager
     transmissionEvent: Subject<TransmissionEvent>
-    connectionEvent: Subject<ConnectionEvent>
+    transportEvent: Subject<TransportEvent>
     // list of connection
     transmissionSet: ConnectionSet[]
     // Called by transmission manager to have an instance of these adapters
@@ -39,20 +41,22 @@ export interface ConnectionAdaptorBase  {
     connect(): void
     disconnect(): void
     getMessageBus(bus: Bus): Observable<any>
+    getInfo(): any
 }
 
 
 export interface TransmitterConnectionAdapter extends ConnectionAdaptorBase {
-    emit(message: TransportMessage): void
-    emitStream(message: TransportMessage): void
+    emit(message: TransmissionMessage): void
+    emitStream(message: TransmissionMessage): void
 }
 
 export interface ReceiverConnectionAdapter extends ConnectionAdaptorBase {
+    event: Subject<TransportEvent>
     subscribeMessages(messageFilter: any): Observable<any>
 }
 
 export interface RequestResponseConnectionAdapter extends TransmitterConnectionAdapter, ReceiverConnectionAdapter {
-    send(message: TransportMessage): Observable<TransportMessage>
+    send(message: TransmissionMessage): Observable<TransportMessage>
 }
 
 export type ConnectionState = 'ONLINE' | 'OFFLINE'
@@ -73,19 +77,13 @@ export interface TransportMessage {
     id: string,
     dateCreated: Date,
     transport: Transport,
-    target: string,
+    target?: string,
     payload: any
 }
 
-export interface ConnectionEvent {
-    id: string,
-    event: 'Connection' | 'Setup',
-    data: any
-}
-
 export interface TransportEvent {
     id: string,
-    event: 'Server Started' | 'New Client' | 'Client Disconnected' | 'Client Reconnected' | `Server Disconnected` | 'New Message' | `Notification`,
+    event: 'Server Started' | 'New Client' | 'Client Disconnected' | 'Client Reconnected' | `Server Disconnected` | 'New Message' | `Notification` | `New Server` | `New Transport`,
     data: any
 }
 export interface ConnectionSet {
@@ -93,6 +91,12 @@ export interface ConnectionSet {
 }
 
 export interface TransportService {
+    getInfo(): any
     emit(message: TransportMessage): void
     subscribe(): Observable<TransportEvent> //all messages and whatever event will go through this, easier to implemnet across different transport protocol
+}
+
+
+export interface Info {
+    transport: Transport
 }

+ 18 - 16
src/interface/transport.interface.ts

@@ -1,10 +1,7 @@
 import { Observable, Subject } from "rxjs";
-import { AdaptorTransmissionRole, RequestResponseConnectionAdapter } from "./connector.interface";
+import { AdapterProfile, AdaptorTransmissionRole, RequestResponseConnectionAdapter } from "./connector.interface";
 import { MessageTransmissionTransmitter } from "../transmission/msg.transmission.transmitter";
 import { MessageTransmissionReceiver } from "../transmission/msg.transmission.receiver";
-import { MessageTransmissionRequestResponse } from "../transmission/msg.transmission.request-response";
-import { ReceiverConnectionAdapter } from "../connector/connector.receiver";
-import { TransmitterConnectionAdapter } from "../connector/connector.transmitter";
 
 export interface MessageTransmissionManager {
     // what the hell is this here for
@@ -32,9 +29,9 @@ export interface FisAppActor {
 export interface MessageTransmissionBase extends FisAppActor {
     msgRepositoryService: any
     transmissionRole: AdaptorTransmissionRole
-    adaptorsArray: any[]
-    transmissionService: any // not sure what this is for, i guess they are interchangeable with adapterService
-    adapterService: any // just use adapter Service, since it's grammatically closer to connectionadapter
+    adaptorsArray: Array<AdapterProfile> // this is the one we'll be using to catter for multiple clients
+    transmissionService: any
+    adapterService: any
 
     getMessageBus(bus: Bus): Observable<any>
     getInfo(): TransmissionProfile
@@ -43,20 +40,20 @@ export interface MessageTransmissionBase extends FisAppActor {
 export interface MessageReceiver extends MessageTransmissionBase {
     receiverProfile: ReceiverProfile
 
-    setReceiver(receiverProfile: ReceiverProfile, role: AdaptorTransmissionRole, adapter: ReceiverConnectionAdapter): void
+    setReceiver(receiverProfile: ReceiverProfile, role: AdaptorTransmissionRole, event: TransmissionEvent): void
 }
 
 export interface MessageTransmitter extends MessageTransmissionBase {
     transmitterProfile: TransmitterProfile
 
-    setTransmitter(transmitterProfile: TransmitterProfile, role: AdaptorTransmissionRole, adapter: TransmitterConnectionAdapter): void
+    setTransmitter(transmitterProfile: TransmitterProfile, role: AdaptorTransmissionRole, event: TransmissionEvent): void
 }
 
 export interface MessageRequestResponse extends MessageTransmissionBase {
     transmitterInstance: MessageTransmissionTransmitter
     receiverInstance: MessageTransmissionReceiver
 
-    setTransmissionProfile(transmissionInfo: MessageTransmissionTransmitter, receiverInfo: MessageTransmissionReceiver, role: AdaptorTransmissionRole, adapter: RequestResponseConnectionAdapter): void
+    setTransmissionProfile(transmissionInfo: MessageTransmissionTransmitter, receiverInfo: MessageTransmissionReceiver, role: AdaptorTransmissionRole, event: TransmissionEvent): void
 }
 
 export interface FisMessage {
@@ -83,20 +80,25 @@ export interface RequestResponseProfile extends TransmissionProfile {
 
 }
 export interface TransmissionMessage {
-    transmitter?: TransmitterProfile,
-    receiver?: ReceiverProfile,
-    payload?: FisMessage
+    receiverId: string,
+    payload: FisMessage
 }
 
 export enum Bus {
-    IncomingMessageBus,
-    OutgoingMessageBus,
+    GeneralBus,
+    ResponseMessageBus,
     ErrorMessageBus,
     NotificationMessageBus
 }
 
 export interface TransmissionEvent {
     id: string,
-    event: `NewTransmissionSet` | 'Connection',
+    event: 'New Receiver' | 'New Server' | 'Incoming Message' | `Outgoing Message`,
     data: any
+}
+
+export interface EventMessage {
+    channelId: string,
+    message: string,
+    payload: any
 }

+ 7 - 6
src/transmission/msg.transmission.base.ts

@@ -1,13 +1,14 @@
 
 import { filter, Observable, Observer, Subject, Subscription, takeWhile, Unsubscribable } from 'rxjs';
-import { AdaptorTransmissionRole } from '../interface/connector.interface';
-import { Bus, FisMessage, MessageTransmissionBase as MessageTransmissionBaseInterface } from '../interface/transport.interface'
+import { AdapterProfile, AdaptorTransmissionRole } from '../interface/connector.interface';
+import { Bus, FisMessage, MessageTransmissionBase as MessageTransmissionBaseInterface, TransmissionProfile } from '../interface/transport.interface'
 import { v4 as uuidv4 } from 'uuid'
 
 export class MessageTransmissionBase implements MessageTransmissionBaseInterface {
+    transmissionProfile!: TransmissionProfile
     msgRepositoryService: any;
     transmissionRole!: AdaptorTransmissionRole;
-    adaptorsArray!: any[];
+    adaptorsArray: Array<AdapterProfile> = []
     transmissionService: any;
     adapterService: any;
     incomingMessageBus!: Subject<any>;
@@ -36,13 +37,13 @@ export class MessageTransmissionBase implements MessageTransmissionBaseInterface
     emitStream(message: any): void {
         throw new Error('Method not implemented.');
     }
-    
+
     subscribeMessages(messageFilter: any): Observable<any> {
         throw new Error('Method not implemented.');
     }
 
-    getInfo(): any {
-
+    getInfo(): TransmissionProfile {
+        return this.transmissionProfile
     }
 
 }

+ 17 - 19
src/transmission/msg.transmission.manager.ts

@@ -8,8 +8,11 @@ import { MessageTransmissionRequestResponse } from "./msg.transmission.request-r
 import { TransmitterConnectionAdapter } from "../connector/connector.transmitter";
 import { ReceiverConnectionAdapter } from "../connector/connector.receiver";
 import { filter, Subject } from "rxjs";
-/* FOr now, to fill the holes/gaps so to speak, the FIS actor will interact wtih Message Transmission Manager, without having to wory about
-the underyling transport protocol as well as managing client control, when it's acting as a transmitter. */
+/*OKay, the application don't give a shit whose request it comes from. That will be the responsibility of this manager in particular.
+So, from the transmtiter perspective, it would be as if it's only coming from one source, but the demultiplexing will occur here 
+so to speak. The plan, for one type of transport, this manager will make sure all of the connector instances travel through here.
+But of course, the transmitter or fis app actor can always instaniate more transmitter and receiver if needed, and there will be 
+options to cater for wanting to instante different types of transport services according to their needs. */
 export class MessageTransmissionManager implements MessageTransmissionManagerInterface {
     // transmission: MessageTransmission[] = []
     connectionManager!: ConnectionManager
@@ -19,52 +22,47 @@ export class MessageTransmissionManager implements MessageTransmissionManagerInt
     transmissionEvent: Subject<TransmissionEvent> = new Subject()
 
     constructor() {
-        // this.transmissionEvent.pipe(
-        //     filter(event => event.event == 'NewTransmissionSet') // filter out odd numbers
-        // ).subscribe((transmissionData: TransmissionEvent) => {
-        //     this.transmission.push(transmissionData.data as MessageTransmission)
-        // })
+       // logic here
     }
 
-    // but this function also needs to talk to connection adaptor manager to instantiate the necessary adapter.
+    /* so there will be some changes here. will nto be assigning just one, but all of them dynamically to pour into this boy
+    Transmitter only have to call this once. */
     getTransmissionInstance(): MessageTransmission {
-        let transmitterAdapter: TransmitterConnectionAdapter = this.connectionManager.getTransmitterConnectionAdapter()
-        let receiverAdapter: ReceiverConnectionAdapter = this.connectionManager.getReceiverConnectionAdapter()
-        let transmitter: MessageTransmissionTransmitter = this.getTransmitter(transmitterAdapter)
-        let receiver: MessageTransmissionReceiver = this.getReceiver(receiverAdapter)
+        let transmitter: MessageTransmissionTransmitter = this.getTransmitter()
+        let receiver: MessageTransmissionReceiver = this.getReceiver()
         let transmission: MessageTransmission = {
             id: uuidv4(),
             receiverId: transmitter.getInfo().id,
             transmitterId: receiver.getInfo().id,
             transmitter: transmitter,
             receiver: receiver,
-            requestResponse: this.getRequestResponse(transmitter, receiver, this.connectionManager.getRequestResponseConnectionAdapter(transmitterAdapter, receiverAdapter))
+            requestResponse: this.getRequestResponse(transmitter, receiver)
         }
         this.instantiateConnectionManager() // start an adapter
         // this.transmission.push(transmission)
         return transmission
     }
 
-    private getTransmitter(connectionAdapter: TransmitterConnectionAdapter): MessageTransmissionTransmitter {
+    private getTransmitter(): MessageTransmissionTransmitter {
         let transmitterProfile: TransmitterProfile = {
             id: uuidv4(),
             name: '', // for now make it empty. We will use the assigned uuid here
             dateCreated: new Date()
         }
-        return new MessageTransmissionTransmitter(transmitterProfile, AdaptorTransmissionRole.Transmitter, connectionAdapter)
+        return new MessageTransmissionTransmitter(transmitterProfile, AdaptorTransmissionRole.Transmitter, this.transmissionEvent)
     }
 
-    private getReceiver(connectionAdapter: ReceiverConnectionAdapter): MessageTransmissionReceiver {
+    private getReceiver(): MessageTransmissionReceiver {
         let receiverProfile: ReceiverProfile = {
             id: uuidv4(),
             name: '', // for now make it empty. We will use the assigned uuid here
             dateCreated: new Date()
         }
-        return new MessageTransmissionReceiver(receiverProfile, AdaptorTransmissionRole.Receiver, connectionAdapter)
+        return new MessageTransmissionReceiver(receiverProfile, AdaptorTransmissionRole.Receiver, this.transmissionEvent)
     }
 
-    private getRequestResponse(transmitterInstance: MessageTransmissionTransmitter, receiverInstance: MessageTransmissionReceiver, connectionAdaptor: RequestResponseConnectionAdapter): MessageTransmissionRequestResponse {
-        return new MessageTransmissionRequestResponse(transmitterInstance, receiverInstance, AdaptorTransmissionRole.RequestResponse, connectionAdaptor)
+    private getRequestResponse(transmitterInstance: MessageTransmissionTransmitter, receiverInstance: MessageTransmissionReceiver): MessageTransmissionRequestResponse {
+        return new MessageTransmissionRequestResponse(transmitterInstance, receiverInstance, AdaptorTransmissionRole.RequestResponse, this.transmissionEvent)
     }
 
     private instantiateConnectionManager(): void {

+ 36 - 9
src/transmission/msg.transmission.receiver.ts

@@ -1,26 +1,53 @@
-import { PartialObserver, Subject, Subscriber, Unsubscribable } from 'rxjs';
-import { AdaptorTransmissionRole } from '../interface/connector.interface';
+import { filter, map, Observable, Observer, PartialObserver, Subject, Subscriber, Subscription, takeWhile, Unsubscribable } from 'rxjs';
+import { AdapterProfile, AdaptorTransmissionRole, TransportEvent, TransportMessage } from '../interface/connector.interface';
 import { MessageTransmissionBase } from './msg.transmission.base';
-import { MessageReceiver as MessageReceiverInterface, ReceiverProfile, TransmitterProfile } from '../interface/transport.interface'
+import { Bus, FisMessage, MessageReceiver as MessageReceiverInterface, ReceiverProfile, TransmissionEvent, TransmissionMessage, TransmitterProfile } from '../interface/transport.interface'
 import { ConnectionAdapter } from '../connector/connector.base';
 import { v4 as uuidv4 } from 'uuid'
 import { ReceiverConnectionAdapter } from '../connector/connector.receiver';
 
 export class MessageTransmissionReceiver extends MessageTransmissionBase implements MessageReceiverInterface {
     receiverProfile!: ReceiverProfile;
+    private incomingTransportMessage: Subject<TransportMessage> = new Subject()
 
-    constructor(profile: ReceiverProfile, role: AdaptorTransmissionRole, adapter: ReceiverConnectionAdapter) {
+    constructor(profile: ReceiverProfile, role: AdaptorTransmissionRole, event: Observable<TransmissionEvent>) {
         super();
-        this.setReceiver(profile, role, adapter)
+
+        this.setReceiver(profile, role)
+        this.handleEvent(event)
     }
 
-    setReceiver(receiverProfile: ReceiverProfile, role: AdaptorTransmissionRole, adapter: ReceiverConnectionAdapter): void {
+    setReceiver(receiverProfile: ReceiverProfile, role: AdaptorTransmissionRole): void {
         this.receiverProfile = receiverProfile
         this.transmissionRole = role
-        this.adapterService = adapter
     }
 
-    private instantiateConnectionAdapter(): void {
-        // logic here
+    getMessageBus(bus: Bus): Observable<any> {
+        return new Observable((observable: Observer<any>) => {
+            // logic here
+            if (bus == Bus.ResponseMessageBus) {
+                const subscription: Subscription = this.incomingTransportMessage.pipe(
+                    filter((message: TransportMessage) => message. === `Incoming Message`),
+                    map(message => message.data as TransportMessage)
+                ).subscribe((message) => {
+                    observable.next(message);
+                });
+
+                // Clean up on unsubscription
+                return () => {
+                    subscription.unsubscribe();
+                };
+            }
+        })
+    }
+
+    private handleEvent(event: Observable<TransmissionEvent>): void {
+        event.pipe(
+            filter((obj: TransmissionEvent) => obj.event == `New Receiver`),
+            map((obj: TransmissionEvent) => (obj.data as ReceiverConnectionAdapter))
+        ).subscribe((adapter: ReceiverConnectionAdapter) => {
+            this.adaptorsArray.push({ id: adapter.getInfo().id, adapter: adapter } as unknown as AdapterProfile)
+            adapter.getMessageBus(Bus.GeneralBus).subscribe(this.incomingTransportMessage)
+        })
     }
 }

+ 4 - 5
src/transmission/msg.transmission.request-response.ts

@@ -1,5 +1,5 @@
 import { MessageTransmissionBase } from "./msg.transmission.base";
-import { FisMessage, MessageReceiver, MessageRequestResponse as MessageRequestResponseInterface } from '../interface/transport.interface'
+import { FisMessage, MessageReceiver, MessageRequestResponse as MessageRequestResponseInterface, TransmissionEvent } from '../interface/transport.interface'
 import { filter, Observable, Observer, Subscription, takeWhile } from "rxjs";
 import { v4 as uuidv4 } from 'uuid'
 import { AdaptorTransmissionRole, RequestResponseConnectionAdapter } from "../interface/connector.interface";
@@ -10,16 +10,15 @@ export class MessageTransmissionRequestResponse extends MessageTransmissionBase
     transmitterInstance!: MessageTransmissionTransmitter;
     receiverInstance!: MessageTransmissionReceiver;
 
-    constructor(transmitterInstance: MessageTransmissionTransmitter, receiverInstance: MessageTransmissionReceiver, role: AdaptorTransmissionRole, adapter: RequestResponseConnectionAdapter) {
+    constructor(transmitterInstance: MessageTransmissionTransmitter, receiverInstance: MessageTransmissionReceiver, role: AdaptorTransmissionRole, event: Observable<TransmissionEvent>) {
         super()
-        this.setTransmissionProfile(transmitterInstance, receiverInstance, role, adapter)
+        this.setTransmissionProfile(transmitterInstance, receiverInstance, role)
     }
 
-    setTransmissionProfile(transmitterInstance: MessageTransmissionTransmitter, receiverInstance: MessageTransmissionReceiver, role: AdaptorTransmissionRole, adapter: RequestResponseConnectionAdapter): void {
+    setTransmissionProfile(transmitterInstance: MessageTransmissionTransmitter, receiverInstance: MessageTransmissionReceiver, role: AdaptorTransmissionRole): void {
         this.transmitterInstance = transmitterInstance
         this.receiverInstance = receiverInstance
         this.transmissionRole = role
-        this.adapterService = adapter
     }
 
     // To be Enhanced. This is actually wrong at the moment

+ 26 - 13
src/transmission/msg.transmission.transmitter.ts

@@ -1,28 +1,41 @@
 import { MessageTransmissionBase } from "./msg.transmission.base";
-import { FisMessage,  MessageTransmitter as MessageTransmitterInterface, TransmissionMessage, TransmitterProfile } from '../interface/transport.interface'
-import { AdaptorTransmissionRole, TransportMessage } from "../interface/connector.interface";
+import { FisMessage, MessageTransmitter as MessageTransmitterInterface, TransmissionEvent, TransmissionMessage, TransmitterProfile } from '../interface/transport.interface'
+import { AdapterProfile, AdaptorTransmissionRole, ConnectionState, TransportMessage, TransportService } from "../interface/connector.interface";
 import { v4 as uuidv4 } from 'uuid'
 import { TransmitterConnectionAdapter } from "../connector/connector.transmitter";
+import { BehaviorSubject, filter, map, Observable } from "rxjs";
 
+/* Take in all the messages that needs to be transported, and divide them accordingly. So the connector instances will do just that
+connectors or adapters will have their own identifier*/
 export class MessageTransmissionTransmitter extends MessageTransmissionBase implements MessageTransmitterInterface {
     transmitterProfile!: TransmitterProfile;
 
-    constructor(profile: TransmitterProfile, role: AdaptorTransmissionRole, adapter: TransmitterConnectionAdapter) {
+    constructor(profile: TransmitterProfile, role: AdaptorTransmissionRole, event: Observable<TransmissionEvent>) {
         super()
-        this.setTransmitter(profile, role, adapter)
+        this.setTransmitter(profile, role)
+
+        this.handleEvent(event)
     }
-    
-    setTransmitter(transmitterProfile: TransmitterProfile, role: AdaptorTransmissionRole, adapter: TransmitterConnectionAdapter): void {
+
+    setTransmitter(transmitterProfile: TransmitterProfile, role: AdaptorTransmissionRole): void {
         this.transmitterProfile = transmitterProfile
         this.transmissionRole = role
-        this.adapterService = adapter
     }
 
-    emit(message: FisMessage): void {
-        (this.adapterService as TransmitterConnectionAdapter).emit({
-            transmitter: this.transmitterProfile,
-            receiver: '', // that means this information is vital
-            payload: message
-        } as TransmissionMessage)
+    emit(message: TransmissionMessage): void {
+        // logic here, emit based on message
+        let adapter: AdapterProfile | undefined = this.adaptorsArray.find(obj => obj.id == message.receiverId)
+        if(adapter) {
+            (adapter.adapter as TransmitterConnectionAdapter).emit(message)
+        }
+    }
+
+    private handleEvent(event: Observable<TransmissionEvent>): void {
+        event.pipe(
+            filter((obj: TransmissionEvent) => obj.event == `New Receiver`),
+            map(obj => (obj as TransmissionEvent).data as TransmitterConnectionAdapter)
+        ).subscribe((adapter: TransmitterConnectionAdapter) => {
+            this.adaptorsArray.push({ id: adapter.getInfo().id, adapter: adapter, connectionState: new BehaviorSubject<ConnectionState>(`OFFLINE`) }) // refers to channel ID, which will be embedded in these messages that pass through
+        })
     }
 }

+ 2 - 2
src/transport/gRPC.ts

@@ -1,8 +1,8 @@
 import { Observable } from "rxjs";
-import { TransportEventNotification } from "../interface/ITransport.interface";
+import { TransportEvent } from "../interface/connector.interface";
 
 export class GrpcTransportService {
-    getTransportEventNotification(): Observable<TransportEventNotification> {
+    getTransportEventNotification(): Observable<TransportEvent> {
         throw new Error("Method not implemented.");
     }
 

+ 14 - 3
src/transport/http.ts

@@ -1,10 +1,21 @@
 import { Observable } from "rxjs";
-import { TransportEventNotification } from "../interface/ITransport.interface";
+import { TransportEvent, TransportMessage, TransportService } from "../interface/connector.interface";
 
-export class HttpTransportService {
-    getTransportEventNotification(): Observable<TransportEventNotification> {
+export class HttpTransportService implements TransportService {
+    getInfo() {
         throw new Error("Method not implemented.");
     }
+    emit(message: TransportMessage): void {
+        throw new Error("Method not implemented.");
+    }
+    subscribe(): Observable<TransportEvent> {
+        throw new Error("Method not implemented.");
+    }
+    getTransportEventNotification(): Observable<TransportEvent> {
+        throw new Error("Method not implemented.");
+    }
+    startServer(port: number): void {
 
+    }
 
 }

+ 19 - 10
src/transport/websocket.ts

@@ -1,13 +1,14 @@
-import { filter, Observable, Subject } from "rxjs";
+import { BehaviorSubject, filter, Observable, Subject } from "rxjs";
 import { Socket as ClientSocket } from 'socket.io-client'
 import { Socket as SocketForConnectedClient } from "socket.io"
 import { handleClientSocketConnection, handleNewSocketClient, startClientSocketConnection, startSocketServer } from "../utils/socket.utils";
-import { TransportEvent, TransportMessage, TransportService } from "../interface/connector.interface";
+import { ConnectionState, Info, Transport, TransportEvent, TransportMessage, TransportService } from "../interface/connector.interface";
 
 /* Just code in the context that this websocket service will be handling multiple UI clients. Can think about the server communication at a later time. */
 export class WebsocketTransportService implements TransportService {
-    private connectedServer: ConnectedServerSocket[] = []
-    private connectedClientSocket: ConnectedClientSocket[] = []
+    private info: Info = { transport: Transport.Websocket }
+    private connectedServer: ConnectedServerSocket[] = [] // to allow the possibility of having to communicate with multiple servers as a client
+    private connectedClientSocket: ConnectedClientSocket[] = [] // to keep track of the all the clients that are connected
     // private incomingMessage: Subject<TransportMessage> = new Subject() // this is only for client roles only atm
     private transportEvent: Subject<TransportEvent> = new Subject()
 
@@ -30,7 +31,7 @@ export class WebsocketTransportService implements TransportService {
         startClientSocketConnection(url).then((socket: ClientSocket) => {
             handleClientSocketConnection(socket, this.connectedServer).subscribe(this.transportEvent)
         }).catch((error) => {
-            console.error(`WebsocketTransport ERROR:`, error)
+            console.error(`WebsocketTransport ERROR:`, error) 
         })
     }
 
@@ -38,17 +39,24 @@ export class WebsocketTransportService implements TransportService {
         return this.transportEvent.asObservable()
     }
 
-    
+
     // for transmission(Server Only, not applicable for client Socket)
     public emit(message: TransportMessage): void {
-        // send message
+        /* Just a rough idea, Because this service still needs to be direct the mesage to be emiteed based on the client that send it earlier.
+        For example, if it'd doing a request response, obviosuly, it needs to be identified whose respnses it belong to. */
+        let clientObj: ConnectedClientSocket | undefined = this.connectedClientSocket.find(obj => obj.id == message.target)
+        if(clientObj && clientObj.connectionState.getValue() == 'ONLINE') {
+            clientObj.socketInstance.emit(`message`, message)
+        }
     }
-    
+
     public subscribe(): Observable<TransportEvent> {
         return this.transportEvent.asObservable()
     }
 
-
+    public getInfo(): Info {
+        return this.info
+    }
 }
 
 
@@ -56,6 +64,7 @@ export class WebsocketTransportService implements TransportService {
 interface ConnectedSocket {
     id: string,
     dateCreated: Date,
+    connectionState: BehaviorSubject<ConnectionState>
 }
 export interface ConnectedClientSocket extends ConnectedSocket {
     socketInstance: SocketForConnectedClient
@@ -63,4 +72,4 @@ export interface ConnectedClientSocket extends ConnectedSocket {
 
 export interface ConnectedServerSocket extends ConnectedSocket {
     socketInstance: ClientSocket
-}
+}

+ 68 - 51
src/utils/socket.utils.ts

@@ -1,11 +1,12 @@
-import { Observable, Observer, Subject } from 'rxjs';
+import { BehaviorSubject, Observable, Observer, Subject } from 'rxjs';
 import { createServer } from 'http';
 import { Server, Socket as SocketForConnectedClient } from 'socket.io';
 import { io, Socket as ClientSocket } from 'socket.io-client';
 import * as fs from 'fs'
 import { v4 as uuidv4 } from 'uuid'
-import { TransportEvent } from '../interface/connector.interface';
+import { ConnectionState, Transport, TransportEvent, TransportMessage } from '../interface/connector.interface';
 import { ConnectedClientSocket, ConnectedServerSocket } from '../transport/websocket';
+import { EventMessage } from '../interface/transport.interface';
 
 export function startSocketServer(port: number): Observable<SocketForConnectedClient> {
     return new Observable((observer) => {
@@ -57,18 +58,13 @@ export function handleClientSocketConnection(socket: ClientSocket, serversConnec
     return new Observable((eventNotification: Observer<TransportEvent>) => {
         let clientName!: string
         let buffer: any[] = []
-        let receiverProfileInfo!: ConnectedClientSocket
+        let receiverProfileInfo!: ConnectedServerSocket
 
         // Listen for a connection event
         socket.on('connect', () => {
             console.log('Connected to the server:', socket.id)
-            serversConnected.push({
-                id: uuidv4(),
-                dateCreated: new Date(),
-                socketInstance: socket
-            })
             if (clientName) {
-                checkOwnClientInfo(clientName).then((profile: ConnectedClientSocket) => {
+                checkOwnClientInfo(clientName).then((profile: ConnectedServerSocket) => {
                     receiverProfileInfo = profile
                     socket.emit('profile', {
                         name: 'Old Client',
@@ -103,25 +99,49 @@ export function handleClientSocketConnection(socket: ClientSocket, serversConnec
         socket.on('profile', (data: { name: string, message: any }) => {
             // console.log(data)
             if (data.name == 'New Profile') {
-                console.log(`Assigned client Name: ${(data.message as ConnectedClientSocket).id}`)
-                receiverProfileInfo = data.message as ConnectedClientSocket
-                writeFile(data.message as ConnectedClientSocket, (data.message as ConnectedClientSocket).id).then(() => {
+                console.log(`Assigned client Name: ${(data.message as ConnectedServerSocket).id}`)
+                receiverProfileInfo = data.message as ConnectedServerSocket
+                writeFile(data.message as ConnectedServerSocket, (data.message as ConnectedServerSocket).id).then(() => {
                     clientName = receiverProfileInfo.id
                     // broadcast event to allow retransmission to release buffer
+                    eventNotification.next({
+                        id: uuidv4(),
+                        event: `New Server`,
+                        data: {
+                            channelId: (data.message as ConnectedServerSocket).id,
+                            message: `New Websocket Channel ${(data.message as ConnectedServerSocket).id} established.`
+                        } as EventMessage
+                    })
 
                 }).catch((error) => { }) // do nothing at the moment. 
+                // Update websocket instance record
+                serversConnected.push({
+                    id: (data.message as ConnectedServerSocket).id,
+                    dateCreated: new Date(),
+                    socketInstance: socket,
+                    connectionState: new BehaviorSubject<ConnectionState>(`ONLINE`)
+                })
             }
             if (data.name == 'Adjusted Profile') {
-                console.log(`Assigned client Name: ${(data.message as ConnectedClientSocket).id}`)
-                receiverProfileInfo = data.message as ConnectedClientSocket
-                writeFile(data.message as ConnectedClientSocket, (data.message as ConnectedClientSocket).id).then(() => {
+                console.log(`Assigned client Name: ${(data.message as ConnectedServerSocket).id}`)
+                receiverProfileInfo = data.message as ConnectedServerSocket
+                writeFile(data.message as ConnectedServerSocket, (data.message as ConnectedServerSocket).id).then(() => {
                     // broadcast event to allow retransmission to release buffer
                     eventNotification.next({
                         id: uuidv4(),
                         event: 'Client Reconnected',
-                        data: ''
+                        data: {
+                            channelId: (data.message as ConnectedServerSocket).id,
+                            message: `Existing Websocket Channel ${(data.message as ConnectedServerSocket).id} re-established.`
+                        } as EventMessage
                     })
                 }).catch((error) => { }) // do nothing at the moment. 
+                // Update websocket instance record
+                let clientObj = serversConnected.find(obj => obj.id === (data.message as ConnectedServerSocket).id)
+                if (clientObj) {
+                    clientObj.socketInstance = receiverProfileInfo.socketInstance
+                    clientObj.connectionState.next('ONLINE')
+                }
             }
             if (data.name == 'Error') {
                 console.log(`Server cannot find credentials`, data.message)
@@ -142,8 +162,12 @@ export function handleClientSocketConnection(socket: ClientSocket, serversConnec
                 eventNotification.next({
                     id: uuidv4(),
                     event: `Client Disconnected`,
-                    data: ''
+                    data: {
+                        channelId: receiverProfileInfo.id,
+                        message: `Server for Channel ${receiverProfileInfo.id} disconnected.`
+                    } as EventMessage
                 })
+                receiverProfileInfo.connectionState.next(`OFFLINE`)
             }
         });
     })
@@ -160,13 +184,18 @@ export function handleNewSocketClient(socket: SocketForConnectedClient, connecte
                 let clientInstance: ConnectedClientSocket = {
                     id: uuidv4(),
                     dateCreated: new Date(),
-                    socketInstance: socket
+                    socketInstance: socket,
+                    connectionState: new BehaviorSubject<ConnectionState>(`ONLINE`)
                 }
                 // publish first event notification
                 event.next({
                     id: uuidv4(),
                     event: `New Client`,
-                    data: clientInstance
+                    data: {
+                        channelId: clientInstance.id,
+                        message: `New Client Connected. Channel ID assigned: ${clientInstance.id}`,
+                        payload: clientInstance
+                    } as EventMessage
                 })
                 // send to receiver for reference
                 socket.emit('profile', {
@@ -183,6 +212,19 @@ export function handleNewSocketClient(socket: SocketForConnectedClient, connecte
                     socket.emit('profile', { name: 'Adjusted Profile', message: clientInstance })
                     // need to start listening again, because it's assigned a different socket instance this time round
                     startListening(socket, clientInstance, event)
+                    event.next({
+                        id: uuidv4(),
+                        event: 'Client Reconnected',
+                        data: {
+                            channelId: clientInstance.id,
+                            message: `Client ${clientInstance.id} connection re-established`,
+                            payload: clientInstance
+                        } as EventMessage
+                    })
+                    // Resume operation
+                    if (clientInstance.connectionState.getValue() == 'OFFLINE') {
+                        clientInstance.connectionState.next(`ONLINE`)
+                    }
                 } else {
                     console.log(`Profile Not Found`)
                     socket.emit('profile', { name: 'Error', message: 'Receiver Profile Not found' })
@@ -194,7 +236,7 @@ export function handleNewSocketClient(socket: SocketForConnectedClient, connecte
 
 
 // Specifically to write receiver profile information
-export async function writeFile(data: ConnectedClientSocket, filename: string): Promise<boolean> {
+export async function writeFile(data: ConnectedServerSocket, filename: string): Promise<boolean> {
     return new Promise((resolve, reject) => {
         // Write JSON data to a file
         fs.writeFile(`${filename}.json`, JSON.stringify(data, null, 2), (err) => {
@@ -212,7 +254,7 @@ export async function writeFile(data: ConnectedClientSocket, filename: string):
 
 
 // Check if filename exists. Return profile information if there's any
-export async function checkOwnClientInfo(filename: string): Promise<ConnectedClientSocket> {
+export async function checkOwnClientInfo(filename: string): Promise<ConnectedServerSocket> {
     return new Promise((resolve, reject) => {
         // Check if the file exists
         if (fs.existsSync(`${filename}.json`)) {
@@ -248,36 +290,12 @@ export function startListening(socket: SocketForConnectedClient, client: Connect
             id: uuidv4(),
             event: 'New Message',
             data: {
-                clientID: client.id,
-                dateReceived: new Date(),
-                payload: message
-            }
-        })
-    })
-
-    socket.on('request', (message: any) => {
-        // here : Let's say there's a subcsription request here
-        eventListener.next({
-            id: uuidv4(),
-            event: 'New Message',
-            data: {
-                clientID: client.id,
-                dateReceived: new Date(),
+                id: uuidv4(),
+                dateCreated: new Date(),
+                transport: Transport.Websocket,
+                target: client.id, // this would have been assigned it's own adapter with the associated ID
                 payload: message
-            }
-        })
-    })
-
-    socket.on('notification', (notification: any) => {
-        // logic here
-        eventListener.next({
-            id: uuidv4(),
-            event: `Notification`,
-            data: {
-                clientID: client.id,
-                dateReceived: new Date(),
-                payload: notification
-            }
+            } as TransportMessage
         })
     })
 
@@ -295,4 +313,3 @@ export function startListening(socket: SocketForConnectedClient, client: Connect
     })
 }
 
-