2 Commits 3fb0761748 ... 12dea90738

Author SHA1 Message Date
  Enzo 12dea90738 minor correction 2 weeks ago
  Enzo 764947d4b1 additional changes. Setting up test case to test the overall 2 weeks ago

+ 31 - 0
doc/explanation.txt

@@ -0,0 +1,31 @@
+Currently the way it works is, tranport service, which is the websocket or http or whatever transport protocol that is selected will have a universal emit and subscribe public method 
+so, the concept of having bi directional will be somewhat weaved into every class object of said transport service. What it will do is that, when a client is connected, i mean it 
+doesn't have to be literally like in the case of http, we will assume a logical channel so to speak. So when a client is connected, the connected client will be assinged an ID,
+one that will be used by the connection manager to instantiate a set of adapter with the assigned id, to identify the client as well as the type of transport it used. 
+Essentially, a set of adapter is tied to one client and it's associated transport type.
+
+There the core of this transport interface, or at least that what I will choose to call it, will communicate with transmission manager, which in turns will be communicating with 
+connection manager to handle how the message is received and transmitted. The need for this is to allow the integration for re-transmission service, so that messagecan be 
+re-transmitted over, should there be an interuption of sorts. 
+
+Now, in this context, we are talking about a server that can have multiple client connected to it, so that means there is a need to manage multiple 'traffic' with these connectedc
+clietns. To remember their credentials so as to be able to handle retransmission should they lost connection for whatever reasons, while dealing with various type of transport.
+FOr now, the example provided is only using websocket at the moment, another transport sample, which is http will be prepared at a later stage when i get this websocket working,
+with multiple clients as well as the server acting as a receiver itself. 
+
+So, when an application starts, which assume to be the provider, will instantiate a transmission manager. The transmission manager will in turn instantiate a connection mnager ,
+and the connection manager will also instantiate a transport service specified at .env file. Once that is done, the connection manager will then subscribe to the transportevent 
+also instantiated within it's component, and be exposed to transmission manager as well, so that all the concerned parties that are subscribed to transport event, will know what's 
+happening. So, in this websocket's context: A client is connected, a transport event is reported and announced. Connection manager will listen to this type of event and 
+instantiate a set of adapters to cater for this specific client. It will keep a copy with it's class, and can be retrieved by transmission transmitter and receiver and request response
+class. They too will also be listening to new adapter set event so to speak, and store the reference pointer in it's own local memory array. 
+So here's how it is, the app which is 
+the producer, once calls for transmission manager to instantiate a set of transmission methods, which is the message transmitter, receiver and request response class, which doesn't 
+care what the underlying transport that is being used, will use the instantiated objects to perfrom all the available methods to transmit and receive message, even as far as 
+emulating request response behaviour. So, when the producer wants to send a message, it will send through the message transmitter instantiated earlier. By the way, forgot to mention,
+when a client is connected, an ID will be assinged to the client, the same id that was assigned is also assigned to the adapter, which makes it easier to be managed later as I continue
+to develop this point. I probably mentioned this a few paragraphs, but anyway, as I was sating the message transmitter will read the message and see what adapter it's using,
+and search through it's array adapter ref, to find the matching adapterID, and then using that adapter to pass along the message. The adapter tha was chosen would have already been
+mapped with it's associated transport service to send the message out to the respective clients. At least that whole gist of it. 
+With that in mind, the retransmission service will sit at the adapter side, since each adapter is tied to one client and it's respective transport type, so it's easier to manage.
+All events or activities will go through one, well "global" subject, which is the transport event instantiated at connection manager.

+ 6 - 2
src/connector/connector.base.ts

@@ -1,12 +1,13 @@
 import { BehaviorSubject, Observable, Observer, Subject, Subscriber, Unsubscribable } from "rxjs";
 import { BehaviorSubject, Observable, Observer, Subject, Subscriber, Unsubscribable } from "rxjs";
 import dotenv from 'dotenv';
 import dotenv from 'dotenv';
