Browse Source

multi semi-stree test retransmission via mongo <TEST>

Enzo 1 month ago
parent
commit
56e1fc82af

+ 3 - 1
.gitignore

@@ -2,4 +2,6 @@ node_modules
 *.js
 *.js
 *.js.map
 *.js.map
 *payload.json
 *payload.json
-*info.json
+*client1.json
+*client2.json
+*client3.json

+ 3 - 3
services/retransmission.service.ts

@@ -45,10 +45,10 @@ export class RetransmissionService implements RetransmissionInterface {
 
 
         // wrappedMessageToBeBuffered will then be pushed to buffer
         // wrappedMessageToBeBuffered will then be pushed to buffer
         this.wrappedMessageToBeBuffered.pipe(buffer(this.bufferReleaseSignal)).subscribe((bufferedMessages: WrappedMessage[]) => {
         this.wrappedMessageToBeBuffered.pipe(buffer(this.bufferReleaseSignal)).subscribe((bufferedMessages: WrappedMessage[]) => {
-            // console.log(bufferedMessages.length + ' buffered messages')
+            console.log(bufferedMessages.length + ' buffered messages')
             // console.log(`Released buffered message: ${bufferedMessages.length} total messages. To Be sorted.`)
             // console.log(`Released buffered message: ${bufferedMessages.length} total messages. To Be sorted.`)
-            // arrayToBeTransmitted.next(sortMessage(bufferedMessages))
-            this.arrayToBeTransmitted.next((this.sortMessage && bufferedMessages.length > 0) ? sortMessageBasedOnDate(bufferedMessages) : bufferedMessages)
+            this.arrayToBeTransmitted.next(sortMessageBasedOnDate(bufferedMessages))
+            // this.arrayToBeTransmitted.next((this.sortMessage && bufferedMessages.length > 0) ? sortMessageBasedOnDate(bufferedMessages) : bufferedMessages)
         });
         });
     }
     }
 
 

+ 27 - 22
test/socket/socket-client.ts

@@ -5,27 +5,32 @@ import { io, Socket } from "socket.io-client";
 import { WrappedMessage } from "../../interfaces/general.interface";
 import { WrappedMessage } from "../../interfaces/general.interface";
 import * as fs from 'fs'
 import * as fs from 'fs'
 import { ClientInfo } from "./socket.service";
 import { ClientInfo } from "./socket.service";
+import { MongoService } from "./temp-log-service";
 
 
 let onHoldMessagesSubject: Subject<WrappedMessage> = new Subject()
 let onHoldMessagesSubject: Subject<WrappedMessage> = new Subject()
 let toBePassedOverToApp: Subject<BaseMessage> = new Subject()
 let toBePassedOverToApp: Subject<BaseMessage> = new Subject()
 // Serve static files (optional)
 // Serve static files (optional)
-let sender: Subject<BaseMessage> = prepareResponseMessages(5, 2000)
+let sender: Subject<BaseMessage> = prepareResponseMessages(5, 1000)
 let serverSocketUrl: string = 'http://192.168.100.96:3000'
 let serverSocketUrl: string = 'http://192.168.100.96:3000'
 let socket: Socket
 let socket: Socket
+let client: string = 'client2'
+let mongoService: MongoService = new MongoService(client)
 
 
 
 
 establishSocketConnection(serverSocketUrl).then(() => {
 establishSocketConnection(serverSocketUrl).then(() => {
-    sender.subscribe({
-        next: message => {
-            makeRequest(message).subscribe({
-                complete: () => console.log(`Request ${message.header.messageID} has acquired all responses.`)
-            })
-        }
-    })
+    // sender.subscribe({
+    //     next: (message: BaseMessage) => {
+    //         makeRequest(message).subscribe({
+    //             next: (message: BaseMessage) => {
+    //             },
+    //             complete: () => console.log(`Request ${message.header.messageID} has acquired all responses.`)
+    //         })
+    //     }
+    // })
 })
 })
 
 
 // the interface the client Program will make without having to decide transport protocol
 // the interface the client Program will make without having to decide transport protocol
