Browse Source

Basic retransmissino applicatidon for socket. To be Tested. Message Ordering for client side still needs to be fixedd due to always having to wait for previous message before being released. Not tested with multiple cliens.

enzo 1 month ago
parent
commit
e04120a484

+ 84 - 0
info.json

@@ -0,0 +1,84 @@
+{
+  "id": "qeT6-GRlp_Y43ct4AAAB",
+  "clientName": "ffcb2acb-0060-433c-a53e-6b69b11c5a2d",
+  "connectedAt": "2024-09-16T09:17:11.369Z",
+  "clientConnectionState": {
+    "closed": false,
+    "currentObservers": null,
+    "observers": [],
+    "isStopped": false,
+    "hasError": false,
+    "thrownError": null,
+    "_value": "ONLINE"
+  },
+  "requests": [],
+  "buffer": {
+    "sortMessage": false,
+    "bufferReleaseSignal": {
+      "closed": false,
+      "currentObservers": null,
+      "observers": [],
+      "isStopped": false,
+      "hasError": false,
+      "thrownError": null
+    },
+    "receiverConnectionState": {
+      "closed": false,
+      "currentObservers": null,
+      "observers": [],
+      "isStopped": false,
+      "hasError": false,
+      "thrownError": null,
+      "_value": "OFFLINE"
+    },
+    "transmissionState": {
+      "closed": false,
+      "currentObservers": null,
+      "observers": [],
+      "isStopped": false,
+      "hasError": false,
+      "thrownError": null,
+      "_value": "ARRAY EMPTY"
+    },
+    "arrayToBeTransmitted": {
+      "closed": false,
+      "currentObservers": null,
+      "observers": [],
+      "isStopped": false,
+      "hasError": false,
+      "thrownError": null
+    },
+    "toBeWrapped": {
+      "closed": false,
+      "currentObservers": null,
+      "observers": [],
+      "isStopped": false,
+      "hasError": false,
+      "thrownError": null
+    },
+    "wrappedMessageToBeBuffered": {
+      "closed": false,
+      "currentObservers": null,
+      "observers": [],
+      "isStopped": false,
+      "hasError": false,
+      "thrownError": null
+    },
+    "messageToBeTransmitted": {
+      "closed": false,
+      "currentObservers": null,
+      "observers": [],
+      "isStopped": false,
+      "hasError": false,
+      "thrownError": null
+    }
+  },
+  "responseObs": {
+    "closed": false,
+    "currentObservers": null,
+    "observers": [],
+    "isStopped": false,
+    "hasError": false,
+    "thrownError": null
+  }
+}

+ 2 - 1
interfaces/general.interface.ts

@@ -82,12 +82,13 @@ export interface ConnectionID {
 export interface ClientNotificationState {
     event: string,
     message: string,
-    status?: 'ONLINE' | 'OFFLINE' | null 
+    status?: 'ONLINE' | 'OFFLINE' | null
 }
 
 
 export interface WrappedMessage {
     timeReceived: any, // this property is for sender to sort
     payload: BaseMessage,
+    thisMessageID?: string,
     previousMessageID?: string // this property is for receiver to sort
 }

+ 3 - 3
package.json

@@ -11,15 +11,15 @@
     "generatedata": "node services/utility/generateData.js",
     "http1": "node test/http1.js",
     "http2": "node test/http2.js",
-    "socket": "node test/socket.js",
+    "socket": "node test/socket/socket-client.js",
     "grpc1": "node test/grpc1.js",
     "grpc2": "node test/grpc2.js",
     "grpc3": "node test/grpc3.js",
     "testing": "node test/test.js",
     "server": "node test/grpcTest.js",
     "rxjsbuffer": "node test/rxjsbuffer.sample.test.js",
-    "http-test-server": "node test/http-test-server.js",
-    "socket-test-server": "node test/socket-test-server.js",
+    "http-test-server": "node test/socket/http-test-server.js",
+    "socket-test-server": "node test/socket/socket-test-server.js",
     "simpleObsTest": "node test/simpleObsTest.js",
     "compareString": "node test/stringtest.js"
   },

+ 14 - 9
services/retransmission.service.ts

