Browse Source

additional changes. Setting up test case to test the overall

Enzo 1 week ago
parent
commit
764947d4b1

+ 31 - 0
doc/explanation.txt

@@ -0,0 +1,31 @@
+Currently the way it works is, tranport service, which is the websocket or http or whatever transport protocol that is selected will have a universal emit and subscribe public method 
+so, the concept of having bi directional will be somewhat weaved into every class object of said transport service. What it will do is that, when a client is connected, i mean it 
+doesn't have to be literally like in the case of http, we will assume a logical channel so to speak. So when a client is connected, the connected client will be assinged an ID,
+one that will be used by the connection manager to instantiate a set of adapter with the assigned id, to identify the client as well as the type of transport it used. 
+Essentially, a set of adapter is tied to one client and it's associated transport type.
+
+There the core of this transport interface, or at least that what I will choose to call it, will communicate with transmission manager, which in turns will be communicating with 
+connection manager to handle how the message is received and transmitted. The need for this is to allow the integration for re-transmission service, so that messagecan be 
+re-transmitted over, should there be an interuption of sorts. 
+
+Now, in this context, we are talking about a server that can have multiple client connected to it, so that means there is a need to manage multiple 'traffic' with these connectedc
+clietns. To remember their credentials so as to be able to handle retransmission should they lost connection for whatever reasons, while dealing with various type of transport.
+FOr now, the example provided is only using websocket at the moment, another transport sample, which is http will be prepared at a later stage when i get this websocket working,
+with multiple clients as well as the server acting as a receiver itself. 
+
+So, when an application starts, which assume to be the provider, will instantiate a transmission manager. The transmission manager will in turn instantiate a connection mnager ,
+and the connection manager will also instantiate a transport service specified at .env file. Once that is done, the connection manager will then subscribe to the transportevent 
+also instantiated within it's component, and be exposed to transmission manager as well, so that all the concerned parties that are subscribed to transport event, will know what's 
+happening. So, in this websocket's context: A client is connected, a transport event is reported and announced. Connection manager will listen to this type of event and 
+instantiate a set of adapters to cater for this specific client. It will keep a copy with it's class, and can be retrieved by transmission transmitter and receiver and request response
+class. They too will also be listening to new adapter set event so to speak, and store the reference pointer in it's own local memory array. 
+So here's how it is, the app which is 
+the producer, once calls for transmission manager to instantiate a set of transmission methods, which is the message transmitter, receiver and request response class, which doesn't 
+care what the underlying transport that is being used, will use the instantiated objects to perfrom all the available methods to transmit and receive message, even as far as 
+emulating request response behaviour. So, when the producer wants to send a message, it will send through the message transmitter instantiated earlier. By the way, forgot to mention,
+when a client is connected, an ID will be assinged to the client, the same id that was assigned is also assigned to the adapter, which makes it easier to be managed later as I continue
+to develop this point. I probably mentioned this a few paragraphs, but anyway, as I was sating the message transmitter will read the message and see what adapter it's using,
+and search through it's array adapter ref, to find the matching adapterID, and then using that adapter to pass along the message. The adapter tha was chosen would have already been
+mapped with it's associated transport service to send the message out to the respective clients. At least that whole gist of it. 
+With that in mind, the retransmission service will sit at the adapter side, since each adapter is tied to one client and it's respective transport type, so it's easier to manage.
+All events or activities will go through one, well "global" subject, which is the transport event instantiated at connection manager.

+ 6 - 2
src/connector/connector.base.ts

@@ -1,12 +1,13 @@
 import { BehaviorSubject, Observable, Observer, Subject, Subscriber, Unsubscribable } from "rxjs";
 import dotenv from 'dotenv';
