Browse Source

connection attribution

enzo 1 year ago
parent
commit
61afbae1ee

+ 0 - 0
killgrpc.bat → build.bat


+ 1 - 1
index.ts

@@ -1,7 +1,7 @@
 import { Subject, from } from 'rxjs';
 import * as fs from 'fs'
 export * from './services/fis.retransmission.service';
-export * from './services/grpc.service';
+export * from './services/server-client.service';
 
 const messagesJSON: any = fs.readFileSync('payload.json')
 let parsedMessages = JSON.parse(messagesJSON)

+ 46 - 4
interfaces/general.interface.ts

@@ -1,5 +1,7 @@
 /* General interface used for office work/ */
 
+import { Subject } from "rxjs"
+
 
 export enum ColorCode {
     'GREEN' = 'GREEN',
@@ -7,18 +9,23 @@ export enum ColorCode {
     'RED' = 'RED'
 }
 
+export enum ConnectionStatus {
+    'GREEN' = 'GREEN',
+    'YELLOW' = 'YELLOW',
+    'RED' = 'RED'
+}
 export interface messageTransmissionInterface {
     id?: string,
     state: '' | 'attempt to send' | 'failed sent' | 'sent successfully',
     date?: Date,
     msg: string
 }
-export interface MessageLog { // this one specifically for office work case only. FIS copyright LOL
+export interface MessageLog {
     appLogLocId: string,
     appData: {
         msgId: string,
         msgLogDateTime: string,
-        msgDateTime: string,s
+        msgDateTime: string, s
         msgTag: string[],
         msgPayload: string
     }
@@ -32,12 +39,10 @@ export interface ReportStatus {
     code: ColorCode,
     message: string,
     payload?: any,
-    from: string | any
 }
 // https://grpc.io/docs/what-is-grpc/core-concepts/
 export interface GrpcConnectionType {
     instanceType: '' | 'server' | 'client'
-    serviceMethod: '' | 'unary' | 'server streaming' | 'client streaming' | 'bidirectional'
 }
 
 export interface Message {
@@ -47,3 +52,40 @@ export interface Message {
 
 export type Status = -1 | 0 | 1 // For status chain effect
 
+
+export interface ConnectionAttribute {
+    ConnectionID: ConnectionID,
+    outGoing: ChannelAttribute,
+    inComing: ChannelAttribute,
+    connectionStatus: Subject<ReportStatus>
+}
+export interface ChannelAttribute {
+    ChannelID?: string,
+    PublisherID?: string,
+    SubscriberID?: string,
+    PublisherInstance?: any,
+    SubscriberInstance?: any,
+    MessageToBePublished: Subject<Message> | null
+    MessageToBeReceived: Subject<Message> | null
+}
+
+export interface ConnectionRequest {
+    server: ServerRequest,
+    client: ClientRequest
+}
+
+export interface ServerRequest {
+    serverUrl: string,
+    connectionType: 'GRPC' | 'HTTP' | 'Socket',
+    messageToBePublishedfromApplication: Subject<Message>
+}
+export interface ClientRequest {
+    targetServer: string,
+    connectionType: 'GRPC' | 'HTTP' | 'Socket',
+    messageToBeReceivedFromRemote: Subject<Message>
+}
+
+export interface ConnectionID {
+    local: string,
+    remote: string
+}

+ 14 - 1
package-lock.json

@@ -17,7 +17,8 @@
         "lodash": "^4.17.21",
         "mongo": "^0.1.0",
         "mongoose": "^7.6.0",
-        "rxjs": "^7.8.1"
+        "rxjs": "^7.8.1",
+        "uuid": "^9.0.1"
       },
       "devDependencies": {
         "@types/node": "^20.6.0",
@@ -1191,6 +1192,18 @@
         "node": ">= 0.4.0"
       }
     },
+    "node_modules/uuid": {
+      "version": "9.0.1",
+      "resolved": "https://registry.npmjs.org/uuid/-/uuid-9.0.1.tgz",
+      "integrity": "sha512-b+1eJOlsR9K8HJpow9Ok3fiWOWSIcIzXodvv0rQjVoOVNpWMpxf1wZNpt4y9h10odCNrqnYp1OBzRktckBe3sA==",
+      "funding": [
+        "https://github.com/sponsors/broofa",
+        "https://github.com/sponsors/ctavan"
+      ],
+      "bin": {
+        "uuid": "dist/bin/uuid"
+      }
+    },
     "node_modules/vary": {
       "version": "1.1.2",
       "resolved": "https://registry.npmjs.org/vary/-/vary-1.1.2.tgz",

+ 3 - 2
package.json

@@ -24,10 +24,11 @@
     "lodash": "^4.17.21",
     "mongo": "^0.1.0",
     "mongoose": "^7.6.0",
-    "rxjs": "^7.8.1"
+    "rxjs": "^7.8.1",
+    "uuid": "^9.0.1"
   },
   "devDependencies": {
     "@types/node": "^20.6.0",
     "typescript": "^5.2.2"
   }
-}
+}

+ 5 - 5
services/fis.retransmission.service.ts

@@ -1,6 +1,6 @@
 import * as _ from 'lodash'
 import mongoose, { Model, Schema } from 'mongoose';
-import { Observable, Subject, Subscription, from } from 'rxjs'
+import { BehaviorSubject, Observable, Subject, Subscription, from } from 'rxjs'
 import { ColorCode, Message, MessageLog, ReportStatus, Status } from '../interfaces/general.interface'
 require('dotenv').config();
 
@@ -11,6 +11,7 @@ export class FisRetransmissionService {
     private mongoConnection: any
     private messageModel: any
     private maximumBufferLength: number = parseInt(process.env.MaxBufferLoad as string) // please configure at environment
+    // private statusReport: Subject<ReportStatus> = new Subject()
 
     constructor() {
         // Connect to mongoDB. 
@@ -18,7 +19,7 @@ export class FisRetransmissionService {
     }
 
     // Main function that intercepts outgoing messages by communicating || intepreting report status from grpc connection as indicator 
-    public handleMessage(applicationOutgoingMessage: Subject<Message>, statusReport: Subject<ReportStatus>): Subject<Message> {
+    public handleMessage(applicationOutgoingMessage: Subject<Message>, statusReport: BehaviorSubject<ReportStatus>): Subject<Message> {
         let releaseMessageSubject: Subject<Message> = new Subject() // Every message subscribed from applicationOutgoingMessage will be released through this subject
         let messageReleaseSubscription: Subscription | null = null
         let messageBufferSubscription: Subscription | null = null
@@ -27,7 +28,7 @@ export class FisRetransmissionService {
         statusReport.subscribe((report: ReportStatus) => {
             /* Green should release all data from buffer and mongo and also redirect the applicationOutgoingMessage back into the return subject(releaseMessageSubject)
             if there's any. */
-            if (report.code == ColorCode.GREEN) { 
+            if (report.code == ColorCode.GREEN) {
                 console.log(`Connection status report && ${report.message ?? 'No Message'}`)
                 /* Status Chain begins */
                 let status: Status = 1
@@ -136,7 +137,6 @@ export class FisRetransmissionService {
                     let report: ReportStatus = {
                         code: ColorCode.RED,
                         message: `Buffer is exceeding limit. Initiate storage transfer to designated database.`,
-                        from: `Error Handling Service`
                     }
                     statusReport.next(report)
                 }
@@ -385,7 +385,7 @@ export class FisRetransmissionService {
                 });
             } else {
                 status = 0
-                console.log(`Error: Message Model is ${this.messageModel}!! Please set up the mongoose connectino properly!`)
+                console.log(`Error: Message Model is ${this.messageModel}!! Please set up the mongoose connection properly!`)
             }
         }
     }

+ 252 - 0
services/grpc.service.method.ts

@@ -0,0 +1,252 @@
+import * as grpc from '@grpc/grpc-js';
+import { Subject, Subscription } from "rxjs";
+import { ReportStatus, ColorCode, Message, MessageLog, ConnectionAttribute, ConnectionRequest, GrpcConnectionType } from "../interfaces/general.interface";
+import { Status } from '@grpc/grpc-js/build/src/constants';
+import { v4 as uuidv4 } from 'uuid'
+const message_proto = require('./protos/server.proto')
+export class GrpcServiceMethod {
+
+    public async create(request: ConnectionRequest, connectionAttribute: ConnectionAttribute): Promise<any> {
+        // Assuming currently only one client
+        this.createGrpcInstance(request.server.serverUrl, { instanceType: 'server' }, connectionAttribute)
+        this.createGrpcInstance(request.client.targetServer, { instanceType: 'client' }, connectionAttribute)
+    }
+
+    private async generateAdditionalAttributes(connectionAttribute: ConnectionAttribute, clientInfo?: any, localInfo?: any) {
+        if (clientInfo) {
+            connectionAttribute.inComing.ChannelID = clientInfo.channelID
+            connectionAttribute.inComing.PublisherID = clientInfo.publisherID
+            connectionAttribute.inComing.SubscriberID = clientInfo.subscriberID
+            // let report: any = {
+            //     message: 'Remote Server Communication Established',
+            //     channelID: clientInfo.channelID
+            // }
+            // connectionAttribute.connectionStatus.next(report)
+        }
+        if (localInfo) {
+            connectionAttribute.outGoing.ChannelID = localInfo.channelID
+            connectionAttribute.outGoing.PublisherID = localInfo.publisherID
+            connectionAttribute.outGoing.SubscriberID = localInfo.subscriberID
+            // let report: any = {
+            //     message: 'Local Server Communication Established',
+            //     channelID: localInfo.channelID
+            // }
+            // connectionAttribute.connectionStatus.next(report)
+        }
+        if (connectionAttribute.outGoing.ChannelID && connectionAttribute.inComing.ChannelID) {
+            connectionAttribute.ConnectionID.local = connectionAttribute.outGoing.ChannelID + connectionAttribute.inComing.ChannelID
+            connectionAttribute.ConnectionID.remote = connectionAttribute.inComing.ChannelID + connectionAttribute.outGoing.ChannelID
+            let report: ReportStatus = {
+                code: ColorCode.GREEN,
+                message: `ConnectionID: ${connectionAttribute.ConnectionID.local} && ${connectionAttribute.ConnectionID.remote} `,
+            }
+            connectionAttribute.connectionStatus.next(report)
+            console.log(connectionAttribute)
+        }
+    }
+
+    // To be migrated into a service in the immediate future
+    private async createGrpcInstance(
+        serverUrl: string,
+        grpcType: GrpcConnectionType,
+        connectionAttribute: ConnectionAttribute,
+    ) {
+        let statusControl: Subject<ReportStatus> = connectionAttribute.connectionStatus
+        let consecutiveResolutions = 0;
+        let lastResolutionTime = Date.now();
+        let alreadyHealthCheck: boolean = false
+        let yellowErrorEmission: boolean = false
+        let redErrorEmission: boolean = false
+
+        while (true) {
+            try {
+                let recreatePromise = new Promise((resolve) => {
+                    if (grpcType.instanceType == 'server') {
+                        this.createServerStreamingServer(serverUrl, connectionAttribute).then(() => {
+                            resolve('recreate')
+                        })
+                    }
+                    if (grpcType.instanceType == 'client') {
+                        this.createServerStreamingClient(serverUrl, alreadyHealthCheck, connectionAttribute).then(() => {
+                            resolve('recreate')
+                        })
+                    }
+                })
+                await recreatePromise
+                // If connection resolves (indicating failure), increment the count
+                consecutiveResolutions++;
+                // console.log(`Reconnection Attempt: ${consecutiveResolutions}`)
+                alreadyHealthCheck = true
+                // If there are x consecutive resolutions, log an error and break the loop
+                if (consecutiveResolutions >= parseInt(process.env.ReconnectionAttempt as string) && redErrorEmission == false) {
+                    redErrorEmission = true
+                    console.error(`Connection failed ${consecutiveResolutions} times. Stopping connection attempts.`);
+                    let error: ReportStatus = {
+                        code: ColorCode.RED,
+                        message: 'Initiate Doomsday protocol....',
+                    }
+                    statusControl.next(error)
+                }
+                if (consecutiveResolutions < parseInt(process.env.ReconnectionAttempt as string) && yellowErrorEmission == false) {
+                    yellowErrorEmission = true
+                    let error: ReportStatus = {
+                        code: ColorCode.YELLOW,
+                        // message: `Reconnection Attempt: ${consecutiveResolutions}. Server has yet to respond`
+                        message: `Attempting reconnection... Server has yet to respond`,
+                    }
+                    statusControl.next(error);
+                }
+            } catch (error) {
+                // Connection did not resolve, reset the count
+                consecutiveResolutions = 0;
+                console.error('Connection attempt failed:', error);
+            }
+            // Check for a pause of more than 3 seconds since the last resolution attempt
+            const currentTime = Date.now();
+            const timeSinceLastResolution = currentTime - lastResolutionTime;
+            if (timeSinceLastResolution > 2000) {
+                consecutiveResolutions = 0;
+                yellowErrorEmission = false
+                redErrorEmission = false
+                alreadyHealthCheck = false
+            }
+            // Update the last resolution time
+            lastResolutionTime = currentTime;
+            await new Promise(resolve => setTimeout(resolve, 1000)); // Wait for 1 second before the next attempt
+            // timeout generate message to trigger this reconnection
+        }
+    }
+    // Create Server Instance to stream all application Outgoing messages
+    public async createServerStreamingServer(
+        serverUrl: string,
+        connectionAttribute: ConnectionAttribute
+    ): Promise<any> { // '0.0.0.0:3001'
+        return new Promise((resolve, reject) => {
+            try {
+                // https://github.com/grpc/proposal/blob/master/L5-node-client-interceptors.md
+                let server: grpc.Server = new grpc.Server();
+                server.addService(message_proto.Message.service, {
+                    HandleMessage: (call) => {
+                        // Assign channel uuid
+                        let clientInfo = JSON.parse(call.request.message)
+                        this.generateAdditionalAttributes(connectionAttribute, clientInfo)
+
+                        console.log(`Initializing stream. Opening Channel... Confirmation from ${call.request.id}`)
+                        if (connectionAttribute.outGoing.MessageToBePublished) {
+                            let subscription: Subscription = connectionAttribute.outGoing.MessageToBePublished.subscribe({
+                                next: (response: Message) => {
+                                    console.log(`Sending ${(response.message as MessageLog).appData.msgId}`)
+                                    let message = {
+                                        id: response.id,
+                                        message: JSON.stringify(response.message)
+                                    }
+                                    call.write(message)
+                                },
+                                error: err => {
+                                    console.error(err)
+                                    subscription.unsubscribe()
+                                    resolve('')
+                                },
+                                complete: () => {
+                                    console.log(`Stream response completed for ${call.request.id}`)
+                                    subscription.unsubscribe()
+                                    resolve('')
+                                }
+                            })
+                        }
+                    },
+
+                    Check: (_, callback) => {
+                        // for now it is just sending the status message over to tell the client it is alive
+                        // For simplicity, always return "SERVING" as status
+                        callback(null, { status: 'SERVING' });
+                    },
+                });
+                // Bind and start the server
+                server.bindAsync(serverUrl, grpc.ServerCredentials.createInsecure(), () => {
+                    console.log(`gRPC server is running on ${serverUrl}`);
+                    server.start();
+                });
+            }
+            catch (error) {
+                resolve(error)
+            }
+
+        })
+    }
+
+    // Send a request over to the other server to open a channel for this server to emit/stream messages over
+    public async createServerStreamingClient(
+        server: string,
+        alreadyHealthCheck: boolean,
+        connectionAttribute: ConnectionAttribute
+    ): Promise<string> {
+        return new Promise(async (resolve, reject) => {
+            const client = new message_proto.Message(server, grpc.credentials.createInsecure());
+            // perform check to see if server is alive, if not terminate this grpc instant and create again
+            this.checkConnectionHealth(client, connectionAttribute.connectionStatus, alreadyHealthCheck).catch(() => {
+                resolve('')
+            })
+            let outGoingInfo: any = {
+                channelID: uuidv4(),
+                publisherID: uuidv4(),
+                subscriberID: uuidv4()
+            }
+            this.generateAdditionalAttributes(connectionAttribute, {}, outGoingInfo)
+
+            // connectionAttribute.ConnectionID = connectionAttribute.outGoing.ChannelID + (connectionAttribute.inComing.ChannelID ?? 'undefined')
+            let call = client.HandleMessage({ id: server, message: JSON.stringify(outGoingInfo) })
+            console.log(`Sending request to ${server} to open response channel...`)
+
+            call.on('status', (status: Status) => {
+                if (status == grpc.status.OK) { // only returns a status when there's error. Otherwise it just waits
+                    console.log(`Message trasmission operation is successful`)
+                    // RPC completed successfully
+                } if (status == grpc.status.UNAVAILABLE) {
+                    let report = {
+                        code: ColorCode.YELLOW,
+                        message: `Server doesn't seem to be alive. Error returned.`,
+                        from: `Server Streaming Client Instance`
+                    }
+                    connectionAttribute.connectionStatus.next(report)
+                    resolve('No connection established. Server is not responding..')
+                }
+            });
+
+            call.on('data', (data: any) => {
+                let response: Message = {
+                    id: data.id,
+                    message: JSON.parse(data.message)
+                }
+                if (connectionAttribute.inComing.MessageToBeReceived) {
+                    connectionAttribute.inComing.MessageToBeReceived.next(response)
+                }
+                console.log((response.message as MessageLog).appData.msgId)
+            });
+
+            call.on('error', (err) => {
+                resolve('')
+            });
+        })
+    }
+
+
+    // Check connection To be Update. This function is destroying my code flow
+    public async checkConnectionHealth(client: any, statusControl: Subject<ReportStatus>, alreadyHealthCheck: boolean): Promise<boolean> {
+        return new Promise((resolve, reject) => {
+            client.Check({}, (error, response) => {
+                if (response) {
+                    console.log(`GRPC Health check status: ${response.status} Server Connected`);
+                    let report: ReportStatus = {
+                        code: ColorCode.GREEN,
+                        message: `Good to go!!!`,
+                    }
+                    statusControl.next(report)
+                } else {
+                    if (alreadyHealthCheck == false) console.error(`Health check failed: ${error}`);
+                    resolve(false)
+                }
+            })
+        })
+    }
+}

+ 0 - 120
services/grpc.service.ts

@@ -1,120 +0,0 @@
-import { Subject, Subscription, take, takeUntil } from 'rxjs';
-import { ColorCode, GrpcConnectionType, Message, ReportStatus } from '../interfaces/general.interface';
-import { GrpcServiceMethod } from './service.method';
-
-
-export class GrpcService {
-    private grpcServerConnection: any = {}
-    private incomingMessage: Subject<any> = new Subject()
-
-    constructor(private grpcServiceMethod: GrpcServiceMethod) { }
-
-    public getIncomingMessage(): Subject<Message> {
-        return this.incomingMessage
-    }
-
-    public async stopServer(serverUrl: string): Promise<any> {
-        return new Promise((resolve, reject) => {
-            if (this.grpcServerConnection[serverUrl]) {
-                console.log(`Shutting down the gRPC server:${serverUrl} ...`);
-                // this.grpcServerConnection[serverUrl].tryShutdown(() => {
-                //     console.log(`Server ${serverUrl} has been gracefully stopped.`);
-                //     resolve('')
-                // })
-                resolve(this.grpcServerConnection[serverUrl].forceShutdown())
-                console.log(`Server ${serverUrl} is forced to shut down!`)
-                // simply removing the reference to the GrpcService instance associated with the specific serverUrl from the grpcServerConnection object.
-                // However, the gRPC server instance itself continues to run as long as it has not been explicitly shut down using methods like tryShutdown.
-                console.log(`Deleting grpc connection instance:${serverUrl} .....`)
-                delete this.grpcServerConnection[serverUrl];
-            } else {
-                console.log(`Server${serverUrl} is not running.`);
-                reject()
-            }
-        })
-    }
-
-    public getAllGrpcServerConnectionInstance(): any {
-        console.log(this.grpcServerConnection)
-        return this.grpcServerConnection
-    }
-
-    // To be migrated into a service in the immediate future
-    public async createGrpcInstance(serverUrl: string, reportStatus: Subject<ReportStatus>, connectionType: GrpcConnectionType, messageToBePublished?: Subject<Message>) {
-        let messageToBeTransmitted: Subject<Message> = messageToBePublished ?? new Subject()
-        let statusControl: Subject<ReportStatus> = reportStatus
-        let consecutiveResolutions = 0;
-        let lastResolutionTime = Date.now();
-        let alreadyHealthCheck: boolean = false
-        let yellowErrorEmission: boolean = false
-        let redErrorEmission: boolean = false
-
-        while (true) {
-            try {
-                if (connectionType.instanceType == 'client' && connectionType.serviceMethod == 'server streaming') {
-                    await this.grpcServiceMethod.createServerStreamingClient(serverUrl, alreadyHealthCheck, statusControl, this.incomingMessage);
-                }
-                if (connectionType.instanceType == 'server' && connectionType.serviceMethod == 'server streaming') {
-                    await this.grpcServiceMethod.createServerStreamingServer(serverUrl, this.grpcServerConnection, messageToBeTransmitted)
-                }
-                /* To be enabled again if there's a need for bidiretional streaming */
-                // if (connectionType.instanceType == 'client' && connectionType.serviceMethod == 'bidirectional') {
-                //     await this.grpcServiceMethod.createBidirectionalStreamingClient(serverUrl, alreadyHealthCheck, messageToBeTransmitted, statusControl, this.incomingMessage);
-                // }
-                // if (connectionType.instanceType == 'server' && connectionType.serviceMethod == 'bidirectional') {
-                //     await this.grpcServiceMethod.createGrpcBidirectionalServer(serverUrl, messageToBeTransmitted, statusControl, this.grpcServerConnection)
-                // }
-                // If connection resolves (indicating failure), increment the count
-                consecutiveResolutions++;
-                // console.log(`Reconnection Attempt: ${consecutiveResolutions}`)
-                alreadyHealthCheck = true
-
-
-                // If there are x consecutive resolutions, log an error and break the loop
-                if (consecutiveResolutions >= parseInt(process.env.ReconnectionAttempt as string) && redErrorEmission == false) {
-                    redErrorEmission = true
-                    console.error(`Connection failed ${consecutiveResolutions} times. Stopping connection attempts.`);
-                    let error: ReportStatus = {
-                        code: ColorCode.RED,
-                        message: 'Initiate Doomsday protocol....',
-                        from: `GRPC instance management`
-                    }
-                    statusControl.next(error)
-                }
-                if (consecutiveResolutions < parseInt(process.env.ReconnectionAttempt as string) && yellowErrorEmission == false) {
-                    yellowErrorEmission = true
-                    let error: ReportStatus = {
-                        code: ColorCode.YELLOW,
-                        // message: `Reconnection Attempt: ${consecutiveResolutions}. Server has yet to respond`
-                        message: `Attempting reconnection... Server has yet to respond`,
-                        from: `GRPC instance management`
-                    }
-                    statusControl.next(error);
-                }
-            } catch (error) {
-                // Connection did not resolve, reset the count
-                consecutiveResolutions = 0;
-                console.error('Connection attempt failed:', error);
-            }
-
-            // Check for a pause of more than 3 seconds since the last resolution attempt
-            const currentTime = Date.now();
-            const timeSinceLastResolution = currentTime - lastResolutionTime;
-            if (timeSinceLastResolution > 2000) {
-                consecutiveResolutions = 0;
-                yellowErrorEmission = false
-                redErrorEmission = false
-                alreadyHealthCheck = false
-            }
-
-            // Update the last resolution time
-            lastResolutionTime = currentTime;
-
-            await new Promise(resolve => setTimeout(resolve, 1000)); // Wait for 1 second before the next attempt
-            // timeout generate message to trigger this reconnection
-        }
-    }
-
-}
-
-

+ 78 - 0
services/server-client.service.ts

@@ -0,0 +1,78 @@
+import { BehaviorSubject, Subject } from 'rxjs';
+import { ColorCode, Message, ReportStatus, ConnectionAttribute, ConnectionRequest, ConnectionStatus } from '../interfaces/general.interface';
+import { GrpcServiceMethod } from './grpc.service.method';
+import { FisRetransmissionService } from './fis.retransmission.service';
+
+export class ServerClientManager {
+
+    private connectionAttributes: ConnectionAttribute[] = []
+
+    constructor(private grpcService: GrpcServiceMethod) {
+    }
+
+    public generateConnection(request: ConnectionRequest) {
+        let initialReport: ReportStatus = { code: ColorCode.YELLOW, message: 'Initialization of the subject' }
+        let reportSubject: BehaviorSubject<ReportStatus> = new BehaviorSubject(initialReport)
+        let retransmission: FisRetransmissionService = new FisRetransmissionService()
+        let messageToBePublished: Subject<Message> = retransmission.handleMessage(request.server.messageToBePublishedfromApplication, reportSubject)
+
+        let connectionAttribute: ConnectionAttribute = {
+            ConnectionID: {
+                local: '',
+                remote: ''
+            },
+            outGoing: {
+                MessageToBePublished: messageToBePublished,
+                MessageToBeReceived: null
+            },
+            inComing: {
+                MessageToBePublished: null,
+                MessageToBeReceived: request.client.messageToBeReceivedFromRemote
+            },
+            connectionStatus: reportSubject // this is not related to report status for the retrasmission service
+        }
+        // let connectionStatus: Record<string, ConnectionStatus> = {
+        //     localStatus: ConnectionStatus.RED,
+        //     remoteStatus: ConnectionStatus.RED
+        // }
+
+        // This is default connection
+        if (!request.server.connectionType) {
+            request.server.connectionType = 'GRPC'
+        }
+        // For each connection type:
+        if (request.server.connectionType == 'GRPC') {
+            this.grpcService.create(request, connectionAttribute)
+            this.connectionAttributes.push(connectionAttribute)
+        }
+        // Check to make sure both the connection at local and remote is up, then report to retransmission service to release message
+        // connectionAttribute.connectionStatus.subscribe({
+        //     // Check this....
+        //     next: (element: any) => {
+        //         if (element.message == 'Local Server Communication Established') {
+        //             connectionStatus.localStatus = ConnectionStatus.GREEN
+        //             console.log(`Local alive`)
+        //         }
+        //         if (element.message == 'Remote Server Communication Established') {
+        //             connectionStatus.remoteStatus = ConnectionStatus.GREEN
+        //             console.log(`Remote alive`)
+        //         }
+
+                // if (connectionStatus.localStatus == ConnectionStatus.GREEN && connectionStatus.remoteStatus == ConnectionStatus.GREEN) {
+                //     let report: ReportStatus = {
+                //         code: ColorCode.GREEN,
+                //         message: `Both Local and Remote are connected`
+                //     }
+                //     // reportSubject.next(report)
+                //     console.log(`Both local and remote alive`)
+                // }
+            // },
+            // error: (err) => console.error(err),
+            // complete: () => { }
+        // })
+
+    }
+
+
+}
+

+ 0 - 315
services/service.method.ts

@@ -1,315 +0,0 @@
-import * as grpc from '@grpc/grpc-js';
-import { Subject, Subscription } from "rxjs";
-import { ReportStatus, ColorCode, Message, MessageLog } from "../interfaces/general.interface";
-import { Status } from '@grpc/grpc-js/build/src/constants';
-import { error } from 'console';
-const message_proto = require('./protos/server.proto')
-
-export class GrpcServiceMethod {
-
-    // Create Server Instance to stream all application Outgoing messages
-    public async createServerStreamingServer(
-        serverUrl: string,
-        grpcServerConnection: any,
-        messageToBeStream: Subject<Message>
-    ): Promise<any> { // '0.0.0.0:3001'
-        return new Promise((resolve, reject) => {
-            try {
-                // https://github.com/grpc/proposal/blob/master/L5-node-client-interceptors.md
-                let server: grpc.Server = new grpc.Server();
-                // Add the streamingData function to the gRPC service
-                // Define your message_proto.Message service methods
-                server.addService(message_proto.Message.service, {
-                    HandleMessage: (call) => {
-                        // console.log(call.request)
-                        console.log(`Intializing stream. Opening Channel. Confirmation from ${call.getPeer()}`)
-
-                        let subscription: Subscription = messageToBeStream.subscribe({
-                            next: (response: Message) => {
-                                console.log(`Sending ${(response.message as MessageLog).appData.msgId}`)
-                                let message = {
-                                    id: response.id,
-                                    message: JSON.stringify(response.message)
-                                }
-                                call.write(message)
-                            },
-                            error: err => {
-                                console.error(err)
-                                subscription.unsubscribe()
-                                resolve('')
-                            },
-                            complete: () => {
-                                console.log(`Stream response completed for ${call.request.id}`)
-                                subscription.unsubscribe()
-                                resolve('')
-                                // call.end()
-                            }
-                        })
-                    },
-
-                    Check: (_, callback) => {
-                        // health check logic here
-                        // for now it is just sending the status message over to tell the client it is alive
-                        // For simplicity, always return "SERVING" as status
-                        callback(null, { status: 'SERVING' });
-                    },
-                });
-
-                // Bind and start the server
-                server.bindAsync(serverUrl, grpc.ServerCredentials.createInsecure(), () => {
-                    console.log(`gRPC server is running on ${serverUrl}`);
-                    server.start();
-                });
-                grpcServerConnection[serverUrl] = server
-            }
-            catch (error) {
-                resolve(error)
-            }
-
-        })
-    }
-
-    // Send a request over to the other server to open a channel for this server to emit/stream messages over
-    public async createServerStreamingClient(
-        server: string,
-        alreadyHealthCheck: boolean,
-        statusControl: Subject<ReportStatus>,
-        incomingMessage: Subject<Message>
-    ): Promise<string> {
-        return new Promise(async (resolve, reject) => {
-            const client = new message_proto.Message(server, grpc.credentials.createInsecure());
-
-            // perform check to see if server is alive, if not terminate this grpc instant and create again
-            this.checkConnectionHealth(client, statusControl, alreadyHealthCheck).catch((error) => {
-                resolve('')
-            })
-            // this is where the request sending logic occurs
-            let call = client.HandleMessage({ id: `0000`, message: `Intiate Main Stream Channel Response` })
-            console.log(`Sending request to open response channel...`)
-
-            call.on('status', (status: Status) => {
-                if (status == grpc.status.OK) { // only returns a status when there's error. Otherwise it just waits
-                    console.log(`Message trasmission operation is successful`)
-                    // RPC completed successfully
-                } if (status == grpc.status.UNAVAILABLE) {
-                    let report = {
-                        code: ColorCode.YELLOW,
-                        message: `Server doesn't seem to be alive. Error returned.`,
-                        from: `Server Streaming Client Instance`
-                    }
-                    statusControl.next(report)
-                    resolve('No connection established. Server is not responding..')
-                }
-            });
-
-            call.on('data', (data: any) => {
-                // standard procedure. convert back the data and pass to the application to be processed
-                let response: Message = {
-                    id: data.id,
-                    message: JSON.parse(data.message)
-                }
-                incomingMessage.next(response)
-                console.log((response.message as MessageLog).appData.msgId)
-            });
-
-            call.on('error', (err) => {
-                resolve('')
-            });
-
-            // call.on('end', () => { // this is for gracefull || willfull termination from the server
-            //     console.log(`Terminating Stream Request. Directing response to main channel`)
-            //     resolve('')
-            // });
-        })
-    }
-
-
-
-    /* ----------------All the functions below are for Bi-directional streaming. Subject to be deleted if decided not in use---------------- */
-    public async createGrpcBidirectionalServer(
-        serverUrl: string,
-        messageToBeStream: Subject<any>,
-        statusControl: Subject<ReportStatus>,
-        grpcServerConnection: any,
-    ): Promise<any> { // '0.0.0.0:3001'
-        return new Promise((resolve, reject) => {
-            try {
-                // https://github.com/grpc/proposal/blob/master/L5-node-client-interceptors.md
-                let server: grpc.Server = new grpc.Server();
-                // Add the streamingData function to the gRPC service
-                // Define your message_proto.Message service methods
-
-                server.addService(message_proto.Message.service, {
-                    sendMessageStream: (call) => {
-                        console.log(`Client connected from: ${call.getPeer()}`);
-                        let report: ReportStatus = {
-                            code: ColorCode.GREEN,
-                            message: `Client connected!!`,
-                            from: `Bidirectional Instance`
-                        }
-                        statusControl.next(report)
-
-                        // Right now this is being broadcast.
-                        let subscription: Subscription = messageToBeStream.subscribe({
-                            next: (payload: any) => {
-                                let noConnection = call.cancelled // check connection for each and every message
-                                if (noConnection === true) { // that means there's no connection, beccause the cancel operation is determined to check 
-                                    let report: ReportStatus = {
-                                        code: ColorCode.YELLOW,
-                                        message: `Client is not alive.....`,
-                                        payload: payload,
-                                        from: `Bidirectional Instance`
-                                    }
-                                    statusControl.next(report) // no connection. Tell buffer service to stop releasing messages
-                                    subscription.unsubscribe() // i still dont understand why i wrote this here
-                                } else {
-                                    console.log(`Sending ${payload.appData.msgId}`)
-                                    let message: string = JSON.stringify(payload)
-                                    call.write({ message })
-                                }
-                            },
-                            error: err => console.error(err),
-                            complete: () => { } //it will never complete
-                        })
-
-                        call.on('data', (data: any) => {
-                            // console.log(data) // it does return in string format
-                            let payload = JSON.parse(data.message)
-                            console.log(`Received Message from Client: ${payload.appData?.msgId}`);
-                            // Forward the received message to the RxJS subject
-                            // let respmsg: any = {
-                            //     msgId: payload.appData?.msgId,
-                            //     confirmationMessage: `Message ${payload.appData?.msgId} acknowledged!`
-                            // }
-                            // let message: string = JSON.stringify(respmsg)
-                            // console.log(`Responding to client: ${respmsg.msgId}`);
-                            // Note: The parameter here MUST BE STRICTLY be the same letter as defined in proto. Eg: message MessageRequest { string >>'message'<< = 1 }
-                            // call.write({ message });
-                        });
-
-                        call.on('end', () => {
-                            console.log('Client stream ended');
-                            // but the stream never ends. THis is not a reliable way to tell if a client is disconnected
-                        });
-
-                        call.on('error', (err) => {
-                            // Error that may occue during the rpc call. Id there's an error, put a callbacn function there to check the connection for client
-                            // emit a yellow report to halt message release. If the server does not reply to the callback function, then emit a red card
-                            // the call back function will be to write and then the client should response immediately through test
-                        });
-
-                        call.on('close', () => {
-                            console.log('Unknown cause for diconnectivity');
-                            // Handle client closure, which may be due to errors or manual termination
-                        });
-
-                    },
-
-                    Check: (_, callback) => {
-                        // health check logic here
-                        // for now it is just sending the status message over to tell the client it is alive
-                        // For simplicity, always return "SERVING" as status
-                        callback(null, { status: 'SERVING' });
-                    },
-                });
-
-                // Bind and start the server
-                server.bindAsync(serverUrl, grpc.ServerCredentials.createInsecure(), () => {
-                    console.log(`gRPC server is running on ${serverUrl}`);
-                    server.start();
-                });
-                grpcServerConnection[serverUrl] = server
-            }
-            catch (error) {
-                resolve(error)
-            }
-        })
-    }
-
-
-    public async createBidirectionalStreamingClient(
-        server: string,
-        alreadyHealthCheck: boolean,
-        messageToBeTransmitted: Subject<any>,
-        statusControl: Subject<ReportStatus>,
-        incomingResponse: Subject<Message>
-    ): Promise<string> {
-        let subscription: any
-        let unsubscribed: boolean = false
-
-        return new Promise(async (resolve, reject) => {
-            const client = new message_proto.Message(server, grpc.credentials.createInsecure());
-            const call = client.sendMessageStream();
-
-            this.checkConnectionHealth(client, statusControl, alreadyHealthCheck)
-
-            call.on('status', (status: Status) => { // this is useless in streaming(on for unary)
-                // console.log(status) // For more info: https://grpc.github.io/grpc/core/md_doc_statuscodes.html
-                // https://grpc.io/docs/what-is-grpc/core-concepts/#streaming
-                // if (status == grpc.status.UNAVAILABLE) { // only returns a status when there's error. Otherwise it just waits
-                //     resolve('No connection established. Server is not responding..')
-                // }
-            });
-
-            // All the grpc operations are here
-            // Subscribe to the RxJS subject to send data to the server
-            subscription = messageToBeTransmitted.subscribe({
-                next: (payload: any) => {
-                    if (!unsubscribed) {
-                        console.log(`Sending ${payload.appData.msgId}`)
-                        let message: string = JSON.stringify(payload)
-                        call.write({ message })
-                    }
-                },
-                error: err => console.error(err),
-                complete: () => { } //it will never complete
-            });
-
-
-            call.on('data', (data: any) => {
-                let message = JSON.parse(data.message)
-                console.log(`Received message from Server: ${message.msgId ?? message.appData?.msgId ?? `Invalid`}`);
-            });
-
-            call.on('error', (err) => {
-                // console.log(`Something wrong with RPC call...`)
-                if (!unsubscribed && subscription) { // kill subcription to prevent memory leaks
-                    subscription.unsubscribe();
-                    unsubscribed = true;
-                }
-                resolve('Server Error');
-            });
-
-            call.on('end', () => {
-                if (!unsubscribed && subscription) { // kill subcription to prevent memory leaks
-                    subscription.unsubscribe();
-                    unsubscribed = true;
-                }
-                resolve('Server Error');
-            });
-
-        })
-    }
-
-
-
-    // Check connection To be Update. This function is destroying my code flow
-    public async checkConnectionHealth(client: any, statusControl: Subject<ReportStatus>, alreadyHealthCheck: boolean): Promise<boolean> {
-        return new Promise((resolve, reject) => {
-            client.Check({}, (error, response) => {
-                if (response) {
-                    console.log(`GRPC Health check status: ${response.status} Server Connected`);
-                    let report: ReportStatus = {
-                        code: ColorCode.GREEN,
-                        message: `Good to go!!!`,
-                        from: `GRPC health check`
-                    }
-                    statusControl.next(report)
-                } else {
-                    if (alreadyHealthCheck == false) console.error(`Health check failed: ${error}`);
-                    resolve(false)
-                }
-            })
-        })
-    }
-}

+ 1 - 1
startgrpc.bat

@@ -1,6 +1,6 @@
 
 @echo off
-start wt -M -d "C:\Users\esenz\dev\Task\Fis-Restransmission" cmd /k "npm run grpc1" ; split-pane -d "C:\Users\esenz\dev\Task\Fis-Restransmission" cmd /k "npm run grpc2" ; split-pane -d "C:\Users\esenz\dev\Task\Fis-Restransmission" cmd /k "npm run grpc2"
+start wt -M -d "C:\Users\esenz\dev\Task\Fis-Restransmission" cmd /k "npm run grpc1" ; split-pane -d "C:\Users\esenz\dev\Task\Fis-Restransmission" cmd /k "npm run grpc2" ; split-pane -d "C:\Users\esenz\dev\Task\Fis-Restransmission" cmd /k "npm run grpc3"
 
 
 //wt -p "Command Prompt" ; split-pane -p "Windows PowerShell" ; split-pane -H 

+ 59 - 52
test/grpc1.ts

@@ -1,32 +1,39 @@
-import * as fs from 'fs'
-import { Subject, groupBy, mergeMap, take, toArray } from 'rxjs';
-import { ColorCode, Message, MessageLog, ReportStatus } from '../interfaces/general.interface';
-import { GrpcService } from '../services/grpc.service';
-import { FisRetransmissionService } from '../services/fis.retransmission.service';
-import { GrpcServiceMethod } from '../services/service.method';
+import { Subject, take } from 'rxjs';
+import { Message, MessageLog, ConnectionRequest } from '../interfaces/general.interface';
+import { GrpcServiceMethod } from '../services/grpc.service.method';
+import { readFileSync } from 'fs';
+import { ServerClientManager } from '../services/server-client.service';
 
 // Subject for bidirectional communication
-const errorHandlingService: FisRetransmissionService = new FisRetransmissionService()
-const grpcService: GrpcService = new GrpcService(new GrpcServiceMethod())
-const messagesJSON: any = fs.readFileSync('payload.json')
-let incomingMessages: Subject<any> = grpcService.getIncomingMessage()
+const connectionService: ServerClientManager = new ServerClientManager(new GrpcServiceMethod())
+const messagesJSON: any = readFileSync('payload.json')
 let parsedMessages: any[] = JSON.parse(messagesJSON) // load the fake messages generated for this trial 
-let messageToBeReleased: Subject<any> = new Subject() // Sample message to be transmitted over to target server
-let applicationOutgoingResponse: Subject<Message> = new Subject()
-let statusControl: Subject<ReportStatus> = new Subject() // Listening for error events and states
 let targetserver: string = 'localhost:3001'
+let targetserver2: string = 'localhost:3002'
 let hostServer: string = 'localhost:3000'
-let array: any[] = [] // Used for testing
+let array: any[] = [] // Used for testing                     
+let connectionRequest: ConnectionRequest = {
+  server: {
+    serverUrl: hostServer,
+    connectionType: 'GRPC',
+    messageToBePublishedfromApplication: new Subject<Message>()
+  },
+  client: {
+    targetServer: targetserver,
+    connectionType: 'GRPC',
+    messageToBeReceivedFromRemote: new Subject<Message>()
+  }
+}
 
 // Handler for the incoming Messages from the other side. 
-incomingMessages.subscribe({
+connectionRequest.client.messageToBeReceivedFromRemote.subscribe({
   next: request => {
     // Application logic comes here. This is where the asortment takes place, of decidiing whose messages it belongs of what it is
     if ((request.message as MessageLog).appData.msgPayload == 'Query') {
       generateFakeStreamResponse(request).subscribe({
-        next: (responseMessage) => {
+        next: (responseMessage: Message) => {
           // console.log(`Processing request:${request.id}....`)
-          applicationOutgoingResponse.next(responseMessage)
+          connectionRequest.server.messageToBePublishedfromApplication.next(responseMessage)
         },
         error: error => console.error(error),
         complete: () => {
@@ -42,34 +49,44 @@ incomingMessages.subscribe({
   complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
 })
 
-// Open channel for sending messages across.
-errorHandlingService.handleMessage(applicationOutgoingResponse, statusControl).subscribe((messages) => {
-  messageToBeReleased.next(messages)
-})
-grpcService.createGrpcInstance(hostServer, statusControl, { instanceType: 'server', serviceMethod: 'server streaming' }, messageToBeReleased)
 
-grpcService.createGrpcInstance(targetserver, statusControl, { instanceType: 'client', serviceMethod: 'server streaming' })
 
-setTimeout(() => {
-  let message = {
-    id: parsedMessages[10].appData.msgId,
-    message: parsedMessages[10] // Choose this number, because i purposely use the 11th message and change the msgPayload property to query to emulate a request
-  }
-  applicationOutgoingResponse.next(message)
-}, 2000)
-setTimeout(() => {
-  let message = {
-    id: parsedMessages[11].appData.msgId,
-    message: parsedMessages[11]// Choose this number, because i purposely use the 11th message and change the msgPayload property to query to emulate a request
+connectionService.generateConnection(connectionRequest)
+
+/* Simple Test */
+let generateFakeMessagesToBePublished = stream().pipe(take(10))
+generateFakeMessagesToBePublished.subscribe({
+  next: message => {
+    let payload: Message = {
+      id: hostServer,
+      message: message
+    }
+    connectionRequest.server.messageToBePublishedfromApplication.next(payload)
   }
-  applicationOutgoingResponse.next(message)
-}, 3000)
-setTimeout(() => {
-  console.log(`All received data: ${array.length}`)
-}, 10000)
-setTimeout(() => {
-  console.log(`All received data: ${array.length}`)
-}, 20000)
+})
+
+
+/* Complex Test */
+// setTimeout(() => {
+//   let message = {
+//     id: parsedMessages[10].appData.msgId,
+//     message: parsedMessages[10] // Choose this number, because i purposely use the 11th message and change the msgPayload property to query to emulate a request
+//   }
+//   connectionRequest.server.messageToBePublishedfromApplication.next(message)
+// }, 3000)
+// setTimeout(() => {
+//   let message = {
+//     id: parsedMessages[11].appData.msgId,
+//     message: parsedMessages[11]// Choose this number, because i purposely use the 11th message and change the msgPayload property to query to emulate a request
+//   }
+//   connectionRequest.server.messageToBePublishedfromApplication.next(message)
+// }, 4000)
+// setTimeout(() => {
+//   console.log(`All received data: ${array.length}`)
+// }, 10000)
+// setTimeout(() => {
+//   console.log(`All received data: ${array.length}`)
+// }, 20000)
 
 
 // this is just to publish an array of fake data as a Subject
@@ -104,13 +121,3 @@ function generateFakeStreamResponse(request: any): Subject<any> {
   })
   return res
 }
-
-/* Extra NOTEs:
-So let's say when this host makes a request and send it over to the other side to process. And say this request
-is actually a query, so it will take some times for the other side to process the data. But what happens when the 
-other side down, that means i won't get my query. Is this the responsibility of the application logic of the other
-side? To keep track of the message request sent over? 
-Personal opinion it should be the responsibility of the application logic to keep track of the request they are 
-processing, especially when the server goes down whilst they are streaning back the response. Because for this 
-retransmission service, it shouldn't care anymore. It just make sure to get the message to pass over and buffer
-the message when established client instance could not talk to the server on the other side.*/

+ 62 - 45
test/grpc2.ts

@@ -1,32 +1,39 @@
-import * as fs from 'fs'
-import { Subject, groupBy, mergeMap, take, toArray } from 'rxjs';
-import { ColorCode, Message, MessageLog, ReportStatus } from '../interfaces/general.interface';
-import { GrpcService } from '../services/grpc.service';
-import { FisRetransmissionService } from '../services/fis.retransmission.service';
-import { GrpcServiceMethod } from '../services/service.method';
+import { Subject, take } from 'rxjs';
+import { Message, MessageLog, ConnectionRequest } from '../interfaces/general.interface';
+import { GrpcServiceMethod } from '../services/grpc.service.method';
+import { readFileSync } from 'fs';
+import { ServerClientManager } from '../services/server-client.service';
 
 // Subject for bidirectional communication
-const errorHandlingService: FisRetransmissionService = new FisRetransmissionService()
-const grpcService: GrpcService = new GrpcService(new GrpcServiceMethod())
-const messagesJSON: any = fs.readFileSync('payload.json')
-let incomingMessages: Subject<any> = grpcService.getIncomingMessage()
+const connectionService: ServerClientManager = new ServerClientManager(new GrpcServiceMethod())
+const messagesJSON: any = readFileSync('payload.json')
 let parsedMessages: any[] = JSON.parse(messagesJSON) // load the fake messages generated for this trial 
-let messageToBeReleased: Subject<any> = new Subject() // Sample message to be transmitted over to target server
-let applicationOutgoingResponse: Subject<Message> = new Subject()
-let statusControl: Subject<ReportStatus> = new Subject() // Listening for error events and states
 let targetserver: string = 'localhost:3000'
+let targetserver2: string = 'localhost:3002'
 let hostServer: string = 'localhost:3001'
-let array: any[] = []// Used for testing
+let array: any[] = [] // Used for testing                     
+let connectionRequest: ConnectionRequest = {
+  server: {
+    serverUrl: hostServer,
+    connectionType: 'GRPC',
+    messageToBePublishedfromApplication: new Subject<Message>()
+  },
+  client: {
+    targetServer: targetserver,
+    connectionType: 'GRPC',
+    messageToBeReceivedFromRemote: new Subject<Message>()
+  }
+}
 
 // Handler for the incoming Messages from the other side. 
-incomingMessages.subscribe({
+connectionRequest.client.messageToBeReceivedFromRemote.subscribe({
   next: request => {
     // Application logic comes here. This is where the asortment takes place, of decidiing whose messages it belongs of what it is
     if ((request.message as MessageLog).appData.msgPayload == 'Query') {
       generateFakeStreamResponse(request).subscribe({
-        next: (responseMessage) => {
+        next: (responseMessage: Message) => {
           // console.log(`Processing request:${request.id}....`)
-          applicationOutgoingResponse.next(responseMessage)
+          connectionRequest.server.messageToBePublishedfromApplication.next(responseMessage)
         },
         error: error => console.error(error),
         complete: () => {
@@ -42,34 +49,44 @@ incomingMessages.subscribe({
   complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
 })
 
-// Open channel for sending messages across.
-errorHandlingService.handleMessage(applicationOutgoingResponse, statusControl).subscribe((messages) => {
-  messageToBeReleased.next(messages)
-})
-grpcService.createGrpcInstance(hostServer, statusControl, { instanceType: 'server', serviceMethod: 'server streaming' }, messageToBeReleased)
 
-grpcService.createGrpcInstance(targetserver, statusControl, { instanceType: 'client', serviceMethod: 'server streaming' })
 
-setTimeout(() => {
-  let message = {
-    id: parsedMessages[10].appData.msgId,
-    message: parsedMessages[10] // Choose this number, because i purposely use the 11th message and change the msgPayload property to query to emulate a request
-  }
-  applicationOutgoingResponse.next(message)
-}, 2000)
-setTimeout(() => {
-  let message = {
-    id: parsedMessages[11].appData.msgId,
-    message: parsedMessages[11]// Choose this number, because i purposely use the 11th message and change the msgPayload property to query to emulate a request
-  }
-  applicationOutgoingResponse.next(message)
-}, 3000)
-setTimeout(() => {
-  console.log(`All received data: ${array.length}`)
-}, 10000)
-setTimeout(() => {
-  console.log(`All received data: ${array.length}`)
-}, 20000)
+connectionService.generateConnection(connectionRequest)
+
+/* Simple Test */
+// let generateFakeMessagesToBePublished = stream().pipe(take(10))
+// generateFakeMessagesToBePublished.subscribe({
+//   next: message => {
+//     let payload: Message = {
+//       id: hostServer,
+//       message: JSON.stringify(message)
+//     }
+//     connectionRequest.server.messageToBePublishedfromApplication.next(payload)
+//   }
+// })
+
+
+/* Complex Test */
+// setTimeout(() => {
+//   let message = {
+//     id: parsedMessages[10].appData.msgId,
+//     message: parsedMessages[10] // Choose this number, because i purposely use the 11th message and change the msgPayload property to query to emulate a request
+//   }
+//   connectionRequest.server.messageToBePublishedfromApplication.next(message)
+// }, 3000)
+// setTimeout(() => {
+//   let message = {
+//     id: parsedMessages[11].appData.msgId,
+//     message: parsedMessages[11]// Choose this number, because i purposely use the 11th message and change the msgPayload property to query to emulate a request
+//   }
+//   connectionRequest.server.messageToBePublishedfromApplication.next(message)
+// }, 4000)
+// setTimeout(() => {
+//   console.log(`All received data: ${array.length}`)
+// }, 10000)
+// setTimeout(() => {
+//   console.log(`All received data: ${array.length}`)
+// }, 20000)
 
 
 // this is just to publish an array of fake data as a Subject
@@ -88,9 +105,10 @@ function stream(): Subject<any> {
   return result
 }
 
+
 function generateFakeStreamResponse(request: any): Subject<any> {
   let res: Subject<any> = new Subject()
-  stream().pipe(take(5)).subscribe({
+  stream().pipe(take(7)).subscribe({
     next: element => {
       let message = {
         id: request.id, // Caller's 
@@ -103,4 +121,3 @@ function generateFakeStreamResponse(request: any): Subject<any> {
   })
   return res
 }
-

+ 53 - 33
test/grpc3.ts

@@ -1,35 +1,43 @@
-import * as fs from 'fs'
-import { Subject, groupBy, mergeMap, take, toArray } from 'rxjs';
-import { ColorCode, Message, MessageLog, ReportStatus } from '../interfaces/general.interface';
-import { GrpcService } from '../services/grpc.service';
-import { FisRetransmissionService } from '../services/fis.retransmission.service';
-import { GrpcServiceMethod } from '../services/service.method';
+import { Subject, take } from 'rxjs';
+import { Message, MessageLog, ConnectionRequest } from '../interfaces/general.interface';
+import { GrpcServiceMethod } from '../services/grpc.service.method';
+import { readFileSync } from 'fs';
+import { ServerClientManager } from '../services/server-client.service';
 
 // Subject for bidirectional communication
-const errorHandlingService: FisRetransmissionService = new FisRetransmissionService()
-const grpcService: GrpcService = new GrpcService(new GrpcServiceMethod())
-const messagesJSON: any = fs.readFileSync('payload.json')
-let incomingMessages: Subject<any> = grpcService.getIncomingMessage()
+const connectionService: ServerClientManager = new ServerClientManager(new GrpcServiceMethod())
+const messagesJSON: any = readFileSync('payload.json')
 let parsedMessages: any[] = JSON.parse(messagesJSON) // load the fake messages generated for this trial 
-let messageToBeReleased: Subject<any> = new Subject() // Sample message to be transmitted over to target server
-let applicationOutgoingResponse: Subject<Message> = new Subject()
-let statusControl: Subject<ReportStatus> = new Subject() // Listening for error events and states
-let targetserver: string = 'localhost:3000'
-let hostServer: string = 'localhost:3001'
-let array: any[] = []
+let targetserver: string = 'localhost:300'
+// let targetserver2: string = 'localhost:3002'
+let hostServer: string = 'localhost:3002'
+let array: any[] = [] // Used for testing                     
+let request: ConnectionRequest = {
+  server: {
+    serverUrl: hostServer,
+    connectionType: 'GRPC',
+    messageToBePublishedfromApplication: new Subject<Message>()
+  },
+  client: {
+    targetServer: targetserver,
+    connectionType: 'GRPC',
+    messageToBeReceivedFromRemote: new Subject<Message>()
+  },
+}
 
-incomingMessages.subscribe({
+// Handler for the incoming Messages from the other side. 
+request.client.messageToBeReceivedFromRemote.subscribe({
   next: request => {
     // Application logic comes here. This is where the asortment takes place, of decidiing whose messages it belongs of what it is
     if ((request.message as MessageLog).appData.msgPayload == 'Query') {
       generateFakeStreamResponse(request).subscribe({
         next: (responseMessage) => {
           // console.log(`Processing request:${request.id}....`)
-          applicationOutgoingResponse.next(responseMessage)
+          request.server.messageToBePublishedfromApplication.next(responseMessage)
         },
         error: error => console.error(error),
         complete: () => {
-          console.log(`Stream request for ${request.id} is queued.`) // shpuld be indefinite
+          console.log(`Stream request for ${request.id} is queued.`) // should be indefinite
         }
       })
     } else {
@@ -41,20 +49,33 @@ incomingMessages.subscribe({
   complete: () => console.log(`Response for incoming generated. But this will never stop, and should not either.`)
 })
 
-// Open channel for sending messages across.
-errorHandlingService.handleMessage(applicationOutgoingResponse, statusControl).subscribe((messages) => {
-  messageToBeReleased.next(messages)
-})
-grpcService.createGrpcInstance(hostServer, statusControl, { instanceType: 'server', serviceMethod: 'server streaming' }, messageToBeReleased)
 
-grpcService.createGrpcInstance(targetserver, statusControl, { instanceType: 'client', serviceMethod: 'server streaming' })
 
-setTimeout(() => {
-  console.log(`All received data: ${array.length}`)
-}, 10000)
-setTimeout(() => {
-  console.log(`All received data: ${array.length}`)
-}, 20000)
+connectionService.generateConnection(request)
+// grpcService.createGrpcInstance(hostServer, { instanceType: 'server', serviceMethod: 'server streaming' }, messageToBeReleased)
+
+// grpcService.createGrpcInstance(targetserver, { instanceType: 'client', serviceMethod: 'server streaming' })
+
+// setTimeout(() => {
+//   let message = {
+//     id: parsedMessages[10].appData.msgId,
+//     message: parsedMessages[10] // Choose this number, because i purposely use the 11th message and change the msgPayload property to query to emulate a request
+//   }
+//   applicationOutgoingResponse.next(message)
+// }, 3000)
+// setTimeout(() => {
+//   let message = {
+//     id: parsedMessages[11].appData.msgId,
+//     message: parsedMessages[11]// Choose this number, because i purposely use the 11th message and change the msgPayload property to query to emulate a request
+//   }
+//   applicationOutgoingResponse.next(message)
+// }, 4000)
+// setTimeout(() => {
+//   console.log(`All received data: ${array.length}`)
+// }, 10000)
+// setTimeout(() => {
+//   console.log(`All received data: ${array.length}`)
+// }, 20000)
 
 
 // this is just to publish an array of fake data as a Subject
@@ -76,7 +97,7 @@ function stream(): Subject<any> {
 
 function generateFakeStreamResponse(request: any): Subject<any> {
   let res: Subject<any> = new Subject()
-  stream().pipe(take(5)).subscribe({
+  stream().pipe(take(7)).subscribe({
     next: element => {
       let message = {
         id: request.id, // Caller's 
@@ -89,4 +110,3 @@ function generateFakeStreamResponse(request: any): Subject<any> {
   })
   return res
 }
-

+ 2 - 1
test/test.ts

@@ -10,4 +10,5 @@ setTimeout(() => {
 
 setTimeout(() => {
   testSubject.next({test: 'too late'})
-}, 4000)
+}, 4000)
+

+ 1 - 1
tsconfig.json

@@ -21,7 +21,7 @@
     // "noLib": true,                                    /* Disable including any library files, including the default lib.d.ts. */
     // "useDefineForClassFields": true,                  /* Emit ECMAScript-standard-compliant class fields. */
     /* Modules */
-    "module": "commonjs", /* Specify what module code is generated. */
+    "module": "CommonJS", /* Specify what module code is generated. */
     // "rootDir": "./",                                  /* Specify the root folder within your source files. */
     "moduleResolution": "node", /* Specify how TypeScript looks up a file from a given module specifier. */
     // "baseUrl": "./",                                  /* Specify the base directory to resolve non-relative module names. */