|
@@ -12,7 +12,7 @@ util.checkMaxHeap()
|
|
/* ---------------------- COMPLEX OPERATION ------------------------------ */
|
|
/* ---------------------- COMPLEX OPERATION ------------------------------ */
|
|
let msgPayload: Subject<any> = new Subject();
|
|
let msgPayload: Subject<any> = new Subject();
|
|
let consumerTrafficStatus: Subject<any> = new Subject()
|
|
let consumerTrafficStatus: Subject<any> = new Subject()
|
|
-let bufferTrigger: BehaviorSubject<boolean> = new BehaviorSubject(true)
|
|
|
|
|
|
+let bufferTrigger: BehaviorSubject<boolean> = new BehaviorSubject(false)
|
|
|
|
|
|
let mongoStorage: StorageLocation = {
|
|
let mongoStorage: StorageLocation = {
|
|
type: `MongoDB`,
|
|
type: `MongoDB`,
|
|
@@ -20,14 +20,18 @@ let mongoStorage: StorageLocation = {
|
|
}
|
|
}
|
|
msgData.loadObsData(mongoStorage, msgPayload)
|
|
msgData.loadObsData(mongoStorage, msgPayload)
|
|
|
|
|
|
-consumerTrafficStatus.subscribe((element) => {
|
|
|
|
- if (element >= 2) {
|
|
|
|
- let warning: boolean = false
|
|
|
|
- bufferTrigger.next(warning)
|
|
|
|
- console.log(`Heap Load Exceeded on client side. Buffering.....`)
|
|
|
|
|
|
+consumerTrafficStatus.subscribe((consumerHeapUsage) => {
|
|
|
|
+ if (consumerHeapUsage >= 2) {
|
|
|
|
+ // If consumerHeapUsage is over 2 %, it will trigger buffer
|
|
|
|
+ bufferTrigger.next(true)
|
|
} else {
|
|
} else {
|
|
- let warning: boolean = true
|
|
|
|
- bufferTrigger.next(warning)
|
|
|
|
|
|
+ bufferTrigger.next(false)
|
|
|
|
+ }
|
|
|
|
+})
|
|
|
|
+
|
|
|
|
+bufferTrigger.subscribe((element) => {
|
|
|
|
+ if(element){
|
|
|
|
+ console.log(`Heap Load Exceeded on client side. Buffering.....`)
|
|
}
|
|
}
|
|
})
|
|
})
|
|
|
|
|
|
@@ -40,7 +44,7 @@ function createWebsocketServer() {
|
|
|
|
|
|
// Subscribe to the subject when a client connects
|
|
// Subscribe to the subject when a client connects
|
|
const subscription = msgPayload.pipe(
|
|
const subscription = msgPayload.pipe(
|
|
- buffer(bufferTrigger.pipe(filter(Boolean)))
|
|
|
|
|
|
+ buffer(bufferTrigger.pipe(filter(value => !value)))
|
|
).subscribe((element) => {
|
|
).subscribe((element) => {
|
|
console.log(`Emitting ${element.length} messages`)
|
|
console.log(`Emitting ${element.length} messages`)
|
|
socket.emit(`payload`, element);
|
|
socket.emit(`payload`, element);
|
|
@@ -58,8 +62,8 @@ function createWebsocketServer() {
|
|
// Listen for the socket to be closed
|
|
// Listen for the socket to be closed
|
|
socket.on('disconnect', () => {
|
|
socket.on('disconnect', () => {
|
|
console.log('Client/Consumer disconnected');
|
|
console.log('Client/Consumer disconnected');
|
|
- // Need to put next trigger the buffer. Alternavtively, can use bufferTrigfer.next(false)
|
|
|
|
- consumerTrafficStatus.next(2)
|
|
|
|
|
|
+ // Need to put next(true) trigger the buffer due to disconnection from the other side.
|
|
|
|
+ bufferTrigger.next(true)
|
|
subscription.unsubscribe();
|
|
subscription.unsubscribe();
|
|
});
|
|
});
|
|
})
|
|
})
|