Explorar o código

websocket service and transport service

Enzo hai 1 mes
pai
achega
07dab0df4a

+ 1 - 0
.env

@@ -0,0 +1 @@
+Transport = "WEBSOCKET"

+ 32 - 0
package-lock.json

@@ -9,6 +9,7 @@
       "version": "1.0.0",
       "license": "ISC",
       "dependencies": {
+        "dotenv": "^16.4.5",
         "express": "^4.21.0",
         "rxjs": "^7.8.1",
         "socket.io": "^4.8.0",
@@ -18,6 +19,7 @@
       "devDependencies": {
         "@types/express": "^5.0.0",
         "@types/node": "^22.7.4",
+        "@types/uuid": "^10.0.0",
         "typescript": "^5.6.2"
       }
     },
@@ -140,6 +142,13 @@
         "@types/send": "*"
       }
     },
+    "node_modules/@types/uuid": {
+      "version": "10.0.0",
+      "resolved": "https://registry.npmjs.org/@types/uuid/-/uuid-10.0.0.tgz",
+      "integrity": "sha512-7gqG38EyHgyP1S+7+xomFtL+ZNHcKv6DwNaCZmJmo1vgMugyF3TCnXVg4t1uk89mLNwnLtnY3TpOpCOyp1/xHQ==",
+      "dev": true,
+      "license": "MIT"
+    },
     "node_modules/accepts": {
       "version": "1.3.8",
       "resolved": "https://registry.npmjs.org/accepts/-/accepts-1.3.8.tgz",
@@ -337,6 +346,18 @@
         "npm": "1.2.8000 || >= 1.4.16"
       }
     },
+    "node_modules/dotenv": {
+      "version": "16.4.5",
+      "resolved": "https://registry.npmjs.org/dotenv/-/dotenv-16.4.5.tgz",
+      "integrity": "sha512-ZmdL2rui+eB2YwhsWzjInR8LldtZHGDoQ1ugH85ppHKwpUHL7j7rN0Ti9NCnGiQbhaZ11FpR+7ao1dNsmduNUg==",
+      "license": "BSD-2-Clause",
+      "engines": {
+        "node": ">=12"
+      },
+      "funding": {
+        "url": "https://dotenvx.com"
+      }
+    },
     "node_modules/ee-first": {
       "version": "1.1.1",
       "resolved": "https://registry.npmjs.org/ee-first/-/ee-first-1.1.1.tgz",
@@ -1290,6 +1311,12 @@
         "@types/send": "*"
       }
     },
+    "@types/uuid": {
+      "version": "10.0.0",
+      "resolved": "https://registry.npmjs.org/@types/uuid/-/uuid-10.0.0.tgz",
+      "integrity": "sha512-7gqG38EyHgyP1S+7+xomFtL+ZNHcKv6DwNaCZmJmo1vgMugyF3TCnXVg4t1uk89mLNwnLtnY3TpOpCOyp1/xHQ==",
+      "dev": true
+    },
     "accepts": {
       "version": "1.3.8",
       "resolved": "https://registry.npmjs.org/accepts/-/accepts-1.3.8.tgz",
@@ -1420,6 +1447,11 @@
       "resolved": "https://registry.npmjs.org/destroy/-/destroy-1.2.0.tgz",
       "integrity": "sha512-2sJGJTaXIIaR1w4iJSNoN0hnMY7Gpc/n8D4qSCJw8QqFWXf7cuAgnEHxBpweaVcPevC2l3KpjYCx3NypQQgaJg=="
     },
+    "dotenv": {
+      "version": "16.4.5",
+      "resolved": "https://registry.npmjs.org/dotenv/-/dotenv-16.4.5.tgz",
+      "integrity": "sha512-ZmdL2rui+eB2YwhsWzjInR8LldtZHGDoQ1ugH85ppHKwpUHL7j7rN0Ti9NCnGiQbhaZ11FpR+7ao1dNsmduNUg=="
+    },
     "ee-first": {
       "version": "1.1.1",
       "resolved": "https://registry.npmjs.org/ee-first/-/ee-first-1.1.1.tgz",

+ 6 - 2
package.json

@@ -6,20 +6,24 @@
   "scripts": {
     "test": "echo \"Error: no test specified\" && exit 1",
     "build": "tsc",
-    "start": "node dist/index.js"
+    "start": "node dist/index.js",
+    "transmit": "node dist/test/transmitter.js",
+    "receiver": "node dist/test/receiver.js"
   },
   "author": "",
   "license": "ISC",
   "devDependencies": {
     "@types/express": "^5.0.0",
     "@types/node": "^22.7.4",
+    "@types/uuid": "^10.0.0",
     "typescript": "^5.6.2"
   },
   "dependencies": {
+    "dotenv": "^16.4.5",
     "express": "^4.21.0",
     "rxjs": "^7.8.1",
     "socket.io": "^4.8.0",
     "socket.io-client": "^4.8.0",
     "uuid": "^10.0.0"
   }
-}
+}

