import * as _ from 'lodash'
import mongoose, { Model, Schema } from 'mongoose';
import { Observable, Subject, Subscription, from } from 'rxjs'
import { ColorCode, Message, MessageLog, ReportStatus, Status } from '../interfaces/general.interface'
require('dotenv').config();

// Implement status chain refactoring
/**
 * Intercepts an outgoing message stream and, driven by colour-coded status reports,
 * either releases messages to the caller (GREEN), buffers them in memory (YELLOW)
 * or persists them to MongoDB (RED).
 *
 * Constructing the service immediately starts a background loop that tries to
 * connect to the Mongo database at `process.env.MONGO + 'emergencyStorage'`.
 */
export class FisRetransmissionService {
    private mongoUrl: string = process.env.MONGO + 'emergencyStorage'
    private bufferedStorage: Message[] = []
    private mongoConnection: any
    private messageModel: any
    // please configure at environment.
    // If MaxBufferLoad is missing this is NaN, and `length >= NaN` is always false,
    // i.e. the buffer limit check in checkBufferLimit never fires.
    private maximumBufferLength: number = parseInt(process.env.MaxBufferLoad as string, 10)

    constructor() {
        // Connect to mongoDB. The loop never resolves; `void` marks it as intentionally
        // fire-and-forget and the catch keeps an unexpected failure from going unhandled.
        void this.manageMongoConnection().catch((err: unknown) => console.error(err))
    }

    /**
     * Main function that intercepts outgoing messages by interpreting the report status
     * from the grpc connection as an indicator.
     *
     * @param applicationOutgoingMessage stream of messages produced by the application
     * @param statusReport stream of connection status reports (GREEN / YELLOW / RED)
     * @returns the subject through which released messages are emitted
     */
    public handleMessage(applicationOutgoingMessage: Subject<Message>, statusReport: Subject<ReportStatus>): Subject<Message> {
        // Every message subscribed from applicationOutgoingMessage will be released through this subject
        const releaseMessageSubject: Subject<Message> = new Subject()
        let messageReleaseSubscription: Subscription | null = null
        let messageBufferSubscription: Subscription | null = null
        let messageStreamToMongo: Subscription | null = null

        this.checkBufferLimit(applicationOutgoingMessage, statusReport)

        statusReport.subscribe((report: ReportStatus) => {
            /* Green should release all data from buffer and mongo and also redirect the applicationOutgoingMessage
            back into the return subject (releaseMessageSubject) if there's any. */
            if (report.code === ColorCode.GREEN) {
                console.log(`Connection status report && ${report.message ?? 'No Message'}`)
                /* Status Chain begins: each step only runs if every previous step succeeded. */
                let status: Status = 1
                if (status === 1) {
                    messageStreamToMongo = this.deactivateMongoStreamSubscription(messageStreamToMongo)
                    if (messageStreamToMongo) status = 0
                }
                if (status === 1) {
                    messageBufferSubscription = this.deactivateBufferSubscription(messageBufferSubscription)
                    if (messageBufferSubscription) status = 0
                }
                if (status === 1) {
                    messageReleaseSubscription = this.activateReleaseSubscription(messageReleaseSubscription, applicationOutgoingMessage, releaseMessageSubject)
                    if (!messageReleaseSubscription) status = 0
                }
                if (status === 1) {
                    // The two releases below complete asynchronously, so their failures cannot
                    // feed back into the synchronous status chain; they are logged instead.
                    this.releaseMessageFromLocalBuffer(this.bufferedStorage).then((resObs: Observable<Message>) => {
                        resObs.subscribe({
                            next: message => releaseMessageSubject.next(message),
                            error: err => console.error(err),
                            complete: () => {
                                this.bufferedStorage = []
                                console.log(`Reset buffer Storage count: ${this.bufferedStorage.length}. All messages have been released back into the stream.`)
                            }
                        })
                    }).catch((err: unknown) => console.error(err))

                    this.releaseMessageFromMongoStorage().then((resObs: Subject<Message>) => {
                        resObs.subscribe({
                            next: message => releaseMessageSubject.next(message),
                            error: err => console.error(err),
                            complete: () => console.log(`All Mongo data are transferred `)
                        })
                    }).catch((err: unknown) => console.error(err))
                }
                if (status === 0) {
                    console.log(`Something Went Wrong in handling ${ColorCode.GREEN} report.`)
                }
            }

            /* Start buffering the messages coming in from applicationOutgoingMessage and also stop them
            from flowing into the release subject */
            if (report.code === ColorCode.YELLOW) {
                if (report.payload) {
                    console.log(`Rebuffering ${report.payload.message?.appData?.msgId} into buffer...`)
                    this.bufferedStorage.push(report.payload)
                }
                console.log(`Connection status report && ${report.message ?? 'No Message'}`)
                /* Status Chain begins */
                let status: Status = 1
                if (status === 1) {
                    messageBufferSubscription = this.activateBufferSubscription(this.bufferedStorage, messageBufferSubscription, applicationOutgoingMessage)
                    if (!messageBufferSubscription) status = 0
                }
                if (status === 1) {
                    messageReleaseSubscription = this.deactivateReleaseSubscription(messageReleaseSubscription)
                    if (messageReleaseSubscription) status = 0
                }
                if (status === 0) {
                    console.log(`Something Went Wrong in handling ${ColorCode.YELLOW} report.`)
                }
            }

            /* Stop buffering the message in local instance, but start saving them in database. Must first transfer
            the ones in local buffer before redirecting the flow from applicationOutgoingMessage into Mongo */
            if (report.code === ColorCode.RED) {
                console.log(`Connection status report: Server down. ${report.message} lol`)
                let status: Status = 1
                if (status === 1) {
                    messageStreamToMongo = this.activateMongoStreamSubscription(messageStreamToMongo, applicationOutgoingMessage)
                    if (!messageStreamToMongo) status = 0
                }
                if (status === 1) {
                    messageBufferSubscription = this.deactivateBufferSubscription(messageBufferSubscription)
                    if (messageBufferSubscription) status = 0
                }
                if (status === 1) {
                    this.transferBufferedMessageToMongoStorage(this.bufferedStorage, messageBufferSubscription)
                        .then((remaining: Message[]) => {
                            // this promise function should return an empty array
                            if (remaining.length > 0) {
                                console.error(`${remaining.length} buffered message(s) remain in the local buffer after the transfer to Mongo.`)
                            }
                        })
                        .catch((err: unknown) => console.error(err))
                }
                if (status === 0) {
                    console.log(`Something Went Wrong in handling ${ColorCode.RED} report.`)
                }
            }

            if (!report.code) {
                console.log(`Unknown message...`)
            }
        })

        return releaseMessageSubject
    }

