Переглянути джерело

added attribution class and restructure server client and grpc

enzo 10 місяців тому
батько
коміт
9da36b30b3

+ 1 - 1
.env

@@ -13,4 +13,4 @@ ReconnectionAttempt = '1000'
 
 TimeOut = '10'
 
-MaxBufferLoad = '20'
+MaxBufferLoad = '1000000'

+ 8 - 12
interfaces/general.interface.ts

@@ -8,12 +8,6 @@ export enum ColorCode {
     'YELLOW' = 'YELLOW',
     'RED' = 'RED'
 }
-
-export enum ConnectionStatus {
-    'GREEN' = 'GREEN',
-    'YELLOW' = 'YELLOW',
-    'RED' = 'RED'
-}
 export interface messageTransmissionInterface {
     id?: string,
     state: '' | 'attempt to send' | 'failed sent' | 'sent successfully',
@@ -55,12 +49,12 @@ export type Status = -1 | 0 | 1 // For status chain effect
 
 export interface ConnectionAttribute {
     ConnectionID: ConnectionID,
-    outGoing: ChannelAttribute,
-    inComing: ChannelAttribute,
+    outGoing: StreamAttribute,
+    inComing: StreamAttribute,
     connectionStatus: Subject<ReportStatus>
 }
-export interface ChannelAttribute {
-    ChannelID?: string,
+export interface StreamAttribute {
+    StreamID?: string,
     PublisherID?: string,
     SubscriberID?: string,
     PublisherInstance?: any,
@@ -70,17 +64,19 @@ export interface ChannelAttribute {
 }
 
 export interface ConnectionRequest {
-    database: string,
+    database?: string,
     server: ServerRequest,
     client: ClientRequest
 }
 
 export interface ServerRequest {
+    name: string,
     serverUrl: string,
     connectionType: 'GRPC' | 'HTTP' | 'Socket',
     messageToBePublishedfromApplication: Subject<Message>
 }
 export interface ClientRequest {
+    name: string,
     targetServer: string,
     connectionType: 'GRPC' | 'HTTP' | 'Socket',
     messageToBeReceivedFromRemote: Subject<Message>
@@ -89,4 +85,4 @@ export interface ClientRequest {
 export interface ConnectionID {
     local: string,
     remote: string
-}
+}

+ 129 - 0
services/attributes/attribute.ts

@@ -0,0 +1,129 @@
+import { Observable } from "rxjs";
+import { MessageLog } from "../../interfaces/general.interface";
+
+abstract class RemoteSR {
+    remoteSrId: string | undefined
+    msgLogger: MessageLogger
+    msgErrorLogger: MessageErrorLogger
+    Transporter: Transporter
+
+    constructor(remoteSrId: string, msgLogger: MessageLogger, msgErrorLogger: MessageErrorLogger, transporter: Transporter) {
+        this.remoteSrId = remoteSrId
+        this.msgLogger = msgLogger
+        this.msgErrorLogger = msgErrorLogger
+        this.Transporter = transporter
+    }
+}
+
+abstract class RemoteSRClient extends RemoteSR {
+    abstract RemoteSR
+    constructor(remoteSrId: string, msgLogger: MessageLogger, msgErrorLogger: MessageErrorLogger, transporter: Transporter) {
+        super(remoteSrId, msgLogger, msgErrorLogger, transporter)
+    }
+
+    abstract send(msg: any): Observable<any>
+
+    abstract emit(msg: any)
+}
+
+abstract class RemoteRSClient {
+    resObs: Observable<any>
+
+    constructor(resObs: Observable<any>) {
+        this.resObs = resObs;
+    }
+}
+
+class RemoteSender extends RemoteSR {
+    constructor(remoteSrId: string, msgLogger: MessageLogger, msgErrorLogger: MessageErrorLogger, transporter: Transporter) {
+        super(remoteSrId, msgLogger, msgErrorLogger, transporter)
+    }
+}
+
+class RemoteReceiver extends RemoteSR {
+    receivableObs: Observable<any>
+    constructor(remoteSrId: string, msgLogger: MessageLogger, msgErrorLogger: MessageErrorLogger, transporter: Transporter) {
+        super(remoteSrId, msgLogger, msgErrorLogger, transporter)
+        this.receivableObs = transporter.transporter // Assuming this is where it returns an observable
+    }
+}
+
+
+class MessageLogger extends RemoteSR {
+    msgId: string
+    msg: MessageLog
+
+    constructor(msgId: string, msg: MessageLog, remoteSrId: string, msgLogger: MessageLogger, msgErrorLogger: MessageErrorLogger, transporter: Transporter) {
+        super(remoteSrId, msgLogger, msgErrorLogger, transporter)
+        this.msgId = msgId
+        this.msg = msg
+    }
+}
+
+class MessageErrorLogger extends RemoteSR {
+    msgId: string
+
+    constructor(msgId: string, remoteSrId: string, msgLogger: MessageLogger, msgErrorLogger: MessageErrorLogger, transporter: Transporter) {
+        super(remoteSrId, msgLogger, msgErrorLogger, transporter)
+        this.msgId = msgId
+    }
+}
+
+
+class Transporter extends RemoteSR {
+    transporter: any
+    constructor(remoteSrId: string, msgLogger: MessageLogger, msgErrorLogger: MessageErrorLogger, transporter: Transporter) {
+        super(remoteSrId, msgLogger, msgErrorLogger, transporter)
+        this.transporter = transporter
+    }
+}
+
+class RemoteReceiverClient extends RemoteSRClient {
+    RemoteSR: any;
+    constructor(remoteSrId: string, msgLogger: MessageLogger, msgErrorLogger: MessageErrorLogger, transporter: Transporter) {
+        super(remoteSrId, msgLogger, msgErrorLogger, transporter)
+    }
+
+    send(msg: any): Observable<any> {
+        let newObservable: Observable<any> = new Observable()
+        return newObservable
+    }
+
+    emit(msg: any) {
+    }
+
+}
+class RemoteSenderClient extends RemoteSRClient {
+    RemoteSR: any;
+    constructor(remoteSrId: string, msgLogger: MessageLogger, msgErrorLogger: MessageErrorLogger, transporter: Transporter) {
+        super(remoteSrId, msgLogger, msgErrorLogger, transporter)
+    }
+
+    send(msg: any): Observable<any> {
+        let newObservable: Observable<any> = new Observable()
+        return newObservable
+    }
+
+    emit(msg: any) {
+
+    }
+}
+
+class RemoteRSReceiverClient extends RemoteRSClient {
+    observables: Observable<any>[]
+
+    constructor(resObs: Observable<any>, observables: Observable<any>[]) {
+        super(resObs)
+        this.observables = observables
+    }
+
+    addObs(obs: Observable<any>) { }
+}
+
+class RemoteRSSenderClient extends RemoteRSClient {
+    receiver: any
+    constructor(resObs: Observable<any>, receiver: any) {
+        super(resObs)
+        this.receiver = receiver
+    }
+}

+ 32 - 21
services/fis.retransmission.service.ts

@@ -1,4 +1,3 @@
-import * as _ from 'lodash'
 import mongoose, { Model, Schema } from 'mongoose';
 import { BehaviorSubject, Observable, Subject, Subscription, from } from 'rxjs'
 import { ColorCode, Message, MessageLog, ReportStatus, Status } from '../interfaces/general.interface'
@@ -9,27 +8,26 @@ export class FisRetransmissionService {
     private mongoUrl: string = process.env.MONGO as string
     private bufferedStorage: Message[] = []
     private mongoConnection: any
-    private messageModel: any
+    private messageModel: Model<any> | null | undefined
     private maximumBufferLength: number = parseInt(process.env.MaxBufferLoad as string) // please configure at environment
-    // private statusReport: Subject<ReportStatus> = new Subject()
 
-    constructor(private databaseName: string) {
+    constructor(private databaseName: string, private statusReport: BehaviorSubject<ReportStatus>) {
         // Connect to mongoDB. 
         this.manageMongoConnection(databaseName)
     }
 
     // Main function that intercepts outgoing messages by communicating || intepreting report status from grpc connection as indicator 
-    public handleMessage(applicationOutgoingMessage: Subject<Message>, statusReport: BehaviorSubject<ReportStatus>): Subject<Message> {
+    public handleMessage(applicationOutgoingMessage: Subject<Message>): Subject<Message> {
         let releaseMessageSubject: Subject<Message> = new Subject() // Every message subscribed from applicationOutgoingMessage will be released through this subject
         let messageReleaseSubscription: Subscription | null = null
         let messageBufferSubscription: Subscription | null = null
         let messageStreamToMongo: Subscription | null = null
-        this.checkBufferLimit(applicationOutgoingMessage, statusReport)
-        statusReport.subscribe((report: ReportStatus) => {
+        this.checkBufferLimit(applicationOutgoingMessage, this.statusReport)
+        this.statusReport.subscribe((report: ReportStatus) => {
             /* Green should release all data from buffer and mongo and also redirect the applicationOutgoingMessage back into the return subject(releaseMessageSubject)
             if there's any. */
             if (report.code == ColorCode.GREEN) {
-                console.log(`Connection status report && ${report.message ?? 'No Message'}`)
+                // console.log(`Connection status report && ${report.message ?? 'No Message'}`)
                 /* Status Chain begins */
                 let status: Status = 1
                 if (status === 1) {
@@ -81,7 +79,7 @@ export class FisRetransmissionService {
                     console.log(`Rebuffering ${report.payload.message?.appData?.msgId} into buffer...`)
                     this.bufferedStorage.push(report.payload)
                 }
-                console.log(`Connection status report && ${report.message ?? 'No Message'}`)
+                // console.log(`Connection status report && ${report.message ?? 'No Message'}`)
                 let status: Status = 1
                 /* Status Chain begins */
                 if (status === 1) {
@@ -99,7 +97,7 @@ export class FisRetransmissionService {
             /* Stop buffering the message in local instance, but start saving them in database. Must first transfer the ones in local buffer before redirecting the 
             flow from applicationOutgoingMessage into Mongo */
             if (report.code == ColorCode.RED) {
-                console.log(`Connection status report: ${report.message}`)
+                // console.log(`Connection status report: ${report.message}`)
                 if (report.payload) {
                     console.log(`Rebuffering ${report.payload.message?.appData?.msgId} into storage...`)
                     this.saveToMongo(report.payload)
@@ -271,13 +269,17 @@ export class FisRetransmissionService {
     private async saveToMongo(message: Message): Promise<boolean> {
         return new Promise((resolve, reject) => {
             // let messageModel: Model<any> = this.mongoConnection.model('Message', require('../models/message.schema'))
-            this.messageModel.create(message).then(() => {
-                console.log(`Saved MessageID ${(message.message as MessageLog).appData.msgId} into ${this.mongoUrl}`);
-                resolve(true)
-            }).catch((err) => {
-                console.log(`MongoSaveError: ${err.message}`)
-                reject(err)
-            })
+            if (this.messageModel) {
+                this.messageModel.create(message).then(() => {
+                    console.log(`Saved MessageID ${(message.message as MessageLog).appData.msgId} into ${this.mongoUrl}`);
+                    resolve(true)
+                }).catch((err) => {
+                    console.log(`MongoSaveError: ${err.message}`)
+                    reject(err)
+                })
+            } else {
+                console.log(`Cant save message. Message Model is absent or not properly initialized`)
+            }
         })
     }
 
@@ -345,15 +347,20 @@ export class FisRetransmissionService {
             let status: Status = 1
             if (status = 1) {
                 let database = this.mongoUrl + databaseName
-                console.log(database)
+                console.log(`Connected to ${database}`)
                 this.mongoConnection = mongoose.createConnection(database)
                 this.mongoConnection.on('error', (error) => {
                     console.error('Connection error:', error);
                     resolve('')
                 });
                 this.mongoConnection.once('open', () => {
-                    console.log(`Connected to ${process.env.MONGO}`);
+                    // console.log(`Connected to ${process.env.MONGO}`);
+                    let report: ReportStatus = {
+                        code: ColorCode.RED,
+                        message: `Mongo storage available`
+                    }
                     this.messageModel = this.mongoConnection.model('Message', require('../models/message.schema'));
+                    this.statusReport.next(report)
                 });
             }
         })
@@ -387,8 +394,12 @@ export class FisRetransmissionService {
                     subjectArgs.complete();
                     // Delete the data once it has been streamed
                     try {
-                        await this.messageModel.deleteMany({});
-                        console.log('Data in Mongo deleted successfully.');
+                        if (this.messageModel) {
+                            await this.messageModel.deleteMany({});
+                            console.log('Data in Mongo deleted successfully.');
+                        } else {
+                            console.log(`Message Mongoose Model is not intiated properly...`)
+                        }
                     } catch (err) {
                         console.error('Error deleting data:', err);
                     }

+ 50 - 42
services/grpc.service.method.ts

@@ -4,7 +4,11 @@ import { ReportStatus, ColorCode, Message, MessageLog, ConnectionAttribute, Conn
 import { Status } from '@grpc/grpc-js/build/src/constants';
 import { v4 as uuidv4 } from 'uuid'
 import { message_proto } from './protos/server.proto'
+import { ServerWritableStreamImpl } from '@grpc/grpc-js/build/src/server-call';
 export class GrpcServiceMethod {
+    private server: grpc.Server | any
+    private messageToBeSendOver: Message | any
+    private callRequestsFromRemote: ServerWritableStreamImpl<any, ResponseType>[] = []
 
     public async create(request: ConnectionRequest, connectionAttribute: ConnectionAttribute): Promise<any> {
         // Assuming currently only one client
@@ -12,36 +16,44 @@ export class GrpcServiceMethod {
         this.createGrpcInstance(request.client.targetServer, { instanceType: 'client' }, connectionAttribute)
     }
 
+    // For testing only
+    public async shutDownServer(): Promise<string> {
+        return new Promise((resolve, reject) => {
+            console.log(`Shutting down servers...`)
+            if (this.server) {
+                this.callRequestsFromRemote[0].destroy()
+                this.callRequestsFromRemote[0].end()
+                this.server.forceShutdown()
+                let message: string = `Server shut down successfully!`
+                resolve(message)
+            }
+            if (!this.server) {
+                let errorMsg: string = `There's no active server here`
+                reject(errorMsg)
+            }
+        })
+    }
+
     private async generateAdditionalAttributes(connectionAttribute: ConnectionAttribute, clientInfo?: any, localInfo?: any) {
         if (clientInfo) {
-            connectionAttribute.inComing.ChannelID = clientInfo.channelID
+            connectionAttribute.inComing.StreamID = clientInfo.channelID
             connectionAttribute.inComing.PublisherID = clientInfo.publisherID
             connectionAttribute.inComing.SubscriberID = clientInfo.subscriberID
-            // let report: any = {
-            //     message: 'Remote Server Communication Established',
-            //     channelID: clientInfo.channelID
-            // }
-            // connectionAttribute.connectionStatus.next(report)
         }
         if (localInfo) {
-            connectionAttribute.outGoing.ChannelID = localInfo.channelID
+            connectionAttribute.outGoing.StreamID = localInfo.channelID
             connectionAttribute.outGoing.PublisherID = localInfo.publisherID
             connectionAttribute.outGoing.SubscriberID = localInfo.subscriberID
-            // let report: any = {
-            //     message: 'Local Server Communication Established',
-            //     channelID: localInfo.channelID
-            // }
-            // connectionAttribute.connectionStatus.next(report)
         }
-        if (connectionAttribute.outGoing.ChannelID && connectionAttribute.inComing.ChannelID) {
-            connectionAttribute.ConnectionID.local = connectionAttribute.outGoing.ChannelID + connectionAttribute.inComing.ChannelID
-            connectionAttribute.ConnectionID.remote = connectionAttribute.inComing.ChannelID + connectionAttribute.outGoing.ChannelID
+        if (connectionAttribute.outGoing.StreamID && connectionAttribute.inComing.StreamID) {
+            connectionAttribute.ConnectionID.local = connectionAttribute.outGoing.StreamID + connectionAttribute.inComing.StreamID
+            connectionAttribute.ConnectionID.remote = connectionAttribute.inComing.StreamID + connectionAttribute.outGoing.StreamID
             let report: ReportStatus = {
                 code: ColorCode.GREEN,
                 message: `ConnectionID acquired. Informing Restranmission to release Messages...`,
             }
             connectionAttribute.connectionStatus.next(report)
-            console.log(connectionAttribute)
+            // console.log(connectionAttribute)
         }
     }
 
@@ -54,7 +66,6 @@ export class GrpcServiceMethod {
         let statusControl: Subject<ReportStatus> = connectionAttribute.connectionStatus
         let consecutiveResolutions = 0;
         let lastResolutionTime = Date.now();
-        let alreadyHealthCheck: boolean = false
         let yellowErrorEmission: boolean = false
         let redErrorEmission: boolean = false
 
@@ -67,7 +78,7 @@ export class GrpcServiceMethod {
                         })
                     }
                     if (grpcType.instanceType == 'client') {
-                        this.createServerStreamingClient(serverUrl, alreadyHealthCheck, connectionAttribute).then(() => {
+                        this.createServerStreamingClient(serverUrl, connectionAttribute).then(() => {
                             resolve('recreate')
                         })
                     }
@@ -76,26 +87,16 @@ export class GrpcServiceMethod {
                 // If connection resolves (indicating failure), increment the count
                 consecutiveResolutions++;
                 // console.log(`Reconnection Attempt: ${consecutiveResolutions}`)
-                alreadyHealthCheck = true
                 if (redErrorEmission == false) {
                     redErrorEmission = true
                     // console.error(`Connection failed ${consecutiveResolutions} times. Stopping connection attempts.`);
                     let error: ReportStatus = {
-                        code: ColorCode.RED,
+                        code: ColorCode.YELLOW,
                         message: 'Server is not responding. Proceed to buffer.',
                     }
                     statusControl.next(error)
                 }
-                // Comment it out if Client wishes to use YELLOW for memory buffer instead of persistent storage buffer
-                // if (consecutiveResolutions < parseInt(process.env.ReconnectionAttempt as string) && yellowErrorEmission == false) {
-                //     yellowErrorEmission = true
-                //     let error: ReportStatus = {
-                //         code: ColorCode.YELLOW,
-                //         // message: `Reconnection Attempt: ${consecutiveResolutions}. Server has yet to respond`
-                //         message: `Attempting reconnection... Server has yet to respond`,
-                //     }
-                //     statusControl.next(error);
-                // }
+
             } catch (error) {
                 // Connection did not resolve, reset the count
                 consecutiveResolutions = 0;
@@ -108,7 +109,6 @@ export class GrpcServiceMethod {
                 consecutiveResolutions = 0;
                 yellowErrorEmission = false
                 redErrorEmission = false
-                alreadyHealthCheck = false
             }
             // Update the last resolution time
             lastResolutionTime = currentTime;
@@ -116,6 +116,7 @@ export class GrpcServiceMethod {
             // timeout generate message to trigger this reconnection
         }
     }
+
     // Create Server Instance to stream all application Outgoing messages
     public async createServerStreamingServer(
         serverUrl: string,
@@ -123,11 +124,13 @@ export class GrpcServiceMethod {
     ): Promise<any> { // '0.0.0.0:3001'
         return new Promise((resolve, reject) => {
             try {
-                // https://github.com/grpc/proposal/blob/master/L5-node-client-interceptors.md
-                let server: grpc.Server = new grpc.Server();
-                server.addService(message_proto.Message.service, {
+                if (!this.server) {
+                    this.server = new grpc.Server();
+                }
+
+                this.server.addService(message_proto.Message.service, {
                     HandleMessage: (call) => {
-                        // Assign channel uuid
+                        this.callRequestsFromRemote.push(call)
                         let clientInfo = JSON.parse(call.request.message)
                         this.generateAdditionalAttributes(connectionAttribute, clientInfo)
 
@@ -135,7 +138,8 @@ export class GrpcServiceMethod {
                         if (connectionAttribute.outGoing.MessageToBePublished) {
                             let subscription: Subscription = connectionAttribute.outGoing.MessageToBePublished.subscribe({
                                 next: (response: Message) => {
-                                    console.log(`Sending ${(response.message as MessageLog).appData.msgId}`)
+                                    console.log(`Sending from GRPC server: ${(response.message as MessageLog).appData.msgId} `)
+                                    this.messageToBeSendOver = response
                                     let message = {
                                         id: response.id,
                                         message: JSON.stringify(response.message)
@@ -154,6 +158,7 @@ export class GrpcServiceMethod {
                                 }
                             })
                         }
+
                     },
 
                     Check: (_, callback) => {
@@ -163,9 +168,9 @@ export class GrpcServiceMethod {
                     },
                 });
                 // Bind and start the server
-                server.bindAsync(serverUrl, grpc.ServerCredentials.createInsecure(), () => {
+                this.server.bindAsync(serverUrl, grpc.ServerCredentials.createInsecure(), () => {
                     console.log(`gRPC server is running on ${serverUrl}`);
-                    server.start();
+                    this.server.start();
                 });
             }
             catch (error) {
@@ -178,12 +183,10 @@ export class GrpcServiceMethod {
     // Send a request over to the other server to open a channel for this server to emit/stream messages over
     public async createServerStreamingClient(
         server: string,
-        alreadyHealthCheck: boolean,
         connectionAttribute: ConnectionAttribute
     ): Promise<string> {
         return new Promise(async (resolve, reject) => {
             const client = new message_proto.Message(server, grpc.credentials.createInsecure());
-            // perform check to see if server is alive, if not terminate this grpc instant and create again
 
             let outGoingInfo: any = {
                 channelID: uuidv4(),
@@ -202,8 +205,9 @@ export class GrpcServiceMethod {
                     // RPC completed successfully
                 } if (status == grpc.status.UNAVAILABLE) {
                     let report: ReportStatus = {
-                        code: ColorCode.RED,
+                        code: ColorCode.YELLOW,
                         message: `Server doesn't seem to be alive. Error returned.`,
+                        payload: this.messageToBeSendOver ?? `There's no message at the moment...`
                     }
                     connectionAttribute.connectionStatus.next(report)
                     resolve('No connection established. Server is not responding..')
@@ -218,7 +222,7 @@ export class GrpcServiceMethod {
                 if (connectionAttribute.inComing.MessageToBeReceived) {
                     connectionAttribute.inComing.MessageToBeReceived.next(response)
                 }
-                console.log((response.message as MessageLog).appData.msgId)
+                // console.log(`Received ${(response.message as MessageLog).appData.msgId}`)
             });
 
             call.on('error', (err) => {
@@ -244,3 +248,7 @@ export class GrpcServiceMethod {
         })
     }
 }
+
+
+
+// https://github.com/grpc/proposal/blob/master/L5-node-client-interceptors.md

+ 23 - 34
services/server-client.service.ts

@@ -1,20 +1,40 @@
 import { BehaviorSubject, Subject } from 'rxjs';
-import { ColorCode, Message, ReportStatus, ConnectionAttribute, ConnectionRequest, ConnectionStatus } from '../interfaces/general.interface';
+import { ColorCode, Message, ReportStatus, ConnectionAttribute, ConnectionRequest } from '../interfaces/general.interface';
 import { GrpcServiceMethod } from './grpc.service.method';
 import { FisRetransmissionService } from './fis.retransmission.service';
 
 export class ServerClientManager {
 
     private connectionAttributes: ConnectionAttribute[] = []
+    private request: ConnectionRequest | any
 
     constructor(private grpcService: GrpcServiceMethod) {
     }
 
+    public restartServerInDuration(time: number) {
+        console.log(this.request)
+        console.log(this.connectionAttributes)
+        this.grpcService.shutDownServer().then((msg: string) => {
+            console.log(msg)
+            setTimeout(() => {
+                this.generateConnection(this.request)
+            }, time * 1000)
+        })
+
+    }
+
     public generateConnection(request: ConnectionRequest) {
+        this.request = request
+        let database: string
+        if (request.database) {
+            database = request.database
+        } else {
+            database = request.server.name + request.client.name
+        }
         let initialReport: ReportStatus = { code: ColorCode.YELLOW, message: 'Initialization of the subject' }
         let reportSubject: BehaviorSubject<ReportStatus> = new BehaviorSubject(initialReport)
-        let retransmission: FisRetransmissionService = new FisRetransmissionService(request.database)
-        let messageToBePublished: Subject<Message> = retransmission.handleMessage(request.server.messageToBePublishedfromApplication, reportSubject)
+        let retransmission: FisRetransmissionService = new FisRetransmissionService(database, reportSubject)
+        let messageToBePublished: Subject<Message> = retransmission.handleMessage(request.server.messageToBePublishedfromApplication) ?? request.server.messageToBePublishedfromApplication
 
         let connectionAttribute: ConnectionAttribute = {
             ConnectionID: {
@@ -31,10 +51,6 @@ export class ServerClientManager {
             },
             connectionStatus: reportSubject // this is not related to report status for the retrasmission service
         }
-        // let connectionStatus: Record<string, ConnectionStatus> = {
-        //     localStatus: ConnectionStatus.RED,
-        //     remoteStatus: ConnectionStatus.RED
-        // }
 
         // This is default connection
         if (!request.server.connectionType) {
@@ -45,34 +61,7 @@ export class ServerClientManager {
             this.grpcService.create(request, connectionAttribute)
             this.connectionAttributes.push(connectionAttribute)
         }
-        // Check to make sure both the connection at local and remote is up, then report to retransmission service to release message
-        // connectionAttribute.connectionStatus.subscribe({
-        //     // Check this....
-        //     next: (element: any) => {
-        //         if (element.message == 'Local Server Communication Established') {
-        //             connectionStatus.localStatus = ConnectionStatus.GREEN
-        //             console.log(`Local alive`)
-        //         }
-        //         if (element.message == 'Remote Server Communication Established') {
-        //             connectionStatus.remoteStatus = ConnectionStatus.GREEN
-        //             console.log(`Remote alive`)
-        //         }
-
-        // if (connectionStatus.localStatus == ConnectionStatus.GREEN && connectionStatus.remoteStatus == ConnectionStatus.GREEN) {
-        //     let report: ReportStatus = {
-        //         code: ColorCode.GREEN,
-        //         message: `Both Local and Remote are connected`
-        //     }
-        //     // reportSubject.next(report)
-        //     console.log(`Both local and remote alive`)
-        // }
-        // },
-        // error: (err) => console.error(err),
-        // complete: () => { }
-        // })
-
     }
 
-
 }
 

+ 259 - 45
test/grpc1.ts

@@ -1,4 +1,4 @@
-import { Subject, take } from 'rxjs';
+import { Subject, from, take } from 'rxjs';
 import { Message, MessageLog, ConnectionRequest } from '../interfaces/general.interface';
 import { GrpcServiceMethod } from '../services/grpc.service.method';
 import { readFileSync } from 'fs';
@@ -11,40 +11,44 @@ let parsedMessages: any[] = JSON.parse(messagesJSON) // load the fake messages g
 let targetserver: string = 'localhost:3001'
 let targetserver2: string = 'localhost:3002'
 let hostServer: string = 'localhost:3000'
-let array: any[] = [] // Used for testing                     
+let array: any[] = [] // Used for testing    
+let intervalToStreamOutGoingMessage: number = 1
+
+
+/* Simple Test: 1 to 1 */
 let connectionRequest: ConnectionRequest = {
-  database: 'grpc1',
   server: {
+    name: 'g1',
     serverUrl: hostServer,
     connectionType: 'GRPC',
     messageToBePublishedfromApplication: new Subject<Message>()
   },
   client: {
+    name: 'g2',
     targetServer: targetserver,
     connectionType: 'GRPC',
     messageToBeReceivedFromRemote: new Subject<Message>()
   }
 }
 
-// Handler for the incoming Messages from the other side. 
-connectionRequest.client.messageToBeReceivedFromRemote.subscribe({
-  next: request => {
-    // Application logic comes here. This is where the asortment takes place, of decidiing whose messages it belongs of what it is
-    if ((request.message as MessageLog).appData.msgPayload == 'Query') {
-      generateFakeStreamResponse(request).subscribe({
-        next: (responseMessage: Message) => {
-          // console.log(`Processing request:${request.id}....`)
-          connectionRequest.server.messageToBePublishedfromApplication.next(responseMessage)
-        },
-        error: error => console.error(error),
-        complete: () => {
-          console.log(`Stream request for ${request.id} is queued.`) // shpuld be indefinite
-        }
-      })
-    } else {
-      array.push(request)
-      console.log(`Received messages from the other side: ${(request.message as MessageLog).appData.msgId}`)
+connectionService.generateConnection(connectionRequest)
+
+// let generateFakeMessagesToBePublished = stream().pipe(take(1000))
+let generateFakeMessagesToBePublished = from(parsedMessages).pipe(take(3000))
+generateFakeMessagesToBePublished.subscribe({
+  next: message => {
+    let payload: Message = {
+      id: hostServer,
+      message: message
     }
+    connectionRequest.server.messageToBePublishedfromApplication.next(payload)
+  }
+})
+
+connectionRequest.client.messageToBeReceivedFromRemote.subscribe({
+  next: response => {
+    console.log(`Received ${(response.message as MessageLog).appData.msgId} from ${connectionRequest.client.targetServer}`)
+    array.push(response)
   },
   error: error => console.error(error),
   complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
@@ -52,9 +56,98 @@ connectionRequest.client.messageToBeReceivedFromRemote.subscribe({
 
 
 
-connectionService.generateConnection(connectionRequest)
+/* Complex Test: 1 to 1*/
+// let connectionRequest: ConnectionRequest = {
+//   server: {
+//     name: 'g1',
+//     serverUrl: hostServer,
+//     connectionType: 'GRPC',
+//     messageToBePublishedfromApplication: new Subject<Message>()
+//   },
+//   client: {
+//     name: 'g2',
+//     targetServer: targetserver,
+//     connectionType: 'GRPC',
+//     messageToBeReceivedFromRemote: new Subject<Message>()
+//   }
+// }
+
+// connectionService.generateConnection(connectionRequest)
+
+// setTimeout(() => {
+//   let message = {
+//     id: parsedMessages[10].appData.msgId,
+//     message: parsedMessages[10] // Choose this number, because i purposely use the 11th message and change the msgPayload property to query to emulate a request
+//   }
+//   connectionRequest.server.messageToBePublishedfromApplication.next(message)
+// }, 3000)
+// setTimeout(() => {
+//   let message = {
+//     id: parsedMessages[11].appData.msgId,
+//     message: parsedMessages[11] // Choose this number, because i purposely use the 12th message and change the msgPayload property to query to emulate a request
+//   }
+//   connectionRequest.server.messageToBePublishedfromApplication.next(message)
+// }, 4000)
+
+// Handler for the incoming Messages from the other side. 
+// connectionRequest.client.messageToBeReceivedFromRemote.subscribe({
+//   next: request => {
+//     // Application logic comes here. This is where the asortment takes place, of decidiing whose messages it belongs of what it is
+//     if ((request.message as MessageLog).appData.msgPayload == 'Query') {
+//       generateFakeStreamResponse(request).subscribe({
+//         next: (responseMessage: Message) => {
+//               console.log(`Received ${(response.message as MessageLog).appData.msgId} from ${connectionRequest.client.targetServer}`)
+//           connectionRequest.server.messageToBePublishedfromApplication.next(responseMessage)
+//         },
+//         error: error => console.error(error),
+//         complete: () => {
+//           console.log(`Stream request for ${request.id} is queued.`) // shpuld be indefinite
+//         }
+//       })
+//     } else {
+//       array.push(request)
+//       console.log(`Received message: ${(request.message as MessageLog).appData.msgId} from ${request.id}`)
+//     }
+//   },
+//   error: error => console.error(error),
+//   complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
+// })
+
+
+
+/* Simple Test: 1 to Many */
+// let connectionRequest: ConnectionRequest = {
+//   server: {
+//     name: 'g1',
+//     serverUrl: hostServer,
+//     connectionType: 'GRPC',
+//     messageToBePublishedfromApplication: new Subject<Message>()
+//   },
+//   client: {
+//     name: 'g2',
+//     targetServer: targetserver,
+//     connectionType: 'GRPC',
+//     messageToBeReceivedFromRemote: new Subject<Message>()
+//   }
+// }
+// let connectionRequest2: ConnectionRequest = {
+//   server: {
+//     name: 'g1',
+//     serverUrl: hostServer,
+//     connectionType: 'GRPC',
+//     messageToBePublishedfromApplication: new Subject<Message>()
+//   },
+//   client: {
+//     name: 'g3',
+//     targetServer: targetserver2,
+//     connectionType: 'GRPC',
+//     messageToBeReceivedFromRemote: new Subject<Message>()
+//   }
+// }
+
+// connectionService.generateConnection(connectionRequest)
+// connectionService.generateConnection(connectionRequest2)
 
-/* Simple Test */
 // let generateFakeMessagesToBePublished = stream().pipe(take(10))
 // generateFakeMessagesToBePublished.subscribe({
 //   next: message => {
@@ -65,29 +158,136 @@ connectionService.generateConnection(connectionRequest)
 //     connectionRequest.server.messageToBePublishedfromApplication.next(payload)
 //   }
 // })
+// connectionRequest.client.messageToBeReceivedFromRemote.subscribe({
+//   next: request => {
+//     console.log(`Received ${(response.message as MessageLog).appData.msgId} from ${connectionRequest.client.targetServer}`)
+
+//     array.push(request)
+//   },
+//   error: error => console.error(error),
+//   complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
+// })
+
+// connectionRequest.client.messageToBeReceivedFromRemote.subscribe({
+//   next: request => {
+// console.log(`Received ${(response.message as MessageLog).appData.msgId} from ${connectionRequest.client.targetServer}`)
+//     array.push(request)
+//   },
+//   error: error => console.error(error),
+//   complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
+// })
+
+// connectionRequest2.client.messageToBeReceivedFromRemote.subscribe({
+//   next: request => {
+//     array.push(request)
+//   },
+//   error: error => console.error(error),
+//   complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
+// })
+
+
+
+
+
+
+/* Complex Test: 1 to Many */
+// let connectionRequest: ConnectionRequest = {
+//   server: {
+//     name: 'g1',
+//     serverUrl: hostServer,
+//     connectionType: 'GRPC',
+//     messageToBePublishedfromApplication: new Subject<Message>()
+//   },
+//   client: {
+//     name: 'g2',
+//     targetServer: targetserver,
+//     connectionType: 'GRPC',
+//     messageToBeReceivedFromRemote: new Subject<Message>()
+//   }
+// }
+// let connectionRequest2: ConnectionRequest = {
+//   server: {
+//     name: 'g1',
+//     serverUrl: hostServer,
+//     connectionType: 'GRPC',
+//     messageToBePublishedfromApplication: new Subject<Message>()
+//   },
+//   client: {
+//     name: 'g3',
+//     targetServer: targetserver2,
+//     connectionType: 'GRPC',
+//     messageToBeReceivedFromRemote: new Subject<Message>()
+//   }
+// }
+
+// connectionService.generateConnection(connectionRequest)
+// connectionService.generateConnection(connectionRequest2)
+
+// setTimeout(() => {
+//   let message = {
+//     id: parsedMessages[10].appData.msgId,
+//     message: parsedMessages[10] // Choose this number, because i purposely use the 11th message and change the msgPayload property to query to emulate a request
+//   }
+//   connectionRequest.server.messageToBePublishedfromApplication.next(message)
+//   connectionRequest2.server.messageToBePublishedfromApplication.next(message)
+// }, 3000)
+// setTimeout(() => {
+//   let message = {
+//     id: parsedMessages[11].appData.msgId,
+//     message: parsedMessages[11] // Choose this number, because i purposely use the 12th message and change the msgPayload property to query to emulate a request
+//   }
+//   connectionRequest.server.messageToBePublishedfromApplication.next(message)
+//   connectionRequest2.server.messageToBePublishedfromApplication.next(message)
+// }, 4000)
+
+// connectionRequest.client.messageToBeReceivedFromRemote.subscribe({
+//   next: request => {
+//     // Application logic comes here. This is where the asortment takes place, of deciding whose messages it belongs of what it is
+//     if ((request.message as MessageLog).appData.msgPayload == 'Query') {
+//       generateFakeStreamResponse(request).subscribe({
+//         next: (responseMessage: Message) => {
+//           connectionRequest.server.messageToBePublishedfromApplication.next(responseMessage)
+//         },
+//         error: error => console.error(error),
+//         complete: () => {
+//           console.log(`Stream request for ${request.id} is queued.`) // should be indefinite
+//         }
+//       })
+//     } else {
+// console.log(`Received ${(response.message as MessageLog).appData.msgId} from ${connectionRequest.client.targetServer}`)
+
+//       array.push(request)
+//       console.log(`Received message: ${(request.message as MessageLog).appData.msgId} from ${request.id}`)
+//     }
+//   },
+//   error: error => console.error(error),
+//   complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
+// })
+// connectionRequest2.client.messageToBeReceivedFromRemote.subscribe({
+//   next: request => {
+//     // Application logic comes here. This is where the asortment takes place, of deciding whose messages it belongs of what it is
+//     if ((request.message as MessageLog).appData.msgPayload == 'Query') {
+//       generateFakeStreamResponse(request).subscribe({
+//         next: (responseMessage: Message) => {
+//           connectionRequest2.server.messageToBePublishedfromApplication.next(responseMessage)
+//         },
+//         error: error => console.error(error),
+//         complete: () => {
+//           console.log(`Stream request for ${request.id} is queued.`) // should be indefinite
+//         }
+//       })
+//     } else {
+//     console.log(`Received ${(response.message as MessageLog).appData.msgId} from ${connectionRequest.client.targetServer}`)
+
+//       array.push(request)
+//       console.log(`Received message: ${(request.message as MessageLog).appData.msgId} from ${request.id}`)
+//     }
+//   },
+//   error: error => console.error(error),
+//   complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
+// })
 
 
-/* Complex Test: Expected out come, both must receive 14 message by the end. Havent try to disconnect.*/
-setTimeout(() => {
-  let message = {
-    id: parsedMessages[10].appData.msgId,
-    message: parsedMessages[10] // Choose this number, because i purposely use the 11th message and change the msgPayload property to query to emulate a request
-  }
-  connectionRequest.server.messageToBePublishedfromApplication.next(message)
-}, 3000)
-setTimeout(() => {
-  let message = {
-    id: parsedMessages[11].appData.msgId,
-    message: parsedMessages[11]// Choose this number, because i purposely use the 12th message and change the msgPayload property to query to emulate a request
-  }
-  connectionRequest.server.messageToBePublishedfromApplication.next(message)
-}, 4000)
-setTimeout(() => {
-  console.log(`All received data: ${array.length}`)
-}, 10000)
-setTimeout(() => {
-  console.log(`All received data: ${array.length}`)
-}, 20000)
 
 
 // this is just to publish an array of fake data as a Subject
@@ -102,7 +302,7 @@ function stream(): Subject<any> {
       clearInterval(intervalId);
       result.complete();
     }
-  }, 500)
+  }, intervalToStreamOutGoingMessage)
   return result
 }
 
@@ -122,3 +322,17 @@ function generateFakeStreamResponse(request: any): Subject<any> {
   })
   return res
 }
+
+/* Checking the values by the end of the test */
+setTimeout(() => {
+  console.log(`All received data: ${array.length}`)
+}, 5000)
+setTimeout(() => {
+  console.log(`All received data: ${array.length}`)
+}, 10000)
+setTimeout(() => {
+  console.log(`All received data: ${array.length}`)
+}, 15000)
+setTimeout(() => {
+  console.log(`All received data: ${array.length}`)
+}, 20000)

+ 168 - 0
test/grpc1.v2.bak

@@ -0,0 +1,168 @@
+import { Subject, take } from 'rxjs';
+import { GrpcServiceMethod } from '../services/grpc.service.method';
+import { readFileSync } from 'fs';
+import { ConnectionRequest, Message } from '../interfaces/general.interface';
+import { ServerClientManager } from '../services/server-client.service';
+
+// Subject for bidirectional communication
+const connectionService: ServerClientManager = new ServerClientManager(new GrpcServiceMethod())
+const messagesJSON: any = readFileSync('payload.json')
+let parsedMessages: any[] = JSON.parse(messagesJSON) // load the fake messages generated for this trial 
+let targetserver: string = 'localhost:3001'
+let targetserver2: string = 'localhost:3002'
+let hostServer: string = 'localhost:3000'
+let array: any[] = [] // Used for testing                     
+let connectionRequest: ConnectionRequest = {
+  server: {
+    name: 'grpc1',
+    serverUrl: hostServer,
+    connectionType: 'GRPC',
+    messageToBePublishedfromApplication: new Subject<Message>()
+  },
+  client: [{
+    name: 'grpc2',
+    targetServer: targetserver,
+    connectionType: 'GRPC',
+    messageToBeReceivedFromRemote: new Subject<Message>()
+  }]
+}
+let client :ConnectionAttribute[] = [ 
+{ 
+  name:"con1"
+  ConnectionID: "aaa123-xxx123",
+  outGoing: {
+    Name?: string,
+    ChannelID?: "aaa123",
+    PublisherID?: "bbb123",
+    SubscriberID?: "ccc123", 
+}
+,
+  inComing: {
+    Name?: string,
+    ChannelID?: "xxx123",
+    PublisherID?: "yyy123",
+    SubscriberID?: "zzz123", 
+}
+,
+  connectionStatus: Subject<ReportStatus>
+}
+
+{ 
+  name:"con2"
+  ConnectionID: "aaa123xxx-xxx123xx",
+  outGoing: {
+    Name?: string,
+    ChannelID?: "aaa123xxx",
+    PublisherID?: "bbb123",
+    SubscriberID?: "ccc123xxx", 
+}
+,
+  inComing: {
+    Name?: string,
+    ChannelID?: "xxx123xx",
+    PublisherID?: "yyy123xxx",
+    SubscriberID?: "zzz123xxx", 
+}
+,
+  connectionStatus: Subject<ReportStatus>
+}
+]
+
+// Handler for the incoming Messages from the other side. 
+connectionRequest.client.forEach((client) => {
+  client.messageToBeReceivedFromRemote.subscribe({
+    next: request => {
+      // Application logic comes here. This is where the asortment takes place, of decidiing whose messages it belongs of what it is
+      if ((request.message as MessageLog).appData.msgPayload == 'Query') {
+        generateFakeStreamResponse(request).subscribe({
+          next: (responseMessage: Message) => {
+            // console.log(`Processing request:${request.id}....`)
+            connectionRequest.server.messageToBePublishedfromApplication.next(responseMessage)
+          },
+          error: error => console.error(error),
+          complete: () => {
+            console.log(`Stream request for ${request.id} is queued.`) // shpuld be indefinite
+          }
+        })
+      } else {
+        array.push(request)
+        console.log(`Received messages from the other side: ${(request.message as MessageLog).appData.msgId}`)
+      }
+    },
+    error: error => console.error(error),
+    complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
+  })
+})
+
+
+
+connectionService.generateConnection(connectionRequest)
+
+/* Simple Test */
+// let generateFakeMessagesToBePublished = stream().pipe(take(10))
+// generateFakeMessagesToBePublished.subscribe({
+//   next: message => {
+//     let payload: Message = {
+//       id: hostServer,
+//       message: message
+//     }
+//     connectionRequest.server.messageToBePublishedfromApplication.next(payload)
+//   }
+// })
+
+
+/* Complex Test: Expected out come, both must receive 14 message by the end. Havent try to disconnect.*/
+setTimeout(() => {
+  let message = {
+    id: parsedMessages[10].appData.msgId,
+    message: parsedMessages[10] // Choose this number, because i purposely use the 11th message and change the msgPayload property to query to emulate a request
+  }
+  connectionRequest.server.messageToBePublishedfromApplication.next(message)
+}, 3000)
+setTimeout(() => {
+  let message = {
+    id: parsedMessages[11].appData.msgId,
+    message: parsedMessages[11]// Choose this number, because i purposely use the 12th message and change the msgPayload property to query to emulate a request
+  }
+  connectionRequest.server.messageToBePublishedfromApplication.next(message)
+}, 4000)
+setTimeout(() => {
+  console.log(`All received data: ${array.length}`)
+}, 10000)
+setTimeout(() => {
+  console.log(`All received data: ${array.length}`)
+}, 20000)
+
+
+// this is just to publish an array of fake data as a Subject
+function stream(): Subject<any> {
+  let result: Subject<any> = new Subject()
+  let messages: any[] = parsedMessages
+  let count = 0
+  const intervalId = setInterval(() => {
+    result.next(messages[count]);
+    count++;
+    if (count >= 1000) {
+      clearInterval(intervalId);
+      result.complete();
+    }
+  }, 500)
+  return result
+}
+
+
+function generateFakeStreamResponse(request: any): Subject<any> {
+  let res: Subject<any> = new Subject()
+  stream().pipe(take(7)).subscribe({
+    next: element => {
+      let message = {
+        id: request.id, // Caller's 
+        message: element
+      }
+      res.next(message)
+    },
+    error: error => console.error(error),
+    complete: () => console.log(`Stream response for ${request.id} has been prepared.`)
+  })
+  return res
+}

+ 265 - 45
test/grpc2.ts

@@ -1,4 +1,4 @@
-import { Subject, take } from 'rxjs';
+import { Subject, from, take } from 'rxjs';
 import { Message, MessageLog, ConnectionRequest } from '../interfaces/general.interface';
 import { GrpcServiceMethod } from '../services/grpc.service.method';
 import { readFileSync } from 'fs';
@@ -11,40 +11,49 @@ let parsedMessages: any[] = JSON.parse(messagesJSON) // load the fake messages g
 let targetserver: string = 'localhost:3000'
 let targetserver2: string = 'localhost:3002'
 let hostServer: string = 'localhost:3001'
-let array: any[] = [] // Used for testing                     
+let array: any[] = [] // Used for testing    
+let intervalToStreamOutGoingMessage: number = 1
+
+
+/* Simple Test: 1 to 1 */
 let connectionRequest: ConnectionRequest = {
-  database: 'grpc2',
   server: {
+    name: 'g2',
     serverUrl: hostServer,
     connectionType: 'GRPC',
     messageToBePublishedfromApplication: new Subject<Message>()
   },
   client: {
+    name: 'g1',
     targetServer: targetserver,
     connectionType: 'GRPC',
     messageToBeReceivedFromRemote: new Subject<Message>()
   }
 }
 
-// Handler for the incoming Messages from the other side. 
+connectionService.generateConnection(connectionRequest)
+// 10000th message == 848438e1-da50-4d98-aa12-e44d6d6a1489
+
+// let generateFakeMessagesToBePublished = stream().pipe(take(1000))
+let generateFakeMessagesToBePublished = from(parsedMessages).pipe(take(20000))
+generateFakeMessagesToBePublished.subscribe({
+  next: message => {
+    let payload: Message = {
+      id: hostServer,
+      message: message
+    }
+    // connectionRequest.server.messageToBePublishedfromApplication.next(payload)
+  }
+})
+
 connectionRequest.client.messageToBeReceivedFromRemote.subscribe({
-  next: request => {
-    // Application logic comes here. This is where the asortment takes place, of decidiing whose messages it belongs of what it is
-    if ((request.message as MessageLog).appData.msgPayload == 'Query') {
-      generateFakeStreamResponse(request).subscribe({
-        next: (responseMessage: Message) => {
-          // console.log(`Processing request:${request.id}....`)
-          connectionRequest.server.messageToBePublishedfromApplication.next(responseMessage)
-        },
-        error: error => console.error(error),
-        complete: () => {
-          console.log(`Stream request for ${request.id} is queued.`) // shpuld be indefinite
-        }
-      })
-    } else {
-      array.push(request)
-      console.log(`Received messages from the other side: ${(request.message as MessageLog).appData.msgId}`)
+  next: response => {
+    if((response.message as MessageLog).appData.msgId == `ebf94479-44fe-470d-827c-9f1389396d6a`){
+      console.log(`Received the 1000th message. Running the test. Initiating server restart....`)
+      connectionService.restartServerInDuration(10)
     }
+    console.log(`Received ${(response.message as MessageLog).appData.msgId} from ${connectionRequest.client.targetServer}`)
+    array.push(response)
   },
   error: error => console.error(error),
   complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
@@ -52,9 +61,98 @@ connectionRequest.client.messageToBeReceivedFromRemote.subscribe({
 
 
 
-connectionService.generateConnection(connectionRequest)
+/* Complex Test: 1 to 1*/
+// let connectionRequest: ConnectionRequest = {
+//   server: {
+//     name: 'g1',
+//     serverUrl: hostServer,
+//     connectionType: 'GRPC',
+//     messageToBePublishedfromApplication: new Subject<Message>()
+//   },
+//   client: {
+//     name: 'g2',
+//     targetServer: targetserver,
+//     connectionType: 'GRPC',
+//     messageToBeReceivedFromRemote: new Subject<Message>()
+//   }
+// }
+
+// connectionService.generateConnection(connectionRequest)
+
+// setTimeout(() => {
+//   let message = {
+//     id: parsedMessages[10].appData.msgId,
+//     message: parsedMessages[10] // Choose this number, because i purposely use the 11th message and change the msgPayload property to query to emulate a request
+//   }
+//   connectionRequest.server.messageToBePublishedfromApplication.next(message)
+// }, 3000)
+// setTimeout(() => {
+//   let message = {
+//     id: parsedMessages[11].appData.msgId,
+//     message: parsedMessages[11] // Choose this number, because i purposely use the 12th message and change the msgPayload property to query to emulate a request
+//   }
+//   connectionRequest.server.messageToBePublishedfromApplication.next(message)
+// }, 4000)
+
+// Handler for the incoming Messages from the other side. 
+// connectionRequest.client.messageToBeReceivedFromRemote.subscribe({
+//   next: request => {
+//     // Application logic comes here. This is where the asortment takes place, of decidiing whose messages it belongs of what it is
+//     if ((request.message as MessageLog).appData.msgPayload == 'Query') {
+//       generateFakeStreamResponse(request).subscribe({
+//         next: (responseMessage: Message) => {
+//               console.log(`Received ${(response.message as MessageLog).appData.msgId} from ${connectionRequest.client.targetServer}`)
+//           connectionRequest.server.messageToBePublishedfromApplication.next(responseMessage)
+//         },
+//         error: error => console.error(error),
+//         complete: () => {
+//           console.log(`Stream request for ${request.id} is queued.`) // shpuld be indefinite
+//         }
+//       })
+//     } else {
+//       array.push(request)
+//       console.log(`Received message: ${(request.message as MessageLog).appData.msgId} from ${request.id}`)
+//     }
+//   },
+//   error: error => console.error(error),
+//   complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
+// })
+
+
+
+/* Simple Test: 1 to Many */
+// let connectionRequest: ConnectionRequest = {
+//   server: {
+//     name: 'g1',
+//     serverUrl: hostServer,
+//     connectionType: 'GRPC',
+//     messageToBePublishedfromApplication: new Subject<Message>()
+//   },
+//   client: {
+//     name: 'g2',
+//     targetServer: targetserver,
+//     connectionType: 'GRPC',
+//     messageToBeReceivedFromRemote: new Subject<Message>()
+//   }
+// }
+// let connectionRequest2: ConnectionRequest = {
+//   server: {
+//     name: 'g1',
+//     serverUrl: hostServer,
+//     connectionType: 'GRPC',
+//     messageToBePublishedfromApplication: new Subject<Message>()
+//   },
+//   client: {
+//     name: 'g3',
+//     targetServer: targetserver2,
+//     connectionType: 'GRPC',
+//     messageToBeReceivedFromRemote: new Subject<Message>()
+//   }
+// }
+
+// connectionService.generateConnection(connectionRequest)
+// connectionService.generateConnection(connectionRequest2)
 
-/* Simple Test */
 // let generateFakeMessagesToBePublished = stream().pipe(take(10))
 // generateFakeMessagesToBePublished.subscribe({
 //   next: message => {
@@ -65,29 +163,136 @@ connectionService.generateConnection(connectionRequest)
 //     connectionRequest.server.messageToBePublishedfromApplication.next(payload)
 //   }
 // })
+// connectionRequest.client.messageToBeReceivedFromRemote.subscribe({
+//   next: request => {
+//     console.log(`Received ${(response.message as MessageLog).appData.msgId} from ${connectionRequest.client.targetServer}`)
+
+//     array.push(request)
+//   },
+//   error: error => console.error(error),
+//   complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
+// })
+
+// connectionRequest.client.messageToBeReceivedFromRemote.subscribe({
+//   next: request => {
+// console.log(`Received ${(response.message as MessageLog).appData.msgId} from ${connectionRequest.client.targetServer}`)
+//     array.push(request)
+//   },
+//   error: error => console.error(error),
+//   complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
+// })
+
+// connectionRequest2.client.messageToBeReceivedFromRemote.subscribe({
+//   next: request => {
+//     array.push(request)
+//   },
+//   error: error => console.error(error),
+//   complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
+// })
+
+
+
+
+
+
+/* Complex Test: 1 to Many */
+// let connectionRequest: ConnectionRequest = {
+//   server: {
+//     name: 'g1',
+//     serverUrl: hostServer,
+//     connectionType: 'GRPC',
+//     messageToBePublishedfromApplication: new Subject<Message>()
+//   },
+//   client: {
+//     name: 'g2',
+//     targetServer: targetserver,
+//     connectionType: 'GRPC',
+//     messageToBeReceivedFromRemote: new Subject<Message>()
+//   }
+// }
+// let connectionRequest2: ConnectionRequest = {
+//   server: {
+//     name: 'g1',
+//     serverUrl: hostServer,
+//     connectionType: 'GRPC',
+//     messageToBePublishedfromApplication: new Subject<Message>()
+//   },
+//   client: {
+//     name: 'g3',
+//     targetServer: targetserver2,
+//     connectionType: 'GRPC',
+//     messageToBeReceivedFromRemote: new Subject<Message>()
+//   }
+// }
+
+// connectionService.generateConnection(connectionRequest)
+// connectionService.generateConnection(connectionRequest2)
+
+// setTimeout(() => {
+//   let message = {
+//     id: parsedMessages[10].appData.msgId,
+//     message: parsedMessages[10] // Choose this number, because i purposely use the 11th message and change the msgPayload property to query to emulate a request
+//   }
+//   connectionRequest.server.messageToBePublishedfromApplication.next(message)
+//   connectionRequest2.server.messageToBePublishedfromApplication.next(message)
+// }, 3000)
+// setTimeout(() => {
+//   let message = {
+//     id: parsedMessages[11].appData.msgId,
+//     message: parsedMessages[11] // Choose this number, because i purposely use the 12th message and change the msgPayload property to query to emulate a request
+//   }
+//   connectionRequest.server.messageToBePublishedfromApplication.next(message)
+//   connectionRequest2.server.messageToBePublishedfromApplication.next(message)
+// }, 4000)
+
+// connectionRequest.client.messageToBeReceivedFromRemote.subscribe({
+//   next: request => {
+//     // Application logic comes here. This is where the asortment takes place, of deciding whose messages it belongs of what it is
+//     if ((request.message as MessageLog).appData.msgPayload == 'Query') {
+//       generateFakeStreamResponse(request).subscribe({
+//         next: (responseMessage: Message) => {
+//           connectionRequest.server.messageToBePublishedfromApplication.next(responseMessage)
+//         },
+//         error: error => console.error(error),
+//         complete: () => {
+//           console.log(`Stream request for ${request.id} is queued.`) // should be indefinite
+//         }
+//       })
+//     } else {
+// console.log(`Received ${(response.message as MessageLog).appData.msgId} from ${connectionRequest.client.targetServer}`)
+
+//       array.push(request)
+//       console.log(`Received message: ${(request.message as MessageLog).appData.msgId} from ${request.id}`)
+//     }
+//   },
+//   error: error => console.error(error),
+//   complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
+// })
+// connectionRequest2.client.messageToBeReceivedFromRemote.subscribe({
+//   next: request => {
+//     // Application logic comes here. This is where the asortment takes place, of deciding whose messages it belongs of what it is
+//     if ((request.message as MessageLog).appData.msgPayload == 'Query') {
+//       generateFakeStreamResponse(request).subscribe({
+//         next: (responseMessage: Message) => {
+//           connectionRequest2.server.messageToBePublishedfromApplication.next(responseMessage)
+//         },
+//         error: error => console.error(error),
+//         complete: () => {
+//           console.log(`Stream request for ${request.id} is queued.`) // should be indefinite
+//         }
+//       })
+//     } else {
+//     console.log(`Received ${(response.message as MessageLog).appData.msgId} from ${connectionRequest.client.targetServer}`)
+
+//       array.push(request)
+//       console.log(`Received message: ${(request.message as MessageLog).appData.msgId} from ${request.id}`)
+//     }
+//   },
+//   error: error => console.error(error),
+//   complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
+// })
 
 
-/* Complex Test: Expected out come, both must receive 14 message by the end. Havent try to disconnect.*/
-setTimeout(() => {
-  let message = {
-    id: parsedMessages[10].appData.msgId,
-    message: parsedMessages[10] // Choose this number, because i purposely use the 11th message and change the msgPayload property to query to emulate a request
-  }
-  connectionRequest.server.messageToBePublishedfromApplication.next(message)
-}, 3000)
-setTimeout(() => {
-  let message = {
-    id: parsedMessages[11].appData.msgId,
-    message: parsedMessages[11]// Choose this number, because i purposely use the 12th message and change the msgPayload property to query to emulate a request
-  }
-  connectionRequest.server.messageToBePublishedfromApplication.next(message)
-}, 4000)
-setTimeout(() => {
-  console.log(`All received data: ${array.length}`)
-}, 10000)
-setTimeout(() => {
-  console.log(`All received data: ${array.length}`)
-}, 20000)
 
 
 // this is just to publish an array of fake data as a Subject
@@ -98,11 +303,11 @@ function stream(): Subject<any> {
   const intervalId = setInterval(() => {
     result.next(messages[count]);
     count++;
-    if (count >= 1000) {
+    if (count >= 10000) {
       clearInterval(intervalId);
       result.complete();
     }
-  }, 500)
+  }, intervalToStreamOutGoingMessage)
   return result
 }
 
@@ -122,3 +327,18 @@ function generateFakeStreamResponse(request: any): Subject<any> {
   })
   return res
 }
+
+/* Checking the values by the end of the test */
+setTimeout(() => {
+  console.log(`All received data: ${array.length}`)
+}, 5000)
+setTimeout(() => {
+  console.log(`All received data: ${array.length}`)
+}, 10000)
+setTimeout(() => {
+  console.log(`All received data: ${array.length}`)
+}, 15000)
+setTimeout(() => {
+  console.log(`All received data: ${array.length}`)
+}, 20000)
+

+ 257 - 46
test/grpc3.ts

@@ -8,43 +8,46 @@ import { ServerClientManager } from '../services/server-client.service';
 const connectionService: ServerClientManager = new ServerClientManager(new GrpcServiceMethod())
 const messagesJSON: any = readFileSync('payload.json')
 let parsedMessages: any[] = JSON.parse(messagesJSON) // load the fake messages generated for this trial 
-let targetserver: string = 'localhost:3001'
-let targetserver2: string = 'localhost:3002'
+let targetserver: string = 'localhost:3000'
+let targetserver2: string = 'localhost:3001'
 let hostServer: string = 'localhost:3002'
-let array: any[] = [] // Used for testing                     
+let array: any[] = [] // Used for testing    
+let intervalToStreamOutGoingMessage: number = 10
+
+
+/* Simple Test: 1 to 1 */
 let connectionRequest: ConnectionRequest = {
-  database: 'grpc2',
   server: {
+    name: 'g2',
     serverUrl: hostServer,
     connectionType: 'GRPC',
     messageToBePublishedfromApplication: new Subject<Message>()
   },
   client: {
+    name: 'g1',
     targetServer: targetserver,
     connectionType: 'GRPC',
     messageToBeReceivedFromRemote: new Subject<Message>()
   }
 }
 
-// Handler for the incoming Messages from the other side. 
-connectionRequest.client.messageToBeReceivedFromRemote.subscribe({
-  next: request => {
-    // Application logic comes here. This is where the asortment takes place, of decidiing whose messages it belongs of what it is
-    if ((request.message as MessageLog).appData.msgPayload == 'Query') {
-      generateFakeStreamResponse(request).subscribe({
-        next: (responseMessage: Message) => {
-          // console.log(`Processing request:${request.id}....`)
-          connectionRequest.server.messageToBePublishedfromApplication.next(responseMessage)
-        },
-        error: error => console.error(error),
-        complete: () => {
-          console.log(`Stream request for ${request.id} is queued.`) // shpuld be indefinite
-        }
-      })
-    } else {
-      array.push(request)
-      console.log(`Received messages from the other side: ${(request.message as MessageLog).appData.msgId}`)
+connectionService.generateConnection(connectionRequest)
+
+let generateFakeMessagesToBePublished = stream().pipe(take(10))
+generateFakeMessagesToBePublished.subscribe({
+  next: message => {
+    let payload: Message = {
+      id: hostServer,
+      message: message
     }
+    connectionRequest.server.messageToBePublishedfromApplication.next(payload)
+  }
+})
+
+connectionRequest.client.messageToBeReceivedFromRemote.subscribe({
+  next: response => {
+    console.log(`Received ${(response.message as MessageLog).appData.msgId} from ${connectionRequest.client.targetServer}`)
+    array.push(response)
   },
   error: error => console.error(error),
   complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
@@ -52,9 +55,98 @@ connectionRequest.client.messageToBeReceivedFromRemote.subscribe({
 
 
 
-connectionService.generateConnection(connectionRequest)
+/* Complex Test: 1 to 1*/
+// let connectionRequest: ConnectionRequest = {
+//   server: {
+//     name: 'g1',
+//     serverUrl: hostServer,
+//     connectionType: 'GRPC',
+//     messageToBePublishedfromApplication: new Subject<Message>()
+//   },
+//   client: {
+//     name: 'g2',
+//     targetServer: targetserver,
+//     connectionType: 'GRPC',
+//     messageToBeReceivedFromRemote: new Subject<Message>()
+//   }
+// }
+
+// connectionService.generateConnection(connectionRequest)
+
+// setTimeout(() => {
+//   let message = {
+//     id: parsedMessages[10].appData.msgId,
+//     message: parsedMessages[10] // Choose this number, because i purposely use the 11th message and change the msgPayload property to query to emulate a request
+//   }
+//   connectionRequest.server.messageToBePublishedfromApplication.next(message)
+// }, 3000)
+// setTimeout(() => {
+//   let message = {
+//     id: parsedMessages[11].appData.msgId,
+//     message: parsedMessages[11] // Choose this number, because i purposely use the 12th message and change the msgPayload property to query to emulate a request
+//   }
+//   connectionRequest.server.messageToBePublishedfromApplication.next(message)
+// }, 4000)
+
+// Handler for the incoming Messages from the other side. 
+// connectionRequest.client.messageToBeReceivedFromRemote.subscribe({
+//   next: request => {
+//     // Application logic comes here. This is where the asortment takes place, of decidiing whose messages it belongs of what it is
+//     if ((request.message as MessageLog).appData.msgPayload == 'Query') {
+//       generateFakeStreamResponse(request).subscribe({
+//         next: (responseMessage: Message) => {
+//               console.log(`Received ${(response.message as MessageLog).appData.msgId} from ${connectionRequest.client.targetServer}`)
+//           connectionRequest.server.messageToBePublishedfromApplication.next(responseMessage)
+//         },
+//         error: error => console.error(error),
+//         complete: () => {
+//           console.log(`Stream request for ${request.id} is queued.`) // shpuld be indefinite
+//         }
+//       })
+//     } else {
+//       array.push(request)
+//       console.log(`Received message: ${(request.message as MessageLog).appData.msgId} from ${request.id}`)
+//     }
+//   },
+//   error: error => console.error(error),
+//   complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
+// })
+
+
+
+/* Simple Test: 1 to Many */
+// let connectionRequest: ConnectionRequest = {
+//   server: {
+//     name: 'g1',
+//     serverUrl: hostServer,
+//     connectionType: 'GRPC',
+//     messageToBePublishedfromApplication: new Subject<Message>()
+//   },
+//   client: {
+//     name: 'g2',
+//     targetServer: targetserver,
+//     connectionType: 'GRPC',
+//     messageToBeReceivedFromRemote: new Subject<Message>()
+//   }
+// }
+// let connectionRequest2: ConnectionRequest = {
+//   server: {
+//     name: 'g1',
+//     serverUrl: hostServer,
+//     connectionType: 'GRPC',
+//     messageToBePublishedfromApplication: new Subject<Message>()
+//   },
+//   client: {
+//     name: 'g3',
+//     targetServer: targetserver2,
+//     connectionType: 'GRPC',
+//     messageToBeReceivedFromRemote: new Subject<Message>()
+//   }
+// }
+
+// connectionService.generateConnection(connectionRequest)
+// connectionService.generateConnection(connectionRequest2)
 
-/* Simple Test */
 // let generateFakeMessagesToBePublished = stream().pipe(take(10))
 // generateFakeMessagesToBePublished.subscribe({
 //   next: message => {
@@ -65,29 +157,136 @@ connectionService.generateConnection(connectionRequest)
 //     connectionRequest.server.messageToBePublishedfromApplication.next(payload)
 //   }
 // })
+// connectionRequest.client.messageToBeReceivedFromRemote.subscribe({
+//   next: request => {
+//     console.log(`Received ${(response.message as MessageLog).appData.msgId} from ${connectionRequest.client.targetServer}`)
+
+//     array.push(request)
+//   },
+//   error: error => console.error(error),
+//   complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
+// })
+
+// connectionRequest.client.messageToBeReceivedFromRemote.subscribe({
+//   next: request => {
+// console.log(`Received ${(response.message as MessageLog).appData.msgId} from ${connectionRequest.client.targetServer}`)
+//     array.push(request)
+//   },
+//   error: error => console.error(error),
+//   complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
+// })
+
+// connectionRequest2.client.messageToBeReceivedFromRemote.subscribe({
+//   next: request => {
+//     array.push(request)
+//   },
+//   error: error => console.error(error),
+//   complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
+// })
+
+
+
+
+
+
+/* Complex Test: 1 to Many */
+// let connectionRequest: ConnectionRequest = {
+//   server: {
+//     name: 'g1',
+//     serverUrl: hostServer,
+//     connectionType: 'GRPC',
+//     messageToBePublishedfromApplication: new Subject<Message>()
+//   },
+//   client: {
+//     name: 'g2',
+//     targetServer: targetserver,
+//     connectionType: 'GRPC',
+//     messageToBeReceivedFromRemote: new Subject<Message>()
+//   }
+// }
+// let connectionRequest2: ConnectionRequest = {
+//   server: {
+//     name: 'g1',
+//     serverUrl: hostServer,
+//     connectionType: 'GRPC',
+//     messageToBePublishedfromApplication: new Subject<Message>()
+//   },
+//   client: {
+//     name: 'g3',
+//     targetServer: targetserver2,
+//     connectionType: 'GRPC',
+//     messageToBeReceivedFromRemote: new Subject<Message>()
+//   }
+// }
+
+// connectionService.generateConnection(connectionRequest)
+// connectionService.generateConnection(connectionRequest2)
+
+// setTimeout(() => {
+//   let message = {
+//     id: parsedMessages[10].appData.msgId,
+//     message: parsedMessages[10] // Choose this number, because i purposely use the 11th message and change the msgPayload property to query to emulate a request
+//   }
+//   connectionRequest.server.messageToBePublishedfromApplication.next(message)
+//   connectionRequest2.server.messageToBePublishedfromApplication.next(message)
+// }, 3000)
+// setTimeout(() => {
+//   let message = {
+//     id: parsedMessages[11].appData.msgId,
+//     message: parsedMessages[11] // Choose this number, because i purposely use the 12th message and change the msgPayload property to query to emulate a request
+//   }
+//   connectionRequest.server.messageToBePublishedfromApplication.next(message)
+//   connectionRequest2.server.messageToBePublishedfromApplication.next(message)
+// }, 4000)
+
+// connectionRequest.client.messageToBeReceivedFromRemote.subscribe({
+//   next: request => {
+//     // Application logic comes here. This is where the asortment takes place, of deciding whose messages it belongs of what it is
+//     if ((request.message as MessageLog).appData.msgPayload == 'Query') {
+//       generateFakeStreamResponse(request).subscribe({
+//         next: (responseMessage: Message) => {
+//           connectionRequest.server.messageToBePublishedfromApplication.next(responseMessage)
+//         },
+//         error: error => console.error(error),
+//         complete: () => {
+//           console.log(`Stream request for ${request.id} is queued.`) // should be indefinite
+//         }
+//       })
+//     } else {
+// console.log(`Received ${(response.message as MessageLog).appData.msgId} from ${connectionRequest.client.targetServer}`)
+
+//       array.push(request)
+//       console.log(`Received message: ${(request.message as MessageLog).appData.msgId} from ${request.id}`)
+//     }
+//   },
+//   error: error => console.error(error),
+//   complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
+// })
+// connectionRequest2.client.messageToBeReceivedFromRemote.subscribe({
+//   next: request => {
+//     // Application logic comes here. This is where the asortment takes place, of deciding whose messages it belongs of what it is
+//     if ((request.message as MessageLog).appData.msgPayload == 'Query') {
+//       generateFakeStreamResponse(request).subscribe({
+//         next: (responseMessage: Message) => {
+//           connectionRequest2.server.messageToBePublishedfromApplication.next(responseMessage)
+//         },
+//         error: error => console.error(error),
+//         complete: () => {
+//           console.log(`Stream request for ${request.id} is queued.`) // should be indefinite
+//         }
+//       })
+//     } else {
+//     console.log(`Received ${(response.message as MessageLog).appData.msgId} from ${connectionRequest.client.targetServer}`)
+
+//       array.push(request)
+//       console.log(`Received message: ${(request.message as MessageLog).appData.msgId} from ${request.id}`)
+//     }
+//   },
+//   error: error => console.error(error),
+//   complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
+// })
 
 
-/* Complex Test: Expected out come, both must receive 14 message by the end. Havent try to disconnect.*/
-setTimeout(() => {
-  let message = {
-    id: parsedMessages[10].appData.msgId,
-    message: parsedMessages[10] // Choose this number, because i purposely use the 11th message and change the msgPayload property to query to emulate a request
-  }
-  connectionRequest.server.messageToBePublishedfromApplication.next(message)
-}, 3000)
-setTimeout(() => {
-  let message = {
-    id: parsedMessages[11].appData.msgId,
-    message: parsedMessages[11]// Choose this number, because i purposely use the 11th message and change the msgPayload property to query to emulate a request
-  }
-  connectionRequest.server.messageToBePublishedfromApplication.next(message)
-}, 4000)
-setTimeout(() => {
-  console.log(`All received data: ${array.length}`)
-}, 10000)
-setTimeout(() => {
-  console.log(`All received data: ${array.length}`)
-}, 20000)
 
 
 // this is just to publish an array of fake data as a Subject
@@ -102,7 +301,7 @@ function stream(): Subject<any> {
       clearInterval(intervalId);
       result.complete();
     }
-  }, 500)
+  }, intervalToStreamOutGoingMessage)
   return result
 }
 
@@ -122,3 +321,15 @@ function generateFakeStreamResponse(request: any): Subject<any> {
   })
   return res
 }
+
+/* Checking the values by the end of the test */
+setTimeout(() => {
+  console.log(`All received data: ${array.length}`)
+}, 5000)
+setTimeout(() => {
+  console.log(`All received data: ${array.length}`)
+}, 10000)
+setTimeout(() => {
+  console.log(`All received data: ${array.length}`)
+}, 15000)
+