buffer.service.ts 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205
  1. // bufferService.ts
  2. import { BehaviorSubject, Observable, Subject, from } from "rxjs";
  3. import mongoose, { Connection, Model } from "mongoose";
  4. import { ConnectionState, Message, MessageLog, } from "../interfaces/general.interface";
  /**
   * Buffers application messages while the connection is in the "BUFFER" state
   * and re-emits them on the message stream once the state becomes
   * "DIRECT_PUBLISH".
   *
   * Messages are held in an in-memory array unless a Mongoose model has been
   * attached (`messageModel`), in which case they are stored in MongoDB. The
   * MongoDB initialisation in the constructor is currently commented out, so
   * `messageModel` is never assigned and only the in-memory buffer is used.
   */
  export class BufferService {
    private readonly bufferIdentifier: string;
    private readonly messageStream: Subject<Message>;
    private readonly connectionState: BehaviorSubject<ConnectionState>;
    private messageBuffer: Message[] = [];
    private messageModel: Model<Message> | undefined;
    // May be undefined when the MONGO environment variable is not set.
    private readonly dbUrl: string | undefined = process.env.MONGO;

    /**
     * @param messageFromApp Stream of application messages; buffered messages
     *   are re-emitted on this same subject when they are released.
     * @param connectionStateSubject Current connection state; drives buffering
     *   ("BUFFER") and releasing ("DIRECT_PUBLISH").
     * @param dbName Used as this buffer's identifier in log output (and as the
     *   database name when the MongoDB connection is initialised).
     */
    constructor(
      messageFromApp: Subject<Message>,
      connectionStateSubject: BehaviorSubject<ConnectionState>,
      dbName: string
    ) {
      this.bufferIdentifier = dbName;
      this.messageStream = messageFromApp;
      this.connectionState = connectionStateSubject;
      // Until a Mongo model is attached, bufferMessage() falls back to the
      // local array, so subscribing before any database setup is safe.
      this.setupSubscriptions();
      // this.initializeDatabaseConnection(dbName)
      // .then((connection: mongoose.Connection) => {
      // const grpcMessageSchema = require("../models/message.schema");
      // this.messageModel = connection.model<Message>(
      // "Message",
      // grpcMessageSchema
      // );
      // this.transferLocalBufferToMongoDB(); // transfer all data from local array into mongodb after the mongo setup is complete
      // })
      // .catch((error) => {
      // console.error("Database initialization failed:", error);
      // // Implement retry logic or additional error handling here. Perhaps retry logic in the future...
      // });
    }

    /**
     * Exposes the message stream, which carries both live messages and
     * messages released from the buffer.
     */
    public getMessages(): Observable<Message> {
      return this.messageStream;
    }

    /** Exposes the connection-state subject this service reacts to. */
    public getStateObservable(): BehaviorSubject<ConnectionState> {
      return this.connectionState;
    }

    /** Wires the message stream and the connection state to their handlers. */
    private setupSubscriptions(): void {
      this.messageStream.subscribe({
        next: (message: Message) => this.handleIncomingMessage(message),
        error: (err) =>
          console.error("Error in messageToBePublished subject:", err),
        complete: () =>
          console.log("messageToBePublished subscription completed"),
      });
      this.connectionState.subscribe({
        next: (state: ConnectionState) => this.handleConnectionStateChanges(state),
        error: (err) => console.error("Error in connectionState subject:", err),
        complete: () => console.log("connectionState subscription completed"),
      });
    }

    /**
     * Opens a dedicated Mongoose connection to `<MONGO><dbName>`.
     *
     * @param dbName Database name appended to the MONGO base URL.
     * @returns The connection, once it has actually been established.
     * @throws Error when MONGO is not configured; rethrows connection errors.
     */
    private async initializeDatabaseConnection(
      dbName: string
    ): Promise<Connection> {
      if (!this.dbUrl) {
        throw new Error("MONGO environment variable is not set; cannot connect to MongoDB");
      }
      const uri = `${this.dbUrl}${dbName}`;
      // Connection strings may embed "user:password@"; never write that to logs.
      const loggableUri = uri.replace(/\/\/[^/@]*@/, "//***@");
      try {
        console.log(loggableUri);
        // createConnection() returns the Connection object immediately;
        // asPromise() is what resolves or rejects with the connection attempt.
        const connection = await mongoose.createConnection(uri).asPromise();
        console.log(`Connected to ${loggableUri}`);
        return connection;
      } catch (error) {
        console.error("Error connecting to MongoDB:", error);
        throw error;
      }
    }

    /** Buffers a message seen on the stream while the state is "BUFFER". */
    private handleIncomingMessage(message: Message): void {
      if (this.connectionState.getValue().status === "BUFFER") {
        // bufferMessage() handles its own errors and never rejects.
        void this.bufferMessage(message);
      }
      /* Note: in "DIRECT_PUBLISH" the outgoing message is published regardless
      of the connection state, so nothing needs to happen here; releasing
      buffered messages is done by handleConnectionStateChanges. */
    }

    /**
     * Reacts to a connection-state emission: buffers the state's payload
     * message on "BUFFER", releases everything buffered on "DIRECT_PUBLISH".
     */
    private handleConnectionStateChanges(state: ConnectionState): void {
      console.log(`${this.bufferIdentifier}: ${state.status}`);
      if (state.status === "BUFFER") {
        if (state.payload && typeof state.payload !== "string") {
          void this.bufferMessage(state.payload); // Buffer the last message immediately
        }
      } else if (state.status === "DIRECT_PUBLISH") {
        // Release the buffered messages by re-inserting them into the stream.
        this.releaseBufferedMessages(this.messageStream).catch((error: unknown) =>
          console.error("Error releasing buffered messages:", error)
        );
      }
    }

    /**
     * Stores one message: in MongoDB when a model is attached, otherwise in the
     * local array. Failures are logged, never thrown.
     */
    private async bufferMessage(message: Message): Promise<void> {
      const model = this.messageModel;
      if (!model) {
        this.messageBuffer.push(message); // Fallback to local buffer if model is not defined
        console.log(`pushing ${this.messageIdOf(message)} into local array buffer under ${this.bufferIdentifier}.... There is now ${this.messageBuffer.length} messages`);
        return;
      }
      try {
        await model.create(message);
      } catch (error) {
        console.error("Error saving message to MongoDB:", error);
        // Implement retry logic or additional error handling here
        return;
      }
      const count = await this.countBufferedDocuments(model);
      if (count !== undefined) {
        console.log(`Message${this.messageIdOf(message)} saved to MongoDB buffer. There is ${count} messages in datatbase under ${this.bufferIdentifier} at the moment.`);
      }
    }

    /**
     * Re-emits every buffered message on `messageFromBuffer` and clears the
     * buffer that was drained.
     *
     * The in-memory path runs synchronously (before the returned promise is
     * created), so messages are emitted during the state change itself.
     *
     * @returns A promise resolving to `true` once the release has finished;
     *   it rejects if streaming from MongoDB fails.
     */
    private async releaseBufferedMessages(
      messageFromBuffer: Subject<Message>
    ): Promise<boolean> {
      const model = this.messageModel;
      if (model) {
        await this.releaseMongoBuffer(model, messageFromBuffer);
      } else {
        this.releaseLocalBuffer(messageFromBuffer);
      }
      return true;
    }

    /** Drains the in-memory buffer into `target`, oldest message first. */
    private releaseLocalBuffer(target: Subject<Message>): void {
      // Detach the pending messages first: a subscriber may switch the state
      // back to "BUFFER" while we emit, and anything buffered during the
      // release must survive it instead of being wiped by a trailing clear.
      const pending = this.messageBuffer.splice(0);
      console.log(`Releasing buffer Message under ${this.bufferIdentifier}: currently there is ${pending.length} messages to be released`);
      let delivered = 0;
      try {
        for (const message of pending) {
          target.next(message);
          delivered += 1;
        }
      } finally {
        // If emitting threw part-way, keep what was not delivered, ahead of
        // anything that was buffered in the meantime.
        if (delivered < pending.length) {
          this.messageBuffer.unshift(...pending.slice(delivered));
        }
      }
    }

    /** Streams every stored document into `target`, then empties the collection. */
    private async releaseMongoBuffer(
      model: Model<Message>,
      target: Subject<Message>
    ): Promise<void> {
      const count = await this.countBufferedDocuments(model);
      if (count !== undefined) {
        console.log(`There is ${count} messages in database under ${this.bufferIdentifier} at the moment. Releasing them....`);
      }
      try {
        // Process each message individually as the cursor yields it.
        for await (const storedMessage of model.find().cursor()) {
          target.next(storedMessage);
        }
      } catch (error) {
        console.error("Error streaming messages from MongoDB:", error);
        throw error;
      }
      // Delete the data once it has been streamed. A failed delete is logged
      // but does not fail the release, since the messages were already emitted.
      // NOTE(review): this removes every document, including any inserted while
      // the cursor was still streaming — confirm that cannot happen, or delete
      // only the documents that were actually emitted.
      try {
        await model.deleteMany({});
        console.log("Data in Mongo deleted successfully.");
      } catch (err) {
        console.error("Error deleting data:", err);
      }
    }

    /**
     * Moves the locally buffered messages into MongoDB, in order. Does nothing
     * beyond logging when no model is attached.
     */
    private async transferLocalBufferToMongoDB(): Promise<void> {
      console.log(`Transferring local array buffered Message: currently there is ${this.messageBuffer.length}. Transferring to database...`);
      const model = this.messageModel;
      if (!model) {
        return;
      }
      const pending = this.messageBuffer.splice(0);
      const notTransferred: Message[] = [];
      // Insert one at a time so the stored order matches the buffered order.
      for (const message of pending) {
        try {
          await model.create(message);
          console.log(`Transferring ${this.messageIdOf(message)} into database.`);
        } catch (error) {
          console.error("Error transferring message to MongoDB:", error);
          notTransferred.push(message);
        }
      }
      // Only messages that reached the database leave the local buffer; the
      // rest stay available for a later retry instead of being discarded.
      if (notTransferred.length > 0) {
        this.messageBuffer.unshift(...notTransferred);
      }
      const count = await this.countBufferedDocuments(model);
      if (count !== undefined) {
        console.log(`Local buffered message transfer completed. There is a total of ${count} messages in database at the moment.`)
      }
    }

    /** Counts stored documents for log output; logs and returns undefined on failure. */
    private async countBufferedDocuments(
      model: Model<Message>
    ): Promise<number | undefined> {
      try {
        return await model.countDocuments({});
      } catch (error) {
        console.error("Error counting messages in MongoDB:", error);
        return undefined;
      }
    }

    /** Reads the message id for log output without throwing on an unexpected shape. */
    private messageIdOf(message: Message) {
      return (message.message as MessageLog | undefined)?.appData?.msgId;
    }
    // Additional methods as required...
  }