فهرست منبع

proper transmissoin to corresponding requests

enzo 11 ماه پیش
والد
کامیت
571f629e45
11فایلهای تغییر یافته به همراه325 افزوده شده و 187 حذف شده
  1. 0 60
      grpc/grpc1.ts
  2. 0 57
      grpc/grpc2.ts
  3. 25 2
      index.ts
  4. 2 1
      interfaces/general.interface.ts
  5. 3 2
      package.json
  6. 32 32
      services/fis.retransmission.service.ts
  7. 72 33
      services/grpc.service.ts
  8. 77 0
      test/grpc1.ts
  9. 76 0
      test/grpc2.ts
  10. 0 0
      test/health.ts
  11. 38 0
      test/test.ts

+ 0 - 60
grpc/grpc1.ts

@@ -1,60 +0,0 @@
-import { Subject } from "rxjs";
-import { GrpcService } from "../services/grpc.service";
-import * as fs from 'fs'
-import { FisErrorHandlingService } from "../services/error.handling.service.fis";
-import { ReportStatus } from "../interfaces/general.interface";
-
-const messagesJSON: any = fs.readFileSync('payload.json')
-const errorHandlingService: FisErrorHandlingService = new FisErrorHandlingService()
-const gprcService: GrpcService = new GrpcService()
-
-let parsedMessages: any[] = JSON.parse(messagesJSON) // load the fake messages generated for this trial 
-let dataMessages = stream() // Emulate messges to be sent over to target server
-let messageToBePublished: Subject<any> = new Subject()
-let statusControl: Subject<ReportStatus> = new Subject()
-
-/* For server streaming */
-errorHandlingService.handleMessage(dataMessages, statusControl).subscribe((messages) => {
-  messageToBePublished.next(messages)
-})
-let server1 = 'localhost:3000'
-gprcService.createGrpcInstance(server1, messageToBePublished, statusControl, { instanceType: 'server', serviceMethod: 'server streaming' })
-
-
-/* For bidiretional streaming*/
-// errorHandlingService.handleMessage(dataMessages, statusControl).subscribe((messages) => {
-//   messageToBePublished.next(messages)
-// })
-// let server1 = 'localhost:3000'
-// gprcService.createGrpcInstance(server1, messageToBePublished, statusControl, { instanceType: 'server', serviceMethod: 'bidirectional' })
-
-
-// setTimeout(() => {
-//   gprcService.stopServer(server1).then((res) => {
-//     gprcService.getAllGrpcServerConnectionInstance()
-//   })
-// }, 3000)
-
-// setTimeout(() => {
-//   gprcService.createGrpcServerStreamingServer(server1).then((res) => {
-//     gprcService.getAllGrpcServerConnectionInstance()
-//   })
-// }, 10000)
-
-
-// this is just to publish an array of fake data as a Subject
-function stream(): Subject<any> {
-  let result: Subject<any> = new Subject()
-  let messages: any[] = parsedMessages
-  let count = 0
-  const intervalId = setInterval(() => {
-    result.next(messages[count]);
-    count++;
-    if (count >= 1000) {
-      clearInterval(intervalId);
-      result.complete();
-    }
-  }, 500)
-
-  return result
-}

+ 0 - 57
grpc/grpc2.ts