-import { AdapterProfile, AdaptorTransmissionRole, ConnectionAdaptorBase, ConnectionState, TransportMessage } from "../interface/connector.interface";
+import { AdapterProfile, AdaptorTransmissionRole, ConnectionAdaptorBase, ConnectionState, TransportEvent, TransportMessage } from "../interface/connector.interface";
 import { Bus, FisAppActor, FisMessage } from "../interface/transport.interface";
 
 dotenv.config();
 /* This transport manager will be instantiating the necessary transport to deal with tranmission and receiving from different receivers
 So how?: */
 export class ConnectionAdapter implements ConnectionAdaptorBase {
+    event!: Subject<TransportEvent>
     connector: any;
     connectorProfile!: AdapterProfile;
     connectionStateBus!: BehaviorSubject<ConnectionState>;
@@ -15,6 +16,9 @@ export class ConnectionAdapter implements ConnectionAdaptorBase {
     constructor(port?: number, url?: string) {
         if (!port && !url) console.error(`No role has assigned for transport service due to lack of information provided!`)
     }
+    setAdapterProfile(adapterId: string): void {
+        throw new Error("Method not implemented.")
+    }
     getInfo(): any {
         throw new Error("Method not implemented.");
     }
@@ -32,7 +36,7 @@ export class ConnectionAdapter implements ConnectionAdaptorBase {
     }
     getMessageBus(bus: Bus): Observable<any> {
         throw new Error("Method not implemented.");
-    }    
+    }
     subscribeMessages(messageFilter: any): Observable<any> {
         throw new Error("Method not implemented.");
     }

+ 38 - 35
src/connector/connector.manager.ts

@@ -1,9 +1,9 @@
 import { v4 as uuidv4 } from 'uuid'
 import dotenv from 'dotenv';
-import { TransportEvent, ConnectionManager as ConnectionManagerInterface, ConnectionSet, Transport, TransportService } from "../interface/connector.interface"
+import { TransportEvent, ConnectionManager as ConnectionManagerInterface, AdapterSet, Transport, TransportService } from "../interface/connector.interface"
 import { TransmitterConnectionAdapter } from './connector.transmitter'
 import { ReceiverConnectionAdapter } from './connector.receiver'
-import { TransmissionEvent } from '../interface/transport.interface'
+import { EventMessage } from '../interface/transport.interface'
 import { Subject } from 'rxjs'
 import { WebsocketTransportService } from '../transport/websocket'
 import { RequestResponseConnectionAdapter } from './connector.request.response'
@@ -14,50 +14,29 @@ the transmission side, must already be configured and locked to their respective
 as well as request response. Let's go along with this line for now. */
 export class ConnectionManager implements ConnectionManagerInterface {
     transportService!: TransportService;
-    transmissionEvent: Subject<TransmissionEvent> = new Subject()
     transportEvent: Subject<TransportEvent> = new Subject() // Every event goes through this boy, and it will be expose across differnet components
-    transmissionSet: ConnectionSet[] = []
+    adapterSet: AdapterSet[] = []
 
-    constructor(messageTransmissionEvent: Subject<TransmissionEvent>) {
-        this.transmissionEvent = messageTransmissionEvent 
+    constructor() {
         // logic here
         this.setUpServer(process.env.PORT as unknown as number, this.getTransportService(process.env.Transport as unknown as Transport))
 
-        // will also have information on events happening on transmission side
-        this.transmissionEvent.subscribe((event: TransmissionEvent) => {
-            console.log(`TransmissionEvent: `, event)
-        })
         // Everything that happens will go through here. Can also plug in logging service here at a later time
         this.transportEvent.subscribe((event: TransportEvent) => {
             console.log(`TranpsortEvent: `, event)
-        })
-    }
-
-    /* All three methods that return their respective instances will have it's activities reported through transportEvent */
-    getTransmitterConnectionAdapter(): TransmitterConnectionAdapter {
-        if (this.transportService) {
-            let adapter: TransmitterConnectionAdapter = new TransmitterConnectionAdapter(this.transportService)
-            return adapter
-        } else {
-            throw new Error(`Transmitter Transport NOT initialized.`)
-        }
-    }
 
-    getReceiverConnectionAdapter(): ReceiverConnectionAdapter {
-        if (this.transportService) {
-            let adapter: ReceiverConnectionAdapter = new ReceiverConnectionAdapter(this.transportService)
-            return adapter
-        } else {
-            throw new Error(`Receiver Transport NOT initialized.`)
-        }
+            if (event.event == 'New Client') {
+                this.handleNewClient((event.data as EventMessage).payload) // payload is connectedclientInstance
+            }
+        })
     }
 
-    getRequestResponseConnectionAdapter(transmitterAdapter: TransmitterConnectionAdapter, receiverAdapter: ReceiverConnectionAdapter): RequestResponseConnectionAdapter {
-        if (this.transportService) {
-            let adapter: RequestResponseConnectionAdapter = new RequestResponseConnectionAdapter(transmitterAdapter, receiverAdapter)
-            return adapter
+    getAdapterSetById(adapterSetid: string): AdapterSet {
+        let adapterSet: AdapterSet | undefined = this.adapterSet.find(obj => obj.id === adapterSetid)
+        if (adapterSet) {
+            return adapterSet
         } else {
-            throw new Error(`Request Response Transport NOT initialized.`)
+            throw new Error(`Adapter Set not found!!!`)
         }
     }
 
@@ -98,4 +77,28 @@ export class ConnectionManager implements ConnectionManagerInterface {
             //  To be Enhanced at a later time
         }
     }
-}
+
+    private handleNewClient(clientInstance: any): void {
+        // need to setup the adapters to be called by the transmission manager
+        let transmitter: TransmitterConnectionAdapter = new TransmitterConnectionAdapter(this.transportService, clientInstance.id, this.transportEvent)
+        let receiver: ReceiverConnectionAdapter = new ReceiverConnectionAdapter(this.transportService, clientInstance.id, this.transportEvent)
+        let requestResponse: RequestResponseConnectionAdapter = new RequestResponseConnectionAdapter(transmitter, receiver, clientInstance.id)
+        let adapterSet: AdapterSet = {
+            id: clientInstance.id,
+            dateCreated: new Date(),
+            transmitterAdapter: transmitter,
+            receiverAdapter: receiver,
+            requestResponsAdapter: requestResponse
+        }
+        this.adapterSet.push(adapterSet)
+
+        this.transportEvent.next({
+            id: uuidv4(),
+            event: `New Adapter`,
+            data: {
+                adapterId: adapterSet.id,
+                message: `New Adapter Set${adapterSet.id} has been added`,
+            } as EventMessage
+        })
+    }
+}

+ 20 - 5
src/connector/connector.receiver.ts

@@ -3,23 +3,35 @@ import { Bus, FisAppActor, FisMessage } from "../interface/transport.interface";
 import { ConnectionAdapter } from "./connector.base";
 import { AdaptorTransmissionRole, ConnectionState, ReceiverConnectionAdapter as ReceiverConnectionAdapterInterface, TransportEvent, TransportMessage, TransportService } from "../interface/connector.interface";
 import { BehaviorSubject, filter, map, Observable, Observer, Subject, Subscription, takeWhile } from 'rxjs';
