瀏覽代碼

updated with comments

Enzo 9 月之前
父節點
當前提交
c98602c562
共有 8 個文件被更改,包括 139 次插入362 次删除
  1. 19 0
      README.md
  2. 92 34
      services/buffer.service.ts
  3. 0 306
      services/grpc.service.method.bak
  4. 5 1
      services/grpc.service.method.ts
  5. 6 4
      services/server-client.service.ts
  6. 15 15
      test/grpc1.ts
  7. 1 1
      test/grpc2.ts
  8. 1 1
      test/grpc3.ts

+ 19 - 0
README.md

@@ -19,3 +19,22 @@ cd FIs-Retransmission
 npm install
 
 
+```
+
+## NOTE for Server Client Interface
+
+Current way it starst is by declaring the request connection and then calling the server client interface to create the connections based on the details provided
+Right, the way it works, it that the sender and receiver will first pass in the connection on the designated target to communicate with, and with that info, the 
+server client interface will create the connection attribute correspoding to the requests made. Then, the grpc client methods will send these information concerning t
+the attributes, which is called connection attributes to the designated servers to request for a channel. Then the server on the other side will then first verify the
+connection attribute to determine whether the connection exist in the local array. (Note: This is predetermined clients that are expected to be connected.)
+
+The latest intention was that we have established the matter redesign the server client interface, to create the connection dynamically according the channel request 
+that is coming in, and server client service, will assign a unique identifier for the client application to be used to identify themselves when they come back online 
+upon disruption. Of course, how the client choose to store that identifier corresponding to the server tha assigned it has yet to be decided, but it can as simple as 
+a json file, as long as it is something that can assess when the application comes back online. On the note for the server that asign a unique identifier, it will
+also generate the connection attribute to be stored in it's own memory, so that it too when receiving a client request to establish a channel, can perform the 
+necessary validation to see whether if it's a client that has been connected, based on whether or not that client provide  the needed information to identify itself, 
+to which in this case, would be the previously assigned identifier.
+
+Please Note: The retransmission has disabled mongo storage at the moment due to it's faulty nature. It will work fine with local instance buffer array.

+ 92 - 34
services/buffer.service.ts

@@ -8,7 +8,7 @@ export class BufferService {
   private messageStream: Subject<Message>;
   private connectionState: BehaviorSubject<ConnectionState>;
   private messageBuffer: Message[] = [];
-  private messageModel: Model<Message> | undefined;
+  private messageModel!: Model<Message>
   private readonly dbUrl: string = process.env.MONGO as string;
 
   constructor(
@@ -21,14 +21,16 @@ export class BufferService {
     this.connectionState = connectionStateSubject;
     this.setupSubscriptions(); // Note: The handle buffer will push the data in local array before pushing to mongo via initial check up model
 
+    /* Disabled for now due to missing data not during transmision. The issue was suspected to be it's async nature of confusing timing
+    when it was queued into the event queue. Sometimes the messages will be late to be saved */
     // this.initializeDatabaseConnection(dbName)
-    //   .then((connection: mongoose.Connection) => {
+    //   .then(async (connection: mongoose.Connection) => {
     //     const grpcMessageSchema = require("../models/message.schema");
     //     this.messageModel = connection.model<Message>(
     //       "Message",
     //       grpcMessageSchema
     //     );
-    //     this.transferLocalBufferToMongoDB(); // transfer all data from local array into mongodb after the mongo setup is complete
+    //     await this.transferLocalBufferToMongoDB(); // transfer all data from local array into mongodb after the mongo setup is complete
     //   })
     //   .catch((error) => {
     //     console.error("Database initialization failed:", error);
@@ -41,6 +43,11 @@ export class BufferService {
     return this.messageStream as Observable<Message>;
   }
 
+  public getStateObservable(): BehaviorSubject<ConnectionState> {
+    return this.connectionState;
+  }
+
+  // To subscrcibe for the message stream as well as the connection state
   private setupSubscriptions(): void {
     this.messageStream.subscribe({
       next: (message: Message) => this.handleIncomingMessage(message),
@@ -105,6 +112,14 @@ export class BufferService {
         await this.messageModel.create(message);
         this.messageModel.countDocuments({}).then((count) => {
           console.log(`Message${(message.message as MessageLog).appData.msgId} saved to MongoDB buffer. There is ${count} messages in datatbase under ${this.bufferIdentifier} at the moment.`);
+
+          // if connection status okay 
+          // if(this.connectionState.getValue().status == "DIRECT_PUBLISH")
+          // {
+          //   console.log("Message count release " + count);
+          //   // Then release back to message stream
+          //   this.releaseBufferedMessages(this.messageStream);
+          // }
         });
       } catch (error) {
         console.error("Error saving message to MongoDB:", error);
@@ -119,37 +134,84 @@ export class BufferService {
   private releaseBufferedMessages(
     messageFromBuffer: Subject<Message>
   ): Promise<boolean> {
-    return new Promise((resolve, reject) => {
+    return new Promise(async (resolve, reject) => {
       if (this.messageModel) {
-        this.messageModel.countDocuments({}).then((count) => {
-          console.log(`There is ${count} messages in database under ${this.bufferIdentifier} at the moment. Releasing them....`);
-        });
-        const stream = this.messageModel.find().cursor();
+        try {
+          // use then 
+          let countPromise = checkMessageCount(this.messageModel, this.bufferIdentifier);
+          countPromise.then(async (amount) => {
+            console.log("Amount1:" + amount);
 
-        stream.on("data", async (message) => {
-          // Process each message individually`
-          messageFromBuffer.next(message);
-        });
+            // let countPromise = checkMessageCount(this.messageModel, this.bufferIdentifier);
+            // countPromise.then(async (amount)=>{
 