-import { AdapterProfile, AdaptorTransmissionRole, ConnectionAdaptorBase, ConnectionState, TransportMessage } from "../interface/connector.interface";
+import { AdapterProfile, AdaptorTransmissionRole, ConnectionAdaptorBase, ConnectionState, TransportEvent, TransportMessage } from "../interface/connector.interface";
 import { Bus, FisAppActor, FisMessage } from "../interface/transport.interface";
 import { Bus, FisAppActor, FisMessage } from "../interface/transport.interface";
 
 
 dotenv.config();
 dotenv.config();
 /* This transport manager will be instantiating the necessary transport to deal with tranmission and receiving from different receivers
 /* This transport manager will be instantiating the necessary transport to deal with tranmission and receiving from different receivers
 So how?: */
 So how?: */
 export class ConnectionAdapter implements ConnectionAdaptorBase {
 export class ConnectionAdapter implements ConnectionAdaptorBase {
+    event!: Subject<TransportEvent>
     connector: any;
     connector: any;
     connectorProfile!: AdapterProfile;
     connectorProfile!: AdapterProfile;
     connectionStateBus!: BehaviorSubject<ConnectionState>;
     connectionStateBus!: BehaviorSubject<ConnectionState>;
@@ -15,6 +16,9 @@ export class ConnectionAdapter implements ConnectionAdaptorBase {
     constructor(port?: number, url?: string) {
     constructor(port?: number, url?: string) {
         if (!port && !url) console.error(`No role has assigned for transport service due to lack of information provided!`)
         if (!port && !url) console.error(`No role has assigned for transport service due to lack of information provided!`)
     }
     }
+    setAdapterProfile(adapterId: string): void {
+        throw new Error("Method not implemented.")
+    }
     getInfo(): any {
     getInfo(): any {
         throw new Error("Method not implemented.");
         throw new Error("Method not implemented.");
     }
     }
@@ -32,7 +36,7 @@ export class ConnectionAdapter implements ConnectionAdaptorBase {
     }
     }
     getMessageBus(bus: Bus): Observable<any> {
     getMessageBus(bus: Bus): Observable<any> {
         throw new Error("Method not implemented.");
         throw new Error("Method not implemented.");
-    }    
+    }
     subscribeMessages(messageFilter: any): Observable<any> {
     subscribeMessages(messageFilter: any): Observable<any> {
         throw new Error("Method not implemented.");
         throw new Error("Method not implemented.");
     }
     }

+ 38 - 35
src/connector/connector.manager.ts

@@ -1,9 +1,9 @@
 import { v4 as uuidv4 } from 'uuid'
 import { v4 as uuidv4 } from 'uuid'
 import dotenv from 'dotenv';
 import dotenv from 'dotenv';
-import { TransportEvent, ConnectionManager as ConnectionManagerInterface, ConnectionSet, Transport, TransportService } from "../interface/connector.interface"
+import { TransportEvent, ConnectionManager as ConnectionManagerInterface, AdapterSet, Transport, TransportService } from "../interface/connector.interface"
 import { TransmitterConnectionAdapter } from './connector.transmitter'
 import { TransmitterConnectionAdapter } from './connector.transmitter'
 import { ReceiverConnectionAdapter } from './connector.receiver'
 import { ReceiverConnectionAdapter } from './connector.receiver'
-import { TransmissionEvent } from '../interface/transport.interface'
+import { EventMessage } from '../interface/transport.interface'
 import { Subject } from 'rxjs'
 import { Subject } from 'rxjs'
 import { WebsocketTransportService } from '../transport/websocket'
 import { WebsocketTransportService } from '../transport/websocket'
 import { RequestResponseConnectionAdapter } from './connector.request.response'
 import { RequestResponseConnectionAdapter } from './connector.request.response'
@@ -14,50 +14,29 @@ the transmission side, must already be configured and locked to their respective
 as well as request response. Let's go along with this line for now. */
 as well as request response. Let's go along with this line for now. */
 export class ConnectionManager implements ConnectionManagerInterface {
 export class ConnectionManager implements ConnectionManagerInterface {
     transportService!: TransportService;
     transportService!: TransportService;
-    transmissionEvent: Subject<TransmissionEvent> = new Subject()
     transportEvent: Subject<TransportEvent> = new Subject() // Every event goes through this boy, and it will be expose across differnet components
     transportEvent: Subject<TransportEvent> = new Subject() // Every event goes through this boy, and it will be expose across differnet components
-    transmissionSet: ConnectionSet[] = []
+    adapterSet: AdapterSet[] = []
 
 
-    constructor(messageTransmissionEvent: Subject<TransmissionEvent>) {
-        this.transmissionEvent = messageTransmissionEvent 
+    constructor() {
         // logic here
         // logic here
         this.setUpServer(process.env.PORT as unknown as number, this.getTransportService(process.env.Transport as unknown as Transport))
         this.setUpServer(process.env.PORT as unknown as number, this.getTransportService(process.env.Transport as unknown as Transport))
 
 
-        // will also have information on events happening on transmission side
-        this.transmissionEvent.subscribe((event: TransmissionEvent) => {
-            console.log(`TransmissionEvent: `, event)
-        })
         // Everything that happens will go through here. Can also plug in logging service here at a later time
         // Everything that happens will go through here. Can also plug in logging service here at a later time
         this.transportEvent.subscribe((event: TransportEvent) => {
         this.transportEvent.subscribe((event: TransportEvent) => {
             console.log(`TranpsortEvent: `, event)
             console.log(`TranpsortEvent: `, event)
-        })
-    }
-
-    /* All three methods that return their respective instances will have it's activities reported through transportEvent */
-    getTransmitterConnectionAdapter(): TransmitterConnectionAdapter {
-        if (this.transportService) {
-            let adapter: TransmitterConnectionAdapter = new TransmitterConnectionAdapter(this.transportService)
-            return adapter
-        } else {
-            throw new Error(`Transmitter Transport NOT initialized.`)
-        }
-    }
 
 
-    getReceiverConnectionAdapter(): ReceiverConnectionAdapter {
-        if (this.transportService) {
-            let adapter: ReceiverConnectionAdapter = new ReceiverConnectionAdapter(this.transportService)
-            return adapter
-        } else {
-            throw new Error(`Receiver Transport NOT initialized.`)
-        }
+            if (event.event == 'New Client') {
+                this.handleNewClient((event.data as EventMessage).payload) // payload is connectedclientInstance
+            }
+        })
     }
     }
 
 
-    getRequestResponseConnectionAdapter(transmitterAdapter: TransmitterConnectionAdapter, receiverAdapter: ReceiverConnectionAdapter): RequestResponseConnectionAdapter {
-        if (this.transportService) {
-            let adapter: RequestResponseConnectionAdapter = new RequestResponseConnectionAdapter(transmitterAdapter, receiverAdapter)
-            return adapter
+    getAdapterSetById(adapterSetid: string): AdapterSet {
+        let adapterSet: AdapterSet | undefined = this.adapterSet.find(obj => obj.id === adapterSetid)
+        if (adapterSet) {
+            return adapterSet
         } else {
         } else {
-            throw new Error(`Request Response Transport NOT initialized.`)
+            throw new Error(`Adapter Set not found!!!`)
         }
         }
     }
     }
 
 
@@ -98,4 +77,28 @@ export class ConnectionManager implements ConnectionManagerInterface {
             //  To be Enhanced at a later time
             //  To be Enhanced at a later time
         }
         }
     }
     }
-}
+
+    private handleNewClient(clientInstance: any): void {
+        // need to setup the adapters to be called by the transmission manager
+        let transmitter: TransmitterConnectionAdapter = new TransmitterConnectionAdapter(this.transportService, clientInstance.id, this.transportEvent)
+        let receiver: ReceiverConnectionAdapter = new ReceiverConnectionAdapter(this.transportService, clientInstance.id, this.transportEvent)
+        let requestResponse: RequestResponseConnectionAdapter = new RequestResponseConnectionAdapter(transmitter, receiver, clientInstance.id)
+        let adapterSet: AdapterSet = {
+            id: clientInstance.id,
+            dateCreated: new Date(),
+            transmitterAdapter: transmitter,
+            receiverAdapter: receiver,
+            requestResponsAdapter: requestResponse
+        }
+        this.adapterSet.push(adapterSet)
+
+        this.transportEvent.next({
+            id: uuidv4(),
+            event: `New Adapter`,
+            data: {
+                adapterId: adapterSet.id,
+                message: `New Adapter Set${adapterSet.id} has been added`,
+            } as EventMessage
+        })
+    }
+}

+ 20 - 5
src/connector/connector.receiver.ts

@@ -3,23 +3,35 @@ import { Bus, FisAppActor, FisMessage } from "../interface/transport.interface";
 import { ConnectionAdapter } from "./connector.base";
 import { ConnectionAdapter } from "./connector.base";
 import { AdaptorTransmissionRole, ConnectionState, ReceiverConnectionAdapter as ReceiverConnectionAdapterInterface, TransportEvent, TransportMessage, TransportService } from "../interface/connector.interface";
 import { AdaptorTransmissionRole, ConnectionState, ReceiverConnectionAdapter as ReceiverConnectionAdapterInterface, TransportEvent, TransportMessage, TransportService } from "../interface/connector.interface";
 import { BehaviorSubject, filter, map, Observable, Observer, Subject, Subscription, takeWhile } from 'rxjs';
 import { BehaviorSubject, filter, map, Observable, Observer, Subject, Subscription, takeWhile } from 'rxjs';
+import { v4 as uuidv4 } from 'uuid'
 
 
 dotenv.config();
 dotenv.config();
 /* This transport manager will be instantiating the necessary transport to deal with tranmission and receiving from different receivers
 /* This transport manager will be instantiating the necessary transport to deal with tranmission and receiving from different receivers
 So how?: */
 So how?: */
 export class ReceiverConnectionAdapter extends ConnectionAdapter implements ReceiverConnectionAdapterInterface {
 export class ReceiverConnectionAdapter extends ConnectionAdapter implements ReceiverConnectionAdapterInterface {
-    event: Subject<TransportEvent> = new Subject()
+    event!: Subject<TransportEvent>
 
 
-    constructor(adapter: TransportService) {
+    constructor(adapter: TransportService, adapterId: string, event: Subject<TransportEvent>) {
         super()
         super()
         // logic here
         // logic here
+        this.event = event
         this.adaptorTransmissionRole = AdaptorTransmissionRole.Receiver
         this.adaptorTransmissionRole = AdaptorTransmissionRole.Receiver
         this.connector = adapter
         this.connector = adapter
+
+        this.setAdapterProfile(adapterId)
+    }
+
+    setAdapterProfile(adapterId: string): void {
+        this.connectorProfile = {
+            id: adapterId,
+            connectionState: this.connectionStateBus,
+            adapter: this.connector
+        }
     }
     }
 
 
     getMessageBus(bus: Bus): Observable<any> {
     getMessageBus(bus: Bus): Observable<any> {
         return new Observable((observable: Observer<any>) => {
         return new Observable((observable: Observer<any>) => {
-            if (bus == Bus.ResponseMessageBus) {
+            if (bus == Bus.GeneralBus) {
                 const subscription: Subscription = this.event.pipe(
                 const subscription: Subscription = this.event.pipe(
                     filter((message: TransportEvent) => message.event === 'New Message'),
                     filter((message: TransportEvent) => message.event === 'New Message'),
                     // take message only specific for this adapter. Although that itself wouldn't be necessary, considerng everything goes through transportEvent. I guess it's for better management
                     // take message only specific for this adapter. Although that itself wouldn't be necessary, considerng everything goes through transportEvent. I guess it's for better management
@@ -31,8 +43,8 @@ export class ReceiverConnectionAdapter extends ConnectionAdapter implements Rece
                         }
                         }
                         return shouldTake;
                         return shouldTake;
                     }),
                     }),
-                    map(message => (message.data as TransportMessage).payload as FisMessage)
-                ).subscribe((message: FisMessage) => {
+                    map(message => message.data as TransportMessage)
+                ).subscribe((message: TransportMessage) => {
                     observable.next(message);
                     observable.next(message);
                 });
                 });
     
     
@@ -41,6 +53,9 @@ export class ReceiverConnectionAdapter extends ConnectionAdapter implements Rece
                     subscription.unsubscribe();
                     subscription.unsubscribe();
                 };
                 };
             }
             }