+ 1 - 1
src/index.ts

@@ -1 +1 @@
-console.log(`Hellow World`)
+console.log(`Hello there. I am general obi wan kenobi`)

+ 50 - 0
src/interface/ITransport.interface.ts

@@ -0,0 +1,50 @@
+import { Observable } from "rxjs";
+
+export interface ITransport {
+    send(message: string): void
+    emit(event: string, data: any): void;
+    getResponse(): Observable<any>
+    getEventNotification(): Observable<any>
+}
+
+export interface ITransportTransmission extends ITransport{
+    // send(message: string): void
+    // emit(event: string, data: any): void;
+}
+
+export interface ITransportReceiving extends ITransport{
+    // returnResponse(): Observable<any>
+    // returnEventNotification(): Observable<any>
+}
+
+export interface TransportSettings {
+    role: 'Server' | 'Client',
+    profileInfo: ProfileInfo,
+}
+
+export interface ProfileInfo {
+    id: string,
+    name: string,
+    url?: string | null,
+    port?: number | null
+}
+
+export interface TransmitterProfile {
+    uuid: string,
+    name: string,
+    dateCreated: Date,
+    data?: any
+}
+
+export interface ReceiverProfile {
+    uuid: string,
+    name: string,
+    dateCreated: Date,
+    data?: any
+}
+
+export interface TransportEventNotification {
+    event: string,
+    description: string,
+    data?: any
+} 

+ 0 - 22
src/interface/transport.interface.ts

@@ -1,22 +0,0 @@
-import { Observable } from "rxjs";
-
-export interface TransportInterface {
-    send(message: any): Observable<any>
-
-    emit(message: any): void
-
-    getTransportEventNotification(): Observable<any>
-}
-
-
-export interface TransportSettings {
-    role: 'Server' | 'Client',
-    profileInfo: ProfileInfo,
-}
-
-export interface ProfileInfo {
-    id: string,
-    name: string,
-    url?: string | null,
-    port?: number | null
-}

+ 10 - 0
src/test/receiver.ts

@@ -0,0 +1,10 @@
+import { interval, Subject } from "rxjs"
+import { v4 as uuidv4 } from 'uuid'
+import { TransportService } from "../transport/transport.service"
+
+const transmitterService = new TransportService()
+
+
+
+// listen to event notification
+transmitterService.getEventNotification().subscribe(notification => console.log(notification))

+ 55 - 0
src/test/transmitter.ts

@@ -0,0 +1,55 @@
+import { interval, Subject } from "rxjs"
+import { v4 as uuidv4 } from 'uuid'
+import { TransportService } from "../transport/transport.service"
+
+const transmitterService = new TransportService()
+
+// this one to emulate broadcasting notification. Assuming the receiver has subscribed for this
+let count = 0
+let response = interval(1000)
+response.subscribe(eachInterval => {
+    transmitterService.emit({
+        header: {
+            messageID: uuidv4(),
+            messageName: `NotificationMessage`
+        },
+        data: 'just a test Notification'
+    })
+})
+
+// For incoming requests and return data just for testing
+let incomingRequest = new Subject<any>()
+incomingRequest.subscribe(request => {
+    // return 10 messages
+    let count = 0
+    let response = interval(1000)
+    response.subscribe(eachInterval => {
+        count++
+        if (count < 10) {
+            transmitterService.emit({
+                header: {
+                    messageID: request.header.messageID,
+                    messageName: `ResponseMessage`
+                },
+                data: 'just a test Response'
+            })
+        } else {
+            transmitterService.emit({
+                header: {
+                    messageID: request.header.messageID,
+                    messageName: `ResponseMessage`
+                },
+                data: 'Complete'
+            })
+        }
+    })
+})
+
+transmitterService.getIncomingResponse().subscribe(message => {
+    if (message.header.messageName == 'RequestMessage') {
+        incomingRequest.next(message)
+    }
+})
+
+// listen to event notification
+transmitterService.getEventNotification().subscribe(notification => console.log(notification))

+ 93 - 0
src/transport/transport.service.ts

