|
@@ -2,6 +2,7 @@
|
|
import { BehaviorSubject, Observable, Subject, from } from "rxjs";
|
|
import { BehaviorSubject, Observable, Subject, from } from "rxjs";
|
|
import mongoose, { Connection, Model } from "mongoose";
|
|
import mongoose, { Connection, Model } from "mongoose";
|
|
import { ConnectionState, Message, MessageLog, } from "../interfaces/general.interface";
|
|
import { ConnectionState, Message, MessageLog, } from "../interfaces/general.interface";
|
|
|
|
+import * as fs from 'fs';
|
|
|
|
|
|
export class BufferService {
|
|
export class BufferService {
|
|
private bufferIdentifier!: string
|
|
private bufferIdentifier!: string
|
|
@@ -91,6 +92,7 @@ export class BufferService {
|
|
// do nothing... Let handleConnectionStateChanges deal with this state
|
|
// do nothing... Let handleConnectionStateChanges deal with this state
|
|
}
|
|
}
|
|
if (this.connectionState.getValue().status === "BUFFER") {
|
|
if (this.connectionState.getValue().status === "BUFFER") {
|
|
|
|
+ this.fileManagement(message, 'buffer');
|
|
this.bufferMessage(message);
|
|
this.bufferMessage(message);
|
|
}
|
|
}
|
|
if (this.connectionState.getValue().status === "DIRECT_PUBLISH") {
|
|
if (this.connectionState.getValue().status === "DIRECT_PUBLISH") {
|
|
@@ -122,6 +124,103 @@ export class BufferService {
|
|
// Relese the messages by inserting them into the outgoing Messages together.
|
|
// Relese the messages by inserting them into the outgoing Messages together.
|
|
this.releaseBufferedMessages(this.messageStream);
|
|
this.releaseBufferedMessages(this.messageStream);
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ // testing
|
|
|
|
+ if(state.status === "TARGET_PUBLISH") {
|
|
|
|
+ this.fileManagement(state, 'release');
|
|
|
|
+ this.releaseTargetBufferedMessage(state);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ releaseTargetBufferedMessage(message: ConnectionState) {
|
|
|
|
+ if(this.bufferIdentifier === "file") {
|
|
|
|
+
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ let target = this.messageBuffered.find((data: Message) => {
|
|
|
|
+ if(data.id === message.uuid) {
|
|
|
|
+ return data;
|
|
|
|
+ }
|
|
|
|
+ })
|
|
|
|
+ if(target) {
|
|
|
|
+ this.messageBuffered = this.messageBuffered.filter((data: Message) => {
|
|
|
|
+ if(data.id !== message.uuid) {
|
|
|
|
+ return data;
|
|
|
|
+ }
|
|
|
|
+ })
|
|
|
|
+ }
|
|
|
|
+ console.log(`TESTING : Releasing buffer Message under ${this.bufferIdentifier}: currently there is ${this.messageBuffered.length} messages to be released`);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ fileManagement(message: any, type: 'buffer' | 'release') {
|
|
|
|
+ let filePath = "./_LocalStorage/retransmissionData.json";
|
|
|
|
+
|
|
|
|
+ // DECLARATION : status chain
|
|
|
|
+ let checkStatus: boolean = true;
|
|
|
|
+ let errorMessage: any;
|
|
|
|
+ let fileContent: any;
|
|
|
|
+
|
|
|
|
+ if(checkStatus === true) {
|
|
|
|
+ if(this.bufferIdentifier !== "file") {
|
|
|
|
+ errorMessage = "not file storage, skip.";
|
|
|
|
+ checkStatus = false;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // CONDITION : check state log file exists or not
|
|
|
|
+ if(checkStatus === true) {
|
|
|
|
+ if(fs.existsSync(filePath)) {
|
|
|
|
+ fileContent = fs.readFileSync(filePath, 'utf-8');
|
|
|
|
+ }
|
|
|
|
+ else{
|
|
|
|
+ fileContent = JSON.stringify({ events: [] });
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // CONDITION : fileContent is empty
|
|
|
|
+ if(checkStatus === true) {
|
|
|
|
+ if(fileContent === undefined && fileContent === null || fileContent === "") {
|
|
|
|
+ fileContent = JSON.stringify({ events: [] });
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // TASK : json parse fileContent
|
|
|
|
+ if(checkStatus === true) {
|
|
|
|
+ fileContent = JSON.parse(fileContent);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // TASK : add new message into fileContent
|
|
|
|
+ if(checkStatus === true) {
|
|
|
|
+ if(type === "buffer") {
|
|
|
|
+ fileContent.events.push(message);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ // TASK : add new message into fileContent
|
|
|
|
+ if(checkStatus === true) {
|
|
|
|
+ if(type === "release") {
|
|
|
|
+ fileContent.events = fileContent.events.filter((data) => {
|
|
|
|
+ if(data.id !== message.uuid) {
|
|
|
|
+ return data
|
|
|
|
+ }
|
|
|
|
+ })
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // TASK : write file
|
|
|
|
+ if(checkStatus === true) {
|
|
|
|
+ try {
|
|
|
|
+ fs.writeFileSync(filePath, JSON.stringify(fileContent, null, 4), 'utf-8');
|
|
|
|
+ console.log('\u001b[' + 32 + 'm' + "RE-TRANSMISSION FILE : ADDED NEW DATA..." + '\u001b[0m');
|
|
|
|
+ }catch(error) {
|
|
|
|
+ errorMessage = error;
|
|
|
|
+ checkStatus = false;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // CONDITION : ERROR
|
|
|
|
+ if(checkStatus === false) {
|
|
|
|
+ console.log('\u001b[' + 31 + 'm' + "RE-TRANSMISSION FILE : ERROR -> " + JSON.stringify(errorMessage,null,4) + '\u001b[0m');
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
private async bufferMessage(message: any): Promise<void> {
|
|
private async bufferMessage(message: any): Promise<void> {
|
|
@@ -146,7 +245,7 @@ export class BufferService {
|
|
} else {
|
|
} else {
|
|
if (this.bufferLimit > this.messageBuffered.length) {
|
|
if (this.bufferLimit > this.messageBuffered.length) {
|
|
this.messageBuffered.push(message);
|
|
this.messageBuffered.push(message);
|
|
- console.log(this.messageBuffered) // Fallback to local buffer if model is not defined
|
|
|
|
|
|
+ // console.log(this.messageBuffered) // Fallback to local buffer if model is not defined
|
|
console.log(`pushing into local array buffer under ${this.bufferIdentifier}.... There is now ${this.messageBuffered.length} messages`);
|
|
console.log(`pushing into local array buffer under ${this.bufferIdentifier}.... There is now ${this.messageBuffered.length} messages`);
|
|
} else {
|
|
} else {
|
|
let reportState: ConnectionState = {
|
|
let reportState: ConnectionState = {
|