buffer.service.ts 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213
  1. // bufferService.ts
  2. import { BehaviorSubject, Observable, Subject, from, map, switchMap } from 'rxjs';
  3. import mongoose, { Connection, Model, Document } from 'mongoose';
  4. import { ConnectionState, Message, MessageLog } from '../interfaces/general.interface';
  5. import { resolve } from 'path';
  /**
   * Holds outbound messages while the connection is in the `BUFFER` state and
   * replays them when the state changes to `DIRECT_PUBLISH`.
   *
   * Two storage back-ends exist in the code: a local in-memory array (always
   * available) and a Mongoose model (used only when `messageModel` is set; the
   * database bootstrap in the constructor is currently disabled, so the local
   * array is the only active back-end).
   *
   * The service subscribes to two inputs handed to the constructor:
   *  - the application's message subject, whose messages are buffered while the
   *    state is `BUFFER`;
   *  - the connection-state subject, which drives buffering and release.
   */
  export class BufferService {
    /** Messages produced by the application (owned by the caller). */
    private messageFromApplication: Subject<Message>;
    /** Subject on which previously buffered messages are replayed. */
    private messageFromBuffer: Subject<Message>;
    /** Connection state shared with the caller; read via `getValue()` and subscribed to. */
    private connectionState: BehaviorSubject<ConnectionState>;
    /** Whichever of the two subjects above `getMessages()` currently hands out. */
    private currentSource: Subject<Message>;
    /** In-memory buffer used whenever `messageModel` is undefined. */
    private messageBuffer: Message[] = [];
    /** Mongoose model for persisted buffering; undefined while the database bootstrap is disabled. */
    private messageModel: Model<Message> | undefined;
    // NOTE(review): if MONGO is unset this is `undefined` at runtime despite the cast — confirm deployment always sets it.
    private readonly dbUrl: string = process.env.MONGO as string;

    /**
     * @param messageFromApp Subject on which the application emits messages.
     * @param connectionStateSubject Subject carrying the current connection state.
     * @param dbName Database name appended to the `MONGO` URL; only used by the
     *               (currently disabled) database bootstrap below.
     */
    constructor(
      messageFromApp: Subject<Message>,
      connectionStateSubject: BehaviorSubject<ConnectionState>,
      dbName: string
    ) {
      this.messageFromBuffer = new Subject<Message>();
      this.messageFromApplication = messageFromApp;
      this.currentSource = this.messageFromBuffer;
      this.connectionState = connectionStateSubject;
      // Note: until a Mongo model is available, buffered data is pushed into the local array.
      this.setupSubscriptions();

      /* Disabled for now. Use the local array first. */
      // this.initializeDatabaseConnection(dbName).then((connection: mongoose.Connection) => {
      //   const grpcMessageSchema = require('../models/message.schema');
      //   this.messageModel = connection.model<Message>('Message', grpcMessageSchema)
      //   this.transferLocalBufferToMongoDB() // move everything from the local array into MongoDB once setup completes
      // }).catch(error => {
      //   console.error('Database initialization failed:', error);
      //   // Implement retry logic or additional error handling here
      // });
    }

    /**
     * Returns the subject that is the current message source at the moment of the call.
     *
     * NOTE(review): the returned reference is not re-pointed when the service
     * later switches between the buffer subject and the application subject, so
     * an observable obtained earlier keeps emitting from the subject that was
     * current back then. Confirm callers re-query after state changes.
     */
    public getMessages(): Observable<Message> {
      return this.currentSource;
    }

    /** Exposes the connection-state subject that was passed to the constructor. */
    public getStateObservable(): BehaviorSubject<ConnectionState> {
      return this.connectionState;
    }

    /** Wires the application-message and connection-state subjects to their handlers. */
    private setupSubscriptions(): void {
      this.messageFromApplication.subscribe({
        next: (message: Message) => this.handleIncomingMessage(message),
        error: (err) => console.error('Error in messageToBePublished subject:', err),
        complete: () => console.log('messageToBePublished subscription completed')
      });

      this.connectionState.subscribe({
        next: (state: ConnectionState) => this.handleConnectionStateChanges(state),
        error: (err) => console.error('Error in connectionState subject:', err),
        complete: () => console.log('connectionState subscription completed')
      });
    }

    /**
     * Opens a dedicated Mongoose connection to `${dbUrl}${dbName}`.
     *
     * NOTE(review): the full connection URL is logged; if it embeds credentials
     * they end up in the logs — consider redacting.
     * NOTE(review): awaiting the value returned by `createConnection` relies on
     * the installed Mongoose version making it awaitable — confirm, or use
     * `.asPromise()` where available.
     *
     * @throws Re-throws whatever the connection attempt throws, after logging it.
     */
    private async initializeDatabaseConnection(dbName: string): Promise<Connection> {
      const target = `${this.dbUrl}${dbName}`;
      try {
        console.log(target);
        const connection: Connection = await mongoose.createConnection(target);
        console.log(`Connected to ${target}`);
        return connection;
      } catch (error) {
        console.error('Error connecting to MongoDB:', error);
        throw error;
      }
    }

    /**
     * Handles one message from the application: buffers it while the state is
     * `BUFFER`; does nothing while the state is `DIRECT_PUBLISH`.
     */
    private handleIncomingMessage(message: Message): void {
      const { status } = this.connectionState.getValue();
      if (status === 'BUFFER') {
        this.bufferInBackground(message);
      }
      // DIRECT_PUBLISH: intentionally nothing for now.
      // Earlier draft, kept for reference:
      // this.isBufferNotEmpty().then(isNotEmpty => {
      //   if (isNotEmpty) {
      //     // this.releaseBufferedMessages(this.messageFromBuffer);
      //     // Continue to buffer new incoming message during the release process
      //     this.bufferMessage(message);
      //   } else {
      //     // If buffer is empty, switch source and handle the message directly
      //     this.messageToBePublished.next(message); // Handle the message directly
      //   }
      // });
    }

    /**
     * Reacts to a connection-state emission.
     *
     * - `BUFFER`: points `currentSource` at the buffer subject and, when the
     *   state carries a non-string payload, buffers that payload.
     * - `DIRECT_PUBLISH`: releases whatever is in the local buffer, then points
     *   `currentSource` at the application subject — but only if the state is
     *   still `DIRECT_PUBLISH` at that point. A subscriber reacting to a
     *   released message may flip the state back to `BUFFER` synchronously; in
     *   that case the source must stay on the buffer subject.
     */
    private handleConnectionStateChanges(state: ConnectionState): void {
      console.log(this.connectionState.getValue().status);

      if (state.status === 'BUFFER') {
        this.currentSource = this.messageFromBuffer;
        if (state.payload && typeof state.payload !== 'string') {
          this.bufferInBackground(state.payload); // Buffer the last message immediately
        }
      }

      if (state.status === 'DIRECT_PUBLISH') {
        // Temporary: only the local array is consulted since MongoDB buffering is not in use at the moment.
        if (this.messageBuffer.length > 0) {
          this.releaseBufferedMessages(this.messageFromBuffer)
            .then(() => {
              if (!this.isDirectPublish()) {
                return; // state changed while releasing; keep the buffer source
              }
              console.log(`Switching to main publisher from source`);
              this.currentSource = this.messageFromApplication;
            })
            .catch((err) => {
              console.error(err);
            });
        }
        // Nothing (left) to release: switch immediately, unless the state moved on during the release above.
        if (this.messageBuffer.length < 1 && this.isDirectPublish()) {
          this.currentSource = this.messageFromApplication;
        }
        /* This is for mongo */
        // this.isBufferNotEmpty().then(isNotEmpty => {
        //   if (isNotEmpty) {
        //     this.currentMessageSource = this.messageFromBuffer
        //   } else {
        //     this.currentMessageSource = this.messageToBePublished
        //   }
        // })
      }
    }

    /** True when the connection-state subject currently holds `DIRECT_PUBLISH`. */
    private isDirectPublish(): boolean {
      return this.connectionState.getValue().status === 'DIRECT_PUBLISH';
    }

    /**
     * Reports whether any message is buffered: the estimated MongoDB document
     * count when a model is set, otherwise the local array length.
     */
    private async isBufferNotEmpty(): Promise<boolean> {
      if (!this.messageModel) {
        return this.messageBuffer.length > 0;
      }
      const count = await this.messageModel.estimatedDocumentCount().exec();
      return count > 0;
    }

    /**
     * Starts `bufferMessage` without awaiting it and logs a rejection instead of
     * leaving the promise unhandled.
     */
    private bufferInBackground(message: Message): void {
      void this.bufferMessage(message).catch((error) => {
        console.error('Error buffering message:', error);
      });
    }

    /**
     * Stores one message: in MongoDB when a model is set (failures are logged,
     * not re-thrown), otherwise in the local array. On the local path the push
     * happens synchronously, before the returned promise settles.
     */
    private async bufferMessage(message: Message): Promise<void> {
      if (this.messageModel) {
        try {
          await this.messageModel.create(message);
          console.log(`Message${(message.message as MessageLog).appData.msgId} saved to MongoDB buffer`);
        } catch (error) {
          console.error('Error saving message to MongoDB:', error);
          // Implement retry logic or additional error handling here
        }
      } else {
        this.messageBuffer.push(message); // Fallback to local buffer if model is not defined
        console.log(`pushing ${(message.message as MessageLog).appData.msgId} into local array buffer.... There is now ${this.messageBuffer.length} messages`);
      }
    }

    /**
     * Replays every buffered message on `messageFromBuffer`.
     *
     * - With a Mongo model: streams all documents through a cursor, then deletes
     *   them once the stream ends (a failed delete is logged and the promise
     *   still resolves).
     * - Without a model: emits the local buffer's contents. The buffer is
     *   detached into a snapshot first, so a message buffered while the release
     *   is in progress stays in the buffer for the next release instead of
     *   being dropped.
     *
     * @param messageFromBuffer Subject that receives the replayed messages.
     * @returns Resolves to `true` when the release has finished; rejects when
     *          the Mongo cursor reports an error.
     */
    private releaseBufferedMessages(messageFromBuffer: Subject<Message>): Promise<boolean> {
      return new Promise<boolean>((resolve, reject) => {
        const model = this.messageModel;

        if (!model) {
          console.log(`Releasing buffer Message: currently there is ${this.messageBuffer.length} messages to be released`);
          // splice(0) empties the live buffer and hands back its previous contents.
          const pending = this.messageBuffer.splice(0);
          for (const message of pending) {
            messageFromBuffer.next(message);
          }
          resolve(true);
          return;
        }

        const stream = model.find().cursor();
        stream.on('data', (message) => {
          // Process each message individually
          messageFromBuffer.next(message);
        });
        stream.on('error', (error) => {
          console.error('Error streaming messages from MongoDB:', error);
          reject(error);
        });
        stream.on('end', async () => {
          // Delete the data once it has been streamed
          try {
            await model.deleteMany({});
            console.log('Data in Mongo deleted successfully.');
          } catch (err) {
            console.error('Error deleting data:', err);
          }
          resolve(true);
        });
      });
    }

    /**
     * Moves the local buffer into MongoDB, one document at a time and in buffer
     * order. Does nothing when no model is set. A message whose insert fails is
     * logged and not retried. The returned promise settles only after every
     * insert has been attempted.
     */
    private async transferLocalBufferToMongoDB(): Promise<void> {
      const model = this.messageModel;
      if (!model) {
        return;
      }
      // Detach the contents first so messages buffered meanwhile are not part of this pass.
      const pending = this.messageBuffer.splice(0);
      for (const message of pending) {
        try {
          await model.create(message);
        } catch (error) {
          console.error('Error transferring message to MongoDB:', error);
        }
      }
    }
    // Additional methods as required...
  }