+            if (bus == Bus.ResponseMessageBus) {
+                /// logic here
+            }
             if (bus == Bus.NotificationMessageBus) {
             if (bus == Bus.NotificationMessageBus) {
                 /// logic here
                 /// logic here
             }
             }

+ 38 - 5
src/connector/connector.request.response.ts

@@ -1,23 +1,34 @@
 import dotenv from 'dotenv';
 import dotenv from 'dotenv';
-import { FisAppActor, FisMessage, TransmissionMessage } from "../interface/transport.interface";
+import { Bus, FisAppActor, FisMessage, TransmissionMessage } from "../interface/transport.interface";
 import { ConnectionAdapter } from "./connector.base";
 import { ConnectionAdapter } from "./connector.base";
 import { RequestResponseConnectionAdapter as RequestResponseConnectionAdapterInterface, TransportEvent, TransportMessage } from "../interface/connector.interface";
 import { RequestResponseConnectionAdapter as RequestResponseConnectionAdapterInterface, TransportEvent, TransportMessage } from "../interface/connector.interface";
 import { TransmitterConnectionAdapter } from './connector.transmitter';
 import { TransmitterConnectionAdapter } from './connector.transmitter';
 import { ReceiverConnectionAdapter } from './connector.receiver';
 import { ReceiverConnectionAdapter } from './connector.receiver';
-import { Observable, Observer, Subject } from 'rxjs';
+import { filter, map, Observable, Observer, Subject, Subscription, takeWhile } from 'rxjs';
 
 
 dotenv.config();
 dotenv.config();
 /* This transport manager will be instantiating the necessary transport to deal with tranmission and receiving from different receivers
 /* This transport manager will be instantiating the necessary transport to deal with tranmission and receiving from different receivers
 So how?: */
 So how?: */
 export class RequestResponseConnectionAdapter extends ConnectionAdapter implements RequestResponseConnectionAdapterInterface {
 export class RequestResponseConnectionAdapter extends ConnectionAdapter implements RequestResponseConnectionAdapterInterface {
-    event: Subject<TransportEvent> = new Subject()
-    incomingResponses: Subject<TransportMessage> = new Subject()
+    private transmitterAdapter!: TransmitterConnectionAdapter
+    private receiverAdapter!: ReceiverConnectionAdapter
 
 
-    constructor(transmitterAdapter: TransmitterConnectionAdapter, receiverAdapter: ReceiverConnectionAdapter) {
+    constructor(transmitterAdapter: TransmitterConnectionAdapter, receiverAdapter: ReceiverConnectionAdapter, adapterId: string) {
         super()
         super()
         // logic here
         // logic here
+        this.transmitterAdapter = transmitterAdapter
+        this.receiverAdapter = receiverAdapter
+        this.setAdapterProfile(adapterId)
     }
     }
     
     
+    setAdapterProfile(adapterId: string): void {
+        this.connectorProfile = {
+            id: adapterId,
+            connectionState: this.connectionStateBus,
+            adapter: this.connector
+        }    
+    }
+
     emit(message: TransmissionMessage): void {
     emit(message: TransmissionMessage): void {
         throw new Error('Method not implemented.');
         throw new Error('Method not implemented.');
     }
     }
@@ -28,8 +39,30 @@ export class RequestResponseConnectionAdapter extends ConnectionAdapter implemen
     send(message: TransmissionMessage): Observable<TransportMessage> {
     send(message: TransmissionMessage): Observable<TransportMessage> {
         return new Observable((response: Observer<TransportMessage>) => {
         return new Observable((response: Observer<TransportMessage>) => {
             // logic here
             // logic here
+            this.transmitterAdapter.emit(message)
+            const subscription: Subscription = this.event.pipe(
+                filter((message: TransportEvent) => message.event === 'New Message'),
+                // take message only specific for this adapter. Although that itself wouldn't be necessary, considerng everything goes through transportEvent. I guess it's for better management
+                filter((message:TransportEvent) => (message.data as TransportMessage).target == this.connectorProfile.id), 
+                takeWhile((message: TransportEvent) => {
+                    const shouldTake = ((message.data as TransportMessage).payload as FisMessage).data !== 'Complete';
+                    if (!shouldTake) {
+                        response.complete();  // Ensure the observer is completed
+                    }
+                    return shouldTake;
+                }),
+                map(message => message.data as TransportMessage)
+            ).subscribe((message: TransportMessage) => {
+                response.next(message);
+            });
+
+            // Clean up on unsubscription
+            return () => {
+                subscription.unsubscribe();
+            };
         })
         })
     }
     }
+  
 }
 }
 
 
 
 

+ 15 - 6
src/connector/connector.transmitter.ts

@@ -1,11 +1,9 @@
 import dotenv from 'dotenv';
 import dotenv from 'dotenv';
 import { FisAppActor, FisMessage, TransmissionMessage } from "../interface/transport.interface";
 import { FisAppActor, FisMessage, TransmissionMessage } from "../interface/transport.interface";
 import { ConnectionAdapter } from "./connector.base";
 import { ConnectionAdapter } from "./connector.base";
-import { AdaptorTransmissionRole, ConnectionState, TransmitterConnectionAdapter as TransmitterConnectionAdapterInterface, Transport, TransportMessage, TransportService } from '../interface/connector.interface';
-import { BehaviorSubject, Observable, Observer } from 'rxjs';
+import { AdaptorTransmissionRole, ConnectionState, TransmitterConnectionAdapter as TransmitterConnectionAdapterInterface, Transport, TransportEvent, TransportMessage, TransportService } from '../interface/connector.interface';
+import { BehaviorSubject, Observable, Observer, Subject } from 'rxjs';
 import { v4 as uuidv4 } from 'uuid'
 import { v4 as uuidv4 } from 'uuid'
-import { WebsocketTransportService } from '../transport/websocket';
-import { HttpTransportService } from '../transport/http';
 
 
 dotenv.config();
 dotenv.config();
 /* This transport manager will be instantiating the necessary transport to deal with tranmission and receiving from different receivers
 /* This transport manager will be instantiating the necessary transport to deal with tranmission and receiving from different receivers
@@ -13,9 +11,20 @@ So how?: */
 export class TransmitterConnectionAdapter extends ConnectionAdapter implements TransmitterConnectionAdapterInterface {
 export class TransmitterConnectionAdapter extends ConnectionAdapter implements TransmitterConnectionAdapterInterface {
     connectionStateBus: BehaviorSubject<ConnectionState> = new BehaviorSubject('OFFLINE' as ConnectionState) // this cannot work, because there will be alot of clients connected presumably
     connectionStateBus: BehaviorSubject<ConnectionState> = new BehaviorSubject('OFFLINE' as ConnectionState) // this cannot work, because there will be alot of clients connected presumably
 
 
-    constructor(transportService: TransportService) {
+    constructor(transportService: TransportService, adapterId: string, event: Subject<TransportEvent>) {
         super()
         super()
         // logic here
         // logic here
+        this.event = event
+        this.connector = transportService
+        this.setAdapterProfile(adapterId)
+    }
+
+    setAdapterProfile(adapterId: string): void {
+        this.connectorProfile = {
+            id: adapterId,
+            connectionState: this.connectionStateBus,
+            adapter: this.connector
+        }
     }
     }
 
 
     emitStream(message: TransmissionMessage): void {
     emitStream(message: TransmissionMessage): void {
@@ -27,7 +36,7 @@ export class TransmitterConnectionAdapter extends ConnectionAdapter implements T
             id: uuidv4(),
             id: uuidv4(),
             dateCreated: new Date(),
             dateCreated: new Date(),
             transport: Transport.Websocket,
             transport: Transport.Websocket,
-            target: message.receiverId,
+            target: message.adapterId,
             payload: message.payload
             payload: message.payload
         } as TransportMessage)
         } as TransportMessage)
     }
     }

+ 16 - 12
src/interface/connector.interface.ts

@@ -1,5 +1,5 @@
 import { BehaviorSubject, Observable, Subject } from "rxjs"
 import { BehaviorSubject, Observable, Subject } from "rxjs"