-function makeRequest(request: BaseMessage): Observable<any> {
+function makeRequest(request: BaseMessage): Observable<BaseMessage> {
     return new Observable((response) => {
     return new Observable((response) => {
         sendMessage(request)
         sendMessage(request)
         toBePassedOverToApp.subscribe({
         toBePassedOverToApp.subscribe({
@@ -59,7 +64,7 @@ async function establishSocketConnection(serverUrl: string): Promise<any> {
             })
             })
 
 
             // Check if it's a previuos client.
             // Check if it's a previuos client.
-            let data: ClientInfo | null = checkOwnClientInfo('info.json')
+            let data: ClientInfo | null = checkOwnClientInfo(client)
             if (data) {
             if (data) {
                 socket.emit('notification', { agenda: 'existingClient', data: data })
                 socket.emit('notification', { agenda: 'existingClient', data: data })
             } else {
             } else {
@@ -71,23 +76,23 @@ async function establishSocketConnection(serverUrl: string): Promise<any> {
                 // socket.emit('Hello from the client!')
                 // socket.emit('Hello from the client!')
                 console.log('Connected to the server:', socket.id)
                 console.log('Connected to the server:', socket.id)
             });
             });
-
+            
             // Listen for messages from the server
             // Listen for messages from the server
             socket.on('response', (msg: WrappedMessage) => {
             socket.on('response', (msg: WrappedMessage) => {
                 console.log('Message from server:', msg.payload.header.messageName, ' for ', msg.payload.header.messageID);
                 console.log('Message from server:', msg.payload.header.messageName, ' for ', msg.payload.header.messageID);
-
+                mongoService.write(msg.payload, msg.payload.header.messageID, () => console.log(`Error function doesn't exist.`))
                 // Check the sequence by ensuring the message value before the current message exists, then pass them over to "App"
                 // Check the sequence by ensuring the message value before the current message exists, then pass them over to "App"
                 // onHoldMessagesSubject.next(msg)
                 // onHoldMessagesSubject.next(msg)
                 // checkMessage(msg).then(() => [
                 // checkMessage(msg).then(() => [
-                //     toBePassedOverToApp.next(msg.payload as BaseMessage)
-                // ]).catch((err) => console.error(err))
+                    //     toBePassedOverToApp.next(msg.payload as BaseMessage)
+                    // ]).catch((err) => console.error(err))
                 toBePassedOverToApp.next(msg.payload as BaseMessage)
                 toBePassedOverToApp.next(msg.payload as BaseMessage)
             })
             })
-
+            
             socket.on('notification', (msg: any) => {
             socket.on('notification', (msg: any) => {
                 if (msg.notification == 'Your credentials') {
                 if (msg.notification == 'Your credentials') {
                     console.log(`Assigned client Name: ${msg.socketInfo.clientName}`)
                     console.log(`Assigned client Name: ${msg.socketInfo.clientName}`)
-                    writeFile(msg.socketInfo as ClientInfo)
+                    writeFile(msg.socketInfo as ClientInfo, client)
                 }
                 }
                 if (msg.notification == 'Your updated credentials') {
                 if (msg.notification == 'Your updated credentials') {
                     console.log(`Updated socket ID: `, msg)
                     console.log(`Updated socket ID: `, msg)
@@ -117,20 +122,20 @@ async function establishSocketConnection(serverUrl: string): Promise<any> {
 
 
 function checkOwnClientInfo(filename: string): ClientInfo | null {
 function checkOwnClientInfo(filename: string): ClientInfo | null {
     // Check if the file exists
     // Check if the file exists
-    if (fs.existsSync(filename)) {
+    if (fs.existsSync(`${filename}.json`)) {
         try {
         try {
             // Read the file contents
             // Read the file contents
-            const fileData = fs.readFileSync(filename, 'utf8');
+            const fileData = fs.readFileSync(`${filename}.json`, 'utf8');
 
 
             // If the file is empty, return an error
             // If the file is empty, return an error
             if (fileData.trim() === "") {
             if (fileData.trim() === "") {
                 throw new Error("File is empty");
                 throw new Error("File is empty");
             }
             }
-
+            
             // Parse and return the data if present
             // Parse and return the data if present
             const jsonData = JSON.parse(fileData);
             const jsonData = JSON.parse(fileData);
             return jsonData;
             return jsonData;
-
+            
         } catch (err) {
         } catch (err) {
             // Handle parsing errors or other file-related errors
             // Handle parsing errors or other file-related errors
             console.error("Error reading or parsing file:", err.message);
             console.error("Error reading or parsing file:", err.message);
@@ -142,9 +147,9 @@ function checkOwnClientInfo(filename: string): ClientInfo | null {
     }
     }
 }
 }
 
 
-function writeFile(data: ClientInfo) {
+function writeFile(data: ClientInfo, filename: string) {
     // Write JSON data to a file
     // Write JSON data to a file
-    fs.writeFile('info.json', JSON.stringify(data, null, 2), (err) => {
+    fs.writeFile(`${filename}.json`, JSON.stringify(data, null, 2), (err) => {
         if (err) {
         if (err) {
             console.error('Error writing file', err);
             console.error('Error writing file', err);
         } else {
         } else {

+ 1 - 1
test/socket/socket-test-server.ts

@@ -31,7 +31,7 @@ incomingRequest.subscribe((request: BaseMessage) => {
 
 
 function returnResponse(request: BaseMessage): Observable<BaseMessage> {
 function returnResponse(request: BaseMessage): Observable<BaseMessage> {
     return new Observable((observer) => {
     return new Observable((observer) => {
-        prepareResponseMessages(10, 1000).subscribe({
+        prepareResponseMessages(30, 1000).subscribe({
             next: (message: BaseMessage) => {
             next: (message: BaseMessage) => {
                 message.header.messageID = request.header.messageID
                 message.header.messageID = request.header.messageID
                 console.log(`Generating response message for ${request.header.messageID}`)
                 console.log(`Generating response message for ${request.header.messageID}`)

+ 1 - 1
test/socket/socket.service.ts

@@ -91,7 +91,6 @@ export class SocketService {
                         // resume operation Release them buffer
                         // resume operation Release them buffer
                         /* local client isOnline need not be mutated, since this is a new connection. However the previous intance of client Connection State
                         /* local client isOnline need not be mutated, since this is a new connection. However the previous intance of client Connection State
                         inside the retransmission needs to be updated to release the buffered values.*/
                         inside the retransmission needs to be updated to release the buffered values.*/
-                        clientObj.clientConnectionState.next('ONLINE')
                         let subscription = clientObj.buffer.returnBufferedMessages().subscribe(output => {
                         let subscription = clientObj.buffer.returnBufferedMessages().subscribe(output => {
                             // console.log(output)
                             // console.log(output)
                             if (clientIsOnline.getValue() === true) {
                             if (clientIsOnline.getValue() === true) {
@@ -100,6 +99,7 @@ export class SocketService {
                                 subscription.unsubscribe()
                                 subscription.unsubscribe()
                             }
                             }
                         })
                         })
+                        clientObj.clientConnectionState.next('ONLINE')
                     } else {
                     } else {
                         console.log(this.connectedClients)
                         console.log(this.connectedClients)
                         console.log(`Existing Client is not found`)
                         console.log(`Existing Client is not found`)

+ 44 - 0
test/socket/temp-log-service.ts

@@ -0,0 +1,44 @@
+import mongoose, { Schema, Document, Model, Mongoose } from 'mongoose';
+import { AppMessageSchema } from '../../dependencies/logging/dependencies/msgutil/dependencies/fisappmessagejsdistribution/src/message/common/appmessageschema';
+import { Message } from '../../dependencies/logging/dependencies/msgutil/dependencies/fisappmessagejsdistribution/src/test/schema/fisappmessageschematest';
+import { BaseMessage, MessageHeader } from '../../dependencies/logging/dependencies/msgutil/dependencies/dependencies';
+
+// Define the User Service class
+export class MongoService {
+    private MongooseConnection: Mongoose
+
+    constructor(client: string) {
+        this.connectDB(client)
+    }
+    // Connect to MongoDB
+    private async connectDB(client: string): Promise<void> {
+        try {
+            const mongoURI = 'mongodb://localhost:27017/' + client // Replace with your MongoDB URI
+            this.MongooseConnection = await mongoose.connect(mongoURI);
+            console.log('MongoDB connected');
+        } catch (err) {
+            console.error('Error connecting to MongoDB:', err);
+        }
+    }
+
+    // Store Message
+    public async write(data: BaseMessage, requestID: string, err_func?: any) {
+        let messageModel: Model<any> = this.MongooseConnection.model(requestID, MessageSchema)
+        messageModel.create({ data: JSON.stringify(data) }).then(() => {
+            // lol do nothing.
+        }).catch((err) => {
+            err_func(err)
+            console.log(`MongoError: ${err.message}`)
+        })
+    }
+}
+
+// Define an interface for the document
+interface IMessage extends Document {
+    data: string
+}
+
+// Define a schema
+const MessageSchema: Schema = new Schema({
+    data: { type: String, required: true },
+});