@@ -1,57 +0,0 @@
-import * as fs from 'fs'
-import { Subject } from 'rxjs';
-import { ColorCode, ReportStatus } from '../interfaces/general.interface';
-import { GrpcService } from '../services/grpc.service';
-import { FisErrorHandlingService } from '../services/error.handling.service.fis';
-
-// Subject for bidirectional communication
-const errorHandlingService: FisErrorHandlingService = new FisErrorHandlingService()
-const grpcService: GrpcService = new GrpcService()
-const messagesJSON: any = fs.readFileSync('payload.json')
-let parsedMessages: any[] = JSON.parse(messagesJSON) // load the fake messages generated for this trial 
-let messageToBeReleased: Subject<any> = new Subject() // Sample message to be transmitted over to target server
-let statusControl: Subject<ReportStatus> = new Subject() // Listening for error events and states
-let dataMessages = stream() // Emulate messges to be sent over to target server
-let server1: string = 'localhost:3000'
-let unaryRequestSubject: Subject<any> = new Subject()
-
-/* Server Streaming Test case */
-errorHandlingService.handleMessage(unaryRequestSubject, statusControl).subscribe((messages) => {
-  messageToBeReleased.next(messages)
-})
-grpcService.createGrpcInstance(server1, messageToBeReleased, statusControl, { instanceType: 'client', serviceMethod: 'server streaming' })
-
-/* Bidirectional streaming test case */
-// errorHandlingService.handleMessage(dataMessages, statusControl).subscribe((messages) => {
-//   messageToBeReleased.next(messages)
-// })
-// grpcService.createGrpcInstance(server1, messageToBeReleased, statusControl, { instanceType: 'client', serviceMethod: 'bidirectional' })
-// }
-
-setTimeout(() => {
-  messageToBeReleased.next(parsedMessages[0])
-}, 1000)
-setTimeout(() => {
-  messageToBeReleased.next(parsedMessages[1])
-}, 4000)
-setTimeout(() => {
-  unaryRequestSubject.next(parsedMessages[2])
-}, 7000)
-
-// this is just to publish an array of fake data as a Subject
-function stream(): Subject<any> {
-  let result: Subject<any> = new Subject()
-  let messages: any[] = parsedMessages
-  let count = 0
-  const intervalId = setInterval(() => {
-    result.next(messages[count]);
-    count++;
-    if (count >= 1000) {
-      clearInterval(intervalId);
-      result.complete();
-    }
-  }, 500)
-
-  return result
-}
-

+ 25 - 2
index.ts

@@ -1,2 +1,25 @@
-export * from './services/error.handling.service.fis';
-export * from './services/grpc.service';
+import { Subject, from } from 'rxjs';
+import * as fs from 'fs'
+export * from './services/fis.retransmission.service';
+export * from './services/grpc.service';
+
+const messagesJSON: any = fs.readFileSync('payload.json')
+let parsedMessages = JSON.parse(messagesJSON)
+function generateFakeStreamResponse(request: any): Subject<any> {
+    let res: Subject<any> = new Subject()
+    from(parsedMessages.slice(0, 10)).subscribe({
+      next: element => {
+        let message = {
+          id: request.id,
+          message: element
+        }
+        res.next(message)
+      },
+      error: error => console.error(error),
+      complete: () => console.log(`Stream response for ${request.id} has been prepared.`)
+    })
+    return res
+  }
+  
+  generateFakeStreamResponse({ id: '123', message: 'DOnt lie to me boy' }).subscribe((e => console.log(e)))
+  

+ 2 - 1
interfaces/general.interface.ts

