// bufferService.ts import { BehaviorSubject, Observable, Subject, from } from "rxjs"; import mongoose, { Connection, Model } from "mongoose"; import { ConnectionState, Message, MessageLog, } from "../interfaces/general.interface"; export class BufferService { private bufferIdentifier!: string private messageStream: Subject; private connectionState: BehaviorSubject; private messageBuffer: Message[] = []; private messageModel: Model | undefined; private readonly dbUrl: string = process.env.MONGO as string; constructor( messageFromApp: Subject, connectionStateSubject: BehaviorSubject, dbName: string ) { this.bufferIdentifier = dbName this.messageStream = messageFromApp; this.connectionState = connectionStateSubject; this.setupSubscriptions(); // Note: The handle buffer will push the data in local array before pushing to mongo via initial check up model // this.initializeDatabaseConnection(dbName) // .then((connection: mongoose.Connection) => { // const grpcMessageSchema = require("../models/message.schema"); // this.messageModel = connection.model( // "Message", // grpcMessageSchema // ); // this.transferLocalBufferToMongoDB(); // transfer all data from local array into mongodb after the mongo setup is complete // }) // .catch((error) => { // console.error("Database initialization failed:", error); // // Implement retry logic or additional error handling here. Perhaps retry logic in the future... // }); } // to be exposed to acquire the messages public getMessages(): Observable { return this.messageStream as Observable; } private setupSubscriptions(): void { this.messageStream.subscribe({ next: (message: Message) => this.handleIncomingMessage(message), error: (err) => console.error("Error in messageToBePublished subject:", err), complete: () => console.log("messageToBePublished subscription completed"), }); this.connectionState.subscribe({ next: (state: ConnectionState) => this.handleConnectionStateChanges(state), error: (err) => console.error("Error in connectionState subject:", err), complete: () => console.log("connectionState subscription completed"), }); } private async initializeDatabaseConnection( dbName: string ): Promise { try { console.log(`${this.dbUrl}${dbName}`); const connection: mongoose.Connection = await mongoose.createConnection( `${this.dbUrl}${dbName}` ); console.log(`Connected to ${this.dbUrl}${dbName}`); return connection; } catch (error) { console.error("Error connecting to MongoDB:", error); throw error; } } private handleIncomingMessage(message: Message): void { if (this.connectionState.getValue().status === "BUFFER") { this.bufferMessage(message); } if (this.connectionState.getValue().status === "DIRECT_PUBLISH") { /* Note: Since the main outGoingMessage is being published irregardless of the connection state, so there's no need to do anything aside from releasing buffered messages which will be handled by handleConnectionStateChange */ // additional logic here } } private handleConnectionStateChanges(state: ConnectionState): void { console.log(`${this.bufferIdentifier}: ${this.connectionState.getValue().status}`); if (state.status === "BUFFER") { if (state.payload && typeof state.payload !== "string") { this.bufferMessage(state.payload); // Buffer the last message immediately } } if (state.status === "DIRECT_PUBLISH") { // Relese the messages by inserting them into the outgoing Messages together. this.releaseBufferedMessages(this.messageStream); } } private async bufferMessage(message: Message): Promise { if (this.messageModel) { try { // const newMessage = new this.messageModel(message); await this.messageModel.create(message); this.messageModel.countDocuments({}).then((count) => { console.log(`Message${(message.message as MessageLog).appData.msgId} saved to MongoDB buffer. There is ${count} messages in datatbase under ${this.bufferIdentifier} at the moment.`); }); } catch (error) { console.error("Error saving message to MongoDB:", error); // Implement retry logic or additional error handling here } } else { this.messageBuffer.push(message); // Fallback to local buffer if model is not defined console.log(`pushing ${(message.message as MessageLog).appData.msgId} into local array buffer under ${this.bufferIdentifier}.... There is now ${this.messageBuffer.length} messages`); } } private releaseBufferedMessages( messageFromBuffer: Subject ): Promise { return new Promise((resolve, reject) => { if (this.messageModel) { this.messageModel.countDocuments({}).then((count) => { console.log(`There is ${count} messages in database under ${this.bufferIdentifier} at the moment. Releasing them....`); }); const stream = this.messageModel.find().cursor(); stream.on("data", async (message) => { // Process each message individually` messageFromBuffer.next(message); }); stream.on("error", (error) => { console.error("Error streaming messages from MongoDB:", error); reject(error); }); stream.on("end", async () => { // Delete the data once it has been streamed try { if (this.messageModel) { await this.messageModel.deleteMany({}); console.log("Data in Mongo deleted successfully."); } else { console.log(`Message Mongoose Model is not intiated properly...`); } } catch (err) { console.error("Error deleting data:", err); } resolve(true); }); } if (!this.messageModel) { // If MongoDB model is not defined, use the local buffer console.log(`Releasing buffer Message under ${this.bufferIdentifier}: currently there is ${this.messageBuffer.length} messages to be released`); this.messageBuffer.forEach((message) => this.messageStream.next(message) ); this.messageBuffer.length = 0; // Clear the local buffer after transferring if (this.messageBuffer.length < 1) { resolve(true); } else { reject(`Somehow the array is not emptied. This should not happen`); } } }); } public getStateObservable(): BehaviorSubject { return this.connectionState; } private async transferLocalBufferToMongoDB(): Promise { return new Promise((resolve, reject) => { console.log(`Transferring local array buffered Message: currently there is ${this.messageBuffer.length}. Transferring to database...`); if (this.messageModel) { let locallyBufferedMessage: Observable = from(this.messageBuffer); locallyBufferedMessage.subscribe({ next: async (message: Message) => { try { if (this.messageModel) { await this.messageModel.create(message); console.log(`Transferring ${(message.message as MessageLog).appData.msgId} into database.`); } } catch (error) { console.error("Error transferring message to MongoDB:", error); } }, error: (err) => console.error(err), complete: () => { if (this.messageModel) { this.messageModel.countDocuments({}).then((count) => { console.log(`Local buffered message transfer completed. There is a total of ${count} messages in database at the moment.`) this.messageBuffer = [] // Clear local buffer after transferring }); } }, }); } }); } // Additional methods as required... }