@@ -0,0 +1,93 @@
+import { Observable, Subject } from "rxjs";
+import dotenv from 'dotenv';
+import { v4 as uuidv4 } from 'uuid';
+import { WebsocketTransportService } from "./websocket";
+import { ITransport, TransportSettings } from "../interface/ITransport.interface";
+dotenv.config();
+/* The application doesn't care at this point whether or not what transport to use or what role. it just wants to send request and expecting
+response as well as emitting notification and receiving notification from concerned parties. All the managing parts should be here. */
+export class TransportService {
+    private incomingResponse: Subject<any> = new Subject()
+    private eventNotification: Subject<any> = new Subject()
+    // this is assuming that the application will do request response, otherwise, just one will do.
+    private transmittingService!: ITransport
+    private receivingService!: ITransport
+    private serverPort!: number
+    private serverUrl!: string
+
+    constructor(port?: number, url?: string) {
+        // just putting it here for testing for now.
+        if (port) this.serverPort = port
+        if (url) this.serverUrl = url
+        // logic here
+        this.establishTransportTransmission({ transportName: process.env.Transport?.toString() })
+        this.establishTransportReceiving({ transportName: process.env.Transport?.toString() }).then(() => {
+            // pipe the notification from the server into local notification
+            this.receivingService.getEventNotification().subscribe(this.eventNotification)
+        })
+    }
+
+    public getIncomingResponse(): Observable<any> {
+        return this.incomingResponse.asObservable()
+    }
+
+    public getEventNotification(): Observable<any> {
+        return this.eventNotification.asObservable()
+    }
+
+    public send(message: any): Observable<any> {
+        return new Observable((response) => {
+            this.transmittingService.send(message)
+            /// demultiplex here, assuming all responses are piped into this.incomingResponse
+            this.incomingResponse.subscribe(responseMsg => {
+                if (responseMsg.id == message.id) {
+                    response.next(responseMsg)
+                } // don't forget to add complete message to close out this function
+            })
+        })
+    }
+
+    public emit(message: any): void {
+        this.transmittingService.emit('notification', message)
+    }
+
+    private async establishTransportTransmission(tranportType: TranportType): Promise<any> {
+        console.log(`Setting up ${tranportType.transportName} transmitter`)
+        return new Promise((resolve, reject) => {
+            let setting: TransportSettings = {
+                role: 'Server',
+                profileInfo: {
+                    id: uuidv4(),
+                    name: 'Server',
+                    port: this.serverPort ?? 3000
+                }
+            }
+            if (tranportType.transportName == 'WEBSOCKET') {
+                this.transmittingService = new WebsocketTransportService(setting)
+                resolve(this.transmittingService)
+            }
+        })
+    }
+
+    private async establishTransportReceiving(tranportType: TranportType): Promise<any> {
+        console.log(`Setting up ${tranportType.transportName} Receiver`)
+        return new Promise((resolve, reject) => {
+            let setting: TransportSettings = {
+                role: 'Client',
+                profileInfo: {
+                    id: uuidv4(),
+                    name: 'Client',
+                    url: this.serverUrl ?? 'http://localhost:3000'
+                }
+            }
+            if (tranportType.transportName == 'WEBSOCKET') {
+                this.receivingService = new WebsocketTransportService(setting)
+                resolve(this.receivingService)
+            }
+        })
+    }
+}
+
+export interface TranportType {
+    transportName: 'WEBSOCKET' | 'GRPC' | 'HTTP' | 'KAFKA' | string | undefined
+}

+ 57 - 30
src/transport/websocket.ts

@@ -2,57 +2,84 @@ import { Observable, Subject } from "rxjs";
 import { io, Socket, Socket as SocketClient } from "socket.io-client";
 import { Socket as SocketServer } from "socket.io"
 import { startClientSocketConnection, startSocketServer } from "../utils/socket.utils";
-import { TransportInterface, TransportSettings } from "../interface/transport.interface";
+import { ITransport, ReceiverProfile, TransportEventNotification, TransportSettings } from "../interface/ITransport.interface";
+import { error } from "console";
+import { v4 as uuidv4 } from 'uuid'
 
