import { Subject, interval, Observable, buffer, switchMap, bufferToggle, map, scan, startWith, BehaviorSubject, tap } from "rxjs";
import { prepareResponseMessages } from "../services/utility/prepareFISmessage";

/** Periodic signal (every 5000 ms) used by the early experiments to flip buffering on/off. */
const toggle = interval(5000);

/** Synchronous-style input: emits an incrementing counter every 500 ms. */
const syncSource = interval(500);

/** Hot relay of `syncSource`; the buffering experiments below read from this subject. */
const asyncSource = new Subject<unknown>();

// Forward every tick of the interval into the subject.
syncSource.subscribe(e => {
  asyncSource.next(e);
});

/*
 * Goal: `syncSource` is the input and the output stream is the final consumer.
 * Initially the source should push straight to the output. When the toggle is
 * triggered, the source should be disconnected from the output and a buffer
 * should be instantiated immediately so that messages keep being received.
 * When the next trigger comes in (reconnection), connect the source to the
 * output again while also releasing the buffered messages into the output.
 */
// Assuming the initial state is online, the source streams straight to the
// output, and the first toggle means "offline".
/*
 * How `buffer` works: it has no "normal" pass-through mode. It subscribes to
 * another observable that acts as the release signal. Its default state is to
 * always buffer until that observable emits; it then releases the buffer and
 * continues buffering. There is no way to make `buffer` itself stream normally.
 */

/**
 * Experiment 1: collect the ticks of a 1000 ms interval with `buffer(toggle)`
 * and log each released array.
 */
function bufferTest1() {
  const intervalEvents = interval(1000);
  const buffered = intervalEvents.pipe(buffer(toggle));
  buffered.subscribe(x => console.log(x));
}

// VERSION 2
/**
 * Experiment 2: toggle between buffering and normal emitting.
 *
 * Each `toggle` emission flips a boolean state (seeded with `false`, so the
 * first emission yields `true`, i.e. buffering). `switchMap` then swaps the
 * inner stream between a `bufferToggle` view of `asyncSource` and
 * `asyncSource` itself. Every emitted value is logged.
 */
function bufferTest2() {
  const toggleBuffer$ = toggle.pipe(
    // Track the toggle state: true means buffering, false means normal.
    scan((isBuffering) => !isBuffering, false),
    // Use the state to switch between buffer mode and normal mode.
    switchMap(isBuffering => {
      // TODO: check for the notification message and drive open/close and
      // isBuffering from it, e.g. open.next(...) or close.next(...).
      console.log(isBuffering);
      if (isBuffering) {
        // Start buffering: open immediately (startWith), close on the next toggle.
        return asyncSource.pipe(
          // bufferToggle(toggle, () => toggle)
          bufferToggle(toggle.pipe(startWith(0)), () => toggle)
        );
        // bufferToggle(open, () => { if (true) { return closing } })
      }
      // Emit values normally.
      return asyncSource;
    })
  );

  // Subscribe to the toggled stream.
  toggleBuffer$.subscribe(values => {
    console.log('Values:', values);
  });
}

// bufferTest2()
// bufferTest1()

/** Current connectivity status; carries the strings 'online' or 'offline' and starts as 'online'. */
const notificationSubject = new BehaviorSubject<string>('online');

/** Opening signal for `bufferToggle` (start keeping messages). */
const keep = new Subject<unknown>();

/** Closing signal for `bufferToggle` (release the kept messages). */
const release = new Subject<unknown>();

// bufferTest3().subscribe(values => {
//     console.log(`Values: ${values}`)
// })

// emulateNotification(`offline`, 3000)
// emulateNotification(`online`, 7000)
// emulateNotification(`offline`, 10000)
// emulateNotification(`online`, 15000)