-        stream.on("error", (error) => {
-          console.error("Error streaming messages from MongoDB:", error);
-          reject(error);
-        });
+            //   console.log("Amount2:"+amount);
+            // })
 
-        stream.on("end", async () => {
-          // Delete the data once it has been streamed
-          try {
-            if (this.messageModel) {
-              await this.messageModel.deleteMany({});
-              console.log("Data in Mongo deleted successfully.");
-            } else {
-              console.log(`Message Mongoose Model is not intiated properly...`);
+            while (amount > 0) {
+              console.log("AmountInLoop1:" + amount)
+              try {
+                await extractData(messageFromBuffer, this.messageModel); // New function to process a batch
+              } catch (processError) {
+                console.error('Error processing batch:', processError);
+              }
+              console.log('Checking again...');
+              amount = await checkMessageCount(this.messageModel, this.bufferIdentifier);
+              console.log("AmountInLoop:" + amount)
             }
-          } catch (err) {
-            console.error("Error deleting data:", err);
-          }
+            console.log('All messages extracted.');
+          })
+          let amount: number = await countPromise
+
           resolve(true);
-        });
+        } catch (error) {
+          console.error('Error in releaseBufferedMessages:', error);
+          reject(false);
+        }
+
+        async function checkMessageCount(messageModel: Model<Message>, bufferIdentifier: string): Promise<any> {
+          return new Promise((resolve, reject) => {
+            messageModel.countDocuments({}).then((count) => {
+              console.log(`There is ${count} messages in database under ${bufferIdentifier} at the moment. Releasing them....`);
+              resolve(count)
+            }).catch((error) => {
+              console.error(error)
+              reject(error)
+            })
+          })
+        }
+
+        // Stream all the data inside the database out and deleting them
+        async function extractData(messageFromBuffer: Subject<Message>, messageModel: Model<Message>): Promise<any> {
+          return new Promise((resolve, reject) => {
+            const stream = messageModel.find().cursor();
+
+            stream.on("data", async (message) => {
+              // Process each message individually`
+              messageFromBuffer.next(message);
+            });
+
+            stream.on("error", (error) => {
+              console.error("Error streaming messages from MongoDB:", error);
+              reject(error);
+            });
+
+            stream.on("end", async () => {
+              // Delete the data once it has been streamed
+              try {
+                if (messageModel) {
+                  await messageModel.deleteMany({});
+                  console.log("Data in Mongo deleted successfully.");
+                } else {
+                  console.log(`Message Mongoose Model is not intiated properly...`);
+                }
+              } catch (err) {
+                console.error("Error deleting data:", err);
+              }
+              resolve(true);
+            });
+          })
+        }
       }
       if (!this.messageModel) {
         // If MongoDB model is not defined, use the local buffer
@@ -167,13 +229,9 @@ export class BufferService {
     });
   }
 
