| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423 | 
							- import * as _ from 'lodash'
 
- import * as fs from 'fs'
 
- import mongoose, { Model, Schema } from 'mongoose';
 
- import { Observable, Subject, Subscription, from } from 'rxjs'
 
- import { ColorCode, GrpcMessage, MessageLog, ReportStatus, Status } from '../interfaces/general.interface'
 
- require('dotenv').config();
 
- // Implement status chain refactoring
 
/**
 * Intercepts outgoing gRPC messages and routes them according to the
 * connection status reports it receives:
 *
 * - GREEN:  incoming messages are forwarded to the returned subject, and
 *           anything held in the local buffer or in Mongo is released.
 * - YELLOW: incoming messages are pushed into the local in-memory buffer.
 * - RED:    incoming messages are saved to Mongo and the local buffer is
 *           transferred to Mongo as well.
 *
 * A Mongo connection attempt is started from the constructor.
 */
export class FisRetransmissionService {
    private mongoUrl: string = process.env.MONGO + 'emergencyStorage'
    // Never reassigned: the buffer subscription captures this array by reference,
    // so it is always emptied in place instead of being replaced.
    private readonly bufferedStorage: any[] = []
    private mongoConnection: any
    private messageModel: any
    private maximumBufferLength: number = FisRetransmissionService.parseBufferLimit(process.env.MaxBufferLoad)

    constructor() {
        // Connect to MongoDB in the background; the loop logs its own failures.
        this.manageMongoConnection().catch((err) => console.error(err))
    }

    /**
     * Reads the buffer limit from its environment value.
     *
     * @param rawLimit value of `process.env.MaxBufferLoad`, possibly undefined
     * @returns the parsed limit, or 15 when the value is missing or not a number
     *          (a NaN limit would make every `length >= limit` check false)
     */
    private static parseBufferLimit(rawLimit: string | undefined): number {
        const fallbackLimit = 15
        const parsed = parseInt(rawLimit ?? '', 10)
        return Number.isNaN(parsed) ? fallbackLimit : parsed
    }

    /**
     * Main entry point: subscribes to the status reports and toggles the
     * release / buffer / Mongo subscriptions on the message stream accordingly.
     *
     * @param messageToBePublished stream of outgoing messages to intercept
     * @param statusReport stream of connection status reports
     * @returns subject on which released messages are emitted
     */
    public handleMessage(messageToBePublished: Subject<GrpcMessage>, statusReport: Subject<ReportStatus>): Subject<GrpcMessage> {
        const releaseMessageSubject: Subject<GrpcMessage> = new Subject() // A return value

        // Subscriptions are toggled on and off to steer the data flow.
        let messageReleaseSubscription: Subscription | null = null
        let messageBufferSubscription: Subscription | null = null
        let messageStreamToMongo: Subscription | null = null

        this.checkBufferLimit(messageToBePublished, statusReport)

        statusReport.subscribe((report: ReportStatus) => {
            if (report.code == ColorCode.GREEN) {
                console.log(`Connection status report && ${report.message ?? 'No Message'}`)
                /* Status Chain begins */
                let status: Status = 1
                if (status === 1) {
                    messageStreamToMongo = this.deactivateMongoStreamSubscription(messageStreamToMongo)
                    if (messageStreamToMongo) status = 0
                }
                if (status === 1) {
                    messageBufferSubscription = this.deactivateBufferSubscription(messageBufferSubscription)
                    if (messageBufferSubscription) status = 0
                }
                if (status === 1) {
                    messageReleaseSubscription = this.activateReleaseSubscription(messageReleaseSubscription, messageToBePublished, releaseMessageSubject)
                    if (!messageReleaseSubscription) status = 0
                }
                if (status === 1) {
                    // Both releases are asynchronous; their failures are logged where
                    // they occur because the status chain has already finished by then.
                    this.releaseMessageFromLocalBuffer(this.bufferedStorage).then((resObs: Observable<GrpcMessage>) => {
                        resObs.subscribe({
                            next: message => releaseMessageSubject.next(message),
                            error: err => console.error(err),
                            complete: () => {
                                this.bufferedStorage.length = 0
                                console.log(`Reset buffer Storage count: ${this.bufferedStorage.length}. All messages have been released back into the stream.`)
                            }
                        })
                    }).catch((err) => console.error(err))

                    this.releaseMessageFromMongoStorage().then((resObs: Subject<GrpcMessage>) => {
                        resObs.subscribe({
                            next: message => releaseMessageSubject.next(message),
                            error: err => console.error(err),
                            complete: () => console.log(`All Mongo data are transferred `)
                        })
                    }).catch((err) => console.error(err))
                }
                if (status === 0) {
                    console.log(`Something Went Wrong in handling ${ColorCode.GREEN} report.`)
                }
            }

            if (report.code == ColorCode.YELLOW) {
                if (report.payload) {
                    console.log(`Rebuffering ${report.payload.message?.appData?.msgId} into buffer...`)
                    this.bufferedStorage.push(report.payload)
                }
                console.log(`Connection status report && ${report.message ?? 'No Message'}`)
                /* Status Chain begins */
                let status: Status = 1
                if (status === 1) {
                    messageBufferSubscription = this.activateBufferSubscription(this.bufferedStorage, messageBufferSubscription, messageToBePublished)
                    if (!messageBufferSubscription) status = 0
                }
                if (status === 1) {
                    messageReleaseSubscription = this.deactivateReleaseSubscription(messageReleaseSubscription)
                    if (messageReleaseSubscription) status = 0
                }
                if (status === 0) {
                    console.log(`Something Went Wrong in handling ${ColorCode.YELLOW} report.`)
                }
            }

            if (report.code == ColorCode.RED) {
                console.log(`Connection status report: Server down. ${report.message} lol`)
                /* Status Chain begins */
                let status: Status = 1
                if (status === 1) {
                    messageStreamToMongo = this.activateMongoStreamSubscription(messageStreamToMongo, messageToBePublished)
                    if (!messageStreamToMongo) status = 0
                }
                if (status === 1) {
                    messageBufferSubscription = this.deactivateBufferSubscription(messageBufferSubscription)
                    if (messageBufferSubscription) status = 0
                }
                if (status === 1) {
                    // The transfer reports its own per-message failures; the catch only
                    // guards against an unexpected rejection going unhandled.
                    this.transferBufferedMessageToMongoStorage(this.bufferedStorage, messageBufferSubscription)
                        .catch((err) => console.error(err))
                }
                if (status === 0) {
                    console.log(`Something Went Wrong in handling ${ColorCode.RED} report.`)
                }
            }

            if (!report.code) {
                console.log(`Unknown message...`)
            }
        })

        return releaseMessageSubject
    }