+import { v4 as uuidv4 } from 'uuid'
 
 dotenv.config();
 /* This transport manager will be instantiating the necessary transport to deal with tranmission and receiving from different receivers
 So how?: */
 export class ReceiverConnectionAdapter extends ConnectionAdapter implements ReceiverConnectionAdapterInterface {
-    event: Subject<TransportEvent> = new Subject()
+    event!: Subject<TransportEvent>
 
-    constructor(adapter: TransportService) {
+    constructor(adapter: TransportService, adapterId: string, event: Subject<TransportEvent>) {
         super()
         // logic here
+        this.event = event
         this.adaptorTransmissionRole = AdaptorTransmissionRole.Receiver
         this.connector = adapter
+
+        this.setAdapterProfile(adapterId)
+    }
+
+    setAdapterProfile(adapterId: string): void {
+        this.connectorProfile = {
+            id: adapterId,
+            connectionState: this.connectionStateBus,
+            adapter: this.connector
+        }
     }
 
     getMessageBus(bus: Bus): Observable<any> {
         return new Observable((observable: Observer<any>) => {
-            if (bus == Bus.ResponseMessageBus) {
+            if (bus == Bus.GeneralBus) {
                 const subscription: Subscription = this.event.pipe(
                     filter((message: TransportEvent) => message.event === 'New Message'),
                     // take message only specific for this adapter. Although that itself wouldn't be necessary, considerng everything goes through transportEvent. I guess it's for better management
@@ -31,8 +43,8 @@ export class ReceiverConnectionAdapter extends ConnectionAdapter implements Rece
                         }
                         return shouldTake;
                     }),
-                    map(message => (message.data as TransportMessage).payload as FisMessage)
-                ).subscribe((message: FisMessage) => {
+                    map(message => message.data as TransportMessage)
+                ).subscribe((message: TransportMessage) => {
                     observable.next(message);
                 });
     
@@ -41,6 +53,9 @@ export class ReceiverConnectionAdapter extends ConnectionAdapter implements Rece
                     subscription.unsubscribe();
                 };
             }
+            if (bus == Bus.ResponseMessageBus) {
+                /// logic here
+            }
             if (bus == Bus.NotificationMessageBus) {
                 /// logic here
             }

+ 38 - 5
src/connector/connector.request.response.ts

@@ -1,23 +1,34 @@
 import dotenv from 'dotenv';
-import { FisAppActor, FisMessage, TransmissionMessage } from "../interface/transport.interface";
+import { Bus, FisAppActor, FisMessage, TransmissionMessage } from "../interface/transport.interface";
 import { ConnectionAdapter } from "./connector.base";
 import { RequestResponseConnectionAdapter as RequestResponseConnectionAdapterInterface, TransportEvent, TransportMessage } from "../interface/connector.interface";
 import { TransmitterConnectionAdapter } from './connector.transmitter';
 import { ReceiverConnectionAdapter } from './connector.receiver';
-import { Observable, Observer, Subject } from 'rxjs';
+import { filter, map, Observable, Observer, Subject, Subscription, takeWhile } from 'rxjs';
 
 dotenv.config();
 /* This transport manager will be instantiating the necessary transport to deal with tranmission and receiving from different receivers
 So how?: */
 export class RequestResponseConnectionAdapter extends ConnectionAdapter implements RequestResponseConnectionAdapterInterface {
-    event: Subject<TransportEvent> = new Subject()
-    incomingResponses: Subject<TransportMessage> = new Subject()
+    private transmitterAdapter!: TransmitterConnectionAdapter
+    private receiverAdapter!: ReceiverConnectionAdapter
 
-    constructor(transmitterAdapter: TransmitterConnectionAdapter, receiverAdapter: ReceiverConnectionAdapter) {
+    constructor(transmitterAdapter: TransmitterConnectionAdapter, receiverAdapter: ReceiverConnectionAdapter, adapterId: string) {
         super()
         // logic here
+        this.transmitterAdapter = transmitterAdapter
+        this.receiverAdapter = receiverAdapter
+        this.setAdapterProfile(adapterId)
     }
     
+    setAdapterProfile(adapterId: string): void {
+        this.connectorProfile = {
+            id: adapterId,
+            connectionState: this.connectionStateBus,
+            adapter: this.connector
+        }    
+    }
+
     emit(message: TransmissionMessage): void {
         throw new Error('Method not implemented.');
     }
@@ -28,8 +39,30 @@ export class RequestResponseConnectionAdapter extends ConnectionAdapter implemen
     send(message: TransmissionMessage): Observable<TransportMessage> {
         return new Observable((response: Observer<TransportMessage>) => {
             // logic here
+            this.transmitterAdapter.emit(message)
+            const subscription: Subscription = this.event.pipe(
+                filter((message: TransportEvent) => message.event === 'New Message'),
+                // take message only specific for this adapter. Although that itself wouldn't be necessary, considerng everything goes through transportEvent. I guess it's for better management
+                filter((message:TransportEvent) => (message.data as TransportMessage).target == this.connectorProfile.id), 
+                takeWhile((message: TransportEvent) => {
+                    const shouldTake = ((message.data as TransportMessage).payload as FisMessage).data !== 'Complete';
+                    if (!shouldTake) {
+                        response.complete();  // Ensure the observer is completed
+                    }
+                    return shouldTake;
+                }),
+                map(message => message.data as TransportMessage)
+            ).subscribe((message: TransportMessage) => {
+                response.next(message);
+            });
+
+            // Clean up on unsubscription
+            return () => {
+                subscription.unsubscribe();
+            };
         })
     }
+  
 }
 
 

+ 14 - 5
src/connector/connector.transmitter.ts

@@ -1,11 +1,9 @@
 import dotenv from 'dotenv';
 import { FisAppActor, FisMessage, TransmissionMessage } from "../interface/transport.interface";
 import { ConnectionAdapter } from "./connector.base";
-import { AdaptorTransmissionRole, ConnectionState, TransmitterConnectionAdapter as TransmitterConnectionAdapterInterface, Transport, TransportMessage, TransportService } from '../interface/connector.interface';
-import { BehaviorSubject, Observable, Observer } from 'rxjs';
+import { AdaptorTransmissionRole, ConnectionState, TransmitterConnectionAdapter as TransmitterConnectionAdapterInterface, Transport, TransportEvent, TransportMessage, TransportService } from '../interface/connector.interface';
+import { BehaviorSubject, Observable, Observer, Subject } from 'rxjs';
 import { v4 as uuidv4 } from 'uuid'
-import { WebsocketTransportService } from '../transport/websocket';
-import { HttpTransportService } from '../transport/http';
 
 dotenv.config();
 /* This transport manager will be instantiating the necessary transport to deal with tranmission and receiving from different receivers
@@ -13,9 +11,20 @@ So how?: */
 export class TransmitterConnectionAdapter extends ConnectionAdapter implements TransmitterConnectionAdapterInterface {
     connectionStateBus: BehaviorSubject<ConnectionState> = new BehaviorSubject('OFFLINE' as ConnectionState) // this cannot work, because there will be alot of clients connected presumably
 
-    constructor(transportService: TransportService) {
+    constructor(transportService: TransportService, adapterId: string, event: Subject<TransportEvent>) {
         super()
         // logic here
+        this.event = event
+        this.connector = transportService
+        this.setAdapterProfile(adapterId)
+    }
+
+    setAdapterProfile(adapterId: string): void {
+        this.connectorProfile = {
+            id: adapterId,
+            connectionState: this.connectionStateBus,
+            adapter: this.connector
+        }
     }
 
     emitStream(message: TransmissionMessage): void {

+ 16 - 12
src/interface/connector.interface.ts

@@ -1,5 +1,5 @@
 import { BehaviorSubject, Observable, Subject } from "rxjs"
-import { Bus, FisAppActor, ReceiverProfile, TransmissionEvent, TransmissionMessage, TransmissionProfile, TransmitterProfile } from "./transport.interface"
+import { Bus, FisAppActor, ReceiverProfile, TransmissionMessage, TransmissionProfile, TransmitterProfile } from "./transport.interface"
 import { ConnectionAdapter } from "../connector/connector.base"
 
 
@@ -20,21 +20,19 @@ export interface AdapterProfile {
 export interface ConnectionManager {
     transportService: TransportService
     // to get notified on what's going on in Transmission Manager
-    transmissionEvent: Subject<TransmissionEvent>
     transportEvent: Subject<TransportEvent>
     // list of connection
-    transmissionSet: ConnectionSet[]
+    adapterSet: AdapterSet[]
     // Called by transmission manager to have an instance of these adapters
-    getTransmitterConnectionAdapter(): TransmitterConnectionAdapter
-    getReceiverConnectionAdapter(): ReceiverConnectionAdapter
-    getRequestResponseConnectionAdapter(transmitterAdapter: TransmitterConnectionAdapter, receiverConnectionAdapter: ReceiverConnectionAdapter): RequestResponseConnectionAdapter
+    getAdapterSetById(adapterSetid: string): AdapterSet
 }
 
-export interface ConnectionAdaptorBase  {
+export interface ConnectionAdaptorBase {
     connector: TransportService // this one will refer to the actual tranpsort service like websocket and so on
     connectorProfile: AdapterProfile
     connectionStateBus: BehaviorSubject<ConnectionState>
     adaptorTransmissionRole: AdaptorTransmissionRole
+    event: Subject<TransportEvent>
 
     subscribeConnectionState(): Observable<ConnectionState>
     publishConnectionState(): void
@@ -42,6 +40,7 @@ export interface ConnectionAdaptorBase  {
     disconnect(): void
     getMessageBus(bus: Bus): Observable<any>
     getInfo(): any
+    setAdapterProfile(id: string): void
 }
 
 
@@ -51,7 +50,6 @@ export interface TransmitterConnectionAdapter extends ConnectionAdaptorBase {
 }
 
 export interface ReceiverConnectionAdapter extends ConnectionAdaptorBase {
-    event: Subject<TransportEvent>
     subscribeMessages(messageFilter: any): Observable<any>
 }
 
@@ -83,12 +81,9 @@ export interface TransportMessage {
 
 export interface TransportEvent {
     id: string,
-    event: 'Server Started' | 'New Client' | 'Client Disconnected' | 'Client Reconnected' | `Server Disconnected` | 'New Message' | `Notification` | `New Server` | `New Transport`,
+    event: 'Server Started' | 'New Client' | 'Client Disconnected' | 'Client Reconnected' | `Server Disconnected` | 'New Message' | `Notification` | `New Server` | `New Transport` | 'New Adapter',
     data: any
 }
-export interface ConnectionSet {
-    // TBD
-}
 
 export interface TransportService {
     getInfo(): any
@@ -99,4 +94,13 @@ export interface TransportService {
 
 export interface Info {
     transport: Transport
+}
+
+
+export interface AdapterSet {
+    id: string,
+    dateCreated: Date,
+    transmitterAdapter: TransmitterConnectionAdapter,
+    receiverAdapter: ReceiverConnectionAdapter,
+    requestResponsAdapter: RequestResponseConnectionAdapter
 }

+ 9 - 16
src/interface/transport.interface.ts

@@ -1,5 +1,5 @@
 import { Observable, Subject } from "rxjs";
-import { AdapterProfile, AdaptorTransmissionRole, RequestResponseConnectionAdapter } from "./connector.interface";
+import { AdapterProfile, AdaptorTransmissionRole, RequestResponseConnectionAdapter, TransportEvent } from "./connector.interface";
 import { MessageTransmissionTransmitter } from "../transmission/msg.transmission.transmitter";
 import { MessageTransmissionReceiver } from "../transmission/msg.transmission.receiver";
 
@@ -10,11 +10,10 @@ export interface MessageTransmissionManager {
 
 export interface MessageTransmission {
     id: string,
-    receiverId: string,
-    transmitterId: string,
     transmitter: MessageTransmissionTransmitter,
     receiver: MessageTransmissionReceiver,
-    requestResponse: MessageRequestResponse
+    requestResponse: MessageRequestResponse,
+    event: Observable<TransportEvent>
 }
 export interface FisAppActor {
     incomingMessageBus: Subject<any>
@@ -40,20 +39,20 @@ export interface MessageTransmissionBase extends FisAppActor {
 export interface MessageReceiver extends MessageTransmissionBase {
     receiverProfile: ReceiverProfile
 
-    setReceiver(receiverProfile: ReceiverProfile, role: AdaptorTransmissionRole, event: TransmissionEvent): void
+    setReceiver(receiverProfile: ReceiverProfile, role: AdaptorTransmissionRole, event: Subject<TransportEvent>): void
 }
 
 export interface MessageTransmitter extends MessageTransmissionBase {
     transmitterProfile: TransmitterProfile
 
-    setTransmitter(transmitterProfile: TransmitterProfile, role: AdaptorTransmissionRole, event: TransmissionEvent): void
+    setTransmitter(transmitterProfile: TransmitterProfile, role: AdaptorTransmissionRole, event: Subject<TransportEvent>): void
 }
 
 export interface MessageRequestResponse extends MessageTransmissionBase {
     transmitterInstance: MessageTransmissionTransmitter
     receiverInstance: MessageTransmissionReceiver
 
-    setTransmissionProfile(transmissionInfo: MessageTransmissionTransmitter, receiverInfo: MessageTransmissionReceiver, role: AdaptorTransmissionRole, event: TransmissionEvent): void
+    setTransmissionProfile(transmissionInfo: MessageTransmissionTransmitter, receiverInfo: MessageTransmissionReceiver, role: AdaptorTransmissionRole, event: Subject<TransportEvent>): void
 }
 
 export interface FisMessage {
@@ -80,7 +79,7 @@ export interface RequestResponseProfile extends TransmissionProfile {
 
 }
 export interface TransmissionMessage {
-    receiverId: string,
+    adapterId: string,
     payload: FisMessage
 }
 
@@ -91,14 +90,8 @@ export enum Bus {
     NotificationMessageBus
 }
 
-export interface TransmissionEvent {
-    id: string,
-    event: 'New Receiver' | 'New Server' | 'Incoming Message' | `Outgoing Message`,
-    data: any
-}
-
 export interface EventMessage {
-    channelId: string,
+    adapterId: string,
     message: string,
-    payload: any
+    payload?: any
 }

+ 54 - 0
src/test/receiver.ts

@@ -0,0 +1,54 @@
+/*  This is to emulate another remote process also using socket io to connect.
+TEST: to see if it performs the necessary self check to identify itself, as well as 
+receiving all the notification and response messages */
+
+// Import the necessary modules
+import { io, Socket } from "socket.io-client";
+import { checkOwnClientInfo } from "../utils/socket.utils";
+
+class SocketClient {
+    private socket: Socket;
+    private receiverProfile!: any
+
+    constructor(url: string) {
+        // Connect to the serve
+        this.socket = io(url);
+
+        // Setup event listeners
+        this.socket.on("connect", () => {
+            console.log("Connected to server with ID:", this.socket.id);
+        });
+
+        if (this.receiverProfile) {
+            checkOwnClientInfo(this.receiverProfile.id).then((data) => {
+                this.socket.emit('profile', { name: 'Existing Profile',  data: data })
+            })
+        } else {
+            this.socket.emit('profile', { name: 'New Profile', data: null })
+        }
+
+        this.socket.on(`profile`, (info) => {
+            // logic here
+        })
+
+
+        this.socket.on("message", (data: any) => {
+            console.log("Received message:", data);
+        });
+
+        this.socket.on("disconnect", () => {
+            console.log("Disconnected from server");
+        });
+    }
+
+    // Emit a message to the server
+    public sendMessage(event: string, message: any) {
+        this.socket.emit(event, message);
+    }
+}
+
+// Usage example:
+const client = new SocketClient("http://localhost:3000");
+
+// Send a message
+client.sendMessage("message", "Hello, server!");

+ 117 - 29
src/test/transmitter.ts

@@ -1,54 +1,142 @@
-import { Observable, Subject } from "rxjs";
-import { FisAppActor, FisMessage, MessageTransmission, TransmissionProfile } from "../interface/transport.interface";
-import dotenv from 'dotenv';
+import { filter, interval, map, Observable, Observer, Subject, take } from "rxjs";
+import { Bus, EventMessage, FisMessage, MessageTransmission, TransmissionMessage } from "../interface/transport.interface";
+import { v4 as uuidv4 } from 'uuid'
 import { MessageTransmissionManager } from "../transmission/msg.transmission.manager";
+import { error } from "console";
+import e from "express";
+import { TransportEvent } from "../interface/connector.interface";
 
 /*  These are the purple fonts. Gonna interact with blue fonts to set up the credentials to establish the necessary roles.
 Assuming the primary role is server. That means we will need transmitter and multiple receiver profiles that are connected. */
-class Application implements FisAppActor {
-    incomingMessageBus: Subject<any> = new Subject()
-    outgoingMessageBus: Subject<any> = new Subject()
+class Application {
     messageTransmissionManager: MessageTransmissionManager
     transmissionInstance!: MessageTransmission
+    generalNotification: Subject<FisMessage> = new Subject()
 
     constructor() {
         this.messageTransmissionManager = new MessageTransmissionManager()
         this.transmissionInstance = this.messageTransmissionManager.getTransmissionInstance()
 
-        //code here first, then refactor/clean it later.
-        this.incomingMessageBus.subscribe(item => {
-            this.transmissionInstance.transmitter.emit(item)
-        })
+        this.generateNotifcation().subscribe(this.generalNotification)
     }
 
-    send(message: FisMessage): Observable<any> {
+    // Emulating request response. For the case where this transmitter is acting as a receiver
+    send(message: FisMessage): Observable<FisMessage> {
         return new Observable((response) => {
-            this.outgoingMessageBus.next(message)
-            this.incomingMessageBus.subscribe({
-                next: (message: FisMessage) => {
-                    if (message.header.messageID == message.header.messageID) {
-                        response.next(message)
-                    }
-                    if (message.header.messageID == message.header.messageID && message.data == 'Complete') {
-                        response.complete()
-                    }
-                },
-                error: error => response.error(error)
+            // logic here
+        })
+    }
+
+    // Transmission only
+    emit(message: FisMessage, adapterId: string): void {
+        this.transmissionInstance.transmitter.emit({
+            adapterId: adapterId, // this should mqatch the request ID??
+            payload: message
+        })
+    }
+
+    // Receiving only
+    susbcribe(): Observable<FisMessage> {
+        return new Observable((observer: Observer<any>) => {
+            this.transmissionInstance.receiver.getMessageBus(Bus.GeneralBus).subscribe((message: TransmissionMessage) => {
+                // logic here
+                this.appProcess(message.adapterId, message.payload)
+            })
+        })
+    }
+
+    // no request needed, auto broadcast 
+    subscribeForNewClientWhoWantsNotification(): void {
+        this.transmissionInstance.event.pipe(
+            filter(obj => obj.event == 'New Adapter')
+        ).subscribe((event: TransportEvent) => {
+            this.generalNotification.subscribe((message: FisMessage) => {
+                this.emit(message, (event.data as EventMessage).adapterId)
             })
         })
     }
 
-    emit(message: FisMessage): void {
-        this.outgoingMessageBus.next(message)
+    // just assume that the provide will provide 10 responses messages 
+    appProcess(adapterId: string, message: FisMessage): void {
+        this.generateMessage(10).subscribe({
+            next: (message: FisMessage) => {
+                this.emit(message, adapterId)
+            },
+            error: error => console.error(error),
+            complete: () => console.log(`All responses generated completed and passed into adapter: ${adapterId}`)
+        })
     }
 
-    emitStream(message: FisMessage): void {
-        this.outgoingMessageBus.next(message)
+    private generateMessage(amount: number): Observable<FisMessage> {
+        return new Observable((response: Observer<FisMessage>) => {
+            const intervalMessageGeneration = interval(1000).pipe(
+                take(amount), // Ensures only 'amount' messages are generated
+                map(() => {
+                    const message: FisMessage = {
+                        header: {
+                            messageID: uuidv4(),
+                            messageName: 'ResponseMessage'
+                        },
+                        data: `Data`
+                    };
+                    return message;
+                })
+            );
+
+            const subscription = intervalMessageGeneration.subscribe({
+                next: message => response.next(message),
+                error: error => response.error(error),
+                complete: () => {
+                    response.next({
+                        header: {
+                            messageID: uuidv4(),
+                            messageName: 'ResponseMessage'
+                        },
+                        data: `Complete`
+                    });
+                    response.complete();
+                }
+            });
+
+            // Ensure cleanup on unsubscribe
+            return () => subscription.unsubscribe();
+        });
     }
 
-    subscribeMessages(messageFilter: any): Observable<FisMessage> {
-        throw new Error(`Unavailable for now....`)
+    private generateNotifcation(): Observable<FisMessage> {
+        return new Observable((response: Observer<FisMessage>) => {
+            const intervalMessageGeneration = interval(1000).pipe(
+                map(() => {
+                    const message: FisMessage = {
+                        header: {
+                            messageID: uuidv4(),
+                            messageName: 'ResponseMessage'
+                        },
+                        data: `Data`
+                    };
+                    return message;
+                })
+            );
+
+            const subscription = intervalMessageGeneration.subscribe({
+                next: message => response.next(message),
+                error: error => response.error(error),
+                complete: () => {
+                    response.next({
+                        header: {
+                            messageID: uuidv4(),
+                            messageName: 'NotificationMessage'
+                        },
+                        data: `Complete`
+                    });
+                    response.complete();
+                }
+            });
+
+            // Ensure cleanup on unsubscribe
+            return () => subscription.unsubscribe();
+        });
     }
 }
 
-const application = new Application()
+const application = new Application()

+ 10 - 11
src/transmission/msg.transmission.manager.ts

@@ -1,9 +1,9 @@
 import { MessageTransmissionTransmitter } from "./msg.transmission.transmitter";
 import { MessageTransmissionReceiver } from "./msg.transmission.receiver";
 import { ConnectionManager } from "../connector/connector.manager";
-import { MessageTransmission, MessageTransmissionManager as MessageTransmissionManagerInterface, ReceiverProfile, TransmissionEvent, TransmissionProfile, TransmitterProfile } from "../interface/transport.interface";
+import { MessageTransmission, MessageTransmissionManager as MessageTransmissionManagerInterface, ReceiverProfile, TransmissionProfile, TransmitterProfile } from "../interface/transport.interface";
 import { v4 as uuidv4 } from 'uuid'
-import { AdaptorTransmissionRole, RequestResponseConnectionAdapter } from "../interface/connector.interface";
+import { AdaptorTransmissionRole, RequestResponseConnectionAdapter, TransportEvent } from "../interface/connector.interface";
 import { MessageTransmissionRequestResponse } from "./msg.transmission.request-response";
 import { TransmitterConnectionAdapter } from "../connector/connector.transmitter";
 import { ReceiverConnectionAdapter } from "../connector/connector.receiver";
@@ -19,10 +19,10 @@ export class MessageTransmissionManager implements MessageTransmissionManagerInt
     messageTransmissionTransmitters: MessageTransmissionTransmitter[] = []
     messageTransmissionReceiver: MessageTransmissionReceiver[] = []
     messageTransmissionRequestResponse: MessageTransmissionRequestResponse[] = []
-    transmissionEvent: Subject<TransmissionEvent> = new Subject()
 
     constructor() {
        // logic here
+       this.instantiateConnectionManager()
     }
 
     /* so there will be some changes here. will nto be assigning just one, but all of them dynamically to pour into this boy
@@ -30,15 +30,14 @@ export class MessageTransmissionManager implements MessageTransmissionManagerInt
     getTransmissionInstance(): MessageTransmission {
         let transmitter: MessageTransmissionTransmitter = this.getTransmitter()
         let receiver: MessageTransmissionReceiver = this.getReceiver()
+        let requestResponse: MessageTransmissionRequestResponse = this.getRequestResponse(transmitter, receiver)
         let transmission: MessageTransmission = {
             id: uuidv4(),
-            receiverId: transmitter.getInfo().id,
-            transmitterId: receiver.getInfo().id,
             transmitter: transmitter,
             receiver: receiver,
-            requestResponse: this.getRequestResponse(transmitter, receiver)
+            requestResponse: requestResponse,
+            event: this.connectionManager.transportEvent.asObservable()
         }
-        this.instantiateConnectionManager() // start an adapter
         // this.transmission.push(transmission)
         return transmission
     }
@@ -49,7 +48,7 @@ export class MessageTransmissionManager implements MessageTransmissionManagerInt
             name: '', // for now make it empty. We will use the assigned uuid here
             dateCreated: new Date()
         }
-        return new MessageTransmissionTransmitter(transmitterProfile, AdaptorTransmissionRole.Transmitter, this.transmissionEvent)
+        return new MessageTransmissionTransmitter(transmitterProfile, AdaptorTransmissionRole.Transmitter, this.connectionManager.transportEvent)
     }
 
     private getReceiver(): MessageTransmissionReceiver {
@@ -58,15 +57,15 @@ export class MessageTransmissionManager implements MessageTransmissionManagerInt
             name: '', // for now make it empty. We will use the assigned uuid here
             dateCreated: new Date()
         }
-        return new MessageTransmissionReceiver(receiverProfile, AdaptorTransmissionRole.Receiver, this.transmissionEvent)
+        return new MessageTransmissionReceiver(receiverProfile, AdaptorTransmissionRole.Receiver, this.connectionManager.transportEvent)
     }
 
     private getRequestResponse(transmitterInstance: MessageTransmissionTransmitter, receiverInstance: MessageTransmissionReceiver): MessageTransmissionRequestResponse {
-        return new MessageTransmissionRequestResponse(transmitterInstance, receiverInstance, AdaptorTransmissionRole.RequestResponse, this.transmissionEvent)
+        return new MessageTransmissionRequestResponse(transmitterInstance, receiverInstance, AdaptorTransmissionRole.RequestResponse, this.connectionManager.transportEvent)
     }
 
     private instantiateConnectionManager(): void {
-        this.connectionManager = new ConnectionManager(this.transmissionEvent)
+        this.connectionManager = new ConnectionManager()
     }
 
 }

+ 15 - 12
src/transmission/msg.transmission.receiver.ts

@@ -1,7 +1,7 @@
 import { filter, map, Observable, Observer, PartialObserver, Subject, Subscriber, Subscription, takeWhile, Unsubscribable } from 'rxjs';
 import { AdapterProfile, AdaptorTransmissionRole, TransportEvent, TransportMessage } from '../interface/connector.interface';
 import { MessageTransmissionBase } from './msg.transmission.base';
-import { Bus, FisMessage, MessageReceiver as MessageReceiverInterface, ReceiverProfile, TransmissionEvent, TransmissionMessage, TransmitterProfile } from '../interface/transport.interface'
+import { Bus, FisMessage, MessageReceiver as MessageReceiverInterface, ReceiverProfile, TransmissionMessage, TransmitterProfile } from '../interface/transport.interface'
 import { ConnectionAdapter } from '../connector/connector.base';
 import { v4 as uuidv4 } from 'uuid'
 import { ReceiverConnectionAdapter } from '../connector/connector.receiver';
@@ -10,7 +10,7 @@ export class MessageTransmissionReceiver extends MessageTransmissionBase impleme
     receiverProfile!: ReceiverProfile;
     private incomingTransportMessage: Subject<TransportMessage> = new Subject()
 
-    constructor(profile: ReceiverProfile, role: AdaptorTransmissionRole, event: Observable<TransmissionEvent>) {
+    constructor(profile: ReceiverProfile, role: AdaptorTransmissionRole, event: Observable<TransportEvent>) {
         super();
 
         this.setReceiver(profile, role)
@@ -22,15 +22,18 @@ export class MessageTransmissionReceiver extends MessageTransmissionBase impleme
         this.transmissionRole = role
     }
 
-    getMessageBus(bus: Bus): Observable<any> {
-        return new Observable((observable: Observer<any>) => {
+    getMessageBus(bus: Bus): Observable<TransmissionMessage> {
+        return new Observable((observable: Observer<TransmissionMessage>) => {
             // logic here
-            if (bus == Bus.ResponseMessageBus) {
+            if (bus == Bus.GeneralBus) {
                 const subscription: Subscription = this.incomingTransportMessage.pipe(
-                    filter((message: TransportMessage) => message. === `Incoming Message`),
-                    map(message => message.data as TransportMessage)
-                ).subscribe((message) => {
-                    observable.next(message);
+                    filter((message: TransportMessage) => message.payload === `Incoming Message`),
+                    map(message => message.payload as TransportMessage)
+                ).subscribe((message: TransportMessage) => {
+                    observable.next({
+                        adapterId: message.target as string, // assign adapter id so that it knows who to reply back
+                        payload: message.payload
+                    });
                 });
 
                 // Clean up on unsubscription
@@ -41,10 +44,10 @@ export class MessageTransmissionReceiver extends MessageTransmissionBase impleme
         })
     }
 
-    private handleEvent(event: Observable<TransmissionEvent>): void {
+    private handleEvent(event: Observable<TransportEvent>): void {
         event.pipe(
-            filter((obj: TransmissionEvent) => obj.event == `New Receiver`),
-            map((obj: TransmissionEvent) => (obj.data as ReceiverConnectionAdapter))
+            filter((obj: TransportEvent) => obj.event == `New Adapter`),
+            map((obj: TransportEvent) => (obj.data as ReceiverConnectionAdapter))
         ).subscribe((adapter: ReceiverConnectionAdapter) => {
             this.adaptorsArray.push({ id: adapter.getInfo().id, adapter: adapter } as unknown as AdapterProfile)
             adapter.getMessageBus(Bus.GeneralBus).subscribe(this.incomingTransportMessage)

+ 3 - 3
src/transmission/msg.transmission.request-response.ts

@@ -1,8 +1,8 @@
 import { MessageTransmissionBase } from "./msg.transmission.base";
-import { FisMessage, MessageReceiver, MessageRequestResponse as MessageRequestResponseInterface, TransmissionEvent } from '../interface/transport.interface'
+import { FisMessage, MessageReceiver, MessageRequestResponse as MessageRequestResponseInterface } from '../interface/transport.interface'
 import { filter, Observable, Observer, Subscription, takeWhile } from "rxjs";
 import { v4 as uuidv4 } from 'uuid'
-import { AdaptorTransmissionRole, RequestResponseConnectionAdapter } from "../interface/connector.interface";
+import { AdaptorTransmissionRole, RequestResponseConnectionAdapter, TransportEvent } from "../interface/connector.interface";
 import { MessageTransmissionReceiver } from "./msg.transmission.receiver";
 import { MessageTransmissionTransmitter } from "./msg.transmission.transmitter";
 
@@ -10,7 +10,7 @@ export class MessageTransmissionRequestResponse extends MessageTransmissionBase
     transmitterInstance!: MessageTransmissionTransmitter;
     receiverInstance!: MessageTransmissionReceiver;
 
-    constructor(transmitterInstance: MessageTransmissionTransmitter, receiverInstance: MessageTransmissionReceiver, role: AdaptorTransmissionRole, event: Observable<TransmissionEvent>) {
+    constructor(transmitterInstance: MessageTransmissionTransmitter, receiverInstance: MessageTransmissionReceiver, role: AdaptorTransmissionRole, event: Observable<TransportEvent>) {
         super()
         this.setTransmissionProfile(transmitterInstance, receiverInstance, role)
     }

+ 9 - 9
src/transmission/msg.transmission.transmitter.ts

@@ -1,6 +1,6 @@
 import { MessageTransmissionBase } from "./msg.transmission.base";
-import { FisMessage, MessageTransmitter as MessageTransmitterInterface, TransmissionEvent, TransmissionMessage, TransmitterProfile } from '../interface/transport.interface'
-import { AdapterProfile, AdaptorTransmissionRole, ConnectionState, TransportMessage, TransportService } from "../interface/connector.interface";
+import { FisMessage, MessageTransmitter as MessageTransmitterInterface, TransmissionMessage, TransmitterProfile } from '../interface/transport.interface'
+import { AdapterProfile, AdapterSet, AdaptorTransmissionRole, ConnectionState, TransportEvent, TransportMessage, TransportService } from "../interface/connector.interface";
 import { v4 as uuidv4 } from 'uuid'
 import { TransmitterConnectionAdapter } from "../connector/connector.transmitter";
 import { BehaviorSubject, filter, map, Observable } from "rxjs";
@@ -10,7 +10,7 @@ connectors or adapters will have their own identifier*/
 export class MessageTransmissionTransmitter extends MessageTransmissionBase implements MessageTransmitterInterface {
     transmitterProfile!: TransmitterProfile;
 
-    constructor(profile: TransmitterProfile, role: AdaptorTransmissionRole, event: Observable<TransmissionEvent>) {
+    constructor(profile: TransmitterProfile, role: AdaptorTransmissionRole, event: Observable<TransportEvent>) {
         super()
         this.setTransmitter(profile, role)
 
@@ -24,18 +24,18 @@ export class MessageTransmissionTransmitter extends MessageTransmissionBase impl
 
     emit(message: TransmissionMessage): void {
         // logic here, emit based on message
-        let adapter: AdapterProfile | undefined = this.adaptorsArray.find(obj => obj.id == message.receiverId)
+        let adapter: AdapterProfile | undefined = this.adaptorsArray.find(obj => obj.id == message.adapterId)
         if(adapter) {
             (adapter.adapter as TransmitterConnectionAdapter).emit(message)
         }
     }
 
-    private handleEvent(event: Observable<TransmissionEvent>): void {
+    private handleEvent(event: Observable<TransportEvent>): void {
         event.pipe(
-            filter((obj: TransmissionEvent) => obj.event == `New Receiver`),
-            map(obj => (obj as TransmissionEvent).data as TransmitterConnectionAdapter)
-        ).subscribe((adapter: TransmitterConnectionAdapter) => {
-            this.adaptorsArray.push({ id: adapter.getInfo().id, adapter: adapter, connectionState: new BehaviorSubject<ConnectionState>(`OFFLINE`) }) // refers to channel ID, which will be embedded in these messages that pass through
+            filter((obj: TransportEvent) => obj.event == `New Adapter`),
+            map(obj => (obj as TransportEvent).data as AdapterSet)
+        ).subscribe((adapter: AdapterSet) => {
+            this.adaptorsArray.push({ id: adapter.transmitterAdapter.getInfo().id, adapter: adapter.transmitterAdapter as TransmitterConnectionAdapter, connectionState: adapter.transmitterAdapter.connectionStateBus }) // refers to channel ID, which will be embedded in these messages that pass through
         })
     }
 }

+ 8 - 1
src/utils/message.ordering.ts

@@ -1,5 +1,4 @@
 import { Subject, takeWhile } from "rxjs";
-import { WrappedMessage } from "../interface/ITransport.interface";
 
 export function sortMessageBasedOnDate(array: WrappedMessage[]): WrappedMessage[] {
     console.log(`Sorting ${array.length} messages....`)
@@ -27,3 +26,11 @@ export async function checkMessage(message: WrappedMessage, messageChecking: Sub
         }
     })
 }
+
+
+export interface WrappedMessage {
+    timeReceived: Date,
+    payload: any
+    thisMessageID: string,
+    previousMessageID: string | null,
+}

+ 1 - 2
src/utils/retransmission.service.ts

@@ -1,7 +1,6 @@
 import { BehaviorSubject, buffer, distinctUntilChanged, from, Observable, Subject, takeWhile } from "rxjs";
 import { v4 as uuidV4 } from 'uuid';
-import { WrappedMessage } from "../interface/ITransport.interface";
-import { sortMessageBasedOnDate } from "./message.ordering";
+import { sortMessageBasedOnDate, WrappedMessage } from "./message.ordering";
 
 export class RetransmissionService {
     private currentMessageId!: string | null

+ 9 - 7
src/utils/socket.utils.ts

@@ -108,7 +108,7 @@ export function handleClientSocketConnection(socket: ClientSocket, serversConnec
                         id: uuidv4(),
                         event: `New Server`,
                         data: {
-                            channelId: (data.message as ConnectedServerSocket).id,
+                            adapterId: (data.message as ConnectedServerSocket).id,
                             message: `New Websocket Channel ${(data.message as ConnectedServerSocket).id} established.`
                         } as EventMessage
                     })
@@ -131,7 +131,7 @@ export function handleClientSocketConnection(socket: ClientSocket, serversConnec
                         id: uuidv4(),
                         event: 'Client Reconnected',
                         data: {
-                            channelId: (data.message as ConnectedServerSocket).id,
+                            adapterId: (data.message as ConnectedServerSocket).id,
                             message: `Existing Websocket Channel ${(data.message as ConnectedServerSocket).id} re-established.`
                         } as EventMessage
                     })
@@ -163,7 +163,7 @@ export function handleClientSocketConnection(socket: ClientSocket, serversConnec
                     id: uuidv4(),
                     event: `Client Disconnected`,
                     data: {
-                        channelId: receiverProfileInfo.id,
+                        adapterId: receiverProfileInfo.id,
                         message: `Server for Channel ${receiverProfileInfo.id} disconnected.`
                     } as EventMessage
                 })
@@ -192,8 +192,8 @@ export function handleNewSocketClient(socket: SocketForConnectedClient, connecte
                     id: uuidv4(),
                     event: `New Client`,
                     data: {
-                        channelId: clientInstance.id,
-                        message: `New Client Connected. Channel ID assigned: ${clientInstance.id}`,
+                        adapterId: clientInstance.id,
+                        message: `New Client Connected. Adapter ID assigned: ${clientInstance.id}`,
                         payload: clientInstance
                     } as EventMessage
                 })
@@ -210,13 +210,15 @@ export function handleNewSocketClient(socket: SocketForConnectedClient, connecte
                 if (clientInstance) {
                     console.log(`Socket Client ${clientInstance.id} Found`)
                     socket.emit('profile', { name: 'Adjusted Profile', message: clientInstance })
+                    // replace socket instance since the previous has been terminated
+                    clientInstance.socketInstance = socket
                     // need to start listening again, because it's assigned a different socket instance this time round
                     startListening(socket, clientInstance, event)
                     event.next({
                         id: uuidv4(),
                         event: 'Client Reconnected',
                         data: {
-                            channelId: clientInstance.id,
+                            adapterId: clientInstance.id,
                             message: `Client ${clientInstance.id} connection re-established`,
                             payload: clientInstance
                         } as EventMessage
@@ -254,7 +256,7 @@ export async function writeFile(data: ConnectedServerSocket, filename: string):
 
 
 // Check if filename exists. Return profile information if there's any
-export async function checkOwnClientInfo(filename: string): Promise<ConnectedServerSocket> {
+export async function checkOwnClientInfo(filename?: string): Promise<ConnectedServerSocket> {
     return new Promise((resolve, reject) => {
         // Check if the file exists
         if (fs.existsSync(`${filename}.json`)) {