@@ -3,8 +3,10 @@ import { BaseMessage } from "../dependencies/logging/dependencies/msgutil/depend
 import { RetransmissionInterface } from "../interfaces/retransmission.interface";
 import { WrappedMessage } from "../interfaces/general.interface";
 import { sortMessageBasedOnDate } from "./utility/message-ordering";
+import { v4 as uuidV4 } from 'uuid';
 
 export class RetransmissionService implements RetransmissionInterface {
+    private currentMessageId: string | null
     private sortMessage: boolean = false
     private bufferReleaseSignal: Subject<void> = new Subject()
     private receiverConnectionState: BehaviorSubject<'OFFLINE' | 'ONLINE'> = new BehaviorSubject('OFFLINE')
@@ -15,7 +17,7 @@ export class RetransmissionService implements RetransmissionInterface {
     private messageToBeTransmitted: Subject<WrappedMessage> = new Subject()
 
     // Interface
-    public retransmission(payloadToBeTransmitted: Observable<any>, eventListener: Observable<any>, messageOrdering?: boolean): Observable<any> {
+    public retransmission(payloadToBeTransmitted: Observable<any>, eventListener: Observable<any>, messageOrdering?: boolean): Observable<WrappedMessage> {
         return new Observable((observer) => {
             if (messageOrdering) {
                 this.sortMessage = true
@@ -31,35 +33,38 @@ export class RetransmissionService implements RetransmissionInterface {
                 this.toBeWrapped.next(message)
             })
 
+            // these ones are the ones that are exited out from the bufferey'
             this.messageToBeTransmitted.subscribe(message => observer.next(message))
         })
     }
 
     private startWrappingOperation() {
-        let currentMessageId: string | null
         this.toBeWrapped.subscribe(message => {
-            this.wrappedMessageToBeBuffered.next(this.wrapMessageWithTimeReceived(message, currentMessageId ? currentMessageId : null))
-            currentMessageId = message.header.messageID
+            this.wrappedMessageToBeBuffered.next(this.wrapMessageWithTimeReceived(message, this.currentMessageId ? this.currentMessageId : null))
         })
         //simulate connection test
 
         // wrappedMessageToBeBuffered will then be pushed to buffer
         this.wrappedMessageToBeBuffered.pipe(buffer(this.bufferReleaseSignal)).subscribe((bufferedMessages: WrappedMessage[]) => {
-            console.log(bufferedMessages.length + ' buffered messages')
-            console.log(`Released buffered message: ${bufferedMessages.length} total messages. To Be sorted.`)
+            // console.log(bufferedMessages.length + ' buffered messages')
+            // console.log(`Released buffered message: ${bufferedMessages.length} total messages. To Be sorted.`)
             // arrayToBeTransmitted.next(sortMessage(bufferedMessages))
-            this.arrayToBeTransmitted.next(this.sortMessage && bufferedMessages.length > 0 ? sortMessageBasedOnDate(bufferedMessages) : [])
+            this.arrayToBeTransmitted.next((this.sortMessage && bufferedMessages.length > 0) ? sortMessageBasedOnDate(bufferedMessages) : bufferedMessages)
         });
     }
 
-    private wrapMessageWithTimeReceived(message: any, previousMessageID: string): WrappedMessage {
+    private wrapMessageWithTimeReceived(message: any, previousMessageID: string | null): WrappedMessage {
         // check if message has already a time received property if so no need to add anymore
         if (!message.timeReceived) {
             let WrappedMessage: WrappedMessage = {
                 timeReceived: new Date(),
                 payload: message as BaseMessage,
+                thisMessageID: uuidV4(),
                 previousMessageID: previousMessageID
             }
+            // console.log(`Current`, WrappedMessage.thisMessageID, 'Previous for this message:', WrappedMessage.previousMessageID)
+            this.currentMessageId = WrappedMessage.thisMessageID
+            // console.log(`Updating: `, this.currentMessageId)
             return WrappedMessage
         } else {
             return message as WrappedMessage
@@ -84,7 +89,7 @@ export class RetransmissionService implements RetransmissionInterface {
                     error: err => console.error(err),
                     complete: () => {
                         // update transmission state to indicate this batch is completed
-                        console.log(`Processing buffered array completed. Changing transmission state to ARRAY EMPTY`);
+                        // console.log(`Processing buffered array completed. Changing transmission state to ARRAY EMPTY`);
                         this.transmissionState.next('ARRAY EMPTY');
 
                         if (this.receiverConnectionState.getValue() === 'ONLINE' && this.transmissionState.getValue() === 'ARRAY EMPTY') {

+ 0 - 120
test/socket-test-server.ts

@@ -1,120 +0,0 @@
-import { BehaviorSubject, from, Observable, Subject, takeUntil, takeWhile } from "rxjs";
-import { RetransmissionService } from "../services/retransmission.service";
-import { prepareResponseMessages } from "../services/utility/prepareFISmessage";
-import { BaseMessage } from "../dependencies/logging/services/logging-service";
-
-const express = require('express');
-const http = require('http');
-const { Server } = require('socket.io');
-
-const app = express();
-const server = http.createServer(app);
-const io = new Server(server);
-// Keep track of connected clients
-const clients: any[] = []
-let announcements: Subject<any> = new Subject()
-announcements.subscribe(announcement => {
-    console.log(`Server Announcement: ${announcement}`)
-})
-
-app.use(express.static('public'));
-
-io.on('connection', (socket) => {
-    announcements.next('a client connected:' + socket.id);
-    let clientInfo: ClientInfo = {
-        id: socket.id,
-        connectedAt: new Date(),
-        clientConnectionState: new BehaviorSubject<'ONLINE' | 'OFFLINE'>('ONLINE'),
-        requests: [],
-        buffer: new RetransmissionService()
-    }
-    let serverBuffer = new Subject<any>()
-    clients.push(clientInfo);
-    clientInfo.buffer.retransmission(serverBuffer, clientInfo.clientConnectionState).subscribe(output => socket.emit('message', output))
-
-    // Listen for messages from the client
-    socket.on('message', (request) => {
-        announcements.next(`Received Message: ${request.header.messageID} from ${clientInfo.id}`);
-        clientInfo.requests.push({ message: request, completed: false })
-        returnResponse(request).subscribe({
-            next: message => serverBuffer.next(message),
-            error: err => console.error(err),
-            complete: () => {
-                let clientOBJ = clientInfo.requests.find(obj => obj.message.header.messageID === request.header.messageID)
-                clientOBJ.completed = true
-                console.log('Current Array', clients)
-            }
-        })
-    });
-
-    socket.on('connect', (msg) => {
-        // Send a response back to the client
-        socket.emit('notification', `Hello from server. You have been assigned ${socket.id}`);
-    });
-
-    socket.on('interval', (value) => {
-        console.log(socket.id, value) // okay so it does receive in sequence after reconnection
-    })
-
-    // Handle disconnection
-    socket.on('disconnect', () => {
-        announcements.next(`Client ${clientInfo.id} disconnected`);
-        deleteClientById(socket.id)
-    });
-});
-
-io.engine.on("connection_error", (err) => {
-    console.log(err.req);      // the request object
-    console.log(err.code);     // the error code, for example 1
-    console.log(err.message);  // the error message, for example "Session ID unknown"
-    console.log(err.context);  // some additional error context
-  });
-
-// Start the server
-const PORT = process.env.PORT || 3000;
-server.listen(PORT, () => {
-    console.log(`Server listening on port ${PORT}`);
-});
-
-
-
-// Utils
-// Function to delete an item by its id (mutating the array)
-function deleteClientById(id) {
-    const index = clients.findIndex(item => item.id === id);
-    if (index !== -1) {
-        clients.splice(index, 1);
-    }
-}
-
-function returnResponse(request: BaseMessage): Observable<BaseMessage> {
-    return new Observable((observer) => {
-        prepareResponseMessages(10, 1000).subscribe({
-            next: (message: BaseMessage) => {
-                message.header.messageID = request.header.messageID
-                observer.next(message)
-            },
-            error: err => console.error(err),
-            complete: () => {
-                prepareResponseMessages(1).subscribe({
-                    next: message => {
-                        message.header.messageID = request.header.messageID
-                        message.header.messageName = 'Complete'
-                        observer.next(message)
-                    },
-                    complete: () => {
-                        observer.complete()
-                    }
-                })
-            }
-        })
-    })
-}
-
-export interface ClientInfo {
-    id: string,
-    connectedAt: Date,
-    clientConnectionState: BehaviorSubject<'ONLINE' | 'OFFLINE'>,
-    requests: { message: any, completed: boolean }[],
-    buffer: RetransmissionService
-}

+ 0 - 123
test/socket.ts

@@ -1,123 +0,0 @@
-import { Observable, Subject, takeWhile } from "rxjs";
-import { prepareResponseMessages } from "../services/utility/prepareFISmessage";
-import { BaseMessage } from "../dependencies/logging/interface/export";
-import { WrappedMessage } from "../services/retransmission.service";
-import { io, Socket } from "socket.io-client";
-
-let onHoldMessagesSubject: Subject<WrappedMessage> = new Subject()
-let toBePassedOverToApp: Subject<BaseMessage> = new Subject()
-// Serve static files (optional)
-let sender: Subject<BaseMessage> = prepareResponseMessages(1, 2000)
-let serverSocketUrl: string = 'http://localhost:3000'
-let socket: Socket
-establishSocketConnection(serverSocketUrl)
-// interval(1000).subscribe(value => { // just to test if the emission is in sequence after reconnection
-//     console.log(value)
-//     socket.emit('interval', value)
-// })
-sender.subscribe({
-    next: message => {
-        makeRequest(message).subscribe({
-            complete: () => console.log(`Request ${message.header.messageID} has acquired all responses.`)
-        })
-    }
-})
-
-// the interface the client Program will make without having to decide transport protocol
-function makeRequest(request: BaseMessage): Observable<any> {
-    return new Observable((response) => {
-        sendMessage(request)
-        toBePassedOverToApp.subscribe({
-            next: (message: BaseMessage) => {
-                // The identification of responses mapping to the request be adjusted accordingly
-                // For now it's a simple demulti-plexing
-                if (message.header.messageID == request.header.messageID && message.header.messageName == 'ResponseData') {
-                    response.next(message)
-                }
-                if (message.header.messageID == request.header.messageID && message.header.messageName == 'Complete') {
-                    response.complete()
-                }
-            },
-            error: err => console.error(err),
-            complete: () => { }
-        })
-    })
-}
-
-// socket util: Assuming that the client program would already have something like this in place
-function establishSocketConnection(serverUrl: string) {
-    socket = io(serverUrl, {
-        reconnection: true,              // Enable automatic reconnections
-        reconnectionAttempts: 100,        // Retry up to 10 times
-        reconnectionDelay: 500,          // Start with a 500ms delay
-        reconnectionDelayMax: 10000,     // Delay can grow to a max of 10 seconds
-        randomizationFactor: 0.3,
-    })
-
-    // Listen for a connection event
-    socket.on('connect', () => {
-        // socket.emit('Hello from the client!')
-        console.log('Connected to the server:', socket.id)
-        // receiverConnectionState.next('ONLINE')
-    });
-
-    // Listen for messages from the server
-    socket.on('message', (msg: WrappedMessage) => {
-        console.log('Message from server:', msg.payload.header.messageID);
-
-        // Check the sequence by ensuring the message value before the current message exists, then pass them over to "App"
-        onHoldMessagesSubject.next(msg)
-        checkMessage(msg).then(() => [
-            toBePassedOverToApp.next(msg.payload as BaseMessage)
-        ]).catch((err) => console.error(err))
-    })
-
-    socket.on('notification', (msg: string) => {
-        console.log(msg)
-    })
-
-    // Handle disconnection
-    socket.on('disconnect', () => {
-        console.log('Disconnected from the server');
-        // receiverConnectionState.next('OFFLINE')
-    });
-}
-
-
-
-async function sendMessage(message: BaseMessage): Promise<any> {
-    return new Promise((resolve, reject) => {
-        try {
-            // extra precaution: According to chatgpt, if disconnected, then the payload will be loaded back in event queue whilst the socket will try to reestablish connection
-            // https://socket.io/docs/v4/client-offline-behavior/
-            socket.emit('message', message); // inherently an aysnc
-            console.log(`SocketEmit() for message to event queue ${message.header.messageID}`)
-            resolve('')
-        } catch (error) {
-            console.error('Error emitting message:', error);
-            this.wrappedMessage.next(message)
-            reject(error)
-        } ``
-    })
-}
-
-
-// SO concept will be that if the message behind it is received, then 
-async function checkMessage(message: WrappedMessage): Promise<any> {
-    return new Promise((resolve, reject) => {
-        if (message.previousMessageID) {
-            onHoldMessagesSubject.pipe(
-                takeWhile(item => item.payload.header.messageID == message.previousMessageID)
-            ).subscribe({
-                complete: () => {
-                    resolve('previousMessageID matched')
-                    // console.log(`${message.payload.header.messageID} : Previous messageID(${message.previousMessageID}) matched`)
-                    console.log(`matched`)
-                }
-            })
-        } else {
-            console.log('No previous messageID. This should be the first message')
-            resolve('No previous message ID. Please Proceed.')
-        }
-    })
-}

+ 189 - 0
test/socket/socket-client.ts

@@ -0,0 +1,189 @@
+import { Observable, Subject, takeWhile } from "rxjs";
+import { prepareResponseMessages } from "../../services/utility/prepareFISmessage";
+import { BaseMessage } from "../../dependencies/logging/interface/export";
+import { io, Socket } from "socket.io-client";
+import { WrappedMessage } from "../../interfaces/general.interface";
+import * as fs from 'fs'
+import { ClientInfo } from "./socket.service";
+
+let onHoldMessagesSubject: Subject<WrappedMessage> = new Subject()
+let toBePassedOverToApp: Subject<BaseMessage> = new Subject()
+// Serve static files (optional)
+let sender: Subject<BaseMessage> = prepareResponseMessages(1, 2000)
+let serverSocketUrl: string = 'http://localhost:3000'
+let socket: Socket
+
+
+establishSocketConnection(serverSocketUrl).then(() => {
+    sender.subscribe({
+        next: message => {
+            makeRequest(message).subscribe({
+                complete: () => console.log(`Request ${message.header.messageID} has acquired all responses.`)
+            })
+        }
+    })
+})
+
+// the interface the client Program will make without having to decide transport protocol
+function makeRequest(request: BaseMessage): Observable<any> {
+    return new Observable((response) => {
+        sendMessage(request)
+        toBePassedOverToApp.subscribe({
+            next: (message: BaseMessage) => {
+                // console.log(message.header.messageName)
+                // The identification of responses mapping to the request be adjusted accordingly
+                // For now it's a simple demulti-plexing
+                if (message.header.messageID == request.header.messageID && message.header.messageName != 'Complete') {
+                    response.next(message)
+                }
+                if (message.header.messageID == request.header.messageID && message.header.messageName == 'Complete') {
+                    response.complete()
+                }
+            },
+            error: err => console.error(err),
+            complete: () => { }
+        })
+    })
+}
+
+// socket util: Assuming that the client program would already have something like this in place
+async function establishSocketConnection(serverUrl: string): Promise<any> {
+    return new Promise((resolve, reject) => {
+        try {
+            socket = io(serverUrl, {
+                reconnection: true,              // Enable automatic reconnections
+                reconnectionAttempts: 100,        // Retry up to 10 times
+                reconnectionDelay: 500,          // Start with a 500ms delay
+                reconnectionDelayMax: 10000,     // Delay can grow to a max of 10 seconds
+                randomizationFactor: 0.3,
+            })
+
+            // Check if it's a previuos client.
+            let data: ClientInfo | null = checkOwnClientInfo('info.json')
+            if (data) {
+                socket.emit('notification', { agenda: 'existingClient', data: data })
+            } else {
+                socket.emit('notification', { agenda: 'newClient' })
+            }
+
+            // Listen for a connection event
+            socket.on('connect', () => {
+                // socket.emit('Hello from the client!')
+                console.log('Connected to the server:', socket.id)
+            });
+
+            // Listen for messages from the server
+            socket.on('response', (msg: WrappedMessage) => {
+                // console.log('Message from server:', msg.payload.header.messageName);
+
+                // Check the sequence by ensuring the message value before the current message exists, then pass them over to "App"
+                // onHoldMessagesSubject.next(msg)
+                // checkMessage(msg).then(() => [
+                //     toBePassedOverToApp.next(msg.payload as BaseMessage)
+                // ]).catch((err) => console.error(err))
+                toBePassedOverToApp.next(msg.payload as BaseMessage)
+            })
+
+            socket.on('notification', (msg: any) => {
+                if (msg.notification == 'Your credentials') {
+                    console.log(`Assigned client Name: ${msg.socketInfo.clientName}`)
+                    writeFile(msg.socketInfo as ClientInfo)
+                }
+                if (msg.notification == 'Your updated credentials') {
+                    console.log(`Updated socket ID: ${msg.socketInfo.id}`)
+                    writeFile(msg.socketInfo as ClientInfo)
+                }
+                if (msg.notification == 'Failed Request') {
+                    console.log(`Resending request...`, msg.data.header.messageID)
+                    // sender.next(msg.data)
+                }
+            })
+
+            resolve('')
+            // Handle disconnection
+            socket.on('disconnect', () => {
+                console.log('Disconnected from the server');
+                // receiverConnectionState.next('OFFLINE')
+            });
+        }
+        catch (error) {
+            reject(error)
+        }
+    })
+}
+
+
+function checkOwnClientInfo(filename: string): ClientInfo | null {
+    // Check if the file exists
+    if (fs.existsSync(filename)) {
+        try {
+            // Read the file contents
+            const fileData = fs.readFileSync(filename, 'utf8');
+
+            // If the file is empty, return an error
+            if (fileData.trim() === "") {
+                throw new Error("File is empty");
+            }
+
+            // Parse and return the data if present
+            const jsonData = JSON.parse(fileData);
+            return jsonData;
+
+        } catch (err) {
+            // Handle parsing errors or other file-related errors
+            console.error("Error reading or parsing file:", err.message);
+            return null;
+        }
+    } else {
+        console.error("File does not exist");
+        return null;
+    }
+}
+
+function writeFile(data: ClientInfo) {
+    // Write JSON data to a file
+    fs.writeFile('info.json', JSON.stringify(data, null, 2), (err) => {
+        if (err) {
+            console.error('Error writing file', err);
+        } else {
+            console.log('File has been written');
+        }
+    });
+}
+
+async function sendMessage(message: BaseMessage): Promise<any> {
+    return new Promise((resolve, reject) => {
+        try {
+            // extra precaution: According to chatgpt, if disconnected, then the payload will be loaded back in event queue whilst the socket will try to reestablish connection
+            // https://socket.io/docs/v4/client-offline-behavior/
+            socket.emit('request', message); // inherently an aysnc
+            console.log(`SocketEmit() for message to event queue ${message.header.messageID}`)
+            resolve('')
+        } catch (error) {
+            console.error('Error emitting message:', error);
+            sender.next(message)
+            reject(error)
+        }
+    })
+}
+
+
+// SO concept will be that if the message behind it is received, then 
+async function checkMessage(message: WrappedMessage): Promise<any> {
+    return new Promise((resolve, reject) => {
+        if (message.previousMessageID) {
+            onHoldMessagesSubject.pipe(
+                takeWhile(item => message.previousMessageID === item.thisMessageID )
+            ).subscribe({
+                complete: () => {
+                    resolve('previousMessageID matched')
+                    // console.log(`${message.payload.header.messageID} : Previous messageID(${message.previousMessageID}) matched`)
+                    // console.log(`matched`)
+                }
+            })
+        } else {
+            console.log('No previous messageID. This should be the first message')
+            resolve('No previous message ID. Please Proceed.')
+        }
+    })
+}

+ 54 - 0
test/socket/socket-test-server.ts

@@ -0,0 +1,54 @@
+
+
+// so This is just for demonstaratin purpose.
+/* App logic will accept a request and return responses via subjects. Whether or not 
+whose request it belongs to, would be presumed to be handled by the service */
+
+import { Observable, Subject } from "rxjs";
+import { SocketService } from "./socket.service";
+import { BaseMessage } from "../../dependencies/logging/services/logging-service";
+import { prepareResponseMessages } from "../../services/utility/prepareFISmessage";
+
+let incomingRequest = new Observable<BaseMessage>()
+let outGoingResponses = new Subject<BaseMessage>()
+let socketService = new SocketService(outGoingResponses)
+incomingRequest = socketService.getIncomingRequest()
+
+socketService.setUpConnection()
+
+incomingRequest.subscribe((request: BaseMessage) => {
+    returnResponse(request).subscribe({
+        next: message => {
+            outGoingResponses.next(message)
+        },
+        error: err => console.error(err),
+        complete: () => {
+            // no need to do anything
+            console.log(`Application completed in providing responses for request${request.header.messageID}...`)
+        }
+    })
+})
+
+function returnResponse(request: BaseMessage): Observable<BaseMessage> {
+    return new Observable((observer) => {
+        prepareResponseMessages(10, 500).subscribe({
+            next: (message: BaseMessage) => {
+                message.header.messageID = request.header.messageID
+                observer.next(message)
+            },
+            error: err => console.error(err),
+            complete: () => {
+                prepareResponseMessages(1, 1000).subscribe({
+                    next: message => {
+                        message.header.messageID = request.header.messageID
+                        message.header.messageName = 'Complete'
+                        observer.next(message)
+                    },
+                    complete: () => {
+                        observer.complete()
+                    }
+                })
+            }
+        })
+    })
+}

+ 186 - 0
test/socket/socket.service.ts

@@ -0,0 +1,186 @@
+import { BehaviorSubject, Observable, Subject } from "rxjs"
+import { RetransmissionService } from "../../services/retransmission.service"
+import { BaseMessage } from "../../dependencies/logging/services/logging-service"
+import { v4 as uuidV4 } from 'uuid';
+import { Socket } from "socket.io-client";
+const express = require('express');
+const http = require('http');
+const { Server } = require('socket.io');
+
+/* This is only for demonstration purposes. Because the actual nestjs socket implementation may differ. */
+export class SocketService {
+
+    private connectedClients: ClientInfo[] = []
+    private announcements: Subject<any> = new Subject()
+    private app = express();
+    private server = http.createServer(this.app);
+    private io = new Server(this.server);
+    private responseFromApp: Subject<BaseMessage>
+    private incomingRequest: Subject<BaseMessage> = new Subject()
+
+    constructor(response: Subject<BaseMessage>) {
+        this.responseFromApp = response
+        this.announcements.subscribe(announcement => {
+            console.log(`Server Announcement: ${announcement}`)
+        })
+    }
+
+    public getIncomingRequest(): Observable<BaseMessage> {
+        return this.incomingRequest.asObservable()
+    }
+
+    public async setUpConnection() {
+        this.io.on('connection', (socket) => {
+            this.announcements.next('a client is connected:' + socket.id);
+            let clientInfo: ClientInfo | null
+
+            socket.on('connect', (msg) => {
+                // this is reserved....
+            });
+
+            socket.on('notification', (msg) => {
+                console.log(msg)
+                clientInfo = this.handleNotification(msg, socket, clientInfo)
+            })
+
+            // Listen for messages from the client
+            socket.on('request', (request: BaseMessage) => {
+                if (clientInfo) {
+                    this.announcements.next(`Received Message: ${request.header.messageID} from ${clientInfo.clientName}`);
+                    // clientInfo.requests.push({ message: request, completed: false })
+                    this.incomingRequest.next(request)
+                    this.processRequest(request).subscribe({
+                        next: message => {
+                            // console.log(message.header.messageName) // it does receive
+                            clientInfo.responseObs.next(message)
+                        },
+                        error: err => console.error(err),
+                        complete: () => { }
+                    })
+                } else {
+                    console.log(`Client is still not defined. Please have this client set up the credentials`)
+                    socket.emit('notification', {
+                        notification: 'Failed Request',
+                        data: request,
+                        message: 'Client Credentials is not properply set up! Cannot process requests at the moment.'
+                    })
+                }
+            });
+
+            // Handle disconnection
+            socket.on('disconnect', () => {
+                if (clientInfo) {
+                    clientInfo.clientConnectionState.next('OFFLINE') // signal to start buffering
+                    this.announcements.next(`Client ${clientInfo.id} disconnected`);
+                    // this.deleteClientById(socket.id)
+                }
+            });
+
+        });
+
+        this.io.engine.on("connection_error", (err) => {
+            console.log(err.req);      // the request object
+            console.log(err.code);     // the error code, for example 1
+            console.log(err.message);  // the error message, for example "Session ID unknown"
+            console.log(err.context);  // some additional error context
+        });
+
+        // Start the server
+        const PORT = process.env.PORT || 3000;
+        this.server.listen(PORT, () => {
+            console.log(`Server listening on port ${PORT}`);
+        });
+    }
+
+
+    // Utils
+    // Function to delete an item by its id (mutating the array)
+    private deleteClientById(id) {
+        const index = this.connectedClients.findIndex(item => item.id === id);
+        if (index !== -1) {
+            this.connectedClients.splice(index, 1);
+        }
+    }
+
+    private processRequest(request: BaseMessage): Observable<BaseMessage> {
+        return new Observable((observer) => {
+            this.responseFromApp.subscribe(message => {
+                // console.log(message)
+                if (message.header.messageID === request.header.messageID && message.header.messageName != 'Complete') {
+                    observer.next(message)
+                }
+                if (message.header.messageID === request.header.messageID && message.header.messageName == 'Complete') {
+                    observer.next(message)
+                    // console.log(message) // so it does show
+                    observer.complete()
+                }
+            })
+        })
+    }
+
+    private handleNotification(msg: any, socket: Socket, clientInfo: ClientInfo | null) {
+        if (msg.agenda == 'newClient') {
+            clientInfo = {
+                id: socket.id,
+                clientName: uuidV4(),
+                connectedAt: new Date(),
+                clientConnectionState: new BehaviorSubject<'ONLINE' | 'OFFLINE'>('ONLINE'),
+                requests: [],
+                buffer: new RetransmissionService(),
+                responseObs: new Subject<BaseMessage>()
+            }
+            this.connectedClients.push(clientInfo);
+
+            // Send data over for client to persist
+            socket.emit('notification', {
+                notification: 'Your credentials',
+                createdAt: new Date(),
+                socketInfo: clientInfo
+            })
+
+            // this is the supposed responses to be pushed to this socket client
+            clientInfo.buffer.retransmission(clientInfo.responseObs, clientInfo.clientConnectionState).subscribe(output => {
+                // console.log(output)
+                socket.emit('response', output)
+            })
+        }
+
+        if (msg.agenda == 'existingClient') {
+            // check if client exists
+            let clientObj = this.connectedClients.find(obj => obj.clientName === msg.data.clientName)
+            if (clientObj) {
+                clientInfo = clientObj
+                console.log('Existing client found')
+                // but also update socketId
+                clientObj.id = socket.id
+
+                // Send data over for client to persist
+                socket.emit('notification', {
+                    notification: 'Your updated credentials',
+                    connectedAt: new Date(),
+                    socketInfo: clientInfo
+                })
+
+                socket.emit('notification', `Hello from server. You have been assigned ${socket.id}`);
+                // resume operation
+                clientObj.clientConnectionState.next('ONLINE')
+            } else {
+                console.log(this.connectedClients)
+                console.log(`Existing Client is not found`)
+            }
+        }
+
+        return clientInfo
+    }
+
+}
+
+export interface ClientInfo {
+    id: string,
+    clientName: string,
+    connectedAt: Date,
+    clientConnectionState: BehaviorSubject<'ONLINE' | 'OFFLINE'>,
+    requests: { message: any, completed: boolean }[],
+    buffer: RetransmissionService,
+    responseObs: Subject<BaseMessage>
+}

+ 0 - 0
test/socket.test.ts → test/socket/socket.test.ts


+ 0 - 0
test/socketTest.txt → test/socket/socketTest.txt