buffer-obs.ts 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198
  1. import { buffer, bufferCount, bufferTime, bufferToggle, catchError, combineLatest, filter, finalize, from, map, mapTo, merge, mergeAll, Observable, of, startWith, Subject, Subscriber, Subscription, switchMap, take, takeUntil, tap, throwError, timer } from "rxjs";
  2. import { ClientNotificationState } from "../../interfaces/general.interface";
  3. import { BaseMessage } from "../../dependencies/logging/services/logging-service";
  /**
   * Relays messages from `sourceObs`, holding them back in a buffer while the
   * client is disconnected.
   *
   * The mode is driven by `notificationObs`:
   * - `'Disconnected'`: start collecting messages from `sourceObs` into one
   *   array. Ignored when a buffer is already open.
   * - `'Subscribed'`: flush the open buffer (emitted as a single
   *   `BaseMessage[]`), then relay messages one by one again.
   * - any other message: relay messages one by one when no buffer is open;
   *   ignored while buffering, so an unrelated notification can no longer tear
   *   down the open buffer and lose its contents.
   *
   * Nothing is relayed until `notificationObs` emits for the first time.
   *
   * All state (buffering flag, release trigger) is created per subscription and
   * the listener on `notificationObs` is removed when the consumer unsubscribes.
   *
   * @param sourceObs Messages to relay or buffer.
   * @param notificationObs Connection state notifications that switch the mode.
   * @returns A stream of single messages (relay mode) and message arrays (flushed buffers).
   */
  export function rxjsBufferToggle(sourceObs: Subject<BaseMessage>, notificationObs: Subject<ClientNotificationState>): Observable<BaseMessage | BaseMessage[]> {
    return new Observable<BaseMessage | BaseMessage[]>((subscriber) => {
      // Closes the currently open buffer; scoped to this subscription.
      const releaseTrigger = new Subject<void>()
      // True between a 'Disconnected' and the following 'Subscribed'.
      let buffering = false

      const messageStream: Observable<BaseMessage | BaseMessage[]> = notificationObs.pipe(
        // Must run before the mode switch below: the open buffer has to emit its
        // contents while switchMap is still subscribed to it.
        tap((notif) => {
          if (notif.message === 'Subscribed') {
            releaseTrigger.next()
            console.log(`Retransmission already subscribed to buffer output. `)
          }
        }),
        // true = start buffering, false = relay, null = leave the current mode alone.
        map((notif): boolean | null => {
          if (notif.message === 'Disconnected') {
            if (buffering) {
              return null
            }
            buffering = true
            return true
          }
          if (notif.message === 'Subscribed') {
            buffering = false
            return false
          }
          return buffering ? null : false
        }),
        filter((toBuffer): toBuffer is boolean => toBuffer !== null),
        switchMap((toBuffer) => {
          if (toBuffer) {
            console.log(`Rxjs Buffering...`)
            // from([0]) opens exactly one buffer immediately; it stays open until releaseTrigger fires.
            return sourceObs.pipe(bufferToggle(from([0]), () => releaseTrigger))
          }
          console.log(`Rxjs Releasing...`)
          return sourceObs
        })
      )

      // Returning the subscription makes it the teardown for the outer observable.
      return messageStream.subscribe(subscriber)
    })
  }
  /**
   * Collects messages from `sourceObs` and emits them as one array each time
   * `releaseSignal` emits.
   *
   * The piped observable is returned directly, so unsubscribing the result also
   * unsubscribes from `sourceObs`, and errors/completion reach the consumer.
   *
   * @see https://rxjs.dev/api/index/function/buffer
   * @param sourceObs Messages to buffer.
   * @param releaseSignal Each emission flushes the current buffer.
   * @returns A stream of flushed message arrays.
   */
  export function rxjsBufferDefault(sourceObs: Subject<BaseMessage>, releaseSignal: Observable<any>): Observable<BaseMessage[]> {
    return sourceObs.pipe(buffer(releaseSignal))
  }
  /**
   * Collects messages from `sourceObs` and emits them as one array per time window.
   *
   * The piped observable is returned directly, so unsubscribing the result also
   * stops the underlying timer and source subscription, and errors/completion
   * reach the consumer.
   *
   * @see https://rxjs.dev/api/index/function/bufferTime
   * @param sourceObs Messages to buffer.
   * @param duration Window length passed to `bufferTime` (milliseconds per the RxJS docs).
   * @returns A stream of message arrays, one per window.
   */
  export function rxjsBufferTime(sourceObs: Subject<BaseMessage>, duration: number): Observable<BaseMessage[]> {
    return sourceObs.pipe(bufferTime(duration))
  }
  /**
   * Collects messages from `sourceObs` and emits them in arrays of `bufferlimit` items.
   *
   * The piped observable is returned directly, so unsubscribing the result also
   * unsubscribes from `sourceObs`, and errors/completion reach the consumer.
   *
   * @see https://rxjs.dev/api/index/function/bufferCount
   * @param sourceObs Messages to buffer.
   * @param bufferlimit Number of messages per emitted array.
   * @returns A stream of message arrays.
   */
  export function rxjsBufferCount(sourceObs: Subject<BaseMessage>, bufferlimit: number): Observable<BaseMessage[]> {
    return sourceObs.pipe(bufferCount(bufferlimit))
  }
  /**
   * Buffers messages from `sourceObs`; the strategy depends on which limits are supplied.
   *
   * WARNING: only the default strategy (neither `bufferDuration` nor
   * `bufferSizeLimit`) is known to work. The duration- and size-limited
   * strategies are still faulty; please refrain from using them for now.
   *
   * Strategies:
   * - neither limit: emit the buffered messages as one array each time
   *   `releaseSignal` emits.
   * - `bufferDuration` only: open a single buffer that is closed by whichever
   *   fires first, the timer or `releaseSignal`. Its contents go to
   *   `messageToBeLogged` when provided, otherwise they are emitted.
   * - `bufferSizeLimit` only: emit the first buffer of `bufferSizeLimit`
   *   messages, then complete. The messages are also copied one by one to
   *   `messageToBeLogged` when provided. `releaseSignal` is not used.
   * - both limits: collect messages in memory until either limit is hit; the
   *   next message after that hands the collection to `messageToBeLogged` (or
   *   discards it when absent) and stops listening. Nothing is emitted to the
   *   subscriber and `releaseSignal` is not used.
   *
   * Every inner subscription is released when the consumer unsubscribes.
   *
   * @param sourceObs Messages to buffer.
   * @param releaseSignal Flush trigger (used by the default and duration-only strategies).
   * @param bufferDuration Optional time limit passed to `timer` (milliseconds per the RxJS docs).
   * @param bufferSizeLimit Optional maximum number of buffered messages.
   * @param messageToBeLogged Optional sink that receives buffered messages one by one.
   * @returns A stream of buffered message arrays.
   */
  export function rxjsBuffer(sourceObs: Subject<BaseMessage>, releaseSignal: Observable<any>, bufferDuration?: number, bufferSizeLimit?: number, messageToBeLogged?: Subject<BaseMessage>): Observable<BaseMessage[]> {
    return new Observable((observer: Subscriber<BaseMessage[]>) => {
      // Parent subscription: everything added here is torn down with the consumer.
      const teardown = new Subscription()
      // Captured in a const so the undefined-check stays valid inside callbacks.
      const logSink = messageToBeLogged

      if (!bufferDuration && !bufferSizeLimit) {
        console.log(`Default Rxjs Buffering....`)
        teardown.add(
          sourceObs.pipe(buffer(releaseSignal)).subscribe({
            next: (bufferedMessages: BaseMessage[]) => observer.next(bufferedMessages),
            error: (err: unknown) => observer.error(err),
            complete: () => observer.complete()
          })
        )
      }

      if (bufferDuration && !bufferSizeLimit) {
        console.log(`Rxjs Buffering with bufferDuration: ${bufferDuration}`)
        // The buffer closes on whichever comes first: the duration timer or the release signal.
        const releaseBuffering = merge(timer(bufferDuration), releaseSignal)
        teardown.add(
          sourceObs.pipe(
            // of(0) opens exactly one buffer immediately.
            bufferToggle(of(0), () => releaseBuffering)
          ).subscribe((bufferedMessage: BaseMessage[]) => {
            // NOTE(review): this is logged for a release-signal flush too, not only on timeout.
            console.log(`Buffering Duration Exceeded. Please don't send me any more data.`)
            // Should emit an event that duration limit has been exceeded
            if (logSink) {
              bufferedMessage.forEach((message: BaseMessage) => {
                logSink.next(message)
              })
            } else {
              observer.next(bufferedMessage)
            }
          })
        )
      }

      if (!bufferDuration && bufferSizeLimit) {
        const sizeLimit = bufferSizeLimit
        console.log(`rxjs Buffering with buffer limit: ${sizeLimit}`)
        teardown.add(
          sourceObs.pipe(
            bufferCount(sizeLimit),
            // Only the first buffer is taken; the stream completes afterwards.
            take(1),
            tap(bufferedMessage => {
              // NOTE(review): bufferCount never emits more than sizeLimit items, so this guard looks unreachable.
              if (bufferedMessage.length > sizeLimit) {
                throw new Error('Amount exceeded')
              }
            }),
            catchError(err => {
              console.error(err)
              // maybe emit an event for this as well
              return throwError(() => new Error('Rxjs Buffering STOP!'))
            })
          ).subscribe({
            next: (bufferedMessage: BaseMessage[]) => {
              if (logSink) {
                from(bufferedMessage).subscribe({
                  next: (message: BaseMessage) => {
                    logSink.next(message)
                  },
                  error: err => console.error(err),
                  complete: () => {
                    // emit event to indicate storage logging is completed
                  }
                })
              }
              observer.next(bufferedMessage)
            },
            error: err => {
              observer.error(err)
            },
            complete: () => {
              observer.complete()
            }
          })
        )
      }

      if (bufferDuration && bufferSizeLimit) {
        console.log(`rxjs Buffering with bufferDuration: ${bufferDuration} ms && bufferSizeLimit: ${bufferSizeLimit} items`);
        let islimitExceeded = false
        const bufferedArray: BaseMessage[] = []
        // buffer duration limit.
        teardown.add(timer(bufferDuration).subscribe(() => { islimitExceeded = true }))
        // buffer size limit.
        teardown.add(
          sourceObs.pipe(
            bufferCount(bufferSizeLimit),
            take(1) // completes after the first full buffer
          ).subscribe(() => { islimitExceeded = true })
        )
        // NOTE(review): the message that arrives once a limit is exceeded triggers the
        // flush but is not stored itself, and nothing is ever emitted to `observer` here.
        const subscription: Subscription = sourceObs.subscribe(message => {
          if (!islimitExceeded) {
            bufferedArray.push(message)
          } else {
            if (logSink) {
              from(bufferedArray).subscribe({
                next: (buffered: BaseMessage) => {
                  logSink.next(buffered)
                },
                error: err => console.error(err),
                complete: () => {
                  subscription.unsubscribe()
                  // emit event to indicate storage logging is completed
                }
              })
            } else {
              console.log(`bufferArray exceeded. Removing all buffered messages`)
              subscription.unsubscribe()
            }
          }
        })
        teardown.add(subscription)
      }

      return teardown
    })
  }