-  public getStateObservable(): BehaviorSubject<ConnectionState> {
-    return this.connectionState;
-  }
-
   private async transferLocalBufferToMongoDB(): Promise<void> {
     return new Promise((resolve, reject) => {
-      console.log(`Transferring local array buffered Message: currently there is ${this.messageBuffer.length}. Transferring to database...`);
+      console.log(`Transferring local array buffered Message under ${this.bufferIdentifier}: currently there is ${this.messageBuffer.length}. Transferring to database...`);
       if (this.messageModel) {
         let locallyBufferedMessage: Observable<Message> = from(this.messageBuffer);
         locallyBufferedMessage.subscribe({
@@ -191,7 +249,7 @@ export class BufferService {
           complete: () => {
             if (this.messageModel) {
               this.messageModel.countDocuments({}).then((count) => {
-                console.log(`Local buffered message transfer completed. There is a total of ${count} messages in database at the moment.`)
+                console.log(`Local buffered message transfer completed. There is a total of ${count} messages in database ${this.bufferIdentifier} at the moment.`)
                 this.messageBuffer = [] // Clear local buffer after transferring
               });
             }

+ 0 - 306
services/grpc.service.method.bak

@@ -1,306 +0,0 @@
-import * as grpc from '@grpc/grpc-js';
-import { Observable, Subject, Subscription } from "rxjs";
-import { Message, ConnectionAttribute, GrpcConnectionType, ConnectionState, MessageLog, ConnectionStatus, StreamAttribute, State } from "../interfaces/general.interface";
-import { Status } from '@grpc/grpc-js/build/src/constants';
-import { message_proto } from './protos/server.proto'
-import * as _ from 'lodash'
-export class GrpcServiceMethod {
-    private connectionAttributes: ConnectionAttribute[] = []
-    private updateConnectionStatusFlag: boolean = false
-    private server: grpc.Server | any
-    private messageToBeSendOver: Message | any
-    private clientRequest: Subject<ConnectionAttribute> = new Subject()
-    private localServerStatus: Subject<any> = new Subject()
-
-    public async create(connectionAttribute: ConnectionAttribute, connectionAttributes: ConnectionAttribute[]): Promise<any> {
-        this.connectionAttributes = connectionAttributes
-        return new Promise((resolve, reject) => {
-            this.createGrpcInstance({ instanceType: 'server' }, connectionAttribute)
-            this.createGrpcInstance({ instanceType: 'client' }, connectionAttribute)
-            resolve('Just putting it here for now....')
-        })
-    }
-
-    private async createGrpcInstance(grpcType: GrpcConnectionType, connectionAttribute: ConnectionAttribute) {
-        // Reconnection Logic here
-        while (true) {
-            try {
-                let recreatePromise = new Promise((resolve) => {
-                    if (grpcType.instanceType == 'server' && !this.server) {
-                        this.createServerStreamingServer(connectionAttribute).then(() => {
-                            resolve('recreate')
-                        })
-                    }
-                    if (grpcType.instanceType == 'client') {
-                        this.createServerStreamingClient(connectionAttribute).then(() => {
-                            resolve('recreate')
-                        })
-                    }
-                })
-                await recreatePromise
-            } catch (error) {
-                console.error('Connection attempt failed:', error);
-            }
-            await new Promise(resolve => setTimeout(resolve, 1000)); // Wait for 1 second before the next attempt
-            // timeout generate message to trigger this reconnection
-        }
-    }
-
-    // Create Server Instance to stream all application Outgoing messages    
-    public async createServerStreamingServer(connectionAttribute: ConnectionAttribute): Promise<any> { // '0.0.0.0:3001'
-        return new Promise((resolve, reject) => {
-            try {
-                let statusChain: State = 1
-                if (statusChain == 1) {
-                    // Bind and start the server
-                    this.server = new grpc.Server()
-                    this.server.bindAsync(connectionAttribute.outGoing.serverUrl, grpc.ServerCredentials.createInsecure(), () => {
-                        console.log(`gRPC server is running on ${connectionAttribute.outGoing.serverUrl}`);
-                        this.server.start();
-                    });
-                    this.localServerStatus.next({
-                        connectionStatus: 'ON',
-                        connectionID: connectionAttribute.ConnectionID.local,
-                        message: `${connectionAttribute.outGoing.serverUrl} started.`
-                    })
-                }
-
-                if (statusChain == 1 && !this.server) {
-                    this.server.addService(message_proto.Message.service, {
-                        HandleMessage: (call) => {
-                            /// add a checking for standard message request
-                            let clientInfo: ConnectionAttribute = JSON.parse(call.request.message)
-                            if (this.isConnectionAttribute(clientInfo)) {
-                                // this.clientRequest.next(clientInfo)
-                                console.log(`Initializing stream. Opening Channel... Confirmation from ${call.request.id}`)
-                                let result: ConnectionAttribute | undefined = this.connectionAttributes.find(connectionAttribute => connectionAttribute.ConnectionID.local = clientInfo.ConnectionID.remote)
-                                if (!result) {
-                                    console.log(`No connectionID match. This should be new connection...`)
-                                    let subscription: Subscription = connectionAttribute.outGoing.MessageToBePublished!.subscribe({
-                                        next: (response: Message) => {
-                                            console.log(`Sending from GRPC server: ${(response.message as MessageLog).appData.msgId} `)
-                                            let message = {
-                                                id: response.id,
-                                                message: JSON.stringify(response.message)
-                                            }
-                                            call.write(message)
-                                        },
-                                        error: err => {
-                                            console.error(err)
-                                            subscription.unsubscribe()
-                                            resolve('')
-                                        },
-                                        complete: () => {
-                                            console.log(`Stream response completed for ${call.request.id}`)
-                                            subscription.unsubscribe()
-                                            resolve('')
-                                        }
-                                    })
-                                    let report: ConnectionState = {
-                                        status: 'DIRECT_PUBLISH'
-                                    }
-                                    connectionAttribute.connectionStatus!.next(report)
-                                }
-                                if (result && result.outGoing.connectionState == `OFF`) {
-                                    // reassign previously matched buffer
-                                    console.log(`Connection info found in array matched. Assigning buffer....`)
-                                    let subscription: Subscription = result.outGoing.MessageToBePublished!.subscribe({
-                                        next: (response: Message) => {
-                                            console.log(`Sending from GRPC server: ${(response.message as MessageLog).appData.msgId} `)
-                                            let message = {
-                                                id: response.id,
-                                                message: JSON.stringify(response.message)
-                                            }
-                                            call.write(message)
-                                        },
-                                        error: err => {
-                                            console.error(err)
-                                            subscription.unsubscribe()
-                                            resolve('')
-                                        },
-                                        complete: () => {
-                                            console.log(`Stream response completed for ${call.request.id}`)
-                                            subscription.unsubscribe()
-                                            resolve('')
-                                        }
-                                    })
-                                    let report: ConnectionState = {
-                                        status: 'DIRECT_PUBLISH'
-                                    }
-                                    result.connectionStatus!.next(report)
-                                }
-
-                            } else {
-                                console.error(`INVALID REQUEST! Client request does not fulfill criteria`)
-                            }
-                        },
-                        Check: (_, callback) => {
-                            // for now it is just sending the status message over to tell the client it is alive
-                            // For simplicity, always return "SERVING" as status
-                            callback(null, { status: 'SERVING' });
-                        },
-                    });
-                }
-            }
-            catch (error) {
-                resolve(error)
-            }
-        })
-    }
-
-    // Send a request over to the other server to open a channel for this server to emit/stream messages over
-    public async createServerStreamingClient(connectionAttribute: ConnectionAttribute): Promise<string> {
-        return new Promise(async (resolve, reject) => {
-            const client = new message_proto.Message(connectionAttribute.inComing.serverUrl, grpc.credentials.createInsecure());
-            let localInfo: ConnectionAttribute = { // need to make a new copy where it doesn't reference the subjects, otherwise circular ref error
-                ConnectionID: connectionAttribute.ConnectionID,
-                outGoing: {
-                    StreamID: connectionAttribute.outGoing.StreamID,
-                    PublisherID: connectionAttribute.outGoing.PublisherID,
-                    SubscriberID: connectionAttribute.outGoing.SubscriberID,
-                    serverUrl: connectionAttribute.outGoing.serverUrl,
-                    MessageToBePublished: null,
-                    MessageToBeReceived: null
-                },
-                inComing: {
-                    StreamID: connectionAttribute.inComing.StreamID,
-                    PublisherID: connectionAttribute.inComing.PublisherID,
-                    SubscriberID: connectionAttribute.inComing.SubscriberID,
-                    serverUrl: connectionAttribute.inComing.serverUrl,
-                    MessageToBePublished: null,
-                    MessageToBeReceived: null
-                },
-                connectionStatus: null
-            }
-            let call = client.HandleMessage({ id: connectionAttribute.inComing.serverUrl, message: JSON.stringify(localInfo) })
-
-            console.log(`Sending request to ${connectionAttribute.inComing.serverUrl} to open response channel...`)
-
-            call.on('status', (status: Status) => {
-                if (status == grpc.status.OK) { // only returns a status when there's error. Otherwise it just waits
-                    console.log(`Message trasmission operation is successful`)
-                    // RPC completed successfully
-                } if (status == grpc.status.UNAVAILABLE) {
-                    let report: ConnectionState = {
-                        status: 'BUFFER',
-                        reason: `Server doesn't seem to be alive. Error returned.`,
-                        payload: this.messageToBeSendOver ?? `There's no message at the moment...`
-                    }
-                    connectionAttribute.connectionStatus!.next(report)
-                    let clientStatusUpdateInfo: any = {
-                        connectionStatus: 'OFF',
-                        connectionID: connectionAttribute.ConnectionID.remote,
-                        message: `${connectionAttribute.outGoing.serverUrl} started.`
-                    }
-                    this.clientRequest.next(clientStatusUpdateInfo)
-                    resolve('No connection established. Server is not responding..')
-                }
-            });
-
-            call.on('data', (data: any) => {
-                let response: Message = {
-                    id: data.id,
-                    message: JSON.parse(data.message)
-                }
-                if (connectionAttribute.inComing.MessageToBeReceived) {
-                    connectionAttribute.inComing.MessageToBeReceived.next(response)
-                }
-            });
-
-            call.on('error', (err) => {
-                console.error(err)
-                resolve('')
-            });
-        })
-    }
-
-
-    // THis is no longer necesarry after the introduction of connection Attribute. But it is still useful for checking for the other side's health
-    public async checkConnectionHealth(client: any, statusControl: Subject<ConnectionState>, alreadyHealthCheck: boolean): Promise<boolean> {
-        return new Promise((resolve, reject) => {
-            client.Check({}, (error, response) => {
-                if (response) {
-                    console.log(`GRPC Health check status: ${response.status} Server Connected`);
-                    // Intepret the response status and implement code logic or handler
-                    resolve(response.status)
-                } else {
-                    if (alreadyHealthCheck == false) console.error(`Health check failed: ${error}`);
-                    reject(false)
-                }
-            })
-        })
-    }
-
-    // TO check or validate if the client request meets the criteria
-    private isConnectionAttribute(obj: any): obj is ConnectionAttribute {
-        const isMatch = (
-            typeof obj.ConnectionID === 'object' && // Further checks can be added based on the structure of ConnectionID
-            isStreamAttribute(obj.outGoing) &&
-            isStreamAttribute(obj.inComing) &&
-            (obj.connectionStatus === null || obj.connectionStatus instanceof Subject)
-        );
-
-        if (isMatch) {
-            console.log('gRPC client call matches ConnectionAttribute type');
-        } else {
-            console.log('gRPC client call does not match ConnectionAttribute type');
-        }
-
-        return isMatch;
-        function isStreamAttribute(obj: any): obj is StreamAttribute {
-            return (
-                (typeof obj.StreamID === 'string' || obj.StreamID === undefined) &&
-                (typeof obj.PublisherID === 'string' || obj.PublisherID === undefined) &&
-                (typeof obj.SubscriberID === 'string' || obj.SubscriberID === undefined) &&
-                // Check other properties like PublisherInstance and SubscriberInstance based on their expected types
-                (typeof obj.serverUrl === 'string' || obj.serverUrl === undefined) &&
-                // Check connectionState based on its type, assuming it's an enum or similar
-                (obj.MessageToBePublished === null || obj.MessageToBePublished instanceof Observable) &&
-                (obj.MessageToBeReceived === null || obj.MessageToBeReceived instanceof Subject)
-            );
-        }
-    }
-
-    // update the referenced array 
-    // public updateConnectionStatus(connectionAttributeArray: ConnectionAttribute[]) {
-    //     if (this.updateConnectionStatusFlag === false) {
-    //         this.updateConnectionStatusFlag = true
-    //         this.localServerStatus.subscribe({
-    //             next: (notification: any) => {
-    //                 if (notification.connectionStatus === `ON`) {
-    //                     let connectionAttribute = connectionAttributeArray.find(connection => connection.ConnectionID.local == notification.connectionID)
-    //                     if (connectionAttribute) {
-    //                         connectionAttribute.outGoing.connectionState = 'ON'
-    //                         console.log(`Local Connection ${notification.connectionID} updated. {${connectionAttribute.outGoing.connectionState}}`)
-    //                     } else {
-    //                         console.log(`Local Connection ${notification.connectionID} attribute is not found.`)
-    //                     }
-    //                 }
-    //             },
-    //             error: err => console.error(err),
-    //             complete: () => { }
-    //         })
-    //         this.clientRequest.subscribe({
-    //             next: (clientConnectionAttribute: ConnectionAttribute) => {
-    //                 console.log(`Received a request from ${clientConnectionAttribute.outGoing.serverUrl}`)
-    //                 let connectionAttribute = connectionAttributeArray.find(connection => connection.ConnectionID.remote == clientConnectionAttribute.ConnectionID.local)
-    //                 if (connectionAttribute) {
-    //                     connectionAttribute.inComing.connectionState = 'ON'
-    //                     console.log(`Client Connection ${clientConnectionAttribute.ConnectionID.local} updated. {${connectionAttribute.inComing.connectionState}}`)
-    //                 } else {
-    //                     console.log(`Client Connection Attribute ${clientConnectionAttribute.inComing.PublisherID} is not found`)
-    //                 }
-    //             },
-    //             error: err => console.error(err),
-    //             complete: () => { }
-    //         })
-    //     } else {
-    //         console.log(`Update Connection Status already called.`)
-    //     }
-    // }
-}
-// https://github.com/grpc/proposal/blob/master/L5-node-client-interceptors.md
-
-
-
-

+ 5 - 1
services/grpc.service.method.ts

@@ -4,13 +4,14 @@ import { Message, ConnectionAttribute, GrpcConnectionType, ConnectionState, Stre
 import { Status } from '@grpc/grpc-js/build/src/constants';
 import { message_proto } from './protos/server.proto'
 import * as _ from 'lodash'
+import { ServerClientManager } from './server-client.service';
 export class GrpcServiceMethod {
     private connectionAttribute: ConnectionAttribute | undefined
     private connectionAttributes: ConnectionAttribute[] = []
     private server: grpc.Server | any
     private messageToBeSendOver: Message | any
     private clientRequest: Subject<ConnectionAttribute> = new Subject()
-
+  
 
     // public interface for service client to establish connection. They will give server and client information as a pair
     public async create(connectionAttribute: ConnectionAttribute, connectionAttributes: ConnectionAttribute[]): Promise<any> {
@@ -55,6 +56,9 @@ export class GrpcServiceMethod {
             this.server.addService(message_proto.Message.service, {
                 HandleMessage: (call) => {
                     let clientRequest: ConnectionAttribute = JSON.parse(call.request.message)
+                    /* Receive request, it will talk server client service, scs will do the checking instead of doing them here,
+                    two scenario. if this is a new client, then a new subject is assigned.
+                    IF this is previous channel, an existing subject is assigned back. */
                     // client Request validation
                     if (this.isConnectionAttribute(clientRequest)) {
                         // Check if this connection exists

+ 6 - 4
services/server-client.service.ts

@@ -23,8 +23,10 @@ export class ServerClientManager {
     }
 
     constructor() {
+        // this.generateConnection({})
     }
 
+    // create connection based on the request from the application
     public async generateConnection(request: ConnectionRequest): Promise<any> {
         return new Promise(async (resolve, reject) => {
             let initialReport: ConnectionState
@@ -92,6 +94,7 @@ export class ServerClientManager {
             }
 
             if (statusChain == 1) {
+                // to prevent duplicate connection from being created
                 await this.checkConnectionAttribute(connectionAttribute!).then((res) => {
                     if (res == true) {
                         console.log(`Connection<${connectionAttribute.ConnectionID.local}> already exists `)
@@ -111,12 +114,11 @@ export class ServerClientManager {
                 }
                 // For each connection type:
                 if (request.client!.connectionType == 'GRPC') {
-                    // this.grpcService.updateConnectionStatus(this.connectionAttributes)
                     this.grpcService.create(connectionAttribute!, this.connectionAttributes).then(() => {
-                        // logic here
+                       // logic here
                     }).catch(() => {
-                        errorString = `Something wrong with gRPC methods`
-                        statusChain = -1
+                       errorString = `Something wrong with gRPC methods`
+                       statusChain = -1
                     })
                 }
             }

+ 15 - 15
test/grpc1.ts

@@ -6,7 +6,7 @@ import { ServerClientManager } from '../services/server-client.service';
 import mongoose from 'mongoose';
 
 // Connect to MongoDB
-// mongoose.connect('mongodb://localhost:27017/grpc1')
+// mongoose.connect('mongodb://localhost:27017/G0')
 // const Message = mongoose.model('Message', require('../models/message.schema'))
 // Subject for bidirectional communication
 const connectionService: ServerClientManager = new ServerClientManager()
@@ -171,19 +171,19 @@ connectionService.generateConnection(connectionRequest2).then((res) => {
   // console.log(res)
 })
 
-let generateFakeMessagesToBePublished = from(parsedMessages).pipe(take(50))
-generateFakeMessagesToBePublished.subscribe({
-  next: message => {
-    let payload: Message = {
-      id: hostServer,
-      message: message
-    }
-    // connectionRequest1.server!.messageToBePublishedFromApplication.next(payload)
-    // connectionRequest2.server!.messageToBePublishedFromApplication.next(payload)
-  },
-  error: error => console.error(error),
-  complete: () => console.log(`Completed`)
-})
+// let generateFakeMessagesToBePublished = from(parsedMessages).pipe(take(50))
+// generateFakeMessagesToBePublished.subscribe({
+//   next: message => {
+//     let payload: Message = {
+//       id: hostServer,
+//       message: message
+//     }
+//     connectionRequest1.server!.messageToBePublishedFromApplication.next(payload)
+//     connectionRequest2.server!.messageToBePublishedFromApplication.next(payload)
+//   },
+//   error: error => console.error(error),
+//   complete: () => console.log(`Completed`)
+// })
 
 
 stream().subscribe({
@@ -329,7 +329,7 @@ function stream(): Subject<any> {
   const intervalId = setInterval(() => {
     result.next(messages[count]);
     count++;
-    if (count >= 300) {
+    if (count >= 100) {
       clearInterval(intervalId);
       result.complete();
     }

+ 1 - 1
test/grpc2.ts

@@ -5,7 +5,7 @@ import { readFileSync } from 'fs';
 import { ServerClientManager } from '../services/server-client.service';
 import mongoose from 'mongoose';
 
-mongoose.connect('mongodb://localhost:27017/grpc2')
+mongoose.connect('mongodb://localhost:27017/G1')
 const Message = mongoose.model('Message', require('../models/message.schema'))
 // Subject for bidirectional communication
 const connectionService: ServerClientManager = new ServerClientManager()

+ 1 - 1
test/grpc3.ts

@@ -5,7 +5,7 @@ import { readFileSync } from 'fs';
 import { ServerClientManager } from '../services/server-client.service';
 import mongoose from 'mongoose';
 
-mongoose.connect('mongodb://localhost:27017/grpc2')
+mongoose.connect('mongodb://localhost:27017/G2')
 const Message = mongoose.model('Message', require('../models/message.schema'))
 // Subject for bidirectional communication
 const connectionService: ServerClientManager = new ServerClientManager()