-export class WebsocketTransport implements TransportInterface {
+export class WebsocketTransportService implements ITransport {
     private websocketRole!: 'Server' | 'Client' | null
     private clientSocket!: SocketClient
-    private serverSocket!: SocketServer
+    private connectedClient: ReceiverProfile[] = []
     private responseSubject: Subject<any> = new Subject()
-    private eventNotification: Subject<any> = new Subject()
+    private eventNotification: Subject<TransportEventNotification> = new Subject()
 
     constructor(setting: TransportSettings) {
         this.websocketRole == setting.role
         if (setting.role == 'Server') {
-            startSocketServer(setting.profileInfo.port as number).then((server: SocketServer) => {
-                this.serverSocket = server
+            startSocketServer(setting.profileInfo.port as number).subscribe({
+                next: (connectedClient: SocketServer) => {
+                    // returns the socket client instance 
+                    let receiverProfile: ReceiverProfile = {
+                        uuid: uuidv4(),
+                        name: `Client`,
+                        dateCreated: new Date(),
+                        data: connectedClient
+                    }
+                    this.eventNotification.next({ event: 'WebsocketClientConnection', description: 'New Client Connected', data: receiverProfile })
+                    this.connectedClient.push(receiverProfile)
+
+                    // put here first, but can me mgirated to other parts of the code
+                    connectedClient.on('disconnect', () => {
+                        this.eventNotification.next({ event: 'WebscoketClientConnection', description: `Existing Client ${connectedClient.id} disonnected`, data: receiverProfile })
+                    })
+                },
+                error: error => console.error(error),
+                complete: () => { } // should not complete. Keep listening to new client connection
             })
         }
+        // just focus on receiving. 
         if (setting.role == 'Client') {
             startClientSocketConnection(setting.profileInfo.url as string).then((client: Socket) => {
                 this.clientSocket = client
+                // Need to open to listen to incoming responses
+                this.clientSocket.on('message', (message) => {
+                    // console.log(message)
+                    this.responseSubject.next(message)
+                })
+                this.clientSocket.on('notification', (notification) => {
+                    // console.log(notification)
+                    if (notification.header.messageName == 'NotificationMessage') {
+                        this.eventNotification.next({ event: 'Server Notification', description: 'Notification from server', data: notification })
+                    }
+                })
+                this.clientSocket.on('disconnect', () => {
+                    this.eventNotification.next({ event: 'Websocket Client Connection Status', description: 'Server disconnected' })
+                })
             })
         }
     }
 
-    send(message: any): Observable<any> {
-        return new Observable((response) => {
-            if (this.websocketRole == 'Server') {
-
-            }
-            if (this.websocketRole == 'Client') {
-                // filter message
-                this.clientSocket.emit('message', message)
-                this.responseSubject.subscribe(responseMsg => {
-                    if (responseMsg.id == message.id && !responseMsg.complete) {
-                        response.next(responseMsg)
-                    }
-                    if (responseMsg.id == message.id && responseMsg.complete) {
-                        response.complete()
-                    }
-                })
-            }
-        })
+    // to be used to emulate request response. But only for sending request, since all the response are to be redirected to another output
+    send(message: any, target?: string): void {
+        let socket = this.connectedClient.find(object => object.uuid === target)
+        if (socket) {
+            socket.data.emit('message', message)
+        }
     }
 
-    emit(message: any): void {
-        if (this.clientSocket) {
-            this.clientSocket.emit(`event`, message)
-        }
+    emit(event: string, message: any): void {
+        // for now just assume everyone is subscribed to this broacdcast
+        this.connectedClient.forEach(client => {
+            client.data.emit(event, message)
+        });
+    }
+
+    getResponse(): Observable<any> {
+        return this.responseSubject.asObservable()
     }
 
-    getTransportEventNotification(): Observable<any> {
-        return this.eventNotification as Observable<any>
+    getEventNotification(): Observable<TransportEventNotification> {
+        return this.eventNotification as Observable<TransportEventNotification>
     }
 
 }

+ 11 - 7
src/utils/socket.utils.ts

@@ -1,14 +1,18 @@
+import { Observable } from 'rxjs';
 import { createServer } from 'http';
 import { Server, Socket as ServerSocket } from 'socket.io';
 import { io, Socket as ClientSocket } from 'socket.io-client';
 
-export async function startSocketServer(port: number): Promise<ServerSocket> {
-    return new Promise((resolve, reject) => {
+export function startSocketServer(port: number): Observable<ServerSocket> {
+    return new Observable((observer) => {
         try {
-            let socketServer = new Server()
+            let httpServer = createServer();
+            let socketServer = new Server(httpServer)
 
+            // something wrong here
             socketServer.on('connection', (socket) => {
-                resolve(socket)
+                // console.log(`New client connected ${socket.id}`)
+                observer.next(socket)
             })
 
             socketServer.engine.on("connection_error", (err) => {
@@ -19,9 +23,9 @@ export async function startSocketServer(port: number): Promise<ServerSocket> {
             });
 
             // Start the socketServer
-            socketServer.listen(port)
+            httpServer.listen(port)
         } catch (error) {
-            reject(error)
+            observer.error(error)
         }
     })
 }
@@ -39,7 +43,7 @@ export async function startClientSocketConnection(serverUrl: string): Promise<Cl
 
             // Listen for a connection event
             clientSocket.on('connect', () => {
-                console.log('Connected to the server:', clientSocket.id)
+                // console.log('Connected to the server:', clientSocket.id)
                 resolve(clientSocket)
             });
         }