12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879 |
- import { from, Observable, Subject, Subscriber, Subscription } from "rxjs";
- import { BaseMessage, LoggingService } from "../dependencies/logging/services/logging-service";
- import { DateStructure, LogSetting, MsgDateTime } from "../dependencies/logging/type/datatype";
- import { MessageLog, ClientNotificationState } from "../interfaces/general.interface";
- // From Chin: "If you can add feature to set buffering by disconnect duration and buffer size that would be good."
/**
 * Routes messages into the logging service while a connection is down and
 * reads them back out of it once the connection is restored.
 *
 * Usage, as far as this class shows:
 *  1. `disconnectionHandler(messages)` pushes the given messages into the
 *     logging stream and records when buffering started.
 *  2. `reconnectionHandler()` returns an Observable which, when subscribed,
 *     records when buffering ended and replays every logged message that
 *     falls inside the [bufferStart, bufferEnd] window.
 */
export class StorageService {
    /** Stream handed to the logging service in the constructor; values pushed here are what it receives. */
    private msgForLogging: Subject<MessageLog | BaseMessage> = new Subject()
    /** Subscription created by the most recent `disconnectionHandler` call, or null if there was none. */
    private loggingSubscription: Subscription | null = null
    private loggingService: LoggingService
    // Time window used to filter logged messages on reconnection.
    // bufferStart is set by disconnectionHandler, bufferEnd by reconnectionHandler.
    private bufferStart!: Date
    private bufferEnd!: Date

    /**
     * Creates and initialises the logging service, then attaches it to the
     * internal message stream.
     *
     * @param logSettings Settings passed straight to `LoggingService.init`.
     */
    constructor(logSettings: LogSetting) {
        // The logging service has to be initialised before it can subscribe to the stream.
        this.loggingService = new LoggingService()
        this.loggingService.init(logSettings)
        // Fire-and-forget: a failed subscription is only reported on the console.
        this.loggingService.subscribe(this.msgForLogging).catch((error) => console.error(error))
    }

    /**
     * Forwards the given messages into the logging stream and marks the start
     * of the buffering window.
     *
     * @param bufferedMessage Messages to hand over to the logging service.
     * @returns A promise resolving to a status object `{ event, message, sourceSubscribed }`.
     *          `sourceSubscribed` is `true` when the messages were forwarded and
     *          `false` when an active logging subscription already existed, in
     *          which case nothing is forwarded.
     */
    public async disconnectionHandler(bufferedMessage: BaseMessage[]): Promise<any> {
        this.bufferStart = new Date()

        if (this.loggingSubscription && !this.loggingSubscription.closed) {
            // If things go well this branch is not supposed to be reached.
            console.log(`Retransmission: Has already assigned logging Subscription`)
            // Always settle the promise: previously this path never resolved,
            // leaving any caller that awaited the result hanging forever.
            return { event: 'Source Subscrpition', message: 'Logging subscription already assigned. Messages were not forwarded', sourceSubscribed: false }
        }

        this.loggingSubscription = from(bufferedMessage).subscribe(message => {
            this.msgForLogging.next(message)
        })
        return { event: 'Source Subscrpition', message: 'Logging standing by. can release from rxjs buffer now', sourceSubscribed: true }
    }

    /**
     * Builds an Observable that replays the messages logged during the
     * buffering window.
     *
     * On subscription it records the end of the window and, if a logging
     * subscription exists, tears it down, queries the logging service for
     * the window, emits each stored payload parsed from JSON, and completes.
     * If no logging subscription exists it completes without emitting.
     *
     * @returns An Observable of the replayed messages. It errors with the string
     *          'Something went wrong' if the query or the payload parsing fails.
     */
    public reconnectionHandler(): Observable<BaseMessage> {
        return new Observable((bufferedMessages: Subscriber<BaseMessage>) => {
            this.bufferEnd = new Date()

            if (!this.loggingSubscription) {
                bufferedMessages.complete()
                return
            }

            // Destroy the previous subscription.
            this.loggingSubscription.unsubscribe()

            // The logging service has no "delete log" operation, so a date range is
            // used to avoid extracting buffered messages that were already released.
            const dateRange: MsgDateTime = {
                from: this.createDateStructure(this.bufferStart),
                to: this.createDateStructure(this.bufferEnd)
            }

            this.loggingService.filter(dateRange).then((array) => {
                console.log(array.length)
                for (const message of array) {
                    // NOTE(review): the parsed payload is trusted to be a BaseMessage without validation.
                    bufferedMessages.next(JSON.parse(message.appData.msgPayload as string) as BaseMessage)
                }
                bufferedMessages.complete()
            }).catch((error) => {
                console.error(error)
                bufferedMessages.error('Something went wrong')
            })
        })
    }

    /**
     * Converts a Date into the DateStructure used for log filtering.
     *
     * NOTE(review): the `date` field is derived from `toISOString()` (UTC) while
     * hour/minute/second come from local-time getters. Outside UTC these can
     * disagree around midnight. Left unchanged because the convention used by
     * the logging service when storing entries is not visible here - confirm
     * against LoggingService before changing.
     *
     * @param date The instant to convert.
     * @returns The date as `YYYY-MM-DD` plus hour, minute and second.
     */
    private createDateStructure(date: Date): DateStructure {
        return {
            date: date.toISOString().split('T')[0], // Store date as a string in YYYY-MM-DD format
            hour: date.getHours(),
            minute: date.getMinutes(),
            second: date.getSeconds()
        };
    }
}
|