|
@@ -67,19 +67,7 @@ export class BufferService {
|
|
this.bufferMessage(message);
|
|
this.bufferMessage(message);
|
|
}
|
|
}
|
|
if (this.connectionState.getValue().status === 'DIRECT_PUBLISH') {
|
|
if (this.connectionState.getValue().status === 'DIRECT_PUBLISH') {
|
|
- // console.log(`There is ${this.messageBuffer.length} messages in the buffer`) // somehw this is called repeatedly
|
|
|
|
- // do nothing for now
|
|
|
|
- // Start releasing buffered messages, but don't wait for it to complete
|
|
|
|
- // this.isBufferNotEmpty().then(isNotEmpty => {
|
|
|
|
- // if (isNotEmpty) {
|
|
|
|
- // // this.releaseBufferedMessages(this.messageFromBuffer);
|
|
|
|
- // // Continue to buffer new incoming message during the release process
|
|
|
|
- // this.bufferMessage(message);
|
|
|
|
- // } else {
|
|
|
|
- // // If buffer is empty, switch source and handle the message directly
|
|
|
|
- // this.messageToBePublished.next(message); // Handle the message directly
|
|
|
|
- // }
|
|
|
|
- // });
|
|
|
|
|
|
+ // additional logic here
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -92,25 +80,6 @@ export class BufferService {
|
|
}
|
|
}
|
|
if (state.status === 'DIRECT_PUBLISH') {
|
|
if (state.status === 'DIRECT_PUBLISH') {
|
|
this.releaseBufferedMessages(this.messageStream)
|
|
this.releaseBufferedMessages(this.messageStream)
|
|
- /* This is for mongo */
|
|
|
|
- // this.isBufferNotEmpty().then(isNotEmpty => {
|
|
|
|
- // if (isNotEmpty) {
|
|
|
|
- // this.currentMessageSource = this.messageFromBuffer
|
|
|
|
- // } else {
|
|
|
|
- // this.currentMessageSource = this.messageToBePublished
|
|
|
|
- // }
|
|
|
|
- // })
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- private async isBufferNotEmpty(): Promise<boolean> {
|
|
|
|
- if (this.messageModel) {
|
|
|
|
- // Check the count in MongoDB
|
|
|
|
- const count = await this.messageModel.estimatedDocumentCount().exec();
|
|
|
|
- return count > 0;
|
|
|
|
- } else {
|
|
|
|
- // Check the local buffer
|
|
|
|
- return this.messageBuffer.length > 0;
|
|
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -119,7 +88,9 @@ export class BufferService {
|
|
try {
|
|
try {
|
|
// const newMessage = new this.messageModel(message);
|
|
// const newMessage = new this.messageModel(message);
|
|
await this.messageModel.create(message);
|
|
await this.messageModel.create(message);
|
|
- console.log(`Message${(message.message as MessageLog).appData.msgId} saved to MongoDB buffer`);
|
|
|
|
|
|
+ this.messageModel.countDocuments({}).then((count) => {
|
|
|
|
+ console.log(`Message${(message.message as MessageLog).appData.msgId} saved to MongoDB buffer. There is ${count} messages in datatbase at the moment.`);
|
|
|
|
+ })
|
|
} catch (error) {
|
|
} catch (error) {
|
|
console.error('Error saving message to MongoDB:', error);
|
|
console.error('Error saving message to MongoDB:', error);
|
|
// Implement retry logic or additional error handling here
|
|
// Implement retry logic or additional error handling here
|