Browse Source

transmitter and receiver start up. Able to start itself each.

Enzo 4 weeks ago
parent
commit
cc654e8ac1

+ 8 - 0
Client3f03a18b-13d6-4e99-ba57-1d9119f20a29.json

@@ -0,0 +1,8 @@
+{
+  "uuid": "380dc51e-9dca-4be0-b020-7e45577688f5",
+  "name": "Client3f03a18b-13d6-4e99-ba57-1d9119f20a29",
+  "dateCreated": "2024-10-15T08:15:07.106Z",
+  "transportType": "WEBSOCKET",
+  "eventNotification": null,
+  "instance": null
+}

+ 1 - 1
package.json

@@ -7,7 +7,7 @@
     "test": "echo \"Error: no test specified\" && exit 1",
     "build": "tsc",
     "start": "node dist/index.js",
-    "transmit": "node dist/test/transmitter.js",
+    "transmitter": "node dist/test/transmitter.js",
     "receiver": "node dist/test/receiver.js"
   },
   "author": "",

+ 33 - 5
src/interface/ITransport.interface.ts

@@ -1,9 +1,15 @@
 import { Observable, Subject } from "rxjs";
+import { RetransmissionService } from "../utils/retransmission.service";
 
-export type TransportEvent = 'Connection' | 'New Message'
+export type TransportManagerEvent = 'New Client Connection' | 'New Client Request' | 'Client Disconnected'
+export type TransportEvent = 'Connection' | 'New Message' | 'Disconnection'
 export type TransportType = 'WEBSOCKET' | 'GRPC' | 'HTTP' | 'KAFKA' | undefined
