Browse Source

latest change so far. But none working code

Enzo 1 month ago
parent
commit
9c45bff921

+ 29 - 16
src/interface/ITransport.interface.ts

@@ -1,30 +1,30 @@
-import { Observable } from "rxjs";
+import { Observable, Subject } from "rxjs";
 
+export type TransportEvent = 'Connection' | 'New Message'
+export type TransportType = 'WEBSOCKET' | 'GRPC' | 'HTTP' | 'KAFKA' | undefined
 export interface ITransport {
-    send(message: string): void
-    emit(event: string, data: any): void;
-    getResponse(): Observable<any>
-    getEventNotification(): Observable<any>
+    getEventNotification(): Observable<TransportEventNotification>
 }
 
-export interface ITransportTransmission extends ITransport{
-    // send(message: string): void
-    // emit(event: string, data: any): void;
+export interface ITransportTransmitting extends ITransport {
+    transmit(message: TransportMessage): void
 }
 
-export interface ITransportReceiving extends ITransport{
-    // returnResponse(): Observable<any>
-    // returnEventNotification(): Observable<any>
+export interface ITransportReceiving extends ITransport {
+    receive(): Observable<any>
 }
 
+export interface ITransportSuper extends ITransportTransmitting, ITransportReceiving {
+    // additional properties && methods  here
+}
 export interface TransportSettings {
-    role: 'Server' | 'Client',
     profileInfo: ProfileInfo,
 }
 
 export interface ProfileInfo {
     id: string,
     name: string,
+    transport: TransportType,
     url?: string | null,
     port?: number | null
 }
@@ -33,18 +33,31 @@ export interface TransmitterProfile {
     uuid: string,
     name: string,
     dateCreated: Date,
-    data?: any
+    transportType: TransportType,
+    eventNotification: Subject<TransportEventNotification>,
+    instance?: any
 }
 
 export interface ReceiverProfile {
     uuid: string,
     name: string,
     dateCreated: Date,
-    data?: any
+    transportType: TransportType,
+    eventNotification: Subject<TransportEventNotification>,
+    instance?: any
 }
 
 export interface TransportEventNotification {
-    event: string,
+    event: TransportEvent,
     description: string,
+    transportType: TransportType,
     data?: any
-} 
+}
+
+
+export interface TransportMessage {
+    id: string,
+    receiverID: string,
+    payload: any,
+    event: string
+}

+ 0 - 10
src/test/receiver.ts

@@ -1,10 +0,0 @@
-import { interval, Subject } from "rxjs"
-import { v4 as uuidv4 } from 'uuid'
-import { TransportService } from "../transport/transport.service"
-
-const transmitterService = new TransportService()
-
-
-
-// listen to event notification
-transmitterService.getEventNotification().subscribe(notification => console.log(notification))

+ 73 - 52
src/test/transmitter.ts

@@ -1,55 +1,76 @@
-import { interval, Subject } from "rxjs"
+import { interval, map, Observable, Subject } from "rxjs"
 import { v4 as uuidv4 } from 'uuid'
-import { TransportService } from "../transport/transport.service"
-
-const transmitterService = new TransportService()
-
-// this one to emulate broadcasting notification. Assuming the receiver has subscribed for this
-let count = 0
-let response = interval(1000)
-response.subscribe(eachInterval => {
-    transmitterService.emit({
-        header: {
-            messageID: uuidv4(),
-            messageName: `NotificationMessage`
-        },
-        data: 'just a test Notification'
-    })
-})
-
-// For incoming requests and return data just for testing
-let incomingRequest = new Subject<any>()
-incomingRequest.subscribe(request => {
-    // return 10 messages
-    let count = 0
-    let response = interval(1000)
-    response.subscribe(eachInterval => {
-        count++
-        if (count < 10) {
-            transmitterService.emit({
-                header: {
-                    messageID: request.header.messageID,
-                    messageName: `ResponseMessage`
-                },
-                data: 'just a test Response'
-            })
-        } else {
-            transmitterService.emit({
-                header: {
-                    messageID: request.header.messageID,
-                    messageName: `ResponseMessage`
-                },
-                data: 'Complete'
-            })
-        }
-    })
-})
-
-transmitterService.getIncomingResponse().subscribe(message => {
-    if (message.header.messageName == 'RequestMessage') {
-        incomingRequest.next(message)
+import { FisMessage, TransportManager } from "../transport/transport.manager"
+
+// need to instantiate to grab both incoming and outgoing
+
+class Application {
+    service!: Blue
+    notification = new Subject<any>()
+    constructor(service: Blue) {
+        this.service = service
+
+        // this one will directly send over 
+        // this.subscribeNotification().subscribe(notification => this.service.transportManager.send(notification))
+        this.notify()
+    }
+
+    subscribeNotification(): Observable<FisMessage> {
+        return this.notification.asObservable()
+    }
+
+    notify() {
+        // logic here
+        let output = interval(1000)
+        output.pipe(map(
+            value => {
+                let message: FisMessage = {
+                    header: {
+                        messageID: uuidv4(),
+                        messageName: `NotificationMessage`
+                    },
+                    data: `Notification Message ${value}`
+                }
+                return message
+            }
+        )).subscribe(this.notification)
+    }
+
+    /* EXTRA. Appplication acting as a receiver */
+    processRequest(request: FisMessage): Observable<FisMessage> {
+        return new Observable((response) => {
+            // process responses. Should be called by Blue after receiving a request
+        })
     }
-})
+    // This two only exist at this level. 
+    send(message: FisMessage): Observable<FisMessage> {
+        return new Observable((response) => {
+            // logic here
+        })
+    }
+}
+
+/* Actually this class will acquire a high level information on the receiver as well.
+Because it's the retransmission that instantiated in this layer.
+So blue will work closely with the transport manager to acquire the necessary notifications with regards to the receiver's 
+information and it's status, but it should not concern itself with the intricacies of how the messages will be transmitted over. */
+class Blue {
+    transportManager!: TransportManager
+    receiverProfiles: any[] = []
+    constructor(transportManager: TransportManager) {
+        this.transportManager = transportManager
+
+        this.processIncomingRequest()
+    }
+
+    processIncomingRequest(): void {
+        // prcoess incoming request based on the receiver's profile relevant request
+    }
+
+}
+
+
+const application = new Application(new Blue(new TransportManager(3000, 'http://localhost:3000')))
 
-// listen to event notification
-transmitterService.getEventNotification().subscribe(notification => console.log(notification))
+/* Transmitter acting as a receiver */
+// logic here

+ 12 - 0
src/transport/gRPC.ts

@@ -0,0 +1,12 @@
+import { Observable } from "rxjs";
+import { ITransport, ReceiverProfile, TransmitterProfile, TransportEventNotification } from "../interface/ITransport.interface";
+
+export class GrpcTransportService implements ITransport {
+    getInfo(type: "transmitter" | "receiver", id?: string): TransmitterProfile | ReceiverProfile | null {
+        throw new Error("Method not implemented.");
+    }
+    getEventNotification(): Observable<TransportEventNotification> {
+        throw new Error("Method not implemented.");
+    }
+    
+}

+ 131 - 0
src/transport/transport.manager.ts

@@ -0,0 +1,131 @@
+import { Observable, Subject } from "rxjs";
+import dotenv from 'dotenv';
+import { v4 as uuidv4 } from 'uuid';
+import { WebsocketTransportService } from "./websocket";
+import { ITransportReceiving, ITransportSuper, ITransportTransmitting, ReceiverProfile, TransportEventNotification, TransportMessage, TransportSettings, TransportType } from "../interface/ITransport.interface";
+import { error } from "console";
+dotenv.config();
+/* This transport manager will be instantiating the necessary transport to deal with tranmission and receiving from different receivers
+So how?: */
+export class TransportManager {
+    private incomingMessage: Subject<TransportMessage> = new Subject()
+    private outGoingMessage: Subject<TransportMessage> = new Subject()
+    private responseMessage: Subject<FisMessage> = new Subject()
+    private connectedClients: ReceiverProfile[] = []
+    // this is assuming that the application will do request response, otherwise, just one will do.
+    private transportService!: ITransportSuper
+    private transmittingService!: ITransportTransmitting
+    private receivingService!: ITransportReceiving
+    private serverPort!: number
+    private serverUrl!: string
+
+    constructor(port?: number, url?: string) {
+        /* For web browser ui, because this server can also act as a receiver, not just transmitter as well.  */
+        if (port) {
+            this.serverPort = port
+            this.establishTransportTransmitting(process.env.Transport as TransportType).then((transmission: ITransportTransmitting) => {
+                transmission.getEventNotification().subscribe((notification: TransportEventNotification) => {
+                    if (notification.event == `Connection` && notification.description == 'New Client Connected') {
+                        this.connectedClients.push(notification.data)
+                        this.handleNewReceiver(notification.data)
+                    }
+                })
+            }).catch((error) => console.error(error))
+        }
+        if (url) {
+            this.serverUrl = url
+            this.establishTransportReceiving(process.env.Transport as TransportType).then((receivingService: ITransportReceiving) => {
+                // console.log(receivingService.getInfo('receiver'))
+                receivingService.receive().subscribe(this.incomingMessage)
+            })
+        }
+        if (!port && !url) console.error(`No role has assigned for transport service due to lack of information provided!`)
+        this.processOutputMessage()
+    }
+
+    public send(message: FisMessage): void {
+        this.responseMessage.next(message)
+    }
+
+    private async establishTransportTransmitting(tranportType: TransportType): Promise<ITransportTransmitting> {
+        // console.log(`Setting up ${tranportType.transportName} transmitter`)
+        return new Promise((resolve, reject) => {
+            let setting: TransportSettings = {
+                profileInfo: {
+                    id: uuidv4(),
+                    name: 'Server',
+                    port: this.serverPort,
+                    transport: tranportType
+                }
+            }
+            if (tranportType == 'WEBSOCKET') {
+                this.transportService = new WebsocketTransportService(setting)
+                this.transmittingService = this.transportService
+                resolve(this.transmittingService)
+            } else {
+                reject(`No such Transport Type Exist...`)
+            }
+        })
+    }
+
+    private async establishTransportReceiving(tranportType: TransportType): Promise<any> {
+        return new Promise((resolve, reject) => {
+            // will be sharing one instance, This part may break the code
+            if (tranportType == 'WEBSOCKET') {
+                this.receivingService = this.transportService
+                resolve(this.receivingService)
+            }
+        })
+    }
+
+    // This function will wrap the message to be delivered over to established transport to be sent over
+    private processOutputMessage(): void {
+        this.responseMessage.subscribe({
+            next: (message: FisMessage) => {
+                if (message.header.messageName == 'NotificationMessage') {
+                    // This is just here temporarily. Because The application will be concerned with whom the party is subcribed to, not this transport manager
+                    this.outGoingMessage.next({
+                        id: '',
+                        receiverID: '',
+                        payload: message,
+                        event: `notification`
+                    })
+                }
+                if (message.header.messageName == 'ResponseMessage') {
+                    // Map responses according to the requestID???
+                    this.outGoingMessage.next({
+                        id: '',
+                        receiverID: '',
+                        payload: message,
+                        event: `resposne`
+                    })
+                }
+            }
+        })
+    }
+
+    private transmitWrappedMessages(): void {
+        this.outGoingMessage.subscribe(message => {
+            this.transmittingService.transmit(message)
+        })
+    }
+
+    // Responsibility include subscribing to event notification as well as tranpport instance for message transmission
+    private handleNewReceiver(receiver: ReceiverProfile) {
+        receiver.eventNotification.subscribe({
+            next: event => {
+                // new request will be handled. And then manager will here will pick it up from eventNotification and respond accordingly if there's a need for it
+            },
+            error: error => console.error(error)
+        })
+    }
+}
+
+
+export interface FisMessage {
+    header: {
+        messageName: 'RequestMessage' | 'ResponseMessage' | 'NotificationMessage',
+        messageID: string
+    },
+    data: any
+}

+ 0 - 93
src/transport/transport.service.ts

@@ -1,93 +0,0 @@
-import { Observable, Subject } from "rxjs";
-import dotenv from 'dotenv';
-import { v4 as uuidv4 } from 'uuid';
-import { WebsocketTransportService } from "./websocket";
-import { ITransport, TransportSettings } from "../interface/ITransport.interface";
-dotenv.config();
-/* The application doesn't care at this point whether or not what transport to use or what role. it just wants to send request and expecting
-response as well as emitting notification and receiving notification from concerned parties. All the managing parts should be here. */
-export class TransportService {
-    private incomingResponse: Subject<any> = new Subject()
-    private eventNotification: Subject<any> = new Subject()
-    // this is assuming that the application will do request response, otherwise, just one will do.
-    private transmittingService!: ITransport
-    private receivingService!: ITransport
-    private serverPort!: number
-    private serverUrl!: string
-
-    constructor(port?: number, url?: string) {
-        // just putting it here for testing for now.
-        if (port) this.serverPort = port
-        if (url) this.serverUrl = url
-        // logic here
-        this.establishTransportTransmission({ transportName: process.env.Transport?.toString() })
-        this.establishTransportReceiving({ transportName: process.env.Transport?.toString() }).then(() => {
-            // pipe the notification from the server into local notification
-            this.receivingService.getEventNotification().subscribe(this.eventNotification)
-        })
-    }
-
-    public getIncomingResponse(): Observable<any> {
-        return this.incomingResponse.asObservable()
-    }
-
-    public getEventNotification(): Observable<any> {
-        return this.eventNotification.asObservable()
-    }
-
-    public send(message: any): Observable<any> {
-        return new Observable((response) => {
-            this.transmittingService.send(message)
-            /// demultiplex here, assuming all responses are piped into this.incomingResponse
-            this.incomingResponse.subscribe(responseMsg => {
-                if (responseMsg.id == message.id) {
-                    response.next(responseMsg)
-                } // don't forget to add complete message to close out this function
-            })
-        })
-    }
-
-    public emit(message: any): void {
-        this.transmittingService.emit('notification', message)
-    }
-
-    private async establishTransportTransmission(tranportType: TranportType): Promise<any> {
-        console.log(`Setting up ${tranportType.transportName} transmitter`)
-        return new Promise((resolve, reject) => {
-            let setting: TransportSettings = {
-                role: 'Server',
-                profileInfo: {
-                    id: uuidv4(),
-                    name: 'Server',
-                    port: this.serverPort ?? 3000
-                }
-            }
-            if (tranportType.transportName == 'WEBSOCKET') {
-                this.transmittingService = new WebsocketTransportService(setting)
-                resolve(this.transmittingService)
-            }
-        })
-    }
-
-    private async establishTransportReceiving(tranportType: TranportType): Promise<any> {
-        console.log(`Setting up ${tranportType.transportName} Receiver`)
-        return new Promise((resolve, reject) => {
-            let setting: TransportSettings = {
-                role: 'Client',
-                profileInfo: {
-                    id: uuidv4(),
-                    name: 'Client',
-                    url: this.serverUrl ?? 'http://localhost:3000'
-                }
-            }
-            if (tranportType.transportName == 'WEBSOCKET') {
-                this.receivingService = new WebsocketTransportService(setting)
-                resolve(this.receivingService)
-            }
-        })
-    }
-}
-
-export interface TranportType {
-    transportName: 'WEBSOCKET' | 'GRPC' | 'HTTP' | 'KAFKA' | string | undefined
-}

+ 64 - 69
src/transport/websocket.ts

@@ -1,85 +1,80 @@
 import { Observable, Subject } from "rxjs";
-import { io, Socket, Socket as SocketClient } from "socket.io-client";
-import { Socket as SocketServer } from "socket.io"
+import { Socket, Socket as ClientSocket } from "socket.io-client";
+import { Socket as SocketForConnectedClient } from "socket.io"
 import { startClientSocketConnection, startSocketServer } from "../utils/socket.utils";
-import { ITransport, ReceiverProfile, TransportEventNotification, TransportSettings } from "../interface/ITransport.interface";
-import { error } from "console";
+import { ITransportReceiving, ITransportTransmitting, ReceiverProfile, TransmitterProfile, TransportEventNotification, TransportMessage, TransportSettings } from "../interface/ITransport.interface";
 import { v4 as uuidv4 } from 'uuid'
 
-export class WebsocketTransportService implements ITransport {
-    private websocketRole!: 'Server' | 'Client' | null
-    private clientSocket!: SocketClient
-    private connectedClient: ReceiverProfile[] = []
-    private responseSubject: Subject<any> = new Subject()
+/* Just code in the context that this websocket service will be handling multiple UI clients. Can think about the server communication at a later time. */
+export class WebsocketTransportService implements ITransportTransmitting, ITransportReceiving {
+    private transmitterProfile!: TransmitterProfile | null
+    private socketReceiverProfile: ReceiverProfile[] = []
+    private clientSocket!: ClientSocket
+    private incomingMessage: Subject<TransportMessage> = new Subject()
     private eventNotification: Subject<TransportEventNotification> = new Subject()
 
     constructor(setting: TransportSettings) {
-        this.websocketRole == setting.role
-        if (setting.role == 'Server') {
-            startSocketServer(setting.profileInfo.port as number).subscribe({
-                next: (connectedClient: SocketServer) => {
-                    // returns the socket client instance 
-                    let receiverProfile: ReceiverProfile = {
-                        uuid: uuidv4(),
-                        name: `Client`,
-                        dateCreated: new Date(),
-                        data: connectedClient
-                    }
-                    this.eventNotification.next({ event: 'WebsocketClientConnection', description: 'New Client Connected', data: receiverProfile })
-                    this.connectedClient.push(receiverProfile)
-
-                    // put here first, but can me mgirated to other parts of the code
-                    connectedClient.on('disconnect', () => {
-                        this.eventNotification.next({ event: 'WebscoketClientConnection', description: `Existing Client ${connectedClient.id} disonnected`, data: receiverProfile })
-                    })
-                },
-                error: error => console.error(error),
-                complete: () => { } // should not complete. Keep listening to new client connection
-            })
-        }
-        // just focus on receiving. 
-        if (setting.role == 'Client') {
-            startClientSocketConnection(setting.profileInfo.url as string).then((client: Socket) => {
-                this.clientSocket = client
-                // Need to open to listen to incoming responses
-                this.clientSocket.on('message', (message) => {
-                    // console.log(message)
-                    this.responseSubject.next(message)
-                })
-                this.clientSocket.on('notification', (notification) => {
-                    // console.log(notification)
-                    if (notification.header.messageName == 'NotificationMessage') {
-                        this.eventNotification.next({ event: 'Server Notification', description: 'Notification from server', data: notification })
-                    }
-                })
-                this.clientSocket.on('disconnect', () => {
-                    this.eventNotification.next({ event: 'Websocket Client Connection Status', description: 'Server disconnected' })
-                })
-            })
-        }
+        startSocketServer(setting.profileInfo.port as number).subscribe({
+            next: (connectedClient: SocketForConnectedClient) => {
+                this.handleNewSocketClient(connectedClient)
+            },
+            error: error => console.error(error),
+        })
     }
 
-    // to be used to emulate request response. But only for sending request, since all the response are to be redirected to another output
-    send(message: any, target?: string): void {
-        let socket = this.connectedClient.find(object => object.uuid === target)
-        if (socket) {
-            socket.data.emit('message', message)
-        }
+    // for transmission(Server Only, not applicable for client Socket)
+    public async transmit(message: TransportMessage): Promise<string> {
+        return new Promise((resolve, reject) => {
+            let receiverInstance = this.socketReceiverProfile.find(obj => obj.uuid === message.receiverID);
+            if (receiverInstance) {
+                (receiverInstance?.instance as SocketForConnectedClient).emit(message.event, message.payload)
+                resolve('Message called for delivery...')
+            } else {
+                reject(`Receiver cannot be found or doesn't exist...`)
+            }
+        })
     }
 
-    emit(event: string, message: any): void {
-        // for now just assume everyone is subscribed to this broacdcast
-        this.connectedClient.forEach(client => {
-            client.data.emit(event, message)
-        });
+    public getEventNotification(): Observable<TransportEventNotification> {
+        return this.eventNotification as Observable<TransportEventNotification>
     }
 
-    getResponse(): Observable<any> {
-        return this.responseSubject.asObservable()
-    }
+    // Set up socket listeners to start listening for different events
+    private handleNewSocketClient(socket: SocketForConnectedClient): void {
+        console.log(`Setting up listeners for socket:${socket.id}`)
+        // returns the socket client instance 
+        let receiverProfile: ReceiverProfile = {
+            uuid: uuidv4(),
+            name: `Client`,
+            dateCreated: new Date(),
+            transportType: `WEBSOCKET`,
+            eventNotification: new Subject(),
+            instance: socket
+        }
+        this.eventNotification.next({ event: 'Connection', description: 'New Client Connected', transportType: `WEBSOCKET`, data: receiverProfile })
+        this.socketReceiverProfile.push(receiverProfile)
 
-    getEventNotification(): Observable<TransportEventNotification> {
-        return this.eventNotification as Observable<TransportEventNotification>
+        /* Generally, we don't need this unless in the case of being the receiver */
+        socket.on('message', (message: any) => {
+            // here
+        })
+
+        socket.on('request', (request: any) => {
+            // here
+        })
+
+        socket.on('notification', (notification: any) => {
+            // logic here
+        })
+
+        socket.on('disconnect', () => {
+            receiverProfile.eventNotification.next({ event: 'Connection', description: `Existing Client ${socket.id} disonnected`, transportType: `WEBSOCKET`, data: receiverProfile })
+        })
     }
 
-}
+
+    // for client UI eg
+    public receive(): Observable<any> {
+        return this.incomingMessage.asObservable()
+    }
+}

+ 2 - 2
src/utils/socket.utils.ts

@@ -1,9 +1,9 @@
 import { Observable } from 'rxjs';
 import { createServer } from 'http';
-import { Server, Socket as ServerSocket } from 'socket.io';
+import { Server, Socket as SocketForThisClient } from 'socket.io';
 import { io, Socket as ClientSocket } from 'socket.io-client';
 
-export function startSocketServer(port: number): Observable<ServerSocket> {
+export function startSocketServer(port: number): Observable<SocketForThisClient> {
     return new Observable((observer) => {
         try {
             let httpServer = createServer();