-import { Bus, FisAppActor, ReceiverProfile, TransmissionEvent, TransmissionMessage, TransmissionProfile, TransmitterProfile } from "./transport.interface"
+import { Bus, FisAppActor, ReceiverProfile, TransmissionMessage, TransmissionProfile, TransmitterProfile } from "./transport.interface"
 import { ConnectionAdapter } from "../connector/connector.base"
 import { ConnectionAdapter } from "../connector/connector.base"
 
 
 
 
@@ -20,21 +20,19 @@ export interface AdapterProfile {
 export interface ConnectionManager {
 export interface ConnectionManager {
     transportService: TransportService
     transportService: TransportService
     // to get notified on what's going on in Transmission Manager
     // to get notified on what's going on in Transmission Manager
-    transmissionEvent: Subject<TransmissionEvent>
     transportEvent: Subject<TransportEvent>
     transportEvent: Subject<TransportEvent>
     // list of connection
     // list of connection
-    transmissionSet: ConnectionSet[]
+    adapterSet: AdapterSet[]
     // Called by transmission manager to have an instance of these adapters
     // Called by transmission manager to have an instance of these adapters
-    getTransmitterConnectionAdapter(): TransmitterConnectionAdapter
-    getReceiverConnectionAdapter(): ReceiverConnectionAdapter
-    getRequestResponseConnectionAdapter(transmitterAdapter: TransmitterConnectionAdapter, receiverConnectionAdapter: ReceiverConnectionAdapter): RequestResponseConnectionAdapter
+    getAdapterSetById(adapterSetid: string): AdapterSet
 }
 }
 
 
-export interface ConnectionAdaptorBase  {
+export interface ConnectionAdaptorBase {
     connector: TransportService // this one will refer to the actual tranpsort service like websocket and so on
     connector: TransportService // this one will refer to the actual tranpsort service like websocket and so on
     connectorProfile: AdapterProfile
     connectorProfile: AdapterProfile
     connectionStateBus: BehaviorSubject<ConnectionState>
     connectionStateBus: BehaviorSubject<ConnectionState>
     adaptorTransmissionRole: AdaptorTransmissionRole
     adaptorTransmissionRole: AdaptorTransmissionRole
+    event: Subject<TransportEvent>
 
 
     subscribeConnectionState(): Observable<ConnectionState>
     subscribeConnectionState(): Observable<ConnectionState>
     publishConnectionState(): void
     publishConnectionState(): void
@@ -42,6 +40,7 @@ export interface ConnectionAdaptorBase  {
     disconnect(): void
     disconnect(): void
     getMessageBus(bus: Bus): Observable<any>
     getMessageBus(bus: Bus): Observable<any>
     getInfo(): any
     getInfo(): any
+    setAdapterProfile(id: string): void
 }
 }
 
 
 
 
@@ -51,7 +50,6 @@ export interface TransmitterConnectionAdapter extends ConnectionAdaptorBase {
 }
 }
 
 
 export interface ReceiverConnectionAdapter extends ConnectionAdaptorBase {
 export interface ReceiverConnectionAdapter extends ConnectionAdaptorBase {
-    event: Subject<TransportEvent>
     subscribeMessages(messageFilter: any): Observable<any>
     subscribeMessages(messageFilter: any): Observable<any>
 }
 }
 
 
@@ -83,12 +81,9 @@ export interface TransportMessage {
 
 
 export interface TransportEvent {
 export interface TransportEvent {
     id: string,
     id: string,
-    event: 'Server Started' | 'New Client' | 'Client Disconnected' | 'Client Reconnected' | `Server Disconnected` | 'New Message' | `Notification` | `New Server` | `New Transport`,
+    event: 'Server Started' | 'New Client' | 'Client Disconnected' | 'Client Reconnected' | `Server Disconnected` | 'New Message' | `Notification` | `New Server` | `New Transport` | 'New Adapter',
     data: any
     data: any
 }
 }
-export interface ConnectionSet {
-    // TBD
-}
 
 
 export interface TransportService {
 export interface TransportService {
     getInfo(): any
     getInfo(): any
@@ -99,4 +94,13 @@ export interface TransportService {
 
 
 export interface Info {
 export interface Info {
     transport: Transport
     transport: Transport
+}
+
+
+export interface AdapterSet {
+    id: string,
+    dateCreated: Date,
+    transmitterAdapter: TransmitterConnectionAdapter,
+    receiverAdapter: ReceiverConnectionAdapter,
+    requestResponsAdapter: RequestResponseConnectionAdapter
 }
 }

+ 9 - 16
src/interface/transport.interface.ts

@@ -1,5 +1,5 @@
 import { Observable, Subject } from "rxjs";
 import { Observable, Subject } from "rxjs";
-import { AdapterProfile, AdaptorTransmissionRole, RequestResponseConnectionAdapter } from "./connector.interface";
+import { AdapterProfile, AdaptorTransmissionRole, RequestResponseConnectionAdapter, TransportEvent } from "./connector.interface";
 import { MessageTransmissionTransmitter } from "../transmission/msg.transmission.transmitter";
 import { MessageTransmissionTransmitter } from "../transmission/msg.transmission.transmitter";
 import { MessageTransmissionReceiver } from "../transmission/msg.transmission.receiver";
 import { MessageTransmissionReceiver } from "../transmission/msg.transmission.receiver";
 
 
@@ -10,11 +10,10 @@ export interface MessageTransmissionManager {
 
 
 export interface MessageTransmission {
 export interface MessageTransmission {
     id: string,
     id: string,
-    receiverId: string,
-    transmitterId: string,
     transmitter: MessageTransmissionTransmitter,
     transmitter: MessageTransmissionTransmitter,
     receiver: MessageTransmissionReceiver,
     receiver: MessageTransmissionReceiver,
-    requestResponse: MessageRequestResponse
+    requestResponse: MessageRequestResponse,
+    event: Observable<TransportEvent>
 }
 }
 export interface FisAppActor {
 export interface FisAppActor {
     incomingMessageBus: Subject<any>
     incomingMessageBus: Subject<any>
@@ -40,20 +39,20 @@ export interface MessageTransmissionBase extends FisAppActor {
 export interface MessageReceiver extends MessageTransmissionBase {
 export interface MessageReceiver extends MessageTransmissionBase {
     receiverProfile: ReceiverProfile
     receiverProfile: ReceiverProfile
 
 
-    setReceiver(receiverProfile: ReceiverProfile, role: AdaptorTransmissionRole, event: TransmissionEvent): void
+    setReceiver(receiverProfile: ReceiverProfile, role: AdaptorTransmissionRole, event: Subject<TransportEvent>): void
 }
 }
 
 
 export interface MessageTransmitter extends MessageTransmissionBase {
 export interface MessageTransmitter extends MessageTransmissionBase {
     transmitterProfile: TransmitterProfile
     transmitterProfile: TransmitterProfile
 
 
-    setTransmitter(transmitterProfile: TransmitterProfile, role: AdaptorTransmissionRole, event: TransmissionEvent): void
+    setTransmitter(transmitterProfile: TransmitterProfile, role: AdaptorTransmissionRole, event: Subject<TransportEvent>): void
 }
 }
 
 
 export interface MessageRequestResponse extends MessageTransmissionBase {
 export interface MessageRequestResponse extends MessageTransmissionBase {
     transmitterInstance: MessageTransmissionTransmitter
     transmitterInstance: MessageTransmissionTransmitter
     receiverInstance: MessageTransmissionReceiver
     receiverInstance: MessageTransmissionReceiver
 
 
-    setTransmissionProfile(transmissionInfo: MessageTransmissionTransmitter, receiverInfo: MessageTransmissionReceiver, role: AdaptorTransmissionRole, event: TransmissionEvent): void
+    setTransmissionProfile(transmissionInfo: MessageTransmissionTransmitter, receiverInfo: MessageTransmissionReceiver, role: AdaptorTransmissionRole, event: Subject<TransportEvent>): void
 }
 }
 
 
 export interface FisMessage {
 export interface FisMessage {
@@ -80,7 +79,7 @@ export interface RequestResponseProfile extends TransmissionProfile {
 
 
 }
 }
 export interface TransmissionMessage {
 export interface TransmissionMessage {
-    receiverId: string,
+    adapterId: string,
     payload: FisMessage
     payload: FisMessage
 }
 }
 
 
@@ -91,14 +90,8 @@ export enum Bus {
     NotificationMessageBus
     NotificationMessageBus
 }
 }
 
 
-export interface TransmissionEvent {
-    id: string,
-    event: 'New Receiver' | 'New Server' | 'Incoming Message' | `Outgoing Message`,
-    data: any
-}
-
 export interface EventMessage {
 export interface EventMessage {
-    channelId: string,
+    adapterId: string,
     message: string,
     message: string,
-    payload: any
+    payload?: any
 }
 }

+ 54 - 0
src/test/receiver.ts

@@ -0,0 +1,54 @@
+/*  This is to emulate another remote process also using socket io to connect.
+TEST: to see if it performs the necessary self check to identify itself, as well as 
+receiving all the notification and response messages */
+
+// Import the necessary modules
+import { io, Socket } from "socket.io-client";
+import { checkOwnClientInfo } from "../utils/socket.utils";
+
+class SocketClient {
+    private socket: Socket;
+    private receiverProfile!: any
+
+    constructor(url: string) {
+        // Connect to the serve
+        this.socket = io(url);
+
+        // Setup event listeners
+        this.socket.on("connect", () => {
+            console.log("Connected to server with ID:", this.socket.id);
+        });
+
+        if (this.receiverProfile) {
+            checkOwnClientInfo(this.receiverProfile.id).then((data) => {
+                this.socket.emit('profile', { name: 'Existing Profile',  data: data })
+            })
+        } else {
+            this.socket.emit('profile', { name: 'New Profile', data: null })
+        }
+
+        this.socket.on(`profile`, (info) => {
+            // logic here
+        })
+
+
+        this.socket.on("message", (data: any) => {
+            console.log("Received message:", data);
+        });
+
+        this.socket.on("disconnect", () => {
+            console.log("Disconnected from server");
+        });
+    }
+
+    // Emit a message to the server
+    public sendMessage(event: string, message: any) {
+        this.socket.emit(event, message);
+    }
+}
+
+// Usage example:
+const client = new SocketClient("http://localhost:3000");
+
+// Send a message
+client.sendMessage("message", "Hello, server!");

+ 117 - 29
src/test/transmitter.ts

@@ -1,54 +1,142 @@
-import { Observable, Subject } from "rxjs";
-import { FisAppActor, FisMessage, MessageTransmission, TransmissionProfile } from "../interface/transport.interface";
-import dotenv from 'dotenv';
+import { filter, interval, map, Observable, Observer, Subject, take } from "rxjs";
+import { Bus, EventMessage, FisMessage, MessageTransmission, TransmissionMessage } from "../interface/transport.interface";
+import { v4 as uuidv4 } from 'uuid'
 import { MessageTransmissionManager } from "../transmission/msg.transmission.manager";
 import { MessageTransmissionManager } from "../transmission/msg.transmission.manager";
+import { error } from "console";
+import e from "express";
+import { TransportEvent } from "../interface/connector.interface";
 
 
 /*  These are the purple fonts. Gonna interact with blue fonts to set up the credentials to establish the necessary roles.
 /*  These are the purple fonts. Gonna interact with blue fonts to set up the credentials to establish the necessary roles.
 Assuming the primary role is server. That means we will need transmitter and multiple receiver profiles that are connected. */
 Assuming the primary role is server. That means we will need transmitter and multiple receiver profiles that are connected. */
-class Application implements FisAppActor {
-    incomingMessageBus: Subject<any> = new Subject()
-    outgoingMessageBus: Subject<any> = new Subject()
+class Application {
     messageTransmissionManager: MessageTransmissionManager
     messageTransmissionManager: MessageTransmissionManager
     transmissionInstance!: MessageTransmission
     transmissionInstance!: MessageTransmission
+    generalNotification: Subject<FisMessage> = new Subject()
 
 
     constructor() {
     constructor() {
         this.messageTransmissionManager = new MessageTransmissionManager()
         this.messageTransmissionManager = new MessageTransmissionManager()
         this.transmissionInstance = this.messageTransmissionManager.getTransmissionInstance()
         this.transmissionInstance = this.messageTransmissionManager.getTransmissionInstance()
 
 
-        //code here first, then refactor/clean it later.
-        this.incomingMessageBus.subscribe(item => {
-            this.transmissionInstance.transmitter.emit(item)
-        })
+        this.generateNotifcation().subscribe(this.generalNotification)
     }
     }
 
 
-    send(message: FisMessage): Observable<any> {
+    // Emulating request response. For the case where this transmitter is acting as a receiver
+    send(message: FisMessage): Observable<FisMessage> {
         return new Observable((response) => {
         return new Observable((response) => {
-            this.outgoingMessageBus.next(message)
-            this.incomingMessageBus.subscribe({
-                next: (message: FisMessage) => {
-                    if (message.header.messageID == message.header.messageID) {
-                        response.next(message)
-                    }
-                    if (message.header.messageID == message.header.messageID && message.data == 'Complete') {
-                        response.complete()
-                    }
-                },
-                error: error => response.error(error)
+            // logic here
+        })
+    }
+
+    // Transmission only
+    emit(message: FisMessage, adapterId: string): void {
+        this.transmissionInstance.transmitter.emit({
+            adapterId: adapterId, // this should mqatch the request ID??
+            payload: message
+        })
+    }
+
+    // Receiving only
+    susbcribe(): Observable<FisMessage> {
+        return new Observable((observer: Observer<any>) => {
+            this.transmissionInstance.receiver.getMessageBus(Bus.GeneralBus).subscribe((message: TransmissionMessage) => {
+                // logic here
+                this.appProcess(message.adapterId, message.payload)
+            })
+        })
+    }
+
+    // no request needed, auto broadcast 
+    subscribeForNewClientWhoWantsNotification(): void {
+        this.transmissionInstance.event.pipe(
+            filter(obj => obj.event == 'New Adapter')
+        ).subscribe((event: TransportEvent) => {
+            this.generalNotification.subscribe((message: FisMessage) => {
+                this.emit(message, (event.data as EventMessage).adapterId)
             })
             })
         })
         })
     }
     }
 
 
-    emit(message: FisMessage): void {
-        this.outgoingMessageBus.next(message)
+    // just assume that the provide will provide 10 responses messages 
+    appProcess(adapterId: string, message: FisMessage): void {
+        this.generateMessage(10).subscribe({
+            next: (message: FisMessage) => {
+                this.emit(message, adapterId)
+            },
+            error: error => console.error(error),
+            complete: () => console.log(`All responses generated completed and passed into adapter: ${adapterId}`)
+        })
     }
     }
 
 
-    emitStream(message: FisMessage): void {
-        this.outgoingMessageBus.next(message)
+    private generateMessage(amount: number): Observable<FisMessage> {
+        return new Observable((response: Observer<FisMessage>) => {
+            const intervalMessageGeneration = interval(1000).pipe(
+                take(amount), // Ensures only 'amount' messages are generated
+                map(() => {
+                    const message: FisMessage = {
+                        header: {
+                            messageID: uuidv4(),
+                            messageName: 'ResponseMessage'
+                        },
+                        data: `Data`
+                    };
+                    return message;
+                })
+            );
+
+            const subscription = intervalMessageGeneration.subscribe({
+                next: message => response.next(message),
+                error: error => response.error(error),
+                complete: () => {
+                    response.next({
+                        header: {
+                            messageID: uuidv4(),
+                            messageName: 'ResponseMessage'
+                        },
+                        data: `Complete`
+                    });
+                    response.complete();
+                }
+            });
+
+            // Ensure cleanup on unsubscribe
+            return () => subscription.unsubscribe();
+        });
     }
     }
 
 
-    subscribeMessages(messageFilter: any): Observable<FisMessage> {
-        throw new Error(`Unavailable for now....`)
+    private generateNotifcation(): Observable<FisMessage> {
+        return new Observable((response: Observer<FisMessage>) => {
+            const intervalMessageGeneration = interval(1000).pipe(
+                map(() => {
+                    const message: FisMessage = {
+                        header: {
+                            messageID: uuidv4(),
+                            messageName: 'ResponseMessage'
+                        },
+                        data: `Data`
+                    };
+                    return message;
+                })
+            );
+
+            const subscription = intervalMessageGeneration.subscribe({
+                next: message => response.next(message),
+                error: error => response.error(error),
+                complete: () => {
+                    response.next({
+                        header: {
+                            messageID: uuidv4(),
+                            messageName: 'NotificationMessage'
+                        },
+                        data: `Complete`
+                    });
+                    response.complete();
+                }
+            });
+
+            // Ensure cleanup on unsubscribe
+            return () => subscription.unsubscribe();
+        });
     }
     }
 }
 }
 
 
-const application = new Application()
+const application = new Application()

+ 10 - 11
src/transmission/msg.transmission.manager.ts

@@ -1,9 +1,9 @@
 import { MessageTransmissionTransmitter } from "./msg.transmission.transmitter";
 import { MessageTransmissionTransmitter } from "./msg.transmission.transmitter";
 import { MessageTransmissionReceiver } from "./msg.transmission.receiver";
 import { MessageTransmissionReceiver } from "./msg.transmission.receiver";
 import { ConnectionManager } from "../connector/connector.manager";
 import { ConnectionManager } from "../connector/connector.manager";
-import { MessageTransmission, MessageTransmissionManager as MessageTransmissionManagerInterface, ReceiverProfile, TransmissionEvent, TransmissionProfile, TransmitterProfile } from "../interface/transport.interface";
+import { MessageTransmission, MessageTransmissionManager as MessageTransmissionManagerInterface, ReceiverProfile, TransmissionProfile, TransmitterProfile } from "../interface/transport.interface";
 import { v4 as uuidv4 } from 'uuid'
 import { v4 as uuidv4 } from 'uuid'
-import { AdaptorTransmissionRole, RequestResponseConnectionAdapter } from "../interface/connector.interface";
+import { AdaptorTransmissionRole, RequestResponseConnectionAdapter, TransportEvent } from "../interface/connector.interface";
 import { MessageTransmissionRequestResponse } from "./msg.transmission.request-response";
 import { MessageTransmissionRequestResponse } from "./msg.transmission.request-response";
 import { TransmitterConnectionAdapter } from "../connector/connector.transmitter";
 import { TransmitterConnectionAdapter } from "../connector/connector.transmitter";
 import { ReceiverConnectionAdapter } from "../connector/connector.receiver";
 import { ReceiverConnectionAdapter } from "../connector/connector.receiver";
@@ -19,10 +19,10 @@ export class MessageTransmissionManager implements MessageTransmissionManagerInt
     messageTransmissionTransmitters: MessageTransmissionTransmitter[] = []
     messageTransmissionTransmitters: MessageTransmissionTransmitter[] = []
     messageTransmissionReceiver: MessageTransmissionReceiver[] = []
     messageTransmissionReceiver: MessageTransmissionReceiver[] = []
     messageTransmissionRequestResponse: MessageTransmissionRequestResponse[] = []
     messageTransmissionRequestResponse: MessageTransmissionRequestResponse[] = []
-    transmissionEvent: Subject<TransmissionEvent> = new Subject()
 
 
     constructor() {
     constructor() {
        // logic here
        // logic here
+       this.instantiateConnectionManager()
     }
     }
 
 
     /* so there will be some changes here. will nto be assigning just one, but all of them dynamically to pour into this boy
     /* so there will be some changes here. will nto be assigning just one, but all of them dynamically to pour into this boy
@@ -30,15 +30,14 @@ export class MessageTransmissionManager implements MessageTransmissionManagerInt
     getTransmissionInstance(): MessageTransmission {
     getTransmissionInstance(): MessageTransmission {
         let transmitter: MessageTransmissionTransmitter = this.getTransmitter()
         let transmitter: MessageTransmissionTransmitter = this.getTransmitter()
         let receiver: MessageTransmissionReceiver = this.getReceiver()
         let receiver: MessageTransmissionReceiver = this.getReceiver()
+        let requestResponse: MessageTransmissionRequestResponse = this.getRequestResponse(transmitter, receiver)
         let transmission: MessageTransmission = {
         let transmission: MessageTransmission = {
             id: uuidv4(),
             id: uuidv4(),
-            receiverId: transmitter.getInfo().id,
-            transmitterId: receiver.getInfo().id,
             transmitter: transmitter,
             transmitter: transmitter,
             receiver: receiver,
             receiver: receiver,
-            requestResponse: this.getRequestResponse(transmitter, receiver)
+            requestResponse: requestResponse,
+            event: this.connectionManager.transportEvent.asObservable()
         }
         }
-        this.instantiateConnectionManager() // start an adapter
         // this.transmission.push(transmission)
         // this.transmission.push(transmission)
         return transmission
         return transmission
     }
     }
@@ -49,7 +48,7 @@ export class MessageTransmissionManager implements MessageTransmissionManagerInt
             name: '', // for now make it empty. We will use the assigned uuid here
             name: '', // for now make it empty. We will use the assigned uuid here
             dateCreated: new Date()
             dateCreated: new Date()
         }
         }
-        return new MessageTransmissionTransmitter(transmitterProfile, AdaptorTransmissionRole.Transmitter, this.transmissionEvent)
+        return new MessageTransmissionTransmitter(transmitterProfile, AdaptorTransmissionRole.Transmitter, this.connectionManager.transportEvent)
     }
     }
 
 
     private getReceiver(): MessageTransmissionReceiver {
     private getReceiver(): MessageTransmissionReceiver {
@@ -58,15 +57,15 @@ export class MessageTransmissionManager implements MessageTransmissionManagerInt
             name: '', // for now make it empty. We will use the assigned uuid here
             name: '', // for now make it empty. We will use the assigned uuid here
             dateCreated: new Date()
             dateCreated: new Date()
         }
         }
-        return new MessageTransmissionReceiver(receiverProfile, AdaptorTransmissionRole.Receiver, this.transmissionEvent)
+        return new MessageTransmissionReceiver(receiverProfile, AdaptorTransmissionRole.Receiver, this.connectionManager.transportEvent)
     }
     }
 
 
     private getRequestResponse(transmitterInstance: MessageTransmissionTransmitter, receiverInstance: MessageTransmissionReceiver): MessageTransmissionRequestResponse {
     private getRequestResponse(transmitterInstance: MessageTransmissionTransmitter, receiverInstance: MessageTransmissionReceiver): MessageTransmissionRequestResponse {
-        return new MessageTransmissionRequestResponse(transmitterInstance, receiverInstance, AdaptorTransmissionRole.RequestResponse, this.transmissionEvent)
+        return new MessageTransmissionRequestResponse(transmitterInstance, receiverInstance, AdaptorTransmissionRole.RequestResponse, this.connectionManager.transportEvent)
     }
     }
 
 
     private instantiateConnectionManager(): void {
     private instantiateConnectionManager(): void {
-        this.connectionManager = new ConnectionManager(this.transmissionEvent)
+        this.connectionManager = new ConnectionManager()
     }
     }
 
 
 }
 }

+ 15 - 12
src/transmission/msg.transmission.receiver.ts

@@ -1,7 +1,7 @@
 import { filter, map, Observable, Observer, PartialObserver, Subject, Subscriber, Subscription, takeWhile, Unsubscribable } from 'rxjs';
 import { filter, map, Observable, Observer, PartialObserver, Subject, Subscriber, Subscription, takeWhile, Unsubscribable } from 'rxjs';
 import { AdapterProfile, AdaptorTransmissionRole, TransportEvent, TransportMessage } from '../interface/connector.interface';
 import { AdapterProfile, AdaptorTransmissionRole, TransportEvent, TransportMessage } from '../interface/connector.interface';
 import { MessageTransmissionBase } from './msg.transmission.base';
 import { MessageTransmissionBase } from './msg.transmission.base';
-import { Bus, FisMessage, MessageReceiver as MessageReceiverInterface, ReceiverProfile, TransmissionEvent, TransmissionMessage, TransmitterProfile } from '../interface/transport.interface'
+import { Bus, FisMessage, MessageReceiver as MessageReceiverInterface, ReceiverProfile, TransmissionMessage, TransmitterProfile } from '../interface/transport.interface'
 import { ConnectionAdapter } from '../connector/connector.base';
 import { ConnectionAdapter } from '../connector/connector.base';
 import { v4 as uuidv4 } from 'uuid'
 import { v4 as uuidv4 } from 'uuid'
 import { ReceiverConnectionAdapter } from '../connector/connector.receiver';
 import { ReceiverConnectionAdapter } from '../connector/connector.receiver';
@@ -10,7 +10,7 @@ export class MessageTransmissionReceiver extends MessageTransmissionBase impleme
     receiverProfile!: ReceiverProfile;
     receiverProfile!: ReceiverProfile;
     private incomingTransportMessage: Subject<TransportMessage> = new Subject()
     private incomingTransportMessage: Subject<TransportMessage> = new Subject()
 
 
-    constructor(profile: ReceiverProfile, role: AdaptorTransmissionRole, event: Observable<TransmissionEvent>) {
+    constructor(profile: ReceiverProfile, role: AdaptorTransmissionRole, event: Observable<TransportEvent>) {
         super();
         super();
 
 
         this.setReceiver(profile, role)
         this.setReceiver(profile, role)
@@ -22,15 +22,18 @@ export class MessageTransmissionReceiver extends MessageTransmissionBase impleme
         this.transmissionRole = role
         this.transmissionRole = role
     }
     }
 
 
-    getMessageBus(bus: Bus): Observable<any> {
-        return new Observable((observable: Observer<any>) => {
+    getMessageBus(bus: Bus): Observable<TransmissionMessage> {
+        return new Observable((observable: Observer<TransmissionMessage>) => {
             // logic here
             // logic here
-            if (bus == Bus.ResponseMessageBus) {
+            if (bus == Bus.GeneralBus) {
                 const subscription: Subscription = this.incomingTransportMessage.pipe(
                 const subscription: Subscription = this.incomingTransportMessage.pipe(
-                    filter((message: TransportMessage) => message. === `Incoming Message`),
-                    map(message => message.data as TransportMessage)
-                ).subscribe((message) => {
-                    observable.next(message);
+                    filter((message: TransportMessage) => message.payload === `Incoming Message`),
+                    map(message => message.payload as TransportMessage)
+                ).subscribe((message: TransportMessage) => {
+                    observable.next({
+                        adapterId: message.target as string, // assign adapter id so that it knows who to reply back
+                        payload: message.payload
+                    });
                 });
                 });
 
 
                 // Clean up on unsubscription
                 // Clean up on unsubscription
@@ -41,10 +44,10 @@ export class MessageTransmissionReceiver extends MessageTransmissionBase impleme
         })
         })
     }
     }
 
 
-    private handleEvent(event: Observable<TransmissionEvent>): void {
+    private handleEvent(event: Observable<TransportEvent>): void {
         event.pipe(
         event.pipe(
-            filter((obj: TransmissionEvent) => obj.event == `New Receiver`),
-            map((obj: TransmissionEvent) => (obj.data as ReceiverConnectionAdapter))
+            filter((obj: TransportEvent) => obj.event == `New Adapter`),
+            map((obj: TransportEvent) => (obj.data as ReceiverConnectionAdapter))
         ).subscribe((adapter: ReceiverConnectionAdapter) => {
         ).subscribe((adapter: ReceiverConnectionAdapter) => {
             this.adaptorsArray.push({ id: adapter.getInfo().id, adapter: adapter } as unknown as AdapterProfile)
             this.adaptorsArray.push({ id: adapter.getInfo().id, adapter: adapter } as unknown as AdapterProfile)
             adapter.getMessageBus(Bus.GeneralBus).subscribe(this.incomingTransportMessage)
             adapter.getMessageBus(Bus.GeneralBus).subscribe(this.incomingTransportMessage)

+ 3 - 3
src/transmission/msg.transmission.request-response.ts

@@ -1,8 +1,8 @@
 import { MessageTransmissionBase } from "./msg.transmission.base";
 import { MessageTransmissionBase } from "./msg.transmission.base";
-import { FisMessage, MessageReceiver, MessageRequestResponse as MessageRequestResponseInterface, TransmissionEvent } from '../interface/transport.interface'
+import { FisMessage, MessageReceiver, MessageRequestResponse as MessageRequestResponseInterface } from '../interface/transport.interface'
 import { filter, Observable, Observer, Subscription, takeWhile } from "rxjs";
 import { filter, Observable, Observer, Subscription, takeWhile } from "rxjs";
 import { v4 as uuidv4 } from 'uuid'
 import { v4 as uuidv4 } from 'uuid'
-import { AdaptorTransmissionRole, RequestResponseConnectionAdapter } from "../interface/connector.interface";
+import { AdaptorTransmissionRole, RequestResponseConnectionAdapter, TransportEvent } from "../interface/connector.interface";
 import { MessageTransmissionReceiver } from "./msg.transmission.receiver";
 import { MessageTransmissionReceiver } from "./msg.transmission.receiver";
 import { MessageTransmissionTransmitter } from "./msg.transmission.transmitter";
 import { MessageTransmissionTransmitter } from "./msg.transmission.transmitter";
 
 
@@ -10,7 +10,7 @@ export class MessageTransmissionRequestResponse extends MessageTransmissionBase
     transmitterInstance!: MessageTransmissionTransmitter;
     transmitterInstance!: MessageTransmissionTransmitter;
     receiverInstance!: MessageTransmissionReceiver;
     receiverInstance!: MessageTransmissionReceiver;
 
 
-    constructor(transmitterInstance: MessageTransmissionTransmitter, receiverInstance: MessageTransmissionReceiver, role: AdaptorTransmissionRole, event: Observable<TransmissionEvent>) {
+    constructor(transmitterInstance: MessageTransmissionTransmitter, receiverInstance: MessageTransmissionReceiver, role: AdaptorTransmissionRole, event: Observable<TransportEvent>) {
         super()
         super()
         this.setTransmissionProfile(transmitterInstance, receiverInstance, role)
         this.setTransmissionProfile(transmitterInstance, receiverInstance, role)
     }
     }

+ 9 - 9
src/transmission/msg.transmission.transmitter.ts

@@ -1,6 +1,6 @@
 import { MessageTransmissionBase } from "./msg.transmission.base";
 import { MessageTransmissionBase } from "./msg.transmission.base";
-import { FisMessage, MessageTransmitter as MessageTransmitterInterface, TransmissionEvent, TransmissionMessage, TransmitterProfile } from '../interface/transport.interface'
-import { AdapterProfile, AdaptorTransmissionRole, ConnectionState, TransportMessage, TransportService } from "../interface/connector.interface";
+import { FisMessage, MessageTransmitter as MessageTransmitterInterface, TransmissionMessage, TransmitterProfile } from '../interface/transport.interface'
+import { AdapterProfile, AdapterSet, AdaptorTransmissionRole, ConnectionState, TransportEvent, TransportMessage, TransportService } from "../interface/connector.interface";
 import { v4 as uuidv4 } from 'uuid'
 import { v4 as uuidv4 } from 'uuid'
 import { TransmitterConnectionAdapter } from "../connector/connector.transmitter";
 import { TransmitterConnectionAdapter } from "../connector/connector.transmitter";
 import { BehaviorSubject, filter, map, Observable } from "rxjs";
 import { BehaviorSubject, filter, map, Observable } from "rxjs";
@@ -10,7 +10,7 @@ connectors or adapters will have their own identifier*/
 export class MessageTransmissionTransmitter extends MessageTransmissionBase implements MessageTransmitterInterface {
 export class MessageTransmissionTransmitter extends MessageTransmissionBase implements MessageTransmitterInterface {
     transmitterProfile!: TransmitterProfile;
     transmitterProfile!: TransmitterProfile;
 
 
-    constructor(profile: TransmitterProfile, role: AdaptorTransmissionRole, event: Observable<TransmissionEvent>) {
+    constructor(profile: TransmitterProfile, role: AdaptorTransmissionRole, event: Observable<TransportEvent>) {
         super()
         super()
         this.setTransmitter(profile, role)
         this.setTransmitter(profile, role)
 
 
@@ -24,18 +24,18 @@ export class MessageTransmissionTransmitter extends MessageTransmissionBase impl
 
 
     emit(message: TransmissionMessage): void {
     emit(message: TransmissionMessage): void {
         // logic here, emit based on message
         // logic here, emit based on message
-        let adapter: AdapterProfile | undefined = this.adaptorsArray.find(obj => obj.id == message.receiverId)
+        let adapter: AdapterProfile | undefined = this.adaptorsArray.find(obj => obj.id == message.adapterId)
         if(adapter) {
         if(adapter) {
             (adapter.adapter as TransmitterConnectionAdapter).emit(message)
             (adapter.adapter as TransmitterConnectionAdapter).emit(message)
         }
         }
     }
     }
 
 
-    private handleEvent(event: Observable<TransmissionEvent>): void {
+    private handleEvent(event: Observable<TransportEvent>): void {
         event.pipe(
         event.pipe(
-            filter((obj: TransmissionEvent) => obj.event == `New Receiver`),
-            map(obj => (obj as TransmissionEvent).data as TransmitterConnectionAdapter)
-        ).subscribe((adapter: TransmitterConnectionAdapter) => {
-            this.adaptorsArray.push({ id: adapter.getInfo().id, adapter: adapter, connectionState: new BehaviorSubject<ConnectionState>(`OFFLINE`) }) // refers to channel ID, which will be embedded in these messages that pass through
+            filter((obj: TransportEvent) => obj.event == `New Adapter`),
+            map(obj => (obj as TransportEvent).data as AdapterSet)
+        ).subscribe((adapter: AdapterSet) => {
+            this.adaptorsArray.push({ id: adapter.transmitterAdapter.getInfo().id, adapter: adapter.transmitterAdapter as TransmitterConnectionAdapter, connectionState: adapter.transmitterAdapter.connectionStateBus }) // refers to channel ID, which will be embedded in these messages that pass through
         })
         })
     }
     }
 }
 }

+ 8 - 1
src/utils/message.ordering.ts

@@ -1,5 +1,4 @@
 import { Subject, takeWhile } from "rxjs";
 import { Subject, takeWhile } from "rxjs";
-import { WrappedMessage } from "../interface/ITransport.interface";
 
 
 export function sortMessageBasedOnDate(array: WrappedMessage[]): WrappedMessage[] {
 export function sortMessageBasedOnDate(array: WrappedMessage[]): WrappedMessage[] {
     console.log(`Sorting ${array.length} messages....`)
     console.log(`Sorting ${array.length} messages....`)
@@ -27,3 +26,11 @@ export async function checkMessage(message: WrappedMessage, messageChecking: Sub
         }
         }
     })
     })
 }
 }
+
+
+export interface WrappedMessage {
+    timeReceived: Date,
+    payload: any
+    thisMessageID: string,
+    previousMessageID: string | null,
+}

+ 1 - 2
src/utils/retransmission.service.ts

@@ -1,7 +1,6 @@
 import { BehaviorSubject, buffer, distinctUntilChanged, from, Observable, Subject, takeWhile } from "rxjs";
 import { BehaviorSubject, buffer, distinctUntilChanged, from, Observable, Subject, takeWhile } from "rxjs";
 import { v4 as uuidV4 } from 'uuid';
 import { v4 as uuidV4 } from 'uuid';
-import { WrappedMessage } from "../interface/ITransport.interface";
-import { sortMessageBasedOnDate } from "./message.ordering";
+import { sortMessageBasedOnDate, WrappedMessage } from "./message.ordering";
 
 
 export class RetransmissionService {
 export class RetransmissionService {
     private currentMessageId!: string | null
     private currentMessageId!: string | null

+ 9 - 7
src/utils/socket.utils.ts

@@ -108,7 +108,7 @@ export function handleClientSocketConnection(socket: ClientSocket, serversConnec
                         id: uuidv4(),
                         id: uuidv4(),
                         event: `New Server`,
                         event: `New Server`,
                         data: {
                         data: {
-                            channelId: (data.message as ConnectedServerSocket).id,
+                            adapterId: (data.message as ConnectedServerSocket).id,
                             message: `New Websocket Channel ${(data.message as ConnectedServerSocket).id} established.`
                             message: `New Websocket Channel ${(data.message as ConnectedServerSocket).id} established.`
                         } as EventMessage
                         } as EventMessage
                     })
                     })
@@ -131,7 +131,7 @@ export function handleClientSocketConnection(socket: ClientSocket, serversConnec
                         id: uuidv4(),
                         id: uuidv4(),
                         event: 'Client Reconnected',
                         event: 'Client Reconnected',
                         data: {
                         data: {
-                            channelId: (data.message as ConnectedServerSocket).id,
+                            adapterId: (data.message as ConnectedServerSocket).id,
                             message: `Existing Websocket Channel ${(data.message as ConnectedServerSocket).id} re-established.`
                             message: `Existing Websocket Channel ${(data.message as ConnectedServerSocket).id} re-established.`
                         } as EventMessage
                         } as EventMessage
                     })
                     })
@@ -163,7 +163,7 @@ export function handleClientSocketConnection(socket: ClientSocket, serversConnec
                     id: uuidv4(),
                     id: uuidv4(),
                     event: `Client Disconnected`,
                     event: `Client Disconnected`,
                     data: {
                     data: {
-                        channelId: receiverProfileInfo.id,
+                        adapterId: receiverProfileInfo.id,
                         message: `Server for Channel ${receiverProfileInfo.id} disconnected.`
                         message: `Server for Channel ${receiverProfileInfo.id} disconnected.`
                     } as EventMessage
                     } as EventMessage
                 })
                 })
