Ver Fonte

revised grpc transport mechanism

enzo há 11 meses atrás
pai
commit
58b4fb5f5d
10 ficheiros alterados com 329 adições e 343 exclusões
  1. 2 2
      .env
  2. 1 1
      interfaces/general.interface.ts
  3. 1 0
      package.json
  4. 37 35
      services/fis.retransmission.service.ts
  5. 15 19
      services/grpc.service.ts
  6. 58 219
      services/service.method.ts
  7. 61 22
      test/grpc1.ts
  8. 62 45
      test/grpc2.ts
  9. 92 0
      test/grpc3.ts
  10. 0 0
      test/health.ts

+ 2 - 2
.env

@@ -9,8 +9,8 @@ BUILD_PROCESS=yes
 
 Storage = 'Mongo'
 
-ReconnectionAttempt = '60'
+ReconnectionAttempt = '1000'
 
 TimeOut = '10'
 
-MaxBufferLoad = '10'
+MaxBufferLoad = '1'

+ 1 - 1
interfaces/general.interface.ts

@@ -40,7 +40,7 @@ export interface GrpcConnectionType {
     serviceMethod: '' | 'unary' | 'server streaming' | 'client streaming' | 'bidirectional'
 }
 
-export interface GrpcMessage {
+export interface Message {
     id: string,
     message: MessageLog | string
 }

+ 1 - 0
package.json

@@ -10,6 +10,7 @@
     "generatedata": "node services/utility/generateData.js",
     "grpc1": "node test/grpc1.js",
     "grpc2": "node test/grpc2.js",
+    "grpc3": "node test/grpc3.js",
     "testing": "node test/test.js"
   },
   "author": "",

+ 37 - 35
services/fis.retransmission.service.ts

@@ -1,17 +1,16 @@
 import * as _ from 'lodash'
-import * as fs from 'fs'
 import mongoose, { Model, Schema } from 'mongoose';
 import { Observable, Subject, Subscription, from } from 'rxjs'
