import { from, Observable, Subject, Subscriber, Subscription } from "rxjs"; import { BaseMessage, LoggingService } from "../dependencies/logging/services/logging-service"; import { DateStructure, LogSetting, MsgDateTime } from "../dependencies/logging/type/datatype"; import { MessageLog, ClientNotificationState } from "../interfaces/general.interface"; // From Chin: "If you can add feature to set buffering by disconnect duration and buffer size that would be good."" export class StorageService { private msgForLogging: Subject = new Subject() private loggingSubscription: Subscription | null = null private loggingService: LoggingService //var for logging filter private bufferStart!: Date private bufferEnd!: Date constructor(logSettings: LogSetting) { // need to first initialize logging service this.loggingService = new LoggingService() this.loggingService.init(logSettings) this.loggingService.subscribe(this.msgForLogging).then(() => { }).catch((error) => console.error(error)) } /* Will first subscribe to message stream, and then start buffering all the data into the designated buffer. */ public async disconnectionHandler(bufferedMessage: BaseMessage[]): Promise { return new Promise((resolve, reject) => { let messageCount: number = 0 this.bufferStart = new Date() if (!this.loggingSubscription || this.loggingSubscription.closed) { this.loggingSubscription = from(bufferedMessage).subscribe(message => { this.msgForLogging.next(message) }) resolve({ event: 'Source Subscrpition', message: 'Logging standing by. can release from rxjs buffer now', sourceSubscribed: true }) } else { console.log(`Retransmission: Has already assigned logging Subscription`) // if things goes well, am not supposed to see this line } }) } public reconnectionHandler(): Observable { return new Observable((bufferedMessages: Subscriber) => { this.bufferEnd = new Date() // destroy previous subscription if (this.loggingSubscription) { this.loggingSubscription.unsubscribe() // console.log(this.loggingSubscription) // for more precise control, since there's no delete log in logging service, and we do not want to extract previously released buffered messages let dateRange: MsgDateTime = { from: this.createDateStructure(this.bufferStart), to: this.createDateStructure(this.bufferEnd) } this.loggingService.filter(dateRange).then((array) => { console.log(array.length) array.forEach(message => { bufferedMessages.next(JSON.parse(message.appData.msgPayload as string) as BaseMessage) }) bufferedMessages.complete() }).catch((error) => { console.error(error) bufferedMessages.error('Something went wrong') }) } else { bufferedMessages.complete() } }) } private createDateStructure(date: Date): DateStructure { return { date: date.toISOString().split('T')[0], // Store date as a string in YYYY-MM-DD format hour: date.getHours(), minute: date.getMinutes(), second: date.getSeconds() }; } }