Enzo 3 týždňov pred
rodič
commit
03b2c99ca4

+ 68 - 56
src/connector/connector.ts

@@ -1,62 +1,84 @@
-import { Observable, Observer, Subject } from "rxjs";
+import { Observable, Observer, Subject, Subscriber, Unsubscribable } from "rxjs";
 import dotenv from 'dotenv';
 import { v4 as uuidv4 } from 'uuid';
-import { WebsocketTransportService } from "../transport/websocket";
 import { ITransportReceiving, ITransportSuper, ITransportTransmitting, ReceiverProfile, TransportEventNotification, TransportManagerEventNotification, TransportManagerInterface, TransportMessage, TransportSettings, TransportType } from "../interface/ITransport.interface";
+import { AdaptorTransmissionRole, ConnectionAdaptorBase, ConnectionState } from "../interface/connector.interface";
+import { FisAppActor, FisMessage } from "../interface/transport.interface";
 
 dotenv.config();
 /* This transport manager will be instantiating the necessary transport to deal with tranmission and receiving from different receivers
 So how?: */
-export class ConnectionManager {
-    private incomingMessage: Subject<TransportMessage> = new Subject() // only for client roles usage. Servers will listen to event notification for incoming requests 
-    private outGoingMessage: Subject<FisMessage> = new Subject()
+export class ConnectionAdapter implements FisAppActor, ConnectionAdaptorBase<any> {
+    incomingMessageBus: Subject<any> = new Subject()
+    outgoingMessageBus: Subject<any> = new Subject()
+    connector: any;
+    connectorProfile: any;
+    connectionState!: ConnectionState;
+    connectionStateBus!: Observable<ConnectionState>;
+    adaptorTransmissionRole!: AdaptorTransmissionRole;
+
+    // For Manager, TBD
     private connectedClients: ReceiverProfile[] = []
     private transportEventNotification: Subject<TransportEventNotification> = new Subject()
     // this is assuming that the application will do request response, otherwise, just one will do.
-    private transportService!: ITransportSuper
-    private transmittingService!: ITransportTransmitting
-    private receivingService!: ITransportReceiving
     private serverPort!: number
     private serverUrl!: string
 
     constructor(port?: number, url?: string) {
         if (port) {
-            this.serverPort = port
-            this.establishTransportTransmitting(process.env.Transport as TransportType).then((transmissionService: ITransportTransmitting) => {
-                transmissionService.getTransportEventNotification().subscribe((notification: TransportEventNotification) => {
-                    // Collect client information when they are connected 
-                    if (notification.event == `Connection` && notification.description == 'New Client Connected') {
-                        this.connectedClients.push(notification.data?.payload)
-                        this.handleNewReceiver(notification.data?.payload)
-                    }
-                })
-            }).catch((error) => console.error(error))
+           // instantiate the transport service. Eg: socket, http. Try to use the same service. Then return TransmitterConnectionAdapter
+            
         }
         /* For web browser ui, because this server can also act as a receiver, not just transmitter as well.  */
         if (url) {
-            this.serverUrl = url
-            this.establishTransportReceiving(process.env.Transport as TransportType).then((receivingService: ITransportReceiving) => {
-                // logic here
-            })
+            // if no transport service is instantiated, then instantiate a new one, otherwise use existying one.
         }
+
         if (!port && !url) console.error(`No role has assigned for transport service due to lack of information provided!`)
         this.processOutputMessage()
     }
 
-    // use for as a transmitter
-    public emit(message: TransportMessage): void {
-        if(this.transmittingService) {
-            this.transmittingService.transmit(message)
-        }
+    subscribeConnectionState(): Observable<ConnectionState> {
+        throw new Error("Method not implemented.");
     }
-
-    // use for as a receiver
-    public send(message: TransportMessage): Observable<TransportMessage> {
-        return new Observable((response: Observer<TransportMessage>) => {
+    publishConnectionState(): void {
+        throw new Error("Method not implemented.");
+    }
+    connect(): void {
+        throw new Error("Method not implemented.");
+    }
+    disconnect(): void {
+        throw new Error("Method not implemented.");
+    }
+    send(message: any): Observable<any> {
+        return new Observable((response) => {
             // logic here
         })
+
+    }
+    emit(message: any): void {
+        // logic here
+    }
+    emitStream(message: any): void {
+        throw new Error("Method not implemented.");
+    }
+    subscribeMessages(messageFilter: any): Observable<any> {
+        throw new Error("Method not implemented.");
+    }
+    subscribe(subscriber: Subscriber<any>): Unsubscribable {
+        // Emit some value based on T (here for demonstration)
+        subscriber.next({} as any);
+        subscriber.complete();
+
+        // Return an Unsubscribable object to allow unsubscription
+        return {
+            unsubscribe: () => {
+                console.log('Unsubscribed');
+            }
+        };
     }
 
+
     public getTransportManagerEventNotification(): Observable<TransportManagerEventNotification> {
         return new Observable((notification: Observer<TransportManagerEventNotification>) => {
             this.transportEventNotification.subscribe((transportNotification: TransportEventNotification) => {
@@ -82,9 +104,7 @@ export class ConnectionManager {
                 }
             }
             if (tranportType == 'WEBSOCKET') {
-                this.transportService = this.transportService ? this.transportService : new WebsocketTransportService(setting)
-                this.transmittingService = this.transportService
-                resolve(this.transmittingService)
+                // logic here
             } else {
                 reject(`No such Transport Type Exist...`)
             }
@@ -103,33 +123,32 @@ export class ConnectionManager {
             }
             // will be sharing one instance, This part may break the code
             if (tranportType == 'WEBSOCKET') {
-                this.receivingService = this.transportService ? this.transportService : new WebsocketTransportService(setting)
-                resolve(this.receivingService)
+                // logic here
             }
         })
     }
 
     // This function will wrap the message to be delivered over to established transport to be sent over
     private processOutputMessage(): void {
-        this.outGoingMessage.subscribe({
+        this.outgoingMessageBus.subscribe({
             next: (message: FisMessage) => {
                 if (message.header.messageName == 'NotificationMessage') {
                     // This is just here temporarily. Because The application will be concerned with whom the party is subcribed to, not this transport manager
-                    this.transmittingService.transmit({
-                        id: message.header.messageID,
-                        receiverID: '',
-                        payload: message,
-                        event: `notification`
-                    })
+                    // this.transmittingService.transmit({
+                    //     id: message.header.messageID,
+                    //     receiverID: '',
+                    //     payload: message,
+                    //     event: `notification`
+                    // })
                 }
                 if (message.header.messageName == 'ResponseMessage') {
                     // Map responses according to the requestID???
-                    this.transmittingService.transmit({
-                        id: message.header.messageID,
-                        receiverID: '',
-                        payload: message,
-                        event: `resposne`
-                    })
+                    // this.transmittingService.transmit({
+                    //     id: message.header.messageID,
+                    //     receiverID: '',
+                    //     payload: message,
+                    //     event: `resposne`
+                    // })
                 }
             }
         })
@@ -157,10 +176,3 @@ export class ConnectionManager {
 }
 
 
-export interface FisMessage {
-    header: {
-        messageName: 'RequestMessage' | 'ResponseMessage' | 'NotificationMessage',
-        messageID: string
-    },
-    data: any
-}

+ 18 - 0
src/connector/receiver.connector.ts

@@ -0,0 +1,18 @@
+import dotenv from 'dotenv';
+import { FisAppActor, FisMessage } from "../interface/transport.interface";
+import { ConnectionAdapter } from "./connector";
+import { ReceiverConnectionAdapterBase } from "../interface/connector.interface";
+
+dotenv.config();
+/* This transport manager will be instantiating the necessary transport to deal with tranmission and receiving from different receivers
+So how?: */
+export class ReceiverConnectionAdapter extends ConnectionAdapter implements FisAppActor, ReceiverConnectionAdapterBase<any> {
+
+    constructor() {
+        super()
+        // logic here
+    }
+
+}
+
+

+ 19 - 0
src/connector/transmitter.connector.ts

@@ -0,0 +1,19 @@
+import dotenv from 'dotenv';
+import { FisAppActor, FisMessage } from "../interface/transport.interface";
+import { ConnectionAdapter } from "./connector";
+import { TransmitterConnectionAdapterBase } from '../interface/connector.interface';
+
+dotenv.config();
+/* This transport manager will be instantiating the necessary transport to deal with tranmission and receiving from different receivers
+So how?: */
+export class TransmitterConnectionAdapter extends ConnectionAdapter implements FisAppActor, TransmitterConnectionAdapterBase<any> {
+
+    constructor(port: number) {
+        super()
+        // logic here
+        
+    }
+
+}
+
+

+ 8 - 4
src/interface/connector.interface.ts

@@ -1,4 +1,4 @@
-import { Observable } from "rxjs"
+import { Observable, Observer, PartialObserver, Subscribable, Subscriber, Unsubscribable } from "rxjs"
 import { FisAppActor } from "./transport.interface"
 
 
@@ -28,7 +28,7 @@ export interface ConnectionManager {
 
 }
 
-export interface ConnectionAdaptorBase extends FisAppActor {
+export interface ConnectionAdaptorBase<T> extends FisAppActor, Subscribable<T> {
     connector: any
     connectorProfile: AdapterProfile | any
     connectionState: ConnectionState
@@ -46,13 +46,17 @@ export interface ConnectionAdaptorBase extends FisAppActor {
 }
 
 
-export interface TransmitterAdapterBase extends ConnectionAdaptorBase {
+export interface TransmitterConnectionAdapterBase<T> extends ConnectionAdaptorBase<T> {
+
+}
+
+export interface ReceiverConnectionAdapterBase<T> extends ConnectionAdaptorBase<T> {
 
 }
 
 export type ConnectionState = 'ONLINE' | 'OFFLINE'
 export enum AdaptorTransmissionRole {
     Transmitter,
-    Receiever,
+    Receiver,
     RequestResponse
 }

+ 12 - 6
src/interface/transport.interface.ts

@@ -1,4 +1,4 @@
-import { Observable, Subject } from "rxjs";
+import { Observable, Subject, Subscribable } from "rxjs";
 import { AdaptorTransmissionRole, ReceiverProfile, TransmitterProfile } from "./connector.interface";
 
 export interface FisAppActor {
@@ -16,7 +16,7 @@ export interface MessageTransmissionManager {
 }
 
 
-export interface MessageTransmissionBase extends FisAppActor {
+export interface MessageTransmissionBase<T> extends FisAppActor, Subscribable<T> {
     transmitterProfile: TransmitterProfile
     receiverProfile: ReceiverProfile
     msgRepositoryService: any
@@ -26,18 +26,18 @@ export interface MessageTransmissionBase extends FisAppActor {
     adapterService: any
 
     setTransmitter(transmitterProfile: TransmitterProfile): void
-    setReceiver(receiverProfile: ReceiverProfile): Promise<any>
+    setReceiver(receiverProfile: ReceiverProfile): void
 }
 
-export interface MsgReceiver extends MessageTransmissionBase {
+export interface MessageReceiver<T> extends MessageTransmissionBase<T> {
 
 }
 
-export interface MsgTransmitter extends MessageTransmissionBase {
+export interface MessageTransmitter<T> extends MessageTransmissionBase<T> {
 
 }
 
-export interface MsgRequestResponse extends MessageTransmissionBase, MsgReceiver {
+export interface MessageRequestResponse<T> extends MessageTransmissionBase<T>, MessageReceiver<T> {
 
 }
 
@@ -47,4 +47,10 @@ export interface FisMessage {
         messageName: `NotificationMessage` | `ResponseMessage` | `RequestMessage`
     },
     data: any
+}
+
+export interface TransmisionMessage {
+    transmitter?: TransmitterProfile,
+    receiver?: ReceiverProfile,
+    payload?: FisMessage
 }

+ 39 - 0
src/transmission/msg.receiver.ts

@@ -0,0 +1,39 @@
+import { Subject, Subscriber, Unsubscribable } from 'rxjs';
+import { AdaptorTransmissionRole, TransmitterProfile } from '../interface/connector.interface';
+import { MessageTransmissionBase } from './msg.transmission.base';
+import { MessageReceiver as MessageReceiverInterface } from '../interface/transport.interface'
+import { ConnectionAdapter } from '../connector/connector';
+
+export class MessageReceiver extends MessageTransmissionBase implements MessageReceiverInterface<any> {
+    receiverProfile: TransmitterProfile; // Property to hold the receiver profile
+    transmissionRole: AdaptorTransmissionRole;
+    incomingMessageBus: Subject<any> = new Subject()
+
+    constructor(receiverProfile: TransmitterProfile) {
+        super();
+        this.transmissionRole = AdaptorTransmissionRole.Receiver;
+        this.receiverProfile = receiverProfile;
+        this.instantiateConnectionAdapter()
+    }
+
+    // Implement Subscribable's subscribe method. All messages pass through here
+    subscribe(subscriber: Subscriber<any>): Unsubscribable {
+        // Emit some value based on T (here for demonstration)
+        subscriber.next({} as any);
+        subscriber.complete();
+
+        // Return an Unsubscribable object to allow unsubscription
+        return {
+            unsubscribe: () => {
+                console.log('Unsubscribed');
+            }
+        };
+    }
+
+    private instantiateConnectionAdapter(): void {
+        this.adapterService = new ConnectionAdapter()
+        this.adapterService.subscribe((message: any) => {
+            this.incomingMessageBus.next(message)
+        })
+    }
+}

+ 13 - 7
src/transmission/msg.transmission.base.ts

@@ -1,10 +1,11 @@
 
-import { Observable, Subject } from 'rxjs';
+import { Observable, Observer, Subject, Unsubscribable } from 'rxjs';
 import { TransmitterProfile, ReceiverProfile, AdaptorTransmissionRole } from '../interface/connector.interface';
 import { MessageTransmissionBase as MessageTransmissionBaseInterface } from '../interface/transport.interface'
 import { v4 as uuidv4 } from 'uuid'
+import { MessageTransmitter } from './msg.transmitter';
 
-export class MessageTransmissionBase implements MessageTransmissionBaseInterface {
+export class MessageTransmissionBase implements MessageTransmissionBaseInterface<any> {
     transmitterProfile!: TransmitterProfile;
     receiverProfile!: ReceiverProfile;
     msgRepositoryService: any;
@@ -16,6 +17,7 @@ export class MessageTransmissionBase implements MessageTransmissionBaseInterface
     outgoingMessageBus!: Subject<any>;
 
     constructor() {
+        // these are for self. Not to be confused with setting up credentials for connected clients in the context of websocket
         this.setTransmitter({
             id: uuidv4(),
             name: 'Transmitter' + process.env.PORT,
@@ -27,18 +29,22 @@ export class MessageTransmissionBase implements MessageTransmissionBaseInterface
             dateCreated: new Date()
         })
     }
+    
+    subscribe(observer: Partial<Observer<any>>): Unsubscribable {
+        throw new Error('Method not implemented.');
+    }
 
     // Pretty straight forward. Set itself straight away.
-    setTransmitter(transmitterProfile: TransmitterProfile): void {
-        this.transmitterProfile = transmitterProfile
+    setTransmitter(transmitterProfile: TransmitterProfile): MessageTransmitter {
+        return new MessageTransmitter(transmitterProfile)
     }
 
-    setReceiver(receiverProfile: ReceiverProfile): Promise<any> {
-        throw new Error('Method not implemented.');
+    setReceiver(receiverProfile: ReceiverProfile): void {
+        this.receiverProfile = receiverProfile
     }
 
     send(message: any): Observable<any> {
-        throw new Error('Method not implemented.');
+       throw new Error(`Method not implemented`)
     }
     emit(message: any): void {
         throw new Error('Method not implemented.');

+ 48 - 0
src/transmission/msg.transmitter.ts

@@ -0,0 +1,48 @@
+import { MessageTransmissionBase } from "./msg.transmission.base";
+import { FisMessage, MessageTransmitter as MessageTransmitterInterface } from '../interface/transport.interface'
+import { AdaptorTransmissionRole, TransmitterProfile } from "../interface/connector.interface";
+import { filter, Observable, Observer, Subscription, takeWhile } from "rxjs";
+
+export class MessageTransmitter extends MessageTransmissionBase implements MessageTransmitterInterface<any> {
+
+    constructor(transmitterProfile: TransmitterProfile) {
+        super()
+        this.transmissionRole = AdaptorTransmissionRole.Transmitter
+        this.transmitterProfile = transmitterProfile
+    }
+
+    send(message: FisMessage): Observable<FisMessage> {
+        return new Observable((response: Observer<FisMessage>) => {
+            this.transmissionService.send({
+                transmitter: this.transmitterProfile,
+                payload: message
+            });
+    
+            const subscription: Subscription = this.incomingMessageBus
+                .pipe(
+                    // Filter for matching message IDs
+                    filter(incomingMessage => incomingMessage.header.messageID === message.header.messageID),
+                    // Complete the observable when the data is 'Complete'
+                    takeWhile(incomingMessage => incomingMessage.data !== 'Complete')
+                )
+                .subscribe({
+                    next: incomingMessage => {
+                        response.next(incomingMessage);
+                        // Complete the observable if data is 'Complete'
+                        if (incomingMessage.data === 'Complete') {
+                            response.complete();
+                            subscription.unsubscribe();
+                        }
+                    }
+                });
+        });
+    }
+
+    emit(message: any): void {
+        // emit base on receiver
+    }
+
+    emitStream(message: any): void {
+
+    }
+}

+ 1 - 1
src/transport/websocket.ts

@@ -3,7 +3,7 @@ import { Socket as ClientSocket } from "socket.io-client";
 import { Socket as SocketForConnectedClient } from "socket.io"
 import { handleClientSocketConnection, handleNewSocketClient, startClientSocketConnection, startSocketServer } from "../utils/socket.utils";
 import { ReceiverProfile, TransportEventNotification, TransportMessage, TransportSettings } from "../interface/ITransport.interface";
-import { FisMessage } from "../connector/connector";
+import { FisMessage } from "../interface/transport.interface";
 
 /* Just code in the context that this websocket service will be handling multiple UI clients. Can think about the server communication at a later time. */
 export class WebsocketTransportService {