enzo hai 9 meses
pai
achega
f59218ac25
Modificáronse 4 ficheiros con 36 adicións e 69 borrados
  1. 26 25
      services/buffer.service.ts
  2. 0 18
      services/grpc.service.method.ts
  3. 5 13
      test/grpc1.ts
  4. 5 13
      test/grpc2.ts

+ 26 - 25
services/buffer.service.ts

@@ -6,10 +6,7 @@ import { resolve } from 'path';
 
 export class BufferService {
     private messageStream: Subject<Message>
-    // private messageFromApplication: Subject<Message>;
-    // private messageFromBuffer: Subject<Message>
     private connectionState: BehaviorSubject<ConnectionState>
-    // private currentSource: Subject<Message>
     private messageBuffer: Message[] = [];
     private messageModel: Model<Message> | undefined;
     private readonly dbUrl: string = process.env.MONGO as string;
@@ -21,21 +18,18 @@ export class BufferService {
         dbName: string
     ) {
         this.messageStream = messageFromApp;
-        // this.messageFromBuffer = new Subject<Message>();
-        // this.messageFromApplication = messageFromApp;
-        // this.currentSource = this.messageFromBuffer
         this.connectionState = connectionStateSubject;
         this.setupSubscriptions(); // Note: The handle buffer will push the data in local array before pushing to mongo via initial check up model
 
         /* Disable for now. Use local array first */
-        // this.initializeDatabaseConnection(dbName).then((connection: mongoose.Connection) => {
-        //     const grpcMessageSchema = require('../models/message.schema');
-        //     this.messageModel = connection.model<Message>('Message', grpcMessageSchema)
-        //     this.transferLocalBufferToMongoDB() // transfer all data from local array into mongodb after the mongo setup is complete
-        // }).catch(error => {
-        //     console.error('Database initialization failed:', error);
-        //     // Implement retry logic or additional error handling here
-        // });
+        this.initializeDatabaseConnection(dbName).then((connection: mongoose.Connection) => {
+            const grpcMessageSchema = require('../models/message.schema');
+            this.messageModel = connection.model<Message>('Message', grpcMessageSchema)
+            this.transferLocalBufferToMongoDB() // transfer all data from local array into mongodb after the mongo setup is complete
+        }).catch(error => {
+            console.error('Database initialization failed:', error);
+            // Implement retry logic or additional error handling here
+        });
     }
 
     public getMessages(): Observable<Message> {
@@ -140,6 +134,10 @@ export class BufferService {
     private releaseBufferedMessages(messageFromBuffer: Subject<Message>): Promise<boolean> {
         return new Promise((resolve, reject) => {
             if (this.messageModel) {
+                this.messageModel.countDocuments({}).then((count) => {
+                    console.log(`There is ${count} messages in datatbase buffer at the moment. Releasing them....`);
+
+                })
                 const stream = this.messageModel.find().cursor();
 
                 stream.on('data', async (message) => {
@@ -186,18 +184,21 @@ export class BufferService {
     }
 
     private async transferLocalBufferToMongoDB(): Promise<void> {
-        if (this.messageModel) {
-            this.messageBuffer.forEach(async message => {
-                try {
-                    if (this.messageModel) {
-                        await this.messageModel.create(message);
+        return new Promise((resolve, reject) => {
+            console.log(`Releasing buffer Message: currently there is ${this.messageBuffer.length}. Transferring to database...`)
+            if (this.messageModel) {
+                this.messageBuffer.forEach(async message => {
+                    try {
+                        if (this.messageModel) {
+                            await this.messageModel.create(message);
+                        }
+                    } catch (error) {
+                        console.error('Error transferring message to MongoDB:', error);
                     }
-                } catch (error) {
-                    console.error('Error transferring message to MongoDB:', error);
-                }
-            })
-            this.messageBuffer = []; // Clear local buffer after transferring
-        }
+                })
+                this.messageBuffer = []; // Clear local buffer after transferring
+            }
+        })
     }
 
     // Additional methods as required...

+ 0 - 18
services/grpc.service.method.ts

@@ -11,11 +11,9 @@ export class GrpcServiceMethod {
     private callRequestsFromRemote: ServerWritableStreamImpl<any, ResponseType>[] = []
 
     public async create(request: ConnectionRequest, connectionAttribute: ConnectionAttribute): Promise<any> {
-
         // Assuming currently only one client
         this.createGrpcInstance(request.server.serverUrl, { instanceType: 'server' }, connectionAttribute)
         this.createGrpcInstance(request.client.targetServer, { instanceType: 'client' }, connectionAttribute)
-        // connectionAttribute.outGoing.MessageToBePublished?.subscribe(e => console.log((e.message as MessageLog).appData.msgId))
     }
 
     // For testing only
@@ -50,11 +48,6 @@ export class GrpcServiceMethod {
         if (connectionAttribute.outGoing.StreamID && connectionAttribute.inComing.StreamID) {
             connectionAttribute.ConnectionID.local = connectionAttribute.outGoing.StreamID + connectionAttribute.inComing.StreamID
             connectionAttribute.ConnectionID.remote = connectionAttribute.inComing.StreamID + connectionAttribute.outGoing.StreamID
-            // let report: ReportStatus = {
-            //     code: ColorCode.GREEN,
-            //     message: `ConnectionID acquired. Informing Restranmission to release Messages...`,
-            // }
-
         }
     }
 
@@ -91,10 +84,6 @@ export class GrpcServiceMethod {
                 if (redErrorEmission == false) {
                     redErrorEmission = true
                     console.error(`Connection failed ${consecutiveResolutions} times. Stopping connection attempts.`);
-                    // let error: ReportStatus = {
-                    //     code: ColorCode.YELLOW,
-                    //     message: 'Server is not responding. Proceed to buffer.',
-                    // }
                     let error: ConnectionState = {
                         status: 'BUFFER',
                         reason: `Server is not responding...`
@@ -207,7 +196,6 @@ export class GrpcServiceMethod {
             }
             this.generateAdditionalAttributes(connectionAttribute, {}, outGoingInfo)
 
-            // connectionAttribute.ConnectionID = connectionAttribute.outGoing.ChannelID + (connectionAttribute.inComing.ChannelID ?? 'undefined')
             let call = client.HandleMessage({ id: server, message: JSON.stringify(outGoingInfo) })
             console.log(`Sending request to ${server} to open response channel...`)
 
@@ -216,11 +204,6 @@ export class GrpcServiceMethod {
                     console.log(`Message trasmission operation is successful`)
                     // RPC completed successfully
                 } if (status == grpc.status.UNAVAILABLE) {
-                    // let report: ReportStatus = {
-                    //     code: ColorCode.YELLOW,
-                    //     message: `Server doesn't seem to be alive. Error returned.`,
-                    //     payload: this.messageToBeSendOver ?? `There's no message at the moment...`
-                    // }
                     let report: ConnectionState = {
                         status: 'BUFFER',
                         reason: `Server doesn't seem to be alive. Error returned.`,
@@ -239,7 +222,6 @@ export class GrpcServiceMethod {
                 if (connectionAttribute.inComing.MessageToBeReceived) {
                     connectionAttribute.inComing.MessageToBeReceived.next(response)
                 }
-                // console.log(`Received ${(response.message as MessageLog).appData.msgId}`)
             });
 
             call.on('error', (err) => {

+ 5 - 13
test/grpc1.ts

@@ -1,4 +1,4 @@
-import { Subject, from, take } from 'rxjs';
+import { Subject, from, interval, take } from 'rxjs';
 import { Message, MessageLog, ConnectionRequest } from '../interfaces/general.interface';
 import { GrpcServiceMethod } from '../services/grpc.service.method';
 import { readFileSync } from 'fs';
@@ -39,7 +39,7 @@ connectionService.generateConnection(connectionRequest)
 
 // let generateFakeMessagesToBePublished = stream().pipe(take(1000))
 
-// let generateFakeMessagesToBePublished = from(parsedMessages).pipe(take(30000))
+// let generateFakeMessagesToBePublished = from(parsedMessages).pipe(take(10000))
 // generateFakeMessagesToBePublished.subscribe({
 //   next: message => {
 //     let payload: Message = {
@@ -342,14 +342,6 @@ function generateFakeStreamResponse(request: any): Subject<any> {
 }
 
 /* Checking the values by the end of the test */
-// function logDataLengthAfterDelay(delay) {
-//   setTimeout(() => {
-//     console.log(`All received data: ${array.length}`);
-//   }, delay);
-// }
-
-// const delays = [5000, 10000, 15000, 20000, 25000];
-
-// delays.forEach((delay) => {
-//   logDataLengthAfterDelay(delay);
-// });
+interval(5000).subscribe(() => {
+  console.log(`All received data: ${array.length}`);
+});

+ 5 - 13
test/grpc2.ts

@@ -1,4 +1,4 @@
-import { Subject, from, take } from 'rxjs';
+import { Subject, from, interval, take } from 'rxjs';
 import { Message, MessageLog, ConnectionRequest } from '../interfaces/general.interface';
 import { GrpcServiceMethod } from '../services/grpc.service.method';
 import { readFileSync } from 'fs';
@@ -14,7 +14,7 @@ let parsedMessages: any[] = JSON.parse(messagesJSON) // load the fake messages g
 let targetserver: string = 'localhost:3000'
 let targetserver2: string = 'localhost:3002'
 let hostServer: string = 'localhost:3001'
-let intervalToStreamOutGoingMessage: number = 0.5
+let intervalToStreamOutGoingMessage: number = 1
 let array: Message[] = []
 
 
@@ -333,14 +333,6 @@ function generateFakeStreamResponse(request: any): Subject<any> {
 }
 
 /* Checking the values by the end of the test */
-function logDataLengthAfterDelay(delay) {
-  setTimeout(() => {
-    console.log(`All received data: ${array.length}`);
-  }, delay);
-}
-
-const delays = [5000, 10000, 15000, 20000, 25000, 30000];
-
-delays.forEach((delay) => {
-  logDataLengthAfterDelay(delay);
-});
+interval(5000).subscribe(() => {
+  console.log(`All received data: ${array.length}`);
+});