Browse Source

udpated interface with latest spec

Enzo 3 weeks ago
parent
commit
8fe4e9c96f

+ 2 - 1
.env

@@ -1 +1,2 @@
-Transport = "WEBSOCKET"
+Transport = "WEBSOCKET"
+PORT = 3000

+ 20 - 7
src/transport/transport.manager.ts → src/connector/connector.ts

@@ -1,13 +1,13 @@
 import { Observable, Observer, Subject } from "rxjs";
 import dotenv from 'dotenv';
 import { v4 as uuidv4 } from 'uuid';
-import { WebsocketTransportService } from "./websocket";
-import { ITransportReceiving, ITransportSuper, ITransportTransmitting, ReceiverProfile, TransportEventNotification, TransportManagerEventNotification, TransportMessage, TransportSettings, TransportType } from "../interface/ITransport.interface";
+import { WebsocketTransportService } from "../transport/websocket";
+import { ITransportReceiving, ITransportSuper, ITransportTransmitting, ReceiverProfile, TransportEventNotification, TransportManagerEventNotification, TransportManagerInterface, TransportMessage, TransportSettings, TransportType } from "../interface/ITransport.interface";
 
 dotenv.config();
 /* This transport manager will be instantiating the necessary transport to deal with tranmission and receiving from different receivers
 So how?: */