    /**
     * For every incoming message, checks the local buffer size and pushes a
     * RED status report when it has reached the configured limit.
     *
     * @param message stream of outgoing messages
     * @param statusReport subject that receives the RED report
     */
    private checkBufferLimit(message: Subject<GrpcMessage>, statusReport: Subject<ReportStatus>) {
        message.subscribe(() => {
            if (this.bufferedStorage.length < this.maximumBufferLength) return

            console.log(`Buffer length exceeds limit imposed!!!`)
            const report: ReportStatus = {
                code: ColorCode.RED,
                message: `Buffer is exceeding limit. Initiate storage transfer to designated database.`,
                from: `Error Handling Service`
            }
            statusReport.next(report)
        })
    }

    /**
     * Starts forwarding incoming messages to the release subject.
     *
     * @returns the active subscription (the existing one when already active)
     */
    private activateReleaseSubscription(messageReleaseSubscription: Subscription | null, messageToBePublished: Subject<GrpcMessage>, releaseMessageSubject: Subject<GrpcMessage>): Subscription | null {
        if (messageReleaseSubscription) {
            console.log(`Subscription message release is already active.`)
            return messageReleaseSubscription
        }

        const subscription = messageToBePublished.subscribe({
            next: (message: GrpcMessage) => {
                console.log(`Releasing ${(message.message as MessageLog).appData.msgId}...`)
                releaseMessageSubject.next(message)
            },
            error: (err) => console.error(err),
            complete: () => { },
        })
        console.log(`Subscription message release activated.`)
        return subscription
    }

    /**
     * Stops forwarding incoming messages to the release subject.
     *
     * @returns always null, since the subscription is either closed here or was already absent
     */
    private deactivateReleaseSubscription(messageReleaseSubscription: Subscription | null): Subscription | null {
        if (messageReleaseSubscription) {
            messageReleaseSubscription.unsubscribe()
            console.log(`Subscription message release deactivated.`)
        } else {
            console.log(`Subscription message release is already deactivated.`)
        }
        return null
    }

    /**
     * Starts pushing incoming messages into the given local array.
     *
     * @returns the active subscription (the existing one when already active)
     */
    private activateBufferSubscription(bufferStorage: GrpcMessage[], messageBufferSubscription: Subscription | null, messageToBePublished: Subject<GrpcMessage>): Subscription | null {
        if (messageBufferSubscription) {
            console.log(`Subscription message buffer is already active.`)
            return messageBufferSubscription
        }

        const subscription = messageToBePublished.subscribe({
            next: (message: any) => {
                console.log(`Buffering ${(message.message as MessageLog).appData.msgId}...  Local array length: ${bufferStorage.length}`)
                bufferStorage.push(message)
            },
            error: (err) => console.error(err),
            complete: () => { },
        })
        console.log(`Subscription message buffer activated.`)
        return subscription
    }