@@ -31,7 +31,8 @@ export interface ServerResponse {
 export interface ReportStatus {
     code: ColorCode,
     message: string,
-    payload?: any
+    payload?: any,
+    from: string | any
 }
 // https://grpc.io/docs/what-is-grpc/core-concepts/
 export interface GrpcConnectionType {

+ 3 - 2
package.json

@@ -8,8 +8,9 @@
     "watch": "tsc -p tsconfig.json --watch",
     "test": "echo \"Error: no test specified\" && exit 1",
     "generatedata": "node services/utility/generateData.js",
-    "grpc1": "node grpc/grpc1.js",
-    "grpc2": "node grpc/grpc2.js"
+    "grpc1": "node test/grpc1.js",
+    "grpc2": "node test/grpc2.js",
+    "testing": "node test/test.js"
   },
   "author": "",
   "license": "ISC",

+ 32 - 32
services/error.handling.service.fis.ts → services/fis.retransmission.service.ts

@@ -2,13 +2,13 @@ import * as _ from 'lodash'
 import * as fs from 'fs'
 import mongoose, { Model, Schema } from 'mongoose';
 import { Observable, Subject, Subscription, from } from 'rxjs'
-import { ColorCode, MessageLog, ReportStatus } from '../interfaces/general.interface'
+import { ColorCode, ReportStatus } from '../interfaces/general.interface'
 require('dotenv').config();
 
 // Implement status chain refactoring
-export class FisErrorHandlingService {
+export class FisRetransmissionService {
     private mongoUrl: string = process.env.MONGO + 'emergencyStorage'
-    private bufferedStorage: MessageLog[] = []
+    private bufferedStorage: any[] = []
     private mongoConnection: any
     private messageModel: any
     private maximumBufferLength: number = parseInt(process.env.MaxBufferLoad as string) // right now just put as 15
@@ -19,8 +19,8 @@ export class FisErrorHandlingService {
     }
 
     // Main function that intercepts outgoing messages by communicating || intepreting report status from grpc connection as indicator 
-    public handleMessage(messageToBePublished: Subject<MessageLog>, statusReport: Subject<ReportStatus>): Subject<MessageLog> {
-        let releaseMessageSubject: Subject<MessageLog> = new Subject() // A return value
+    public handleMessage(messageToBePublished: Subject<any>, statusReport: Subject<ReportStatus>): Subject<any> {
+        let releaseMessageSubject: Subject<any> = new Subject() // A return value
         // Using the concept of toggling to improve the eficacy of subscription control && data flow
         let messageReleaseSubscription: Subscription | null = null
         let messageBufferSubscription: Subscription | null = null
@@ -44,7 +44,7 @@ export class FisErrorHandlingService {
                     if (!messageReleaseSubscription) status = -1
                 }
                 if (status === 1) {
-                    this.releaseMessageFromLocalBuffer(this.bufferedStorage).then((resObs: Observable<MessageLog>) => {
+                    this.releaseMessageFromLocalBuffer(this.bufferedStorage).then((resObs: Observable<any>) => {
                         resObs.subscribe({
                             next: message => releaseMessageSubject.next(message),
                             error: err => console.error(err),
@@ -59,7 +59,7 @@ export class FisErrorHandlingService {
                     })
                 }
                 if (status === 1) {
-                    this.releaseMessageFromMongoStorage().then((resObs: Subject<MessageLog>) => {
+                    this.releaseMessageFromMongoStorage().then((resObs: Subject<any>) => {
                         resObs.subscribe({
                             next: message => releaseMessageSubject.next(message),
                             error: err => console.error(err),
@@ -77,7 +77,7 @@ export class FisErrorHandlingService {
             }
             if (report.code == ColorCode.YELLOW) {
                 if (report.payload) {
-                    console.log(`Rebuffering ${report.payload.appData?.msgId} into buffer...`)
+                    console.log(`Rebuffering ${report.payload.message?.appData?.msgId} into buffer...`)
                     this.bufferedStorage.push(report.payload)
                 }
                 console.log(`Connection status report && ${report.message ?? 'No Message'}`)
@@ -109,7 +109,7 @@ export class FisErrorHandlingService {
                     if (messageBufferSubscription) status = -1
                 }
                 if (status === 1) {
-                    this.transferBufferedMessagseToMongoStorage(this.bufferedStorage, messageBufferSubscription).then((res: MessageLog[]) => {
+                    this.transferBufferedMessageToMongoStorage(this.bufferedStorage, messageBufferSubscription).then((res: any[]) => {
                         if (res.length !== this.bufferedStorage.length || this.bufferedStorage.length > 0) status = -1 // this promise function should return an empty array
                     })
                 }
@@ -132,10 +132,10 @@ export class FisErrorHandlingService {
                 console.log(`Buffer length exceeds limit imposed!!!`)
                 let report: ReportStatus = {
                     code: ColorCode.RED,
-                    message: `Buffer is exceeding limit. Initiate storage transfer to designated database. `
+                    message: `Buffer is exceeding limit. Initiate storage transfer to designated database.`,
+                    from: `Error Handling Service`
                 }
                 statusReport.next(report)
-
             }
         })
     }
@@ -144,8 +144,8 @@ export class FisErrorHandlingService {
     private activateReleaseSubscription(messageReleaseSubscription, messageToBePublished, releaseMessageSubject): Subscription | null {
         if (!messageReleaseSubscription) {
             messageReleaseSubscription = messageToBePublished.subscribe({
-                next: (message: MessageLog) => {
-                    console.log(`Releasing ${message.appData.msgId}...`);
+                next: (message: any) => {
+                    console.log(`Releasing ${message.message.appData.msgId}...`);
                     releaseMessageSubject.next(message);
                 },
                 error: (err) => console.error(err),
@@ -171,11 +171,11 @@ export class FisErrorHandlingService {
     }
 
     // Begin to push the incoming messages into local instantarray
-    private activateBufferSubscription(bufferStorage: MessageLog[], messageBufferSubscription: Subscription | null, messageToBePublished: Subject<any>): Subscription | null {
+    private activateBufferSubscription(bufferStorage: any[], messageBufferSubscription: Subscription | null, messageToBePublished: Subject<any>): Subscription | null {
         if (!messageBufferSubscription) {
             messageBufferSubscription = messageToBePublished.subscribe({
-                next: (message: MessageLog) => {
-                    console.log(`Buffering ${message.appData.msgId}...  Local array length: ${bufferStorage.length}`);
+                next: (message: any) => {
+                    console.log(`Buffering ${message.message.appData.msgId}...  Local array length: ${bufferStorage.length}`);
                     bufferStorage.push(message)
                 },
                 error: (err) => console.error(err),
@@ -204,8 +204,8 @@ export class FisErrorHandlingService {
     private activateMongoStreamSubscription(messageStreamToMongo: Subscription | null, messageToBePublished: Subject<any>): Subscription | null {
         if (!messageStreamToMongo) {
             messageStreamToMongo = messageToBePublished.subscribe({
-                next: (message: MessageLog) => {
-                    console.log(`Saving ${message.appData.msgId}...`);
+                next: (message: any) => {
+                    console.log(`Saving ${message.message.appData.msgId}...`);
                     this.saveToMongo(message)
                 },
                 error: (err) => console.error(err),
@@ -231,22 +231,22 @@ export class FisErrorHandlingService {
     }
 
     // Store in json file in this project folder. To be enabled in future
-    private async transferMessageToLocalStorage(message: Subject<MessageLog>): Promise<void> {
-        let localArray: MessageLog[] = this.bufferedStorage
+    private async transferMessageToLocalStorage(message: Subject<any>): Promise<void> {
+        let localArray: any[] = this.bufferedStorage
         let filename = `localstorage.json`;
 
         while (localArray.length > 0) {
             let objectToWrite = this.bufferedStorage[0];
             await writeMessage(objectToWrite, filename)
         }
-        message.subscribe((message: MessageLog) => {
+        message.subscribe((message: any) => {
             writeMessage(message, filename)
         })
 
         if (localArray.length < 1) this.bufferedStorage = localArray
         console.log('Local Array is empty. Finished transferring to files.')
 
-        async function writeMessage(message: MessageLog, filename: string) {
+        async function writeMessage(message: any, filename: string) {
             try {
                 let stringifiedMessage = JSON.stringify(message);
                 await fs.promises.appendFile(filename, stringifiedMessage + "\r\n")
@@ -259,11 +259,11 @@ export class FisErrorHandlingService {
     }
 
     // To be used by mongoStreamSubscription to perform the saving execution
-    private async saveToMongo(message: MessageLog): Promise<boolean> {
+    private async saveToMongo(message: any): Promise<boolean> {
         return new Promise((resolve, reject) => {
             // let messageModel: Model<any> = this.mongoConnection.model('Message', require('../models/message.schema'))
             this.messageModel.create(message).then(() => {
-                console.log(`Saved MessageID ${message.appData.msgId} into ${this.mongoUrl}`);
+                console.log(`Saved MessageID ${message.message.appData.msgId} into ${this.mongoUrl}`);
                 resolve(true)
             }).catch((err) => {
                 console.log(`MongoSaveError: ${err.message}`)
@@ -273,13 +273,13 @@ export class FisErrorHandlingService {
     }
 
     // As the name implies, transder all the messages from the local instance into mongoStorage. Local instance should be emptied after transfer is completed
-    private async transferBufferedMessagseToMongoStorage(bufferedMessage: MessageLog[], messageBufferSubscription): Promise<MessageLog[]> {
+    private async transferBufferedMessageToMongoStorage(bufferedMessage: any[], messageBufferSubscription): Promise<any[]> {
         return new Promise((resolve, reject) => {
-            let bufferedStorage: Observable<MessageLog> = from(bufferedMessage)
+            let bufferedStorage: Observable<any> = from(bufferedMessage)
             bufferedStorage.subscribe({
-                next: (message: MessageLog) => {
+                next: (message: any) => {
                     this.saveToMongo(message).then((res) => {
-                        console.log(`Message ${message.appData.msgId} saved successfully...`)
+                        console.log(`Message ${message.message.appData.msgId} saved successfully...`)
                     }).catch((err) => console.error(err))
                 },
                 error: (error) => {
@@ -298,12 +298,12 @@ export class FisErrorHandlingService {
     }
 
     // Transfer stored messages from the local instance back into the stream to be returned to the caller.
-    private async releaseMessageFromLocalBuffer(bufferedStorage: MessageLog[]): Promise<Observable<MessageLog>> {
+    private async releaseMessageFromLocalBuffer(bufferedStorage: any[]): Promise<Observable<any>> {
         return new Promise((resolve, reject) => {
             if (bufferedStorage.length > 1) {
                 let caseVariable = this.bufferedStorage.length > 1;
                 console.log(`Releasing data from local buffer instance. There ${caseVariable ? "is" : "are"} ${this.bufferedStorage.length} messages...`);
-                let returnArrayObs: Observable<MessageLog> = from(bufferedStorage)
+                let returnArrayObs: Observable<any> = from(bufferedStorage)
                 resolve(returnArrayObs)
             } else {
                 let message = `There is no data in stored in local instance`
@@ -313,9 +313,9 @@ export class FisErrorHandlingService {
     }
 
     // Transder all the stored messages in designated mongo databases. It should be empty after all the data has been transferred.
-    private async releaseMessageFromMongoStorage(): Promise<Subject<MessageLog>> {
+    private async releaseMessageFromMongoStorage(): Promise<Subject<any>> {
         return new Promise((resolve, reject) => {
-            let dataSubject: Subject<MessageLog> = new Subject()
+            let dataSubject: Subject<any> = new Subject()
             this.extractAllMessages(dataSubject)
             resolve(dataSubject)
         })

+ 72 - 33
services/grpc.service.ts

@@ -6,9 +6,19 @@ const message_proto = require('./protos/server.proto')
 
 export class GrpcService {
     private grpcServerConnection: any = {}
+    private incomingRequest: Subject<any> = new Subject()
+    private incomingResponse: Subject<any> = new Subject()
 
     constructor() { }
 
+    public getIncomingRequest(): Subject<any> {
+        return this.incomingRequest
+    }
+
+    public getIncomingResponse(): Subject<any> {
+        return this.incomingResponse
+    }
+
     public async stopServer(serverUrl: string): Promise<any> {
         return new Promise((resolve, reject) => {
             if (this.grpcServerConnection[serverUrl]) {
@@ -71,7 +81,8 @@ export class GrpcService {
                     console.error(`Connection failed ${consecutiveResolutions} times. Stopping connection attempts.`);
                     let error: ReportStatus = {
                         code: ColorCode.RED,
-                        message: 'Initiate Doomsday protocol....'
+                        message: 'Initiate Doomsday protocol....',
+                        from: `GRPC instance management`
                     }
                     statusControl.next(error)
                 }
@@ -80,7 +91,8 @@ export class GrpcService {
                     let error: ReportStatus = {
                         code: ColorCode.YELLOW,
                         // message: `Reconnection Attempt: ${consecutiveResolutions}. Server has yet to respond`
-                        message: `Attempting reconnection... Server has yet to respond`
+                        message: `Attempting reconnection... Server has yet to respond`,
+                        from: `GRPC instance management`
                     }
                     statusControl.next(error);
                 }
@@ -121,7 +133,8 @@ export class GrpcService {
                         console.log(`Client connected from: ${call.getPeer()}`);
                         let report: ReportStatus = {
                             code: ColorCode.GREEN,
-                            message: `Client connected!!`
+                            message: `Client connected!!`,
+                            from: `Bidirectional Instance`
                         }
                         statusControl.next(report)
 
@@ -133,7 +146,8 @@ export class GrpcService {
                                     let report: ReportStatus = {
                                         code: ColorCode.YELLOW,
                                         message: `Client is not alive.....`,
-                                        payload: payload
+                                        payload: payload,
+                                        from: `Bidirectional Instance`
                                     }
                                     statusControl.next(report) // no connection. Tell buffer service to stop releasing messages
                                     subscription.unsubscribe() // i still dont understand why i wrote this here
@@ -271,45 +285,58 @@ export class GrpcService {
 
                 server.addService(message_proto.Message.service, {
                     HandleMessage: (call) => { // this is for bidirectional streaming. Need to have another one for unary calls for web clients
-                        let requestId = call.request // unary request from client to be responded with a stream
-                        console.log(`Processing unary call.... requestId: ${requestId.id}`)
-                        console.log(`Client connected from: ${call.getPeer()}`);
-
                         let report: ReportStatus = { //let the flow come through 
                             code: ColorCode.GREEN,
-                            message: `Client connected!!`
+                            message: `Client connected!!`,
+                            from: `Server Streaming Instance`
                         }
                         statusControl.next(report)
 
-                        let subscription: Subscription = messageToBeStream.pipe(take(15)).subscribe({
-                            next: (payload: any) => {
-                                let noConnection = call.cancelled // check connection for each and every message
-                                if (noConnection === true) { // that means there's no connection, beccause the cancel operation is determined to check 
-                                    let report: ReportStatus = {
-                                        code: ColorCode.YELLOW,
-                                        message: `Client is not alive.....`,
-                                        payload: payload
-                                    }
-                                    statusControl.next(report)
-                                    subscription.unsubscribe()
-                                } else {
-                                    console.log(`Sending ${payload.appData.msgId} in respond to unary ${requestId.id}`)
-                                    let message: string = JSON.stringify(payload)
-                                    call.write({ message })
+                        let request = call.request // unary request from client to be responded with a stream
+                        console.log(`Received unary call.... request: ${request.id}`)
+                        this.incomingRequest.next(request)
+                        console.log(`Client connected from: ${call.getPeer()}`);
+
 
+                        let subscription: Subscription = messageToBeStream.subscribe({
+                            next: (response: any) => {
+                                if (response.id == request.id) {
+                                    // console.log(`${response.id} vs ${request.id}`)
+                                    // Check who's response it belongs to
+                                    let noConnection = call.cancelled // check connection for each and every message
+                                    if (noConnection === true) { // that means there's no connection, beccause the cancel operation is determined to check 
+                                        let report: ReportStatus = {
+                                            code: ColorCode.YELLOW,
+                                            message: `Client is not alive.....`,
+                                            payload: response,
+                                            from: `Server Streaming Instance`
+                                        }
+                                        statusControl.next(report)
+                                        subscription.unsubscribe()
+                                    } else {
+                                        console.log(`Sending ${response.message.appData.msgId} in respond to unary ${request.id}`)
+                                        // let respond: string = JSON.stringify(response.message)
+                                        let message = {
+                                            id: response.id,
+                                            message: JSON.stringify(response.message)
+                                        }
+                                        // console.log(message)
+                                        call.write(message)
+                                    }
                                 }
                             },
                             error: err => {
                                 console.error(err)
                                 let report: ReportStatus = {
                                     code: ColorCode.YELLOW,
-                                    message: `Message streaming error`
+                                    message: `Message streaming error`,
+                                    from: `Server Streaming Instance`
                                 }
                                 statusControl.next(report)
                                 subscription.unsubscribe()
                             },
                             complete: () => {
-                                console.log(`Stream response completed for ${requestId.id}`)
+                                console.log(`Stream response completed for ${request.id}`)
                                 subscription.unsubscribe()
                                 // call.end()
                             }
@@ -342,7 +369,10 @@ export class GrpcService {
     private async createServerStreamingClient(server: string, alreadyHealthCheck: boolean, unaryRequestSubject: Subject<any>, statusControl: Subject<ReportStatus>): Promise<string> {
         return new Promise(async (resolve, reject) => {
             const client = new message_proto.Message(server, grpc.credentials.createInsecure());
-            this.checkConnectionHealth(client, statusControl, alreadyHealthCheck) // atcually there's no need for this 
+            this.checkConnectionHealth(client, statusControl, alreadyHealthCheck) // perform check to see if server is alive, if not terminate this grpc instant and create again
+
+            /* Memory leak some where here */
+
             unaryRequestSubject.subscribe({
                 next: (request: any) => {
                     let message = {
@@ -363,20 +393,28 @@ export class GrpcService {
                             resolve('No connection established. Server is not responding..')
                             let report = {
                                 code: ColorCode.YELLOW,
-                                message: `Server doesn't seem to be alive. Error returned.`
+                                message: `Server doesn't seem to be alive. Error returned.`,
+                                from: `Server Streaming Client Instance`
                             }
                             statusControl.next(report)
                         }
                     });
 
                     call.on('data', (data: any) => {
-                        console.log(`Received stream response from Server. Receiver: ${message.id}`);
+                        // console.log(`Received stream response from Server. Receiver: ${message.id}`);
+                        let response = {
+                            id: data.id,
+                            message: JSON.parse(data.message)
+                        }
+                        console.log(response)
+                        this.incomingResponse.next(response)
                     });
 
                     call.on('error', (err) => {
                         let report = {
                             code: ColorCode.YELLOW,
-                            message: `Server doesn't seem to be alive. Error returned.`
+                            message: `Server doesn't seem to be alive. Error returned.`,
+                            from: `Server Streaming Client Instance`
                         }
                         statusControl.next(report)
                         // resolve(err)
@@ -386,7 +424,8 @@ export class GrpcService {
                         console.log(`Streaming Response is completed`)
                         let report = {
                             code: ColorCode.YELLOW,
-                            message: `Server doesn't seem to be alive. Error returned.`
+                            message: `Server doesn't seem to be alive. Error returned.`,
+                            from: `Server Streaming Client Instance`
                         }
                         statusControl.next(report)
                         // subscription.unsubscribe(); // this is not correct i am just destroying the entire operation. i should be terminating the instance to which i think it does by it self
@@ -403,7 +442,6 @@ export class GrpcService {
                 complete: () => { } // should not complete since this is an indefinite listening process to transmit requests made by relevant client application
             })
 
-
         })
     }
 
@@ -415,7 +453,8 @@ export class GrpcService {
                     console.log(`GRPC Health check status: ${response.status} Server Connected`);
                     let report: ReportStatus = {
                         code: ColorCode.GREEN,
-                        message: `Good to go!!!`
+                        message: `Good to go!!!`,
+                        from: `GRPC health check`
                     }
                     statusControl.next(report)
                 } else {

+ 77 - 0
test/grpc1.ts

@@ -0,0 +1,77 @@
+import { Subject, from, take, takeUntil } from "rxjs";
+import { GrpcService } from "../services/grpc.service";
+import * as fs from 'fs'
+import { FisRetransmissionService } from "../services/fis.retransmission.service";
+import { ReportStatus } from "../interfaces/general.interface";
+
+const messagesJSON: any = fs.readFileSync('payload.json')
+const errorHandlingService: FisRetransmissionService = new FisRetransmissionService()
+const gprcService: GrpcService = new GrpcService()
+let incomingRequest: Subject<any> = gprcService.getIncomingRequest()
+let applicationOutgoingResponse: Subject<any> = new Subject()
+let parsedMessages: any[] = JSON.parse(messagesJSON) // load the fake messages generated for this trial 
+let dataMessages = stream() // Emulate messges to be sent over to target server
+let messageToBePublished: Subject<any> = new Subject()
+let statusControl: Subject<ReportStatus> = new Subject()
+
+incomingRequest.subscribe({
+  next: request => { // this whole thing is for 1 request
+    if (request.id == '0000') {
+      console.log(`Just checking for buffer. Dont do anything else!`)
+    } else {
+      generateFakeStreamResponse(request).subscribe({
+        next: (responseMessage) => {
+          // console.log(`Processing request:${request.id}....`)
+          applicationOutgoingResponse.next(responseMessage)
+        },
+        error: error => console.error(error),
+        complete: () => {
+          console.log(`Stream request for ${request.id} is queued.`) // shpuld be indefinite
+        }
+      })
+    }
+  },
+  error: error => console.error(error),
+  complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
+})
+
+/* For server streaming */
+errorHandlingService.handleMessage(applicationOutgoingResponse, statusControl).subscribe((messages) => {
+  messageToBePublished.next(messages)
+})
+let server1 = 'localhost:3000'
+gprcService.createGrpcInstance(server1, messageToBePublished, statusControl, { instanceType: 'server', serviceMethod: 'server streaming' })
+
+
+// this is just to publish an array of fake data as a Subject
+function stream(): Subject<any> {
+  let result: Subject<any> = new Subject()
+  let messages: any[] = parsedMessages
+  let count = 0
+  const intervalId = setInterval(() => {
+    result.next(messages[count]);
+    count++;
+    if (count >= 1000) {
+      clearInterval(intervalId);
+      result.complete();
+    }
+  }, 500)
+
+  return result
+}
+
+function generateFakeStreamResponse(request: any): Subject<any> {
+  let res: Subject<any> = new Subject()
+  stream().pipe(take(10)).subscribe({
+    next: element => {
+      let message = {
+        id: request.id, // Caller's 
+        message: element
+      }
+      res.next(message)
+    },
+    error: error => console.error(error),
+    complete: () => console.log(`Stream response for ${request.id} has been prepared.`)
+  })
+  return res
+}

+ 76 - 0
test/grpc2.ts

@@ -0,0 +1,76 @@
+import * as fs from 'fs'
+import { Subject, groupBy, mergeMap, toArray } from 'rxjs';
+import { ColorCode, ReportStatus } from '../interfaces/general.interface';
+import { GrpcService } from '../services/grpc.service';
+import { FisRetransmissionService } from '../services/fis.retransmission.service';
+
+// Subject for bidirectional communication
+const errorHandlingService: FisRetransmissionService = new FisRetransmissionService()
+const grpcService: GrpcService = new GrpcService()
+const messagesJSON: any = fs.readFileSync('payload.json')
+let incomingResponse: Subject<any> = grpcService.getIncomingResponse()
+let parsedMessages: any[] = JSON.parse(messagesJSON) // load the fake messages generated for this trial 
+let messageToBeReleased: Subject<any> = new Subject() // Sample message to be transmitted over to target server
+let statusControl: Subject<ReportStatus> = new Subject() // Listening for error events and states
+let dataMessages = stream() // Emulate messges to be sent over to target server
+let server1: string = 'localhost:3000'
+let unaryRequestSubject: Subject<any> = new Subject()
+let array: any[] = []
+
+incomingResponse.subscribe({
+  next: request => {
+    console.log(`To be distributed to request:${request.id} => message: ${request.message.appData.msgId}`)
+    array.push(request)
+    // Have to create a function that creates observables/subjects corresponding to the request made by the client to stream the responses 
+
+    /* now is one request will have it's own listener. If the client is down all listeners instantiated from all the request are terminated.
+    Server now doesn't care because when it proceses all the request, the response are merged into one channel. My not so clever solution is that
+    when client starts, it wil have to first send a request specifically just to grab the data that was loss, and then let the internal 
+    application do the sorting */
+
+    /* To really see how this work, i will create an array. Since i wil pump in 3 request, the server side is hardcoded atm to stream 10 messagse
+    for one request. I am going to terminate the client halfway through, change the code, so that when i start the client again, it will only send
+    1 request over, there making another request for 10 more. But of course, in the real implementation, this can be a initializer just to see if 
+    there's any buffered messages to be sent over
+     */
+  }
+})
+
+/* Server Streaming Test case */
+errorHandlingService.handleMessage(unaryRequestSubject, statusControl).subscribe((messages) => {
+  messageToBeReleased.next(messages)
+})
+grpcService.createGrpcInstance(server1, messageToBeReleased, statusControl, { instanceType: 'client', serviceMethod: 'server streaming' })
+
+messageToBeReleased.next(parsedMessages[0])
+setTimeout(() => {
+  messageToBeReleased.next(parsedMessages[1])
+}, 2000)
+setTimeout(() => {
+  messageToBeReleased.next(parsedMessages[2])
+}, 3000)
+setTimeout(() => {
+  messageToBeReleased.next(parsedMessages[3])
+}, 5000)
+
+setTimeout(() => {
+  console.log(`Total messages received: ${array.length}`)
+}, 11000)
+
+// this is just to publish an array of fake data as a Subject
+function stream(): Subject<any> {
+  let result: Subject<any> = new Subject()
+  let messages: any[] = parsedMessages
+  let count = 0
+  const intervalId = setInterval(() => {
+    result.next(messages[count]);
+    count++;
+    if (count >= 1000) {
+      clearInterval(intervalId);
+      result.complete();
+    }
+  }, 500)
+
+  return result
+}
+

+ 0 - 0
grpc/health.ts → test/health.ts


+ 38 - 0
test/test.ts

@@ -0,0 +1,38 @@
+import { Subject, from } from 'rxjs';
+import { groupBy, mergeMap } from 'rxjs/operators';
+
+// Sample response data
+const responseData = [
+  { who: 'user1', message: 'Hello from user1' },
+  { who: 'user2', message: 'Hi there from user2' },
+  { who: 'user1', message: 'Another message from user1' },
+  { who: 'user3', message: 'Message from user3' },
+];
+
+// Create a Subject to receive the response
+const responseSubject = new Subject();
+
+// Use groupBy to split the response into observables based on 'who'
+const groupedSubjects: Record<string, Subject<any>> = {};
+
+// Create Observables based on 'who' property
+const groupedObservables = responseData.map((message) => {
+  if (!groupedSubjects[message.who]) {
+    groupedSubjects[message.who] = new Subject();
+  }
+  return groupedSubjects[message.who].asObservable();
+});
+
+// Merge all Observables into a single stream
+from(groupedObservables)
+  .pipe(
+    mergeMap((obs) => obs)
+  )
+  .subscribe((message) => {
+    console.log(`Received message from ${message.who}: ${message.message}`);
+  });
+
+// Push data into the responseSubject
+responseData.forEach((data) => {
+  responseSubject.next(data);
+});