-import { ColorCode, GrpcMessage, MessageLog, ReportStatus, Status } from '../interfaces/general.interface'
+import { ColorCode, Message, MessageLog, ReportStatus, Status } from '../interfaces/general.interface'
 require('dotenv').config();
 
 // Implement status chain refactoring
 export class FisRetransmissionService {
     private mongoUrl: string = process.env.MONGO + 'emergencyStorage'
-    private bufferedStorage: any[] = []
+    private bufferedStorage: Message[] = []
     private mongoConnection: any
     private messageModel: any
-    private maximumBufferLength: number = parseInt(process.env.MaxBufferLoad as string) // right now just put as 15
+    private maximumBufferLength: number = parseInt(process.env.MaxBufferLoad as string) // please configure at environment
 
     constructor() {
         // Connect to mongoDB. 
@@ -19,15 +18,16 @@ export class FisRetransmissionService {
     }
 
     // Main function that intercepts outgoing messages by communicating || intepreting report status from grpc connection as indicator 
-    public handleMessage(messageToBePublished: Subject<GrpcMessage>, statusReport: Subject<ReportStatus>): Subject<GrpcMessage> {
-        let releaseMessageSubject: Subject<GrpcMessage> = new Subject() // A return value
-        // Using the concept of toggling to improve the eficacy of subscription control && data flow
+    public handleMessage(applicationOutgoingMessage: Subject<Message>, statusReport: Subject<ReportStatus>): Subject<Message> {
+        let releaseMessageSubject: Subject<Message> = new Subject() // Every message subscribed from applicationOutgoingMessage will be released through this subject
         let messageReleaseSubscription: Subscription | null = null
         let messageBufferSubscription: Subscription | null = null
         let messageStreamToMongo: Subscription | null = null
-        this.checkBufferLimit(messageToBePublished, statusReport)
+        this.checkBufferLimit(applicationOutgoingMessage, statusReport)
         statusReport.subscribe((report: ReportStatus) => {
-            if (report.code == ColorCode.GREEN) {
+            /* Green should release all data from buffer and mongo and also redirect the applicationOutgoingMessage back into the return subject(releaseMessageSubject)
+            if there's any. */
+            if (report.code == ColorCode.GREEN) { 
                 console.log(`Connection status report && ${report.message ?? 'No Message'}`)
                 /* Status Chain begins */
                 let status: Status = 1
@@ -40,11 +40,11 @@ export class FisRetransmissionService {
                     if (messageBufferSubscription) status = 0
                 }
                 if (status === 1) {
-                    messageReleaseSubscription = this.activateReleaseSubscription(messageReleaseSubscription, messageToBePublished, releaseMessageSubject)
+                    messageReleaseSubscription = this.activateReleaseSubscription(messageReleaseSubscription, applicationOutgoingMessage, releaseMessageSubject)
                     if (!messageReleaseSubscription) status = 0
                 }
                 if (status === 1) {
-                    this.releaseMessageFromLocalBuffer(this.bufferedStorage).then((resObs: Observable<GrpcMessage>) => {
+                    this.releaseMessageFromLocalBuffer(this.bufferedStorage).then((resObs: Observable<Message>) => {
                         resObs.subscribe({
                             next: message => releaseMessageSubject.next(message),
                             error: err => console.error(err),
@@ -59,7 +59,7 @@ export class FisRetransmissionService {
                     })
                 }
                 if (status === 1) {
-                    this.releaseMessageFromMongoStorage().then((resObs: Subject<GrpcMessage>) => {
+                    this.releaseMessageFromMongoStorage().then((resObs: Subject<Message>) => {
                         resObs.subscribe({
                             next: message => releaseMessageSubject.next(message),
                             error: err => console.error(err),
@@ -73,19 +73,18 @@ export class FisRetransmissionService {
                 if (status === 0) {
                     console.log(`Something Went Wrong in handling ${ColorCode.RED} report.`)
                 }
-
             }
+            /* Start buffering the messages coming in from applicationOutgonigMessages and also stop it from flowing into the release subject */
             if (report.code == ColorCode.YELLOW) {
                 if (report.payload) {
                     console.log(`Rebuffering ${report.payload.message?.appData?.msgId} into buffer...`)
                     this.bufferedStorage.push(report.payload)
                 }
                 console.log(`Connection status report && ${report.message ?? 'No Message'}`)
-
                 let status: Status = 1
                 /* Status Chain begins */
                 if (status === 1) {
-                    messageBufferSubscription = this.activateBufferSubscription(this.bufferedStorage, messageBufferSubscription, messageToBePublished)
+                    messageBufferSubscription = this.activateBufferSubscription(this.bufferedStorage, messageBufferSubscription, applicationOutgoingMessage)
                     if (!messageBufferSubscription) status = 0
                 }
                 if (status === 1) {
@@ -96,12 +95,13 @@ export class FisRetransmissionService {
                     console.log(`Something Went Wrong in handling ${ColorCode.RED} report.`)
                 }
             }
+            /* Stop buffering the message in local instance, but start saving them in database. Must first transfer the ones in local buffer before redirecting the 
+            flow from applicationOutgoingMessage into Mongo */
             if (report.code == ColorCode.RED) {
                 console.log(`Connection status report: Server down. ${report.message} lol`)
-
                 let status: Status = 1
                 if (status === 1) {
-                    messageStreamToMongo = this.activateMongoStreamSubscription(messageStreamToMongo, messageToBePublished)
+                    messageStreamToMongo = this.activateMongoStreamSubscription(messageStreamToMongo, applicationOutgoingMessage)
                     if (!messageStreamToMongo) status = 0
                 }
                 if (status === 1) {
@@ -116,7 +116,6 @@ export class FisRetransmissionService {
                 if (status === 0) {
                     console.log(`Something Went Wrong in handling ${ColorCode.RED} report.`)
                 }
-
             }
             if (!report.code) {
                 console.log(`Unknown message...`)
@@ -125,7 +124,9 @@ export class FisRetransmissionService {
         return releaseMessageSubject
     }
 
-    private checkBufferLimit(message: Subject<GrpcMessage>, statusReport: Subject<ReportStatus>) {
+    // IF Buffer exceeds a certain limit, it will trigger RED. Configure in .env file. There's the concern of 2 RED status, one from this and another from other means.
+    // Behaviour of this needs to be investigated further
+    private checkBufferLimit(message: Subject<Message>, statusReport: Subject<ReportStatus>) {
         let status: Status = 1
         if (status = 1) {
             message.subscribe(() => {
@@ -144,12 +145,12 @@ export class FisRetransmissionService {
     }
 
     // Release the incoming Messages to be returned to the caller
-    private activateReleaseSubscription(messageReleaseSubscription: Subscription | null, messageToBePublished: Subject<GrpcMessage>, releaseMessageSubject: Subject<GrpcMessage>): Subscription | null {
+    private activateReleaseSubscription(messageReleaseSubscription: Subscription | null, applicationOutgoingMessage: Subject<Message>, releaseMessageSubject: Subject<Message>): Subscription | null {
         let status: Status = 1
         if (status = 1) {
             if (!messageReleaseSubscription) {
-                messageReleaseSubscription = messageToBePublished.subscribe({
-                    next: (message: GrpcMessage) => {
+                messageReleaseSubscription = applicationOutgoingMessage.subscribe({
+                    next: (message: Message) => {
                         console.log(`Releasing ${(message.message as MessageLog).appData.msgId}...`);
                         releaseMessageSubject.next(message);
                     },
@@ -165,7 +166,7 @@ export class FisRetransmissionService {
         return messageReleaseSubscription
     }
 
-    // Stop the incoming Messaes to be returned to caller
+    // Stop the incoming Messages to be returned to caller
     private deactivateReleaseSubscription(messageReleaseSubscription: Subscription | null): Subscription | null {
         let status: Status = 1
         if (status = 1) {
@@ -181,11 +182,11 @@ export class FisRetransmissionService {
     }
 
     // Begin to push the incoming messages into local instantarray
-    private activateBufferSubscription(bufferStorage: GrpcMessage[], messageBufferSubscription: Subscription | null, messageToBePublished: Subject<GrpcMessage>): Subscription | null {
+    private activateBufferSubscription(bufferStorage: Message[], messageBufferSubscription: Subscription | null, applicationOutgoingMessage: Subject<Message>): Subscription | null {
         let status: Status = 1
         if (status = 1) {
             if (!messageBufferSubscription) {
-                messageBufferSubscription = messageToBePublished.subscribe({
+                messageBufferSubscription = applicationOutgoingMessage.subscribe({
                     next: (message: any) => {
                         console.log(`Buffering ${(message.message as MessageLog).appData.msgId}...  Local array length: ${bufferStorage.length}`);
                         bufferStorage.push(message)
@@ -220,11 +221,11 @@ export class FisRetransmissionService {
     }
 
     // Change the streaming direction of the incoming messages into mongo streaming subject( to be saved in local databse )
-    private activateMongoStreamSubscription(messageStreamToMongo: Subscription | null, messageToBePublished: Subject<GrpcMessage>): Subscription | null {
+    private activateMongoStreamSubscription(messageStreamToMongo: Subscription | null, applicationOutgoingMessage: Subject<Message>): Subscription | null {
         let status: Status = 1
         if (status = 1) {
             if (!messageStreamToMongo) {
-                messageStreamToMongo = messageToBePublished.subscribe({
+                messageStreamToMongo = applicationOutgoingMessage.subscribe({
                     next: (message: any) => {
                         console.log(`Saving ${(message.message as MessageLog).appData.msgId}...`);
                         this.saveToMongo(message)
@@ -258,7 +259,7 @@ export class FisRetransmissionService {
     }
 
     // To be used by mongoStreamSubscription to perform the saving execution
-    private async saveToMongo(message: GrpcMessage): Promise<boolean> {
+    private async saveToMongo(message: Message): Promise<boolean> {
         return new Promise((resolve, reject) => {
             // let messageModel: Model<any> = this.mongoConnection.model('Message', require('../models/message.schema'))
             this.messageModel.create(message).then(() => {
@@ -272,11 +273,11 @@ export class FisRetransmissionService {
     }
 
     // As the name implies, transder all the messages from the local instance into mongoStorage. Local instance should be emptied after transfer is completed
-    private async transferBufferedMessageToMongoStorage(bufferedMessage: GrpcMessage[], messageBufferSubscription: Subscription | null): Promise<GrpcMessage[]> {
+    private async transferBufferedMessageToMongoStorage(bufferedMessage: Message[], messageBufferSubscription: Subscription | null): Promise<Message[]> {
         return new Promise((resolve, reject) => {
             let status: Status = 1
             if (status = 1) {
-                let bufferedStorage: Observable<GrpcMessage> = from(bufferedMessage)
+                let bufferedStorage: Observable<Message> = from(bufferedMessage)
                 bufferedStorage.subscribe({
                     next: (message: any) => {
                         this.saveToMongo(message).then((res) => {
@@ -300,14 +301,14 @@ export class FisRetransmissionService {
     }
 
     // Transfer stored messages from the local instance back into the stream to be returned to the caller.
-    private async releaseMessageFromLocalBuffer(bufferedStorage: GrpcMessage[]): Promise<Observable<GrpcMessage>> {
+    private async releaseMessageFromLocalBuffer(bufferedStorage: Message[]): Promise<Observable<Message>> {
         return new Promise((resolve, reject) => {
             let status: Status = 1
             if (status = 1) {
                 if (bufferedStorage.length > 1) {
                     let caseVariable = this.bufferedStorage.length > 1;
                     console.log(`Releasing data from local buffer instance. There ${caseVariable ? "is" : "are"} ${this.bufferedStorage.length} messages...`);
-                    let returnArrayObs: Observable<GrpcMessage> = from(bufferedStorage)
+                    let returnArrayObs: Observable<Message> = from(bufferedStorage)
                     resolve(returnArrayObs)
                 } else {
                     let message = `There is no data in stored in local instance`
@@ -318,11 +319,11 @@ export class FisRetransmissionService {
     }
 
     // Transder all the stored messages in designated mongo databases. It should be empty after all the data has been transferred.
-    private async releaseMessageFromMongoStorage(): Promise<Subject<GrpcMessage>> {
+    private async releaseMessageFromMongoStorage(): Promise<Subject<Message>> {
         return new Promise((resolve, reject) => {
             let status: Status = 1
             if (status = 1) {
-                let dataSubject: Subject<GrpcMessage> = new Subject()
+                let dataSubject: Subject<Message> = new Subject()
                 this.extractAllMessages(dataSubject)
                 resolve(dataSubject)
             }
@@ -360,7 +361,8 @@ export class FisRetransmissionService {
         }
     }
 
-    public async extractAllMessages(subjectArgs: Subject<GrpcMessage>): Promise<void> {
+    // This will be used to release all the hostage messages once the light is green.
+    public async extractAllMessages(subjectArgs: Subject<Message>): Promise<void> {
         // Need to resolve the issue of streaming in a specific order that is sequential
         let status: Status = 1
         if (status = 1) {

+ 15 - 19
services/grpc.service.ts

@@ -1,21 +1,16 @@
 import { Subject, Subscription, take, takeUntil } from 'rxjs';
-import { ColorCode, GrpcConnectionType, MessageLog, ReportStatus } from '../interfaces/general.interface';
+import { ColorCode, GrpcConnectionType, Message, ReportStatus } from '../interfaces/general.interface';
 import { GrpcServiceMethod } from './service.method';
 
 
 export class GrpcService {
     private grpcServerConnection: any = {}
-    private incomingRequest: Subject<any> = new Subject()
-    private incomingResponse: Subject<any> = new Subject()
+    private incomingMessage: Subject<any> = new Subject()
 
     constructor(private grpcServiceMethod: GrpcServiceMethod) { }
 
-    public getIncomingRequest(): Subject<any> {
-        return this.incomingRequest
-    }
-
-    public getIncomingResponse(): Subject<any> {
-        return this.incomingResponse
+    public getIncomingMessage(): Subject<Message> {
+        return this.incomingMessage
     }
 
     public async stopServer(serverUrl: string): Promise<any> {
@@ -45,8 +40,8 @@ export class GrpcService {
     }
 
     // To be migrated into a service in the immediate future
-    public async createGrpcInstance(serverUrl: string, messageToBePublished: Subject<MessageLog>, reportStatus: Subject<ReportStatus>, connectionType: GrpcConnectionType) {
-        let messageToBeTransmitted: Subject<MessageLog> = messageToBePublished
+    public async createGrpcInstance(serverUrl: string, reportStatus: Subject<ReportStatus>, connectionType: GrpcConnectionType, messageToBePublished?: Subject<Message>) {
+        let messageToBeTransmitted: Subject<Message> = messageToBePublished ?? new Subject()
         let statusControl: Subject<ReportStatus> = reportStatus
         let consecutiveResolutions = 0;
         let lastResolutionTime = Date.now();
@@ -56,18 +51,19 @@ export class GrpcService {
 
         while (true) {
             try {
-                if (connectionType.instanceType == 'client' && connectionType.serviceMethod == 'bidirectional') {
-                    await this.grpcServiceMethod.createBidirectionalStreamingClient(serverUrl, alreadyHealthCheck, messageToBeTransmitted, statusControl, this.incomingResponse);
-                }
                 if (connectionType.instanceType == 'client' && connectionType.serviceMethod == 'server streaming') {
-                    await this.grpcServiceMethod.createServerStreamingClient(serverUrl, alreadyHealthCheck, messageToBeTransmitted, statusControl, this.incomingResponse);
-                }
-                if (connectionType.instanceType == 'server' && connectionType.serviceMethod == 'bidirectional') {
-                    await this.grpcServiceMethod.createGrpcBidirectionalServer(serverUrl, messageToBeTransmitted, statusControl, this.grpcServerConnection, this.incomingRequest)
+                    await this.grpcServiceMethod.createServerStreamingClient(serverUrl, alreadyHealthCheck, statusControl, this.incomingMessage);
                 }
                 if (connectionType.instanceType == 'server' && connectionType.serviceMethod == 'server streaming') {
-                    await this.grpcServiceMethod.createServerStreamingServer(serverUrl, alreadyHealthCheck, messageToBePublished, statusControl, this.grpcServerConnection, this.incomingRequest)
+                    await this.grpcServiceMethod.createServerStreamingServer(serverUrl, this.grpcServerConnection, messageToBeTransmitted)
                 }
+                /* To be enabled again if there's a need for bidiretional streaming */
+                // if (connectionType.instanceType == 'client' && connectionType.serviceMethod == 'bidirectional') {
+                //     await this.grpcServiceMethod.createBidirectionalStreamingClient(serverUrl, alreadyHealthCheck, messageToBeTransmitted, statusControl, this.incomingMessage);
+                // }
+                // if (connectionType.instanceType == 'server' && connectionType.serviceMethod == 'bidirectional') {
+                //     await this.grpcServiceMethod.createGrpcBidirectionalServer(serverUrl, messageToBeTransmitted, statusControl, this.grpcServerConnection)
+                // }
                 // If connection resolves (indicating failure), increment the count
                 consecutiveResolutions++;
                 // console.log(`Reconnection Attempt: ${consecutiveResolutions}`)

+ 58 - 219
services/service.method.ts

@@ -1,18 +1,17 @@
 import * as grpc from '@grpc/grpc-js';
 import { Subject, Subscription } from "rxjs";
-import { ReportStatus, ColorCode, GrpcMessage, MessageLog } from "../interfaces/general.interface";
+import { ReportStatus, ColorCode, Message, MessageLog } from "../interfaces/general.interface";
 import { Status } from '@grpc/grpc-js/build/src/constants';
+import { error } from 'console';
 const message_proto = require('./protos/server.proto')
 
 export class GrpcServiceMethod {
 
+    // Create Server Instance to stream all application Outgoing messages
     public async createServerStreamingServer(
         serverUrl: string,
-        alreadyHealthCheck: boolean,
-        messageToBeStream: Subject<any>,
-        statusControl: Subject<ReportStatus>,
         grpcServerConnection: any,
-        incomingRequest: Subject<GrpcMessage>
+        messageToBeStream: Subject<Message>
     ): Promise<any> { // '0.0.0.0:3001'
         return new Promise((resolve, reject) => {
             try {
@@ -20,70 +19,32 @@ export class GrpcServiceMethod {
                 let server: grpc.Server = new grpc.Server();
                 // Add the streamingData function to the gRPC service
                 // Define your message_proto.Message service methods
-
                 server.addService(message_proto.Message.service, {
                     HandleMessage: (call) => {
-                        incomingRequest.next(call.request)
-                        console.log(call.request)
-                        console.log(`Intializing main stream response. Confirmation from ${call.request.id}`)
-                        // This will be the main channel for streaming them response messages
-                        let report: ReportStatus = { //let the flow come through 
-                            code: ColorCode.GREEN,
-                            message: `Client connected!!`,
-                            from: `Server Streaming Instance`
-                        }
-                        statusControl.next(report)
+                        // console.log(call.request)
+                        console.log(`Intializing stream. Opening Channel. Confirmation from ${call.getPeer()}`)
 
                         let subscription: Subscription = messageToBeStream.subscribe({
-                            next: (response: GrpcMessage) => {
-                                // console.log(`${response.id} vs ${request.id}`)
-                                // Check who's response it belongs to
-                                let noConnection = call.cancelled // check connection for each and every message
-                                if (noConnection === true) { // that means there's no connection, beccause the cancel operation is determined to check 
-                                    let report: ReportStatus = {
-                                        code: ColorCode.YELLOW,
-                                        message: `Client is not alive.....`,
-                                        payload: response,
-                                        from: `Server Streaming Instance`
-                                    }
-                                    statusControl.next(report)
-                                    subscription.unsubscribe()
-                                } else {
-                                    console.log(`Sending ${(response.message as MessageLog).appData.msgId} in respond to request: ${call.request.id}`)
-                                    let message = {
-                                        id: response.id,
-                                        message: JSON.stringify(response.message)
-                                    }
-                                    call.write(message)
+                            next: (response: Message) => {
+                                console.log(`Sending ${(response.message as MessageLog).appData.msgId}`)
+                                let message = {
+                                    id: response.id,
+                                    message: JSON.stringify(response.message)
                                 }
+                                call.write(message)
                             },
                             error: err => {
                                 console.error(err)
-                                let report: ReportStatus = {
-                                    code: ColorCode.YELLOW,
-                                    message: `Message streaming error`,
-                                    from: `Server Streaming Instance`
-                                }
-                                statusControl.next(report)
                                 subscription.unsubscribe()
+                                resolve('')
                             },
                             complete: () => {
                                 console.log(`Stream response completed for ${call.request.id}`)
                                 subscription.unsubscribe()
+                                resolve('')
                                 // call.end()
                             }
                         })
-
-                        if (call.request.id != '0000') {
-                            console.log(call.request)
-                            /* Case from handling incoming request from clients. This no longer takes into consideration where the request is coming
-                            from. If the client is subscribed to the server, it will receive it's due. */
-                            // console.log(`Client connected from: ${call.getPeer()}`);
-                            let request = call.request // unary request from client to be responded with a stream
-                            console.log(`Received unary call.... request: ${request.id}`)
-                            call.cancel()
-                        }
-
                     },
 
                     Check: (_, callback) => {
@@ -108,191 +69,68 @@ export class GrpcServiceMethod {
         })
     }
 
-    // Create a server streaming call. Please note that the structure of the code would not be the same as bidirectional because of it's unary nature
+    // Send a request over to the other server to open a channel for this server to emit/stream messages over
     public async createServerStreamingClient(
         server: string,
         alreadyHealthCheck: boolean,
-        unaryRequestSubject: Subject<any>,
         statusControl: Subject<ReportStatus>,
-        incomingResponse: Subject<GrpcMessage>
+        incomingMessage: Subject<Message>
     ): Promise<string> {
         return new Promise(async (resolve, reject) => {
             const client = new message_proto.Message(server, grpc.credentials.createInsecure());
 
-            unaryRequestSubject.subscribe({
-                next: (request: any) => {
-                    let message = {
-                        id: request.id,
-                        message: JSON.stringify(request.message)
+            // perform check to see if server is alive, if not terminate this grpc instant and create again
+            this.checkConnectionHealth(client, statusControl, alreadyHealthCheck).catch((error) => {
+                resolve('')
+            })
+            // this is where the request sending logic occurs
+            let call = client.HandleMessage({ id: `0000`, message: `Intiate Main Stream Channel Response` })
+            console.log(`Sending request to open response channel...`)
+
+            call.on('status', (status: Status) => {
+                if (status == grpc.status.OK) { // only returns a status when there's error. Otherwise it just waits
+                    console.log(`Message trasmission operation is successful`)
+                    // RPC completed successfully
+                } if (status == grpc.status.UNAVAILABLE) {
+                    let report = {
+                        code: ColorCode.YELLOW,
+                        message: `Server doesn't seem to be alive. Error returned.`,
+                        from: `Server Streaming Client Instance`
                     }
-                    console.log(message)
-                    console.log(`Sending request: ${message.id} over to server....`)
-                    let call = client.HandleMessage(message)
-
-                    call.on('status', (status: Status) => {
-                        if (status == grpc.status.OK) { // only returns a status when there's error. Otherwise it just waits
-                            console.log(`Message trasmission operation is successful`)
-                            // RPC completed successfully
-                        } if (status == grpc.status.UNAVAILABLE) {
-                            resolve('No connection established. Server is not responding..')
-                            let report = {
-                                code: ColorCode.YELLOW,
-                                message: `Server doesn't seem to be alive. Error returned.`,
-                                payload: request,
-                                from: `Server Streaming Client Instance`
-                            }
-                            statusControl.next(report)
-                        }
-                    });
-
-                    call.on('data', (data: any) => {
-                        let response: GrpcMessage = {
-                            id: data.id,
-                            message: JSON.parse(data.message)
-                        }
-                        incomingResponse.next(response)
-                        console.log((response.message as MessageLog).appData.msgId)
-                    });
-
-                    call.on('error', (err) => {
-                    });
-
-                    call.on('end', () => { // this is for gracefull || willfull termination from the server
-                        console.log(`Terminating Stream Request. Directing response to main channel`)
-                    });
+                    statusControl.next(report)
+                    resolve('No connection established. Server is not responding..')
+                }
+            });
 
-                },
-                error: error => {
-                    console.error(error),
-                        resolve(error)
-                },
-                complete: () => { } // should not complete since this is an indefinite listening process to transmit requests made by relevant client application
-            })
+            call.on('data', (data: any) => {
+                // standard procedure. convert back the data and pass to the application to be processed
+                let response: Message = {
+                    id: data.id,
+                    message: JSON.parse(data.message)
+                }
+                incomingMessage.next(response)
+                console.log((response.message as MessageLog).appData.msgId)
+            });
 
-            unaryRequestSubject.next({ id: `0000`, message: `Intiate Main Stream Channel Response` })
-
-            this.checkConnectionHealth(client, statusControl, alreadyHealthCheck) // perform check to see if server is alive, if not terminate this grpc instant and create again
-            // initiate(statusControl, incomingResponse).then(() => {
-            //     streamRequest(unaryRequestSubject, statusControl)
-            // }).catch(() => {
-            //     resolve('Trigger Reconnection logic. Terminate this client instance and creating new ones')
-            // })
-
-            // async function intialize(statusControl: Subject<any>, incomingResponse: Subject<any>) {
-            // async function initiate(statusControl: Subject<ReportStatus>, incomingResponse: Subject<any>) {
-            //     let greenlight: ReportStatus = {
-            //         code: ColorCode.GREEN,
-            //         message: `Initial Client set up. Release unary Request`,
-            //         from: `Server Streaming Client Instance`
-            //     }
-            //     statusControl.next(greenlight)
-            //     let report: ReportStatus = {
-            //         code: ColorCode.YELLOW,
-            //         message: `Server doesn't seem to be alive. Error returned.`,
-            //         from: `Server Streaming Client Instance`
-            //     }
-            //     let call = client.HandleMessage({
-            //         id: '0000',
-            //         message: `Establishing channel for response stream. Channel for response!`
-            //     })
-
-            //     call.on('status', (status: Status) => {
-            //         // console.log(status) // For more info: https://grpc.github.io/grpc/core/md_doc_statuscodes.html
-            //         // https://grpc.io/docs/what-is-grpc/core-concepts/#streaming
-            //         if (status == grpc.status.OK) { // only returns a status when there's error. Otherwise it just waits
-            //             console.log(`Message trasmission operation is successful`)
-            //             resolve('')
-            //         } if (status == grpc.status.UNAVAILABLE) {
-            //             resolve('No connection established. Server is not responding..')
-            //             statusControl.next(report)
-            //             reject()
-            //         }
-            //     });
-
-            //     // This is and should be the only channel for response. THe demultiplexing will be handled by application logic
-            //     call.on('data', (data: any) => {
-            //         // console.log(`Received stream response from Server. Receiver: ${message.id}`);
-            //         let response = {
-            //             id: data.id,
-            //             message: JSON.parse(data.message)
-            //         }
-            //         // console.log(response)
-            //         incomingResponse.next(response)
-            //     });
-
-            //     call.on('error', (err) => {
-            //         statusControl.next(report)
-            //     });
-
-            //     call.on('end', () => { // this is for gracefull || willfull termination from the server
-            //         console.log(`Streaming Response is completed`)
-            //         statusControl.next(report)
-            //     });
-            // }
-
-            // // }
-
-            // // function streamRequest(unaryRequestSubject: Subject<any>, statusControl: Subject<any>) {
-            // // Just send request, no need to listen to response. IT will be handled by the channel above.
-            // function streamRequest(unaryRequestSubject: Subject<GrpcMessage>, statusControl: Subject<ReportStatus>) {
-            //     unaryRequestSubject.subscribe({
-            //         next: (request: any) => {
-            //             let message = {
-            //                 id: request.id,
-            //                 message: JSON.stringify(request)
-            //             }
-            //             console.log(`Sending request: ${message.id} over to server....`)
-            //             const call = client.HandleMessage(message)
-
-            //             call.on('status', (status: Status) => {
-            //                 if (status == grpc.status.OK) { // only returns a status when there's error. Otherwise it just waits
-            //                     console.log(`Message trasmission operation is successful`)
-            //                     // RPC completed successfully
-            //                 } if (status == grpc.status.UNAVAILABLE) {
-            //                     resolve('No connection established. Server is not responding..')
-            //                     let report = {
-            //                         code: ColorCode.YELLOW,
-            //                         message: `Server doesn't seem to be alive. Error returned.`,
-            //                         payload: request,
-            //                         from: `Server Streaming Client Instance`
-            //                     }
-            //                     statusControl.next(report)
-            //                 }
-            //             });
-
-            //             // call.on('data', (data: any) => {
-            //             //     let response = {
-            //             //         data: data.id,
-            //             //         message: JSON.parse(data.message)
-            //             //     }
-            //             //     console.log(response.message.appData.msgId)
-            //             // });
-
-            //             call.on('error', (err) => {
-            //             });
-
-            //             call.on('end', () => { // this is for gracefull || willfull termination from the server
-            //                 console.log(`Terminating Stream Request. Directing response to main channel`)
-            //             });
-
-            //         },
-            //         error: error => {
-            //             console.error(error),
-            //                 resolve(error)
-            //         },
-            //         complete: () => { } // should not complete since this is an indefinite listening process to transmit requests made by relevant client application
-            //     })
-            // }
+            call.on('error', (err) => {
+                resolve('')
+            });
 
+            // call.on('end', () => { // this is for gracefull || willfull termination from the server
+            //     console.log(`Terminating Stream Request. Directing response to main channel`)
+            //     resolve('')
+            // });
         })
     }
 
+
+
+    /* ----------------All the functions below are for Bi-directional streaming. Subject to be deleted if decided not in use---------------- */
     public async createGrpcBidirectionalServer(
         serverUrl: string,
         messageToBeStream: Subject<any>,
         statusControl: Subject<ReportStatus>,
         grpcServerConnection: any,
-        incomingRequest: Subject<GrpcMessage>
     ): Promise<any> { // '0.0.0.0:3001'
         return new Promise((resolve, reject) => {
             try {
@@ -394,7 +232,7 @@ export class GrpcServiceMethod {
         alreadyHealthCheck: boolean,
         messageToBeTransmitted: Subject<any>,
         statusControl: Subject<ReportStatus>,
-        incomingResponse: Subject<GrpcMessage>
+        incomingResponse: Subject<Message>
     ): Promise<string> {
         let subscription: any
         let unsubscribed: boolean = false
@@ -469,6 +307,7 @@ export class GrpcServiceMethod {
                     statusControl.next(report)
                 } else {
                     if (alreadyHealthCheck == false) console.error(`Health check failed: ${error}`);
+                    resolve(false)
                 }
             })
         })

+ 61 - 22
test/grpc1.ts

@@ -1,25 +1,28 @@
-import { Subject, from, take, takeUntil } from "rxjs";
-import { GrpcService } from "../services/grpc.service";
 import * as fs from 'fs'
-import { FisRetransmissionService } from "../services/fis.retransmission.service";
-import { ReportStatus } from "../interfaces/general.interface";
-import { GrpcServiceMethod } from "../services/service.method";
+import { Subject, groupBy, mergeMap, take, toArray } from 'rxjs';
+import { ColorCode, Message, MessageLog, ReportStatus } from '../interfaces/general.interface';
+import { GrpcService } from '../services/grpc.service';
+import { FisRetransmissionService } from '../services/fis.retransmission.service';
+import { GrpcServiceMethod } from '../services/service.method';
 
-const messagesJSON: any = fs.readFileSync('payload.json')
+// Subject for bidirectional communication
 const errorHandlingService: FisRetransmissionService = new FisRetransmissionService()
-const gprcService: GrpcService = new GrpcService(new GrpcServiceMethod()) 
-let incomingRequest: Subject<any> = gprcService.getIncomingRequest()
-let applicationOutgoingResponse: Subject<any> = new Subject()
+const grpcService: GrpcService = new GrpcService(new GrpcServiceMethod())
+const messagesJSON: any = fs.readFileSync('payload.json')
+let incomingMessages: Subject<any> = grpcService.getIncomingMessage()
 let parsedMessages: any[] = JSON.parse(messagesJSON) // load the fake messages generated for this trial 
-let dataMessages = stream() // Emulate messges to be sent over to target server
-let messageToBePublished: Subject<any> = new Subject()
-let statusControl: Subject<ReportStatus> = new Subject()
+let messageToBeReleased: Subject<any> = new Subject() // Sample message to be transmitted over to target server
+let applicationOutgoingResponse: Subject<Message> = new Subject()
+let statusControl: Subject<ReportStatus> = new Subject() // Listening for error events and states
+let targetserver: string = 'localhost:3001'
+let hostServer: string = 'localhost:3000'
+let array: any[] = [] // Used for testing
 
-incomingRequest.subscribe({
-  next: request => { // this whole thing is for 1 request
-    if (request.id == '0000') {
-      console.log(`Just checking for buffer. Dont do anything else!`)
-    } else {
+// Handler for the incoming Messages from the other side. 
+incomingMessages.subscribe({
+  next: request => {
+    // Application logic comes here. This is where the asortment takes place, of decidiing whose messages it belongs of what it is
+    if ((request.message as MessageLog).appData.msgPayload == 'Query') {
       generateFakeStreamResponse(request).subscribe({
         next: (responseMessage) => {
           // console.log(`Processing request:${request.id}....`)
@@ -30,18 +33,43 @@ incomingRequest.subscribe({
           console.log(`Stream request for ${request.id} is queued.`) // shpuld be indefinite
         }
       })
+    } else {
+      array.push(request)
+      console.log(`Received messages from the other side: ${(request.message as MessageLog).appData.msgId}`)
     }
   },
   error: error => console.error(error),
   complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
 })
 
-/* For server streaming */
+// Open channel for sending messages across.
 errorHandlingService.handleMessage(applicationOutgoingResponse, statusControl).subscribe((messages) => {
-  messageToBePublished.next(messages)
+  messageToBeReleased.next(messages)
 })
-let server1 = 'localhost:3000'
-gprcService.createGrpcInstance(server1, messageToBePublished, statusControl, { instanceType: 'server', serviceMethod: 'server streaming' })
+grpcService.createGrpcInstance(hostServer, statusControl, { instanceType: 'server', serviceMethod: 'server streaming' }, messageToBeReleased)
+
+grpcService.createGrpcInstance(targetserver, statusControl, { instanceType: 'client', serviceMethod: 'server streaming' })
+
+setTimeout(() => {
+  let message = {
+    id: parsedMessages[10].appData.msgId,
+    message: parsedMessages[10] // Choose this number, because i purposely use the 11th message and change the msgPayload property to query to emulate a request
+  }
+  applicationOutgoingResponse.next(message)
+}, 2000)
+setTimeout(() => {
+  let message = {
+    id: parsedMessages[11].appData.msgId,
+    message: parsedMessages[11]// Choose this number, because i purposely use the 11th message and change the msgPayload property to query to emulate a request
+  }
+  applicationOutgoingResponse.next(message)
+}, 3000)
+setTimeout(() => {
+  console.log(`All received data: ${array.length}`)
+}, 10000)
+setTimeout(() => {
+  console.log(`All received data: ${array.length}`)
+}, 20000)
 
 
 // this is just to publish an array of fake data as a Subject
@@ -60,9 +88,10 @@ function stream(): Subject<any> {
   return result
 }
 
+
 function generateFakeStreamResponse(request: any): Subject<any> {
   let res: Subject<any> = new Subject()
-  stream().pipe(take(10)).subscribe({
+  stream().pipe(take(7)).subscribe({
     next: element => {
       let message = {
         id: request.id, // Caller's 
@@ -75,3 +104,13 @@ function generateFakeStreamResponse(request: any): Subject<any> {
   })
   return res
 }
+
+/* Extra NOTEs:
+So let's say when this host makes a request and send it over to the other side to process. And say this request
+is actually a query, so it will take some times for the other side to process the data. But what happens when the 
+other side down, that means i won't get my query. Is this the responsibility of the application logic of the other
+side? To keep track of the message request sent over? 
+Personal opinion it should be the responsibility of the application logic to keep track of the request they are 
+processing, especially when the server goes down whilst they are streaning back the response. Because for this 
+retransmission service, it shouldn't care anymore. It just make sure to get the message to pass over and buffer
+the message when established client instance could not talk to the server on the other side.*/

+ 62 - 45
test/grpc2.ts

@@ -1,6 +1,6 @@
 import * as fs from 'fs'
-import { Subject, groupBy, mergeMap, toArray } from 'rxjs';
-import { ColorCode, GrpcMessage, MessageLog, ReportStatus } from '../interfaces/general.interface';
+import { Subject, groupBy, mergeMap, take, toArray } from 'rxjs';
+import { ColorCode, Message, MessageLog, ReportStatus } from '../interfaces/general.interface';
 import { GrpcService } from '../services/grpc.service';
 import { FisRetransmissionService } from '../services/fis.retransmission.service';
 import { GrpcServiceMethod } from '../services/service.method';
@@ -9,66 +9,68 @@ import { GrpcServiceMethod } from '../services/service.method';
 const errorHandlingService: FisRetransmissionService = new FisRetransmissionService()
 const grpcService: GrpcService = new GrpcService(new GrpcServiceMethod())
 const messagesJSON: any = fs.readFileSync('payload.json')
-let incomingResponse: Subject<any> = grpcService.getIncomingResponse()
+let incomingMessages: Subject<any> = grpcService.getIncomingMessage()
 let parsedMessages: any[] = JSON.parse(messagesJSON) // load the fake messages generated for this trial 
 let messageToBeReleased: Subject<any> = new Subject() // Sample message to be transmitted over to target server
+let applicationOutgoingResponse: Subject<Message> = new Subject()
 let statusControl: Subject<ReportStatus> = new Subject() // Listening for error events and states
-let dataMessages = stream() // Emulate messges to be sent over to target server
-let server1: string = 'localhost:3000'
-let unaryRequestSubject: Subject<GrpcMessage> = new Subject()
-let array: any[] = []
+let targetserver: string = 'localhost:3000'
+let hostServer: string = 'localhost:3001'
+let array: any[] = []// Used for testing
 
-incomingResponse.subscribe({
+// Handler for the incoming Messages from the other side. 
+incomingMessages.subscribe({
   next: request => {
-    array.push(request)
-    console.log(`To be distributed to request:${request.id} => message: ${(request.message as MessageLog).appData.msgId}`)
-    /* Here's the plan. Since each and every single call that the client make, the server will open a stream specifically for that 
-    client. Now. A new strategy to cater to the revised solution. Everytime a client start up, it will first make a request call
-    spefically to open a stream specifically for response. THe demulitplexing will be delegated outside. The grpc instance should
-    not be bothered with hashydabery that's going on. Server side just need to receive the request, (must terminate the stream 
-    response if it eats up too much memory), and then respond with the stream established earlier when the client first send the 
-    first request call.
-    New dilemma, there will be some changes to be followed as well. Since i want to integrate the termination of the stream and in
-    this case, the stream will terminate right after it sends a acknoledgement saying that the request is being processed. But due 
-    to the reconnection logic integration, the client will treat server.end as a server failure, and will try to execute the 
-    reconnection logic. Should not be hard to change.
-     */
-  }
+    // Application logic comes here. This is where the asortment takes place, of decidiing whose messages it belongs of what it is
+    if ((request.message as MessageLog).appData.msgPayload == 'Query') {
+      generateFakeStreamResponse(request).subscribe({
+        next: (responseMessage) => {
+          // console.log(`Processing request:${request.id}....`)
+          applicationOutgoingResponse.next(responseMessage)
+        },
+        error: error => console.error(error),
+        complete: () => {
+          console.log(`Stream request for ${request.id} is queued.`) // shpuld be indefinite
+        }
+      })
+    } else {
+      array.push(request)
+      console.log(`Received messages from the other side: ${(request.message as MessageLog).appData.msgId}`)
+    }
+  },
+  error: error => console.error(error),
+  complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
 })
 
-/* Server Streaming Test case */
-errorHandlingService.handleMessage(unaryRequestSubject, statusControl).subscribe((messages) => {
+// Open channel for sending messages across.
+errorHandlingService.handleMessage(applicationOutgoingResponse, statusControl).subscribe((messages) => {
   messageToBeReleased.next(messages)
 })
-grpcService.createGrpcInstance(server1, messageToBeReleased, statusControl, { instanceType: 'client', serviceMethod: 'server streaming' })
+grpcService.createGrpcInstance(hostServer, statusControl, { instanceType: 'server', serviceMethod: 'server streaming' }, messageToBeReleased)
+
+grpcService.createGrpcInstance(targetserver, statusControl, { instanceType: 'client', serviceMethod: 'server streaming' })
 
 setTimeout(() => {
-  let request = {
-    id: parsedMessages[1].appData.msgId,
-    message: parsedMessages[1]
+  let message = {
+    id: parsedMessages[10].appData.msgId,
+    message: parsedMessages[10] // Choose this number, because i purposely use the 11th message and change the msgPayload property to query to emulate a request
   }
-  unaryRequestSubject.next(request)
-}, 1000)
+  applicationOutgoingResponse.next(message)
+}, 2000)
 setTimeout(() => {
-  let request = {
-    id: parsedMessages[2].appData.msgId,
-    message: parsedMessages[2]
+  let message = {
+    id: parsedMessages[11].appData.msgId,
+    message: parsedMessages[11]// Choose this number, because i purposely use the 11th message and change the msgPayload property to query to emulate a request
   }
-  unaryRequestSubject.next(request)
+  applicationOutgoingResponse.next(message)
 }, 3000)
 setTimeout(() => {
-  let request = {
-    id: parsedMessages[3].appData.msgId,
-    message: parsedMessages[3]
-  }
-  unaryRequestSubject.next(request)
-}, 5000)
-setTimeout(() => {
-  console.log(`Full amount received: ${array.length}`)
+  console.log(`All received data: ${array.length}`)
 }, 10000)
 setTimeout(() => {
-  console.log(`Full amount received: ${array.length}`)
-}, 12000)
+  console.log(`All received data: ${array.length}`)
+}, 20000)
+
 
 // this is just to publish an array of fake data as a Subject
 function stream(): Subject<any> {
@@ -83,7 +85,22 @@ function stream(): Subject<any> {
       result.complete();
     }
   }, 500)
-
   return result
 }
 
+function generateFakeStreamResponse(request: any): Subject<any> {
+  let res: Subject<any> = new Subject()
+  stream().pipe(take(5)).subscribe({
+    next: element => {
+      let message = {
+        id: request.id, // Caller's 
+        message: element
+      }
+      res.next(message)
+    },
+    error: error => console.error(error),
+    complete: () => console.log(`Stream response for ${request.id} has been prepared.`)
+  })
+  return res
+}
+

+ 92 - 0
test/grpc3.ts

@@ -0,0 +1,92 @@
+import * as fs from 'fs'
+import { Subject, groupBy, mergeMap, take, toArray } from 'rxjs';
+import { ColorCode, Message, MessageLog, ReportStatus } from '../interfaces/general.interface';
+import { GrpcService } from '../services/grpc.service';
+import { FisRetransmissionService } from '../services/fis.retransmission.service';
+import { GrpcServiceMethod } from '../services/service.method';
+
+// Subject for bidirectional communication
+const errorHandlingService: FisRetransmissionService = new FisRetransmissionService()
+const grpcService: GrpcService = new GrpcService(new GrpcServiceMethod())
+const messagesJSON: any = fs.readFileSync('payload.json')
+let incomingMessages: Subject<any> = grpcService.getIncomingMessage()
+let parsedMessages: any[] = JSON.parse(messagesJSON) // load the fake messages generated for this trial 
+let messageToBeReleased: Subject<any> = new Subject() // Sample message to be transmitted over to target server
+let applicationOutgoingResponse: Subject<Message> = new Subject()
+let statusControl: Subject<ReportStatus> = new Subject() // Listening for error events and states
+let targetserver: string = 'localhost:3000'
+let hostServer: string = 'localhost:3001'
+let array: any[] = []
+
+incomingMessages.subscribe({
+  next: request => {
+    // Application logic comes here. This is where the asortment takes place, of decidiing whose messages it belongs of what it is
+    if ((request.message as MessageLog).appData.msgPayload == 'Query') {
+      generateFakeStreamResponse(request).subscribe({
+        next: (responseMessage) => {
+          // console.log(`Processing request:${request.id}....`)
+          applicationOutgoingResponse.next(responseMessage)
+        },
+        error: error => console.error(error),
+        complete: () => {
+          console.log(`Stream request for ${request.id} is queued.`) // shpuld be indefinite
+        }
+      })
+    } else {
+      array.push(request)
+      console.log(`Received messages from the other side: ${(request.message as MessageLog).appData.msgId}`)
+    }
+  },
+  error: error => console.error(error),
+  complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
+})
+
+// Open channel for sending messages across.
+errorHandlingService.handleMessage(applicationOutgoingResponse, statusControl).subscribe((messages) => {
+  messageToBeReleased.next(messages)
+})
+grpcService.createGrpcInstance(hostServer, statusControl, { instanceType: 'server', serviceMethod: 'server streaming' }, messageToBeReleased)
+
+grpcService.createGrpcInstance(targetserver, statusControl, { instanceType: 'client', serviceMethod: 'server streaming' })
+
+setTimeout(() => {
+  console.log(`All received data: ${array.length}`)
+}, 10000)
+setTimeout(() => {
+  console.log(`All received data: ${array.length}`)
+}, 20000)
+
+
+// this is just to publish an array of fake data as a Subject
+function stream(): Subject<any> {
+  let result: Subject<any> = new Subject()
+  let messages: any[] = parsedMessages
+  let count = 0
+  const intervalId = setInterval(() => {
+    result.next(messages[count]);
+    count++;
+    if (count >= 1000) {
+      clearInterval(intervalId);
+      result.complete();
+    }
+  }, 500)
+  return result
+}
+
+
+function generateFakeStreamResponse(request: any): Subject<any> {
+  let res: Subject<any> = new Subject()
+  stream().pipe(take(5)).subscribe({
+    next: element => {
+      let message = {
+        id: request.id, // Caller's 
+        message: element
+      }
+      res.next(message)
+    },
+    error: error => console.error(error),
+    complete: () => console.log(`Stream response for ${request.id} has been prepared.`)
+  })
+  return res
+}
+

+ 0 - 0
test/health.ts