123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389 |
- // bufferService.ts
- import { BehaviorSubject, Observable, Subject, from } from "rxjs";
- import mongoose, { Connection, Model } from "mongoose";
- import { ConnectionState, Message, MessageLog, } from "../interfaces/general.interface";
- import * as fs from 'fs';
- export class BufferService {
- private bufferIdentifier!: string
- private messageStream: Subject<Message>;
- private connectionState: BehaviorSubject<ConnectionState>;
- private messageBuffered: Message[] = [];
- private messageModel!: Model<Message>
- private readonly dbUrl!: string
- private bufferLimit!: number
- constructor(
- messageFromApp: Subject<Message>,
- connectionStateSubject: BehaviorSubject<ConnectionState>,
- dbName: string,
- bufferLimit?: number
- ) {
- if (bufferLimit) {
- this.bufferLimit = bufferLimit
- } else {
- this.bufferLimit = 10000 // default buffer limit
- }
- this.bufferIdentifier = dbName
- this.messageStream = messageFromApp;
- this.connectionState = connectionStateSubject;
- this.setupSubscriptions(); // Note: The handle buffer will push the data in local array before pushing to mongo via initial check up model
- /* Disabled for now due to missing data not during transmision. The issue was suspected to be it's async nature of confusing timing
- when it was queued into the event queue. Sometimes the messages will be late to be saved */
- // this.initializeDatabaseConnection(dbName)
- // .then(async (connection: mongoose.Connection) => {
- // const grpcMessageSchema = require("../models/message.schema");
- // this.messageModel = connection.model<Message>(
- // "Message",
- // grpcMessageSchema
- // );
- // await this.transferLocalBufferToMongoDB(); // transfer all data from local array into mongodb after the mongo setup is complete
- // })
- // .catch((error) => {
- // console.error("Database initialization failed:", error);
- // // Implement retry logic or additional error handling here. Perhaps retry logic in the future...
- // });
- }
- // to be exposed to acquire the messages
- public getMessages(): Observable<Message> {
- return this.messageStream as Observable<Message>;
- }
- public getStateObservable(): BehaviorSubject<ConnectionState> {
- return this.connectionState;
- }
- // To subscrcibe for the message stream as well as the connection state
- private setupSubscriptions(): void {
- this.messageStream.subscribe({
- next: (message: Message) => this.handleIncomingMessage(message),
- error: (err) =>
- console.error("Error in messageToBePublished subject:", err),
- complete: () =>
- console.log("messageToBePublished subscription completed"),
- });
- this.connectionState.subscribe({
- next: (state: ConnectionState) => this.handleConnectionStateChanges(state),
- error: (err) => console.error("Error in connectionState subject:", err),
- complete: () => console.log("connectionState subscription completed"),
- });
- }
- private async initializeDatabaseConnection(
- dbName: string
- ): Promise<Connection> {
- try {
- console.log(`${this.dbUrl}${dbName}`);
- const connection: mongoose.Connection = await mongoose.createConnection(
- `${this.dbUrl}${dbName}`
- );
- console.log(`Connected to ${this.dbUrl}${dbName}`);
- return connection;
- } catch (error) {
- console.error("Error connecting to MongoDB:", error);
- throw error;
- }
- }
- private handleIncomingMessage(message: any): void {
- if (this.connectionState.getValue().status === `LIMIT_EXCEEDED`) {
- // do nothing... Let handleConnectionStateChanges deal with this state
- }
- if (this.connectionState.getValue().status === "BUFFER") {
- this.fileManagement(message, 'buffer');
- this.bufferMessage(message);
- }
- if (this.connectionState.getValue().status === "DIRECT_PUBLISH") {
- /* Note: Since the main outGoingMessage is being published irregardless
- of the connection state, so there's no need to do anything aside from
- releasing buffered messages which will be handled by handleConnectionStateChange */
- // additional logic here
- }
- }
- private handleConnectionStateChanges(state: ConnectionState): void {
- console.log(`${this.bufferIdentifier}: ${this.connectionState.getValue().status}`);
- if (state.status === `LIMIT_EXCEEDED`) {
- console.log(`Limit exceed. Clearing buffered messages...`)
- let message: Message = {
- id: `test`,
- message: `Limit exceed. Please take care. Buffer Service Out!`
- }
- this.messageStream.next(message)
- // this.messageStream.unsubscribe() //destroy existing subscription
- this.messageBuffered = []
- }
- if (state.status === "BUFFER") {
- if (state.payload && typeof state.payload !== "string") {
- this.bufferMessage(state.payload); // Buffer the last message immediately
- }
- }
- if (state.status === "DIRECT_PUBLISH") {
- // Relese the messages by inserting them into the outgoing Messages together.
- this.releaseBufferedMessages(this.messageStream);
- }
- // testing
- if(state.status === "TARGET_PUBLISH") {
- this.fileManagement(state, 'release');
- this.releaseTargetBufferedMessage(state);
- }
- }
- releaseTargetBufferedMessage(message: ConnectionState) {
- if(this.bufferIdentifier === "file") {
- }
- let target = this.messageBuffered.find((data: Message) => {
- if(data.id === message.uuid) {
- return data;
- }
- })
- if(target) {
- this.messageBuffered = this.messageBuffered.filter((data: Message) => {
- if(data.id !== message.uuid) {
- return data;
- }
- })
- }
- console.log(`TESTING : Releasing buffer Message under ${this.bufferIdentifier}: currently there is ${this.messageBuffered.length} messages to be released`);
- }
- fileManagement(message: any, type: 'buffer' | 'release') {
- let filePath = "./_LocalStorage/retransmissionData.json";
- // DECLARATION : status chain
- let checkStatus: boolean = true;
- let errorMessage: any;
- let fileContent: any;
- if(checkStatus === true) {
- if(this.bufferIdentifier !== "file") {
- errorMessage = "not file storage, skip.";
- checkStatus = false;
- }
- }
- // CONDITION : check state log file exists or not
- if(checkStatus === true) {
- if(fs.existsSync(filePath)) {
- fileContent = fs.readFileSync(filePath, 'utf-8');
- }
- else{
- fileContent = JSON.stringify({ events: [] });
- }
- }
- // CONDITION : fileContent is empty
- if(checkStatus === true) {
- if(fileContent === undefined && fileContent === null || fileContent === "") {
- fileContent = JSON.stringify({ events: [] });
- }
- }
- // TASK : json parse fileContent
- if(checkStatus === true) {
- fileContent = JSON.parse(fileContent);
- }
- // TASK : add new message into fileContent
- if(checkStatus === true) {
- if(type === "buffer") {
- fileContent.events.push(message);
- }
- }
- // TASK : add new message into fileContent
- if(checkStatus === true) {
- if(type === "release") {
- fileContent.events = fileContent.events.filter((data) => {
- if(data.id !== message.uuid) {
- return data
- }
- })
- }
- }
- // TASK : write file
- if(checkStatus === true) {
- try {
- fs.writeFileSync(filePath, JSON.stringify(fileContent, null, 4), 'utf-8');
- console.log('\u001b[' + 32 + 'm' + "RE-TRANSMISSION FILE : ADDED NEW DATA..." + '\u001b[0m');
- }catch(error) {
- errorMessage = error;
- checkStatus = false;
- }
- }
- // CONDITION : ERROR
- if(checkStatus === false) {
- console.log('\u001b[' + 31 + 'm' + "RE-TRANSMISSION FILE : ERROR -> " + JSON.stringify(errorMessage,null,4) + '\u001b[0m');
- }
- }
- private async bufferMessage(message: any): Promise<void> {
- if (this.messageModel) {
- try {
- // const newMessage = new this.messageModel(message);
- await this.messageModel.create(message);
- this.messageModel.countDocuments({}).then((count) => {
- console.log(`Message${(message.message as MessageLog).appData.msgId} saved to MongoDB buffer. There is ${count} messages in datatbase under ${this.bufferIdentifier} at the moment.`);
- // if connection status okay
- // if(this.connectionState.getValue().status == "DIRECT_PUBLISH")
- // {
- // console.log("Message count release " + count);
- // // Then release back to message stream
- // this.releaseBufferedMessages(this.messageStream);
- // }
- });
- } catch (error) {
- console.error("Error saving message to MongoDB:", error);
- // Implement retry logic or additional error handling here
- }
- } else {
- if (this.bufferLimit > this.messageBuffered.length) {
- this.messageBuffered.push(message);
- // console.log(this.messageBuffered) // Fallback to local buffer if model is not defined
- console.log(`pushing into local array buffer under ${this.bufferIdentifier}.... There is now ${this.messageBuffered.length} messages`);
- } else {
- let reportState: ConnectionState = {
- status: `LIMIT_EXCEEDED`,
- reason: `${this.bufferLimit} Limit exceeded. Buffer Service will be terminated...`
- }
- this.connectionState.next(reportState)
- }
- }
- }
- private releaseBufferedMessages(
- messageFromBuffer: Subject<Message>
- ): Promise<boolean> {
- return new Promise(async (resolve, reject) => {
- if (this.messageModel) {
- try {
- // use then
- let countPromise = checkMessageCount(this.messageModel, this.bufferIdentifier);
- countPromise.then(async (amount) => {
- console.log("Amount1:" + amount);
- // let countPromise = checkMessageCount(this.messageModel, this.bufferIdentifier);
- // countPromise.then(async (amount)=>{
- // console.log("Amount2:"+amount);
- // })
- while (amount > 0) {
- console.log("AmountInLoop1:" + amount)
- try {
- await extractData(messageFromBuffer, this.messageModel); // New function to process a batch
- } catch (processError) {
- console.error('Error processing batch:', processError);
- }
- console.log('Checking again...');
- amount = await checkMessageCount(this.messageModel, this.bufferIdentifier);
- console.log("AmountInLoop:" + amount)
- }
- console.log('All messages extracted.');
- })
- let amount: number = await countPromise
- resolve(true);
- } catch (error) {
- console.error('Error in releaseBufferedMessages:', error);
- reject(false);
- }
- async function checkMessageCount(messageModel: Model<Message>, bufferIdentifier: string): Promise<any> {
- return new Promise((resolve, reject) => {
- messageModel.countDocuments({}).then((count) => {
- console.log(`There is ${count} messages in database under ${bufferIdentifier} at the moment. Releasing them....`);
- resolve(count)
- }).catch((error) => {
- console.error(error)
- reject(error)
- })
- })
- }
- // Stream all the data inside the database out and deleting them
- async function extractData(messageFromBuffer: Subject<Message>, messageModel: Model<Message>): Promise<any> {
- return new Promise((resolve, reject) => {
- const stream = messageModel.find().cursor();
- stream.on("data", async (message) => {
- // Process each message individually`
- messageFromBuffer.next(message);
- });
- stream.on("error", (error) => {
- console.error("Error streaming messages from MongoDB:", error);
- reject(error);
- });
- stream.on("end", async () => {
- // Delete the data once it has been streamed
- try {
- if (messageModel) {
- await messageModel.deleteMany({});
- console.log("Data in Mongo deleted successfully.");
- } else {
- console.log(`Message Mongoose Model is not intiated properly...`);
- }
- } catch (err) {
- console.error("Error deleting data:", err);
- }
- resolve(true);
- });
- })
- }
- }
- if (!this.messageModel) {
- // If MongoDB model is not defined, use the local buffer
- console.log(`Releasing buffer Message under ${this.bufferIdentifier}: currently there is ${this.messageBuffered.length} messages to be released`);
- this.messageBuffered.forEach((message) =>
- this.messageStream.next(message)
- );
- this.messageBuffered.length = 0; // Clear the local buffer after transferring
- if (this.messageBuffered.length < 1) {
- resolve(true);
- } else {
- reject(`Somehow the array is not emptied. This should not happen`);
- }
- }
- });
- }
- private async transferLocalBufferToMongoDB(): Promise<void> {
- return new Promise((resolve, reject) => {
- console.log(`Transferring local array buffered Message under ${this.bufferIdentifier}: currently there is ${this.messageBuffered.length}. Transferring to database...`);
- if (this.messageModel) {
- let locallyBufferedMessage: Observable<Message> = from(this.messageBuffered);
- locallyBufferedMessage.subscribe({
- next: async (message: Message) => {
- try {
- if (this.messageModel) {
- await this.messageModel.create(message);
- console.log(`Transferring ${(message.message as MessageLog).appData.msgId} into database.`);
- }
- } catch (error) {
- console.error("Error transferring message to MongoDB:", error);
- }
- },
- error: (err) => console.error(err),
- complete: () => {
- if (this.messageModel) {
- this.messageModel.countDocuments({}).then((count) => {
- console.log(`Local buffered message transfer completed. There is a total of ${count} messages in database ${this.bufferIdentifier} at the moment.`)
- this.messageBuffered = [] // Clear local buffer after transferring
- });
- }
- },
- });
- }
- });
- }
- // Additional methods as required...
- }
|