// bufferService.ts import { BehaviorSubject, Observable, Subject, from, map, switchMap } from 'rxjs'; import mongoose, { Connection, Model, Document } from 'mongoose'; import { ConnectionState, Message, MessageLog } from '../interfaces/general.interface'; import { resolve } from 'path'; export class BufferService { private messageStream: Subject private connectionState: BehaviorSubject private messageBuffer: Message[] = []; private messageModel: Model | undefined; private readonly dbUrl: string = process.env.MONGO as string; constructor( messageFromApp: Subject, connectionStateSubject: BehaviorSubject, dbName: string ) { this.messageStream = messageFromApp; this.connectionState = connectionStateSubject; this.setupSubscriptions(); // Note: The handle buffer will push the data in local array before pushing to mongo via initial check up model /* Disable for now. Use local array first */ this.initializeDatabaseConnection(dbName).then((connection: mongoose.Connection) => { const grpcMessageSchema = require('../models/message.schema'); this.messageModel = connection.model('Message', grpcMessageSchema) this.transferLocalBufferToMongoDB() // transfer all data from local array into mongodb after the mongo setup is complete }).catch(error => { console.error('Database initialization failed:', error); // Implement retry logic or additional error handling here }); } public getMessages(): Observable { return this.messageStream as Observable } private setupSubscriptions(): void { this.messageStream.subscribe({ next: (message: Message) => this.handleIncomingMessage(message), error: (err) => console.error('Error in messageToBePublished subject:', err), complete: () => console.log('messageToBePublished subscription completed') }); this.connectionState.subscribe({ next: (state: ConnectionState) => this.handleConnectionStateChanges(state), error: (err) => console.error('Error in connectionState subject:', err), complete: () => console.log('connectionState subscription completed') }); } private async initializeDatabaseConnection(dbName: string): Promise { try { console.log(`${this.dbUrl}${dbName}`) const connection: mongoose.Connection = await mongoose.createConnection(`${this.dbUrl}${dbName}`); console.log(`Connected to ${this.dbUrl}${dbName}`) return connection; } catch (error) { console.error('Error connecting to MongoDB:', error); throw error; } } private handleIncomingMessage(message: Message): void { if (this.connectionState.getValue().status === 'BUFFER') { this.bufferMessage(message); } if (this.connectionState.getValue().status === 'DIRECT_PUBLISH') { // console.log(`There is ${this.messageBuffer.length} messages in the buffer`) // somehw this is called repeatedly // do nothing for now // Start releasing buffered messages, but don't wait for it to complete // this.isBufferNotEmpty().then(isNotEmpty => { // if (isNotEmpty) { // // this.releaseBufferedMessages(this.messageFromBuffer); // // Continue to buffer new incoming message during the release process // this.bufferMessage(message); // } else { // // If buffer is empty, switch source and handle the message directly // this.messageToBePublished.next(message); // Handle the message directly // } // }); } } private handleConnectionStateChanges(state: ConnectionState): void { console.log(this.connectionState.getValue().status) if (state.status === 'BUFFER') { if (state.payload && typeof state.payload !== 'string') { this.bufferMessage(state.payload); // Buffer the last message immediately } } if (state.status === 'DIRECT_PUBLISH') { this.releaseBufferedMessages(this.messageStream) /* This is for mongo */ // this.isBufferNotEmpty().then(isNotEmpty => { // if (isNotEmpty) { // this.currentMessageSource = this.messageFromBuffer // } else { // this.currentMessageSource = this.messageToBePublished // } // }) } } private async isBufferNotEmpty(): Promise { if (this.messageModel) { // Check the count in MongoDB const count = await this.messageModel.estimatedDocumentCount().exec(); return count > 0; } else { // Check the local buffer return this.messageBuffer.length > 0; } } private async bufferMessage(message: Message): Promise { if (this.messageModel) { try { // const newMessage = new this.messageModel(message); await this.messageModel.create(message); console.log(`Message${(message.message as MessageLog).appData.msgId} saved to MongoDB buffer`); } catch (error) { console.error('Error saving message to MongoDB:', error); // Implement retry logic or additional error handling here } } else { this.messageBuffer.push(message); // Fallback to local buffer if model is not defined console.log(`pushing ${(message.message as MessageLog).appData.msgId} into local array buffer.... There is now ${this.messageBuffer.length} messages`) } } private releaseBufferedMessages(messageFromBuffer: Subject): Promise { return new Promise((resolve, reject) => { if (this.messageModel) { this.messageModel.countDocuments({}).then((count) => { console.log(`There is ${count} messages in datatbase buffer at the moment. Releasing them....`); }) const stream = this.messageModel.find().cursor(); stream.on('data', async (message) => { // Process each message individually messageFromBuffer.next(message); }); stream.on('error', (error) => { console.error('Error streaming messages from MongoDB:', error); reject(error) }); stream.on('end', async () => { // Delete the data once it has been streamed try { if (this.messageModel) { await this.messageModel.deleteMany({}); console.log('Data in Mongo deleted successfully.'); } else { console.log(`Message Mongoose Model is not intiated properly...`) } } catch (err) { console.error('Error deleting data:', err); } resolve(true) }); } if (!this.messageModel) { // If MongoDB model is not defined, use the local buffer console.log(`Releasing buffer Message: currently there is ${this.messageBuffer.length} messages to be released`) this.messageBuffer.forEach(message => this.messageStream.next(message)); this.messageBuffer.length = 0 // Clear the local buffer after transferring if (this.messageBuffer.length < 1) { resolve(true) } else { reject(`Somehow the array is not emptied. This should not happen`) } } }) } public getStateObservable(): BehaviorSubject { return this.connectionState; } private async transferLocalBufferToMongoDB(): Promise { return new Promise((resolve, reject) => { console.log(`Releasing buffer Message: currently there is ${this.messageBuffer.length}. Transferring to database...`) if (this.messageModel) { this.messageBuffer.forEach(async message => { try { if (this.messageModel) { await this.messageModel.create(message); } } catch (error) { console.error('Error transferring message to MongoDB:', error); } }) this.messageBuffer = []; // Clear local buffer after transferring } }) } // Additional methods as required... }