storage.service.ts 3.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879
  1. import { from, Observable, Subject, Subscriber, Subscription } from "rxjs";
  2. import { BaseMessage, LoggingService } from "../dependencies/logging/services/logging-service";
  3. import { DateStructure, LogSetting, MsgDateTime } from "../dependencies/logging/type/datatype";
  4. import { MessageLog, ClientNotificationState } from "../interfaces/general.interface";
  5. // From Chin: "If you can add feature to set buffering by disconnect duration and buffer size that would be good."
  /**
   * Bridges the message buffer and the logging service around a connection drop.
   *
   * On disconnection, buffered messages are pushed into `msgForLogging`, the
   * stream handed to the logging service in the constructor. On reconnection,
   * the logging service is queried for the time window between the two events
   * and the stored payloads are re-emitted to the caller.
   */
  export class StorageService {
    /** Stream passed to `LoggingService.subscribe`; buffered messages are pushed onto it. */
    private msgForLogging: Subject<MessageLog | BaseMessage> = new Subject()
    /** Subscription feeding the current batch of buffered messages into `msgForLogging`. */
    private loggingSubscription: Subscription | null = null
    private loggingService: LoggingService
    // Bounds of the buffering window, used as the log filter range on reconnection.
    private bufferStart!: Date
    private bufferEnd!: Date

    /**
     * Creates and initialises the logging service, then hands it the message stream.
     *
     * @param logSettings Settings forwarded unchanged to `LoggingService.init`.
     */
    constructor(logSettings: LogSetting) {
      // The logging service has to be initialised before it can subscribe to the stream.
      this.loggingService = new LoggingService()
      this.loggingService.init(logSettings)
      // Fire-and-forget: a failed subscription is only reported on the console.
      this.loggingService
        .subscribe(this.msgForLogging)
        .catch((error) => console.error(error))
    }

    /**
     * Starts a buffering session: records the session start time and forwards every
     * message of `bufferedMessage` into the logging stream.
     *
     * If a logging subscription is already active, nothing is forwarded and the start
     * time of the running session is left untouched. The returned promise still
     * settles, reporting `sourceSubscribed: false`, so callers awaiting it do not hang.
     *
     * @param bufferedMessage Messages to forward to the logging stream.
     * @returns An object `{ event, message, sourceSubscribed }`. `sourceSubscribed` is
     *   `true` when this call created the logging subscription, `false` otherwise.
     */
    public async disconnectionHandler(bufferedMessage: BaseMessage[]): Promise<any> {
      if (this.loggingSubscription && !this.loggingSubscription.closed) {
        // Not expected in normal operation: a previous session is still feeding the log.
        console.log(`Retransmission: Has already assigned logging Subscription`)
        return {
          event: 'Source Subscrpition',
          message: 'Logging subscription already active. Nothing new was subscribed',
          sourceSubscribed: false
        }
      }

      // Mark the start of the window only when a new session actually begins.
      this.bufferStart = new Date()
      this.loggingSubscription = from(bufferedMessage).subscribe(message => {
        this.msgForLogging.next(message)
      })
      // The event name is kept exactly as callers have always received it (including its spelling).
      return {
        event: 'Source Subscrpition',
        message: 'Logging standing by. can release from rxjs buffer now',
        sourceSubscribed: true
      }
    }

    /**
     * Ends the buffering session and replays what was logged during it.
     *
     * The work happens per subscription to the returned Observable: the end time is
     * recorded, the logging subscription is torn down, the logging service is
     * filtered by the `[bufferStart, bufferEnd]` range, and each result's
     * `appData.msgPayload` is JSON-parsed and emitted.
     *
     * @returns An Observable that emits the replayed messages and then completes.
     *   It completes immediately when no buffering session was ever started, and
     *   errors with the string `'Something went wrong'` when filtering or parsing fails.
     */
    public reconnectionHandler(): Observable<BaseMessage> {
      return new Observable((subscriber: Subscriber<BaseMessage>) => {
        this.bufferEnd = new Date()

        if (!this.loggingSubscription) {
          // No session was started, so there is nothing to replay.
          subscriber.complete()
          return
        }

        // Stop feeding the log before reading it back.
        this.loggingSubscription.unsubscribe()

        // The logging service offers no delete, so the time window is what keeps
        // previously released messages out of this replay.
        const dateRange: MsgDateTime = {
          from: this.createDateStructure(this.bufferStart),
          to: this.createDateStructure(this.bufferEnd)
        }

        this.loggingService.filter(dateRange).then((array) => {
          console.log(array.length)
          for (const message of array) {
            subscriber.next(JSON.parse(message.appData.msgPayload as string) as BaseMessage)
          }
          subscriber.complete()
        }).catch((error) => {
          // Covers both a failed filter call and a payload that fails to parse.
          console.error(error)
          subscriber.error('Something went wrong')
        })
      })
    }

    /**
     * Converts a `Date` into the `DateStructure` used for log filtering.
     *
     * @param date The instant to convert.
     * @returns The date as `YYYY-MM-DD` plus separate hour, minute and second fields.
     */
    private createDateStructure(date: Date): DateStructure {
      // NOTE(review): `toISOString()` gives the UTC calendar date while
      // `getHours()`/`getMinutes()`/`getSeconds()` are local time, so outside UTC the
      // date and time parts can disagree around midnight. Confirm which zone the
      // logging service expects before changing either side.
      return {
        date: date.toISOString().split('T')[0], // YYYY-MM-DD
        hour: date.getHours(),
        minute: date.getMinutes(),
        second: date.getSeconds()
      }
    }
  }