Browse Source

adjustments to fix socket behaviour

Enzo 1 month ago
parent
commit
2d36318f21

+ 1 - 1
interfaces/general.interface.ts

@@ -88,7 +88,7 @@ export interface ClientNotificationState {
 
 export interface WrappedMessage {
     timeReceived: any, // this property is for sender to sort
-    payload: BaseMessage,
+    payload: any,
     thisMessageID?: string,
     previousMessageID?: string // this property is for receiver to sort
 }

+ 7 - 5
services/retransmission.service.ts

@@ -1,5 +1,4 @@
 import { BehaviorSubject, buffer, concatMap, distinctUntilChanged, from, Observable, Subject, takeWhile } from "rxjs";
-import { BaseMessage } from "../dependencies/logging/dependencies/msgutil/dependencies/dependencies";
 import { RetransmissionInterface } from "../interfaces/retransmission.interface";
 import { WrappedMessage } from "../interfaces/general.interface";
 import { sortMessageBasedOnDate } from "./utility/message-ordering";
@@ -57,7 +56,7 @@ export class RetransmissionService implements RetransmissionInterface {
         if (!message.timeReceived) {
             let WrappedMessage: WrappedMessage = {
                 timeReceived: new Date(),
-                payload: message as BaseMessage,
+                payload: message,
                 thisMessageID: uuidV4(),
                 previousMessageID: previousMessageID
             }
@@ -82,6 +81,7 @@ export class RetransmissionService implements RetransmissionInterface {
                             this.wrappedMessageToBeBuffered.next(message)
                         }
                         if (this.receiverConnectionState.getValue() == 'ONLINE') {
+                            // console.log(`At retransmission ${message.payload.header.messageID ?? 'undefined'}`)
                             this.messageToBeTransmitted.next(message)
                         }
                     },
@@ -101,9 +101,11 @@ export class RetransmissionService implements RetransmissionInterface {
                 });
             } else {
                 // If I don't do setTimeout, then bufferrelasesignal will be overloaded
-                setTimeout(() => {
-                    this.bufferReleaseSignal.next()
-                }, 3000)
+                if (this.receiverConnectionState.getValue() === 'ONLINE') {
+                    setTimeout(() => {
+                        this.bufferReleaseSignal.next()
+                    }, 3000)
+                }
             }
         }
         )

+ 27 - 25
test/socket/socket-client.ts

@@ -10,23 +10,24 @@ import { MongoService } from "./temp-log-service";
 let onHoldMessagesSubject: Subject<WrappedMessage> = new Subject()
 let toBePassedOverToApp: Subject<BaseMessage> = new Subject()
 // Serve static files (optional)
-let sender: Subject<BaseMessage> = prepareResponseMessages(5, 1000)
+let sender: Subject<BaseMessage> = prepareResponseMessages(2, 1000)
 let serverSocketUrl: string = 'http://192.168.100.96:3000'
+// let serverSocketUrl: string = 'http://192.168.100.96:4047'
 let socket: Socket
-let client: string = 'client3'
+let client: string = 'client1'
 let mongoService: MongoService = new MongoService(client)
 
 
 establishSocketConnection(serverSocketUrl).then(() => {
-    // sender.subscribe({
-    //     next: (message: BaseMessage) => {
-    //         makeRequest(message).subscribe({
-    //             next: (message: BaseMessage) => {
-    //             },
-    //             complete: () => console.log(`Request ${message.header.messageID} has acquired all responses.`)
-    //         })
-    //     }
-    // })
+        // sender.sub`scribe({
+        //     next: (message: BaseMessage) => {
+        //         makeRequest(message).subscribe({
+        //             next: (message: BaseMessage) => {
+        //             },
+        //             complete: () => console.log(`Request ${message.header.messageID} has acquired all responses.`)
+        //         })
+        //     }
+        // })`
 })
 
 // the interface the client Program will make without having to decide transport protocol