    /**
     * If the buffer exceeds a certain limit, trigger RED. Configure the limit in the .env file.
     * There's the concern of 2 RED statuses, one from this and another from other means.
     * Behaviour of this needs to be investigated further.
     *
     * @param message stream whose every emission triggers a buffer-size check
     * @param statusReport subject that receives the synthetic RED report
     */
    private checkBufferLimit(message: Subject<Message>, statusReport: Subject<ReportStatus>) {
        message.subscribe(() => {
            // For every message that comes in, check the buffer size; once it reaches the
            // designated amount, push a RED report status.
            if (this.bufferedStorage.length >= this.maximumBufferLength) {
                console.log(`Buffer length exceeds limit imposed!!!`)
                const report: ReportStatus = {
                    code: ColorCode.RED,
                    message: `Buffer is exceeding limit. Initiate storage transfer to designated database.`,
                    from: `Error Handling Service`
                }
                statusReport.next(report)
            }
        })
    }

    /**
     * Release the incoming messages to be returned to the caller.
     *
     * @returns the active subscription (the existing one if it was already active)
     */
    private activateReleaseSubscription(messageReleaseSubscription: Subscription | null, applicationOutgoingMessage: Subject<Message>, releaseMessageSubject: Subject<Message>): Subscription | null {
        if (messageReleaseSubscription) {
            console.log(`Subscription message release is already active.`);
            return messageReleaseSubscription
        }
        const subscription = applicationOutgoingMessage.subscribe({
            next: (message: Message) => {
                console.log(`Releasing ${(message.message as MessageLog).appData.msgId}...`);
                releaseMessageSubject.next(message);
            },
            error: (err) => console.error(err),
            complete: () => { },
        });
        console.log(`Subscription message release activated.`);
        return subscription
    }

    /**
     * Stop the incoming messages from being returned to the caller.
     *
     * @returns always `null` (the subscription, if any, has been unsubscribed)
     */
    private deactivateReleaseSubscription(messageReleaseSubscription: Subscription | null): Subscription | null {
        if (messageReleaseSubscription) {
            messageReleaseSubscription.unsubscribe();
            console.log(`Subscription message release deactivated.`);
        } else {
            console.log(`Subscription message release is already deactivated.`);
        }
        return null
    }

    /**
     * Begin to push the incoming messages into the local in-memory array.
     *
     * @param bufferStorage array the subscription appends to (captured by reference)
     * @returns the active subscription (the existing one if it was already active)
     */
    private activateBufferSubscription(bufferStorage: Message[], messageBufferSubscription: Subscription | null, applicationOutgoingMessage: Subject<Message>): Subscription | null {
        if (messageBufferSubscription) {
            console.log(`Subscription message buffer is already active.`);
            return messageBufferSubscription
        }
        const subscription = applicationOutgoingMessage.subscribe({
            next: (message: Message) => {
                console.log(`Buffering ${(message.message as MessageLog).appData.msgId}...  Local array length: ${bufferStorage.length}`);
                bufferStorage.push(message)
            },
            error: (err) => console.error(err),
            complete: () => { },
        });
        console.log(`Subscription message buffer activated.`);
        return subscription
    }

