|
@@ -5,10 +5,11 @@ import { ConnectionState, Message, MessageLog } from '../interfaces/general.inte
|
|
import { resolve } from 'path';
|
|
import { resolve } from 'path';
|
|
|
|
|
|
export class BufferService {
|
|
export class BufferService {
|
|
- private messageFromApplication: Subject<Message>;
|
|
|
|
- private messageFromBuffer: Subject<Message>
|
|
|
|
|
|
+ private messageStream: Subject<Message>
|
|
|
|
+ // private messageFromApplication: Subject<Message>;
|
|
|
|
+ // private messageFromBuffer: Subject<Message>
|
|
private connectionState: BehaviorSubject<ConnectionState>
|
|
private connectionState: BehaviorSubject<ConnectionState>
|
|
- private currentSource: Subject<Message>
|
|
|
|
|
|
+ // private currentSource: Subject<Message>
|
|
private messageBuffer: Message[] = [];
|
|
private messageBuffer: Message[] = [];
|
|
private messageModel: Model<Message> | undefined;
|
|
private messageModel: Model<Message> | undefined;
|
|
private readonly dbUrl: string = process.env.MONGO as string;
|
|
private readonly dbUrl: string = process.env.MONGO as string;
|
|
@@ -19,9 +20,10 @@ export class BufferService {
|
|
connectionStateSubject: BehaviorSubject<ConnectionState>,
|
|
connectionStateSubject: BehaviorSubject<ConnectionState>,
|
|
dbName: string
|
|
dbName: string
|
|
) {
|
|
) {
|
|
- this.messageFromBuffer = new Subject<Message>();
|
|
|
|
- this.messageFromApplication = messageFromApp;
|
|
|
|
- this.currentSource = this.messageFromBuffer
|
|
|
|
|
|
+ this.messageStream = messageFromApp;
|
|
|
|
+ // this.messageFromBuffer = new Subject<Message>();
|
|
|
|
+ // this.messageFromApplication = messageFromApp;
|
|
|
|
+ // this.currentSource = this.messageFromBuffer
|
|
this.connectionState = connectionStateSubject;
|
|
this.connectionState = connectionStateSubject;
|
|
this.setupSubscriptions(); // Note: The handle buffer will push the data in local array before pushing to mongo via initial check up model
|
|
this.setupSubscriptions(); // Note: The handle buffer will push the data in local array before pushing to mongo via initial check up model
|
|
|
|
|
|
@@ -37,11 +39,11 @@ export class BufferService {
|
|
}
|
|
}
|
|
|
|
|
|
public getMessages(): Observable<Message> {
|
|
public getMessages(): Observable<Message> {
|
|
- return this.currentSource as Observable<Message>
|
|
|
|
|
|
+ return this.messageStream as Observable<Message>
|
|
}
|
|
}
|
|
|
|
|
|
private setupSubscriptions(): void {
|
|
private setupSubscriptions(): void {
|
|
- this.messageFromApplication.subscribe({
|
|
|
|
|
|
+ this.messageStream.subscribe({
|
|
next: (message: Message) => this.handleIncomingMessage(message),
|
|
next: (message: Message) => this.handleIncomingMessage(message),
|
|
error: (err) => console.error('Error in messageToBePublished subject:', err),
|
|
error: (err) => console.error('Error in messageToBePublished subject:', err),
|
|
complete: () => console.log('messageToBePublished subscription completed')
|
|
complete: () => console.log('messageToBePublished subscription completed')
|
|
@@ -90,23 +92,12 @@ export class BufferService {
|
|
private handleConnectionStateChanges(state: ConnectionState): void {
|
|
private handleConnectionStateChanges(state: ConnectionState): void {
|
|
console.log(this.connectionState.getValue().status)
|
|
console.log(this.connectionState.getValue().status)
|
|
if (state.status === 'BUFFER') {
|
|
if (state.status === 'BUFFER') {
|
|
- this.currentSource = this.messageFromBuffer
|
|
|
|
if (state.payload && typeof state.payload !== 'string') {
|
|
if (state.payload && typeof state.payload !== 'string') {
|
|
this.bufferMessage(state.payload); // Buffer the last message immediately
|
|
this.bufferMessage(state.payload); // Buffer the last message immediately
|
|
}
|
|
}
|
|
}
|
|
}
|
|
if (state.status === 'DIRECT_PUBLISH') {
|
|
if (state.status === 'DIRECT_PUBLISH') {
|
|
- if (this.messageBuffer.length > 0) { // temporary only since i am not using mongoDB for buffering atm
|
|
|
|
- this.releaseBufferedMessages(this.messageFromBuffer).then(() => {
|
|
|
|
- console.log(`Switching to main publisher from source`)
|
|
|
|
- this.currentSource = this.messageFromApplication
|
|
|
|
- }).catch((err) => {
|
|
|
|
- console.error(err)
|
|
|
|
- })
|
|
|
|
- }
|
|
|
|
- if (this.messageBuffer.length < 1) {
|
|
|
|
- this.currentSource = this.messageFromApplication
|
|
|
|
- }
|
|
|
|
|
|
+ this.releaseBufferedMessages(this.messageStream)
|
|
/* This is for mongo */
|
|
/* This is for mongo */
|
|
// this.isBufferNotEmpty().then(isNotEmpty => {
|
|
// this.isBufferNotEmpty().then(isNotEmpty => {
|
|
// if (isNotEmpty) {
|
|
// if (isNotEmpty) {
|
|
@@ -179,7 +170,7 @@ export class BufferService {
|
|
if (!this.messageModel) {
|
|
if (!this.messageModel) {
|
|
// If MongoDB model is not defined, use the local buffer
|
|
// If MongoDB model is not defined, use the local buffer
|
|
console.log(`Releasing buffer Message: currently there is ${this.messageBuffer.length} messages to be released`)
|
|
console.log(`Releasing buffer Message: currently there is ${this.messageBuffer.length} messages to be released`)
|
|
- this.messageBuffer.forEach(message => this.messageFromBuffer.next(message));
|
|
|
|
|
|
+ this.messageBuffer.forEach(message => this.messageStream.next(message));
|
|
this.messageBuffer.length = 0 // Clear the local buffer after transferring
|
|
this.messageBuffer.length = 0 // Clear the local buffer after transferring
|
|
if (this.messageBuffer.length < 1) {
|
|
if (this.messageBuffer.length < 1) {
|
|
resolve(true)
|
|
resolve(true)
|