/**
 * Experiment 3: switch between buffering and pass-through based on
 * `notificationSubject`.
 *
 * While the latest notification is 'offline', values from `asyncSource` are
 * collected by `bufferToggle` (opened immediately via `startWith`, closed by
 * `release`); otherwise `asyncSource` is passed through unchanged.
 *
 * NOTE(review): every call adds a subscription to `notificationSubject` that
 * is never unsubscribed. That subscription is registered before any subscriber
 * of the returned stream, so on 'online' `release` fires while the buffering
 * inner stream is still active - the flush appears to rely on that ordering.
 *
 * @returns A stream emitting either single values (online) or arrays of
 *          buffered values (flushed when going back online).
 */
function bufferTest3(): Observable<unknown> {
  const messageStream = notificationSubject.pipe(
    // true means "buffer", false means "stream normally".
    map(notif => notif === 'offline'),
    switchMap(shouldBuffer => {
      if (shouldBuffer) {
        // Start buffering: open immediately, close when `release` emits.
        return asyncSource.pipe(
          bufferToggle(keep.pipe(startWith(0)), () => release)
        );
      }
      // Emit values normally.
      return asyncSource;
    })
  );

  // Translate notifications into the keep/release control signals.
  notificationSubject.subscribe(notif => {
    if (notif === 'online') {
      console.log(`received notification: ${notif}, releasing...`);
      release.next('release');
    }
    if (notif === 'offline') {
      console.log(`received notification: ${notif}, buffering...`);
      keep.next('keep');
    }
  });

  return messageStream;
}

/**
 * Push `status` into `notificationSubject` after `delay` milliseconds.
 *
 * @param status Notification value to emit (the experiments use 'online' / 'offline').
 * @param delay  Delay in milliseconds before the value is emitted.
 */
function emulateNotification(status: string, delay: number) {
  setTimeout(() => {
    notificationSubject.next(status);
  }, delay);
}

// working version
/**
 * Experiment 4 (working version): same idea as `bufferTest3`, with the
 * buffering state logged on every notification.
 *
 * NOTE(review): every call adds a subscription to `notificationSubject` that
 * is never unsubscribed.
 *
 * @returns A stream emitting single values while online and arrays of buffered
 *          values when a buffer is released.
 */
function bufferTest4(): Observable<unknown> {
  // Track buffering state: true while the latest notification is 'offline'.
  // (The previous state is irrelevant, so a plain map is sufficient.)
  const bufferingState$ = notificationSubject.pipe(
    map(notif => notif === 'offline'),
    tap(isBuffering => {
      console.log(`Buffering state changed: ${isBuffering}`);
    })
  );

  // Message stream based on buffering state.
  const messageStream = bufferingState$.pipe(
    switchMap(isBuffering => {
      if (isBuffering) {
        // Start buffering: open immediately, close when `release` emits.
        return asyncSource.pipe(
          // bufferToggle(toggle.pipe(startWith(0)), () => release)
          bufferToggle(keep.pipe(startWith(0)), () => release)
        );
      }
      // Emit values normally.
      return asyncSource;
    })
  );

  // Translate notifications into the keep/release control signals.
  notificationSubject.subscribe(notif => {
    console.log(`Received notification: ${notif}`);
    if (notif === 'online') {
      release.next(true); // Release buffered values
    }
    if (notif === 'offline') {
      keep.next(true); // Start buffering
    }
  });

  return messageStream;
}

// Message source produced by the project helper; used below via `.pipe(...)`.
// NOTE(review): the meaning of the two arguments is defined by
// prepareResponseMessages - confirm there.
const messages = prepareResponseMessages(1000000, 1);

/**
 * Experiment 5: buffer everything emitted by `messages` and release the buffer
 * each time a 300000 ms interval fires, logging a running counter and the
 * length of each released buffer.
 */
function bufferTest5() {
  const clicks = interval(300000);
  const buffered = messages.pipe(buffer(clicks));
  let count = 0;
  buffered.subscribe({
    next: x => {
      console.log(`${count++} && Buffer Length: ${x.length}`);
    },
    error: err => console.error(err),
    complete: () => { }
  });
}

bufferTest5();