Browse Source

redirection to main channel

enzo 11 months ago
parent
commit
0e8b888067

+ 12 - 4
interfaces/general.interface.ts

@@ -3,8 +3,8 @@
 
 export enum ColorCode {
     'GREEN' = 'GREEN',
-    "YELLOW" = "YELLOW",
-    "RED" = "RED"
+    'YELLOW' = 'YELLOW',
+    'RED' = 'RED'
 }
 
 export interface messageTransmissionInterface {
@@ -18,7 +18,7 @@ export interface MessageLog { // this one specifically for office work case only
     appData: {
         msgId: string,
         msgLogDateTime: string,
-        msgDateTime: string,
+        msgDateTime: string,s
         msgTag: string[],
         msgPayload: string
     }
@@ -38,4 +38,12 @@ export interface ReportStatus {
 export interface GrpcConnectionType {
     instanceType: '' | 'server' | 'client'
     serviceMethod: '' | 'unary' | 'server streaming' | 'client streaming' | 'bidirectional'
-}
+}
+
+export interface GrpcMessage {
+    id: string,
+    message: MessageLog | string
+}
+
+export type Status = -1 | 0 | 1 // For status chain effect
+

+ 9 - 1
models/message.schema.ts

@@ -35,4 +35,12 @@ const messageSchema = new mongoose.Schema({
     appData: appData
 });
 
-module.exports = messageSchema
+const grpcMessageSchema = new mongoose.Schema({
+    id: {
+        type: String,
+        required: true
+    },
+    message: messageSchema
+})
+
+module.exports = grpcMessageSchema

+ 228 - 182
services/fis.retransmission.service.ts

@@ -2,7 +2,7 @@ import * as _ from 'lodash'
 import * as fs from 'fs'
 import mongoose, { Model, Schema } from 'mongoose';
 import { Observable, Subject, Subscription, from } from 'rxjs'
-import { ColorCode, ReportStatus } from '../interfaces/general.interface'
+import { ColorCode, GrpcMessage, MessageLog, ReportStatus, Status } from '../interfaces/general.interface'
 require('dotenv').config();
 
 // Implement status chain refactoring
@@ -19,32 +19,32 @@ export class FisRetransmissionService {
     }
 
     // Main function that intercepts outgoing messages by communicating || intepreting report status from grpc connection as indicator 
-    public handleMessage(messageToBePublished: Subject<any>, statusReport: Subject<ReportStatus>): Subject<any> {
-        let releaseMessageSubject: Subject<any> = new Subject() // A return value
+    public handleMessage(messageToBePublished: Subject<GrpcMessage>, statusReport: Subject<ReportStatus>): Subject<GrpcMessage> {
+        let releaseMessageSubject: Subject<GrpcMessage> = new Subject() // A return value
         // Using the concept of toggling to improve the eficacy of subscription control && data flow
         let messageReleaseSubscription: Subscription | null = null
         let messageBufferSubscription: Subscription | null = null
         let messageStreamToMongo: Subscription | null = null
         this.checkBufferLimit(messageToBePublished, statusReport)
-        statusReport.subscribe((report: any) => {
+        statusReport.subscribe((report: ReportStatus) => {
             if (report.code == ColorCode.GREEN) {
                 console.log(`Connection status report && ${report.message ?? 'No Message'}`)
                 /* Status Chain begins */
                 let status: Status = 1
                 if (status === 1) {
                     messageStreamToMongo = this.deactivateMongoStreamSubscription(messageStreamToMongo)
-                    if (messageStreamToMongo) status = -1
+                    if (messageStreamToMongo) status = 0
                 }
                 if (status === 1) {
                     messageBufferSubscription = this.deactivateBufferSubscription(messageBufferSubscription)
-                    if (messageBufferSubscription) status = -1
+                    if (messageBufferSubscription) status = 0
                 }
                 if (status === 1) {
                     messageReleaseSubscription = this.activateReleaseSubscription(messageReleaseSubscription, messageToBePublished, releaseMessageSubject)
-                    if (!messageReleaseSubscription) status = -1
+                    if (!messageReleaseSubscription) status = 0
                 }
                 if (status === 1) {
-                    this.releaseMessageFromLocalBuffer(this.bufferedStorage).then((resObs: Observable<any>) => {
+                    this.releaseMessageFromLocalBuffer(this.bufferedStorage).then((resObs: Observable<GrpcMessage>) => {
                         resObs.subscribe({
                             next: message => releaseMessageSubject.next(message),
                             error: err => console.error(err),
@@ -54,23 +54,23 @@ export class FisRetransmissionService {
                             }
                         })
                     }).catch((err) => {
-                        status = -1
+                        status = 0
                         console.error(err)
                     })
                 }
                 if (status === 1) {
-                    this.releaseMessageFromMongoStorage().then((resObs: Subject<any>) => {
+                    this.releaseMessageFromMongoStorage().then((resObs: Subject<GrpcMessage>) => {
                         resObs.subscribe({
                             next: message => releaseMessageSubject.next(message),
                             error: err => console.error(err),
                             complete: () => console.log(`All Mongo data are transferred `)
                         })
                     }).catch((err) => {
-                        status = -1
+                        status = 0
                         console.error(err)
                     })
                 }
-                if (status === -1) {
+                if (status === 0) {
                     console.log(`Something Went Wrong in handling ${ColorCode.RED} report.`)
                 }
 
@@ -86,13 +86,13 @@ export class FisRetransmissionService {
                 /* Status Chain begins */
                 if (status === 1) {
                     messageBufferSubscription = this.activateBufferSubscription(this.bufferedStorage, messageBufferSubscription, messageToBePublished)
-                    if (!messageBufferSubscription) status = -1
+                    if (!messageBufferSubscription) status = 0
                 }
                 if (status === 1) {
                     messageReleaseSubscription = this.deactivateReleaseSubscription(messageReleaseSubscription)
-                    if (messageReleaseSubscription) status = -1
+                    if (messageReleaseSubscription) status = 0
                 }
-                if (status === -1) {
+                if (status === 0) {
                     console.log(`Something Went Wrong in handling ${ColorCode.RED} report.`)
                 }
             }
@@ -102,168 +102,167 @@ export class FisRetransmissionService {
                 let status: Status = 1
                 if (status === 1) {
                     messageStreamToMongo = this.activateMongoStreamSubscription(messageStreamToMongo, messageToBePublished)
-                    if (!messageStreamToMongo) status = -1
+                    if (!messageStreamToMongo) status = 0
                 }
                 if (status === 1) {
                     messageBufferSubscription = this.deactivateBufferSubscription(messageBufferSubscription)
-                    if (messageBufferSubscription) status = -1
+                    if (messageBufferSubscription) status = 0
                 }
                 if (status === 1) {
                     this.transferBufferedMessageToMongoStorage(this.bufferedStorage, messageBufferSubscription).then((res: any[]) => {
                         if (res.length !== this.bufferedStorage.length || this.bufferedStorage.length > 0) status = -1 // this promise function should return an empty array
                     })
                 }
-                if (status === -1) {
+                if (status === 0) {
                     console.log(`Something Went Wrong in handling ${ColorCode.RED} report.`)
                 }
 
             }
-            if (!report.code || report.code == "") {
+            if (!report.code) {
                 console.log(`Unknown message...`)
             }
         })
         return releaseMessageSubject
     }
 
-    private checkBufferLimit(message: Subject<any>, statusReport: Subject<ReportStatus>) {
-        message.subscribe(() => {
-            if (this.bufferedStorage.length >= this.maximumBufferLength) {
-                // for every messges that comes in, check the bufffer size, if it exceesd more than designated amount, push a red report status i
-                console.log(`Buffer length exceeds limit imposed!!!`)
-                let report: ReportStatus = {
-                    code: ColorCode.RED,
-                    message: `Buffer is exceeding limit. Initiate storage transfer to designated database.`,
-                    from: `Error Handling Service`
+    private checkBufferLimit(message: Subject<GrpcMessage>, statusReport: Subject<ReportStatus>) {
+        let status: Status = 1
+        if (status = 1) {
+            message.subscribe(() => {
+                if (this.bufferedStorage.length >= this.maximumBufferLength) {
+                    // for every messges that comes in, check the bufffer size, if it exceesd more than designated amount, push a red report status i
+                    console.log(`Buffer length exceeds limit imposed!!!`)
+                    let report: ReportStatus = {
+                        code: ColorCode.RED,
+                        message: `Buffer is exceeding limit. Initiate storage transfer to designated database.`,
+                        from: `Error Handling Service`
+                    }
+                    statusReport.next(report)
                 }
-                statusReport.next(report)
-            }
-        })
+            })
+        }
     }
 
     // Release the incoming Messages to be returned to the caller
-    private activateReleaseSubscription(messageReleaseSubscription, messageToBePublished, releaseMessageSubject): Subscription | null {
-        if (!messageReleaseSubscription) {
-            messageReleaseSubscription = messageToBePublished.subscribe({
-                next: (message: any) => {
-                    console.log(`Releasing ${message.message.appData.msgId}...`);
-                    releaseMessageSubject.next(message);
-                },
-                error: (err) => console.error(err),
-                complete: () => { },
-            });
-            console.log(`Subscription message release activated.`);
-        } else {
-            console.log(`Subscription message release is already active.`);
+    private activateReleaseSubscription(messageReleaseSubscription: Subscription | null, messageToBePublished: Subject<GrpcMessage>, releaseMessageSubject: Subject<GrpcMessage>): Subscription | null {
+        let status: Status = 1
+        if (status = 1) {
+            if (!messageReleaseSubscription) {
+                messageReleaseSubscription = messageToBePublished.subscribe({
+                    next: (message: GrpcMessage) => {
+                        console.log(`Releasing ${(message.message as MessageLog).appData.msgId}...`);
+                        releaseMessageSubject.next(message);
+                    },
+                    error: (err) => console.error(err),
+                    complete: () => { },
+                });
+                console.log(`Subscription message release activated.`);
+            } else {
+                status = 0
+                console.log(`Subscription message release is already active.`);
+            }
         }
         return messageReleaseSubscription
     }
 
     // Stop the incoming Messaes to be returned to caller
-    private deactivateReleaseSubscription(messageReleaseSubscription): Subscription | null {
-        if (messageReleaseSubscription) {
-            messageReleaseSubscription.unsubscribe();
-            messageReleaseSubscription = null;
-            console.log(`Subscription message release deactivated.`);
-        } else {
-            console.log(`Subscription message release is already deactivated.`);
+    private deactivateReleaseSubscription(messageReleaseSubscription: Subscription | null): Subscription | null {
+        let status: Status = 1
+        if (status = 1) {
+            if (messageReleaseSubscription) {
+                messageReleaseSubscription.unsubscribe();
+                messageReleaseSubscription = null;
+                console.log(`Subscription message release deactivated.`);
+            } else {
+                console.log(`Subscription message release is already deactivated.`);
+            }
         }
         return messageReleaseSubscription
     }
 
     // Begin to push the incoming messages into local instantarray
-    private activateBufferSubscription(bufferStorage: any[], messageBufferSubscription: Subscription | null, messageToBePublished: Subject<any>): Subscription | null {
-        if (!messageBufferSubscription) {
-            messageBufferSubscription = messageToBePublished.subscribe({
-                next: (message: any) => {
-                    console.log(`Buffering ${message.message.appData.msgId}...  Local array length: ${bufferStorage.length}`);
-                    bufferStorage.push(message)
-                },
-                error: (err) => console.error(err),
-                complete: () => { },
-            });
-            console.log(`Subscription message buffer activated.`);
-        } else {
-            console.log(`Subscription message buffer is already active.`);
+    private activateBufferSubscription(bufferStorage: GrpcMessage[], messageBufferSubscription: Subscription | null, messageToBePublished: Subject<GrpcMessage>): Subscription | null {
+        let status: Status = 1
+        if (status = 1) {
+            if (!messageBufferSubscription) {
+                messageBufferSubscription = messageToBePublished.subscribe({
+                    next: (message: any) => {
+                        console.log(`Buffering ${(message.message as MessageLog).appData.msgId}...  Local array length: ${bufferStorage.length}`);
+                        bufferStorage.push(message)
+                    },
+                    error: (err) => console.error(err),
+                    complete: () => { },
+                });
+                console.log(`Subscription message buffer activated.`);
+            } else {
+                status = 0
+                console.log(`Subscription message buffer is already active.`);
+            }
         }
         return messageBufferSubscription
     }
 
     // Stop pushing the incoming messages into local instantarray
     private deactivateBufferSubscription(messageBufferSubscription: Subscription | null): Subscription | null {
-        if (messageBufferSubscription) {
-            messageBufferSubscription.unsubscribe();
-            messageBufferSubscription = null;
-            console.log(`Subscription message buffer deactivated.`);
-        } else {
-            console.log(`Subscription message buffer is already deactivated.`);
+        let status: Status = 1
+        if (status) {
+
+            if (messageBufferSubscription) {
+                messageBufferSubscription.unsubscribe();
+                messageBufferSubscription = null;
+                console.log(`Subscription message buffer deactivated.`);
+            } else {
+                status = 0
+                console.log(`Subscription message buffer is already deactivated.`);
+            }
         }
         return null
     }
 
     // Change the streaming direction of the incoming messages into mongo streaming subject( to be saved in local databse )
-    private activateMongoStreamSubscription(messageStreamToMongo: Subscription | null, messageToBePublished: Subject<any>): Subscription | null {
-        if (!messageStreamToMongo) {
-            messageStreamToMongo = messageToBePublished.subscribe({
-                next: (message: any) => {
-                    console.log(`Saving ${message.message.appData.msgId}...`);
-                    this.saveToMongo(message)
-                },
-                error: (err) => console.error(err),
-                complete: () => { },
-            });
-            console.log(`Subscription message streaming to Mongo activated.`);
-        } else {
-            console.log(`Subscription message streaming to Mongo  is already active.`);
+    private activateMongoStreamSubscription(messageStreamToMongo: Subscription | null, messageToBePublished: Subject<GrpcMessage>): Subscription | null {
+        let status: Status = 1
+        if (status = 1) {
+            if (!messageStreamToMongo) {
+                messageStreamToMongo = messageToBePublished.subscribe({
+                    next: (message: any) => {
+                        console.log(`Saving ${(message.message as MessageLog).appData.msgId}...`);
+                        this.saveToMongo(message)
+                    },
+                    error: (err) => console.error(err),
+                    complete: () => { },
+                });
+                console.log(`Subscription message streaming to Mongo activated.`);
+            } else {
+                status = 0
+                console.log(`Subscription message streaming to Mongo  is already active.`);
+            }
         }
         return messageStreamToMongo
     }
 
     // Stop or cut off the mongo streaming
     private deactivateMongoStreamSubscription(messageStreamToMongo: Subscription | null): Subscription | null {
-        if (messageStreamToMongo) {
-            messageStreamToMongo.unsubscribe();
-            messageStreamToMongo = null;
-            console.log(`Subscription message streaming to Mongo deactivated.`);
-        } else {
-            console.log(`Subscription message streaming to Mongo is already deactivated.`);
-        }
-        return messageStreamToMongo
-    }
-
-    // Store in json file in this project folder. To be enabled in future
-    private async transferMessageToLocalStorage(message: Subject<any>): Promise<void> {
-        let localArray: any[] = this.bufferedStorage
-        let filename = `localstorage.json`;
-
-        while (localArray.length > 0) {
-            let objectToWrite = this.bufferedStorage[0];
-            await writeMessage(objectToWrite, filename)
-        }
-        message.subscribe((message: any) => {
-            writeMessage(message, filename)
-        })
-
-        if (localArray.length < 1) this.bufferedStorage = localArray
-        console.log('Local Array is empty. Finished transferring to files.')
-
-        async function writeMessage(message: any, filename: string) {
-            try {
-                let stringifiedMessage = JSON.stringify(message);
-                await fs.promises.appendFile(filename, stringifiedMessage + "\r\n")
-                console.log(`Successfully transferred ${filename}`);
-                localArray.shift();
-            } catch (err) {
-                console.error(`Error trasferring ${filename}:`, err);
+        let status: Status = 1
+        if (status = 1) {
+            if (messageStreamToMongo) {
+                messageStreamToMongo.unsubscribe();
+                messageStreamToMongo = null;
+                console.log(`Subscription message streaming to Mongo deactivated.`);
+            } else {
+                status = 0
+                console.log(`Subscription message streaming to Mongo is already deactivated.`);
             }
         }
+        return messageStreamToMongo
     }
 
     // To be used by mongoStreamSubscription to perform the saving execution
-    private async saveToMongo(message: any): Promise<boolean> {
+    private async saveToMongo(message: GrpcMessage): Promise<boolean> {
         return new Promise((resolve, reject) => {
             // let messageModel: Model<any> = this.mongoConnection.model('Message', require('../models/message.schema'))
             this.messageModel.create(message).then(() => {
-                console.log(`Saved MessageID ${message.message.appData.msgId} into ${this.mongoUrl}`);
+                console.log(`Saved MessageID ${(message.message as MessageLog).appData.msgId} into ${this.mongoUrl}`);
                 resolve(true)
             }).catch((err) => {
                 console.log(`MongoSaveError: ${err.message}`)
@@ -273,67 +272,79 @@ export class FisRetransmissionService {
     }
 
     // As the name implies, transder all the messages from the local instance into mongoStorage. Local instance should be emptied after transfer is completed
-    private async transferBufferedMessageToMongoStorage(bufferedMessage: any[], messageBufferSubscription): Promise<any[]> {
+    private async transferBufferedMessageToMongoStorage(bufferedMessage: GrpcMessage[], messageBufferSubscription: Subscription | null): Promise<GrpcMessage[]> {
         return new Promise((resolve, reject) => {
-            let bufferedStorage: Observable<any> = from(bufferedMessage)
-            bufferedStorage.subscribe({
-                next: (message: any) => {
-                    this.saveToMongo(message).then((res) => {
-                        console.log(`Message ${message.message.appData.msgId} saved successfully...`)
-                    }).catch((err) => console.error(err))
-                },
-                error: (error) => {
-                    reject(error)
-                    console.error(error)
-                },
-                complete: () => {
-                    this.bufferedStorage = []
-                    if (messageBufferSubscription) {
-                        console.log(`All ${bufferedMessage.length} buffered messages have been sent for transfer to ${this.mongoUrl}. Current length: ${this.bufferedStorage.length}`)
+            let status: Status = 1
+            if (status = 1) {
+                let bufferedStorage: Observable<GrpcMessage> = from(bufferedMessage)
+                bufferedStorage.subscribe({
+                    next: (message: any) => {
+                        this.saveToMongo(message).then((res) => {
+                            console.log(`Message ${(message.message as MessageLog).appData.msgId} saved successfully...`)
+                        }).catch((err) => console.error(err))
+                    },
+                    error: (error) => {
+                        reject(error)
+                        console.error(error)
+                    },
+                    complete: () => {
+                        this.bufferedStorage = []
+                        if (messageBufferSubscription) {
+                            console.log(`All ${bufferedMessage.length} buffered messages have been sent for transfer to ${this.mongoUrl}. Current length: ${this.bufferedStorage.length}`)
+                        }
+                        resolve(this.bufferedStorage)
                     }
-                    resolve(this.bufferedStorage)
-                }
-            })
+                })
+            }
         })
     }
 
     // Transfer stored messages from the local instance back into the stream to be returned to the caller.
-    private async releaseMessageFromLocalBuffer(bufferedStorage: any[]): Promise<Observable<any>> {
+    private async releaseMessageFromLocalBuffer(bufferedStorage: GrpcMessage[]): Promise<Observable<GrpcMessage>> {
         return new Promise((resolve, reject) => {
-            if (bufferedStorage.length > 1) {
-                let caseVariable = this.bufferedStorage.length > 1;
-                console.log(`Releasing data from local buffer instance. There ${caseVariable ? "is" : "are"} ${this.bufferedStorage.length} messages...`);
-                let returnArrayObs: Observable<any> = from(bufferedStorage)
-                resolve(returnArrayObs)
-            } else {
-                let message = `There is no data in stored in local instance`
-                reject(message)
+            let status: Status = 1
+            if (status = 1) {
+                if (bufferedStorage.length > 1) {
+                    let caseVariable = this.bufferedStorage.length > 1;
+                    console.log(`Releasing data from local buffer instance. There ${caseVariable ? "is" : "are"} ${this.bufferedStorage.length} messages...`);
+                    let returnArrayObs: Observable<GrpcMessage> = from(bufferedStorage)
+                    resolve(returnArrayObs)
+                } else {
+                    let message = `There is no data in stored in local instance`
+                    reject(message)
+                }
             }
         })
     }
 
     // Transder all the stored messages in designated mongo databases. It should be empty after all the data has been transferred.
-    private async releaseMessageFromMongoStorage(): Promise<Subject<any>> {
+    private async releaseMessageFromMongoStorage(): Promise<Subject<GrpcMessage>> {
         return new Promise((resolve, reject) => {
-            let dataSubject: Subject<any> = new Subject()
-            this.extractAllMessages(dataSubject)
-            resolve(dataSubject)
+            let status: Status = 1
+            if (status = 1) {
+                let dataSubject: Subject<GrpcMessage> = new Subject()
+                this.extractAllMessages(dataSubject)
+                resolve(dataSubject)
+            }
         })
     }
 
     // Connect to designated mongodatabase.
     private async connectToMongoDatabase(): Promise<any> {
         return new Promise((resolve, reject) => {
-            console.log(this.mongoUrl)
-            this.mongoConnection = mongoose.createConnection(this.mongoUrl)
-            this.mongoConnection.on('error', (error) => {
-                console.error('Connection error:', error);
-                resolve('')
-            });
-            this.mongoConnection.once('open', () => {
-                console.log(`Connected to ${process.env.MONGO}`);
-                this.messageModel = this.mongoConnection.model('Message', require('../models/message.schema'));
-            });
+            let status: Status = 1
+            if (status = 1) {
+                console.log(this.mongoUrl)
+                this.mongoConnection = mongoose.createConnection(this.mongoUrl)
+                this.mongoConnection.on('error', (error) => {
+                    console.error('Connection error:', error);
+                    resolve('')
+                });
+                this.mongoConnection.once('open', () => {
+                    console.log(`Connected to ${process.env.MONGO}`);
+                    this.messageModel = this.mongoConnection.model('Message', require('../models/message.schema'));
+                });
+            }
         })
     }
 
@@ -349,29 +360,64 @@ export class FisRetransmissionService {
         }
     }
 
-    public async extractAllMessages(subjectArgs: Subject<any>): Promise<void> {
-        if (this.messageModel) {
-            const eventStream = this.messageModel.find().lean().cursor()
-            eventStream.on('data', (message) => {
-                // Emit each document to the subject
-                subjectArgs.next(message);
-            });
-            eventStream.on('end', async () => {
-                // All data has been streamed, complete the subject
-                subjectArgs.complete();
-                // Delete the data once it has been streamed
-                try {
-                    await this.messageModel.deleteMany({});
-                    console.log('Data in Mongo deleted successfully.');
-                } catch (err) {
-                    console.error('Error deleting data:', err);
-                }
-            });
-        } else {
-            console.log(`Error: Message Model is ${this.messageModel}!! Please set up the mongoose connectino properly!`)
+    public async extractAllMessages(subjectArgs: Subject<GrpcMessage>): Promise<void> {
+        // Need to resolve the issue of streaming in a specific order that is sequential
+        let status: Status = 1
+        if (status = 1) {
+            if (this.messageModel) {
+                const eventStream = this.messageModel.find().lean().cursor()
+                eventStream.on('data', (message) => {
+                    // Emit each document to the subject
+                    subjectArgs.next(message);
+                });
+                eventStream.on('end', async () => {
+                    // All data has been streamed, complete the subject
+                    subjectArgs.complete();
+                    // Delete the data once it has been streamed
+                    try {
+                        await this.messageModel.deleteMany({});
+                        console.log('Data in Mongo deleted successfully.');
+                    } catch (err) {
+                        console.error('Error deleting data:', err);
+                    }
+                });
+            } else {
+                status = 0
+                console.log(`Error: Message Model is ${this.messageModel}!! Please set up the mongoose connectino properly!`)
+            }
         }
     }
 
 }
 
-type Status = -1 | 0 | 1 // For status chain effect
+
+
+
+
+// Store in json file in this project folder. To be enabled in future
+// private async transferMessageToLocalStorage(message: Subject<any>): Promise<void> {
+//     let localArray: any[] = this.bufferedStorage
+//     let filename = `localstorage.json`;
+
+//     while (localArray.length > 0) {
+//         let objectToWrite = this.bufferedStorage[0];
+//         await writeMessage(objectToWrite, filename)
+//     }
+//     message.subscribe((message: any) => {
+//         writeMessage(message, filename)
+//     })
+
+//     if (localArray.length < 1) this.bufferedStorage = localArray
+//     console.log('Local Array is empty. Finished transferring to files.')
+
+//     async function writeMessage(message: any, filename: string) {
+//         try {
+//             let stringifiedMessage = JSON.stringify(message);
+//             await fs.promises.appendFile(filename, stringifiedMessage + "\r\n")
+//             console.log(`Successfully transferred ${filename}`);
+//             localArray.shift();
+//         } catch (err) {
+//             console.error(`Error trasferring ${filename}:`, err);
+//         }
+//     }
+// }

+ 7 - 353
services/grpc.service.ts

@@ -1,15 +1,14 @@
-import * as grpc from '@grpc/grpc-js';
 import { Subject, Subscription, take, takeUntil } from 'rxjs';
 import { ColorCode, GrpcConnectionType, MessageLog, ReportStatus } from '../interfaces/general.interface';
-import { Status } from '@grpc/grpc-js/build/src/constants';
-const message_proto = require('./protos/server.proto')
+import { GrpcServiceMethod } from './service.method';
+
 
 export class GrpcService {
     private grpcServerConnection: any = {}
     private incomingRequest: Subject<any> = new Subject()
     private incomingResponse: Subject<any> = new Subject()
 
-    constructor() { }
+    constructor(private grpcServiceMethod: GrpcServiceMethod) { }
 
     public getIncomingRequest(): Subject<any> {
         return this.incomingRequest
@@ -58,16 +57,16 @@ export class GrpcService {
         while (true) {
             try {
                 if (connectionType.instanceType == 'client' && connectionType.serviceMethod == 'bidirectional') {
-                    await this.createBidirectionalStreamingClient(serverUrl, alreadyHealthCheck, messageToBeTransmitted, statusControl);
+                    await this.grpcServiceMethod.createBidirectionalStreamingClient(serverUrl, alreadyHealthCheck, messageToBeTransmitted, statusControl, this.incomingResponse);
                 }
                 if (connectionType.instanceType == 'client' && connectionType.serviceMethod == 'server streaming') {
-                    await this.createServerStreamingClient(serverUrl, alreadyHealthCheck, messageToBeTransmitted, statusControl);
+                    await this.grpcServiceMethod.createServerStreamingClient(serverUrl, alreadyHealthCheck, messageToBeTransmitted, statusControl, this.incomingResponse);
                 }
                 if (connectionType.instanceType == 'server' && connectionType.serviceMethod == 'bidirectional') {
-                    await this.createGrpcBidirectionalServer(serverUrl, messageToBeTransmitted, statusControl)
+                    await this.grpcServiceMethod.createGrpcBidirectionalServer(serverUrl, messageToBeTransmitted, statusControl, this.grpcServerConnection, this.incomingRequest)
                 }
                 if (connectionType.instanceType == 'server' && connectionType.serviceMethod == 'server streaming') {
-                    await this.createServerStreamingServer(serverUrl, alreadyHealthCheck, messageToBePublished, statusControl)
+                    await this.grpcServiceMethod.createServerStreamingServer(serverUrl, alreadyHealthCheck, messageToBePublished, statusControl, this.grpcServerConnection, this.incomingRequest)
                 }
                 // If connection resolves (indicating failure), increment the count
                 consecutiveResolutions++;
@@ -120,351 +119,6 @@ export class GrpcService {
         }
     }
 
-    private async createGrpcBidirectionalServer(serverUrl: string, messageToBeStream: Subject<any>, statusControl: Subject<ReportStatus>): Promise<any> { // '0.0.0.0:3001'
-        return new Promise((resolve, reject) => {
-            try {
-                // https://github.com/grpc/proposal/blob/master/L5-node-client-interceptors.md
-                let server: grpc.Server = new grpc.Server();
-                // Add the streamingData function to the gRPC service
-                // Define your message_proto.Message service methods
-
-                server.addService(message_proto.Message.service, {
-                    sendMessageStream: (call) => {
-                        console.log(`Client connected from: ${call.getPeer()}`);
-                        let report: ReportStatus = {
-                            code: ColorCode.GREEN,
-                            message: `Client connected!!`,
-                            from: `Bidirectional Instance`
-                        }
-                        statusControl.next(report)
-
-                        // Right now this is being broadcast.
-                        let subscription: Subscription = messageToBeStream.subscribe({
-                            next: (payload: any) => {
-                                let noConnection = call.cancelled // check connection for each and every message
-                                if (noConnection === true) { // that means there's no connection, beccause the cancel operation is determined to check 
-                                    let report: ReportStatus = {
-                                        code: ColorCode.YELLOW,
-                                        message: `Client is not alive.....`,
-                                        payload: payload,
-                                        from: `Bidirectional Instance`
-                                    }
-                                    statusControl.next(report) // no connection. Tell buffer service to stop releasing messages
-                                    subscription.unsubscribe() // i still dont understand why i wrote this here
-                                } else {
-                                    console.log(`Sending ${payload.appData.msgId}`)
-                                    let message: string = JSON.stringify(payload)
-                                    call.write({ message })
-                                }
-                            },
-                            error: err => console.error(err),
-                            complete: () => { } //it will never complete
-                        })
-
-                        call.on('data', (data: any) => {
-                            // console.log(data) // it does return in string format
-                            let payload = JSON.parse(data.message)
-                            console.log(`Received Message from Client: ${payload.appData?.msgId}`);
-                            // Forward the received message to the RxJS subject
-                            // let respmsg: any = {
-                            //     msgId: payload.appData?.msgId,
-                            //     confirmationMessage: `Message ${payload.appData?.msgId} acknowledged!`
-                            // }
-                            // let message: string = JSON.stringify(respmsg)
-                            // console.log(`Responding to client: ${respmsg.msgId}`);
-                            // Note: The parameter here MUST BE STRICTLY be the same letter as defined in proto. Eg: message MessageRequest { string >>'message'<< = 1 }
-                            // call.write({ message });
-                        });
-
-                        call.on('end', () => {
-                            console.log('Client stream ended');
-                            // but the stream never ends. THis is not a reliable way to tell if a client is disconnected
-                        });
-
-                        call.on('error', (err) => {
-                            // Error that may occue during the rpc call. Id there's an error, put a callbacn function there to check the connection for client
-                            // emit a yellow report to halt message release. If the server does not reply to the callback function, then emit a red card
-                            // the call back function will be to write and then the client should response immediately through test
-                        });
-
-                        call.on('close', () => {
-                            console.log('Unknown cause for diconnectivity');
-                            // Handle client closure, which may be due to errors or manual termination
-                        });
-
-                    },
-
-                    Check: (_, callback) => {
-                        // health check logic here
-                        // for now it is just sending the status message over to tell the client it is alive
-                        // For simplicity, always return "SERVING" as status
-                        callback(null, { status: 'SERVING' });
-                    },
-                });
-
-                // Bind and start the server
-                server.bindAsync(serverUrl, grpc.ServerCredentials.createInsecure(), () => {
-                    console.log(`gRPC server is running on ${serverUrl}`);
-                    server.start();
-                });
-                this.grpcServerConnection[serverUrl] = server
-            }
-            catch (error) {
-                resolve(error)
-            }
-
-        })
-    }
-
-
-    private async createBidirectionalStreamingClient(server: string, alreadyHealthCheck: boolean, messageToBeTransmitted: Subject<any>, statusControl: Subject<ReportStatus>): Promise<string> {
-        let subscription: any
-        let unsubscribed: boolean = false
-
-        return new Promise(async (resolve, reject) => {
-            const client = new message_proto.Message(server, grpc.credentials.createInsecure());
-            const call = client.sendMessageStream();
-
-            this.checkConnectionHealth(client, statusControl, alreadyHealthCheck)
-
-            call.on('status', (status: Status) => { // this is useless in streaming(on for unary)
-                // console.log(status) // For more info: https://grpc.github.io/grpc/core/md_doc_statuscodes.html
-                // https://grpc.io/docs/what-is-grpc/core-concepts/#streaming
-                // if (status == grpc.status.UNAVAILABLE) { // only returns a status when there's error. Otherwise it just waits
-                //     resolve('No connection established. Server is not responding..')
-                // }
-            });
-
-            // All the grpc operations are here
-            // Subscribe to the RxJS subject to send data to the server
-            subscription = messageToBeTransmitted.subscribe({
-                next: (payload: any) => {
-                    if (!unsubscribed) {
-                        console.log(`Sending ${payload.appData.msgId}`)
-                        let message: string = JSON.stringify(payload)
-                        call.write({ message })
-                    }
-                },
-                error: err => console.error(err),
-                complete: () => { } //it will never complete
-            });
-
-
-            call.on('data', (data: any) => {
-                let message = JSON.parse(data.message)
-                console.log(`Received message from Server: ${message.msgId ?? message.appData?.msgId ?? `Invalid`}`);
-            });
-
-            call.on('error', (err) => {
-                // console.log(`Something wrong with RPC call...`)
-                if (!unsubscribed && subscription) { // kill subcription to prevent memory leaks
-                    subscription.unsubscribe();
-                    unsubscribed = true;
-                }
-                resolve('Server Error');
-            });
-
-            call.on('end', () => {
-                if (!unsubscribed && subscription) { // kill subcription to prevent memory leaks
-                    subscription.unsubscribe();
-                    unsubscribed = true;
-                }
-                resolve('Server Error');
-            });
-
-        })
-    }
-
-    private async createServerStreamingServer(serverUrl: string, alreadyHealthCheck: boolean, messageToBeStream: Subject<any>, statusControl: Subject<ReportStatus>): Promise<any> { // '0.0.0.0:3001'
-        return new Promise((resolve, reject) => {
-            try {
-                // https://github.com/grpc/proposal/blob/master/L5-node-client-interceptors.md
-                let server: grpc.Server = new grpc.Server();
-                // Add the streamingData function to the gRPC service
-                // Define your message_proto.Message service methods
-
-                server.addService(message_proto.Message.service, {
-                    HandleMessage: (call) => { // this is for bidirectional streaming. Need to have another one for unary calls for web clients
-                        let report: ReportStatus = { //let the flow come through 
-                            code: ColorCode.GREEN,
-                            message: `Client connected!!`,
-                            from: `Server Streaming Instance`
-                        }
-                        statusControl.next(report)
-
-                        let request = call.request // unary request from client to be responded with a stream
-                        console.log(`Received unary call.... request: ${request.id}`)
-                        this.incomingRequest.next(request)
-                        console.log(`Client connected from: ${call.getPeer()}`);
-
-
-                        let subscription: Subscription = messageToBeStream.subscribe({
-                            next: (response: any) => {
-                                if (response.id == request.id) {
-                                    // console.log(`${response.id} vs ${request.id}`)
-                                    // Check who's response it belongs to
-                                    let noConnection = call.cancelled // check connection for each and every message
-                                    if (noConnection === true) { // that means there's no connection, beccause the cancel operation is determined to check 
-                                        let report: ReportStatus = {
-                                            code: ColorCode.YELLOW,
-                                            message: `Client is not alive.....`,
-                                            payload: response,
-                                            from: `Server Streaming Instance`
-                                        }
-                                        statusControl.next(report)
-                                        subscription.unsubscribe()
-                                    } else {
-                                        console.log(`Sending ${response.message.appData.msgId} in respond to unary ${request.id}`)
-                                        // let respond: string = JSON.stringify(response.message)
-                                        let message = {
-                                            id: response.id,
-                                            message: JSON.stringify(response.message)
-                                        }
-                                        // console.log(message)
-                                        call.write(message)
-                                    }
-                                }
-                            },
-                            error: err => {
-                                console.error(err)
-                                let report: ReportStatus = {
-                                    code: ColorCode.YELLOW,
-                                    message: `Message streaming error`,
-                                    from: `Server Streaming Instance`
-                                }
-                                statusControl.next(report)
-                                subscription.unsubscribe()
-                            },
-                            complete: () => {
-                                console.log(`Stream response completed for ${request.id}`)
-                                subscription.unsubscribe()
-                                // call.end()
-                            }
-                        })
-                    },
-
-                    Check: (_, callback) => {
-                        // health check logic here
-                        // for now it is just sending the status message over to tell the client it is alive
-                        // For simplicity, always return "SERVING" as status
-                        callback(null, { status: 'SERVING' });
-                    },
-                });
-
-                // Bind and start the server
-                server.bindAsync(serverUrl, grpc.ServerCredentials.createInsecure(), () => {
-                    console.log(`gRPC server is running on ${serverUrl}`);
-                    server.start();
-                });
-                this.grpcServerConnection[serverUrl] = server
-            }
-            catch (error) {
-                resolve(error)
-            }
-
-        })
-    }
-
-    // Create a server streaming call. Please note that the structure of the code would not be the same as bidirectional because of it's unary nature
-    private async createServerStreamingClient(server: string, alreadyHealthCheck: boolean, unaryRequestSubject: Subject<any>, statusControl: Subject<ReportStatus>): Promise<string> {
-        return new Promise(async (resolve, reject) => {
-            const client = new message_proto.Message(server, grpc.credentials.createInsecure());
-            this.checkConnectionHealth(client, statusControl, alreadyHealthCheck) // perform check to see if server is alive, if not terminate this grpc instant and create again
-
-            /* Memory leak some where here */
-
-            unaryRequestSubject.subscribe({
-                next: (request: any) => {
-                    let message = {
-                        id: request.appData?.msgId,
-                        message: JSON.stringify(request)
-                    }
-
-                    console.log(`<${message.id}> Sending request: ${message.id} over to server....`)
-                    const call = client.HandleMessage(message)
-
-                    call.on('status', (status: Status) => {
-                        // console.log(status) // For more info: https://grpc.github.io/grpc/core/md_doc_statuscodes.html
-                        // https://grpc.io/docs/what-is-grpc/core-concepts/#streaming
-                        if (status == grpc.status.OK) { // only returns a status when there's error. Otherwise it just waits
-                            console.log(`Message trasmission operation is successful`)
-                            // RPC completed successfully
-                        } if (status == grpc.status.UNAVAILABLE) {
-                            resolve('No connection established. Server is not responding..')
-                            let report = {
-                                code: ColorCode.YELLOW,
-                                message: `Server doesn't seem to be alive. Error returned.`,
-                                from: `Server Streaming Client Instance`
-                            }
-                            statusControl.next(report)
-                        }
-                    });
-
-                    call.on('data', (data: any) => {
-                        // console.log(`Received stream response from Server. Receiver: ${message.id}`);
-                        let response = {
-                            id: data.id,
-                            message: JSON.parse(data.message)
-                        }
-                        console.log(response)
-                        this.incomingResponse.next(response)
-                    });
-
-                    call.on('error', (err) => {
-                        let report = {
-                            code: ColorCode.YELLOW,
-                            message: `Server doesn't seem to be alive. Error returned.`,
-                            from: `Server Streaming Client Instance`
-                        }
-                        statusControl.next(report)
-                        // resolve(err)
-                    });
-
-                    call.on('end', () => { // this is for gracefull || willfull termination from the server
-                        console.log(`Streaming Response is completed`)
-                        let report = {
-                            code: ColorCode.YELLOW,
-                            message: `Server doesn't seem to be alive. Error returned.`,
-                            from: `Server Streaming Client Instance`
-                        }
-                        statusControl.next(report)
-                        // subscription.unsubscribe(); // this is not correct i am just destroying the entire operation. i should be terminating the instance to which i think it does by it self
-                        // resolve('Server Error');
-                    });
-                    /* Avoid rsolving at the moment. Because initially it was intended for the bi directional streaming to continue to instantiate the client
-                    should there be any rpc errors or internet connection errors. In this case, we just want to listen to incoming unary call without terminating the session
-                    A separate resolve will be prepared for the subject should it fails in its operation */
-                },
-                error: error => {
-                    console.error(error),
-                        resolve(error)
-                },
-                complete: () => { } // should not complete since this is an indefinite listening process to transmit requests made by relevant client application
-            })
-
-        })
-    }
-
-    // Check connection To be Update. This function is destroying my code flow
-    private async checkConnectionHealth(client: any, statusControl: Subject<ReportStatus>, alreadyHealthCheck: boolean): Promise<boolean> {
-        return new Promise((resolve, reject) => {
-            client.Check({}, (error, response) => {
-                if (response) {
-                    console.log(`GRPC Health check status: ${response.status} Server Connected`);
-                    let report: ReportStatus = {
-                        code: ColorCode.GREEN,
-                        message: `Good to go!!!`,
-                        from: `GRPC health check`
-                    }
-                    statusControl.next(report)
-                } else {
-                    if (alreadyHealthCheck == false) console.error(`Health check failed: ${error}`);
-
-                }
-            })
-        })
-    }
-
 }
 
 

+ 476 - 0
services/service.method.ts

@@ -0,0 +1,476 @@
+import * as grpc from '@grpc/grpc-js';
+import { Subject, Subscription } from "rxjs";
+import { ReportStatus, ColorCode, GrpcMessage, MessageLog } from "../interfaces/general.interface";
+import { Status } from '@grpc/grpc-js/build/src/constants';
+const message_proto = require('./protos/server.proto')
+
+export class GrpcServiceMethod {
+
+    public async createServerStreamingServer(
+        serverUrl: string,
+        alreadyHealthCheck: boolean,
+        messageToBeStream: Subject<any>,
+        statusControl: Subject<ReportStatus>,
+        grpcServerConnection: any,
+        incomingRequest: Subject<GrpcMessage>
+    ): Promise<any> { // '0.0.0.0:3001'
+        return new Promise((resolve, reject) => {
+            try {
+                // https://github.com/grpc/proposal/blob/master/L5-node-client-interceptors.md
+                let server: grpc.Server = new grpc.Server();
+                // Add the streamingData function to the gRPC service
+                // Define your message_proto.Message service methods
+
+                server.addService(message_proto.Message.service, {
+                    HandleMessage: (call) => {
+                        incomingRequest.next(call.request)
+                        console.log(call.request)
+                        console.log(`Intializing main stream response. Confirmation from ${call.request.id}`)
+                        // This will be the main channel for streaming them response messages
+                        let report: ReportStatus = { //let the flow come through 
+                            code: ColorCode.GREEN,
+                            message: `Client connected!!`,
+                            from: `Server Streaming Instance`
+                        }
+                        statusControl.next(report)
+
+                        let subscription: Subscription = messageToBeStream.subscribe({
+                            next: (response: GrpcMessage) => {
+                                // console.log(`${response.id} vs ${request.id}`)
+                                // Check who's response it belongs to
+                                let noConnection = call.cancelled // check connection for each and every message
+                                if (noConnection === true) { // that means there's no connection, beccause the cancel operation is determined to check 
+                                    let report: ReportStatus = {
+                                        code: ColorCode.YELLOW,
+                                        message: `Client is not alive.....`,
+                                        payload: response,
+                                        from: `Server Streaming Instance`
+                                    }
+                                    statusControl.next(report)
+                                    subscription.unsubscribe()
+                                } else {
+                                    console.log(`Sending ${(response.message as MessageLog).appData.msgId} in respond to request: ${call.request.id}`)
+                                    let message = {
+                                        id: response.id,
+                                        message: JSON.stringify(response.message)
+                                    }
+                                    call.write(message)
+                                }
+                            },
+                            error: err => {
+                                console.error(err)
+                                let report: ReportStatus = {
+                                    code: ColorCode.YELLOW,
+                                    message: `Message streaming error`,
+                                    from: `Server Streaming Instance`
+                                }
+                                statusControl.next(report)
+                                subscription.unsubscribe()
+                            },
+                            complete: () => {
+                                console.log(`Stream response completed for ${call.request.id}`)
+                                subscription.unsubscribe()
+                                // call.end()
+                            }
+                        })
+
+                        if (call.request.id != '0000') {
+                            console.log(call.request)
+                            /* Case from handling incoming request from clients. This no longer takes into consideration where the request is coming
+                            from. If the client is subscribed to the server, it will receive it's due. */
+                            // console.log(`Client connected from: ${call.getPeer()}`);
+                            let request = call.request // unary request from client to be responded with a stream
+                            console.log(`Received unary call.... request: ${request.id}`)
+                            call.cancel()
+                        }
+
+                    },
+
+                    Check: (_, callback) => {
+                        // health check logic here
+                        // for now it is just sending the status message over to tell the client it is alive
+                        // For simplicity, always return "SERVING" as status
+                        callback(null, { status: 'SERVING' });
+                    },
+                });
+
+                // Bind and start the server
+                server.bindAsync(serverUrl, grpc.ServerCredentials.createInsecure(), () => {
+                    console.log(`gRPC server is running on ${serverUrl}`);
+                    server.start();
+                });
+                grpcServerConnection[serverUrl] = server
+            }
+            catch (error) {
+                resolve(error)
+            }
+
+        })
+    }
+
+    // Create a server streaming call. Please note that the structure of the code would not be the same as bidirectional because of it's unary nature
+    public async createServerStreamingClient(
+        server: string,
+        alreadyHealthCheck: boolean,
+        unaryRequestSubject: Subject<any>,
+        statusControl: Subject<ReportStatus>,
+        incomingResponse: Subject<GrpcMessage>
+    ): Promise<string> {
+        return new Promise(async (resolve, reject) => {
+            const client = new message_proto.Message(server, grpc.credentials.createInsecure());
+
+            unaryRequestSubject.subscribe({
+                next: (request: any) => {
+                    let message = {
+                        id: request.id,
+                        message: JSON.stringify(request.message)
+                    }
+                    console.log(message)
+                    console.log(`Sending request: ${message.id} over to server....`)
+                    let call = client.HandleMessage(message)
+
+                    call.on('status', (status: Status) => {
+                        if (status == grpc.status.OK) { // only returns a status when there's error. Otherwise it just waits
+                            console.log(`Message trasmission operation is successful`)
+                            // RPC completed successfully
+                        } if (status == grpc.status.UNAVAILABLE) {
+                            resolve('No connection established. Server is not responding..')
+                            let report = {
+                                code: ColorCode.YELLOW,
+                                message: `Server doesn't seem to be alive. Error returned.`,
+                                payload: request,
+                                from: `Server Streaming Client Instance`
+                            }
+                            statusControl.next(report)
+                        }
+                    });
+
+                    call.on('data', (data: any) => {
+                        let response: GrpcMessage = {
+                            id: data.id,
+                            message: JSON.parse(data.message)
+                        }
+                        incomingResponse.next(response)
+                        console.log((response.message as MessageLog).appData.msgId)
+                    });
+
+                    call.on('error', (err) => {
+                    });
+
+                    call.on('end', () => { // this is for gracefull || willfull termination from the server
+                        console.log(`Terminating Stream Request. Directing response to main channel`)
+                    });
+
+                },
+                error: error => {
+                    console.error(error),
+                        resolve(error)
+                },
+                complete: () => { } // should not complete since this is an indefinite listening process to transmit requests made by relevant client application
+            })
+
+            unaryRequestSubject.next({ id: `0000`, message: `Intiate Main Stream Channel Response` })
+
+            this.checkConnectionHealth(client, statusControl, alreadyHealthCheck) // perform check to see if server is alive, if not terminate this grpc instant and create again
+            // initiate(statusControl, incomingResponse).then(() => {
+            //     streamRequest(unaryRequestSubject, statusControl)
+            // }).catch(() => {
+            //     resolve('Trigger Reconnection logic. Terminate this client instance and creating new ones')
+            // })
+
+            // async function intialize(statusControl: Subject<any>, incomingResponse: Subject<any>) {
+            // async function initiate(statusControl: Subject<ReportStatus>, incomingResponse: Subject<any>) {
+            //     let greenlight: ReportStatus = {
+            //         code: ColorCode.GREEN,
+            //         message: `Initial Client set up. Release unary Request`,
+            //         from: `Server Streaming Client Instance`
+            //     }
+            //     statusControl.next(greenlight)
+            //     let report: ReportStatus = {
+            //         code: ColorCode.YELLOW,
+            //         message: `Server doesn't seem to be alive. Error returned.`,
+            //         from: `Server Streaming Client Instance`
+            //     }
+            //     let call = client.HandleMessage({
+            //         id: '0000',
+            //         message: `Establishing channel for response stream. Channel for response!`
+            //     })
+
+            //     call.on('status', (status: Status) => {
+            //         // console.log(status) // For more info: https://grpc.github.io/grpc/core/md_doc_statuscodes.html
+            //         // https://grpc.io/docs/what-is-grpc/core-concepts/#streaming
+            //         if (status == grpc.status.OK) { // only returns a status when there's error. Otherwise it just waits
+            //             console.log(`Message trasmission operation is successful`)
+            //             resolve('')
+            //         } if (status == grpc.status.UNAVAILABLE) {
+            //             resolve('No connection established. Server is not responding..')
+            //             statusControl.next(report)
+            //             reject()
+            //         }
+            //     });
+
+            //     // This is and should be the only channel for response. THe demultiplexing will be handled by application logic
+            //     call.on('data', (data: any) => {
+            //         // console.log(`Received stream response from Server. Receiver: ${message.id}`);
+            //         let response = {
+            //             id: data.id,
+            //             message: JSON.parse(data.message)
+            //         }
+            //         // console.log(response)
+            //         incomingResponse.next(response)
+            //     });
+
+            //     call.on('error', (err) => {
+            //         statusControl.next(report)
+            //     });
+
+            //     call.on('end', () => { // this is for gracefull || willfull termination from the server
+            //         console.log(`Streaming Response is completed`)
+            //         statusControl.next(report)
+            //     });
+            // }
+
+            // // }
+
+            // // function streamRequest(unaryRequestSubject: Subject<any>, statusControl: Subject<any>) {
+            // // Just send request, no need to listen to response. IT will be handled by the channel above.
+            // function streamRequest(unaryRequestSubject: Subject<GrpcMessage>, statusControl: Subject<ReportStatus>) {
+            //     unaryRequestSubject.subscribe({
+            //         next: (request: any) => {
+            //             let message = {
+            //                 id: request.id,
+            //                 message: JSON.stringify(request)
+            //             }
+            //             console.log(`Sending request: ${message.id} over to server....`)
+            //             const call = client.HandleMessage(message)
+
+            //             call.on('status', (status: Status) => {
+            //                 if (status == grpc.status.OK) { // only returns a status when there's error. Otherwise it just waits
+            //                     console.log(`Message trasmission operation is successful`)
+            //                     // RPC completed successfully
+            //                 } if (status == grpc.status.UNAVAILABLE) {
+            //                     resolve('No connection established. Server is not responding..')
+            //                     let report = {
+            //                         code: ColorCode.YELLOW,
+            //                         message: `Server doesn't seem to be alive. Error returned.`,
+            //                         payload: request,
+            //                         from: `Server Streaming Client Instance`
+            //                     }
+            //                     statusControl.next(report)
+            //                 }
+            //             });
+
+            //             // call.on('data', (data: any) => {
+            //             //     let response = {
+            //             //         data: data.id,
+            //             //         message: JSON.parse(data.message)
+            //             //     }
+            //             //     console.log(response.message.appData.msgId)
+            //             // });
+
+            //             call.on('error', (err) => {
+            //             });
+
+            //             call.on('end', () => { // this is for gracefull || willfull termination from the server
+            //                 console.log(`Terminating Stream Request. Directing response to main channel`)
+            //             });
+
+            //         },
+            //         error: error => {
+            //             console.error(error),
+            //                 resolve(error)
+            //         },
+            //         complete: () => { } // should not complete since this is an indefinite listening process to transmit requests made by relevant client application
+            //     })
+            // }
+
+        })
+    }
+
+    public async createGrpcBidirectionalServer(
+        serverUrl: string,
+        messageToBeStream: Subject<any>,
+        statusControl: Subject<ReportStatus>,
+        grpcServerConnection: any,
+        incomingRequest: Subject<GrpcMessage>
+    ): Promise<any> { // '0.0.0.0:3001'
+        return new Promise((resolve, reject) => {
+            try {
+                // https://github.com/grpc/proposal/blob/master/L5-node-client-interceptors.md
+                let server: grpc.Server = new grpc.Server();
+                // Add the streamingData function to the gRPC service
+                // Define your message_proto.Message service methods
+
+                server.addService(message_proto.Message.service, {
+                    sendMessageStream: (call) => {
+                        console.log(`Client connected from: ${call.getPeer()}`);
+                        let report: ReportStatus = {
+                            code: ColorCode.GREEN,
+                            message: `Client connected!!`,
+                            from: `Bidirectional Instance`
+                        }
+                        statusControl.next(report)
+
+                        // Right now this is being broadcast.
+                        let subscription: Subscription = messageToBeStream.subscribe({
+                            next: (payload: any) => {
+                                let noConnection = call.cancelled // check connection for each and every message
+                                if (noConnection === true) { // that means there's no connection, beccause the cancel operation is determined to check 
+                                    let report: ReportStatus = {
+                                        code: ColorCode.YELLOW,
+                                        message: `Client is not alive.....`,
+                                        payload: payload,
+                                        from: `Bidirectional Instance`
+                                    }
+                                    statusControl.next(report) // no connection. Tell buffer service to stop releasing messages
+                                    subscription.unsubscribe() // i still dont understand why i wrote this here
+                                } else {
+                                    console.log(`Sending ${payload.appData.msgId}`)
+                                    let message: string = JSON.stringify(payload)
+                                    call.write({ message })
+                                }
+                            },
+                            error: err => console.error(err),
+                            complete: () => { } //it will never complete
+                        })
+
+                        call.on('data', (data: any) => {
+                            // console.log(data) // it does return in string format
+                            let payload = JSON.parse(data.message)
+                            console.log(`Received Message from Client: ${payload.appData?.msgId}`);
+                            // Forward the received message to the RxJS subject
+                            // let respmsg: any = {
+                            //     msgId: payload.appData?.msgId,
+                            //     confirmationMessage: `Message ${payload.appData?.msgId} acknowledged!`
+                            // }
+                            // let message: string = JSON.stringify(respmsg)
+                            // console.log(`Responding to client: ${respmsg.msgId}`);
+                            // Note: The parameter here MUST BE STRICTLY be the same letter as defined in proto. Eg: message MessageRequest { string >>'message'<< = 1 }
+                            // call.write({ message });
+                        });
+
+                        call.on('end', () => {
+                            console.log('Client stream ended');
+                            // but the stream never ends. THis is not a reliable way to tell if a client is disconnected
+                        });
+
+                        call.on('error', (err) => {
+                            // Error that may occue during the rpc call. Id there's an error, put a callbacn function there to check the connection for client
+                            // emit a yellow report to halt message release. If the server does not reply to the callback function, then emit a red card
+                            // the call back function will be to write and then the client should response immediately through test
+                        });
+
+                        call.on('close', () => {
+                            console.log('Unknown cause for diconnectivity');
+                            // Handle client closure, which may be due to errors or manual termination
+                        });
+
+                    },
+
+                    Check: (_, callback) => {
+                        // health check logic here
+                        // for now it is just sending the status message over to tell the client it is alive
+                        // For simplicity, always return "SERVING" as status
+                        callback(null, { status: 'SERVING' });
+                    },
+                });
+
+                // Bind and start the server
+                server.bindAsync(serverUrl, grpc.ServerCredentials.createInsecure(), () => {
+                    console.log(`gRPC server is running on ${serverUrl}`);
+                    server.start();
+                });
+                grpcServerConnection[serverUrl] = server
+            }
+            catch (error) {
+                resolve(error)
+            }
+        })
+    }
+
+
+    public async createBidirectionalStreamingClient(
+        server: string,
+        alreadyHealthCheck: boolean,
+        messageToBeTransmitted: Subject<any>,
+        statusControl: Subject<ReportStatus>,
+        incomingResponse: Subject<GrpcMessage>
+    ): Promise<string> {
+        let subscription: any
+        let unsubscribed: boolean = false
+
+        return new Promise(async (resolve, reject) => {
+            const client = new message_proto.Message(server, grpc.credentials.createInsecure());
+            const call = client.sendMessageStream();
+
+            this.checkConnectionHealth(client, statusControl, alreadyHealthCheck)
+
+            call.on('status', (status: Status) => { // this is useless in streaming(on for unary)
+                // console.log(status) // For more info: https://grpc.github.io/grpc/core/md_doc_statuscodes.html
+                // https://grpc.io/docs/what-is-grpc/core-concepts/#streaming
+                // if (status == grpc.status.UNAVAILABLE) { // only returns a status when there's error. Otherwise it just waits
+                //     resolve('No connection established. Server is not responding..')
+                // }
+            });
+
+            // All the grpc operations are here
+            // Subscribe to the RxJS subject to send data to the server
+            subscription = messageToBeTransmitted.subscribe({
+                next: (payload: any) => {
+                    if (!unsubscribed) {
+                        console.log(`Sending ${payload.appData.msgId}`)
+                        let message: string = JSON.stringify(payload)
+                        call.write({ message })
+                    }
+                },
+                error: err => console.error(err),
+                complete: () => { } //it will never complete
+            });
+
+
+            call.on('data', (data: any) => {
+                let message = JSON.parse(data.message)
+                console.log(`Received message from Server: ${message.msgId ?? message.appData?.msgId ?? `Invalid`}`);
+            });
+
+            call.on('error', (err) => {
+                // console.log(`Something wrong with RPC call...`)
+                if (!unsubscribed && subscription) { // kill subcription to prevent memory leaks
+                    subscription.unsubscribe();
+                    unsubscribed = true;
+                }
+                resolve('Server Error');
+            });
+
+            call.on('end', () => {
+                if (!unsubscribed && subscription) { // kill subcription to prevent memory leaks
+                    subscription.unsubscribe();
+                    unsubscribed = true;
+                }
+                resolve('Server Error');
+            });
+
+        })
+    }
+
+
+
+    // Check connection To be Update. This function is destroying my code flow
+    public async checkConnectionHealth(client: any, statusControl: Subject<ReportStatus>, alreadyHealthCheck: boolean): Promise<boolean> {
+        return new Promise((resolve, reject) => {
+            client.Check({}, (error, response) => {
+                if (response) {
+                    console.log(`GRPC Health check status: ${response.status} Server Connected`);
+                    let report: ReportStatus = {
+                        code: ColorCode.GREEN,
+                        message: `Good to go!!!`,
+                        from: `GRPC health check`
+                    }
+                    statusControl.next(report)
+                } else {
+                    if (alreadyHealthCheck == false) console.error(`Health check failed: ${error}`);
+                }
+            })
+        })
+    }
+}

+ 2 - 2
test/grpc1.ts

@@ -3,10 +3,11 @@ import { GrpcService } from "../services/grpc.service";
 import * as fs from 'fs'
 import { FisRetransmissionService } from "../services/fis.retransmission.service";
 import { ReportStatus } from "../interfaces/general.interface";
+import { GrpcServiceMethod } from "../services/service.method";
 
 const messagesJSON: any = fs.readFileSync('payload.json')
 const errorHandlingService: FisRetransmissionService = new FisRetransmissionService()
-const gprcService: GrpcService = new GrpcService()
+const gprcService: GrpcService = new GrpcService(new GrpcServiceMethod()) 
 let incomingRequest: Subject<any> = gprcService.getIncomingRequest()
 let applicationOutgoingResponse: Subject<any> = new Subject()
 let parsedMessages: any[] = JSON.parse(messagesJSON) // load the fake messages generated for this trial 
@@ -56,7 +57,6 @@ function stream(): Subject<any> {
       result.complete();
     }
   }, 500)
-
   return result
 }
 

+ 36 - 23
test/grpc2.ts

@@ -1,12 +1,13 @@
 import * as fs from 'fs'
 import { Subject, groupBy, mergeMap, toArray } from 'rxjs';
-import { ColorCode, ReportStatus } from '../interfaces/general.interface';
+import { ColorCode, GrpcMessage, MessageLog, ReportStatus } from '../interfaces/general.interface';
 import { GrpcService } from '../services/grpc.service';
 import { FisRetransmissionService } from '../services/fis.retransmission.service';
+import { GrpcServiceMethod } from '../services/service.method';
 
 // Subject for bidirectional communication
 const errorHandlingService: FisRetransmissionService = new FisRetransmissionService()
-const grpcService: GrpcService = new GrpcService()
+const grpcService: GrpcService = new GrpcService(new GrpcServiceMethod())
 const messagesJSON: any = fs.readFileSync('payload.json')
 let incomingResponse: Subject<any> = grpcService.getIncomingResponse()
 let parsedMessages: any[] = JSON.parse(messagesJSON) // load the fake messages generated for this trial 
@@ -14,24 +15,23 @@ let messageToBeReleased: Subject<any> = new Subject() // Sample message to be tr
 let statusControl: Subject<ReportStatus> = new Subject() // Listening for error events and states
 let dataMessages = stream() // Emulate messges to be sent over to target server
 let server1: string = 'localhost:3000'
-let unaryRequestSubject: Subject<any> = new Subject()
+let unaryRequestSubject: Subject<GrpcMessage> = new Subject()
 let array: any[] = []
 
 incomingResponse.subscribe({
   next: request => {
-    console.log(`To be distributed to request:${request.id} => message: ${request.message.appData.msgId}`)
     array.push(request)
-    // Have to create a function that creates observables/subjects corresponding to the request made by the client to stream the responses 
-
-    /* now is one request will have it's own listener. If the client is down all listeners instantiated from all the request are terminated.
-    Server now doesn't care because when it proceses all the request, the response are merged into one channel. My not so clever solution is that
-    when client starts, it wil have to first send a request specifically just to grab the data that was loss, and then let the internal 
-    application do the sorting */
-
-    /* To really see how this work, i will create an array. Since i wil pump in 3 request, the server side is hardcoded atm to stream 10 messagse
-    for one request. I am going to terminate the client halfway through, change the code, so that when i start the client again, it will only send
-    1 request over, there making another request for 10 more. But of course, in the real implementation, this can be a initializer just to see if 
-    there's any buffered messages to be sent over
+    console.log(`To be distributed to request:${request.id} => message: ${(request.message as MessageLog).appData.msgId}`)
+    /* Here's the plan. Since each and every single call that the client make, the server will open a stream specifically for that 
+    client. Now. A new strategy to cater to the revised solution. Everytime a client start up, it will first make a request call
+    spefically to open a stream specifically for response. THe demulitplexing will be delegated outside. The grpc instance should
+    not be bothered with hashydabery that's going on. Server side just need to receive the request, (must terminate the stream 
+    response if it eats up too much memory), and then respond with the stream established earlier when the client first send the 
+    first request call.
+    New dilemma, there will be some changes to be followed as well. Since i want to integrate the termination of the stream and in
+    this case, the stream will terminate right after it sends a acknoledgement saying that the request is being processed. But due 
+    to the reconnection logic integration, the client will treat server.end as a server failure, and will try to execute the 
+    reconnection logic. Should not be hard to change.
      */
   }
 })
@@ -42,20 +42,33 @@ errorHandlingService.handleMessage(unaryRequestSubject, statusControl).subscribe
 })
 grpcService.createGrpcInstance(server1, messageToBeReleased, statusControl, { instanceType: 'client', serviceMethod: 'server streaming' })
 
-messageToBeReleased.next(parsedMessages[0])
 setTimeout(() => {
-  messageToBeReleased.next(parsedMessages[1])
-}, 2000)
+  let request = {
+    id: parsedMessages[1].appData.msgId,
+    message: parsedMessages[1]
+  }
+  unaryRequestSubject.next(request)
+}, 1000)
 setTimeout(() => {
-  messageToBeReleased.next(parsedMessages[2])
+  let request = {
+    id: parsedMessages[2].appData.msgId,
+    message: parsedMessages[2]
+  }
+  unaryRequestSubject.next(request)
 }, 3000)
 setTimeout(() => {
-  messageToBeReleased.next(parsedMessages[3])
+  let request = {
+    id: parsedMessages[3].appData.msgId,
+    message: parsedMessages[3]
+  }
+  unaryRequestSubject.next(request)
 }, 5000)
-
 setTimeout(() => {
-  console.log(`Total messages received: ${array.length}`)
-}, 11000)
+  console.log(`Full amount received: ${array.length}`)
+}, 10000)
+setTimeout(() => {
+  console.log(`Full amount received: ${array.length}`)
+}, 12000)
 
 // this is just to publish an array of fake data as a Subject
 function stream(): Subject<any> {

+ 9 - 34
test/test.ts

@@ -1,38 +1,13 @@
-import { Subject, from } from 'rxjs';
-import { groupBy, mergeMap } from 'rxjs/operators';
+import { Subject } from "rxjs";
 
-// Sample response data
-const responseData = [
-  { who: 'user1', message: 'Hello from user1' },
-  { who: 'user2', message: 'Hi there from user2' },
-  { who: 'user1', message: 'Another message from user1' },
-  { who: 'user3', message: 'Message from user3' },
-];
+let testSubject: Subject<any> = new Subject()
 
-// Create a Subject to receive the response
-const responseSubject = new Subject();
+testSubject.next({test: 'See how long this take'})
 
-// Use groupBy to split the response into observables based on 'who'
-const groupedSubjects: Record<string, Subject<any>> = {};
+setTimeout(() => {
+  testSubject.subscribe(e => console.log(e.test))
+}, 2000)
 
-// Create Observables based on 'who' property
-const groupedObservables = responseData.map((message) => {
-  if (!groupedSubjects[message.who]) {
-    groupedSubjects[message.who] = new Subject();
-  }
-  return groupedSubjects[message.who].asObservable();
-});
-
-// Merge all Observables into a single stream
-from(groupedObservables)
-  .pipe(
-    mergeMap((obs) => obs)
-  )
-  .subscribe((message) => {
-    console.log(`Received message from ${message.who}: ${message.message}`);
-  });
-
-// Push data into the responseSubject
-responseData.forEach((data) => {
-  responseSubject.next(data);
-});
+setTimeout(() => {
+  testSubject.next({test: 'too late'})
+}, 4000)