|
@@ -59,14 +59,16 @@ export class BufferService {
|
|
next: (message: Message) => this.handleIncomingMessage(message),
|
|
next: (message: Message) => this.handleIncomingMessage(message),
|
|
error: (err) =>
|
|
error: (err) =>
|
|
console.error("Error in messageToBePublished subject:", err),
|
|
console.error("Error in messageToBePublished subject:", err),
|
|
- complete: () =>
|
|
|
|
- console.log("messageToBePublished subscription completed"),
|
|
|
|
|
|
+ complete: () =>{
|
|
|
|
+ //console.log("messageToBePublished subscription completed")
|
|
|
|
+ },
|
|
});
|
|
});
|
|
|
|
|
|
this.connectionState.subscribe({
|
|
this.connectionState.subscribe({
|
|
next: (state: ConnectionState) => this.handleConnectionStateChanges(state),
|
|
next: (state: ConnectionState) => this.handleConnectionStateChanges(state),
|
|
error: (err) => console.error("Error in connectionState subject:", err),
|
|
error: (err) => console.error("Error in connectionState subject:", err),
|
|
- complete: () => console.log("connectionState subscription completed"),
|
|
|
|
|
|
+ complete: () => {//console.log("connectionState subscription completed")
|
|
|
|
+ },
|
|
});
|
|
});
|
|
}
|
|
}
|
|
|
|
|
|
@@ -74,11 +76,11 @@ export class BufferService {
|
|
dbName: string
|
|
dbName: string
|
|
): Promise<Connection> {
|
|
): Promise<Connection> {
|
|
try {
|
|
try {
|
|
- console.log(`${this.dbUrl}${dbName}`);
|
|
|
|
|
|
+ //console.log(`${this.dbUrl}${dbName}`);
|
|
const connection: mongoose.Connection = await mongoose.createConnection(
|
|
const connection: mongoose.Connection = await mongoose.createConnection(
|
|
`${this.dbUrl}${dbName}`
|
|
`${this.dbUrl}${dbName}`
|
|
);
|
|
);
|
|
- console.log(`Connected to ${this.dbUrl}${dbName}`);
|
|
|
|
|
|
+ //console.log(`Connected to ${this.dbUrl}${dbName}`);
|
|
return connection;
|
|
return connection;
|
|
} catch (error) {
|
|
} catch (error) {
|
|
console.error("Error connecting to MongoDB:", error);
|
|
console.error("Error connecting to MongoDB:", error);
|
|
@@ -102,9 +104,9 @@ export class BufferService {
|
|
}
|
|
}
|
|
|
|
|
|
private handleConnectionStateChanges(state: ConnectionState): void {
|
|
private handleConnectionStateChanges(state: ConnectionState): void {
|
|
- console.log(`${this.bufferIdentifier}: ${this.connectionState.getValue().status}`);
|
|
|
|
|
|
+ //console.log(`${this.bufferIdentifier}: ${this.connectionState.getValue().status}`);
|
|
if (state.status === `LIMIT_EXCEEDED`) {
|
|
if (state.status === `LIMIT_EXCEEDED`) {
|
|
- console.log(`Limit exceed. Clearing buffered messages...`)
|
|
|
|
|
|
+ //console.log(`Limit exceed. Clearing buffered messages...`)
|
|
let message: Message = {
|
|
let message: Message = {
|
|
id: `test`,
|
|
id: `test`,
|
|
message: `Limit exceed. Please take care. Buffer Service Out!`
|
|
message: `Limit exceed. Please take care. Buffer Service Out!`
|
|
@@ -130,11 +132,11 @@ export class BufferService {
|
|
// const newMessage = new this.messageModel(message);
|
|
// const newMessage = new this.messageModel(message);
|
|
await this.messageModel.create(message);
|
|
await this.messageModel.create(message);
|
|
this.messageModel.countDocuments({}).then((count) => {
|
|
this.messageModel.countDocuments({}).then((count) => {
|
|
- console.log(`Message${(message.message as MessageLog).appData.msgId} saved to MongoDB buffer. There is ${count} messages in datatbase under ${this.bufferIdentifier} at the moment.`);
|
|
|
|
|
|
+ //console.log(`Message${(message.message as MessageLog).appData.msgId} saved to MongoDB buffer. There is ${count} messages in datatbase under ${this.bufferIdentifier} at the moment.`);
|
|
// if connection status okay
|
|
// if connection status okay
|
|
// if(this.connectionState.getValue().status == "DIRECT_PUBLISH")
|
|
// if(this.connectionState.getValue().status == "DIRECT_PUBLISH")
|
|
// {
|
|
// {
|
|
- // console.log("Message count release " + count);
|
|
|
|
|
|
+ // //console.log("Message count release " + count);
|
|
// // Then release back to message stream
|
|
// // Then release back to message stream
|
|
// this.releaseBufferedMessages(this.messageStream);
|
|
// this.releaseBufferedMessages(this.messageStream);
|
|
// }
|
|
// }
|
|
@@ -146,8 +148,8 @@ export class BufferService {
|
|
} else {
|
|
} else {
|
|
if (this.bufferLimit > this.messageBuffered.length) {
|
|
if (this.bufferLimit > this.messageBuffered.length) {
|
|
this.messageBuffered.push(message);
|
|
this.messageBuffered.push(message);
|
|
- console.log(this.messageBuffered) // Fallback to local buffer if model is not defined
|
|
|
|
- console.log(`pushing into local array buffer under ${this.bufferIdentifier}.... There is now ${this.messageBuffered.length} messages`);
|
|
|
|
|
|
+ //console.log(this.messageBuffered) // Fallback to local buffer if model is not defined
|
|
|
|
+ //console.log(`pushing into local array buffer under ${this.bufferIdentifier}.... There is now ${this.messageBuffered.length} messages`);
|
|
} else {
|
|
} else {
|
|
let reportState: ConnectionState = {
|
|
let reportState: ConnectionState = {
|
|
status: `LIMIT_EXCEEDED`,
|
|
status: `LIMIT_EXCEEDED`,
|
|
@@ -167,26 +169,26 @@ export class BufferService {
|
|
// use then
|
|
// use then
|
|
let countPromise = checkMessageCount(this.messageModel, this.bufferIdentifier);
|
|
let countPromise = checkMessageCount(this.messageModel, this.bufferIdentifier);
|
|
countPromise.then(async (amount) => {
|
|
countPromise.then(async (amount) => {
|
|
- console.log("Amount1:" + amount);
|
|
|
|
|
|
+ //console.log("Amount1:" + amount);
|
|
|
|
|
|
// let countPromise = checkMessageCount(this.messageModel, this.bufferIdentifier);
|
|
// let countPromise = checkMessageCount(this.messageModel, this.bufferIdentifier);
|
|
// countPromise.then(async (amount)=>{
|
|
// countPromise.then(async (amount)=>{
|
|
|
|
|
|
- // console.log("Amount2:"+amount);
|
|
|
|
|
|
+ // //console.log("Amount2:"+amount);
|
|
// })
|
|
// })
|
|
|
|
|
|
while (amount > 0) {
|
|
while (amount > 0) {
|
|
- console.log("AmountInLoop1:" + amount)
|
|
|
|
|
|
+ //console.log("AmountInLoop1:" + amount)
|
|
try {
|
|
try {
|
|
await extractData(messageFromBuffer, this.messageModel); // New function to process a batch
|
|
await extractData(messageFromBuffer, this.messageModel); // New function to process a batch
|
|
} catch (processError) {
|
|
} catch (processError) {
|
|
console.error('Error processing batch:', processError);
|
|
console.error('Error processing batch:', processError);
|
|
}
|
|
}
|
|
- console.log('Checking again...');
|
|
|
|
|
|
+ //console.log('Checking again...');
|
|
amount = await checkMessageCount(this.messageModel, this.bufferIdentifier);
|
|
amount = await checkMessageCount(this.messageModel, this.bufferIdentifier);
|
|
- console.log("AmountInLoop:" + amount)
|
|
|
|
|
|
+ //console.log("AmountInLoop:" + amount)
|
|
}
|
|
}
|
|
- console.log('All messages extracted.');
|
|
|
|
|
|
+ //console.log('All messages extracted.');
|
|
})
|
|
})
|
|
let amount: number = await countPromise
|
|
let amount: number = await countPromise
|
|
|
|
|
|
@@ -199,7 +201,7 @@ export class BufferService {
|
|
async function checkMessageCount(messageModel: Model<Message>, bufferIdentifier: string): Promise<any> {
|
|
async function checkMessageCount(messageModel: Model<Message>, bufferIdentifier: string): Promise<any> {
|
|
return new Promise((resolve, reject) => {
|
|
return new Promise((resolve, reject) => {
|
|
messageModel.countDocuments({}).then((count) => {
|
|
messageModel.countDocuments({}).then((count) => {
|
|
- console.log(`There is ${count} messages in database under ${bufferIdentifier} at the moment. Releasing them....`);
|
|
|
|
|
|
+ //console.log(`There is ${count} messages in database under ${bufferIdentifier} at the moment. Releasing them....`);
|
|
resolve(count)
|
|
resolve(count)
|
|
}).catch((error) => {
|
|
}).catch((error) => {
|
|
console.error(error)
|
|
console.error(error)
|
|
@@ -228,9 +230,9 @@ export class BufferService {
|
|
try {
|
|
try {
|
|
if (messageModel) {
|
|
if (messageModel) {
|
|
await messageModel.deleteMany({});
|
|
await messageModel.deleteMany({});
|
|
- console.log("Data in Mongo deleted successfully.");
|
|
|
|
|
|
+ //console.log("Data in Mongo deleted successfully.");
|
|
} else {
|
|
} else {
|
|
- console.log(`Message Mongoose Model is not intiated properly...`);
|
|
|
|
|
|
+ //console.log(`Message Mongoose Model is not intiated properly...`);
|
|
}
|
|
}
|
|
} catch (err) {
|
|
} catch (err) {
|
|
console.error("Error deleting data:", err);
|
|
console.error("Error deleting data:", err);
|
|
@@ -242,7 +244,7 @@ export class BufferService {
|
|
}
|
|
}
|
|
if (!this.messageModel) {
|
|
if (!this.messageModel) {
|
|
// If MongoDB model is not defined, use the local buffer
|
|
// If MongoDB model is not defined, use the local buffer
|
|
- console.log(`Releasing buffer Message under ${this.bufferIdentifier}: currently there is ${this.messageBuffered.length} messages to be released`);
|
|
|
|
|
|
+ //console.log(`Releasing buffer Message under ${this.bufferIdentifier}: currently there is ${this.messageBuffered.length} messages to be released`);
|
|
this.messageBuffered.forEach((message) =>
|
|
this.messageBuffered.forEach((message) =>
|
|
this.messageStream.next(message)
|
|
this.messageStream.next(message)
|
|
);
|
|
);
|
|
@@ -258,7 +260,7 @@ export class BufferService {
|
|
|
|
|
|
private async transferLocalBufferToMongoDB(): Promise<void> {
|
|
private async transferLocalBufferToMongoDB(): Promise<void> {
|
|
return new Promise((resolve, reject) => {
|
|
return new Promise((resolve, reject) => {
|
|
- console.log(`Transferring local array buffered Message under ${this.bufferIdentifier}: currently there is ${this.messageBuffered.length}. Transferring to database...`);
|
|
|
|
|
|
+ //console.log(`Transferring local array buffered Message under ${this.bufferIdentifier}: currently there is ${this.messageBuffered.length}. Transferring to database...`);
|
|
if (this.messageModel) {
|
|
if (this.messageModel) {
|
|
let locallyBufferedMessage: Observable<Message> = from(this.messageBuffered);
|
|
let locallyBufferedMessage: Observable<Message> = from(this.messageBuffered);
|
|
locallyBufferedMessage.subscribe({
|
|
locallyBufferedMessage.subscribe({
|
|
@@ -266,7 +268,7 @@ export class BufferService {
|
|
try {
|
|
try {
|
|
if (this.messageModel) {
|
|
if (this.messageModel) {
|
|
await this.messageModel.create(message);
|
|
await this.messageModel.create(message);
|
|
- console.log(`Transferring ${(message.message as MessageLog).appData.msgId} into database.`);
|
|
|
|
|
|
+ //console.log(`Transferring ${(message.message as MessageLog).appData.msgId} into database.`);
|
|
}
|
|
}
|
|
} catch (error) {
|
|
} catch (error) {
|
|
console.error("Error transferring message to MongoDB:", error);
|
|
console.error("Error transferring message to MongoDB:", error);
|
|
@@ -276,7 +278,7 @@ export class BufferService {
|
|
complete: () => {
|
|
complete: () => {
|
|
if (this.messageModel) {
|
|
if (this.messageModel) {
|
|
this.messageModel.countDocuments({}).then((count) => {
|
|
this.messageModel.countDocuments({}).then((count) => {
|
|
- console.log(`Local buffered message transfer completed. There is a total of ${count} messages in database ${this.bufferIdentifier} at the moment.`)
|
|
|
|
|
|
+ //console.log(`Local buffered message transfer completed. There is a total of ${count} messages in database ${this.bufferIdentifier} at the moment.`)
|
|
this.messageBuffered = [] // Clear local buffer after transferring
|
|
this.messageBuffered = [] // Clear local buffer after transferring
|
|
});
|
|
});
|
|
}
|
|
}
|