Pārlūkot izejas kodu

further refinements

enzo 1 gadu atpakaļ
vecāks
revīzija
504fe63403

+ 1 - 1
.env

@@ -13,4 +13,4 @@ ReconnectionAttempt = '1000'
 
 TimeOut = '10'
 
-MaxBufferLoad = '1'
+MaxBufferLoad = '20'

+ 1 - 0
interfaces/general.interface.ts

@@ -70,6 +70,7 @@ export interface ChannelAttribute {
 }
 
 export interface ConnectionRequest {
+    database: string,
     server: ServerRequest,
     client: ClientRequest
 }

+ 19 - 9
services/fis.retransmission.service.ts

@@ -6,16 +6,16 @@ require('dotenv').config();
 
 // Implement status chain refactoring
 export class FisRetransmissionService {
-    private mongoUrl: string = process.env.MONGO + 'emergencyStorage'
+    private mongoUrl: string = process.env.MONGO as string
     private bufferedStorage: Message[] = []
     private mongoConnection: any
     private messageModel: any
     private maximumBufferLength: number = parseInt(process.env.MaxBufferLoad as string) // please configure at environment
     // private statusReport: Subject<ReportStatus> = new Subject()
 
-    constructor() {
+    constructor(private databaseName: string) {
         // Connect to mongoDB. 
-        this.manageMongoConnection()
+        this.manageMongoConnection(databaseName)
     }
 
     // Main function that intercepts outgoing messages by communicating || intepreting report status from grpc connection as indicator 
@@ -99,12 +99,21 @@ export class FisRetransmissionService {
             /* Stop buffering the message in local instance, but start saving them in database. Must first transfer the ones in local buffer before redirecting the 
             flow from applicationOutgoingMessage into Mongo */
             if (report.code == ColorCode.RED) {
-                console.log(`Connection status report: Server down. ${report.message} lol`)
+                console.log(`Connection status report: ${report.message}`)
+                if (report.payload) {
+                    console.log(`Rebuffering ${report.payload.message?.appData?.msgId} into storage...`)
+                    this.saveToMongo(report.payload)
+                }
+                console.log(`Connection status report && ${report.message ?? 'No Message'}`)
                 let status: Status = 1
                 if (status === 1) {
                     messageStreamToMongo = this.activateMongoStreamSubscription(messageStreamToMongo, applicationOutgoingMessage)
                     if (!messageStreamToMongo) status = 0
                 }
+                if (status === 1) {
+                    messageReleaseSubscription = this.deactivateReleaseSubscription(messageReleaseSubscription)
+                    if (messageReleaseSubscription) status = 0
+                }
                 if (status === 1) {
                     messageBufferSubscription = this.deactivateBufferSubscription(messageBufferSubscription)
                     if (messageBufferSubscription) status = 0
@@ -331,12 +340,13 @@ export class FisRetransmissionService {
     }
 
     // Connect to designated mongodatabase.
-    private async connectToMongoDatabase(): Promise<any> {
+    private async connectToMongoDatabase(databaseName: string): Promise<any> {
         return new Promise((resolve, reject) => {
             let status: Status = 1
             if (status = 1) {
-                console.log(this.mongoUrl)
-                this.mongoConnection = mongoose.createConnection(this.mongoUrl)
+                let database = this.mongoUrl + databaseName
+                console.log(database)
+                this.mongoConnection = mongoose.createConnection(database)
                 this.mongoConnection.on('error', (error) => {
                     console.error('Connection error:', error);
                     resolve('')
@@ -350,10 +360,10 @@ export class FisRetransmissionService {
     }
 
     // Manage mongoCOnnectino. The logic used would be different across differnet application. This will loop the process indefinitely os it is always trying to connect to database.
-    private async manageMongoConnection(): Promise<boolean> {
+    private async manageMongoConnection(databaseName: string): Promise<boolean> {
         while (true) {
             try {
-                await this.connectToMongoDatabase()
+                await this.connectToMongoDatabase(databaseName)
             } catch (error) {
                 console.log(`Something Wrong occured. Please check at manageMongoConnection`)
             }

+ 22 - 28
services/grpc.service.method.ts

@@ -3,7 +3,7 @@ import { Subject, Subscription } from "rxjs";
 import { ReportStatus, ColorCode, Message, MessageLog, ConnectionAttribute, ConnectionRequest, GrpcConnectionType } from "../interfaces/general.interface";
 import { Status } from '@grpc/grpc-js/build/src/constants';
 import { v4 as uuidv4 } from 'uuid'
-const message_proto = require('./protos/server.proto')
+import { message_proto } from './protos/server.proto'
 export class GrpcServiceMethod {
 
     public async create(request: ConnectionRequest, connectionAttribute: ConnectionAttribute): Promise<any> {
@@ -38,7 +38,7 @@ export class GrpcServiceMethod {
             connectionAttribute.ConnectionID.remote = connectionAttribute.inComing.ChannelID + connectionAttribute.outGoing.ChannelID
             let report: ReportStatus = {
                 code: ColorCode.GREEN,
-                message: `ConnectionID: ${connectionAttribute.ConnectionID.local} && ${connectionAttribute.ConnectionID.remote} `,
+                message: `ConnectionID acquired. Informing Restranmission to release Messages...`,
             }
             connectionAttribute.connectionStatus.next(report)
             console.log(connectionAttribute)
@@ -77,25 +77,25 @@ export class GrpcServiceMethod {
                 consecutiveResolutions++;
                 // console.log(`Reconnection Attempt: ${consecutiveResolutions}`)
                 alreadyHealthCheck = true
-                // If there are x consecutive resolutions, log an error and break the loop
-                if (consecutiveResolutions >= parseInt(process.env.ReconnectionAttempt as string) && redErrorEmission == false) {
+                if (redErrorEmission == false) {
                     redErrorEmission = true
-                    console.error(`Connection failed ${consecutiveResolutions} times. Stopping connection attempts.`);
+                    // console.error(`Connection failed ${consecutiveResolutions} times. Stopping connection attempts.`);
                     let error: ReportStatus = {
                         code: ColorCode.RED,
-                        message: 'Initiate Doomsday protocol....',
+                        message: 'Server is not responding. Proceed to buffer.',
                     }
                     statusControl.next(error)
                 }
-                if (consecutiveResolutions < parseInt(process.env.ReconnectionAttempt as string) && yellowErrorEmission == false) {
-                    yellowErrorEmission = true
-                    let error: ReportStatus = {
-                        code: ColorCode.YELLOW,
-                        // message: `Reconnection Attempt: ${consecutiveResolutions}. Server has yet to respond`
-                        message: `Attempting reconnection... Server has yet to respond`,
-                    }
-                    statusControl.next(error);
-                }
+                // Comment it out if Client wishes to use YELLOW for memory buffer instead of persistent storage buffer
+                // if (consecutiveResolutions < parseInt(process.env.ReconnectionAttempt as string) && yellowErrorEmission == false) {
+                //     yellowErrorEmission = true
+                //     let error: ReportStatus = {
+                //         code: ColorCode.YELLOW,
+                //         // message: `Reconnection Attempt: ${consecutiveResolutions}. Server has yet to respond`
+                //         message: `Attempting reconnection... Server has yet to respond`,
+                //     }
+                //     statusControl.next(error);
+                // }
             } catch (error) {
                 // Connection did not resolve, reset the count
                 consecutiveResolutions = 0;
@@ -184,9 +184,7 @@ export class GrpcServiceMethod {
         return new Promise(async (resolve, reject) => {
             const client = new message_proto.Message(server, grpc.credentials.createInsecure());
             // perform check to see if server is alive, if not terminate this grpc instant and create again
-            this.checkConnectionHealth(client, connectionAttribute.connectionStatus, alreadyHealthCheck).catch(() => {
-                resolve('')
-            })
+
             let outGoingInfo: any = {
                 channelID: uuidv4(),
                 publisherID: uuidv4(),
@@ -203,10 +201,9 @@ export class GrpcServiceMethod {
                     console.log(`Message trasmission operation is successful`)
                     // RPC completed successfully
                 } if (status == grpc.status.UNAVAILABLE) {
-                    let report = {
-                        code: ColorCode.YELLOW,
+                    let report: ReportStatus = {
+                        code: ColorCode.RED,
                         message: `Server doesn't seem to be alive. Error returned.`,
-                        from: `Server Streaming Client Instance`
                     }
                     connectionAttribute.connectionStatus.next(report)
                     resolve('No connection established. Server is not responding..')
@@ -231,20 +228,17 @@ export class GrpcServiceMethod {
     }
 
 
-    // Check connection To be Update. This function is destroying my code flow
+    // THis is no longer necesarry after the introduction of connection Attribute. But it is still useful for checking for the other side's health
     public async checkConnectionHealth(client: any, statusControl: Subject<ReportStatus>, alreadyHealthCheck: boolean): Promise<boolean> {
         return new Promise((resolve, reject) => {
             client.Check({}, (error, response) => {
                 if (response) {
                     console.log(`GRPC Health check status: ${response.status} Server Connected`);
-                    let report: ReportStatus = {
-                        code: ColorCode.GREEN,
-                        message: `Good to go!!!`,
-                    }
-                    statusControl.next(report)
+                    // Intepret the response status and implement code logic or handler
+                    resolve(response.status)
                 } else {
                     if (alreadyHealthCheck == false) console.error(`Health check failed: ${error}`);
-                    resolve(false)
+                    reject(false)
                 }
             })
         })

+ 1 - 2
services/protos/server.proto.ts

@@ -10,6 +10,5 @@ const packageDefinition = protoLoader.loadSync(PROTO_PATH, {
   defaults: true,
   oneofs: true,
 });
-const message_proto: any = grpc.loadPackageDefinition(packageDefinition).message;
-module.exports = message_proto;
+export const message_proto: any = grpc.loadPackageDefinition(packageDefinition).message;
 //# sourceMappingURL=health.js.map

+ 12 - 12
services/server-client.service.ts

@@ -13,7 +13,7 @@ export class ServerClientManager {
     public generateConnection(request: ConnectionRequest) {
         let initialReport: ReportStatus = { code: ColorCode.YELLOW, message: 'Initialization of the subject' }
         let reportSubject: BehaviorSubject<ReportStatus> = new BehaviorSubject(initialReport)
-        let retransmission: FisRetransmissionService = new FisRetransmissionService()
+        let retransmission: FisRetransmissionService = new FisRetransmissionService(request.database)
         let messageToBePublished: Subject<Message> = retransmission.handleMessage(request.server.messageToBePublishedfromApplication, reportSubject)
 
         let connectionAttribute: ConnectionAttribute = {
@@ -58,17 +58,17 @@ export class ServerClientManager {
         //             console.log(`Remote alive`)
         //         }
 
-                // if (connectionStatus.localStatus == ConnectionStatus.GREEN && connectionStatus.remoteStatus == ConnectionStatus.GREEN) {
-                //     let report: ReportStatus = {
-                //         code: ColorCode.GREEN,
-                //         message: `Both Local and Remote are connected`
-                //     }
-                //     // reportSubject.next(report)
-                //     console.log(`Both local and remote alive`)
-                // }
-            // },
-            // error: (err) => console.error(err),
-            // complete: () => { }
+        // if (connectionStatus.localStatus == ConnectionStatus.GREEN && connectionStatus.remoteStatus == ConnectionStatus.GREEN) {
+        //     let report: ReportStatus = {
+        //         code: ColorCode.GREEN,
+        //         message: `Both Local and Remote are connected`
+        //     }
+        //     // reportSubject.next(report)
+        //     console.log(`Both local and remote alive`)
+        // }
+        // },
+        // error: (err) => console.error(err),
+        // complete: () => { }
         // })
 
     }

+ 31 - 30
test/grpc1.ts

@@ -13,6 +13,7 @@ let targetserver2: string = 'localhost:3002'
 let hostServer: string = 'localhost:3000'
 let array: any[] = [] // Used for testing                     
 let connectionRequest: ConnectionRequest = {
+  database: 'grpc1',
   server: {
     serverUrl: hostServer,
     connectionType: 'GRPC',
@@ -54,39 +55,39 @@ connectionRequest.client.messageToBeReceivedFromRemote.subscribe({
 connectionService.generateConnection(connectionRequest)
 
 /* Simple Test */
-let generateFakeMessagesToBePublished = stream().pipe(take(10))
-generateFakeMessagesToBePublished.subscribe({
-  next: message => {
-    let payload: Message = {
-      id: hostServer,
-      message: message
-    }
-    connectionRequest.server.messageToBePublishedfromApplication.next(payload)
-  }
-})
+// let generateFakeMessagesToBePublished = stream().pipe(take(10))
+// generateFakeMessagesToBePublished.subscribe({
+//   next: message => {
+//     let payload: Message = {
+//       id: hostServer,
+//       message: message
+//     }
+//     connectionRequest.server.messageToBePublishedfromApplication.next(payload)
+//   }
+// })
 
 
 /* Complex Test: Expected out come, both must receive 14 message by the end. Havent try to disconnect.*/
-// setTimeout(() => {
-//   let message = {
-//     id: parsedMessages[10].appData.msgId,
-//     message: parsedMessages[10] // Choose this number, because i purposely use the 11th message and change the msgPayload property to query to emulate a request
-//   }
-//   connectionRequest.server.messageToBePublishedfromApplication.next(message)
-// }, 3000)
-// setTimeout(() => {
-//   let message = {
-//     id: parsedMessages[11].appData.msgId,
-//     message: parsedMessages[11]// Choose this number, because i purposely use the 11th message and change the msgPayload property to query to emulate a request
-//   }
-//   connectionRequest.server.messageToBePublishedfromApplication.next(message)
-// }, 4000)
-// setTimeout(() => {
-//   console.log(`All received data: ${array.length}`)
-// }, 10000)
-// setTimeout(() => {
-//   console.log(`All received data: ${array.length}`)
-// }, 20000)
+setTimeout(() => {
+  let message = {
+    id: parsedMessages[10].appData.msgId,
+    message: parsedMessages[10] // Choose this number, because i purposely use the 11th message and change the msgPayload property to query to emulate a request
+  }
+  connectionRequest.server.messageToBePublishedfromApplication.next(message)
+}, 3000)
+setTimeout(() => {
+  let message = {
+    id: parsedMessages[11].appData.msgId,
+    message: parsedMessages[11]// Choose this number, because i purposely use the 12th message and change the msgPayload property to query to emulate a request
+  }
+  connectionRequest.server.messageToBePublishedfromApplication.next(message)
+}, 4000)
+setTimeout(() => {
+  console.log(`All received data: ${array.length}`)
+}, 10000)
+setTimeout(() => {
+  console.log(`All received data: ${array.length}`)
+}, 20000)
 
 
 // this is just to publish an array of fake data as a Subject

+ 23 - 22
test/grpc2.ts

@@ -13,6 +13,7 @@ let targetserver2: string = 'localhost:3002'
 let hostServer: string = 'localhost:3001'
 let array: any[] = [] // Used for testing                     
 let connectionRequest: ConnectionRequest = {
+  database: 'grpc2',
   server: {
     serverUrl: hostServer,
     connectionType: 'GRPC',
@@ -59,34 +60,34 @@ connectionService.generateConnection(connectionRequest)
 //   next: message => {
 //     let payload: Message = {
 //       id: hostServer,
-//       message: JSON.stringify(message)
+//       message: message
 //     }
 //     connectionRequest.server.messageToBePublishedfromApplication.next(payload)
 //   }
 // })
 
 
-/* Complex Test */
-// setTimeout(() => {
-//   let message = {
-//     id: parsedMessages[10].appData.msgId,
-//     message: parsedMessages[10] // Choose this number, because i purposely use the 11th message and change the msgPayload property to query to emulate a request
-//   }
-//   connectionRequest.server.messageToBePublishedfromApplication.next(message)
-// }, 3000)
-// setTimeout(() => {
-//   let message = {
-//     id: parsedMessages[11].appData.msgId,
-//     message: parsedMessages[11]// Choose this number, because i purposely use the 11th message and change the msgPayload property to query to emulate a request
-//   }
-//   connectionRequest.server.messageToBePublishedfromApplication.next(message)
-// }, 4000)
-// setTimeout(() => {
-//   console.log(`All received data: ${array.length}`)
-// }, 10000)
-// setTimeout(() => {
-//   console.log(`All received data: ${array.length}`)
-// }, 20000)
+/* Complex Test: Expected out come, both must receive 14 message by the end. Havent try to disconnect.*/
+setTimeout(() => {
+  let message = {
+    id: parsedMessages[10].appData.msgId,
+    message: parsedMessages[10] // Choose this number, because i purposely use the 11th message and change the msgPayload property to query to emulate a request
+  }
+  connectionRequest.server.messageToBePublishedfromApplication.next(message)
+}, 3000)
+setTimeout(() => {
+  let message = {
+    id: parsedMessages[11].appData.msgId,
+    message: parsedMessages[11]// Choose this number, because i purposely use the 12th message and change the msgPayload property to query to emulate a request
+  }
+  connectionRequest.server.messageToBePublishedfromApplication.next(message)
+}, 4000)
+setTimeout(() => {
+  console.log(`All received data: ${array.length}`)
+}, 10000)
+setTimeout(() => {
+  console.log(`All received data: ${array.length}`)
+}, 20000)
 
 
 // this is just to publish an array of fake data as a Subject

+ 23 - 22
test/grpc3.ts

@@ -13,6 +13,7 @@ let targetserver2: string = 'localhost:3002'
 let hostServer: string = 'localhost:3002'
 let array: any[] = [] // Used for testing                     
 let connectionRequest: ConnectionRequest = {
+  database: 'grpc2',
   server: {
     serverUrl: hostServer,
     connectionType: 'GRPC',
@@ -59,34 +60,34 @@ connectionService.generateConnection(connectionRequest)
 //   next: message => {
 //     let payload: Message = {
 //       id: hostServer,
-//       message: JSON.stringify(message)
+//       message: message
 //     }
 //     connectionRequest.server.messageToBePublishedfromApplication.next(payload)
 //   }
 // })
 
 
-/* Complex Test */
-// setTimeout(() => {
-//   let message = {
-//     id: parsedMessages[10].appData.msgId,
-//     message: parsedMessages[10] // Choose this number, because i purposely use the 11th message and change the msgPayload property to query to emulate a request
-//   }
-//   connectionRequest.server.messageToBePublishedfromApplication.next(message)
-// }, 3000)
-// setTimeout(() => {
-//   let message = {
-//     id: parsedMessages[11].appData.msgId,
-//     message: parsedMessages[11]// Choose this number, because i purposely use the 11th message and change the msgPayload property to query to emulate a request
-//   }
-//   connectionRequest.server.messageToBePublishedfromApplication.next(message)
-// }, 4000)
-// setTimeout(() => {
-//   console.log(`All received data: ${array.length}`)
-// }, 10000)
-// setTimeout(() => {
-//   console.log(`All received data: ${array.length}`)
-// }, 20000)
+/* Complex Test: Expected out come, both must receive 14 message by the end. Havent try to disconnect.*/
+setTimeout(() => {
+  let message = {
+    id: parsedMessages[10].appData.msgId,
+    message: parsedMessages[10] // Choose this number, because i purposely use the 11th message and change the msgPayload property to query to emulate a request
+  }
+  connectionRequest.server.messageToBePublishedfromApplication.next(message)
+}, 3000)
+setTimeout(() => {
+  let message = {
+    id: parsedMessages[11].appData.msgId,
+    message: parsedMessages[11]// Choose this number, because i purposely use the 11th message and change the msgPayload property to query to emulate a request
+  }
+  connectionRequest.server.messageToBePublishedfromApplication.next(message)
+}, 4000)
+setTimeout(() => {
+  console.log(`All received data: ${array.length}`)
+}, 10000)
+setTimeout(() => {
+  console.log(`All received data: ${array.length}`)
+}, 20000)
 
 
 // this is just to publish an array of fake data as a Subject