import { from, Observable, Subject, Subscriber, Subscription } from "rxjs";
import { BaseMessage, LoggingService } from "../dependencies/logging/services/logging-service";
import { DateStructure, LogSetting, MsgDateTime } from "../dependencies/logging/type/datatype";
import { MessageLog, ClientNotificationState } from "../interfaces/general.interface";

// From Chin: "If you can add feature to set buffering by disconnect duration and buffer size that would be good."

/**
 * Forwards buffered messages into a `LoggingService` while a connection is
 * down, and reads them back from that service (by date range) on reconnection.
 */
export class StorageService {
    /** Stream handed to the logging service in the constructor; everything pushed here is offered for logging. */
    private msgForLogging: Subject<MessageLog | BaseMessage> = new Subject()
    /** Subscription created by the most recent `disconnectionHandler` call, or `null` if none yet. */
    private loggingSubscription: Subscription | null = null
    private loggingService: LoggingService

    // Boundaries of the window passed to `loggingService.filter` on reconnection.
    private bufferStart!: Date
    private bufferEnd!: Date

    /**
     * @param logSettings Settings passed straight to `LoggingService.init`.
     */
    constructor(logSettings: LogSetting) {
        // The logging service has to be initialised before it can subscribe to the stream.
        this.loggingService = new LoggingService()
        this.loggingService.init(logSettings)
        this.loggingService.subscribe(this.msgForLogging).then(() => {
        }).catch((error) => console.error(error))
    }

    /**
     * Records the start of the buffering window and pushes every message of
     * `bufferedMessage` into the logging stream.
     *
     * @param bufferedMessage Messages to forward to the logging stream.
     * @returns A status object. `sourceSubscribed` is `true` when the messages
     *          were forwarded, and `false` when a previous logging subscription
     *          was still open and nothing was forwarded.
     */
    public async disconnectionHandler(bufferedMessage: BaseMessage[]): Promise<any> {
        this.bufferStart = new Date()

        const hasOpenSubscription = this.loggingSubscription !== null && !this.loggingSubscription.closed
        if (hasOpenSubscription) {
            // If things go well, this line is not supposed to be reached.
            console.log(`Retransmission: Has already assigned logging Subscription`)
            // Previously this path never settled the promise, leaving callers
            // waiting forever. Report the outcome instead.
            return {
                event: 'Source Subscrpition',
                message: 'Logging subscription already assigned. Buffered messages were not forwarded',
                sourceSubscribed: false
            }
        }

        this.loggingSubscription = from(bufferedMessage).subscribe(message => {
            this.msgForLogging.next(message)
        })
        // NOTE(review): the 'Source Subscrpition' spelling is kept as-is because
        // callers may match on this event name.
        return {
            event: 'Source Subscrpition',
            message: 'Logging standing by. can release from rxjs buffer now',
            sourceSubscribed: true
        }
    }

    /**
     * Builds a cold observable that, on subscription, closes the buffering
     * window, asks the logging service for the entries inside that window and
     * emits each entry's parsed payload before completing.
     *
     * When `disconnectionHandler` has never created a subscription, the
     * observable completes immediately without emitting.
     *
     * @returns Observable of the messages recovered from the logging service.
     *          It errors with the string `'Something went wrong'` if the lookup
     *          or payload parsing fails.
     */
    public reconnectionHandler(): Observable<BaseMessage> {
        return new Observable((subscriber: Subscriber<BaseMessage>) => {
            this.bufferEnd = new Date()

            if (!this.loggingSubscription) {
                subscriber.complete()
                return
            }

            // Destroy the previous subscription.
            this.loggingSubscription.unsubscribe()

            // For more precise control: the logging service has no delete, and we do not
            // want to extract previously released buffered messages, so only the
            // [bufferStart, bufferEnd] window is requested.
            const dateRange: MsgDateTime = {
                from: this.createDateStructure(this.bufferStart),
                to: this.createDateStructure(this.bufferEnd)
            }

            this.loggingService.filter(dateRange).then((entries) => {
                console.log(entries.length)
                entries.forEach(entry => {
                    // NOTE(review): the payload is assumed to be a JSON string of a BaseMessage;
                    // it is parsed without validation.
                    subscriber.next(JSON.parse(entry.appData.msgPayload as string) as BaseMessage)
                })
                subscriber.complete()
            }).catch((error) => {
                console.error(error)
                // NOTE(review): a string is emitted rather than an Error; kept so existing
                // error handlers that compare against this text keep working.
                subscriber.error('Something went wrong')
            })
        })
    }

    /**
     * Converts a `Date` into the `DateStructure` used for log filtering.
     *
     * NOTE(review): `date` comes from `toISOString()` (UTC) while hour, minute
     * and second come from the local-time getters. Outside UTC these can
     * disagree around midnight — confirm which convention `LoggingService`
     * expects before changing this.
     *
     * @param date Instant to convert.
     */
    private createDateStructure(date: Date): DateStructure {
        const [isoDate] = date.toISOString().split('T') // date as a string in YYYY-MM-DD format
        return {
            date: isoDate,
            hour: date.getHours(),
            minute: date.getMinutes(),
            second: date.getSeconds()
        };
    }
}