Browse Source

1 on 1 retransmission fixed

Enzo 1 month ago
parent
commit
b0d2ed0081

+ 4 - 1
interfaces/retransmission.interface.ts

@@ -1,5 +1,8 @@
 import { Observable } from "rxjs";
+import { WrappedMessage } from "./general.interface";
 
 export interface RetransmissionInterface  {
-    retransmission(payloadToBeTransmitted: Observable<any>, eventListener: Observable<any>): Observable<any>
+    retransmission(payloadToBeTransmitted: Observable<any>, eventListener: Observable<any>)
+
+    returnBufferedMessages(): Observable<WrappedMessage>
 }

+ 15 - 16
services/retransmission.service.ts

@@ -17,27 +17,26 @@ export class RetransmissionService implements RetransmissionInterface {
     private messageToBeTransmitted: Subject<WrappedMessage> = new Subject()
 
     // Interface
-    public retransmission(payloadToBeTransmitted: Observable<any>, eventListener: Observable<any>, messageOrdering?: boolean): Observable<WrappedMessage> {
-        return new Observable((observer) => {
-            if (messageOrdering) {
-                this.sortMessage = true
-                console.log(`Message ordering is set to ${this.sortMessage}`)
-            }
-            eventListener.subscribe(event => this.receiverConnectionState.next(event))
-
-            this.startWrappingOperation()
-            this.startBufferTransmisionProcess()
-            this.releaseSignalManager()
+    public retransmission(payloadToBeTransmitted: Observable<any>, eventListener: Observable<any>, messageOrdering?: boolean) {
+        if (messageOrdering) {
+            this.sortMessage = true
+            console.log(`Message ordering is set to ${this.sortMessage}`)
+        }
+        eventListener.subscribe(event => this.receiverConnectionState.next(event))
 
-            payloadToBeTransmitted.subscribe((message) => {
-                this.toBeWrapped.next(message)
-            })
+        this.startWrappingOperation()
+        this.startBufferTransmisionProcess()
+        this.releaseSignalManager()
 
-            // these ones are the ones that are exited out from the bufferey'
-            this.messageToBeTransmitted.subscribe(message => observer.next(message))
+        payloadToBeTransmitted.subscribe((message) => {
+            this.toBeWrapped.next(message)
         })
     }
 
+    public returnBufferedMessages(): Observable<WrappedMessage> {
+        return this.messageToBeTransmitted.asObservable()
+    }
+
     private startWrappingOperation() {
         this.toBeWrapped.subscribe(message => {
             this.wrappedMessageToBeBuffered.next(this.wrapMessageWithTimeReceived(message, this.currentMessageId ? this.currentMessageId : null))

+ 8 - 6
test/socket/socket-client.ts

@@ -10,7 +10,7 @@ let onHoldMessagesSubject: Subject<WrappedMessage> = new Subject()
 let toBePassedOverToApp: Subject<BaseMessage> = new Subject()
 // Serve static files (optional)
 let sender: Subject<BaseMessage> = prepareResponseMessages(1, 2000)
-let serverSocketUrl: string = 'http://localhost:3000'
+let serverSocketUrl: string = 'http://192.168.100.96:3000'
 let socket: Socket
 
 
@@ -74,7 +74,7 @@ async function establishSocketConnection(serverUrl: string): Promise<any> {
 
             // Listen for messages from the server
             socket.on('response', (msg: WrappedMessage) => {
-                // console.log('Message from server:', msg.payload.header.messageName);
+                console.log('Message from server:', msg.payload.header.messageName, ' for ', msg.payload.header.messageID);
 
                 // Check the sequence by ensuring the message value before the current message exists, then pass them over to "App"
                 // onHoldMessagesSubject.next(msg)
@@ -90,12 +90,14 @@ async function establishSocketConnection(serverUrl: string): Promise<any> {
                     writeFile(msg.socketInfo as ClientInfo)
                 }
                 if (msg.notification == 'Your updated credentials') {
-                    console.log(`Updated socket ID: ${msg.socketInfo.id}`)
-                    writeFile(msg.socketInfo as ClientInfo)
+                    console.log(`Updated socket ID: `, msg)
+                    // writeFile(msg.socketInfo as ClientInfo)
                 }
                 if (msg.notification == 'Failed Request') {
                     console.log(`Resending request...`, msg.data.header.messageID)
-                    // sender.next(msg.data)
+                    setTimeout(() => {
+                        sender.next(msg.data)
+                    }, 1000)
                 }
             })
 
@@ -173,7 +175,7 @@ async function checkMessage(message: WrappedMessage): Promise<any> {
     return new Promise((resolve, reject) => {
         if (message.previousMessageID) {
             onHoldMessagesSubject.pipe(
-                takeWhile(item => message.previousMessageID === item.thisMessageID )
+                takeWhile(item => message.previousMessageID === item.thisMessageID)
             ).subscribe({
                 complete: () => {
                     resolve('previousMessageID matched')

+ 1 - 1
test/socket/socket-test-server.ts

@@ -31,7 +31,7 @@ incomingRequest.subscribe((request: BaseMessage) => {
 
 function returnResponse(request: BaseMessage): Observable<BaseMessage> {
     return new Observable((observer) => {
-        prepareResponseMessages(10, 500).subscribe({
+        prepareResponseMessages(10, 1000).subscribe({
             next: (message: BaseMessage) => {
                 message.header.messageID = request.header.messageID
                 observer.next(message)

+ 68 - 56
test/socket/socket.service.ts

@@ -32,6 +32,7 @@ export class SocketService {
     public async setUpConnection() {
         this.io.on('connection', (socket) => {
             this.announcements.next('a client is connected:' + socket.id);
+            let clientIsOnline: BehaviorSubject<boolean> = new BehaviorSubject(true)
             let clientInfo: ClientInfo | null
 
             socket.on('connect', (msg) => {
@@ -40,7 +41,71 @@ export class SocketService {
 
             socket.on('notification', (msg) => {
                 console.log(msg)
-                clientInfo = this.handleNotification(msg, socket, clientInfo)
+                if (msg.agenda == 'newClient') {
+                    clientInfo = {
+                        id: socket.id,
+                        clientName: uuidV4(),
+                        connectedAt: new Date(),
+                        clientConnectionState: new BehaviorSubject<'ONLINE' | 'OFFLINE'>('ONLINE'),
+                        requests: [],
+                        buffer: new RetransmissionService(),
+                        responseObs: new Subject<BaseMessage>()
+                    }
+                    this.connectedClients.push(clientInfo);
+
+                    // Send data over for client to persist
+                    socket.emit('notification', {
+                        notification: 'Your credentials',
+                        createdAt: new Date(),
+                        socketInfo: clientInfo
+                    })
+
+                    // this is the supposed responses to be pushed to this socket client
+                    clientInfo.buffer.retransmission(clientInfo.responseObs, clientInfo.clientConnectionState)
+                    let subscription = clientInfo.buffer.returnBufferedMessages().subscribe(output => {
+                        // console.log(output)
+                        if (clientIsOnline.getValue() === true) {
+                            socket.emit('response', output)
+                        } else {
+                            subscription.unsubscribe()
+                        }
+                    })
+                }
+
+                if (msg.agenda == 'existingClient') {
+                    // check if client exists
+                    let clientObj = this.connectedClients.find(obj => obj.clientName === msg.data.clientName)
+                    if (clientObj) {
+                        // clientInfo = clientObj
+                        console.log('Existing client found')
+                        // but also update socketId
+                        clientObj.id = socket.id
+
+                        // Send data over for client to persist
+                        socket.emit('notification', {
+                            notification: 'Your updated credentials',
+                            connectedAt: new Date(),
+                            updatedId: socket.id
+                        })
+
+                        socket.emit('notification', `Hello from server. You have been assigned ${socket.id}`);
+                        // resume operation Release them buffer
+                        /* local client isOnline need not be mutated, since this is a new connection. However the previous intance of client Connection State
+                        inside the retransmission needs to be updated to release the buffered values.*/
+                        clientObj.clientConnectionState.next('ONLINE')
+                        let subscription = clientObj.buffer.returnBufferedMessages().subscribe(output => {
+                            // console.log(output)
+                            if (clientIsOnline.getValue() === true) {
+                                socket.emit('response', output)
+                            } else {
+                                subscription.unsubscribe()
+                            }
+                        })
+                    } else {
+                        console.log(this.connectedClients)
+                        console.log(`Existing Client is not found`)
+                    }
+                }
             })
 
             // Listen for messages from the client
@@ -70,7 +135,8 @@ export class SocketService {
             // Handle disconnection
             socket.on('disconnect', () => {
                 if (clientInfo) {
-                    clientInfo.clientConnectionState.next('OFFLINE') // signal to start buffering
+                    clientIsOnline.next(false)
+                    clientInfo.clientConnectionState.next('OFFLINE') // signal to start buffering\
                     this.announcements.next(`Client ${clientInfo.id} disconnected`);
                     // this.deleteClientById(socket.id)
                 }
@@ -118,60 +184,6 @@ export class SocketService {
         })
     }
 
-    private handleNotification(msg: any, socket: Socket, clientInfo: ClientInfo | null) {
-        if (msg.agenda == 'newClient') {
-            clientInfo = {
-                id: socket.id,
-                clientName: uuidV4(),
-                connectedAt: new Date(),
-                clientConnectionState: new BehaviorSubject<'ONLINE' | 'OFFLINE'>('ONLINE'),
-                requests: [],
-                buffer: new RetransmissionService(),
-                responseObs: new Subject<BaseMessage>()
-            }
-            this.connectedClients.push(clientInfo);
-
-            // Send data over for client to persist
-            socket.emit('notification', {
-                notification: 'Your credentials',
-                createdAt: new Date(),
-                socketInfo: clientInfo
-            })
-
-            // this is the supposed responses to be pushed to this socket client
-            clientInfo.buffer.retransmission(clientInfo.responseObs, clientInfo.clientConnectionState).subscribe(output => {
-                // console.log(output)
-                socket.emit('response', output)
-            })
-        }
-
-        if (msg.agenda == 'existingClient') {
-            // check if client exists
-            let clientObj = this.connectedClients.find(obj => obj.clientName === msg.data.clientName)
-            if (clientObj) {
-                clientInfo = clientObj
-                console.log('Existing client found')
-                // but also update socketId
-                clientObj.id = socket.id
-
-                // Send data over for client to persist
-                socket.emit('notification', {
-                    notification: 'Your updated credentials',
-                    connectedAt: new Date(),
-                    socketInfo: clientInfo
-                })
-
-                socket.emit('notification', `Hello from server. You have been assigned ${socket.id}`);
-                // resume operation
-                clientObj.clientConnectionState.next('ONLINE')
-            } else {
-                console.log(this.connectedClients)
-                console.log(`Existing Client is not found`)
-            }
-        }
-
-        return clientInfo
-    }
 
 }