import mongoose, { Model, Schema } from 'mongoose'; import { BehaviorSubject, Observable, Subject, Subscription, from } from 'rxjs' import { ColorCode, Message, MessageLog, ReportStatus, Status } from '../interfaces/general.interface' require('dotenv').config(); // Implement status chain refactoring export class FisRetransmissionService { private mongoUrl: string = process.env.MONGO as string private bufferedStorage: Message[] = [] private mongoConnection: any private messageModel: Model | null | undefined private maximumBufferLength: number = parseInt(process.env.MaxBufferLoad as string) // please configure at environment constructor(private databaseName: string, private statusReport: BehaviorSubject) { // Connect to mongoDB. this.manageMongoConnection(databaseName) } // Main function that intercepts outgoing messages by communicating || intepreting report status from grpc connection as indicator public handleMessage(applicationOutgoingMessage: Subject): Subject { let releaseMessageSubject: Subject = new Subject() // Every message subscribed from applicationOutgoingMessage will be released through this subject let messageReleaseSubscription: Subscription | null = null let messageBufferSubscription: Subscription | null = null let messageStreamToMongo: Subscription | null = null this.checkBufferLimit(applicationOutgoingMessage, this.statusReport) this.statusReport.subscribe((report: ReportStatus) => { /* Green should release all data from buffer and mongo and also redirect the applicationOutgoingMessage back into the return subject(releaseMessageSubject) if there's any. */ if (report.code == ColorCode.GREEN) { // console.log(`Connection status report && ${report.message ?? 'No Message'}`) /* Status Chain begins */ let status: Status = 1 if (status === 1) { messageStreamToMongo = this.deactivateMongoStreamSubscription(messageStreamToMongo) if (messageStreamToMongo) status = 0 } if (status === 1) { messageBufferSubscription = this.deactivateBufferSubscription(messageBufferSubscription) if (messageBufferSubscription) status = 0 } if (status === 1) { messageReleaseSubscription = this.activateReleaseSubscription(messageReleaseSubscription, applicationOutgoingMessage, releaseMessageSubject) if (!messageReleaseSubscription) status = 0 } if (status === 1) { this.releaseMessageFromLocalBuffer(this.bufferedStorage).then((resObs: Observable) => { resObs.subscribe({ next: message => releaseMessageSubject.next(message), error: err => console.error(err), complete: () => { this.bufferedStorage = [] console.log(`Reset buffer Storage count: ${this.bufferedStorage.length}. All messages have been released back into the stream.`) } }) }).catch((err) => { status = 0 console.error(err) }) } if (status === 1) { this.releaseMessageFromMongoStorage().then((resObs: Subject) => { resObs.subscribe({ next: message => releaseMessageSubject.next(message), error: err => console.error(err), complete: () => console.log(`All Mongo data are transferred `) }) }).catch((err) => { status = 0 console.error(err) }) } if (status === 0) { console.log(`Something Went Wrong in handling ${ColorCode.RED} report.`) } } /* Start buffering the messages coming in from applicationOutgonigMessages and also stop it from flowing into the release subject */ if (report.code == ColorCode.YELLOW) { if (report.payload) { console.log(`Rebuffering ${report.payload.message?.appData?.msgId} into buffer...`) this.bufferedStorage.push(report.payload) } // console.log(`Connection status report && ${report.message ?? 'No Message'}`) let status: Status = 1 /* Status Chain begins */ if (status === 1) { messageBufferSubscription = this.activateBufferSubscription(this.bufferedStorage, messageBufferSubscription, applicationOutgoingMessage) if (!messageBufferSubscription) status = 0 } if (status === 1) { messageReleaseSubscription = this.deactivateReleaseSubscription(messageReleaseSubscription) if (messageReleaseSubscription) status = 0 } if (status === 0) { console.log(`Something Went Wrong in handling ${ColorCode.RED} report.`) } } /* Stop buffering the message in local instance, but start saving them in database. Must first transfer the ones in local buffer before redirecting the flow from applicationOutgoingMessage into Mongo */ if (report.code == ColorCode.RED) { // console.log(`Connection status report: ${report.message}`) if (report.payload) { console.log(`Rebuffering ${report.payload.message?.appData?.msgId} into storage...`) this.saveToMongo(report.payload) } console.log(`Connection status report && ${report.message ?? 'No Message'}`) let status: Status = 1 if (status === 1) { messageStreamToMongo = this.activateMongoStreamSubscription(messageStreamToMongo, applicationOutgoingMessage) if (!messageStreamToMongo) status = 0 } if (status === 1) { messageReleaseSubscription = this.deactivateReleaseSubscription(messageReleaseSubscription) if (messageReleaseSubscription) status = 0 } if (status === 1) { messageBufferSubscription = this.deactivateBufferSubscription(messageBufferSubscription) if (messageBufferSubscription) status = 0 } if (status === 1) { this.transferBufferedMessageToMongoStorage(this.bufferedStorage, messageBufferSubscription).then((res: any[]) => { if (res.length !== this.bufferedStorage.length || this.bufferedStorage.length > 0) status = -1 // this promise function should return an empty array }) } if (status === 0) { console.log(`Something Went Wrong in handling ${ColorCode.RED} report.`) } } if (!report.code) { console.log(`Unknown message...`) } }) return releaseMessageSubject } // IF Buffer exceeds a certain limit, it will trigger RED. Configure in .env file. There's the concern of 2 RED status, one from this and another from other means. // Behaviour of this needs to be investigated further private checkBufferLimit(message: Subject, statusReport: Subject) { let status: Status = 1 if (status = 1) { message.subscribe(() => { if (this.bufferedStorage.length >= this.maximumBufferLength) { // for every messges that comes in, check the bufffer size, if it exceesd more than designated amount, push a red report status i console.log(`Buffer length exceeds limit imposed!!!`) let report: ReportStatus = { code: ColorCode.RED, message: `Buffer is exceeding limit. Initiate storage transfer to designated database.`, } statusReport.next(report) } }) } } // Release the incoming Messages to be returned to the caller private activateReleaseSubscription(messageReleaseSubscription: Subscription | null, applicationOutgoingMessage: Subject, releaseMessageSubject: Subject): Subscription | null { let status: Status = 1 if (status = 1) { if (!messageReleaseSubscription) { messageReleaseSubscription = applicationOutgoingMessage.subscribe({ next: (message: Message) => { console.log(`Releasing ${(message.message as MessageLog).appData.msgId}...`); releaseMessageSubject.next(message); }, error: (err) => console.error(err), complete: () => { }, }); console.log(`Subscription message release activated.`); } else { status = 0 console.log(`Subscription message release is already active.`); } } return messageReleaseSubscription } // Stop the incoming Messages to be returned to caller private deactivateReleaseSubscription(messageReleaseSubscription: Subscription | null): Subscription | null { let status: Status = 1 if (status = 1) { if (messageReleaseSubscription) { messageReleaseSubscription.unsubscribe(); messageReleaseSubscription = null; console.log(`Subscription message release deactivated.`); } else { console.log(`Subscription message release is already deactivated.`); } } return messageReleaseSubscription } // Begin to push the incoming messages into local instantarray private activateBufferSubscription(bufferStorage: Message[], messageBufferSubscription: Subscription | null, applicationOutgoingMessage: Subject): Subscription | null { let status: Status = 1 if (status = 1) { if (!messageBufferSubscription) { messageBufferSubscription = applicationOutgoingMessage.subscribe({ next: (message: any) => { console.log(`Buffering ${(message.message as MessageLog).appData.msgId}... Local array length: ${bufferStorage.length}`); bufferStorage.push(message) }, error: (err) => console.error(err), complete: () => { }, }); console.log(`Subscription message buffer activated.`); } else { status = 0 console.log(`Subscription message buffer is already active.`); } } return messageBufferSubscription } // Stop pushing the incoming messages into local instantarray private deactivateBufferSubscription(messageBufferSubscription: Subscription | null): Subscription | null { let status: Status = 1 if (status) { if (messageBufferSubscription) { messageBufferSubscription.unsubscribe(); messageBufferSubscription = null; console.log(`Subscription message buffer deactivated.`); } else { status = 0 console.log(`Subscription message buffer is already deactivated.`); } } return null } // Change the streaming direction of the incoming messages into mongo streaming subject( to be saved in local databse ) private activateMongoStreamSubscription(messageStreamToMongo: Subscription | null, applicationOutgoingMessage: Subject): Subscription | null { let status: Status = 1 if (status = 1) { if (!messageStreamToMongo) { messageStreamToMongo = applicationOutgoingMessage.subscribe({ next: (message: any) => { console.log(`Saving ${(message.message as MessageLog).appData.msgId}...`); this.saveToMongo(message) }, error: (err) => console.error(err), complete: () => { }, }); console.log(`Subscription message streaming to Mongo activated.`); } else { status = 0 console.log(`Subscription message streaming to Mongo is already active.`); } } return messageStreamToMongo } // Stop or cut off the mongo streaming private deactivateMongoStreamSubscription(messageStreamToMongo: Subscription | null): Subscription | null { let status: Status = 1 if (status = 1) { if (messageStreamToMongo) { messageStreamToMongo.unsubscribe(); messageStreamToMongo = null; console.log(`Subscription message streaming to Mongo deactivated.`); } else { status = 0 console.log(`Subscription message streaming to Mongo is already deactivated.`); } } return messageStreamToMongo } // To be used by mongoStreamSubscription to perform the saving execution private async saveToMongo(message: Message): Promise { return new Promise((resolve, reject) => { // let messageModel: Model = this.mongoConnection.model('Message', require('../models/message.schema')) if (this.messageModel) { this.messageModel.create(message).then(() => { console.log(`Saved MessageID ${(message.message as MessageLog).appData.msgId} into ${this.mongoUrl}`); resolve(true) }).catch((err) => { console.log(`MongoSaveError: ${err.message}`) reject(err) }) } else { console.log(`Cant save message. Message Model is absent or not properly initialized`) } }) } // As the name implies, transder all the messages from the local instance into mongoStorage. Local instance should be emptied after transfer is completed private async transferBufferedMessageToMongoStorage(bufferedMessage: Message[], messageBufferSubscription: Subscription | null): Promise { return new Promise((resolve, reject) => { let status: Status = 1 if (status = 1) { let bufferedStorage: Observable = from(bufferedMessage) bufferedStorage.subscribe({ next: (message: any) => { this.saveToMongo(message).then((res) => { console.log(`Message ${(message.message as MessageLog).appData.msgId} saved successfully...`) }).catch((err) => console.error(err)) }, error: (error) => { reject(error) console.error(error) }, complete: () => { this.bufferedStorage = [] if (messageBufferSubscription) { console.log(`All ${bufferedMessage.length} buffered messages have been sent for transfer to ${this.mongoUrl}. Current length: ${this.bufferedStorage.length}`) } resolve(this.bufferedStorage) } }) } }) } // Transfer stored messages from the local instance back into the stream to be returned to the caller. private async releaseMessageFromLocalBuffer(bufferedStorage: Message[]): Promise> { return new Promise((resolve, reject) => { let status: Status = 1 if (status = 1) { if (bufferedStorage.length > 1) { let caseVariable = this.bufferedStorage.length > 1; console.log(`Releasing data from local buffer instance. There ${caseVariable ? "is" : "are"} ${this.bufferedStorage.length} messages...`); let returnArrayObs: Observable = from(bufferedStorage) resolve(returnArrayObs) } else { let message = `There is no data in stored in local instance` reject(message) } } }) } // Transder all the stored messages in designated mongo databases. It should be empty after all the data has been transferred. private async releaseMessageFromMongoStorage(): Promise> { return new Promise((resolve, reject) => { let status: Status = 1 if (status = 1) { let dataSubject: Subject = new Subject() this.extractAllMessages(dataSubject) resolve(dataSubject) } }) } // Connect to designated mongodatabase. private async connectToMongoDatabase(databaseName: string): Promise { return new Promise((resolve, reject) => { let status: Status = 1 if (status = 1) { let database = this.mongoUrl + databaseName console.log(`Connected to ${database}`) this.mongoConnection = mongoose.createConnection(database) this.mongoConnection.on('error', (error) => { console.error('Connection error:', error); resolve('') }); this.mongoConnection.once('open', () => { // console.log(`Connected to ${process.env.MONGO}`); let report: ReportStatus = { code: ColorCode.RED, message: `Mongo storage available` } this.messageModel = this.mongoConnection.model('Message', require('../models/message.schema')); this.statusReport.next(report) }); } }) } // Manage mongoCOnnectino. The logic used would be different across differnet application. This will loop the process indefinitely os it is always trying to connect to database. private async manageMongoConnection(databaseName: string): Promise { while (true) { try { await this.connectToMongoDatabase(databaseName) } catch (error) { console.log(`Something Wrong occured. Please check at manageMongoConnection`) } await new Promise(resolve => setTimeout(resolve, 1000)); // Wait for 1 second before the next attempt } } // This will be used to release all the hostage messages once the light is green. public async extractAllMessages(subjectArgs: Subject): Promise { // Need to resolve the issue of streaming in a specific order that is sequential let status: Status = 1 if (status = 1) { if (this.messageModel) { const eventStream = this.messageModel.find().lean().cursor() eventStream.on('data', (message) => { // Emit each document to the subject subjectArgs.next(message); }); eventStream.on('end', async () => { // All data has been streamed, complete the subject subjectArgs.complete(); // Delete the data once it has been streamed try { if (this.messageModel) { await this.messageModel.deleteMany({}); console.log('Data in Mongo deleted successfully.'); } else { console.log(`Message Mongoose Model is not intiated properly...`) } } catch (err) { console.error('Error deleting data:', err); } }); } else { status = 0 console.log(`Error: Message Model is ${this.messageModel}!! Please set up the mongoose connection properly!`) } } } } // Store in json file in this project folder. To be enabled in future // private async transferMessageToLocalStorage(message: Subject): Promise { // let localArray: any[] = this.bufferedStorage // let filename = `localstorage.json`; // while (localArray.length > 0) { // let objectToWrite = this.bufferedStorage[0]; // await writeMessage(objectToWrite, filename) // } // message.subscribe((message: any) => { // writeMessage(message, filename) // }) // if (localArray.length < 1) this.bufferedStorage = localArray // console.log('Local Array is empty. Finished transferring to files.') // async function writeMessage(message: any, filename: string) { // try { // let stringifiedMessage = JSON.stringify(message); // await fs.promises.appendFile(filename, stringifiedMessage + "\r\n") // console.log(`Successfully transferred ${filename}`); // localArray.shift(); // } catch (err) { // console.error(`Error trasferring ${filename}:`, err); // } // } // }