Forráskód Böngészése

some refactoring but not done

Enzo 2 hete
szülő
commit
4b09f2547c

+ 7 - 24
src/connector/connector.base.ts

@@ -1,28 +1,21 @@
-import { Observable, Observer, Subject, Subscriber, Unsubscribable } from "rxjs";
+import { BehaviorSubject, Observable, Observer, Subject, Subscriber, Unsubscribable } from "rxjs";
 import dotenv from 'dotenv';
-import { AdaptorTransmissionRole, ConnectionAdaptorBase, ConnectionState } from "../interface/connector.interface";
+import { AdapterProfile, AdaptorTransmissionRole, ConnectionAdaptorBase, ConnectionState, TransportMessage } from "../interface/connector.interface";
 import { Bus, FisAppActor, FisMessage } from "../interface/transport.interface";
 
 dotenv.config();
 /* This transport manager will be instantiating the necessary transport to deal with tranmission and receiving from different receivers
 So how?: */
-export class ConnectionAdapter implements FisAppActor, ConnectionAdaptorBase {
-    incomingMessageBus: Subject<any> = new Subject()
-    outgoingMessageBus: Subject<any> = new Subject()
+export class ConnectionAdapter implements ConnectionAdaptorBase {
     connector: any;
-    connectorProfile: any;
-    connectionState!: ConnectionState;
-    connectionStateBus!: Observable<ConnectionState>;
+    connectorProfile!: AdapterProfile;
+    connectionStateBus!: BehaviorSubject<ConnectionState>;
     adaptorTransmissionRole!: AdaptorTransmissionRole;
 
     constructor(port?: number, url?: string) {
         if (!port && !url) console.error(`No role has assigned for transport service due to lack of information provided!`)
     }
 
-    getMessageBus(bus: Bus): Observable<any> {
-        throw new Error("Method not implemented.");
-    }
-
     subscribeConnectionState(): Observable<ConnectionState> {
         throw new Error("Method not implemented.");
     }
@@ -35,22 +28,12 @@ export class ConnectionAdapter implements FisAppActor, ConnectionAdaptorBase {
     disconnect(): void {
         throw new Error("Method not implemented.");
     }
-    send(message: any): Observable<any> {
-        throw new Error(`Method not implemented.`)
-    }
-    emit(message: any): void {
-        // logic here
-    }
-    emitStream(message: any): void {
+    getMessageBus(bus: Bus): Observable<any> {
         throw new Error("Method not implemented.");
-    }
+    }    
     subscribeMessages(messageFilter: any): Observable<any> {
         throw new Error("Method not implemented.");
     }
-    subscribe(subscriber: Subscriber<any>): Unsubscribable {
-        throw new Error("Method not implemented.");
-    }
-
 
 
 }

+ 58 - 8
src/connector/connector.manager.ts

@@ -1,23 +1,73 @@
 import { v4 as uuidv4 } from 'uuid'
-import { ConnectionManager as ConnectionManagerInterface, RequestResponseConnectionAdapter } from "../interface/connector.interface"
+import { ConnectionEvent, ConnectionManager as ConnectionManagerInterface, ConnectionSet, Transport } from "../interface/connector.interface"
 import { TransmitterConnectionAdapter } from './connector.transmitter'
 import { ReceiverConnectionAdapter } from './connector.receiver'
-import { TransmissionProfile } from '../interface/transport.interface'
+import { TransmissionEvent } from '../interface/transport.interface'
+import { Subject } from 'rxjs'
+import { WebsocketTransportService } from '../transport/websocket'
+import { RequestResponseConnectionAdapter } from './connector.request.response'
+import { HttpTransportService } from '../transport/http'
 
 export class ConnectionManager implements ConnectionManagerInterface {
+    transportService: any;
+    transmissionEvent: Subject<TransmissionEvent> = new Subject()
+    connectionEvent: Subject<ConnectionEvent> = new Subject()
+    transmissionSet: ConnectionSet[] = []
 
-    constructor(transmissionProfile: TransmissionProfile) {
+    constructor(messageTransmissionEvent: Subject<TransmissionEvent>) {
+        messageTransmissionEvent.subscribe(this.transmissionEvent)
         // logic here
+        this.getTransportService(process.env.Transport as unknown as Transport)
     }
-    getRequestResponseConnectionAdapter(): RequestResponseConnectionAdapter {
-        throw new Error('Method not implemented.')
-    }
+
     getTransmitterConnectionAdapter(): TransmitterConnectionAdapter {
-        throw new Error("Method not implemented.")
+        if (this.transportService) {
+            let adapter: TransmitterConnectionAdapter = new TransmitterConnectionAdapter(this.transportService)
+            return adapter
+        } else {
+            throw new Error(`Transmitter Transport NOT initialized.`)
+        }
     }
+
     getReceiverConnectionAdapter(): ReceiverConnectionAdapter {
-        throw new Error("Method not implemented.")
+        if (this.transportService) {
+            let adapter: ReceiverConnectionAdapter = new ReceiverConnectionAdapter(this.transportService)
+            return adapter
+        } else {
+            throw new Error(`Receiver Transport NOT initialized.`)
+        }
     }
 
+    getRequestResponseConnectionAdapter(transmitterAdapter: TransmitterConnectionAdapter, receiverAdapter: ReceiverConnectionAdapter): RequestResponseConnectionAdapter {
+        if (this.transportService) {
+            let adapter: RequestResponseConnectionAdapter = new RequestResponseConnectionAdapter(transmitterAdapter, receiverAdapter)
+            return adapter
+        } else {
+            throw new Error(`Request Response Transport NOT initialized.`)
+        }
+    }
 
+    getTransportService(transportType: Transport): any {
+        console.log(`Getting this to work ${transportType}`)
+        if (transportType == Transport.Websocket) {
+            this.transportService = new WebsocketTransportService()
+            this.connectionEvent.next({
+                id: uuidv4(),
+                event: 'Setup',
+                data: {
+                    message: `Setting up Websocket Transport Service`
+                }
+            })
+        }
+        if (transportType == Transport.Http) {
+            this.transportService = new HttpTransportService()
+            this.connectionEvent.next({
+                id: uuidv4(),
+                event: 'Setup',
+                data: {
+                    message: `Setting up Http Transport Service`
+                }
+            })
+        }
+    }
 }

+ 4 - 2
src/connector/connector.receiver.ts

@@ -1,12 +1,14 @@
 import dotenv from 'dotenv';
 import { FisAppActor, FisMessage } from "../interface/transport.interface";
 import { ConnectionAdapter } from "./connector.base";
-import { ReceiverConnectionAdapterBase } from "../interface/connector.interface";
+import { ConnectionState, ReceiverConnectionAdapter as ReceiverConnectionAdapterInterface } from "../interface/connector.interface";
+import { BehaviorSubject } from 'rxjs';
 
 dotenv.config();
 /* This transport manager will be instantiating the necessary transport to deal with tranmission and receiving from different receivers
 So how?: */
-export class ReceiverConnectionAdapter extends ConnectionAdapter implements FisAppActor, ReceiverConnectionAdapterBase {
+export class ReceiverConnectionAdapter extends ConnectionAdapter implements  ReceiverConnectionAdapterInterface {
+    connectionStateBus: BehaviorSubject<ConnectionState> = new BehaviorSubject('OFFLINE' as ConnectionState)
 
     constructor(url: string) {
         super()

+ 33 - 0
src/connector/connector.request.response.ts

@@ -0,0 +1,33 @@
+import dotenv from 'dotenv';
+import { FisAppActor, FisMessage } from "../interface/transport.interface";
+import { ConnectionAdapter } from "./connector.base";
+import { RequestResponseConnectionAdapter as RequestResponseConnectionAdapterInterface, TransportMessage } from "../interface/connector.interface";
+import { TransmitterConnectionAdapter } from './connector.transmitter';
+import { ReceiverConnectionAdapter } from './connector.receiver';
+import { Observable, Observer } from 'rxjs';
+
+dotenv.config();
+/* This transport manager will be instantiating the necessary transport to deal with tranmission and receiving from different receivers
+So how?: */
+export class RequestResponseConnectionAdapter extends ConnectionAdapter implements RequestResponseConnectionAdapterInterface {
+
+    constructor(transmitterAdapter: TransmitterConnectionAdapter, receiverAdapter: ReceiverConnectionAdapter) {
+        super()
+        // logic here
+    }
+    
+    emit(message: TransportMessage): void {
+        throw new Error('Method not implemented.');
+    }
+    emitStream(message: TransportMessage): void {
+        throw new Error('Method not implemented.');
+    }
+
+    send(message: TransportMessage): Observable<TransportMessage> {
+        return new Observable((response: Observer<TransportMessage>) => {
+            // logic here
+        })
+    }
+}
+
+

+ 22 - 5
src/connector/connector.transmitter.ts

@@ -1,19 +1,36 @@
 import dotenv from 'dotenv';
-import { FisAppActor, FisMessage } from "../interface/transport.interface";
+import { FisAppActor, FisMessage, TransmissionMessage } from "../interface/transport.interface";
 import { ConnectionAdapter } from "./connector.base";
-import { TransmitterConnectionAdapterBase } from '../interface/connector.interface';
+import { AdaptorTransmissionRole, ConnectionState, TransmitterConnectionAdapter as TransmitterConnectionAdapterInterface, Transport, TransportMessage, TransportService } from '../interface/connector.interface';
+import { BehaviorSubject, Observable, Observer } from 'rxjs';
+import { v4 as uuidv4 } from 'uuid'
+import { WebsocketTransportService } from '../transport/websocket';
 
 dotenv.config();
 /* This transport manager will be instantiating the necessary transport to deal with tranmission and receiving from different receivers
 So how?: */
-export class TransmitterConnectionAdapter extends ConnectionAdapter implements FisAppActor, TransmitterConnectionAdapterBase {
+export class TransmitterConnectionAdapter extends ConnectionAdapter implements TransmitterConnectionAdapterInterface {
+    connectionStateBus: BehaviorSubject<ConnectionState> = new BehaviorSubject('OFFLINE' as ConnectionState)
 
-    constructor(port: number) {
+    constructor(transportService: TransportService) {
         super()
         // logic here
-        
+        this.connector = transportService
     }
 
+    emitStream(message: TransportMessage): void {
+        throw new Error('Method not implemented.');
+    }
+
+    emit(message: TransmissionMessage): void {
+        (this.connector as TransportService).emit({
+            id: uuidv4(),
+            dateCreated: new Date(),
+            transport: Transport.Websocket,
+            target: message.receiver?.id,
+            payload: message
+        } as TransportMessage)
+    }
 }
 
 

+ 49 - 23
src/interface/connector.interface.ts

@@ -1,5 +1,5 @@
-import { Observable, Observer, PartialObserver, Subscribable, Subscriber, Subscription, Unsubscribable } from "rxjs"
-import { Bus, FisAppActor, ReceiverProfile, TransmisionMessage, TransmissionProfile, TransmitterProfile } from "./transport.interface"
+import { BehaviorSubject, Observable, Subject } from "rxjs"
+import { Bus, FisAppActor, ReceiverProfile, TransmissionEvent, TransmissionProfile, TransmitterProfile } from "./transport.interface"
 
 
 export type TYPE = {
@@ -10,45 +10,49 @@ export type TYPE = {
 }
 
 export interface AdapterProfile {
-
+    id: string,
+    type: Transport
 }
 
 
 export interface ConnectionManager {
-    getTransmitterConnectionAdapter(): TransmitterConnectionAdapterBase
-    getReceiverConnectionAdapter(): ReceiverConnectionAdapterBase
-    getRequestResponseConnectionAdapter(): RequestResponseConnectionAdapter
+    transportService: any
+    // to get notified on what's going on in Transmission Manager
+    transmissionEvent: Subject<TransmissionEvent>
+    connectionEvent: Subject<ConnectionEvent>
+    // list of connection
+    transmissionSet: ConnectionSet[]
+    // Called by transmission manager to have an instance of these adapters
+    getTransmitterConnectionAdapter(): TransmitterConnectionAdapter
+    getReceiverConnectionAdapter(): ReceiverConnectionAdapter
+    getRequestResponseConnectionAdapter(transmitterAdapter: TransmitterConnectionAdapter, receiverConnectionAdapter: ReceiverConnectionAdapter): RequestResponseConnectionAdapter
 }
 
-export interface ConnectionAdaptorBase extends FisAppActor {
-    connector: any
-    connectorProfile: AdapterProfile | any
-    connectionState: ConnectionState
-    connectionStateBus: Observable<ConnectionState>
+export interface ConnectionAdaptorBase  {
+    connector: TransportService // this one will refer to the actual tranpsort service like websocket and so on
+    connectorProfile: AdapterProfile
+    connectionStateBus: BehaviorSubject<ConnectionState>
     adaptorTransmissionRole: AdaptorTransmissionRole
 
     subscribeConnectionState(): Observable<ConnectionState>
     publishConnectionState(): void
     connect(): void
     disconnect(): void
-    send(message: any): Observable<any>
-    emit(message: any): void
-    emitStream(message: any): void
-    subscribeMessages(messageFilter: any): Observable<any>
     getMessageBus(bus: Bus): Observable<any>
 }
 
 
-export interface TransmitterConnectionAdapterBase extends ConnectionAdaptorBase {
-
+export interface TransmitterConnectionAdapter extends ConnectionAdaptorBase {
+    emit(message: TransportMessage): void
+    emitStream(message: TransportMessage): void
 }
 
-export interface ReceiverConnectionAdapterBase extends ConnectionAdaptorBase {
-
+export interface ReceiverConnectionAdapter extends ConnectionAdaptorBase {
+    subscribeMessages(messageFilter: any): Observable<any>
 }
 
-export interface RequestResponseConnectionAdapter extends TransmitterConnectionAdapterBase, ReceiverConnectionAdapterBase {
-
+export interface RequestResponseConnectionAdapter extends TransmitterConnectionAdapter, ReceiverConnectionAdapter {
+    send(message: TransportMessage): Observable<TransportMessage>
 }
 
 export type ConnectionState = 'ONLINE' | 'OFFLINE'
@@ -63,10 +67,32 @@ export enum Transport {
     Grpc,
     Http
 }
+
+// TO be used for transmission at the trasport level
 export interface TransportMessage {
     id: string,
     dateCreated: Date,
     transport: Transport,
-    adapter: string,
-    payload: TransmisionMessage
+    target: string,
+    payload: any
+}
+
+export interface ConnectionEvent {
+    id: string,
+    event: 'Connection' | 'Setup',
+    data: any
+}
+
+export interface TransportEvent {
+    id: string,
+    event: 'Server Started' | 'New Client' | 'Client Disconnected' | 'Client Reconnected' | `Server Disconnected` | 'New Message' | `Notification`,
+    data: any
+}
+export interface ConnectionSet {
+    // TBD
+}
+
+export interface TransportService {
+    emit(message: TransportMessage): void
+    subscribe(): Observable<TransportEvent> //all messages and whatever event will go through this, easier to implemnet across different transport protocol
 }

+ 19 - 9
src/interface/transport.interface.ts

@@ -8,10 +8,10 @@ import { TransmitterConnectionAdapter } from "../connector/connector.transmitter
 
 export interface MessageTransmissionManager {
     // what the hell is this here for
-    getTransmissionInstance(): TransmissionProfile
+    getTransmissionInstance(): MessageTransmission
 }
 
-export interface TransmissionProfile {
+export interface MessageTransmission {
     id: string,
     receiverId: string,
     transmitterId: string,
@@ -37,6 +37,7 @@ export interface MessageTransmissionBase extends FisAppActor {
     adapterService: any // just use adapter Service, since it's grammatically closer to connectionadapter
 
     getMessageBus(bus: Bus): Observable<any>
+    getInfo(): TransmissionProfile
 }
 
 export interface MessageReceiver extends MessageTransmissionBase {
@@ -66,19 +67,22 @@ export interface FisMessage {
     data: any
 }
 
-
-export interface TransmitterProfile {
+export interface TransmissionProfile {
     id: string,
     name: string,
     dateCreated: Date
 }
 
-export interface ReceiverProfile {
-    id: string,
-    name: string,
-    dateCreated: Date,
+export interface TransmitterProfile extends TransmissionProfile {
+}
+
+export interface ReceiverProfile extends TransmissionProfile {
 }
-export interface TransmisionMessage {
+
+export interface RequestResponseProfile extends TransmissionProfile {
+
+}
+export interface TransmissionMessage {
     transmitter?: TransmitterProfile,
     receiver?: ReceiverProfile,
     payload?: FisMessage
@@ -89,4 +93,10 @@ export enum Bus {
     OutgoingMessageBus,
     ErrorMessageBus,
     NotificationMessageBus
+}
+
+export interface TransmissionEvent {
+    id: string,
+    event: `NewTransmissionSet` | 'Connection',
+    data: any
 }

+ 10 - 6
src/test/transmitter.ts

@@ -1,23 +1,27 @@
 import { Observable, Subject } from "rxjs";
-import { FisAppActor, FisMessage, TransmissionProfile } from "../interface/transport.interface";
-import { MessageTransmissionBase } from "../transmission/msg.transmission.base";
+import { FisAppActor, FisMessage, MessageTransmission, TransmissionProfile } from "../interface/transport.interface";
 import dotenv from 'dotenv';
 import { MessageTransmissionManager } from "../transmission/msg.transmission.manager";
 
 /*  These are the purple fonts. Gonna interact with blue fonts to set up the credentials to establish the necessary roles.
 Assuming the primary role is server. That means we will need transmitter and multiple receiver profiles that are connected. */
 class Application implements FisAppActor {
-    incomingMessageBus: Subject<FisMessage> = new Subject()
-    outgoingMessageBus: Subject<FisMessage> = new Subject()
+    incomingMessageBus: Subject<any> = new Subject()
+    outgoingMessageBus: Subject<any> = new Subject()
     messageTransmissionManager: MessageTransmissionManager
-    transmissionInstance!: TransmissionProfile
+    transmissionInstance!: MessageTransmission
 
     constructor() {
         this.messageTransmissionManager = new MessageTransmissionManager()
         this.transmissionInstance = this.messageTransmissionManager.getTransmissionInstance()
+
+        //code here first, then refactor/clean it later.
+        this.incomingMessageBus.subscribe(item => {
+            this.transmissionInstance.transmitter.emit(item)
+        })
     }
 
-    send(message: FisMessage): Observable<FisMessage> {
+    send(message: FisMessage): Observable<any> {
         return new Observable((response) => {
             this.outgoingMessageBus.next(message)
             this.incomingMessageBus.subscribe({

+ 7 - 0
src/transmission/msg.transmission.base.ts

@@ -28,14 +28,21 @@ export class MessageTransmissionBase implements MessageTransmissionBaseInterface
     subscribe(observer: Partial<Observer<any>>): Unsubscribable {
         throw new Error('Method not implemented.');
     }
+
     emit(message: any): void {
         throw new Error('Method not implemented.');
     }
+
     emitStream(message: any): void {
         throw new Error('Method not implemented.');
     }
+    
     subscribeMessages(messageFilter: any): Observable<any> {
         throw new Error('Method not implemented.');
     }
 
+    getInfo(): any {
+
+    }
+
 }

+ 36 - 19
src/transmission/msg.transmission.manager.ts

@@ -1,57 +1,74 @@
 import { MessageTransmissionTransmitter } from "./msg.transmission.transmitter";
 import { MessageTransmissionReceiver } from "./msg.transmission.receiver";
 import { ConnectionManager } from "../connector/connector.manager";
-import { MessageTransmissionManager as MessageTransmissionManagerInterface, ReceiverProfile, TransmissionProfile, TransmitterProfile } from "../interface/transport.interface";
+import { MessageTransmission, MessageTransmissionManager as MessageTransmissionManagerInterface, ReceiverProfile, TransmissionEvent, TransmissionProfile, TransmitterProfile } from "../interface/transport.interface";
 import { v4 as uuidv4 } from 'uuid'
-import { AdaptorTransmissionRole } from "../interface/connector.interface";
+import { AdaptorTransmissionRole, RequestResponseConnectionAdapter } from "../interface/connector.interface";
 import { MessageTransmissionRequestResponse } from "./msg.transmission.request-response";
+import { TransmitterConnectionAdapter } from "../connector/connector.transmitter";
+import { ReceiverConnectionAdapter } from "../connector/connector.receiver";
+import { filter, Subject } from "rxjs";
 /* FOr now, to fill the holes/gaps so to speak, the FIS actor will interact wtih Message Transmission Manager, without having to wory about
 the underyling transport protocol as well as managing client control, when it's acting as a transmitter. */
 export class MessageTransmissionManager implements MessageTransmissionManagerInterface {
-    transmission: TransmissionProfile[] = []
+    // transmission: MessageTransmission[] = []
     connectionManager!: ConnectionManager
+    messageTransmissionTransmitters: MessageTransmissionTransmitter[] = []
+    messageTransmissionReceiver: MessageTransmissionReceiver[] = []
+    messageTransmissionRequestResponse: MessageTransmissionRequestResponse[] = []
+    transmissionEvent: Subject<TransmissionEvent> = new Subject()
 
     constructor() {
-        // logic here
+        // this.transmissionEvent.pipe(
+        //     filter(event => event.event == 'NewTransmissionSet') // filter out odd numbers
+        // ).subscribe((transmissionData: TransmissionEvent) => {
+        //     this.transmission.push(transmissionData.data as MessageTransmission)
+        // })
     }
 
     // but this function also needs to talk to connection adaptor manager to instantiate the necessary adapter.
-    getTransmissionInstance(): TransmissionProfile {
-        let transmitter = this.getTransmitter()
-        let receiver = this.getReceiver()
-        let transmission: TransmissionProfile = {
+    getTransmissionInstance(): MessageTransmission {
+        let transmitterAdapter: TransmitterConnectionAdapter = this.connectionManager.getTransmitterConnectionAdapter()
+        let receiverAdapter: ReceiverConnectionAdapter = this.connectionManager.getReceiverConnectionAdapter()
+        let transmitter: MessageTransmissionTransmitter = this.getTransmitter(transmitterAdapter)
+        let receiver: MessageTransmissionReceiver = this.getReceiver(receiverAdapter)
+        let transmission: MessageTransmission = {
             id: uuidv4(),
-            receiverId: uuidv4(),
-            transmitterId: uuidv4(),
+            receiverId: transmitter.getInfo().id,
+            transmitterId: receiver.getInfo().id,
             transmitter: transmitter,
             receiver: receiver,
-            requestResponse: new MessageTransmissionRequestResponse(transmitter, receiver, AdaptorTransmissionRole.RequestResponse, this.connectionManager.getRequestResponseConnectionAdapter())
+            requestResponse: this.getRequestResponse(transmitter, receiver, this.connectionManager.getRequestResponseConnectionAdapter(transmitterAdapter, receiverAdapter))
         }
-        this.instantiateConnectionManager(transmission) // start an adapter
-        this.transmission.push(transmission)
+        this.instantiateConnectionManager() // start an adapter
+        // this.transmission.push(transmission)
         return transmission
     }
 
-    private getTransmitter(): MessageTransmissionTransmitter {
+    private getTransmitter(connectionAdapter: TransmitterConnectionAdapter): MessageTransmissionTransmitter {
         let transmitterProfile: TransmitterProfile = {
             id: uuidv4(),
             name: '', // for now make it empty. We will use the assigned uuid here
             dateCreated: new Date()
         }
-        return new MessageTransmissionTransmitter(transmitterProfile, AdaptorTransmissionRole.Transmitter, this.connectionManager.getTransmitterConnectionAdapter())
+        return new MessageTransmissionTransmitter(transmitterProfile, AdaptorTransmissionRole.Transmitter, connectionAdapter)
     }
 
-    private getReceiver(): MessageTransmissionReceiver {
+    private getReceiver(connectionAdapter: ReceiverConnectionAdapter): MessageTransmissionReceiver {
         let receiverProfile: ReceiverProfile = {
             id: uuidv4(),
             name: '', // for now make it empty. We will use the assigned uuid here
             dateCreated: new Date()
         }
-        return new MessageTransmissionReceiver(receiverProfile, AdaptorTransmissionRole.Receiver, this.connectionManager.getReceiverConnectionAdapter())
+        return new MessageTransmissionReceiver(receiverProfile, AdaptorTransmissionRole.Receiver, connectionAdapter)
     }
 
-    private instantiateConnectionManager(transmissionProfile: TransmissionProfile): void {
-        this.connectionManager = new ConnectionManager(transmissionProfile)
+    private getRequestResponse(transmitterInstance: MessageTransmissionTransmitter, receiverInstance: MessageTransmissionReceiver, connectionAdaptor: RequestResponseConnectionAdapter): MessageTransmissionRequestResponse {
+        return new MessageTransmissionRequestResponse(transmitterInstance, receiverInstance, AdaptorTransmissionRole.RequestResponse, connectionAdaptor)
+    }
+
+    private instantiateConnectionManager(): void {
+        this.connectionManager = new ConnectionManager(this.transmissionEvent)
     }
 
 }

+ 7 - 7
src/transmission/msg.transmission.transmitter.ts

@@ -1,6 +1,6 @@
 import { MessageTransmissionBase } from "./msg.transmission.base";
-import { FisMessage,  MessageTransmitter as MessageTransmitterInterface, TransmitterProfile } from '../interface/transport.interface'
-import { AdaptorTransmissionRole } from "../interface/connector.interface";
+import { FisMessage,  MessageTransmitter as MessageTransmitterInterface, TransmissionMessage, TransmitterProfile } from '../interface/transport.interface'
+import { AdaptorTransmissionRole, TransportMessage } from "../interface/connector.interface";
 import { v4 as uuidv4 } from 'uuid'
 import { TransmitterConnectionAdapter } from "../connector/connector.transmitter";
 
@@ -19,10 +19,10 @@ export class MessageTransmissionTransmitter extends MessageTransmissionBase impl
     }
 
     emit(message: FisMessage): void {
-        throw new Error(`Method not implemented`)
-    }
-
-    getTransmitterInfo(): TransmitterProfile {
-        return this.transmitterProfile
+        (this.adapterService as TransmitterConnectionAdapter).emit({
+            transmitter: this.transmitterProfile,
+            receiver: '', // that means this information is vital
+            payload: message
+        } as TransmissionMessage)
     }
 }

+ 52 - 43
src/transport/websocket.ts

@@ -1,57 +1,66 @@
-import { Observable, Subject } from "rxjs";
-import { Socket as ClientSocket } from "socket.io-client";
+import { filter, Observable, Subject } from "rxjs";
+import { Socket as ClientSocket } from 'socket.io-client'
 import { Socket as SocketForConnectedClient } from "socket.io"
 import { handleClientSocketConnection, handleNewSocketClient, startClientSocketConnection, startSocketServer } from "../utils/socket.utils";
-import { ReceiverProfile, TransportEventNotification, TransportMessage, TransportSettings } from "../interface/ITransport.interface";
-import { FisMessage } from "../interface/transport.interface";
+import { TransportEvent, TransportMessage, TransportService } from "../interface/connector.interface";
 
 /* Just code in the context that this websocket service will be handling multiple UI clients. Can think about the server communication at a later time. */
-export class WebsocketTransportService {
-    private selfSocket!: ClientSocket
-    private socketReceiverProfile: ReceiverProfile[] = []
+export class WebsocketTransportService implements TransportService {
+    private connectedServer: ConnectedServerSocket[] = []
+    private connectedClientSocket: ConnectedClientSocket[] = []
     // private incomingMessage: Subject<TransportMessage> = new Subject() // this is only for client roles only atm
-    private eventNotification: Subject<TransportEventNotification> = new Subject()
-
-    constructor(setting: TransportSettings) {
-        if (setting.profileInfo.port) {
-            startSocketServer(setting.profileInfo.port as number).subscribe({
-                next: (connectedClient: SocketForConnectedClient) => {
-                    handleNewSocketClient(connectedClient, this.socketReceiverProfile).subscribe(this.eventNotification)
-                },
-                error: error => console.error(error),
-            })
-        }
-
-        // this is for those who wants to act as receiver. Usually for web browser, but servers can use this too.
-        if (setting.profileInfo.url) {
-            startClientSocketConnection(setting.profileInfo.url).then((socket: ClientSocket) => {
-                this.selfSocket = socket
-                handleClientSocketConnection(socket).subscribe(this.eventNotification)
-            }).catch((error) => {
-                console.error(`WebsocketTransport ERROR:`, error)
-            })
-        }
+    private transportEvent: Subject<TransportEvent> = new Subject()
+
+    constructor() {
+        // logic here
     }
 
-    // for transmission(Server Only, not applicable for client Socket)
-    public async transmit(message: TransportMessage): Promise<string> {
-        return new Promise((resolve, reject) => {
-            let receiverInstance = this.socketReceiverProfile.find(obj => obj.uuid === message.receiverID);
-            if (receiverInstance) {
-                (receiverInstance?.instance as SocketForConnectedClient).emit(message.event, message.payload)
-                resolve('Message called for delivery...')
-            } else {
-                reject(`Receiver cannot be found or doesn't exist...`)
-            }
+    public startServer(port: number): void {
+        // logic here
+        startSocketServer(port).subscribe({
+            next: (connectedClient: SocketForConnectedClient) => {
+                handleNewSocketClient(connectedClient, this.connectedClientSocket).subscribe(this.transportEvent)
+            },
+            error: error => console.error(error),
+        })
+    }
+
+    public startClient(url: string): void {
+        // logic here
+        startClientSocketConnection(url).then((socket: ClientSocket) => {
+            handleClientSocketConnection(socket, this.connectedServer).subscribe(this.transportEvent)
+        }).catch((error) => {
+            console.error(`WebsocketTransport ERROR:`, error)
         })
     }
 
-    public getTransportEventNotification(): Observable<TransportEventNotification> {
-        return this.eventNotification as Observable<TransportEventNotification>
+    public getTransportEvent(): Observable<TransportEvent> {
+        return this.transportEvent.asObservable()
     }
 
-    // for client UI eg
-    public clientEmit(request: FisMessage): void {
-        this.selfSocket.emit(`message`, request)
+    
+    // for transmission(Server Only, not applicable for client Socket)
+    public emit(message: TransportMessage): void {
+        // send message
     }
+    
+    public subscribe(): Observable<TransportEvent> {
+        return this.transportEvent.asObservable()
+    }
+
+
 }
+
+
+
+interface ConnectedSocket {
+    id: string,
+    dateCreated: Date,
+}
+export interface ConnectedClientSocket extends ConnectedSocket {
+    socketInstance: SocketForConnectedClient
+}
+
+export interface ConnectedServerSocket extends ConnectedSocket {
+    socketInstance: ClientSocket
+}

+ 84 - 105
src/utils/socket.utils.ts

@@ -3,8 +3,9 @@ import { createServer } from 'http';
 import { Server, Socket as SocketForConnectedClient } from 'socket.io';
 import { io, Socket as ClientSocket } from 'socket.io-client';
 import * as fs from 'fs'
-import { ReceiverProfile, TransportEventNotification, TransportMessage } from '../interface/ITransport.interface';
 import { v4 as uuidv4 } from 'uuid'
+import { TransportEvent } from '../interface/connector.interface';
+import { ConnectedClientSocket, ConnectedServerSocket } from '../transport/websocket';
 
 export function startSocketServer(port: number): Observable<SocketForConnectedClient> {
     return new Observable((observer) => {
@@ -36,7 +37,7 @@ export async function startClientSocketConnection(serverUrl: string): Promise<Cl
     return new Promise((resolve, reject) => {
         try {
             // let clientSocket = io(serverUrl)
-            let clientSocket = io(serverUrl, {
+            let clientSocket: ClientSocket = io(serverUrl, {
                 reconnection: true,              // Enable automatic reconnections
                 reconnectionAttempts: 1000,       // Retry up to 10 times
                 reconnectionDelay: 500,          // Start with a 500ms delay
@@ -52,17 +53,22 @@ export async function startClientSocketConnection(serverUrl: string): Promise<Cl
 }
 
 // After establishing connection to the server, set up the credentials, confirm whether or not if there's any credentials, if not ask for one from the server
-export function handleClientSocketConnection(socket: ClientSocket): Observable<TransportEventNotification> {
-    return new Observable((eventNotification: Observer<TransportEventNotification>) => {
+export function handleClientSocketConnection(socket: ClientSocket, serversConnected: ConnectedServerSocket[]): Observable<TransportEvent> {
+    return new Observable((eventNotification: Observer<TransportEvent>) => {
         let clientName!: string
         let buffer: any[] = []
-        let receiverProfileInfo!: ReceiverProfile
+        let receiverProfileInfo!: ConnectedClientSocket
 
         // Listen for a connection event
         socket.on('connect', () => {
             console.log('Connected to the server:', socket.id)
+            serversConnected.push({
+                id: uuidv4(),
+                dateCreated: new Date(),
+                socketInstance: socket
+            })
             if (clientName) {
-                checkOwnClientInfo(clientName).then((profile: ReceiverProfile) => {
+                checkOwnClientInfo(clientName).then((profile: ConnectedClientSocket) => {
                     receiverProfileInfo = profile
                     socket.emit('profile', {
                         name: 'Old Client',
@@ -86,23 +92,7 @@ export function handleClientSocketConnection(socket: ClientSocket): Observable<T
         socket.on('message', (msg: any) => {
             console.log(`Websocket Client Transport Receieve Msg`, msg.id)
             if (receiverProfileInfo) {
-                eventNotification.next({
-                    event: 'New Message',
-                    description: 'Received new message',
-                    transportType: 'WEBSOCKET',
-                    data: {
-                        receiverID: receiverProfileInfo.uuid,
-                        receiverName: receiverProfileInfo.name,
-                        date: new Date(),
-                        payload: msg
-                    }
-                })
-                // incomingMessage.next({
-                //     id: msg.header.MessageID,
-                //     receiverID: receiverProfileInfo.uuid,
-                //     payload: msg,
-                //     event: 'New Message'
-                // })
+                // publish to event
             } else {
                 // Do nothing. just store in local array first. Cannot process without information. but then again, don['t need information if acting as client
                 // but for consistency sake, will impose the standard 
@@ -113,33 +103,23 @@ export function handleClientSocketConnection(socket: ClientSocket): Observable<T
         socket.on('profile', (data: { name: string, message: any }) => {
             // console.log(data)
             if (data.name == 'New Profile') {
-                console.log(`Assigned client Name: ${(data.message as ReceiverProfile).name}`)
-                receiverProfileInfo = data.message as ReceiverProfile
-                writeFile(data.message as ReceiverProfile, (data.message as ReceiverProfile).name).then(() => {
-                    clientName = receiverProfileInfo.name
+                console.log(`Assigned client Name: ${(data.message as ConnectedClientSocket).id}`)
+                receiverProfileInfo = data.message as ConnectedClientSocket
+                writeFile(data.message as ConnectedClientSocket, (data.message as ConnectedClientSocket).id).then(() => {
+                    clientName = receiverProfileInfo.id
                     // broadcast event to allow retransmission to release buffer
-                    eventNotification.next({
-                        event: 'Connection',
-                        description: 'Profile acquired || updated and stored',
-                        transportType: 'WEBSOCKET',
-                        data: {
-                            receiverID: receiverProfileInfo.uuid,
-                            receiverName: receiverProfileInfo.name,
-                            date: new Date(),
-                            payload: receiverProfileInfo
-                        }
-                    })
+
                 }).catch((error) => { }) // do nothing at the moment. 
             }
             if (data.name == 'Adjusted Profile') {
-                console.log(`Assigned client Name: ${(data.message as ReceiverProfile).name}`)
-                receiverProfileInfo = data.message as ReceiverProfile
-                writeFile(data.message as ReceiverProfile, (data.message as ReceiverProfile).name).then(() => {
+                console.log(`Assigned client Name: ${(data.message as ConnectedClientSocket).id}`)
+                receiverProfileInfo = data.message as ConnectedClientSocket
+                writeFile(data.message as ConnectedClientSocket, (data.message as ConnectedClientSocket).id).then(() => {
                     // broadcast event to allow retransmission to release buffer
                     eventNotification.next({
-                        event: 'Connection',
-                        description: 'Profile acquired || updated and stored',
-                        transportType: 'WEBSOCKET',
+                        id: uuidv4(),
+                        event: 'Client Reconnected',
+                        data: ''
                     })
                 }).catch((error) => { }) // do nothing at the moment. 
             }
@@ -160,9 +140,9 @@ export function handleClientSocketConnection(socket: ClientSocket): Observable<T
             console.log('Websocket Client disconnected from the server');
             if (receiverProfileInfo) {
                 eventNotification.next({
-                    event: 'Disconnection',
-                    description: 'Disconnected from the server',
-                    transportType: 'WEBSOCKET'
+                    id: uuidv4(),
+                    event: `Client Disconnected`,
+                    data: ''
                 })
             }
         });
@@ -170,55 +150,39 @@ export function handleClientSocketConnection(socket: ClientSocket): Observable<T
 }
 
 // For SERVER Usage: set up socket listeners to start listening for different events
-export function handleNewSocketClient(socket: SocketForConnectedClient, socketReceiverProfile: ReceiverProfile[]): Observable<TransportEventNotification> {
-    return new Observable((event: Observer<TransportEventNotification>) => {
+export function handleNewSocketClient(socket: SocketForConnectedClient, connectedClientSocket: ConnectedClientSocket[]): Observable<TransportEvent> {
+    return new Observable((event: Observer<TransportEvent>) => {
         console.log(`Setting up listeners for socket:${socket.id}`)
         // returns the socket client instance 
         // listen to receiver's initiotion first before assigning 'credentials'
-        socket.on(`profile`, (message: { name: string, data: ReceiverProfile }) => {
+        socket.on(`profile`, (message: { name: string, data: any }) => {
             if (message.name == 'New Client') {
-                let receiverProfile: ReceiverProfile = {
-                    uuid: uuidv4(),
-                    name: `Client${uuidv4()}`,
+                let clientInstance: ConnectedClientSocket = {
+                    id: uuidv4(),
                     dateCreated: new Date(),
-                    transportType: `WEBSOCKET`,
-                    eventNotification: new Subject(),
-                    instance: socket
+                    socketInstance: socket
                 }
                 // publish first event notification
                 event.next({
-                    event: 'Connection',
-                    description: 'New Client Connected',
-                    transportType: 'WEBSOCKET',
-                    data: {
-                        receiverID: receiverProfile.uuid,
-                        receiverName: receiverProfile.name,
-                        date: new Date(),
-                        payload: receiverProfile
-                    }
+                    id: uuidv4(),
+                    event: `New Client`,
+                    data: clientInstance
                 })
                 // send to receiver for reference
                 socket.emit('profile', {
-                    name: `New Profile`, message: {
-                        uuid: receiverProfile.uuid,
-                        name: receiverProfile.name,
-                        dateCreated: receiverProfile.dateCreated,
-                        transportType: `WEBSOCKET`,
-                        eventNotification: null,
-                        instance: null // have to put null, otherwise circular reference maximum stack error
-                    }
+                    name: `New Profile`, message: clientInstance
                 })
-                socketReceiverProfile.push(receiverProfile)
-                startListening(socket, receiverProfile)
+                // Update connected clientInstance info to adapter
+                connectedClientSocket.push(clientInstance)
+                startListening(socket, clientInstance, event)
             } else {
                 // update first
-                let receiverProfile: ReceiverProfile | undefined = socketReceiverProfile.find(obj => obj.uuid === message.data.uuid)
-                if (receiverProfile) {
-                    console.log(`Profile ${receiverProfile.uuid} Found`)
-                    receiverProfile.instance = socket
-                    socket.emit('profile', { name: 'Adjusted Profile', message: receiverProfile })
+                let clientInstance: ConnectedClientSocket | undefined = connectedClientSocket.find(obj => obj.id === message.data.id)
+                if (clientInstance) {
+                    console.log(`Socket Client ${clientInstance.id} Found`)
+                    socket.emit('profile', { name: 'Adjusted Profile', message: clientInstance })
                     // need to start listening again, because it's assigned a different socket instance this time round
-                    startListening(socket, receiverProfile)
+                    startListening(socket, clientInstance, event)
                 } else {
                     console.log(`Profile Not Found`)
                     socket.emit('profile', { name: 'Error', message: 'Receiver Profile Not found' })
@@ -230,7 +194,7 @@ export function handleNewSocketClient(socket: SocketForConnectedClient, socketRe
 
 
 // Specifically to write receiver profile information
-export async function writeFile(data: ReceiverProfile, filename: string): Promise<boolean> {
+export async function writeFile(data: ConnectedClientSocket, filename: string): Promise<boolean> {
     return new Promise((resolve, reject) => {
         // Write JSON data to a file
         fs.writeFile(`${filename}.json`, JSON.stringify(data, null, 2), (err) => {
@@ -248,7 +212,7 @@ export async function writeFile(data: ReceiverProfile, filename: string): Promis
 
 
 // Check if filename exists. Return profile information if there's any
-export async function checkOwnClientInfo(filename: string): Promise<ReceiverProfile> {
+export async function checkOwnClientInfo(filename: string): Promise<ConnectedClientSocket> {
     return new Promise((resolve, reject) => {
         // Check if the file exists
         if (fs.existsSync(`${filename}.json`)) {
@@ -277,43 +241,58 @@ export async function checkOwnClientInfo(filename: string): Promise<ReceiverProf
     })
 }
 
-export function startListening(socket: SocketForConnectedClient, receiverProfile: ReceiverProfile): void {
+export function startListening(socket: SocketForConnectedClient, client: ConnectedClientSocket, eventListener: Observer<TransportEvent>): void {
     /* Generally, we don't need this unless in the case of being the receiver */
     socket.on('message', (message: any) => {
-        // here
+        eventListener.next({
+            id: uuidv4(),
+            event: 'New Message',
+            data: {
+                clientID: client.id,
+                dateReceived: new Date(),
+                payload: message
+            }
+        })
     })
 
-    socket.on('request', (request: any) => {
+    socket.on('request', (message: any) => {
         // here : Let's say there's a subcsription request here
-        receiverProfile.eventNotification.next({
+        eventListener.next({
+            id: uuidv4(),
             event: 'New Message',
-            description: 'Incoming request',
-            transportType: 'WEBSOCKET',
             data: {
-                receiverID: receiverProfile.uuid,
-                receiverName: receiverProfile.name,
-                date: new Date(),
-                payload: request
+                clientID: client.id,
+                dateReceived: new Date(),
+                payload: message
             }
         })
     })
 
     socket.on('notification', (notification: any) => {
         // logic here
+        eventListener.next({
+            id: uuidv4(),
+            event: `Notification`,
+            data: {
+                clientID: client.id,
+                dateReceived: new Date(),
+                payload: notification
+            }
+        })
     })
 
     socket.on('disconnect', () => {
-        receiverProfile.eventNotification.next(
-            {
-                event: 'Disconnection',
-                description: `Existing Client ${receiverProfile.uuid} disonnected`,
-                transportType: `WEBSOCKET`,
-                data: {
-                    receiverID: receiverProfile.uuid,
-                    receiverName: receiverProfile.name,
-                    date: new Date(),
-                }
+        eventListener.next({
+            id: uuidv4(),
+            event: 'Server Disconnected',
+            data: {
+                clientID: client.id,
+                time: new Date()
             }
-        )
+        })
+        eventListener.error(`Client ${client.id} disconnected. Terminating this observable event for this client socket...`)
+        eventListener.complete()
     })
-}
+}
+
+