|
@@ -1,461 +0,0 @@
|
|
|
-import mongoose, { Model, Schema } from 'mongoose';
|
|
|
-import { BehaviorSubject, Observable, Subject, Subscription, from } from 'rxjs'
|
|
|
-import { ColorCode, Message, MessageLog, ReportStatus, Status } from '../interfaces/general.interface'
|
|
|
-import { resolve } from 'path';
|
|
|
-require('dotenv').config();
|
|
|
-
|
|
|
-
|
|
|
-export class FisRetransmissionService {
|
|
|
- private mongoUrl: string = process.env.MONGO as string
|
|
|
- private bufferedStorage: Message[] = []
|
|
|
- private mongoConnection: any
|
|
|
- private messageModel: Model<any> | null | undefined
|
|
|
- private maximumBufferLength: number = parseInt(process.env.MaxBufferLoad as string)
|
|
|
-
|
|
|
- constructor(private databaseName: string, private statusReport: BehaviorSubject<ReportStatus>) {
|
|
|
-
|
|
|
- this.manageMongoConnection(databaseName)
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- public handleMessage(applicationOutgoingMessage: Subject<Message>): Subject<Message> {
|
|
|
- let releaseMessageSubject: Subject<Message> = new Subject()
|
|
|
- let messageReleaseSubscription: Subscription | null = null
|
|
|
- let messageBufferSubscription: Subscription | null = null
|
|
|
- let messageStreamToMongo: Subscription | null = null
|
|
|
- this.checkBufferLimit(applicationOutgoingMessage, this.statusReport)
|
|
|
- this.statusReport.subscribe((report: ReportStatus) => {
|
|
|
-
|
|
|
- if there's any. */
|
|
|
- if (report.code == ColorCode.GREEN) {
|
|
|
- let status: Status = 1
|
|
|
-
|
|
|
-
|
|
|
- if (status === 1) {
|
|
|
- messageStreamToMongo = this.deactivateMongoStreamSubscription(messageStreamToMongo)
|
|
|
- if (messageStreamToMongo) status = 0
|
|
|
- }
|
|
|
- if (status === 1) {
|
|
|
- messageBufferSubscription = this.deactivateBufferSubscription(messageBufferSubscription)
|
|
|
- if (messageBufferSubscription) status = 0
|
|
|
- }
|
|
|
- if (status === 1) {
|
|
|
- messageReleaseSubscription = this.activateReleaseSubscription(messageReleaseSubscription, applicationOutgoingMessage, releaseMessageSubject)
|
|
|
- if (!messageReleaseSubscription) status = 0
|
|
|
- }
|
|
|
- if (status === 1) {
|
|
|
- this.releaseMessageFromLocalBuffer(this.bufferedStorage).then((resObs: Observable<Message>) => {
|
|
|
- resObs.subscribe({
|
|
|
- next: message => releaseMessageSubject.next(message),
|
|
|
- error: err => console.error(err),
|
|
|
- complete: () => {
|
|
|
- this.bufferedStorage = []
|
|
|
- console.log(`Reset buffer Storage count: ${this.bufferedStorage.length}. All messages have been released back into the stream.`)
|
|
|
- }
|
|
|
- })
|
|
|
- }).catch((err) => {
|
|
|
- status = 0
|
|
|
- console.error(err)
|
|
|
- })
|
|
|
- }
|
|
|
- if (status === 1) {
|
|
|
- this.releaseMessageFromMongoStorage().then((resObs: Subject<Message>) => {
|
|
|
- resObs.subscribe({
|
|
|
- next: message => releaseMessageSubject.next(message),
|
|
|
- error: err => console.error(err),
|
|
|
- complete: () => console.log(`All Mongo data are transferred `)
|
|
|
- })
|
|
|
- }).catch((err) => {
|
|
|
- status = 0
|
|
|
- console.error(err)
|
|
|
- })
|
|
|
- }
|
|
|
- if (status === 0) {
|
|
|
- console.log(`Something Went Wrong in handling ${ColorCode.RED} report.`)
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- if (report.code == ColorCode.YELLOW) {
|
|
|
- if (report.payload) {
|
|
|
- console.log(`Rebuffering ${report.payload.message?.appData?.msgId} into buffer...`)
|
|
|
- this.bufferedStorage.push(report.payload)
|
|
|
- }
|
|
|
-
|
|
|
- let status: Status = 1
|
|
|
-
|
|
|
- if (status === 1) {
|
|
|
- messageBufferSubscription = this.activateBufferSubscription(this.bufferedStorage, messageBufferSubscription, applicationOutgoingMessage)
|
|
|
- if (!messageBufferSubscription) status = 0
|
|
|
- }
|
|
|
- if (status === 1) {
|
|
|
- messageReleaseSubscription = this.deactivateReleaseSubscription(messageReleaseSubscription)
|
|
|
- if (messageReleaseSubscription) status = 0
|
|
|
- }
|
|
|
- if (status === 0) {
|
|
|
- console.log(`Something Went Wrong in handling ${ColorCode.RED} report.`)
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- flow from applicationOutgoingMessage into Mongo */
|
|
|
- if (report.code == ColorCode.RED) {
|
|
|
-
|
|
|
- if (report.payload) {
|
|
|
- console.log(`Rebuffering ${report.payload.message?.appData?.msgId} into storage...`)
|
|
|
- this.saveToMongo(report.payload)
|
|
|
- }
|
|
|
- console.log(`Connection status report && ${report.message ?? 'No Message'}`)
|
|
|
- let status: Status = 1
|
|
|
- if (status === 1) {
|
|
|
- messageStreamToMongo = this.activateMongoStreamSubscription(messageStreamToMongo, applicationOutgoingMessage)
|
|
|
- if (!messageStreamToMongo) status = 0
|
|
|
- }
|
|
|
- if (status === 1) {
|
|
|
- messageReleaseSubscription = this.deactivateReleaseSubscription(messageReleaseSubscription)
|
|
|
- if (messageReleaseSubscription) status = 0
|
|
|
- }
|
|
|
- if (status === 1) {
|
|
|
- messageBufferSubscription = this.deactivateBufferSubscription(messageBufferSubscription)
|
|
|
- if (messageBufferSubscription) status = 0
|
|
|
- }
|
|
|
- if (status === 1) {
|
|
|
- this.transferBufferedMessageToMongoStorage(this.bufferedStorage, messageBufferSubscription).then((res: any[]) => {
|
|
|
- if (res.length !== this.bufferedStorage.length || this.bufferedStorage.length > 0) status = -1
|
|
|
- })
|
|
|
- }
|
|
|
- if (status === 0) {
|
|
|
- console.log(`Something Went Wrong in handling ${ColorCode.RED} report.`)
|
|
|
- }
|
|
|
- }
|
|
|
- if (!report.code) {
|
|
|
- console.log(`Unknown message...`)
|
|
|
- }
|
|
|
- })
|
|
|
- return releaseMessageSubject
|
|
|
- }
|
|
|
-
|
|
|
- private checkIfMongoBufferHasData(): Promise<boolean> {
|
|
|
- return new Promise(async (resolve, reject) => {
|
|
|
- if (this.messageModel) {
|
|
|
- try {
|
|
|
- const count = await this.messageModel.countDocuments();
|
|
|
- resolve(count > 0)
|
|
|
- }
|
|
|
- catch (error) {
|
|
|
- reject(error)
|
|
|
- }
|
|
|
- }
|
|
|
- })
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
- private checkBufferLimit(message: Subject<Message>, statusReport: Subject<ReportStatus>) {
|
|
|
- let status: Status = 1
|
|
|
- if (status = 1) {
|
|
|
- message.subscribe(() => {
|
|
|
- if (this.bufferedStorage.length >= this.maximumBufferLength) {
|
|
|
-
|
|
|
- console.log(`Buffer length exceeds limit imposed!!!`)
|
|
|
- let report: ReportStatus = {
|
|
|
- code: ColorCode.RED,
|
|
|
- message: `Buffer is exceeding limit. Initiate storage transfer to designated database.`,
|
|
|
- }
|
|
|
- statusReport.next(report)
|
|
|
- }
|
|
|
- })
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- private activateReleaseSubscription(messageReleaseSubscription: Subscription | null, applicationOutgoingMessage: Subject<Message>, releaseMessageSubject: Subject<Message>): Subscription | null {
|
|
|
- let status: Status = 1
|
|
|
- if (status = 1) {
|
|
|
- if (!messageReleaseSubscription) {
|
|
|
- messageReleaseSubscription = applicationOutgoingMessage.subscribe({
|
|
|
- next: (message: Message) => {
|
|
|
- console.log(`Releasing ${(message.message as MessageLog).appData.msgId}...`);
|
|
|
- releaseMessageSubject.next(message);
|
|
|
- },
|
|
|
- error: (err) => console.error(err),
|
|
|
- complete: () => { },
|
|
|
- });
|
|
|
- console.log(`Subscription message release activated.`);
|
|
|
- } else {
|
|
|
- status = 0
|
|
|
- console.log(`Subscription message release is already active.`);
|
|
|
- }
|
|
|
- }
|
|
|
- return messageReleaseSubscription
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- private deactivateReleaseSubscription(messageReleaseSubscription: Subscription | null): Subscription | null {
|
|
|
- let status: Status = 1
|
|
|
- if (status = 1) {
|
|
|
- if (messageReleaseSubscription) {
|
|
|
- messageReleaseSubscription.unsubscribe();
|
|
|
- messageReleaseSubscription = null;
|
|
|
- console.log(`Subscription message release deactivated.`);
|
|
|
- } else {
|
|
|
- console.log(`Subscription message release is already deactivated.`);
|
|
|
- }
|
|
|
- }
|
|
|
- return messageReleaseSubscription
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- private activateBufferSubscription(bufferStorage: Message[], messageBufferSubscription: Subscription | null, applicationOutgoingMessage: Subject<Message>): Subscription | null {
|
|
|
- let status: Status = 1
|
|
|
- if (status = 1) {
|
|
|
- if (!messageBufferSubscription) {
|
|
|
- messageBufferSubscription = applicationOutgoingMessage.subscribe({
|
|
|
- next: (message: any) => {
|
|
|
- console.log(`Buffering ${(message.message as MessageLog).appData.msgId}... Local array length: ${bufferStorage.length}`);
|
|
|
- bufferStorage.push(message)
|
|
|
- },
|
|
|
- error: (err) => console.error(err),
|
|
|
- complete: () => { },
|
|
|
- });
|
|
|
- console.log(`Subscription message buffer activated.`);
|
|
|
- } else {
|
|
|
- status = 0
|
|
|
- console.log(`Subscription message buffer is already active.`);
|
|
|
- }
|
|
|
- }
|
|
|
- return messageBufferSubscription
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- private deactivateBufferSubscription(messageBufferSubscription: Subscription | null): Subscription | null {
|
|
|
- let status: Status = 1
|
|
|
- if (status) {
|
|
|
-
|
|
|
- if (messageBufferSubscription) {
|
|
|
- messageBufferSubscription.unsubscribe();
|
|
|
- messageBufferSubscription = null;
|
|
|
- console.log(`Subscription message buffer deactivated.`);
|
|
|
- } else {
|
|
|
- status = 0
|
|
|
- console.log(`Subscription message buffer is already deactivated.`);
|
|
|
- }
|
|
|
- }
|
|
|
- return null
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- private activateMongoStreamSubscription(messageStreamToMongo: Subscription | null, applicationOutgoingMessage: Subject<Message>): Subscription | null {
|
|
|
- let status: Status = 1
|
|
|
- if (status = 1) {
|
|
|
- if (!messageStreamToMongo) {
|
|
|
- messageStreamToMongo = applicationOutgoingMessage.subscribe({
|
|
|
- next: (message: any) => {
|
|
|
- console.log(`Saving ${(message.message as MessageLog).appData.msgId}...`);
|
|
|
- this.saveToMongo(message)
|
|
|
- },
|
|
|
- error: (err) => console.error(err),
|
|
|
- complete: () => { },
|
|
|
- });
|
|
|
- console.log(`Subscription message streaming to Mongo activated.`);
|
|
|
- } else {
|
|
|
- status = 0
|
|
|
- console.log(`Subscription message streaming to Mongo is already active.`);
|
|
|
- }
|
|
|
- }
|
|
|
- return messageStreamToMongo
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- private deactivateMongoStreamSubscription(messageStreamToMongo: Subscription | null): Subscription | null {
|
|
|
- let status: Status = 1
|
|
|
- if (status = 1) {
|
|
|
- if (messageStreamToMongo) {
|
|
|
- messageStreamToMongo.unsubscribe();
|
|
|
- messageStreamToMongo = null;
|
|
|
- console.log(`Subscription message streaming to Mongo deactivated.`);
|
|
|
- } else {
|
|
|
- status = 0
|
|
|
- console.log(`Subscription message streaming to Mongo is already deactivated.`);
|
|
|
- }
|
|
|
- }
|
|
|
- return messageStreamToMongo
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- private async saveToMongo(message: Message): Promise<boolean> {
|
|
|
- return new Promise((resolve, reject) => {
|
|
|
-
|
|
|
- if (this.messageModel) {
|
|
|
- this.messageModel.create(message).then(() => {
|
|
|
- console.log(`Saved MessageID ${(message.message as MessageLog).appData.msgId} into ${this.mongoUrl}`);
|
|
|
- resolve(true)
|
|
|
- }).catch((err) => {
|
|
|
- console.log(`MongoSaveError: ${err.message}`)
|
|
|
- reject(err)
|
|
|
- })
|
|
|
- } else {
|
|
|
- console.log(`Cant save message. Message Model is absent or not properly initialized`)
|
|
|
- }
|
|
|
- })
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- private async transferBufferedMessageToMongoStorage(bufferedMessage: Message[], messageBufferSubscription: Subscription | null): Promise<Message[]> {
|
|
|
- return new Promise((resolve, reject) => {
|
|
|
- let status: Status = 1
|
|
|
- if (status = 1) {
|
|
|
- let bufferedStorage: Observable<Message> = from(bufferedMessage)
|
|
|
- bufferedStorage.subscribe({
|
|
|
- next: (message: any) => {
|
|
|
- this.saveToMongo(message).then((res) => {
|
|
|
- console.log(`Message ${(message.message as MessageLog).appData.msgId} saved successfully...`)
|
|
|
- }).catch((err) => console.error(err))
|
|
|
- },
|
|
|
- error: (error) => {
|
|
|
- reject(error)
|
|
|
- console.error(error)
|
|
|
- },
|
|
|
- complete: () => {
|
|
|
- this.bufferedStorage = []
|
|
|
- if (messageBufferSubscription) {
|
|
|
- console.log(`All ${bufferedMessage.length} buffered messages have been sent for transfer to ${this.mongoUrl}. Current length: ${this.bufferedStorage.length}`)
|
|
|
- }
|
|
|
- resolve(this.bufferedStorage)
|
|
|
- }
|
|
|
- })
|
|
|
- }
|
|
|
- })
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- private async releaseMessageFromLocalBuffer(bufferedStorage: Message[]): Promise<Observable<Message>> {
|
|
|
- return new Promise((resolve, reject) => {
|
|
|
- let status: Status = 1
|
|
|
- if (status = 1) {
|
|
|
- if (bufferedStorage.length > 1) {
|
|
|
- let caseVariable = this.bufferedStorage.length > 1;
|
|
|
- console.log(`Releasing data from local buffer instance. There ${caseVariable ? "is" : "are"} ${this.bufferedStorage.length} messages...`);
|
|
|
- let returnArrayObs: Observable<Message> = from(bufferedStorage)
|
|
|
- resolve(returnArrayObs)
|
|
|
- } else {
|
|
|
- let message = `There is no data in stored in local instance`
|
|
|
- reject(message)
|
|
|
- }
|
|
|
- }
|
|
|
- })
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- private async releaseMessageFromMongoStorage(): Promise<Subject<Message>> {
|
|
|
- return new Promise((resolve, reject) => {
|
|
|
- let status: Status = 1
|
|
|
- if (status = 1) {
|
|
|
- let dataSubject: Subject<Message> = new Subject()
|
|
|
- this.extractAllMessages(dataSubject)
|
|
|
- resolve(dataSubject)
|
|
|
- }
|
|
|
- })
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- private async connectToMongoDatabase(databaseName: string): Promise<any> {
|
|
|
- return new Promise((resolve, reject) => {
|
|
|
- let status: Status = 1
|
|
|
- if (status = 1) {
|
|
|
- let database = this.mongoUrl + databaseName
|
|
|
- console.log(`Connected to ${database}`)
|
|
|
- this.mongoConnection = mongoose.createConnection(database)
|
|
|
- this.mongoConnection.on('error', (error) => {
|
|
|
- console.error('Connection error:', error);
|
|
|
- resolve('')
|
|
|
- });
|
|
|
- this.mongoConnection.once('open', () => {
|
|
|
-
|
|
|
- let report: ReportStatus = {
|
|
|
- code: ColorCode.RED,
|
|
|
- message: `Mongo storage available`
|
|
|
- }
|
|
|
- this.messageModel = this.mongoConnection.model('Message', require('../models/message.schema'));
|
|
|
- this.statusReport.next(report)
|
|
|
- });
|
|
|
- }
|
|
|
- })
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- private async manageMongoConnection(databaseName: string): Promise<boolean> {
|
|
|
- while (true) {
|
|
|
- try {
|
|
|
- await this.connectToMongoDatabase(databaseName)
|
|
|
- } catch (error) {
|
|
|
- console.log(`Something Wrong occured. Please check at manageMongoConnection`)
|
|
|
- }
|
|
|
- await new Promise(resolve => setTimeout(resolve, 1000));
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- public async extractAllMessages(subjectArgs: Subject<Message>): Promise<void> {
|
|
|
-
|
|
|
- let status: Status = 1
|
|
|
- if (status = 1) {
|
|
|
- if (this.messageModel) {
|
|
|
- const eventStream = this.messageModel.find().lean().cursor()
|
|
|
- eventStream.on('data', (message) => {
|
|
|
-
|
|
|
- subjectArgs.next(message);
|
|
|
- });
|
|
|
- eventStream.on('end', async () => {
|
|
|
-
|
|
|
- subjectArgs.complete();
|
|
|
-
|
|
|
- try {
|
|
|
- if (this.messageModel) {
|
|
|
- await this.messageModel.deleteMany({});
|
|
|
- console.log('Data in Mongo deleted successfully.');
|
|
|
- } else {
|
|
|
- console.log(`Message Mongoose Model is not intiated properly...`)
|
|
|
- }
|
|
|
- } catch (err) {
|
|
|
- console.error('Error deleting data:', err);
|
|
|
- }
|
|
|
- });
|
|
|
- } else {
|
|
|
- status = 0
|
|
|
- console.log(`Error: Message Model is ${this.messageModel}!! Please set up the mongoose connection properly!`)
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
-}
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|