@@ -192,8 +192,8 @@ export function handleNewSocketClient(socket: SocketForConnectedClient, connecte
                     id: uuidv4(),
                     id: uuidv4(),
                     event: `New Client`,
                     event: `New Client`,
                     data: {
                     data: {
-                        channelId: clientInstance.id,
-                        message: `New Client Connected. Channel ID assigned: ${clientInstance.id}`,
+                        adapterId: clientInstance.id,
+                        message: `New Client Connected. Adapter ID assigned: ${clientInstance.id}`,
                         payload: clientInstance
                         payload: clientInstance
                     } as EventMessage
                     } as EventMessage
                 })
                 })
@@ -210,13 +210,15 @@ export function handleNewSocketClient(socket: SocketForConnectedClient, connecte
                 if (clientInstance) {
                 if (clientInstance) {
                     console.log(`Socket Client ${clientInstance.id} Found`)
                     console.log(`Socket Client ${clientInstance.id} Found`)
                     socket.emit('profile', { name: 'Adjusted Profile', message: clientInstance })
                     socket.emit('profile', { name: 'Adjusted Profile', message: clientInstance })
+                    // replace socket instance since the previous has been terminated
+                    clientInstance.socketInstance = socket
                     // need to start listening again, because it's assigned a different socket instance this time round
                     // need to start listening again, because it's assigned a different socket instance this time round
                     startListening(socket, clientInstance, event)
                     startListening(socket, clientInstance, event)
                     event.next({
                     event.next({
                         id: uuidv4(),
                         id: uuidv4(),
                         event: 'Client Reconnected',
                         event: 'Client Reconnected',
                         data: {
                         data: {
-                            channelId: clientInstance.id,
+                            adapterId: clientInstance.id,
                             message: `Client ${clientInstance.id} connection re-established`,
                             message: `Client ${clientInstance.id} connection re-established`,
                             payload: clientInstance
                             payload: clientInstance
                         } as EventMessage
                         } as EventMessage
@@ -254,7 +256,7 @@ export async function writeFile(data: ConnectedServerSocket, filename: string):
 
 
 
 
 // Check if filename exists. Return profile information if there's any
 // Check if filename exists. Return profile information if there's any
-export async function checkOwnClientInfo(filename: string): Promise<ConnectedServerSocket> {
+export async function checkOwnClientInfo(filename?: string): Promise<ConnectedServerSocket> {
     return new Promise((resolve, reject) => {
     return new Promise((resolve, reject) => {
         // Check if the file exists
         // Check if the file exists
         if (fs.existsSync(`${filename}.json`)) {
         if (fs.existsSync(`${filename}.json`)) {