    /**
     * Stop pushing the incoming messages into the local in-memory array.
     *
     * @returns always `null` (the subscription, if any, has been unsubscribed)
     */
    private deactivateBufferSubscription(messageBufferSubscription: Subscription | null): Subscription | null {
        if (messageBufferSubscription) {
            messageBufferSubscription.unsubscribe();
            console.log(`Subscription message buffer deactivated.`);
        } else {
            console.log(`Subscription message buffer is already deactivated.`);
        }
        return null
    }

    /**
     * Change the streaming direction of the incoming messages into Mongo
     * (to be saved in the local database).
     *
     * @returns the active subscription (the existing one if it was already active)
     */
    private activateMongoStreamSubscription(messageStreamToMongo: Subscription | null, applicationOutgoingMessage: Subject<Message>): Subscription | null {
        if (messageStreamToMongo) {
            console.log(`Subscription message streaming to Mongo  is already active.`);
            return messageStreamToMongo
        }
        const subscription = applicationOutgoingMessage.subscribe({
            next: (message: Message) => {
                console.log(`Saving ${(message.message as MessageLog).appData.msgId}...`);
                // saveToMongo logs its own failure; the catch only prevents an unhandled
                // rejection (e.g. when the Mongo model is not ready yet).
                this.saveToMongo(message).catch(() => { /* already logged by saveToMongo */ })
            },
            error: (err) => console.error(err),
            complete: () => { },
        });
        console.log(`Subscription message streaming to Mongo activated.`);
        return subscription
    }

    /**
     * Stop or cut off the Mongo streaming.
     *
     * @returns always `null` (the subscription, if any, has been unsubscribed)
     */
    private deactivateMongoStreamSubscription(messageStreamToMongo: Subscription | null): Subscription | null {
        if (messageStreamToMongo) {
            messageStreamToMongo.unsubscribe();
            console.log(`Subscription message streaming to Mongo deactivated.`);
        } else {
            console.log(`Subscription message streaming to Mongo is already deactivated.`);
        }
        return null
    }

    /**
     * To be used by the Mongo stream subscription to perform the saving execution.
     *
     * @returns `true` once the message has been created through the message model
     * @throws whatever the model's `create` rejects with, or a TypeError when the
     *         model has not been initialised yet; the failure is logged before rethrowing
     */
    private async saveToMongo(message: Message): Promise<boolean> {
        try {
            // let messageModel: Model<any> = this.mongoConnection.model('Message', require('../models/message.schema'))
            await this.messageModel.create(message)
            console.log(`Saved MessageID ${(message.message as MessageLog).appData.msgId} into ${this.mongoUrl}`);
            return true
        } catch (err) {
            console.log(`MongoSaveError: ${err instanceof Error ? err.message : String(err)}`)
            throw err
        }
    }

    /**
     * As the name implies, transfer all the messages from the local instance into Mongo storage.
     * The local instance is emptied as soon as every save has been *dispatched*.
     *
     * NOTE(review): saves are not awaited, so a message whose save fails is only logged
     * and is no longer present in the local buffer — confirm this best-effort behaviour is intended.
     *
     * @param bufferedMessage messages to persist
     * @param messageBufferSubscription when non-null, a summary line is logged after dispatch
     * @returns the (freshly emptied) local buffer
     */
    private async transferBufferedMessageToMongoStorage(bufferedMessage: Message[], messageBufferSubscription: Subscription | null): Promise<Message[]> {
        for (const message of bufferedMessage) {
            this.saveToMongo(message)
                .then(() => console.log(`Message ${(message.message as MessageLog).appData.msgId} saved successfully...`))
                .catch((err: unknown) => console.error(err))
        }
        this.bufferedStorage = []
        if (messageBufferSubscription) {
            console.log(`All ${bufferedMessage.length} buffered messages have been sent for transfer to ${this.mongoUrl}. Current length: ${this.bufferedStorage.length}`)
        }
        return this.bufferedStorage
    }

    /**
     * Transfer stored messages from the local instance back into the stream to be returned to the caller.
     *
     * @param bufferedStorage messages to replay
     * @returns an observable that emits every buffered message in order and then completes;
     *          for an empty buffer it completes immediately
     */
    private async releaseMessageFromLocalBuffer(bufferedStorage: Message[]): Promise<Observable<Message>> {
        const count = bufferedStorage.length
        if (count === 0) {
            // An empty buffer is a normal state, not an error.
            console.log(`There is no data in stored in local instance`)
        } else {
            console.log(`Releasing data from local buffer instance. There ${count === 1 ? "is" : "are"} ${count} message${count === 1 ? "" : "s"}...`);
        }
        return from(bufferedStorage)
    }