@@ -34,15 +35,14 @@ function makeRequest(request: BaseMessage): Observable<BaseMessage> {
     return new Observable((response) => {
         sendMessage(request)
         toBePassedOverToApp.subscribe({
-            next: (message: BaseMessage) => {
+            next: (message: BaseMessage | any) => {
                 // console.log(message.header.messageName)
                 // The identification of responses mapping to the request be adjusted accordingly
                 // For now it's a simple demulti-plexing
-                if (message.header.messageID == request.header.messageID && message.header.messageName != 'Complete') {
-                    response.next(message)
-                }
-                if (message.header.messageID == request.header.messageID && message.header.messageName == 'Complete') {
+                if (message.complete) {
                     response.complete()
+                } else {
+                    response.next(message)
                 }
             },
             error: err => console.error(err),
@@ -76,19 +76,21 @@ async function establishSocketConnection(serverUrl: string): Promise<any> {
                 // socket.emit('Hello from the client!')
                 console.log('Connected to the server:', socket.id)
             });
-            
+
             // Listen for messages from the server
             socket.on('response', (msg: WrappedMessage) => {
-                console.log('Message from server:', msg.payload.header.messageName, ' for ', msg.payload.header.messageID);
-                mongoService.write(msg.payload, msg.payload.header.messageID, () => console.log(`Error function doesn't exist.`))
+                console.log('Message from server:', msg.payload.header.messageName ?? 'null', ' for ', msg.payload.header.messageID ?? 'complete');
+                if (!msg.payload.complete) {
+                    mongoService.write(msg.payload, msg.payload.header.messageID, () => console.log(`Error function doesn't exist.`))
+                }
                 // Check the sequence by ensuring the message value before the current message exists, then pass them over to "App"
                 // onHoldMessagesSubject.next(msg)
                 // checkMessage(msg, onHoldMessageSubject).then(() => [
-                    //     toBePassedOverToApp.next(msg.payload as BaseMessage)
-                    // ]).catch((err) => console.error(err))
-                toBePassedOverToApp.next(msg.payload as BaseMessage)
+                //     toBePassedOverToApp.next(msg.payload as BaseMessage)
+                // ]).catch((err) => console.error(err))
+                toBePassedOverToApp.next(msg.payload)
             })
-            
+
             socket.on('notification', (msg: any) => {
                 if (msg.notification == 'Your credentials') {
                     console.log(`Assigned client Name: ${msg.socketInfo.clientName}`)
@@ -131,11 +133,11 @@ function checkOwnClientInfo(filename: string): ClientInfo | null {
             if (fileData.trim() === "") {
                 throw new Error("File is empty");
             }
-            
+
             // Parse and return the data if present
             const jsonData = JSON.parse(fileData);
             return jsonData;
-            
+
         } catch (err) {
             // Handle parsing errors or other file-related errors
             console.error("Error reading or parsing file:", err.message);

+ 4 - 13
test/socket/socket-test-server.ts

@@ -29,9 +29,9 @@ incomingRequest.subscribe((request: BaseMessage) => {
     })
 })
 
-function returnResponse(request: BaseMessage): Observable<BaseMessage> {
+function returnResponse(request: BaseMessage): Observable<any> {
     return new Observable((observer) => {
-        prepareResponseMessages(30, 1000).subscribe({
+        prepareResponseMessages(20, 300).subscribe({
             next: (message: BaseMessage) => {
                 message.header.messageID = request.header.messageID
                 console.log(`Generating response message for ${request.header.messageID}`)
@@ -39,17 +39,8 @@ function returnResponse(request: BaseMessage): Observable<BaseMessage> {
             },
             error: err => console.error(err),
             complete: () => {
-                prepareResponseMessages(1, 1000).subscribe({
-                    next: message => {
-                        console.log(`Finalizing complete response for request${request.header.messageID}`)
-                        message.header.messageID = request.header.messageID
-                        message.header.messageName = 'Complete'
-                        observer.next(message)
-                    },
-                    complete: () => {
-                        observer.complete()
-                    }
-                })
+                observer.next({ header: { messageID: request.header.messageID }, complete: true })
+                observer.complete()
             }
         })
     })

+ 47 - 27
test/socket/socket.service.ts

@@ -1,11 +1,11 @@
-import { BehaviorSubject, Observable, Subject } from "rxjs"
+import { BehaviorSubject, catchError, Observable, of, Subject, Subscription, tap, timeout } from "rxjs"
 import { RetransmissionService } from "../../services/retransmission.service"
 import { BaseMessage } from "../../dependencies/logging/services/logging-service"
 import { v4 as uuidV4 } from 'uuid';
-import { Socket } from "socket.io-client";
 const express = require('express');
 const http = require('http');
-const { Server } = require('socket.io');
+// const { Server } = require('socket.io');
+import { Server } from 'socket.io'
 
 /* This is only for demonstration purposes. Because the actual nestjs socket implementation may differ. */
 export class SocketService {
@@ -15,7 +15,7 @@ export class SocketService {
     private app = express();
     private server = http.createServer(this.app);
     private io = new Server(this.server);
-    private responseFromApp: Subject<BaseMessage>
+    private responseFromApp: Subject<BaseMessage | any>
     private incomingRequest: Subject<BaseMessage> = new Subject()
 
     constructor(response: Subject<BaseMessage>) {
@@ -32,7 +32,6 @@ export class SocketService {
     public async setUpConnection() {
         this.io.on('connection', (socket) => {
             this.announcements.next('a client is connected:' + socket.id);
-            let clientIsOnline: BehaviorSubject<boolean> = new BehaviorSubject(true)
             let clientInfo: ClientInfo | null
 
             socket.on('connect', (msg) => {
@@ -49,7 +48,7 @@ export class SocketService {
                         clientConnectionState: new BehaviorSubject<'ONLINE' | 'OFFLINE'>('ONLINE'),
                         requests: [],
                         buffer: new RetransmissionService(),
-                        responseObs: new Subject<BaseMessage>()
+                        responseObs: new Subject<any>()
                     }
                     this.connectedClients.push(clientInfo);
 
@@ -64,7 +63,7 @@ export class SocketService {
                     clientInfo.buffer.retransmission(clientInfo.responseObs, clientInfo.clientConnectionState)
                     let subscription = clientInfo.buffer.returnBufferedMessages().subscribe(output => {
                         // console.log(output)
-                        if (clientIsOnline.getValue() === true) {
+                        if (clientInfo.clientConnectionState.getValue() === 'ONLINE') {
                             socket.emit('response', output)
                         } else {
                             subscription.unsubscribe()
@@ -74,12 +73,12 @@ export class SocketService {
 
                 if (msg.agenda == 'existingClient') {
                     // check if client exists
-                    let clientObj = this.connectedClients.find(obj => obj.clientName === msg.data.clientName)
+                    let clientObj: ClientInfo = this.connectedClients.find(obj => obj.clientName === msg.data.clientName)
                     if (clientObj) {
                         clientInfo = clientObj
                         console.log('Existing client found')
                         // but also update socketId
-                        clientObj.id = socket.id
+                        clientInfo.id = socket.id
 
                         // Send data over for client to persist
                         socket.emit('notification', {
@@ -91,14 +90,36 @@ export class SocketService {
                         // resume operation Release them buffer
                         /* local client isOnline need not be mutated, since this is a new connection. However the previous intance of client Connection State
                         inside the retransmission needs to be updated to release the buffered values.*/
-                        let subscription = clientObj.buffer.returnBufferedMessages().subscribe(output => {
-                            // console.log(output)
-                            if (clientIsOnline.getValue() === true) {
-                                socket.emit('response', output)
-                            } else {
-                                subscription.unsubscribe()
-                            }
-                        })
+                        function releaseBufferedItems(clientInfo: ClientInfo) {
+                            let subscription: Subscription = clientInfo.buffer.returnBufferedMessages().pipe(
+                                tap(message => {
+                                    if (clientInfo.clientConnectionState.getValue() === 'OFFLINE') {
+                                        clientInfo.responseObs.next(message)
+                                    }
+                                }),
+                                timeout(10000), // Unsubscribe if no value is emitted within 10 seconds
+                                catchError(err => {
+                                    if (err.name === 'TimeoutError') {
+                                        console.log('TimeoutError: No value emitted within 10 seconds.');
+                                        if (clientInfo.clientConnectionState.getValue() === 'ONLINE') {
+                                            releaseBufferedItems(clientInfo); // Call the function if it's still online
+                                        } else {
+                                            subscription.unsubscribe()
+                                        }
+                                    }
+                                    return of(); 
+                                })
+                            )
+                                .subscribe({
+                                    next: output => {
+                                        socket.emit('response', output)
+                                    },
+                                    error: err => console.error(err),
+                                    complete: () => { }
+                                })
+                        }
+                        releaseBufferedItems(clientInfo)
+                        //signal to release buffered items
                         clientObj.clientConnectionState.next('ONLINE')
                     } else {
                         console.log(this.connectedClients)
@@ -134,7 +155,6 @@ export class SocketService {
             // Handle disconnection
             socket.on('disconnect', () => {
                 if (clientInfo) {
-                    clientIsOnline.next(false)
                     clientInfo.clientConnectionState.next('OFFLINE') // signal to start buffering\
                     this.announcements.next(`Client ${clientInfo.id} disconnected`);
                     // this.deleteClientById(socket.id)
@@ -167,17 +187,17 @@ export class SocketService {
         }
     }
 
-    private processRequest(request: BaseMessage): Observable<BaseMessage> {
+    private processRequest(request: BaseMessage): Observable<any> {
         return new Observable((observer) => {
             this.responseFromApp.subscribe(message => {
                 // console.log(message)
-                if (message.header.messageID === request.header.messageID && message.header.messageName != 'Complete') {
-                    observer.next(message)
-                }
-                if (message.header.messageID === request.header.messageID && message.header.messageName == 'Complete') {
-                    observer.next(message)
-                    // console.log(message) // so it does show
-                    observer.complete()
+                if (message.header.messageID === request.header.messageID) {
+                    if (!message.complete) {
+                        observer.next(message)
+                    } else {
+                        observer.next(message)
+                        observer.complete()
+                    }
                 }
             })
         })
@@ -193,5 +213,5 @@ export interface ClientInfo {
     clientConnectionState: BehaviorSubject<'ONLINE' | 'OFFLINE'>,
     requests: { message: any, completed: boolean }[],
     buffer: RetransmissionService,
-    responseObs: Subject<BaseMessage>
+    responseObs: Subject<any>
 }