-export class TransportManager {
+export class ConnectionManager {
     private incomingMessage: Subject<TransportMessage> = new Subject() // only for client roles usage. Servers will listen to event notification for incoming requests 
     private outGoingMessage: Subject<FisMessage> = new Subject()
     private connectedClients: ReceiverProfile[] = []
@@ -27,7 +27,6 @@ export class TransportManager {
                     // Collect client information when they are connected 
                     if (notification.event == `Connection` && notification.description == 'New Client Connected') {
                         this.connectedClients.push(notification.data?.payload)
-                        // console.log(this.connectedClients) // we do get the connected clients
                         this.handleNewReceiver(notification.data?.payload)
                     }
                 })
@@ -37,13 +36,27 @@ export class TransportManager {
         if (url) {
             this.serverUrl = url
             this.establishTransportReceiving(process.env.Transport as TransportType).then((receivingService: ITransportReceiving) => {
-                receivingService.receive().subscribe(this.incomingMessage)
+                // logic here
             })
         }
         if (!port && !url) console.error(`No role has assigned for transport service due to lack of information provided!`)
         this.processOutputMessage()
     }
 
+    // use for as a transmitter
+    public emit(message: TransportMessage): void {
+        if(this.transmittingService) {
+            this.transmittingService.transmit(message)
+        }
+    }
+
+    // use for as a receiver
+    public send(message: TransportMessage): Observable<TransportMessage> {
+        return new Observable((response: Observer<TransportMessage>) => {
+            // logic here
+        })
+    }
+
     public getTransportManagerEventNotification(): Observable<TransportManagerEventNotification> {
         return new Observable((notification: Observer<TransportManagerEventNotification>) => {
             this.transportEventNotification.subscribe((transportNotification: TransportEventNotification) => {
@@ -82,7 +95,7 @@ export class TransportManager {
         return new Promise((resolve, reject) => {
             let setting: TransportSettings = {
                 profileInfo: {
-                    id: uuidv4(), 
+                    id: uuidv4(),
                     name: 'For Client',
                     url: this.serverUrl,
                     transport: tranportType
@@ -128,7 +141,7 @@ export class TransportManager {
             next: event => {
                 // new request will be handled. And then manager will here will pick it up from eventNotification and respond accordingly if there's a need for it
                 if (event.event == `Disconnection`) {
-                    console.error(`Client ${event.data?.receiverID?? 'undefined'} disconnected.`)
+                    console.error(`Client ${event.data?.receiverID ?? 'undefined'} disconnected.`)
                     receiver.eventNotification.complete() // no need to listen since it's disconnected. A new one will be established when it reconnects again
                 }
                 if (event.event == `New Message`) {

+ 6 - 2
src/interface/ITransport.interface.ts

@@ -5,8 +5,12 @@ export type TransportManagerEvent = 'New Client Connection' | 'New Client Reques
 export type TransportEvent = 'Connection' | 'New Message' | 'Disconnection'
 export type TransportType = 'WEBSOCKET' | 'GRPC' | 'HTTP' | 'KAFKA' | undefined
 
-export interface TransportManager {
+export interface TransportManagerInterface {
     getTransportManagerEventNotification(): Observable<TransportManagerEventNotification>
+    // This is for acting as a transmitter
+    emit(message: TransportMessage): void
+    // For acting as a receiver to send a request. 
+    transmit(request: TransportMessage): Observable<TransportMessage>
 }
 export interface ITransport {
     getTransportEventNotification(): Observable<TransportEventNotification>
@@ -17,7 +21,7 @@ export interface ITransportTransmitting extends ITransport {
 }
 
 export interface ITransportReceiving extends ITransport {
-    receive(): Observable<TransportMessage>
+    // additional properties
 }
 
 export interface ITransportSuper extends ITransportTransmitting, ITransportReceiving {

+ 58 - 0
src/interface/connector.interface.ts

@@ -0,0 +1,58 @@
+import { Observable } from "rxjs"
+import { FisAppActor } from "./transport.interface"
+
+
+export type TYPE = {
+    adapterProfile: AdapterProfile,
+    transmitterProfile: TransmitterProfile,
+    receiverProfile: ReceiverProfile
+}
+
+export interface AdapterProfile {
+
+}
+
+export interface TransmitterProfile {
+    id: string,
+    name: string,
+    dateCreated: Date
+}
+
+export interface ReceiverProfile {
+    id: string,
+    name: string,
+    dateCreated: Date
+}
+
+export interface ConnectionManager {
+
+}
+
+export interface ConnectionAdaptorBase extends FisAppActor {
+    connector: any
+    connectorProfile: AdapterProfile | any
+    connectionState: ConnectionState
+    connectionStateBus: Observable<ConnectionState>
+    adaptorTransmissionRole: AdaptorTransmissionRole
+
+    subscribeConnectionState(): Observable<ConnectionState>
+    publishConnectionState(): void
+    connect(): void
+    disconnect(): void
+    send(message: any): Observable<any>
+    emit(message: any): void
+    emitStream(message: any): void
+    subscribeMessages(messageFilter: any): Observable<any>
+}
+
+
+export interface TransmitterAdapterBase extends ConnectionAdaptorBase {
+
+}
+
+export type ConnectionState = 'ONLINE' | 'OFFLINE'
+export enum AdaptorTransmissionRole {
+    Transmitter,
+    Receiever,
+    RequestResponse
+}

+ 50 - 0
src/interface/transport.interface.ts

@@ -0,0 +1,50 @@
+import { Observable, Subject } from "rxjs";
+import { AdaptorTransmissionRole, ReceiverProfile, TransmitterProfile } from "./connector.interface";
+
+export interface FisAppActor {
+    incomingMessageBus: Subject<any>
+    outgoingMessageBus: Subject<any>
+
+    send(message: any): Observable<any>
+    emit(message: any): void
+    emitStream(message: any): void
+    subscribeMessages(messageFilter: any): Observable<any>
+}
+
+export interface MessageTransmissionManager {
+    // what the hell is this here for
+}
+
+
+export interface MessageTransmissionBase extends FisAppActor {
+    transmitterProfile: TransmitterProfile
+    receiverProfile: ReceiverProfile
+    msgRepositoryService: any
+    transmissionRole: AdaptorTransmissionRole
+    adaptorsArray: any[]
+    transmissionService: any
+    adapterService: any
+
+    setTransmitter(transmitterProfile: TransmitterProfile): void
+    setReceiver(receiverProfile: ReceiverProfile): Promise<any>
+}
+
+export interface MsgReceiver extends MessageTransmissionBase {
+
+}
+
+export interface MsgTransmitter extends MessageTransmissionBase {
+
+}
+
+export interface MsgRequestResponse extends MessageTransmissionBase, MsgReceiver {
+
+}
+
+export interface FisMessage {
+    header: {
+        messageID: string,
+        messageName: `NotificationMessage` | `ResponseMessage` | `RequestMessage`
+    },
+    data: any
+}

+ 0 - 114
src/test/receiver.ts

@@ -1,114 +0,0 @@
-import { interval, map, Observable, Observer, Subject } from "rxjs"
-import { v4 as uuidv4 } from 'uuid'
-import { FisMessage, TransportManager } from "../transport/transport.manager"
-import { ReceiverRetransmissionProfile, TransportManagerEventNotification } from "../interface/ITransport.interface"
-import { RetransmissionService } from "../utils/retransmission.service"
-
-// need to instantiate to grab both incoming and outgoing
-
-class Application {
-    service!: Blue
-    notification = new Subject<any>()
-    constructor(service: Blue) {
-        this.service = service
-
-        // this one will directly send over 
-        // this.subscribeNotification().subscribe(notification => this.service.transportManager.send(notification))
-        this.notify()
-    }
-
-    subscribeNotification(): Observable<FisMessage> {
-        return this.notification.asObservable()
-    }
-
-    notify() {
-        // logic here
-        let output = interval(1000)
-        output.pipe(map(
-            value => {
-                let message: FisMessage = {
-                    header: {
-                        messageID: uuidv4(),
-                        messageName: `NotificationMessage`
-                    },
-                    data: `Notification Message ${value}`
-                }
-                return message
-            }
-        )).subscribe(this.notification)
-    }
-
-    /* EXTRA. Appplication acting as a receiver */
-    processRequest(request: FisMessage): Observable<FisMessage> {
-        return new Observable((response) => {
-            // process responses. Should be called by Blue after receiving a request
-        })
-    }
-    // This two only exist at this level. 
-    send(message: FisMessage): Observable<FisMessage> {
-        return new Observable((response) => {
-            // logic here
-        })
-    }
-}
-
-/* At the highest level, blue will also need to know who the receiver is so that it can establsih said one to one relationsuhip */
-class Blue {
-    transportManager!: TransportManager
-    receiverProfiles: ReceiverRetransmissionProfile[] = []
-    event: Subject<TransportManagerEventNotification> = new Subject()
-    constructor(transportManager: TransportManager) {
-        this.transportManager = transportManager
-        this.subscribeForTransportManagerEvent()
-        this.manageReceiverCommunication()
-    }
-
-    private subscribeForTransportManagerEvent(): void {
-        this.transportManager.getTransportManagerEventNotification().subscribe({
-            next: (notification: TransportManagerEventNotification) => {
-                this.event.next(notification)
-            }
-        })
-    }
-
-    private manageReceiverCommunication(): void {
-        this.event.subscribe((notification: TransportManagerEventNotification) => {
-            if (notification.event == 'New Client Connection') {
-                // first thing first, it will check if it's a previously connected receiver
-                this.checkIfPreviouslyConnected(notification.data?.receiverID as string) // but this id is assigned at connection though So that means the receiver must self identify
-                let adjustedReceiverProfile: ReceiverRetransmissionProfile = notification.data?.payload
-                adjustedReceiverProfile.buffer = new RetransmissionService()
-                // retransmission to be activated. Now it's just assigning it there, but idling.
-                this.receiverProfiles.push(adjustedReceiverProfile)
-                console.log(`BLUE: Added new Receiver Profile ${notification.data?.receiverID} to record. `)
-            }
-            if (notification.event == 'Client Disconnected') {
-                console.log(`BLUE: Client ${notification.data?.receiverID} disconnected. Buffering....`)
-                let receiverProfile: ReceiverRetransmissionProfile | undefined = this.receiverProfiles.find(obj => obj.uuid === notification.data?.receiverID)
-                if (receiverProfile) {
-                    // then notifiy retransmission to stop releasing the buffer 
-                }
-            }
-            if (notification.event == 'New Client Request') {
-                this.manageReceiverRequest(notification.data?.payload)
-                console.log(`BLUE: Client request ${notification.data?.payload?.header?.messageID ?? `messageID undefined`} received.`)
-            }
-        })
-    }
-
-    private manageReceiverRequest(request: FisMessage): void {
-
-    }
-
-    private checkIfPreviouslyConnected(receiverID: string): boolean {
-        return this.receiverProfiles.some(obj => obj.uuid === receiverID)
-    }
-
-
-}
-
-
-const application = new Application(new Blue(new TransportManager(undefined, 'http://localhost:3000')))
-
-/* Transmitter acting as a receiver */
-// logic here

+ 33 - 100
src/test/transmitter.ts

@@ -1,116 +1,49 @@
-import { interval, map, Observable, Observer, Subject } from "rxjs"
-import { v4 as uuidv4 } from 'uuid'
-import { FisMessage, TransportManager } from "../transport/transport.manager"
-import { ReceiverProfile, ReceiverRetransmissionProfile, TransportEventNotification, TransportManagerEventNotification, TransportMessage } from "../interface/ITransport.interface"
-import { error } from "console"
-import { RetransmissionService } from "../utils/retransmission.service"
+import { Observable, Subject } from "rxjs";
+import { FisAppActor, FisMessage } from "../interface/transport.interface";
+import { MessageTransmissionBase } from "../transmission/msg.transmission.base";
+import dotenv from 'dotenv';
 
-// need to instantiate to grab both incoming and outgoing
+/*  These are the purple fonts. Gonna interact with blue fonts to set up the credentials to establish the necessary roles.
+Assuming the primary role is server. That means we will need transmitter and multiple receiver profiles that are connected. */
+class Application implements FisAppActor {
+    incomingMessageBus!: Subject<FisMessage>
+    outgoingMessageBus!: Subject<FisMessage>
+    transmissionService!: MessageTransmissionBase
 
-class Application {
-    service!: Blue
-    notification = new Subject<any>()
-    constructor(service: Blue) {
-        this.service = service
-
-        // this one will directly send over 
-        // this.notify()
-    }
-
-    subscribeNotification(): Observable<FisMessage> {
-        return this.notification.asObservable()
-    }
-
-    notify() {
-        // logic here
-        let output = interval(1000)
-        output.pipe(map(
-            value => {
-                let message: FisMessage = {
-                    header: {
-                        messageID: uuidv4(),
-                        messageName: `NotificationMessage`
-                    },
-                    data: `Notification Message ${value}`
-                }
-                return message
-            }
-        )).subscribe(this.notification)
+    constructor(messageTransmissionBase: MessageTransmissionBase) {
+        this.transmissionService = messageTransmissionBase
+        this.incomingMessageBus = messageTransmissionBase.incomingMessageBus
+        this.outgoingMessageBus = messageTransmissionBase.outgoingMessageBus
     }
 
-    /* EXTRA. Appplication acting as a receiver */
-    processRequest(request: FisMessage): Observable<FisMessage> {
-        return new Observable((response) => {
-            // process responses. Should be called by Blue after receiving a request
-        })
-    }
-    // This two only exist at this level. 
     send(message: FisMessage): Observable<FisMessage> {
         return new Observable((response) => {
-            // logic here
-        })
-    }
-}
-
-/* At the highest level, blue will also need to know who the receiver is so that it can establsih said one to one relationsuhip */
-class Blue {
-    transportManager!: TransportManager
-    receiverProfiles: ReceiverRetransmissionProfile[] = []
-    event: Subject<TransportManagerEventNotification> = new Subject()
-    constructor(transportManager: TransportManager) {
-        this.transportManager = transportManager
-        this.subscribeForTransportManagerEvent()
-        this.manageReceiverCommunication()
-    }
-
-    private subscribeForTransportManagerEvent(): void {
-        this.transportManager.getTransportManagerEventNotification().subscribe({
-            next: (notification: TransportManagerEventNotification) => {
-                this.event.next(notification)
-            }
+            this.outgoingMessageBus.next(message)
+            this.incomingMessageBus.subscribe({
+                next: (message: FisMessage) => {
+                    if (message.header.messageID == message.header.messageID) {
+                        response.next(message)
+                    }
+                    if (message.header.messageID == message.header.messageID && message.data == 'Complete') {
+                        response.complete()
+                    }
+                },
+                error: error => response.error(error)
+            })
         })
     }
 
-    private manageReceiverCommunication(): void {
-        this.event.subscribe((notification: TransportManagerEventNotification) => {
-            if (notification.event == 'New Client Connection') {
-                // first thing first, it will check if it's a previously connected receiver
-                this.checkIfPreviouslyConnected(notification.data?.receiverID as string) // but this id is assigned at connection though So that means the receiver must self identify
-                let adjustedReceiverProfile: ReceiverRetransmissionProfile = notification.data?.payload
-                adjustedReceiverProfile.buffer = new RetransmissionService()
-                // retransmission to be activated. Now it's just assigning it there, but idling.
-                this.receiverProfiles.push(adjustedReceiverProfile)
-                console.log(`BLUE: Added new Receiver Profile ${notification.data?.receiverID} to record. `)
-            }
-            if (notification.event == 'Client Disconnected') {
-                console.log(`BLUE: Client ${notification.data?.receiverID} disconnected. Buffering....`)
-                let receiverProfile: ReceiverRetransmissionProfile | undefined = this.receiverProfiles.find(obj => obj.uuid === notification.data?.receiverID)
-                if (receiverProfile) {
-                    // then notifiy retransmission to stop releasing the buffer 
-                }
-            }
-            if (notification.event == 'New Client Request') {
-                this.manageReceiverRequest(notification.data?.payload)
-                console.log(`BLUE: Client request ${notification.data?.payload?.header?.messageID ?? `messageID undefined`} received.`)
-            }
-        })
+    emit(message: FisMessage): void {
+        this.outgoingMessageBus.next(message)
     }
 
-    private manageReceiverRequest(request: FisMessage): void {
-
+    emitStream(message: FisMessage): void {
+        this.outgoingMessageBus.next(message)
     }
 
-    private checkIfPreviouslyConnected(receiverID: string): boolean {
-        return this.receiverProfiles.some(obj => obj.uuid === receiverID)
+    subscribeMessages(messageFilter: any): Observable<FisMessage> {
+        throw new Error(`Unavailable for now....`)
     }
-
-
 }
 
-
-const application = new Application(new Blue(new TransportManager(3000)))
-// const application = new Application(new Blue(new TransportManager(null, 'http://localhost:3000')))
-// const application = new Application(new Blue(new TransportManager(3000, 'http://localhost:3000')))
-
-/* Transmitter acting as a receiver */
-// logic here
+const application = new Application(new MessageTransmissionBase())

+ 53 - 0
src/transmission/msg.transmission.base.ts

@@ -0,0 +1,53 @@
+
+import { Observable, Subject } from 'rxjs';
+import { TransmitterProfile, ReceiverProfile, AdaptorTransmissionRole } from '../interface/connector.interface';
+import { MessageTransmissionBase as MessageTransmissionBaseInterface } from '../interface/transport.interface'
+import { v4 as uuidv4 } from 'uuid'
+
+export class MessageTransmissionBase implements MessageTransmissionBaseInterface {
+    transmitterProfile!: TransmitterProfile;
+    receiverProfile!: ReceiverProfile;
+    msgRepositoryService: any;
+    transmissionRole!: AdaptorTransmissionRole;
+    adaptorsArray!: any[];
+    transmissionService: any;
+    adapterService: any;
+    incomingMessageBus!: Subject<any>;
+    outgoingMessageBus!: Subject<any>;
+
+    constructor() {
+        this.setTransmitter({
+            id: uuidv4(),
+            name: 'Transmitter' + process.env.PORT,
+            dateCreated: new Date()
+        })
+        this.setReceiver({
+            id: uuidv4(),
+            name: 'Receiver',
+            dateCreated: new Date()
+        })
+    }
+
+    // Pretty straight forward. Set itself straight away.
+    setTransmitter(transmitterProfile: TransmitterProfile): void {
+        this.transmitterProfile = transmitterProfile
+    }
+
+    setReceiver(receiverProfile: ReceiverProfile): Promise<any> {
+        throw new Error('Method not implemented.');
+    }
+
+    send(message: any): Observable<any> {
+        throw new Error('Method not implemented.');
+    }
+    emit(message: any): void {
+        throw new Error('Method not implemented.');
+    }
+    emitStream(message: any): void {
+        throw new Error('Method not implemented.');
+    }
+    subscribeMessages(messageFilter: any): Observable<any> {
+        throw new Error('Method not implemented.');
+    }
+
+}

+ 6 - 0
src/transmission/msg.transmission.manager.ts

@@ -0,0 +1,6 @@
+
+
+
+export class MessageTransmissionManager {
+    
+}

+ 2 - 2
src/transport/gRPC.ts

@@ -1,7 +1,7 @@
 import { Observable } from "rxjs";
-import { ITransport, ReceiverProfile, TransmitterProfile, TransportEventNotification } from "../interface/ITransport.interface";
+import { TransportEventNotification } from "../interface/ITransport.interface";
 
-export class GrpcTransportService implements ITransport {
+export class GrpcTransportService {
     getTransportEventNotification(): Observable<TransportEventNotification> {
         throw new Error("Method not implemented.");
     }

+ 10 - 0
src/transport/http.ts

@@ -0,0 +1,10 @@
+import { Observable } from "rxjs";
+import { TransportEventNotification } from "../interface/ITransport.interface";
+
+export class HttpTransportService {
+    getTransportEventNotification(): Observable<TransportEventNotification> {
+        throw new Error("Method not implemented.");
+    }
+
+
+}

+ 9 - 6
src/transport/websocket.ts

@@ -2,12 +2,14 @@ import { Observable, Subject } from "rxjs";
 import { Socket as ClientSocket } from "socket.io-client";
 import { Socket as SocketForConnectedClient } from "socket.io"
 import { handleClientSocketConnection, handleNewSocketClient, startClientSocketConnection, startSocketServer } from "../utils/socket.utils";
-import { ITransportReceiving, ITransportTransmitting, ReceiverProfile, TransmitterProfile, TransportEventNotification, TransportMessage, TransportSettings } from "../interface/ITransport.interface";
+import { ReceiverProfile, TransportEventNotification, TransportMessage, TransportSettings } from "../interface/ITransport.interface";
+import { FisMessage } from "../connector/connector";
 
 /* Just code in the context that this websocket service will be handling multiple UI clients. Can think about the server communication at a later time. */
-export class WebsocketTransportService implements ITransportTransmitting, ITransportReceiving {
+export class WebsocketTransportService {
+    private selfSocket!: ClientSocket
     private socketReceiverProfile: ReceiverProfile[] = []
-    private incomingMessage: Subject<TransportMessage> = new Subject() // this is only for client roles only atm
+    // private incomingMessage: Subject<TransportMessage> = new Subject() // this is only for client roles only atm
     private eventNotification: Subject<TransportEventNotification> = new Subject()
 
     constructor(setting: TransportSettings) {
@@ -23,7 +25,8 @@ export class WebsocketTransportService implements ITransportTransmitting, ITrans
         // this is for those who wants to act as receiver. Usually for web browser, but servers can use this too.
         if (setting.profileInfo.url) {
             startClientSocketConnection(setting.profileInfo.url).then((socket: ClientSocket) => {
-                handleClientSocketConnection(socket, this.incomingMessage).subscribe(this.eventNotification)
+                this.selfSocket = socket
+                handleClientSocketConnection(socket).subscribe(this.eventNotification)
             }).catch((error) => {
                 console.error(`WebsocketTransport ERROR:`, error)
             })
@@ -48,7 +51,7 @@ export class WebsocketTransportService implements ITransportTransmitting, ITrans
     }
 
     // for client UI eg
-    public receive(): Observable<TransportMessage> {
-        return this.incomingMessage.asObservable()
+    public clientEmit(request: FisMessage): void {
+        this.selfSocket.emit(`message`, request)
     }
 }

+ 7 - 7
src/utils/socket.utils.ts

@@ -52,7 +52,7 @@ export async function startClientSocketConnection(serverUrl: string): Promise<Cl
 }
 
 // After establishing connection to the server, set up the credentials, confirm whether or not if there's any credentials, if not ask for one from the server
-export function handleClientSocketConnection(socket: ClientSocket, incomingMessage: Subject<TransportMessage>): Observable<TransportEventNotification> {
+export function handleClientSocketConnection(socket: ClientSocket): Observable<TransportEventNotification> {
     return new Observable((eventNotification: Observer<TransportEventNotification>) => {
         let clientName!: string
         let buffer: any[] = []
@@ -97,12 +97,12 @@ export function handleClientSocketConnection(socket: ClientSocket, incomingMessa
                         payload: msg
                     }
                 })
-                incomingMessage.next({
-                    id: msg.header.MessageID,
-                    receiverID: receiverProfileInfo.uuid,
-                    payload: msg,
-                    event: 'New Message'
-                })
+                // incomingMessage.next({
+                //     id: msg.header.MessageID,
+                //     receiverID: receiverProfileInfo.uuid,
+                //     payload: msg,
+                //     event: 'New Message'
+                // })
             } else {
                 // Do nothing. just store in local array first. Cannot process without information. but then again, don['t need information if acting as client
                 // but for consistency sake, will impose the standard