    /**
     * Transfer all the stored messages in the designated Mongo database.
     * It should be empty after all the data has been transferred.
     *
     * @returns a subject that extractAllMessages feeds with the stored messages
     */
    private async releaseMessageFromMongoStorage(): Promise<Subject<Message>> {
        const dataSubject: Subject<Message> = new Subject()
        // Extraction runs in the background; surface an unexpected failure through the subject.
        void this.extractAllMessages(dataSubject).catch((err: unknown) => dataSubject.error(err))
        return dataSubject
    }

    /**
     * Connect to the designated Mongo database.
     *
     * The returned promise resolves (with `''`) only when the connection emits `'error'`,
     * which is what lets manageMongoConnection retry. It stays pending after a successful
     * `'open'`, and rejects only if creating the connection throws synchronously.
     */
    private async connectToMongoDatabase(): Promise<any> {
        return new Promise((resolve) => {
            console.log(this.mongoUrl)
            // Bind the handlers to this attempt's connection so a later retry that replaces
            // `this.mongoConnection` cannot make 'open' build the model on the wrong connection.
            const connection = mongoose.createConnection(this.mongoUrl)
            this.mongoConnection = connection
            connection.on('error', (error: unknown) => {
                console.error('Connection error:', error);
                resolve('')
            });
            connection.once('open', () => {
                console.log(`Connected to ${process.env.MONGO}`);
                this.messageModel = connection.model('Message', require('../models/message.schema'));
            });
        })
    }

    /**
     * Manage the Mongo connection. The logic used would be different across different applications.
     * This loops indefinitely so it is always trying to connect to the database: whenever a
     * connection attempt settles, it waits one second and tries again.
     */
    private async manageMongoConnection(): Promise<boolean> {
        while (true) {
            try {
                await this.connectToMongoDatabase()
            } catch (error) {
                console.log(`Something Wrong occured. Please check at manageMongoConnection`)
                console.error(error)
            }
            await new Promise(resolve => setTimeout(resolve, 1000)); // Wait for 1 second before the next attempt
        }
    }

    /**
     * This will be used to release all the hostage messages once the light is green.
     * Streams every stored document into `subjectArgs`, completes it, then deletes all
     * documents from the collection. If the message model is not set up, only logs.
     *
     * @param subjectArgs subject that receives the stored messages
     */
    public async extractAllMessages(subjectArgs: Subject<Message>): Promise<void> {
        // Need to resolve the issue of streaming in a specific order that is sequential
        if (!this.messageModel) {
            console.log(`Error: Message Model is ${this.messageModel}!! Please set up the mongoose connectino properly!`)
            return
        }

        const eventStream = this.messageModel.find().lean().cursor()
        let failed = false

        eventStream.on('data', (message: Message) => {
            // Emit each document to the subject
            subjectArgs.next(message);
        });
        eventStream.on('error', (err: unknown) => {
            // Without a listener a stream 'error' would be thrown; forward it to the subject
            // instead, and remember it so the stored data is not deleted.
            failed = true
            subjectArgs.error(err);
        });
        eventStream.on('end', async () => {
            if (failed) return
            // All data has been streamed, complete the subject
            subjectArgs.complete();
            // Delete the data once it has been streamed
            try {
                await this.messageModel.deleteMany({});
                console.log('Data in Mongo deleted successfully.');
            } catch (err) {
                console.error('Error deleting data:', err);
            }
        });
    }
}

// Store in json file in this project folder. To be enabled in future
// private async transferMessageToLocalStorage(message: Subject<any>): Promise<void> {
//     let localArray: any[] = this.bufferedStorage
//     let filename = `localstorage.json`;

//     while (localArray.length > 0) {
//         let objectToWrite = this.bufferedStorage[0];
//         await writeMessage(objectToWrite, filename)
//     }
//     message.subscribe((message: any) => {
//         writeMessage(message, filename)
//     })

//     if (localArray.length < 1) this.bufferedStorage = localArray
//     console.log('Local Array is empty. Finished transferring to files.')

//     async function writeMessage(message: any, filename: string) {
//         try {
//             let stringifiedMessage = JSON.stringify(message);
//             await fs.promises.appendFile(filename, stringifiedMessage + "\r\n")
//             console.log(`Successfully transferred ${filename}`);
//             localArray.shift();
//         } catch (err) {
//             console.error(`Error trasferring ${filename}:`, err);
//         }
//     }
// }