rxjsbuffer.sample.ts 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. import { Subject, interval, Observable, buffer, switchMap, bufferToggle, map, scan, startWith, BehaviorSubject, tap } from "rxjs"
  2. import { prepareResponseMessages } from "../services/utility/prepareFISmessage"
  3. let toggle = interval(5000)
  4. const syncSource = interval(500) // sync
  5. let asyncSource = new Subject<any>() //async
  6. syncSource.subscribe(e => {
  7. asyncSource.next(e)
  8. })
/* Output is the final stream and syncSource is the input, so the first trigger should start the buffer.
Initially, source should push straight to output. When toggle is triggered, source should be unsubscribed
and the buffer should be instantiated immediately so that it keeps receiving the messages.
When the next trigger comes in (reconnection), connect source to output again, while also
releasing the buffered messages into output. */
  14. // assuming the initial state is online, so source will stream straight to output, but the first toggle will be offline
/* How buffer works: it has no "normal" (pass-through) mode. It subscribes to another observable that acts as the signal to release.
Its default state is to always buffer until the subscribed observable emits a signal to release the buffer,
and then it continues to buffer. There is no way to make buffer stream normally. */
  18. function bufferTest1() {
  19. const intervalEvents = interval(1000);
  20. const buffered = intervalEvents.pipe(buffer(toggle));
  21. buffered.subscribe(x => console.log(x));
  22. }
  23. // VERSION 2 <with the help of chatGPT>
  24. // Toggle between buffering and normal emitting
  25. function bufferTest2() {
  26. const toggleBuffer$ = toggle.pipe(
  27. // Track the toggle state: true means buffering, false means normal
  28. scan((isBuffering) => !isBuffering, false),
  29. // Use the state to switch between buffer mode and normal mode
  30. switchMap(isBuffering => {
  31. // check for notif messatge and mutate open and close and isbuffering
  32. // open.next(blablabla) or close.next(blablabla)
  33. if (isBuffering) {
  34. console.log(isBuffering)
  35. // Start buffering: open on toggle, close on next toggle
  36. return asyncSource.pipe(
  37. // bufferToggle(toggle, () => toggle)
  38. bufferToggle(toggle.pipe(startWith(0)), () => toggle))
  39. // bufferToggle(open, () => { if (true) { return closing } })
  40. } else {
  41. console.log(isBuffering)
  42. // Emit values normally
  43. return asyncSource;
  44. }
  45. })
  46. );
  47. // Subscribe to the toggled stream
  48. toggleBuffer$.subscribe(values => {
  49. console.log('Values:', values);
  50. });
  51. }
  52. // bufferTest2()
  53. // bufferTest1()
  54. let notificationSubject = new BehaviorSubject<string>('online') // true as in online false as in offline
  55. let keep = new Subject<any>()
  56. let release = new Subject<any>()
  57. // bufferTest3().subscribe(values => {
  58. // console.log(`Values: ${values}`)
  59. // })
  60. // emulateNotification(`offline`, 3000)
  61. // emulateNotification(`online`, 7000)
  62. // emulateNotification(`offline`, 10000)
  63. // emulateNotification(`online`, 15000)
  64. function bufferTest3(): Observable<any> {
  65. const messageStream = notificationSubject.pipe(
  66. map(notif => {
  67. if (notif == 'offline') {
  68. return true
  69. } else {
  70. return false
  71. }
  72. }),
  73. switchMap(notif => {
  74. if (notif) {
  75. // Start buffering: open on toggle, close on next toggle
  76. return asyncSource.pipe(
  77. bufferToggle(keep.pipe(startWith(0)), () => release))
  78. } else {
  79. // Emit values normally
  80. return asyncSource;
  81. }
  82. })
  83. )
  84. notificationSubject.subscribe(notif => {
  85. // logic here
  86. if (notif == 'online') {
  87. console.log(`received notification: ${notif}, releasing...`)
  88. release.next('release')
  89. }
  90. if (notif == 'offline') {
  91. console.log(`received notification: ${notif}, buffering...`)
  92. keep.next('keep')
  93. }
  94. })
  95. return messageStream
  96. }
  97. function emulateNotification(status: string, delay: number) {
  98. setTimeout(() => {
  99. notificationSubject.next(status)
  100. }, delay)
  101. }
  102. // working version
  103. function bufferTest4(): Observable<any> {
  104. // Track buffering state
  105. const bufferingState$ = notificationSubject.pipe(
  106. scan((isBuffering, notif) => notif === 'offline' ? true : false, false),
  107. tap(isBuffering => {
  108. console.log(`Buffering state changed: ${isBuffering}`);
  109. })
  110. );
  111. // Message stream based on buffering state
  112. const messageStream = bufferingState$.pipe(
  113. switchMap(isBuffering => {
  114. if (isBuffering) {
  115. // Start buffering: open on toggle, close on next toggle
  116. return asyncSource.pipe(
  117. // bufferToggle(toggle.pipe(startWith(0)), () => release)
  118. bufferToggle(keep.pipe(startWith(0)), () => release)
  119. );
  120. } else {
  121. // Emit values normally
  122. return asyncSource;
  123. }
  124. })
  125. );
  126. notificationSubject.subscribe(notif => {
  127. console.log(`Received notification: ${notif}`);
  128. if (notif === 'online') {
  129. release.next(true); // Release buffered values
  130. }
  131. if (notif === 'offline') {
  132. keep.next(true); // Start buffering
  133. }
  134. });
  135. return messageStream;
  136. }
  137. let messages = prepareResponseMessages(1000000, 1)
  138. function bufferTest5() {
  139. const clicks = interval(300000);
  140. const buffered = messages.pipe(buffer(clicks));
  141. let count = 0
  142. buffered.subscribe({
  143. next: x => {
  144. console.log(`${count++} && Buffer Length: ${x.length}`)
  145. },
  146. error: err => console.error(err),
  147. complete: () => { }
  148. })
  149. }
  150. bufferTest5()