+
+export interface TransportManager {
+    getTransportManagerEventNotification(): Observable<TransportManagerEventNotification>
+}
 export interface ITransport {
-    getEventNotification(): Observable<TransportEventNotification>
+    getTransportEventNotification(): Observable<TransportEventNotification>
 }
 
 export interface ITransportTransmitting extends ITransport {
@@ -11,7 +17,7 @@ export interface ITransportTransmitting extends ITransport {
 }
 
 export interface ITransportReceiving extends ITransport {
-    receive(): Observable<any>
+    receive(): Observable<TransportMessage>
 }
 
 export interface ITransportSuper extends ITransportTransmitting, ITransportReceiving {
@@ -47,17 +53,39 @@ export interface ReceiverProfile {
     instance?: any
 }
 
+export interface ReceiverRetransmissionProfile extends ReceiverProfile {
+    buffer: RetransmissionService
+}
+
 export interface TransportEventNotification {
     event: TransportEvent,
     description: string,
     transportType: TransportType,
-    data?: any
+    data?: TransportPayload
 }
 
-
+export interface TransportManagerEventNotification {
+    event: TransportManagerEvent,
+    data?: TransportPayload
+}
 export interface TransportMessage {
     id: string,
     receiverID: string,
     payload: any,
     event: string
+}
+
+export interface TransportPayload {
+    receiverID: string,
+    receiverName: string,
+    date: Date,
+    payload?: any
+}
+
+
+export interface WrappedMessage {
+    timeReceived: any, // this property is for sender to sort
+    payload: any,
+    thisMessageID?: string,
+    previousMessageID?: string | null // this property is for receiver to sort
 }

+ 114 - 0
src/test/receiver.ts

@@ -0,0 +1,114 @@
+import { interval, map, Observable, Observer, Subject } from "rxjs"
+import { v4 as uuidv4 } from 'uuid'
+import { FisMessage, TransportManager } from "../transport/transport.manager"
+import { ReceiverRetransmissionProfile, TransportManagerEventNotification } from "../interface/ITransport.interface"
+import { RetransmissionService } from "../utils/retransmission.service"
+
+// need to instantiate to grab both incoming and outgoing
+
+class Application {
+    service!: Blue
+    notification = new Subject<any>()
+    constructor(service: Blue) {
+        this.service = service
+
+        // this one will directly send over 
+        // this.subscribeNotification().subscribe(notification => this.service.transportManager.send(notification))
+        this.notify()
+    }
+
+    subscribeNotification(): Observable<FisMessage> {
+        return this.notification.asObservable()
+    }
+
+    notify() {
+        // logic here
+        let output = interval(1000)
+        output.pipe(map(
+            value => {
+                let message: FisMessage = {
+                    header: {
+                        messageID: uuidv4(),
+                        messageName: `NotificationMessage`
+                    },
+                    data: `Notification Message ${value}`
+                }
+                return message
+            }
+        )).subscribe(this.notification)
+    }
+
+    /* EXTRA. Appplication acting as a receiver */
+    processRequest(request: FisMessage): Observable<FisMessage> {
+        return new Observable((response) => {
+            // process responses. Should be called by Blue after receiving a request
+        })
+    }
+    // This two only exist at this level. 
+    send(message: FisMessage): Observable<FisMessage> {
+        return new Observable((response) => {
+            // logic here
+        })
+    }
+}
+
+/* At the highest level, blue will also need to know who the receiver is so that it can establsih said one to one relationsuhip */
+class Blue {
+    transportManager!: TransportManager
+    receiverProfiles: ReceiverRetransmissionProfile[] = []
+    event: Subject<TransportManagerEventNotification> = new Subject()
+    constructor(transportManager: TransportManager) {
+        this.transportManager = transportManager
+        this.subscribeForTransportManagerEvent()
+        this.manageReceiverCommunication()
+    }
+
+    private subscribeForTransportManagerEvent(): void {
+        this.transportManager.getTransportManagerEventNotification().subscribe({
+            next: (notification: TransportManagerEventNotification) => {
+                this.event.next(notification)
+            }
+        })
+    }
+
+    private manageReceiverCommunication(): void {
+        this.event.subscribe((notification: TransportManagerEventNotification) => {
+            if (notification.event == 'New Client Connection') {
+                // first thing first, it will check if it's a previously connected receiver
+                this.checkIfPreviouslyConnected(notification.data?.receiverID as string) // but this id is assigned at connection though So that means the receiver must self identify
+                let adjustedReceiverProfile: ReceiverRetransmissionProfile = notification.data?.payload
+                adjustedReceiverProfile.buffer = new RetransmissionService()
+                // retransmission to be activated. Now it's just assigning it there, but idling.
+                this.receiverProfiles.push(adjustedReceiverProfile)
+                console.log(`BLUE: Added new Receiver Profile ${notification.data?.receiverID} to record. `)
+            }
+            if (notification.event == 'Client Disconnected') {
+                console.log(`BLUE: Client ${notification.data?.receiverID} disconnected. Buffering....`)
+                let receiverProfile: ReceiverRetransmissionProfile | undefined = this.receiverProfiles.find(obj => obj.uuid === notification.data?.receiverID)
+                if (receiverProfile) {
+                    // then notifiy retransmission to stop releasing the buffer 
+                }
+            }
+            if (notification.event == 'New Client Request') {
+                this.manageReceiverRequest(notification.data?.payload)
+                console.log(`BLUE: Client request ${notification.data?.payload?.header?.messageID ?? `messageID undefined`} received.`)
+            }
+        })
+    }
+
+    private manageReceiverRequest(request: FisMessage): void {
+
+    }
+
+    private checkIfPreviouslyConnected(receiverID: string): boolean {
+        return this.receiverProfiles.some(obj => obj.uuid === receiverID)
+    }
+
+
+}
+
+
+const application = new Application(new Blue(new TransportManager(undefined, 'http://localhost:3000')))
+
+/* Transmitter acting as a receiver */
+// logic here

+ 52 - 12
src/test/transmitter.ts

@@ -1,6 +1,9 @@
-import { interval, map, Observable, Subject } from "rxjs"
+import { interval, map, Observable, Observer, Subject } from "rxjs"
 import { v4 as uuidv4 } from 'uuid'
 import { FisMessage, TransportManager } from "../transport/transport.manager"
+import { ReceiverProfile, ReceiverRetransmissionProfile, TransportEventNotification, TransportManagerEventNotification, TransportMessage } from "../interface/ITransport.interface"
+import { error } from "console"
+import { RetransmissionService } from "../utils/retransmission.service"
 
 // need to instantiate to grab both incoming and outgoing
 
@@ -11,8 +14,7 @@ class Application {
         this.service = service
 
         // this one will directly send over 
-        // this.subscribeNotification().subscribe(notification => this.service.transportManager.send(notification))
-        this.notify()
+        // this.notify()
     }
 
     subscribeNotification(): Observable<FisMessage> {
@@ -50,27 +52,65 @@ class Application {
     }
 }
 
-/* Actually this class will acquire a high level information on the receiver as well.
-Because it's the retransmission that instantiated in this layer.
-So blue will work closely with the transport manager to acquire the necessary notifications with regards to the receiver's 
-information and it's status, but it should not concern itself with the intricacies of how the messages will be transmitted over. */
+/* At the highest level, blue will also need to know who the receiver is so that it can establsih said one to one relationsuhip */
 class Blue {
     transportManager!: TransportManager
-    receiverProfiles: any[] = []
+    receiverProfiles: ReceiverRetransmissionProfile[] = []
+    event: Subject<TransportManagerEventNotification> = new Subject()
     constructor(transportManager: TransportManager) {
         this.transportManager = transportManager
+        this.subscribeForTransportManagerEvent()
+        this.manageReceiverCommunication()
+    }
+
+    private subscribeForTransportManagerEvent(): void {
+        this.transportManager.getTransportManagerEventNotification().subscribe({
+            next: (notification: TransportManagerEventNotification) => {
+                this.event.next(notification)
+            }
+        })
+    }
+
+    private manageReceiverCommunication(): void {
+        this.event.subscribe((notification: TransportManagerEventNotification) => {
+            if (notification.event == 'New Client Connection') {
+                // first thing first, it will check if it's a previously connected receiver
+                this.checkIfPreviouslyConnected(notification.data?.receiverID as string) // but this id is assigned at connection though So that means the receiver must self identify
+                let adjustedReceiverProfile: ReceiverRetransmissionProfile = notification.data?.payload
+                adjustedReceiverProfile.buffer = new RetransmissionService()
+                // retransmission to be activated. Now it's just assigning it there, but idling.
+                this.receiverProfiles.push(adjustedReceiverProfile)
+                console.log(`BLUE: Added new Receiver Profile ${notification.data?.receiverID} to record. `)
+            }
+            if (notification.event == 'Client Disconnected') {
+                console.log(`BLUE: Client ${notification.data?.receiverID} disconnected. Buffering....`)
+                let receiverProfile: ReceiverRetransmissionProfile | undefined = this.receiverProfiles.find(obj => obj.uuid === notification.data?.receiverID)
+                if (receiverProfile) {
+                    // then notifiy retransmission to stop releasing the buffer 
+                }
+            }
+            if (notification.event == 'New Client Request') {
+                this.manageReceiverRequest(notification.data?.payload)
+                console.log(`BLUE: Client request ${notification.data?.payload?.header?.messageID ?? `messageID undefined`} received.`)
+            }
+        })
+    }
+
+    private manageReceiverRequest(request: FisMessage): void {
 
-        this.processIncomingRequest()
     }
 
-    processIncomingRequest(): void {
-        // prcoess incoming request based on the receiver's profile relevant request
+    private checkIfPreviouslyConnected(receiverID: string): boolean {
+        return this.receiverProfiles.some(obj => obj.uuid === receiverID)
     }
 
+
 }
 
 
-const application = new Application(new Blue(new TransportManager(3000, 'http://localhost:3000')))
+const application = new Application(new Blue(new TransportManager(3000)))
+// const application = new Application(new Blue(new TransportManager(null, 'http://localhost:3000')))
+// const application = new Application(new Blue(new TransportManager(3000, 'http://localhost:3000')))
 
 /* Transmitter acting as a receiver */
 // logic here

+ 3 - 5
src/transport/gRPC.ts

@@ -2,11 +2,9 @@ import { Observable } from "rxjs";
 import { ITransport, ReceiverProfile, TransmitterProfile, TransportEventNotification } from "../interface/ITransport.interface";
 
 export class GrpcTransportService implements ITransport {
-    getInfo(type: "transmitter" | "receiver", id?: string): TransmitterProfile | ReceiverProfile | null {
+    getTransportEventNotification(): Observable<TransportEventNotification> {
         throw new Error("Method not implemented.");
     }
-    getEventNotification(): Observable<TransportEventNotification> {
-        throw new Error("Method not implemented.");
-    }
-    
+
+
 }

+ 53 - 31
src/transport/transport.manager.ts

@@ -1,17 +1,17 @@
-import { Observable, Subject } from "rxjs";
+import { Observable, Observer, Subject } from "rxjs";
 import dotenv from 'dotenv';
 import { v4 as uuidv4 } from 'uuid';
 import { WebsocketTransportService } from "./websocket";
-import { ITransportReceiving, ITransportSuper, ITransportTransmitting, ReceiverProfile, TransportEventNotification, TransportMessage, TransportSettings, TransportType } from "../interface/ITransport.interface";
-import { error } from "console";
+import { ITransportReceiving, ITransportSuper, ITransportTransmitting, ReceiverProfile, TransportEventNotification, TransportManagerEventNotification, TransportMessage, TransportSettings, TransportType } from "../interface/ITransport.interface";
+
 dotenv.config();
 /* This transport manager will be instantiating the necessary transport to deal with tranmission and receiving from different receivers
 So how?: */
 export class TransportManager {
-    private incomingMessage: Subject<TransportMessage> = new Subject()
-    private outGoingMessage: Subject<TransportMessage> = new Subject()
-    private responseMessage: Subject<FisMessage> = new Subject()
+    private incomingMessage: Subject<TransportMessage> = new Subject() // only for client roles usage. Servers will listen to event notification for incoming requests 
+    private outGoingMessage: Subject<FisMessage> = new Subject()
     private connectedClients: ReceiverProfile[] = []
+    private transportEventNotification: Subject<TransportEventNotification> = new Subject()
     // this is assuming that the application will do request response, otherwise, just one will do.
     private transportService!: ITransportSuper
     private transmittingService!: ITransportTransmitting
@@ -20,22 +20,23 @@ export class TransportManager {
     private serverUrl!: string
 
     constructor(port?: number, url?: string) {
-        /* For web browser ui, because this server can also act as a receiver, not just transmitter as well.  */
         if (port) {
             this.serverPort = port
-            this.establishTransportTransmitting(process.env.Transport as TransportType).then((transmission: ITransportTransmitting) => {
-                transmission.getEventNotification().subscribe((notification: TransportEventNotification) => {
+            this.establishTransportTransmitting(process.env.Transport as TransportType).then((transmissionService: ITransportTransmitting) => {
+                transmissionService.getTransportEventNotification().subscribe((notification: TransportEventNotification) => {
+                    // Collect client information when they are connected 
                     if (notification.event == `Connection` && notification.description == 'New Client Connected') {
-                        this.connectedClients.push(notification.data)
-                        this.handleNewReceiver(notification.data)
+                        this.connectedClients.push(notification.data?.payload)
+                        // console.log(this.connectedClients) // we do get the connected clients
+                        this.handleNewReceiver(notification.data?.payload)
                     }
                 })
             }).catch((error) => console.error(error))
         }
+        /* For web browser ui, because this server can also act as a receiver, not just transmitter as well.  */
         if (url) {
             this.serverUrl = url
             this.establishTransportReceiving(process.env.Transport as TransportType).then((receivingService: ITransportReceiving) => {
-                // console.log(receivingService.getInfo('receiver'))
                 receivingService.receive().subscribe(this.incomingMessage)
             })
         }
@@ -43,23 +44,32 @@ export class TransportManager {
         this.processOutputMessage()
     }
 
-    public send(message: FisMessage): void {
-        this.responseMessage.next(message)
+    public getTransportManagerEventNotification(): Observable<TransportManagerEventNotification> {
+        return new Observable((notification: Observer<TransportManagerEventNotification>) => {
+            this.transportEventNotification.subscribe((transportNotification: TransportEventNotification) => {
+                if (transportNotification.event == 'Connection') {
+                    notification.next({
+                        event: 'New Client Connection',
+                        data: transportNotification.data
+                    })
+                }
+            })
+        })
     }
 
     private async establishTransportTransmitting(tranportType: TransportType): Promise<ITransportTransmitting> {
-        // console.log(`Setting up ${tranportType.transportName} transmitter`)
+        console.log(`Setting up ${tranportType} transmitter`)
         return new Promise((resolve, reject) => {
             let setting: TransportSettings = {
                 profileInfo: {
                     id: uuidv4(),
-                    name: 'Server',
+                    name: 'For Server',
                     port: this.serverPort,
                     transport: tranportType
                 }
             }
             if (tranportType == 'WEBSOCKET') {
-                this.transportService = new WebsocketTransportService(setting)
+                this.transportService = this.transportService ? this.transportService : new WebsocketTransportService(setting)
                 this.transmittingService = this.transportService
                 resolve(this.transmittingService)
             } else {
@@ -68,11 +78,19 @@ export class TransportManager {
         })
     }
 
-    private async establishTransportReceiving(tranportType: TransportType): Promise<any> {
+    private async establishTransportReceiving(tranportType: TransportType): Promise<ITransportReceiving> {
         return new Promise((resolve, reject) => {
+            let setting: TransportSettings = {
+                profileInfo: {
+                    id: uuidv4(), 
+                    name: 'For Client',
+                    url: this.serverUrl,
+                    transport: tranportType
+                }
+            }
             // will be sharing one instance, This part may break the code
             if (tranportType == 'WEBSOCKET') {
-                this.receivingService = this.transportService
+                this.receivingService = this.transportService ? this.transportService : new WebsocketTransportService(setting)
                 resolve(this.receivingService)
             }
         })
@@ -80,12 +98,12 @@ export class TransportManager {
 
     // This function will wrap the message to be delivered over to established transport to be sent over
     private processOutputMessage(): void {
-        this.responseMessage.subscribe({
+        this.outGoingMessage.subscribe({
             next: (message: FisMessage) => {
                 if (message.header.messageName == 'NotificationMessage') {
                     // This is just here temporarily. Because The application will be concerned with whom the party is subcribed to, not this transport manager
-                    this.outGoingMessage.next({
-                        id: '',
+                    this.transmittingService.transmit({
+                        id: message.header.messageID,
                         receiverID: '',
                         payload: message,
                         event: `notification`
@@ -93,8 +111,8 @@ export class TransportManager {
                 }
                 if (message.header.messageName == 'ResponseMessage') {
                     // Map responses according to the requestID???
-                    this.outGoingMessage.next({
-                        id: '',
+                    this.transmittingService.transmit({
+                        id: message.header.messageID,
                         receiverID: '',
                         payload: message,
                         event: `resposne`
@@ -104,19 +122,23 @@ export class TransportManager {
         })
     }
 
-    private transmitWrappedMessages(): void {
-        this.outGoingMessage.subscribe(message => {
-            this.transmittingService.transmit(message)
-        })
-    }
-
     // Responsibility include subscribing to event notification as well as tranpport instance for message transmission
     private handleNewReceiver(receiver: ReceiverProfile) {
         receiver.eventNotification.subscribe({
             next: event => {
                 // new request will be handled. And then manager will here will pick it up from eventNotification and respond accordingly if there's a need for it
+                if (event.event == `Disconnection`) {
+                    console.error(`Client ${event.data?.payload.id} disconnected.`)
+                    receiver.eventNotification.complete() // no need to listen since it's disconnected. A new one will be established when it reconnects again
+                }
+                if (event.event == `New Message`) {
+                    console.log(`New Client request ${event.data?.payload.header.messageID} at receiverID: ${receiver.uuid}`)
+                }
             },
-            error: error => console.error(error)
+            error: error => {
+                console.error(error)
+                receiver.eventNotification.error(error)
+            }
         })
     }
 }

+ 22 - 48
src/transport/websocket.ts

@@ -1,25 +1,33 @@
 import { Observable, Subject } from "rxjs";
-import { Socket, Socket as ClientSocket } from "socket.io-client";
+import { Socket as ClientSocket } from "socket.io-client";
 import { Socket as SocketForConnectedClient } from "socket.io"
-import { startClientSocketConnection, startSocketServer } from "../utils/socket.utils";
+import { handleClientSocketConnection, handleNewSocketClient, startClientSocketConnection, startSocketServer, writeFile } from "../utils/socket.utils";
 import { ITransportReceiving, ITransportTransmitting, ReceiverProfile, TransmitterProfile, TransportEventNotification, TransportMessage, TransportSettings } from "../interface/ITransport.interface";
-import { v4 as uuidv4 } from 'uuid'
 
 /* Just code in the context that this websocket service will be handling multiple UI clients. Can think about the server communication at a later time. */
 export class WebsocketTransportService implements ITransportTransmitting, ITransportReceiving {
-    private transmitterProfile!: TransmitterProfile | null
     private socketReceiverProfile: ReceiverProfile[] = []
-    private clientSocket!: ClientSocket
-    private incomingMessage: Subject<TransportMessage> = new Subject()
+    private incomingMessage: Subject<TransportMessage> = new Subject() // this is only for client roles only atm
     private eventNotification: Subject<TransportEventNotification> = new Subject()
 
     constructor(setting: TransportSettings) {
-        startSocketServer(setting.profileInfo.port as number).subscribe({
-            next: (connectedClient: SocketForConnectedClient) => {
-                this.handleNewSocketClient(connectedClient)
-            },
-            error: error => console.error(error),
-        })
+        if (setting.profileInfo.port) {
+            startSocketServer(setting.profileInfo.port as number).subscribe({
+                next: (connectedClient: SocketForConnectedClient) => {
+                    handleNewSocketClient(connectedClient, this.eventNotification, this.socketReceiverProfile)
+                },
+                error: error => console.error(error),
+            })
+        }
+
+        // this is for those who wants to act as receiver. Usually for web browser, but servers can use this too.
+        if (setting.profileInfo.url) {
+            startClientSocketConnection(setting.profileInfo.url).then((socket: ClientSocket) => {
+                handleClientSocketConnection(socket, this.incomingMessage).subscribe(this.eventNotification)
+            }).catch((error) => {
+                console.error(`WebsocketTransport ERROR:`, error)
+            })
+        }
     }
 
     // for transmission(Server Only, not applicable for client Socket)
@@ -35,46 +43,12 @@ export class WebsocketTransportService implements ITransportTransmitting, ITrans
         })
     }
 
-    public getEventNotification(): Observable<TransportEventNotification> {
+    public getTransportEventNotification(): Observable<TransportEventNotification> {
         return this.eventNotification as Observable<TransportEventNotification>
     }
 
-    // Set up socket listeners to start listening for different events
-    private handleNewSocketClient(socket: SocketForConnectedClient): void {
-        console.log(`Setting up listeners for socket:${socket.id}`)
-        // returns the socket client instance 
-        let receiverProfile: ReceiverProfile = {
-            uuid: uuidv4(),
-            name: `Client`,
-            dateCreated: new Date(),
-            transportType: `WEBSOCKET`,
-            eventNotification: new Subject(),
-            instance: socket
-        }
-        this.eventNotification.next({ event: 'Connection', description: 'New Client Connected', transportType: `WEBSOCKET`, data: receiverProfile })
-        this.socketReceiverProfile.push(receiverProfile)
-
-        /* Generally, we don't need this unless in the case of being the receiver */
-        socket.on('message', (message: any) => {
-            // here
-        })
-
-        socket.on('request', (request: any) => {
-            // here
-        })
-
-        socket.on('notification', (notification: any) => {
-            // logic here
-        })
-
-        socket.on('disconnect', () => {
-            receiverProfile.eventNotification.next({ event: 'Connection', description: `Existing Client ${socket.id} disonnected`, transportType: `WEBSOCKET`, data: receiverProfile })
-        })
-    }
-
-
     // for client UI eg
-    public receive(): Observable<any> {
+    public receive(): Observable<TransportMessage> {
         return this.incomingMessage.asObservable()
     }
 }

+ 29 - 0
src/utils/message.ordering.ts

@@ -0,0 +1,29 @@
+import { Subject, takeWhile } from "rxjs";
+import { WrappedMessage } from "../interface/ITransport.interface";
+
+export function sortMessageBasedOnDate(array: WrappedMessage[]): WrappedMessage[] {
+    console.log(`Sorting ${array.length} messages....`)
+    return array.sort((a, b) => {
+        return new Date(a.timeReceived).getTime() - new Date(b.timeReceived).getTime();
+    });
+}
+
+// SO concept will be that if the message behind it is received, then 
+export async function checkMessage(message: WrappedMessage, messageChecking: Subject<WrappedMessage>): Promise<any> {
+    return new Promise((resolve, reject) => {
+        if (message.previousMessageID) {
+            messageChecking.pipe(
+                takeWhile(item => message.previousMessageID === item.thisMessageID)
+            ).subscribe({
+                complete: () => {
+                    resolve('previousMessageID matched')
+                    // console.log(`${message.payload.header.messageID} : Previous messageID(${message.previousMessageID}) matched`)
+                    // console.log(`matched`)
+                }
+            })
+        } else {
+            console.log('No previous messageID. This should be the first message')
+            resolve('No previous message ID. Please Proceed.')
+        }
+    })
+}

+ 132 - 0
src/utils/retransmission.service.ts

@@ -0,0 +1,132 @@
+import { BehaviorSubject, buffer, distinctUntilChanged, from, Observable, Subject, takeWhile } from "rxjs";
+import { v4 as uuidV4 } from 'uuid';
+import { WrappedMessage } from "../interface/ITransport.interface";
+import { sortMessageBasedOnDate } from "./message.ordering";
+
+export class RetransmissionService {
+    private currentMessageId!: string | null
+    private sortMessage: boolean = false
+    private bufferReleaseSignal: Subject<void> = new Subject()
+    private receiverConnectionState: BehaviorSubject<ConnectionState> = new BehaviorSubject<ConnectionState>('OFFLINE')
+    private transmissionState: BehaviorSubject<TransmissionState> = new BehaviorSubject<TransmissionState>('ARRAY EMPTY')
+    private arrayToBeTransmitted: Subject<WrappedMessage[]> = new Subject()
+    private toBeWrapped: Subject<any> = new Subject()
+    private wrappedMessageToBeBuffered: Subject<WrappedMessage> = new Subject()
+    private messageToBeTransmitted: Subject<WrappedMessage> = new Subject()
+
+    // Interface
+    public implementRetransmission(payloadToBeTransmitted: Observable<any>, eventListener: Observable<any>, wantMessageOrdering?: boolean) {
+        if (wantMessageOrdering) {
+            this.sortMessage = true
+            console.log(`Message ordering is set to ${this.sortMessage}`)
+        }
+        eventListener.subscribe(event => this.receiverConnectionState.next(event))
+
+        this.startWrappingOperation()
+        this.startBufferTransmisionProcess()
+        this.linkEventListenerToBufferSignal()
+
+        payloadToBeTransmitted.subscribe((message) => {
+            this.toBeWrapped.next(message)
+        })
+    }
+
+    public returnSubjectForBufferedItems(): Observable<WrappedMessage> {
+        return this.messageToBeTransmitted.asObservable()
+    }
+
+    private startWrappingOperation() {
+        this.toBeWrapped.subscribe(message => {
+            this.wrappedMessageToBeBuffered.next(this.wrapMessageWithTimeReceived(message, this.currentMessageId ? this.currentMessageId : null))
+        })
+
+        // wrappedMessageToBeBuffered will then be pushed to buffer
+        this.wrappedMessageToBeBuffered.pipe(buffer(this.bufferReleaseSignal)).subscribe((bufferedMessages: WrappedMessage[]) => {
+            console.log(bufferedMessages.length + ' buffered messages')
+            // console.log(`Released buffered message: ${bufferedMessages.length} total messages. To Be sorted.`)
+            this.arrayToBeTransmitted.next(sortMessageBasedOnDate(bufferedMessages))
+            // this.arrayToBeTransmitted.next((this.sortMessage && bufferedMessages.length > 0) ? sortMessageBasedOnDate(bufferedMessages) : bufferedMessages)
+        });
+    }
+
+    private wrapMessageWithTimeReceived(message: any, previousMessageID: string | null): WrappedMessage {
+        // check if message has already a time received property if so no need to add anymore
+        if (!message.timeReceived) {
+            let WrappedMessage: WrappedMessage = {
+                timeReceived: new Date(),
+                payload: message,
+                thisMessageID: uuidV4(),
+                previousMessageID: previousMessageID
+            }
+            // console.log(`Current`, WrappedMessage.thisMessageID, 'Previous for this message:', WrappedMessage.previousMessageID)
+            this.currentMessageId = WrappedMessage.thisMessageID as string
+            // console.log(`Updating: `, this.currentMessageId)
+            return WrappedMessage
+        } else {
+            return message as WrappedMessage
+        }
+    }
+
+    private startBufferTransmisionProcess() {
+        console.log(`StartBufferTransmissionProcess`)
+        this.arrayToBeTransmitted.subscribe(array => {
+            if (array.length > 0) {
+                this.transmissionState.next('TRANSMITTING')
+                from(array).subscribe({
+                    next: (message: WrappedMessage) => {
+                        if (this.receiverConnectionState.getValue() == 'OFFLINE') {
+                            // buffer this message. Flush it back to buffer
+                            this.wrappedMessageToBeBuffered.next(message)
+                        }
+                        if (this.receiverConnectionState.getValue() == 'ONLINE') {
+                            this.messageToBeTransmitted.next(message)
+                        }
+                    },
+                    error: err => console.error(err),
+                    complete: () => {
+                        // update transmission state to indicate this batch is completed
+                        this.transmissionState.next('ARRAY EMPTY');
+
+                        if (this.receiverConnectionState.getValue() === 'ONLINE' && this.transmissionState.getValue() === 'ARRAY EMPTY') {
+                            setTimeout(() => {
+                                this.bufferReleaseSignal.next()
+                            }, 1000)
+                        }
+                        // Do nothing if the receiver connection is offline
+                    }
+                });
+            } else {
+                // If I don't do setTimeout, then bufferrelasesignal will be overloaded
+                if (this.receiverConnectionState.getValue() === 'ONLINE') {
+                    setTimeout(() => {
+                        this.bufferReleaseSignal.next()
+                    }, 3000)
+                }
+            }
+        }
+        )
+    }
+
+    private linkEventListenerToBufferSignal() {
+        this.receiverConnectionState.pipe(
+            distinctUntilChanged()
+        ).subscribe(clientState => {
+            console.log(`Client is now ${clientState}`)
+            if (clientState == 'OFFLINE') {
+                console.log(`Current transmission state: ${this.transmissionState.getValue()}`)
+                // just keep buffering
+            }
+            if (clientState == 'ONLINE') {
+                console.log(`Current transmission state: ${this.transmissionState.getValue()}`)
+                // get the stored messages to pump it back into the buffer to be ready to be processed immediately
+                if (this.transmissionState.getValue() == 'ARRAY EMPTY') {
+                    this.bufferReleaseSignal.next()
+                }
+
+            }
+        })
+    }
+}
+
+type ConnectionState = 'ONLINE' | 'OFFLINE'
+type TransmissionState = 'TRANSMITTING' | 'IDLE' | 'ARRAY EMPTY' | 'STORING DATA' | 'GETTING STORED DATA'

+ 244 - 12
src/utils/socket.utils.ts

@@ -1,9 +1,12 @@
-import { Observable } from 'rxjs';
+import { Observable, Observer, Subject } from 'rxjs';
 import { createServer } from 'http';
-import { Server, Socket as SocketForThisClient } from 'socket.io';
+import { Server, Socket as SocketForConnectedClient } from 'socket.io';
 import { io, Socket as ClientSocket } from 'socket.io-client';
+import * as fs from 'fs'
+import { ReceiverProfile, TransportEventNotification, TransportMessage } from '../interface/ITransport.interface';
+import { v4 as uuidv4 } from 'uuid'
 
-export function startSocketServer(port: number): Observable<SocketForThisClient> {
+export function startSocketServer(port: number): Observable<SocketForConnectedClient> {
     return new Observable((observer) => {
         try {
             let httpServer = createServer();
@@ -11,7 +14,6 @@ export function startSocketServer(port: number): Observable<SocketForThisClient>
 
             // something wrong here
             socketServer.on('connection', (socket) => {
-                // console.log(`New client connected ${socket.id}`)
                 observer.next(socket)
             })
 
@@ -33,17 +35,18 @@ export function startSocketServer(port: number): Observable<SocketForThisClient>
 export async function startClientSocketConnection(serverUrl: string): Promise<ClientSocket> {
     return new Promise((resolve, reject) => {
         try {
-            let clientSocket = io(serverUrl, {
-                reconnection: true,              // Enable automatic reconnections
-                reconnectionAttempts: 100,       // Retry up to 10 times
-                reconnectionDelay: 500,          // Start with a 500ms delay
-                reconnectionDelayMax: 10000,     // Delay can grow to a max of 10 seconds
-                randomizationFactor: 0.3,
-            })
+            let clientSocket = io(serverUrl)
+            // let clientSocket = io(serverUrl, {
+            //     reconnection: true,              // Enable automatic reconnections
+            //     reconnectionAttempts: 100,       // Retry up to 10 times
+            //     reconnectionDelay: 500,          // Start with a 500ms delay
+            //     reconnectionDelayMax: 10000,     // Delay can grow to a max of 10 seconds
+            //     randomizationFactor: 0.3,
+            // })
 
             // Listen for a connection event
             clientSocket.on('connect', () => {
-                // console.log('Connected to the server:', clientSocket.id)
+                console.log('Connected to the server:', clientSocket.id)
                 resolve(clientSocket)
             });
         }
@@ -51,4 +54,233 @@ export async function startClientSocketConnection(serverUrl: string): Promise<Cl
             reject(error)
         }
     })
+}
+
+// Specifically to write receiver profile information
+export async function writeFile(data: ReceiverProfile, filename: string): Promise<boolean> {
+    return new Promise((resolve, reject) => {
+        // Write JSON data to a file
+        fs.writeFile(`${filename}.json`, JSON.stringify(data, null, 2), (err) => {
+            if (err) {
+                console.error('Error writing file', err);
+                reject(false)
+            } else {
+                console.log('File has been written');
+                resolve(true)
+            }
+        });
+    })
+}
+
+// After establishing connection to the server, set up the credentials, confirm whether or not if there's any credentials, if not ask for one from the server
+export function handleClientSocketConnection(socket: ClientSocket, incomingMessage: Subject<TransportMessage>): Observable<TransportEventNotification> {
+    return new Observable((eventNotification: Observer<TransportEventNotification>) => {
+        let buffer: any[] = []
+        let receiverProfileInfo!: ReceiverProfile
+        checkOwnClientInfo('client1').then((profile: ReceiverProfile) => {
+            receiverProfileInfo = profile
+            socket.emit('profile', {
+                name: 'Old Client',
+                data: profile
+            })
+        }).catch((error) => {
+            socket.emit('profile', {
+                name: 'New Client',
+                data: null
+            })
+        })
+        // Listen for messages from the server. Generally here's the responses
+        socket.on('message', (msg: any) => {
+            console.log(`Websocket Client Transport Receieve Msg`, msg.id)
+            if (receiverProfileInfo) {
+                eventNotification.next({
+                    event: 'New Message',
+                    description: 'Received new message',
+                    transportType: 'WEBSOCKET',
+                    data: msg
+                })
+                incomingMessage.next({
+                    id: msg.header.MessageID,
+                    receiverID: receiverProfileInfo.uuid,
+                    payload: msg,
+                    event: 'New Message'
+                })
+            } else {
+                // Do nothing. just store in local array first. Cannot process without information. but then again, don['t need information if acting as client
+                // but for consistency sake, will impose the standard 
+                buffer.push(msg) // store locally for now
+            }
+        })
+
+        socket.on('profile', (data: { name: string, message: any }) => {
+            // console.log(data)
+            if (data.name == 'New Profile') {
+                console.log(`Assigned client Name: ${(data.message as ReceiverProfile).name}`)
+                receiverProfileInfo = data.message as ReceiverProfile
+                writeFile(data.message as ReceiverProfile, (data.message as ReceiverProfile).name).then(() => [
+                    // broadcast event to allow retransmission to release buffer
+                    eventNotification.next({
+                        event: 'Connection',
+                        description: 'Profile acquired || updated and stored',
+                        transportType: 'WEBSOCKET',
+                    })
+                ]).catch((error) => { }) // do nothing at the moment. 
+            }
+            if (data.name == 'Adjusted Profile') {
+                console.log(`Assigned client Name: ${(data.message as ReceiverProfile).name}`)
+                receiverProfileInfo = data.message as ReceiverProfile
+                writeFile(data.message as ReceiverProfile, (data.message as ReceiverProfile).name).then(() => [
+                    // broadcast event to allow retransmission to release buffer
+                    eventNotification.next({
+                        event: 'Connection',
+                        description: 'Profile acquired || updated and stored',
+                        transportType: 'WEBSOCKET',
+                    })
+                ]).catch((error) => { }) // do nothing at the moment. 
+            }
+            if (data.name == 'Error') {
+                console.log(`Server cannot find credentials`, data.message)
+                // logic to request for new credentials
+            }
+        })
+
+        // Handle disconnection
+        socket.on('disconnect', () => {
+            console.log('Websocket Client disconnected from the server');
+            if (receiverProfileInfo) {
+                eventNotification.next({
+                    event: 'Disconnection',
+                    description: 'Disconnected from the server',
+                    transportType: 'WEBSOCKET'
+                })
+            }
+        });
+    })
+}
+
+// Check if filename exists. Return profile information if there's any
+export async function checkOwnClientInfo(filename: string): Promise<ReceiverProfile> {
+    return new Promise((resolve, reject) => {
+        // Check if the file exists
+        if (fs.existsSync(`${filename}.json`)) {
+            try {
+                // Read the file contents
+                const fileData = fs.readFileSync(`${filename}.json`, 'utf8');
+
+                // If the file is empty, return an error
+                if (fileData.trim() === "") {
+                    throw new Error("File is empty");
+                }
+
+                // Parse and return the data if present
+                const jsonData = JSON.parse(fileData);
+                resolve(jsonData)
+
+            } catch (err) {
+                // Handle parsing errors or other file-related errors
+                console.error("Error reading or parsing file:", err);
+                reject('');
+            }
+        } else {
+            console.error("File does not exist");
+            reject('');
+        }
+    })
+}
+
+
+// For SERVER Usage: set up socket listeners to start listening for different events
+export function handleNewSocketClient(socket: SocketForConnectedClient, eventNotification: Subject<TransportEventNotification>, socketReceiverProfile: ReceiverProfile[]): void {
+    console.log(`Setting up listeners for socket:${socket.id}`)
+    // returns the socket client instance 
+    // listen to receiver's initiotion first before assigning 'credentials'
+    socket.on(`profile`, (message: { name: string, data: ReceiverProfile }) => {
+        if (message.name == 'New Client') {
+            let receiverProfile: ReceiverProfile = {
+                uuid: uuidv4(),
+                name: `Client${uuidv4()}`,
+                dateCreated: new Date(),
+                transportType: `WEBSOCKET`,
+                eventNotification: new Subject(),
+                instance: socket
+            }
+            // publish first event notification
+            eventNotification.next({
+                event: 'Connection',
+                description: 'New Client Connected',
+                transportType: 'WEBSOCKET',
+                data: {
+                    receiverID: receiverProfile.uuid,
+                    receiverName: receiverProfile.name,
+                    date: new Date(),
+                    payload: receiverProfile
+                }
+            })
+            // send to receiver for reference
+            socket.emit('profile', {
+                name: `New Profile`, message: {
+                    uuid: receiverProfile.uuid,
+                    name: receiverProfile.name,
+                    dateCreated: receiverProfile.dateCreated,
+                    transportType: `WEBSOCKET`,
+                    eventNotification: null,
+                    instance: null // have to put null, otherwise circular reference maximum stack error
+                }
+            }) 
+            socketReceiverProfile.push(receiverProfile)
+            startListening(socket, receiverProfile)
+        } else {
+            // update first
+            let receiverProfile: ReceiverProfile | undefined = socketReceiverProfile.find(obj => obj.uuid === message.data.uuid)
+            if (receiverProfile) {
+                receiverProfile.instance = socket
+                socket.emit('profile', { name: 'Adjusted Profile', message: receiverProfile })
+                // need to start listening again, because it's assigned a different socket instance this time round
+                startListening(socket, receiverProfile)
+            } else {
+                socket.emit('profile', { name: 'Error', message: 'Receiver Profile Not found' })
+            }
+        }
+    })
+}
+
+export function startListening(socket: SocketForConnectedClient, receiverProfile: ReceiverProfile): void {
+    /* Generally, we don't need this unless in the case of being the receiver */
+    socket.on('message', (message: any) => {
+        // here
+    })
+
+    socket.on('request', (request: any) => {
+        // here : Let's say there's a subcsription request here
+        receiverProfile.eventNotification.next({
+            event: 'New Message',
+            description: 'Incoming request',
+            transportType: 'WEBSOCKET',
+            data: {
+                receiverID: receiverProfile.uuid,
+                receiverName: receiverProfile.name,
+                date: new Date(),
+                payload: request
+            }
+        })
+    })
+
+    socket.on('notification', (notification: any) => {
+        // logic here
+    })
+
+    socket.on('disconnect', () => {
+        receiverProfile.eventNotification.next(
+            {
+                event: 'Disconnection',
+                description: `Existing Client ${socket.id} disonnected`,
+                transportType: `WEBSOCKET`,
+                data: {
+                    receiverID: receiverProfile.uuid,
+                    receiverName: receiverProfile.name,
+                    date: new Date(),
+                }
+            }
+        )
+    })
 }