buffer.service.ts 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389
  1. // bufferService.ts
  2. import { BehaviorSubject, Observable, Subject, from } from "rxjs";
  3. import mongoose, { Connection, Model } from "mongoose";
  4. import { ConnectionState, Message, MessageLog, } from "../interfaces/general.interface";
  5. import * as fs from 'fs';
  6. export class BufferService {
  7. private bufferIdentifier!: string
  8. private messageStream: Subject<Message>;
  9. private connectionState: BehaviorSubject<ConnectionState>;
  10. private messageBuffered: Message[] = [];
  11. private messageModel!: Model<Message>
  12. private readonly dbUrl!: string
  13. private bufferLimit!: number
  14. constructor(
  15. messageFromApp: Subject<Message>,
  16. connectionStateSubject: BehaviorSubject<ConnectionState>,
  17. dbName: string,
  18. bufferLimit?: number
  19. ) {
  20. if (bufferLimit) {
  21. this.bufferLimit = bufferLimit
  22. } else {
  23. this.bufferLimit = 10000 // default buffer limit
  24. }
  25. this.bufferIdentifier = dbName
  26. this.messageStream = messageFromApp;
  27. this.connectionState = connectionStateSubject;
  28. this.setupSubscriptions(); // Note: The handle buffer will push the data in local array before pushing to mongo via initial check up model
  29. /* Disabled for now due to missing data not during transmision. The issue was suspected to be it's async nature of confusing timing
  30. when it was queued into the event queue. Sometimes the messages will be late to be saved */
  31. // this.initializeDatabaseConnection(dbName)
  32. // .then(async (connection: mongoose.Connection) => {
  33. // const grpcMessageSchema = require("../models/message.schema");
  34. // this.messageModel = connection.model<Message>(
  35. // "Message",
  36. // grpcMessageSchema
  37. // );
  38. // await this.transferLocalBufferToMongoDB(); // transfer all data from local array into mongodb after the mongo setup is complete
  39. // })
  40. // .catch((error) => {
  41. // console.error("Database initialization failed:", error);
  42. // // Implement retry logic or additional error handling here. Perhaps retry logic in the future...
  43. // });
  44. }
  45. // to be exposed to acquire the messages
  46. public getMessages(): Observable<Message> {
  47. return this.messageStream as Observable<Message>;
  48. }
  49. public getStateObservable(): BehaviorSubject<ConnectionState> {
  50. return this.connectionState;
  51. }
  52. // To subscrcibe for the message stream as well as the connection state
  53. private setupSubscriptions(): void {
  54. this.messageStream.subscribe({
  55. next: (message: Message) => this.handleIncomingMessage(message),
  56. error: (err) =>
  57. console.error("Error in messageToBePublished subject:", err),
  58. complete: () =>
  59. console.log("messageToBePublished subscription completed"),
  60. });
  61. this.connectionState.subscribe({
  62. next: (state: ConnectionState) => this.handleConnectionStateChanges(state),
  63. error: (err) => console.error("Error in connectionState subject:", err),
  64. complete: () => console.log("connectionState subscription completed"),
  65. });
  66. }
  67. private async initializeDatabaseConnection(
  68. dbName: string
  69. ): Promise<Connection> {
  70. try {
  71. console.log(`${this.dbUrl}${dbName}`);
  72. const connection: mongoose.Connection = await mongoose.createConnection(
  73. `${this.dbUrl}${dbName}`
  74. );
  75. console.log(`Connected to ${this.dbUrl}${dbName}`);
  76. return connection;
  77. } catch (error) {
  78. console.error("Error connecting to MongoDB:", error);
  79. throw error;
  80. }
  81. }
  82. private handleIncomingMessage(message: any): void {
  83. if (this.connectionState.getValue().status === `LIMIT_EXCEEDED`) {
  84. // do nothing... Let handleConnectionStateChanges deal with this state
  85. }
  86. if (this.connectionState.getValue().status === "BUFFER") {
  87. this.fileManagement(message, 'buffer');
  88. this.bufferMessage(message);
  89. }
  90. if (this.connectionState.getValue().status === "DIRECT_PUBLISH") {
  91. /* Note: Since the main outGoingMessage is being published irregardless
  92. of the connection state, so there's no need to do anything aside from
  93. releasing buffered messages which will be handled by handleConnectionStateChange */
  94. // additional logic here
  95. }
  96. }
  97. private handleConnectionStateChanges(state: ConnectionState): void {
  98. console.log(`${this.bufferIdentifier}: ${this.connectionState.getValue().status}`);
  99. if (state.status === `LIMIT_EXCEEDED`) {
  100. console.log(`Limit exceed. Clearing buffered messages...`)
  101. let message: Message = {
  102. id: `test`,
  103. message: `Limit exceed. Please take care. Buffer Service Out!`
  104. }
  105. this.messageStream.next(message)
  106. // this.messageStream.unsubscribe() //destroy existing subscription
  107. this.messageBuffered = []
  108. }
  109. if (state.status === "BUFFER") {
  110. if (state.payload && typeof state.payload !== "string") {
  111. this.bufferMessage(state.payload); // Buffer the last message immediately
  112. }
  113. }
  114. if (state.status === "DIRECT_PUBLISH") {
  115. // Relese the messages by inserting them into the outgoing Messages together.
  116. this.releaseBufferedMessages(this.messageStream);
  117. }
  118. // testing
  119. if(state.status === "TARGET_PUBLISH") {
  120. this.fileManagement(state, 'release');
  121. this.releaseTargetBufferedMessage(state);
  122. }
  123. }
  124. releaseTargetBufferedMessage(message: ConnectionState) {
  125. if(this.bufferIdentifier === "file") {
  126. }
  127. let target = this.messageBuffered.find((data: Message) => {
  128. if(data.id === message.uuid) {
  129. return data;
  130. }
  131. })
  132. if(target) {
  133. this.messageBuffered = this.messageBuffered.filter((data: Message) => {
  134. if(data.id !== message.uuid) {
  135. return data;
  136. }
  137. })
  138. }
  139. console.log(`TESTING : Releasing buffer Message under ${this.bufferIdentifier}: currently there is ${this.messageBuffered.length} messages to be released`);
  140. }
  141. fileManagement(message: any, type: 'buffer' | 'release') {
  142. let filePath = "./_LocalStorage/retransmissionData.json";
  143. // DECLARATION : status chain
  144. let checkStatus: boolean = true;
  145. let errorMessage: any;
  146. let fileContent: any;
  147. if(checkStatus === true) {
  148. if(this.bufferIdentifier !== "file") {
  149. errorMessage = "not file storage, skip.";
  150. checkStatus = false;
  151. }
  152. }
  153. // CONDITION : check state log file exists or not
  154. if(checkStatus === true) {
  155. if(fs.existsSync(filePath)) {
  156. fileContent = fs.readFileSync(filePath, 'utf-8');
  157. }
  158. else{
  159. fileContent = JSON.stringify({ events: [] });
  160. }
  161. }
  162. // CONDITION : fileContent is empty
  163. if(checkStatus === true) {
  164. if(fileContent === undefined && fileContent === null || fileContent === "") {
  165. fileContent = JSON.stringify({ events: [] });
  166. }
  167. }
  168. // TASK : json parse fileContent
  169. if(checkStatus === true) {
  170. fileContent = JSON.parse(fileContent);
  171. }
  172. // TASK : add new message into fileContent
  173. if(checkStatus === true) {
  174. if(type === "buffer") {
  175. fileContent.events.push(message);
  176. }
  177. }
  178. // TASK : add new message into fileContent
  179. if(checkStatus === true) {
  180. if(type === "release") {
  181. fileContent.events = fileContent.events.filter((data) => {
  182. if(data.id !== message.uuid) {
  183. return data
  184. }
  185. })
  186. }
  187. }
  188. // TASK : write file
  189. if(checkStatus === true) {
  190. try {
  191. fs.writeFileSync(filePath, JSON.stringify(fileContent, null, 4), 'utf-8');
  192. console.log('\u001b[' + 32 + 'm' + "RE-TRANSMISSION FILE : ADDED NEW DATA..." + '\u001b[0m');
  193. }catch(error) {
  194. errorMessage = error;
  195. checkStatus = false;
  196. }
  197. }
  198. // CONDITION : ERROR
  199. if(checkStatus === false) {
  200. console.log('\u001b[' + 31 + 'm' + "RE-TRANSMISSION FILE : ERROR -> " + JSON.stringify(errorMessage,null,4) + '\u001b[0m');
  201. }
  202. }
  203. private async bufferMessage(message: any): Promise<void> {
  204. if (this.messageModel) {
  205. try {
  206. // const newMessage = new this.messageModel(message);
  207. await this.messageModel.create(message);
  208. this.messageModel.countDocuments({}).then((count) => {
  209. console.log(`Message${(message.message as MessageLog).appData.msgId} saved to MongoDB buffer. There is ${count} messages in datatbase under ${this.bufferIdentifier} at the moment.`);
  210. // if connection status okay
  211. // if(this.connectionState.getValue().status == "DIRECT_PUBLISH")
  212. // {
  213. // console.log("Message count release " + count);
  214. // // Then release back to message stream
  215. // this.releaseBufferedMessages(this.messageStream);
  216. // }
  217. });
  218. } catch (error) {
  219. console.error("Error saving message to MongoDB:", error);
  220. // Implement retry logic or additional error handling here
  221. }
  222. } else {
  223. if (this.bufferLimit > this.messageBuffered.length) {
  224. this.messageBuffered.push(message);
  225. // console.log(this.messageBuffered) // Fallback to local buffer if model is not defined
  226. console.log(`pushing into local array buffer under ${this.bufferIdentifier}.... There is now ${this.messageBuffered.length} messages`);
  227. } else {
  228. let reportState: ConnectionState = {
  229. status: `LIMIT_EXCEEDED`,
  230. reason: `${this.bufferLimit} Limit exceeded. Buffer Service will be terminated...`
  231. }
  232. this.connectionState.next(reportState)
  233. }
  234. }
  235. }
  236. private releaseBufferedMessages(
  237. messageFromBuffer: Subject<Message>
  238. ): Promise<boolean> {
  239. return new Promise(async (resolve, reject) => {
  240. if (this.messageModel) {
  241. try {
  242. // use then
  243. let countPromise = checkMessageCount(this.messageModel, this.bufferIdentifier);
  244. countPromise.then(async (amount) => {
  245. console.log("Amount1:" + amount);
  246. // let countPromise = checkMessageCount(this.messageModel, this.bufferIdentifier);
  247. // countPromise.then(async (amount)=>{
  248. // console.log("Amount2:"+amount);
  249. // })
  250. while (amount > 0) {
  251. console.log("AmountInLoop1:" + amount)
  252. try {
  253. await extractData(messageFromBuffer, this.messageModel); // New function to process a batch
  254. } catch (processError) {
  255. console.error('Error processing batch:', processError);
  256. }
  257. console.log('Checking again...');
  258. amount = await checkMessageCount(this.messageModel, this.bufferIdentifier);
  259. console.log("AmountInLoop:" + amount)
  260. }
  261. console.log('All messages extracted.');
  262. })
  263. let amount: number = await countPromise
  264. resolve(true);
  265. } catch (error) {
  266. console.error('Error in releaseBufferedMessages:', error);
  267. reject(false);
  268. }
  269. async function checkMessageCount(messageModel: Model<Message>, bufferIdentifier: string): Promise<any> {
  270. return new Promise((resolve, reject) => {
  271. messageModel.countDocuments({}).then((count) => {
  272. console.log(`There is ${count} messages in database under ${bufferIdentifier} at the moment. Releasing them....`);
  273. resolve(count)
  274. }).catch((error) => {
  275. console.error(error)
  276. reject(error)
  277. })
  278. })
  279. }
  280. // Stream all the data inside the database out and deleting them
  281. async function extractData(messageFromBuffer: Subject<Message>, messageModel: Model<Message>): Promise<any> {
  282. return new Promise((resolve, reject) => {
  283. const stream = messageModel.find().cursor();
  284. stream.on("data", async (message) => {
  285. // Process each message individually`
  286. messageFromBuffer.next(message);
  287. });
  288. stream.on("error", (error) => {
  289. console.error("Error streaming messages from MongoDB:", error);
  290. reject(error);
  291. });
  292. stream.on("end", async () => {
  293. // Delete the data once it has been streamed
  294. try {
  295. if (messageModel) {
  296. await messageModel.deleteMany({});
  297. console.log("Data in Mongo deleted successfully.");
  298. } else {
  299. console.log(`Message Mongoose Model is not intiated properly...`);
  300. }
  301. } catch (err) {
  302. console.error("Error deleting data:", err);
  303. }
  304. resolve(true);
  305. });
  306. })
  307. }
  308. }
  309. if (!this.messageModel) {
  310. // If MongoDB model is not defined, use the local buffer
  311. console.log(`Releasing buffer Message under ${this.bufferIdentifier}: currently there is ${this.messageBuffered.length} messages to be released`);
  312. this.messageBuffered.forEach((message) =>
  313. this.messageStream.next(message)
  314. );
  315. this.messageBuffered.length = 0; // Clear the local buffer after transferring
  316. if (this.messageBuffered.length < 1) {
  317. resolve(true);
  318. } else {
  319. reject(`Somehow the array is not emptied. This should not happen`);
  320. }
  321. }
  322. });
  323. }
  324. private async transferLocalBufferToMongoDB(): Promise<void> {
  325. return new Promise((resolve, reject) => {
  326. console.log(`Transferring local array buffered Message under ${this.bufferIdentifier}: currently there is ${this.messageBuffered.length}. Transferring to database...`);
  327. if (this.messageModel) {
  328. let locallyBufferedMessage: Observable<Message> = from(this.messageBuffered);
  329. locallyBufferedMessage.subscribe({
  330. next: async (message: Message) => {
  331. try {
  332. if (this.messageModel) {
  333. await this.messageModel.create(message);
  334. console.log(`Transferring ${(message.message as MessageLog).appData.msgId} into database.`);
  335. }
  336. } catch (error) {
  337. console.error("Error transferring message to MongoDB:", error);
  338. }
  339. },
  340. error: (err) => console.error(err),
  341. complete: () => {
  342. if (this.messageModel) {
  343. this.messageModel.countDocuments({}).then((count) => {
  344. console.log(`Local buffered message transfer completed. There is a total of ${count} messages in database ${this.bufferIdentifier} at the moment.`)
  345. this.messageBuffered = [] // Clear local buffer after transferring
  346. });
  347. }
  348. },
  349. });
  350. }
  351. });
  352. }
  353. // Additional methods as required...
  354. }