// bufferService.ts
import {
  BehaviorSubject,
  Observable,
  Subject,
  from,
  map,
  switchMap,
} from "rxjs";
import mongoose, { Connection, Model, Document } from "mongoose";
import {
  ConnectionState,
  Message,
  MessageLog,
} from "../interfaces/general.interface";
/**
 * Buffers outgoing messages while the connection state is `"BUFFER"` and
 * replays them onto the same message stream once the state becomes
 * `"DIRECT_PUBLISH"`.
 *
 * Messages are stored in MongoDB when a Mongoose model has been set up and in
 * an in-memory array otherwise. The in-memory array also acts as a safety net
 * for messages that could not be written to MongoDB, so a release always
 * drains both stores.
 */
export class BufferService {
  private readonly messageStream: Subject<Message>;
  private readonly connectionState: BehaviorSubject<ConnectionState>;
  private messageBuffer: Message[] = [];
  private messageModel: Model<Message> | undefined;
  // Kept as `string | undefined` so a missing MONGO variable is detected
  // instead of silently producing the URI "undefined<dbName>".
  private readonly dbUrl: string | undefined = process.env.MONGO;

  /**
   * @param messageFromApp Stream of application messages; released messages
   *   are re-emitted on this same subject.
   * @param connectionStateSubject Current connection state; drives buffering
   *   and releasing.
   * @param dbName Database name appended to the `MONGO` base URL. Only used
   *   by the (currently disabled) MongoDB initialisation below.
   */
  constructor(
    messageFromApp: Subject<Message>,
    connectionStateSubject: BehaviorSubject<ConnectionState>,
    dbName: string
  ) {
    this.messageStream = messageFromApp;
    this.connectionState = connectionStateSubject;
    // Until a Mongo model exists, messages are buffered in the local array.
    this.setupSubscriptions();

    // MongoDB-backed buffering is currently disabled:
    // this.initializeDatabaseConnection(dbName)
    //   .then((connection: mongoose.Connection) => {
    //     const grpcMessageSchema = require("../models/message.schema");
    //     this.messageModel = connection.model<Message>(
    //       "Message",
    //       grpcMessageSchema
    //     );
    //     // Move everything buffered locally into MongoDB once it is ready.
    //     return this.transferLocalBufferToMongoDB();
    //   })
    //   .catch((error) => {
    //     console.error("Database initialization failed:", error);
    //     // Retry logic could be added here in the future.
    //   });
  }

  /**
   * Exposes the message stream (application messages plus released buffered
   * messages) as a read-only observable.
   */
  public getMessages(): Observable<Message> {
    return this.messageStream.asObservable();
  }

  /** Returns the connection-state subject this service was constructed with. */
  public getStateObservable(): BehaviorSubject<ConnectionState> {
    return this.connectionState;
  }

  /** Subscribes to the message stream and to connection-state changes. */
  private setupSubscriptions(): void {
    this.messageStream.subscribe({
      next: (message: Message) => this.handleIncomingMessage(message),
      error: (err) =>
        console.error("Error in messageToBePublished subject:", err),
      complete: () =>
        console.log("messageToBePublished subscription completed"),
    });
    this.connectionState.subscribe({
      next: (state: ConnectionState) =>
        this.handleConnectionStateChanges(state),
      error: (err) => console.error("Error in connectionState subject:", err),
      complete: () => console.log("connectionState subscription completed"),
    });
  }

  /**
   * Opens a dedicated Mongoose connection to `<MONGO><dbName>`.
   *
   * @throws Error when the `MONGO` environment variable is not set, and
   *   rethrows any connection error after logging it.
   */
  private async initializeDatabaseConnection(
    dbName: string
  ): Promise<Connection> {
    if (!this.dbUrl) {
      throw new Error(
        "MONGO environment variable is not set; cannot connect to MongoDB."
      );
    }
    // The connection string may embed credentials, so only the database name
    // is logged.
    console.log(`Connecting to MongoDB database "${dbName}"`);
    try {
      // NOTE(review): newer Mongoose versions document
      // `createConnection(uri).asPromise()` for waiting on the connection -
      // confirm against the installed Mongoose version.
      const connection: Connection = await mongoose.createConnection(
        `${this.dbUrl}${dbName}`
      );
      console.log(`Connected to MongoDB database "${dbName}"`);
      return connection;
    } catch (error) {
      console.error("Error connecting to MongoDB:", error);
      throw error;
    }
  }

  /**
   * Buffers a message that arrives while the connection state is `"BUFFER"`.
   * In `"DIRECT_PUBLISH"` nothing is done here: the message is already on the
   * stream, and releasing buffered messages is driven by the state change.
   */
  private handleIncomingMessage(message: Message): void {
    if (this.connectionState.getValue().status === "BUFFER") {
      // bufferMessage handles its own errors and never rejects.
      void this.bufferMessage(message);
    }
  }

