|
@@ -0,0 +1,213 @@
|
|
|
+// bufferService.ts
|
|
|
+import { BehaviorSubject, Observable, Subject, from, map, switchMap } from 'rxjs';
|
|
|
+import mongoose, { Connection, Model, Document } from 'mongoose';
|
|
|
+import { ConnectionState, Message, MessageLog } from '../interfaces/general.interface';
|
|
|
+import { resolve } from 'path';
|
|
|
+
|
|
|
+export class BufferService {
|
|
|
+ private messageFromApplication: Subject<Message>;
|
|
|
+ private messageFromBuffer: Subject<Message>
|
|
|
+ private connectionState: BehaviorSubject<ConnectionState>
|
|
|
+ private currentSource: Subject<Message>
|
|
|
+ private messageBuffer: Message[] = [];
|
|
|
+ private messageModel: Model<Message> | undefined;
|
|
|
+ private readonly dbUrl: string = process.env.MONGO as string;
|
|
|
+
|
|
|
+
|
|
|
+ constructor(
|
|
|
+ messageFromApp: Subject<Message>,
|
|
|
+ connectionStateSubject: BehaviorSubject<ConnectionState>,
|
|
|
+ dbName: string
|
|
|
+ ) {
|
|
|
+ this.messageFromBuffer = new Subject<Message>();
|
|
|
+ this.messageFromApplication = messageFromApp;
|
|
|
+ this.currentSource = this.messageFromBuffer
|
|
|
+ this.connectionState = connectionStateSubject;
|
|
|
+ this.setupSubscriptions(); // Note: The handle buffer will push the data in local array before pushing to mongo via initial check up model
|
|
|
+
|
|
|
+ /* Disable for now. Use local array first */
|
|
|
+ // this.initializeDatabaseConnection(dbName).then((connection: mongoose.Connection) => {
|
|
|
+ // const grpcMessageSchema = require('../models/message.schema');
|
|
|
+ // this.messageModel = connection.model<Message>('Message', grpcMessageSchema)
|
|
|
+ // this.transferLocalBufferToMongoDB() // transfer all data from local array into mongodb after the mongo setup is complete
|
|
|
+ // }).catch(error => {
|
|
|
+ // console.error('Database initialization failed:', error);
|
|
|
+ // // Implement retry logic or additional error handling here
|
|
|
+ // });
|
|
|
+ }
|
|
|
+
|
|
|
+ public getMessages(): Observable<Message> {
|
|
|
+ return this.currentSource as Observable<Message>
|
|
|
+ }
|
|
|
+
|
|
|
+ private setupSubscriptions(): void {
|
|
|
+ this.messageFromApplication.subscribe({
|
|
|
+ next: (message: Message) => this.handleIncomingMessage(message),
|
|
|
+ error: (err) => console.error('Error in messageToBePublished subject:', err),
|
|
|
+ complete: () => console.log('messageToBePublished subscription completed')
|
|
|
+ });
|
|
|
+
|
|
|
+ this.connectionState.subscribe({
|
|
|
+ next: (state: ConnectionState) => this.handleConnectionStateChanges(state),
|
|
|
+ error: (err) => console.error('Error in connectionState subject:', err),
|
|
|
+ complete: () => console.log('connectionState subscription completed')
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ private async initializeDatabaseConnection(dbName: string): Promise<Connection> {
|
|
|
+ try {
|
|
|
+ console.log(`${this.dbUrl}${dbName}`)
|
|
|
+ const connection: mongoose.Connection = await mongoose.createConnection(`${this.dbUrl}${dbName}`);
|
|
|
+ console.log(`Connected to ${this.dbUrl}${dbName}`)
|
|
|
+ return connection;
|
|
|
+ } catch (error) {
|
|
|
+ console.error('Error connecting to MongoDB:', error);
|
|
|
+ throw error;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private handleIncomingMessage(message: Message): void {
|
|
|
+ if (this.connectionState.getValue().status === 'BUFFER') {
|
|
|
+ this.bufferMessage(message);
|
|
|
+ }
|
|
|
+ if (this.connectionState.getValue().status === 'DIRECT_PUBLISH') {
|
|
|
+ // console.log(`There is ${this.messageBuffer.length} messages in the buffer`) // somehw this is called repeatedly
|
|
|
+ // do nothing for now
|
|
|
+ // Start releasing buffered messages, but don't wait for it to complete
|
|
|
+ // this.isBufferNotEmpty().then(isNotEmpty => {
|
|
|
+ // if (isNotEmpty) {
|
|
|
+ // // this.releaseBufferedMessages(this.messageFromBuffer);
|
|
|
+ // // Continue to buffer new incoming message during the release process
|
|
|
+ // this.bufferMessage(message);
|
|
|
+ // } else {
|
|
|
+ // // If buffer is empty, switch source and handle the message directly
|
|
|
+ // this.messageToBePublished.next(message); // Handle the message directly
|
|
|
+ // }
|
|
|
+ // });
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private handleConnectionStateChanges(state: ConnectionState): void {
|
|
|
+ console.log(this.connectionState.getValue().status)
|
|
|
+ if (state.status === 'BUFFER') {
|
|
|
+ this.currentSource = this.messageFromBuffer
|
|
|
+ if (state.payload && typeof state.payload !== 'string') {
|
|
|
+ this.bufferMessage(state.payload); // Buffer the last message immediately
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (state.status === 'DIRECT_PUBLISH') {
|
|
|
+ if (this.messageBuffer.length > 0) { // temporary only since i am not using mongoDB for buffering atm
|
|
|
+ this.releaseBufferedMessages(this.messageFromBuffer).then(() => {
|
|
|
+ console.log(`Switching to main publisher from source`)
|
|
|
+ this.currentSource = this.messageFromApplication
|
|
|
+ }).catch((err) => {
|
|
|
+ console.error(err)
|
|
|
+ })
|
|
|
+ }
|
|
|
+ if (this.messageBuffer.length < 1) {
|
|
|
+ this.currentSource = this.messageFromApplication
|
|
|
+ }
|
|
|
+ /* This is for mongo */
|
|
|
+ // this.isBufferNotEmpty().then(isNotEmpty => {
|
|
|
+ // if (isNotEmpty) {
|
|
|
+ // this.currentMessageSource = this.messageFromBuffer
|
|
|
+ // } else {
|
|
|
+ // this.currentMessageSource = this.messageToBePublished
|
|
|
+ // }
|
|
|
+ // })
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private async isBufferNotEmpty(): Promise<boolean> {
|
|
|
+ if (this.messageModel) {
|
|
|
+ // Check the count in MongoDB
|
|
|
+ const count = await this.messageModel.estimatedDocumentCount().exec();
|
|
|
+ return count > 0;
|
|
|
+ } else {
|
|
|
+ // Check the local buffer
|
|
|
+ return this.messageBuffer.length > 0;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private async bufferMessage(message: Message): Promise<void> {
|
|
|
+ if (this.messageModel) {
|
|
|
+ try {
|
|
|
+ // const newMessage = new this.messageModel(message);
|
|
|
+ await this.messageModel.create(message);
|
|
|
+ console.log(`Message${(message.message as MessageLog).appData.msgId} saved to MongoDB buffer`);
|
|
|
+ } catch (error) {
|
|
|
+ console.error('Error saving message to MongoDB:', error);
|
|
|
+ // Implement retry logic or additional error handling here
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ this.messageBuffer.push(message); // Fallback to local buffer if model is not defined
|
|
|
+ console.log(`pushing ${(message.message as MessageLog).appData.msgId} into local array buffer.... There is now ${this.messageBuffer.length} messages`)
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ private releaseBufferedMessages(messageFromBuffer: Subject<Message>): Promise<boolean> {
|
|
|
+ return new Promise((resolve, reject) => {
|
|
|
+ if (this.messageModel) {
|
|
|
+ const stream = this.messageModel.find().cursor();
|
|
|
+
|
|
|
+ stream.on('data', async (message) => {
|
|
|
+ // Process each message individually
|
|
|
+ messageFromBuffer.next(message);
|
|
|
+ });
|
|
|
+
|
|
|
+ stream.on('error', (error) => {
|
|
|
+ console.error('Error streaming messages from MongoDB:', error);
|
|
|
+ reject(error)
|
|
|
+ });
|
|
|
+
|
|
|
+ stream.on('end', async () => {
|
|
|
+ // Delete the data once it has been streamed
|
|
|
+ try {
|
|
|
+ if (this.messageModel) {
|
|
|
+ await this.messageModel.deleteMany({});
|
|
|
+ console.log('Data in Mongo deleted successfully.');
|
|
|
+ } else {
|
|
|
+ console.log(`Message Mongoose Model is not intiated properly...`)
|
|
|
+ }
|
|
|
+ } catch (err) {
|
|
|
+ console.error('Error deleting data:', err);
|
|
|
+ }
|
|
|
+ resolve(true)
|
|
|
+ });
|
|
|
+ }
|
|
|
+ if (!this.messageModel) {
|
|
|
+ // If MongoDB model is not defined, use the local buffer
|
|
|
+ console.log(`Releasing buffer Message: currently there is ${this.messageBuffer.length} messages to be released`)
|
|
|
+ this.messageBuffer.forEach(message => this.messageFromBuffer.next(message));
|
|
|
+ this.messageBuffer.length = 0 // Clear the local buffer after transferring
|
|
|
+ if (this.messageBuffer.length < 1) {
|
|
|
+ resolve(true)
|
|
|
+ } else {
|
|
|
+ reject(`Somehow the array is not emptied. This should not happen`)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ })
|
|
|
+ }
|
|
|
+
|
|
|
+ public getStateObservable(): BehaviorSubject<ConnectionState> {
|
|
|
+ return this.connectionState;
|
|
|
+ }
|
|
|
+
|
|
|
+ private async transferLocalBufferToMongoDB(): Promise<void> {
|
|
|
+ if (this.messageModel) {
|
|
|
+ this.messageBuffer.forEach(async message => {
|
|
|
+ try {
|
|
|
+ if (this.messageModel) {
|
|
|
+ await this.messageModel.create(message);
|
|
|
+ }
|
|
|
+ } catch (error) {
|
|
|
+ console.error('Error transferring message to MongoDB:', error);
|
|
|
+ }
|
|
|
+ })
|
|
|
+ this.messageBuffer = []; // Clear local buffer after transferring
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // Additional methods as required...
|
|
|
+}
|