    /**
     * Stops pushing incoming messages into the local array.
     *
     * @returns always null, since the subscription is either closed here or was already absent
     */
    private deactivateBufferSubscription(messageBufferSubscription: Subscription | null): Subscription | null {
        if (messageBufferSubscription) {
            messageBufferSubscription.unsubscribe()
            console.log(`Subscription message buffer deactivated.`)
        } else {
            console.log(`Subscription message buffer is already deactivated.`)
        }
        return null
    }

    /**
     * Redirects incoming messages to Mongo: each one is handed to `saveToMongo`.
     *
     * @returns the active subscription (the existing one when already active)
     */
    private activateMongoStreamSubscription(messageStreamToMongo: Subscription | null, messageToBePublished: Subject<GrpcMessage>): Subscription | null {
        if (messageStreamToMongo) {
            console.log(`Subscription message streaming to Mongo  is already active.`)
            return messageStreamToMongo
        }

        const subscription = messageToBePublished.subscribe({
            next: (message: any) => {
                console.log(`Saving ${(message.message as MessageLog).appData.msgId}...`)
                // saveToMongo logs the failure itself; the handler only keeps the
                // rejection from going unhandled.
                this.saveToMongo(message).catch(() => { })
            },
            error: (err) => console.error(err),
            complete: () => { },
        })
        console.log(`Subscription message streaming to Mongo activated.`)
        return subscription
    }

    /**
     * Stops redirecting incoming messages to Mongo.
     *
     * @returns always null, since the subscription is either closed here or was already absent
     */
    private deactivateMongoStreamSubscription(messageStreamToMongo: Subscription | null): Subscription | null {
        if (messageStreamToMongo) {
            messageStreamToMongo.unsubscribe()
            console.log(`Subscription message streaming to Mongo deactivated.`)
        } else {
            console.log(`Subscription message streaming to Mongo is already deactivated.`)
        }
        return null
    }

    /**
     * Saves one message through the message model.
     *
     * @returns true once the document has been created
     * @throws the underlying error when the model is not ready or the save fails
     */
    private async saveToMongo(message: GrpcMessage): Promise<boolean> {
        try {
            if (!this.messageModel) {
                throw new Error(`Message model is not initialised; Mongo connection is not ready.`)
            }
            await this.messageModel.create(message)
            console.log(`Saved MessageID ${(message.message as MessageLog).appData.msgId} into ${this.mongoUrl}`)
            return true
        } catch (err) {
            console.log(`MongoSaveError: ${err instanceof Error ? err.message : String(err)}`)
            throw err
        }
    }

    /**
     * Transfers every message in the given array to Mongo. The array is drained
     * in place first; messages whose save fails are put back at its front so
     * they are not lost.
     *
     * @param bufferedMessage array to drain (the local buffer)
     * @param messageBufferSubscription when set, a summary line is logged
     * @returns the same array, empty when every save succeeded
     */
    private async transferBufferedMessageToMongoStorage(bufferedMessage: GrpcMessage[], messageBufferSubscription: Subscription | null): Promise<GrpcMessage[]> {
        // Take ownership of the current contents so a repeated transfer cannot
        // save the same message twice while these saves are still in flight.
        const pending = bufferedMessage.splice(0, bufferedMessage.length)

        const outcomes = await Promise.allSettled(pending.map(async (message: any) => {
            await this.saveToMongo(message)
            console.log(`Message ${(message.message as MessageLog).appData.msgId} saved successfully...`)
        }))

        const failed = pending.filter((_message, index) => outcomes[index]?.status === 'rejected')
        if (failed.length > 0) {
            bufferedMessage.unshift(...failed)
            console.error(`${failed.length} of ${pending.length} buffered messages could not be saved to ${this.mongoUrl} and were kept in the local buffer.`)
        }

        if (messageBufferSubscription) {
            console.log(`All ${pending.length} buffered messages have been sent for transfer to ${this.mongoUrl}. Current length: ${bufferedMessage.length}`)
        }
        return bufferedMessage
    }

    /**
     * Wraps the locally buffered messages in an observable so the caller can
     * push them back into the outgoing stream.
     *
     * @returns an observable over the buffered messages
     * @throws Error (as a rejection) when the buffer is empty
     */
    private async releaseMessageFromLocalBuffer(bufferedStorage: GrpcMessage[]): Promise<Observable<GrpcMessage>> {
        const count = bufferedStorage.length
        if (count === 0) {
            throw new Error(`There is no data in stored in local instance`)
        }

        console.log(`Releasing data from local buffer instance. There ${count === 1 ? "is" : "are"} ${count} messages...`)
        return from(bufferedStorage)
    }