  /**
   * Reacts to a connection-state change: buffers the payload carried by a
   * `"BUFFER"` state (when it is not a string) and releases everything
   * buffered when the state becomes `"DIRECT_PUBLISH"`.
   */
  private handleConnectionStateChanges(state: ConnectionState): void {
    console.log(state.status);
    if (state.status === "BUFFER") {
      if (state.payload && typeof state.payload !== "string") {
        // Buffer the message delivered together with the state change.
        void this.bufferMessage(state.payload);
      }
    }
    if (state.status === "DIRECT_PUBLISH") {
      this.releaseBufferedMessages(this.messageStream).catch((error) =>
        console.error("Error releasing buffered messages:", error)
      );
    }
  }

  /**
   * Stores a message in MongoDB when the model is available, otherwise in the
   * local array. A message that cannot be saved to MongoDB is kept in the
   * local array instead of being dropped. Never rejects.
   */
  private async bufferMessage(message: Message): Promise<void> {
    const model = this.messageModel;
    const id = this.describeMessageId(message);
    if (!model) {
      this.messageBuffer.push(message);
      console.log(
        `pushing ${id} into local array buffer.... There is now ${this.messageBuffer.length} messages`
      );
      return;
    }

    try {
      await model.create(message);
    } catch (error) {
      console.error("Error saving message to MongoDB:", error);
      // Keep the message locally so it is still replayed on release.
      this.messageBuffer.push(message);
      return;
    }

    try {
      const count = await model.countDocuments({});
      console.log(
        `Message${id} saved to MongoDB buffer. There is ${count} messages in datatbase at the moment.`
      );
    } catch (error) {
      // The count is informational only; the message itself was saved.
      console.error("Error counting buffered messages in MongoDB:", error);
    }
  }

  /**
   * Re-emits every buffered message on `messageFromBuffer`: first the ones
   * stored in MongoDB (when a model exists), then the ones held locally.
   *
   * @returns `true` once all buffered messages have been emitted.
   * @throws Rejects when streaming from MongoDB fails; locally buffered
   *   messages are then left in place for the next release.
   */
  private async releaseBufferedMessages(
    messageFromBuffer: Subject<Message>
  ): Promise<boolean> {
    const model = this.messageModel;
    if (model) {
      await this.releaseFromDatabase(model, messageFromBuffer);
    }
    if (!model || this.messageBuffer.length > 0) {
      this.releaseFromLocalBuffer(messageFromBuffer);
    }
    return true;
  }

  /**
   * Streams every document from MongoDB onto `target`, deleting each one
   * right after it has been emitted. Deleting per document (instead of wiping
   * the collection at the end) keeps messages that are buffered while the
   * release is still running.
   */
  private async releaseFromDatabase(
    model: Model<Message>,
    target: Subject<Message>
  ): Promise<void> {
    try {
      const count = await model.countDocuments({});
      console.log(
        `There is ${count} messages in database buffer at the moment. Releasing them....`
      );
    } catch (error) {
      // The count is informational only; carry on with the release.
      console.error("Error counting buffered messages in MongoDB:", error);
    }

    try {
      for await (const doc of model.find().cursor()) {
        target.next(doc);
        try {
          await model.deleteOne({ _id: doc._id });
        } catch (error) {
          // The message was emitted but stays stored, so it may be replayed
          // again on a later release.
          console.error("Error deleting data:", error);
        }
      }
    } catch (error) {
      console.error("Error streaming messages from MongoDB:", error);
      throw error;
    }
  }

  /**
   * Emits all locally buffered messages on `target`. The buffer is emptied
   * before emitting, so a message that gets buffered again during the replay
   * (because the state flipped back to `"BUFFER"`) is kept.
   */
  private releaseFromLocalBuffer(target: Subject<Message>): void {
    const pending = this.messageBuffer.splice(0);
    console.log(
      `Releasing buffer Message: currently there is ${pending.length} messages to be released`
    );
    for (const message of pending) {
      target.next(message);
    }
  }

  /**
   * Moves all locally buffered messages into MongoDB. Messages that fail to
   * save are put back at the front of the local buffer. Resolves immediately
   * when no model is available.
   */
  private async transferLocalBufferToMongoDB(): Promise<void> {
    console.log(
      `Transferring local array buffered Message: currently there is ${this.messageBuffer.length}. Transferring to database...`
    );
    const model = this.messageModel;
    if (!model) {
      return;
    }

    const pending = this.messageBuffer.splice(0);
    const failed: Message[] = [];
    // Sequential on purpose: keeps the stored order equal to arrival order.
    for (const message of pending) {
      try {
        await model.create(message);
        console.log(
          `Transferring ${this.describeMessageId(message)} into database.`
        );
      } catch (error) {
        console.error("Error transferring message to MongoDB:", error);
        failed.push(message);
      }
    }
    this.messageBuffer.unshift(...failed);

    try {
      const count = await model.countDocuments({});
      console.log(
        `Local buffered message transfer completed. There is a total of ${count} messages in database at the moment.`
      );
    } catch (error) {
      console.error("Error counting buffered messages in MongoDB:", error);
    }
  }

  /**
   * Returns the message's `appData.msgId` for log output, or `"<unknown>"`
   * when the message does not have one, so logging can never throw.
   */
  private describeMessageId(message: Message): string {
    const log = message.message as MessageLog | undefined;
    return String(log?.appData?.msgId ?? "<unknown>");
  }
}
|