    /**
     * Starts streaming all messages stored in Mongo into a new subject.
     *
     * @returns the subject the stored messages are emitted on
     */
    private async releaseMessageFromMongoStorage(): Promise<Subject<GrpcMessage>> {
        const dataSubject: Subject<GrpcMessage> = new Subject()
        // Extraction runs in the background; a failure to start it is surfaced
        // to the subject's subscribers.
        this.extractAllMessages(dataSubject).catch((err) => dataSubject.error(err))
        return dataSubject
    }

    /**
     * Makes one connection attempt to the configured Mongo database.
     *
     * @returns resolves true once the connection is open (the message model is
     *          registered at that point), or false when the attempt failed
     */
    private async connectToMongoDatabase(): Promise<any> {
        return new Promise((resolve) => {
            console.log(this.mongoUrl)
            this.mongoConnection = mongoose.createConnection(this.mongoUrl)
            const connection = this.mongoConnection
            let opened = false

            connection.on('error', (error) => {
                console.error('Connection error:', error)
                if (opened) return
                // Failed before opening: release this attempt so retries do not
                // pile up abandoned connections.
                try {
                    Promise.resolve(connection.close()).catch(() => undefined)
                } catch (closeError) {
                    console.error('Error closing failed connection:', closeError)
                }
                resolve(false)
            })

            connection.once('open', () => {
                opened = true
                console.log(`Connected to ${process.env.MONGO}`)
                this.messageModel = connection.model('Message', require('../models/message.schema'))
                resolve(true)
            })
        })
    }

    /**
     * Keeps attempting to connect, waiting one second between attempts, until
     * a connection has been opened.
     *
     * @returns true once connected
     */
    private async manageMongoConnection(): Promise<boolean> {
        while (true) {
            try {
                if (await this.connectToMongoDatabase()) return true
            } catch (error) {
                console.log(`Something Wrong occured. Please check at manageMongoConnection`)
            }
            await new Promise(resolve => setTimeout(resolve, 1000)) // Wait for 1 second before the next attempt
        }
    }

    /**
     * Streams every stored message into the given subject, completes it when
     * the cursor ends, and then deletes the documents that were streamed.
     * When the message model is not set up, only an error line is logged.
     *
     * @param subjectArgs subject that receives the stored messages
     */
    public async extractAllMessages(subjectArgs: Subject<GrpcMessage>): Promise<void> {
        // Need to resolve the issue of streaming in a specific order that is sequential
        if (!this.messageModel) {
            console.log(`Error: Message Model is ${this.messageModel}!! Please set up the mongoose connectino properly!`)
            return
        }

        // Only documents that were actually emitted are deleted afterwards, so
        // messages saved while the stream is running are not wiped.
        const streamedIds: unknown[] = []
        const deleteStreamed = async (): Promise<void> => {
            try {
                await this.messageModel.deleteMany({ _id: { $in: streamedIds } })
                console.log('Data in Mongo deleted successfully.')
            } catch (err) {
                console.error('Error deleting data:', err)
            }
        }

        const eventStream = this.messageModel.find().lean().cursor()

        eventStream.on('data', (message) => {
            // Emit each document to the subject
            streamedIds.push(message._id)
            subjectArgs.next(message)
        })

        eventStream.on('error', (err) => {
            console.error('Error streaming data from Mongo:', err)
            subjectArgs.error(err)
            // Documents emitted before the failure were already delivered.
            void deleteStreamed()
        })

        eventStream.on('end', () => {
            // All data has been streamed, complete the subject
            subjectArgs.complete()
            // Delete the data once it has been streamed
            void deleteStreamed()
        })
    }
}
 
- // Store in json file in this project folder. To be enabled in future
 
- // private async transferMessageToLocalStorage(message: Subject<any>): Promise<void> {
 
- //     let localArray: any[] = this.bufferedStorage
 
- //     let filename = `localstorage.json`;
 
- //     while (localArray.length > 0) {
 
- //         let objectToWrite = this.bufferedStorage[0];
 
- //         await writeMessage(objectToWrite, filename)
 
- //     }
 
- //     message.subscribe((message: any) => {
 
- //         writeMessage(message, filename)
 
- //     })
 
- //     if (localArray.length < 1) this.bufferedStorage = localArray
 
- //     console.log('Local Array is empty. Finished transferring to files.')
 
- //     async function writeMessage(message: any, filename: string) {
 
- //         try {
 
- //             let stringifiedMessage = JSON.stringify(message);
 
- //             await fs.promises.appendFile(filename, stringifiedMessage + "\r\n")
 
- //             console.log(`Successfully transferred ${filename}`);
 
- //             localArray.shift();
 
- //         } catch (err) {
 
- //             console.error(`Error trasferring ${filename}:`, err);
 
- //         }
 
- //     }
 
- // }
 
 
  |